Version: v3.0

Advanced File Load Validation

Overview

The file load validation node provides advanced functionality for scenarios involving:

  • Concurrent ETL job execution
  • Multiple partition management beyond the default 'upload_date' partition

In these cases, you'll need to provide an input manifest file to help the validation node identify the correct files.

Input Manifest File Requirements

File Specifications

  1. Format: JSON
  2. Location: Must be stored at the following path (see the sketch after this list):
s3://<etl_bucket_name>/<preceding_etl_job_id>/temp/manifest-files/<data_pipeline_name>/<data_pipeline_execution_id>-input_manifest_file.json
  3. Schema: Must follow the structure specified below.
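
To make the placeholders concrete: the data pipeline name and execution ID are the Glue workflow name and run ID that the preceding ETL job receives, exactly as in the implementation example further down. A minimal sketch of how the key is assembled (the bucket and ETL job ID placeholders are still yours to substitute):

import sys
from awsglue.utils import getResolvedOptions

# The pipeline name and execution ID come from the Glue workflow arguments
args = getResolvedOptions(sys.argv, ['WORKFLOW_NAME', 'WORKFLOW_RUN_ID'])

manifest_key = "<preceding_etl_job_id>/temp/manifest-files/{}/{}-input_manifest_file.json".format(
    args['WORKFLOW_NAME'], args['WORKFLOW_RUN_ID'])
# Full location: s3://<etl_bucket_name>/ followed by manifest_key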

Schema Structure

Required Schema
[{
  "Domain": "string",
  "DatasetName": "string",
  "Partitions": [
    {"upload_date": "string", "custom_partition": "string", ...},
    {"upload_date": "string", "custom_partition": "string", ...}
  ]
}]

Note: Multi-partition functionality is only available for datasets in the S3-Athena Target Location.
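
When the manifest is generated dynamically, a quick structural check before uploading can catch malformed entries early. A minimal sketch, assuming only the required keys shown above; the helper name is illustrative, not part of the product:

def validate_input_manifest(entries):
    """Basic sanity check of manifest entries against the required schema."""
    for entry in entries:
        # Each entry must name a Domain, a DatasetName and at least one partition
        for key in ("Domain", "DatasetName", "Partitions"):
            if key not in entry:
                raise ValueError("manifest entry missing key: {}".format(key))
        if not entry["Partitions"]:
            raise ValueError("entry for {} has an empty Partitions list".format(entry["DatasetName"]))
        for partition in entry["Partitions"]:
            # upload_date is the default partition and appears in every example above
            if "upload_date" not in partition:
                raise ValueError("partition missing the default upload_date key")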

Example Manifest Files

Single Dataset Example

Input manifest for single dataset
[{
  "Domain": "insurance",
  "DatasetName": "customers",
  "Partitions": [
    {
      "upload_date": "1647298527",
      "uploaded_by": "Sam"
    },
    {
      "upload_date": "1647298589",
      "uploaded_by": "Rachel"
    },
    {
      "upload_date": "1647298599",
      "uploaded_by": "Alex"
    }
  ]
}]

Multiple Datasets Example

Input manifest for multiple datasets
[{
  "Domain": "insurance",
  "DatasetName": "customers",
  "Partitions": [
    {
      "upload_date": "1647298527",
      "uploaded_by": "Sam"
    },
    {
      "upload_date": "1647298589",
      "uploaded_by": "Rachel"
    },
    {
      "upload_date": "1647298599",
      "uploaded_by": "Alex"
    }
  ]
},
{
  "Domain": "insurance",
  "DatasetName": "employees",
  "Partitions": [
    {
      "upload_date": "1647298527",
      "uploaded_by": "William"
    },
    {
      "upload_date": "1647298589",
      "uploaded_by": "Scarlett"
    },
    {
      "upload_date": "1647298599",
      "uploaded_by": "Zaire"
    }
  ]
}]

Implementation Example

The following code demonstrates how to:

  1. Write data to multiple datasets with multi-partition support
  2. Generate and store the required input manifest file

Important: Replace all values enclosed in < > with your actual values.

Multi-dataset partition handling
import sys
import json
import time
from io import StringIO

import boto3
import pandas as pd
from awsglue.utils import getResolvedOptions


def write_csv_dataframe_to_s3(s3_resource, dataframe, bucket, file_path):
    """Writes a dataframe to S3 as a CSV file."""
    csv_buffer = StringIO()
    dataframe.to_csv(csv_buffer, index=False)
    s3_resource.Object(bucket, file_path).put(Body=csv_buffer.getvalue())


def write_json_manifest_file_to_s3(s3_resource, json_data, file_path):
    """Writes the input manifest file to the ETL bucket as JSON."""
    s3object = s3_resource.Object('cdap-<region>-<aws_account_id>-master-etl', file_path)
    s3object.put(Body=json.dumps(json_data).encode('utf-8'))

json_data = []

# WORKFLOW_NAME and WORKFLOW_RUN_ID map to the data pipeline name and execution ID
args = getResolvedOptions(sys.argv, ['WORKFLOW_NAME', 'WORKFLOW_RUN_ID'])
data_pipeline_name = args['WORKFLOW_NAME']
data_pipeline_run_id = args['WORKFLOW_RUN_ID']
s3 = boto3.resource('s3')
# Write a CSV file for the 'customers' dataset under an uploaded_by/upload_date partition
domain_name = 'insurance'
dataset_name = 'customers'
uploaded_by = 'Sam'
epoch_time_str = str(int(time.time()))
data = [['1', 'Alfreds Futterkiste', 'Maria Anders', 'Germany'], ['2', 'Ana Trujillo Emparedados y helados', 'Ana Trujillo', 'Mexico']]
df = pd.DataFrame(data, columns=['CustomerID', 'CustomerName', 'ContactName', 'Country'])
# Partition values in the S3 path must match the values recorded in the manifest below
path = '{}/{}/uploaded_by={}/upload_date={}/<valid-amorphic-userID>/csv'.format(domain_name, dataset_name, uploaded_by, epoch_time_str)
write_csv_dataframe_to_s3(s3, df, 'cdap-<region>-<aws_account_id>-master-lz', '{}/customers_1.csv'.format(path))

# Create a dict for the domain, dataset combination and partition information
customers_partition_dict = {"Domain": domain_name, "DatasetName": dataset_name}
partitions = [{"uploaded_by": uploaded_by, "upload_date": epoch_time_str}]
customers_partition_dict.update({"Partitions": partitions})
json_data.append(customers_partition_dict)

# Repeat for the 'employees' dataset
domain_name = 'insurance'
dataset_name = 'employees'
uploaded_by = 'Sam'
epoch_time_str = str(int(time.time()))
data = [['1', 'Alfreds Futterkiste', 'Maria Anders', 'Germany'], ['2', 'Ana Trujillo Emparedados y helados', 'Ana Trujillo', 'Mexico']]
df = pd.DataFrame(data, columns=['CustomerID', 'CustomerName', 'ContactName', 'Country'])
path = '{}/{}/uploaded_by={}/upload_date={}/<valid-amorphic-userID>/csv'.format(domain_name, dataset_name, uploaded_by, epoch_time_str)
write_csv_dataframe_to_s3(s3, df, 'cdap-<region>-<aws_account_id>-master-lz', '{}/employees_1.csv'.format(path))

# Create a dict for the domain, dataset combination and partition information
employees_partition_dict = {"Domain": domain_name, "DatasetName": dataset_name}
partitions = [{"uploaded_by": uploaded_by, "upload_date": epoch_time_str}]
employees_partition_dict.update({"Partitions": partitions})
json_data.append(employees_partition_dict)

# Write json_data as the input manifest file; <etl_job_id> is this ETL job's ID
# (i.e. <preceding_etl_job_id> in the location specification above)
file_path = "<etl_job_id>/temp/manifest-files/{}/{}-input_manifest_file.json".format(data_pipeline_name, data_pipeline_run_id)
write_json_manifest_file_to_s3(s3, json_data, file_path)

After execution, you can:

  1. View the data pipeline execution properties in the file load validation node
  2. Download the output manifest file to see detailed file load statuses
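
If you prefer to inspect the output manifest programmatically instead of downloading it, a minimal sketch along these lines works; the bucket and key placeholders below are illustrative and should be replaced with the output manifest file's actual location:

import json
import boto3

s3 = boto3.resource('s3')

# Placeholder location; substitute the real bucket and key of the output manifest file
output_object = s3.Object('<etl_bucket_name>', '<output_manifest_file_key>')
output_manifest = json.loads(output_object.get()['Body'].read())
print(json.dumps(output_manifest, indent=2))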