Version: v3.0 
Advanced File Load Validation
Overview
The file load validation node provides advanced functionality for scenarios involving:
- Concurrent ETL job execution
- Multiple partition management beyond the default 'upload_date' partition
In these cases, you'll need to provide an input manifest file to help the validation node identify the correct files.
Input Manifest File Requirements
File Specifications
- Format: JSON
- Location: Must be stored at the following path (a sketch of how to assemble this key appears after this list):
s3://<etl_bucket_name>/<preceding_etl_job_id>/temp/manifest-files/<data_pipeline_name>/<data_pipeline_execution_id>-input_manifest_file.json
- Contents: The file must follow the schema structure specified below.
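For reference, the following is a minimal sketch of one way to assemble this key inside a Glue ETL job, assuming the job receives the WORKFLOW_NAME and WORKFLOW_RUN_ID arguments used in the implementation example later on this page. The bucket name and <preceding_etl_job_id> remain placeholders that you must replace with your own values.
import sys
from awsglue.utils import getResolvedOptions

# The Glue workflow name and run id map to the data pipeline name and execution id.
args = getResolvedOptions(sys.argv, ['WORKFLOW_NAME', 'WORKFLOW_RUN_ID'])

manifest_key = '<preceding_etl_job_id>/temp/manifest-files/{}/{}-input_manifest_file.json'.format(
    args['WORKFLOW_NAME'],   # data_pipeline_name
    args['WORKFLOW_RUN_ID']  # data_pipeline_execution_id
)
# Full location: s3://<etl_bucket_name>/ followed by manifest_key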
Schema Structure
Required Schema
[{
  "Domain": "string",
  "DatasetName": "string",
  "Partitions": [
    {"upload_date": "string", "custom_partition": "string", ...},
    {"upload_date": "string", "custom_partition": "string", ...}
  ]
}]
Note: Multi-partition functionality is only available for datasets in the S3-Athena Target Location.
Example Manifest Files
Single Dataset Example
Input manifest for a single dataset
[{
  "Domain": "insurance",
  "DatasetName": "customers",
  "Partitions": [
    {
      "upload_date": "1647298527",
      "uploaded_by": "Sam"
    },
    {
      "upload_date": "1647298589",
      "uploaded_by": "Rachel"
    },
    {
      "upload_date": "1647298599",
      "uploaded_by": "Alex"
    }
  ]
}]
Multiple Datasets Example
Input manifest for multiple datasets
[{
  "Domain": "insurance",
  "DatasetName": "customers",
  "Partitions": [
    {
      "upload_date": "1647298527",
      "uploaded_by": "Sam"
    },
    {
      "upload_date": "1647298589",
      "uploaded_by": "Rachel"
    },
    {
      "upload_date": "1647298599",
      "uploaded_by": "Alex"
    }
  ]
},
{
  "Domain": "insurance",
  "DatasetName": "employees",
  "Partitions": [
    {
      "upload_date": "1647298527",
      "uploaded_by": "William"
    },
    {
      "upload_date": "1647298589",
      "uploaded_by": "Scarlett"
    },
    {
      "upload_date": "1647298599",
      "uploaded_by": "Zaire"
    }
  ]
}]
Implementation Example
The following code demonstrates how to:
- Write data to multiple datasets with multi-partition support
- Generate and store the required input manifest file
Important: Replace all values enclosed in < > with your actual values.
Multi-dataset partition handling
import sys
import time
import json
from io import StringIO

import boto3
import pandas as pd
from awsglue.utils import getResolvedOptions


def write_csv_dataframe_to_s3(s3_resource, dataframe, bucket, file_path):
    """
    Writes the dataframe to S3 as a CSV file.
    """
    csv_buffer = StringIO()
    dataframe.to_csv(csv_buffer, index=False)
    s3_resource.Object(bucket, file_path).put(Body=csv_buffer.getvalue())


def write_json_manifest_file_to_s3(s3_resource, json_data, file_path):
    """
    Writes the input manifest JSON to the ETL bucket.
    """
    s3object = s3_resource.Object('cdap-<region>-<aws_account_id>-master-etl', file_path)
    s3object.put(Body=json.dumps(json_data).encode('UTF-8'))


json_data = []
args = getResolvedOptions(sys.argv, ['WORKFLOW_NAME', 'WORKFLOW_RUN_ID'])
data_pipeline_name = args['WORKFLOW_NAME']
data_pipeline_run_id = args['WORKFLOW_RUN_ID']
s3 = boto3.resource('s3')

# Write the 'customers' dataset with multi-partition support
domain_name = 'insurance'
dataset_name = 'customers'
uploaded_by = 'Sam'
epoch_time = int(time.time())
epoch_time_str = str(epoch_time)
data = [['1', 'Alfreds Futterkiste', 'Maria Anders', 'Germany'], ['2', 'Ana Trujillo Emparedados y helados', 'Ana Trujillo', 'Mexico']]
df = pd.DataFrame(data, columns=['CustomerID', 'CustomerName', 'ContactName', 'Country'])
path = '{}/{}/uploaded_by={}/upload_date={}/<valid-amorphic-userID>/csv'.format(domain_name, dataset_name, uploaded_by, epoch_time_str)
write_csv_dataframe_to_s3(s3, df, 'cdap-<region>-<aws_account_id>-master-lz', '{}/customers_1.csv'.format(path))
# Create a dict for the domain, dataset combination and partition information.
# The partition values in the manifest must match the partition values used in the S3 path above.
customers_partition_dict = {"Domain": domain_name, "DatasetName": dataset_name}
partitions = [{"uploaded_by": uploaded_by, "upload_date": epoch_time_str}]
customers_partition_dict.update({"Partitions": partitions})
json_data.append(customers_partition_dict)

# Write the 'employees' dataset with multi-partition support
domain_name = 'insurance'
dataset_name = 'employees'
uploaded_by = 'Sam'
epoch_time = int(time.time())
epoch_time_str = str(epoch_time)
data = [['1', 'Alfreds Futterkiste', 'Maria Anders', 'Germany'], ['2', 'Ana Trujillo Emparedados y helados', 'Ana Trujillo', 'Mexico']]
df = pd.DataFrame(data, columns=['CustomerID', 'CustomerName', 'ContactName', 'Country'])
path = '{}/{}/uploaded_by={}/upload_date={}/<valid-amorphic-userID>/csv'.format(domain_name, dataset_name, uploaded_by, epoch_time_str)
write_csv_dataframe_to_s3(s3, df, 'cdap-<region>-<aws_account_id>-master-lz', '{}/employees_1.csv'.format(path))
# Create a dict for the domain, dataset combination and partition information.
employees_partition_dict = {"Domain": domain_name, "DatasetName": dataset_name}
partitions = [{"uploaded_by": uploaded_by, "upload_date": epoch_time_str}]
employees_partition_dict.update({"Partitions": partitions})
json_data.append(employees_partition_dict)

# Write json_data as the input manifest file expected by the file load validation node
file_path = "<etl_job_id>/temp/manifest-files/{}/{}-input_manifest_file.json".format(data_pipeline_name, data_pipeline_run_id)
write_json_manifest_file_to_s3(s3, json_data, file_path)
After execution, you can:
- View the data pipeline execution properties in the file validation node
- Download the output manifest file to see detailed file load statuses
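If you prefer to retrieve the output manifest programmatically, the following is a minimal sketch using boto3. The bucket name and object key below are placeholders; take the actual location of the output manifest from the file load validation node's execution properties.
import json
import boto3

s3 = boto3.resource('s3')

# Placeholder bucket and key: substitute the output manifest location shown in
# the file load validation node's execution properties.
manifest_object = s3.Object('<etl_bucket_name>', '<output_manifest_file_key>')
output_manifest = json.loads(manifest_object.get()['Body'].read())

# Pretty-print the detailed file load statuses recorded in the manifest.
print(json.dumps(output_manifest, indent=2))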