Version: v3.2

Libraries

Libraries are an extension of external job libraries. They are mainly used to maintain a central repository of organization-approved libraries and packages that can be used across multiple Jobs or Data Labs.

Libraries provide the following capabilities:

  • They allow users to attach multiple packages to a job, so users can easily switch between them to perform different actions based on the job's requirements.
  • They make it possible to customize job dependencies at a granular level.
  • They offer the flexibility to choose among different types of packages.
Note

Currently, depending on the type of ETL Job, Amorphic supports the "py", "egg", and "whl" extensions for Python Shell applications and the "py", "zip", and "jar" extensions for PySpark applications.


Library

A Library is a collection of packages or modules that provide standardized solutions to problems in everyday programming. Unlike the standard library bundled with the Python installation, these packages are explicitly designed by a user, an organization, or the open-source community. Libraries improve the portability of Python programs by abstracting platform-specific APIs into platform-neutral APIs.

The ETL Library has the following properties:

  • A Library can have multiple packages attached to it.
  • A Library can be attached to multiple Jobs.

Types of Amorphic ETL Libraries:

  • External Libraries: Their scope is limited to a single ETL job, and they are removed when the user deletes that ETL job.
  • Shared Libraries: They have a global scope, so multiple jobs can use the same shared library (subject to user authentication), and they persist in the central repository even after an ETL job that uses them has been deleted.

Amorphic Libraries contain the following information:

Field | Description
Library Name | Uniquely identifies the functionality of the library
Library Description | A brief explanation of the library, typically describing the packages it contains
Packages | A file or list of files that can be imported into an ETL Job to perform a specific set of operations. Example: matplotlib is a plotting library commonly used by data scientists and data analysts for visualizations
Jobs | The list of ETL jobs to which the library is attached
CreatedBy | User who created the library
LastModifiedBy | User who most recently updated the library
LastModifiedTime | Timestamp of the most recent update to the library

Library Operations

Amorphic provides the following operations to manage libraries:

Create Library

To create a new Library in Amorphic, go to the "Create New Library" section under "Libraries". A library can have zero or more packages and jobs attached to it. After creating a Library, the user can view, update, and delete it, provided they have the required permissions on that library.

Note

Users cannot delete a shared library while it is attached to an existing ETL Job. If a user attempts to delete such a library, a pop-up lists the dependent ETL Jobs. The user must detach the library from all of those Jobs and then retry the deletion.

The GIF below shows how a user can create a new library.

Create ETL Library

View Library

Users must have sufficient permissions to view existing library information. To view a library, click its name in the "Libraries" section under "Shared Resources".

The GIF below shows how a user can view the library information in detail.

View library

Attach Library

Users can attach a shared library to a job from the job details page, or while creating or updating the job. Amorphic lists the available shared libraries along with the other job parameters, and the user can select the ones to attach. Once a library is attached, all of its packages are passed to the job as arguments automatically, with no further intervention required.

The GIF below shows how to attach a shared ETL library to an existing ETL Job.

Attach Library

Importing and using a library

If a library contains a single version of the required module, or several different files, the user can import the module directly and use it.

Python
from amorphicutils.common import read_param_store
print(read_param_store("SYSTEM.S3BUCKET.DLZ", secure=False)['data'])

If a library contains multiple versions of the required module, the user should explicitly insert the desired versioned file at the front of the system path before importing the module. Python imports the first matching module it finds on the path, so this ensures that the specific version is picked up rather than whichever copy happens to come first.

Python
import sys
# explicitly specify the version to use; inserting it at index 0 gives it precedence over other copies on sys.path
sys.path.insert(0, "amorphicutils-0.3.1.zip")
from amorphicutils.common import read_param_store
print(read_param_store("SYSTEM.S3BUCKET.DLZ", secure=False)['data'])
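The reason for inserting the path at position 0 can be seen in the following self-contained sketch. It builds two hypothetical archives, `mylib-0.3.0.zip` and `mylib-0.3.1.zip`, each containing a package named `mylib` (the package name and versions are made up for illustration). Python imports the first match it finds while scanning `sys.path` from front to back, so whichever archive sits at the front wins.

```python
import importlib
import os
import sys
import tempfile
import zipfile


def build_versioned_zip(directory: str, version: str) -> str:
    """Create a zip archive holding a tiny package named 'mylib' (hypothetical)."""
    path = os.path.join(directory, f"mylib-{version}.zip")
    with zipfile.ZipFile(path, "w") as zf:
        zf.writestr("mylib/__init__.py", f'__version__ = "{version}"\n')
    return path


def import_preferring(archive: str):
    """Prepend the archive to sys.path so it wins, then (re)import 'mylib'."""
    sys.path.insert(0, archive)
    sys.modules.pop("mylib", None)  # forget any copy imported earlier
    importlib.invalidate_caches()   # make the path finders rescan sys.path
    return importlib.import_module("mylib")


with tempfile.TemporaryDirectory() as tmp:
    old_zip = build_versioned_zip(tmp, "0.3.0")
    new_zip = build_versioned_zip(tmp, "0.3.1")
    sys.path.insert(0, old_zip)  # another version is already on the path
    try:
        mylib = import_preferring(new_zip)
        print(mylib.__version__)  # prints 0.3.1
    finally:
        # Clean up so the temporary archives do not linger on sys.path
        sys.path.remove(new_zip)
        sys.path.remove(old_zip)
```

Removing the module from `sys.modules` is only needed because the sketch re-imports the same name; in a normal job, setting the path before the first import is enough.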

System Library

Starting with version 3.1, Amorphic Libraries come with one pre-packaged system library called ETL Toolkit. This system library contains a general set of tools for common data preprocessing operations and is fully compatible with both Spark and Pandas (Python Shell) data formats.

System Library


AWS Glue ETL Toolkit - Complete API Documentation

A comprehensive, high-quality utility package for AWS Glue ETL operations, providing unified interfaces for data transformations and modular components for advanced operations like schema management, CDC processing, and multi-target data loading.

Quick Start

Installation & Basic Usage

from etlToolkit import GlueTransformationsUtil

# Initialize with a Spark session to enable all utilities
# (spark_session is an existing SparkSession, e.g. glueContext.spark_session in a Glue job)
transformer = GlueTransformationsUtil(spark_session)

# Or initialize for Pandas-only operations
transformer = GlueTransformationsUtil()

Core Transformation Methods

1. change_column_types(df, type_mapping)

Purpose: Convert DataFrame column data types with validation and error handling.

Use Cases:

  • Converting string dates to date types for temporal analysis
  • Ensuring numeric columns have proper precision for calculations
  • Standardizing data types across different data sources
  • Preparing data for target system compatibility

Parameters:

  • df: Input DataFrame (Spark or Pandas)
  • type_mapping: Dict mapping column names to target types

Supported Types: string, integer, long, double, boolean, date, timestamp, decimal

Example:

type_mapping = {
    'customer_id': 'string',
    'order_amount': 'double',
    'quantity': 'integer',
    'is_active': 'boolean',
    'order_date': 'date',
    'created_at': 'timestamp'
}
df_typed = transformer.change_column_types(df, type_mapping)
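The Toolkit's internals are not shown in this documentation, so the following is only a minimal, standard-library sketch of what a type-mapping conversion does. It assumes rows are plain Python dicts rather than Spark or Pandas DataFrames; the converter table, the accepted boolean spellings, and the ISO 8601 input format for dates and timestamps are assumptions for illustration.

```python
from datetime import date, datetime
from decimal import Decimal


def _to_bool(value):
    # Accept a few common textual booleans; anything else is rejected
    text = str(value).strip().lower()
    if text in ("true", "1", "yes", "y"):
        return True
    if text in ("false", "0", "no", "n"):
        return False
    raise ValueError(f"cannot interpret {value!r} as boolean")


CONVERTERS = {
    "string": str,
    "integer": int,
    "long": int,                          # Python ints are unbounded, so long == integer here
    "double": float,
    "boolean": _to_bool,
    "date": date.fromisoformat,           # expects 'YYYY-MM-DD'
    "timestamp": datetime.fromisoformat,  # expects ISO 8601, e.g. '2024-01-15T10:30:00'
    "decimal": Decimal,
}


def change_column_types(rows, type_mapping):
    """Illustrative only: return new rows with each mapped column cast to its target type.

    None values are passed through unchanged (treated as NULL).
    """
    unknown = set(type_mapping.values()) - set(CONVERTERS)
    if unknown:
        raise ValueError(f"unsupported target types: {sorted(unknown)}")
    converted = []
    for row in rows:
        new_row = dict(row)  # never mutate the caller's data
        for column, target in type_mapping.items():
            value = new_row[column]  # KeyError if the column is missing
            if value is not None:
                new_row[column] = CONVERTERS[target](value)
        converted.append(new_row)
    return converted
```

Validating the whole mapping before touching any row means an unsupported type name fails fast instead of leaving a partially converted dataset.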

2. perform_join(left_df, right_df, join_keys, join_type, suffix_left, suffix_right)

Purpose: Join DataFrames with automatic Spark/Pandas detection and conflict resolution.

Use Cases:

  • Customer data enrichment with demographic information
  • Order-to-product mapping for sales analysis
  • Multi-table data consolidation for reporting
  • Dimensional modeling for data warehousing

Parameters:

  • left_df, right_df: DataFrames to join
  • join_keys: Column name(s) to join on (string or list)
  • join_type: 'inner', 'left', 'right', 'outer'
  • suffix_left, suffix_right: Suffixes for duplicate columns

Example:

df_joined = transformer.perform_join(
    left_df=orders_df,
    right_df=customers_df,
    join_keys=['customer_id'],  # Single or multiple keys
    join_type='left',  # inner, left, right, outer
    suffix_left='_order',
    suffix_right='_customer'
)
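To make the suffix behaviour concrete, here is a standalone sketch (not the Toolkit's code) of a join over lists of dict rows. Only non-key columns that exist on both sides receive `suffix_left`/`suffix_right`, and columns missing from unmatched rows are filled with `None`, mirroring SQL NULLs.

```python
def perform_join(left, right, join_keys, join_type="inner",
                 suffix_left="_left", suffix_right="_right"):
    """Illustrative join of two lists of dict rows with suffixes for clashing columns."""
    if join_type not in ("inner", "left", "right", "outer"):
        raise ValueError(f"unsupported join_type: {join_type}")
    keys = [join_keys] if isinstance(join_keys, str) else list(join_keys)
    left_cols = list(dict.fromkeys(c for row in left for c in row))
    right_cols = list(dict.fromkeys(c for row in right for c in row))
    clashes = (set(left_cols) & set(right_cols)) - set(keys)

    def rename(col, suffix):
        return col + suffix if col in clashes else col

    # Output schema: all left columns, then right columns except the join keys
    columns = [rename(c, suffix_left) for c in left_cols] + [
        rename(c, suffix_right) for c in right_cols if c not in keys]

    # Index right-side row positions by their join-key tuple
    index = {}
    for pos, row in enumerate(right):
        index.setdefault(tuple(row[k] for k in keys), []).append(pos)

    joined, matched = [], set()
    for lrow in left:
        left_part = {rename(c, suffix_left): v for c, v in lrow.items()}
        positions = index.get(tuple(lrow[k] for k in keys), [])
        for pos in positions:
            matched.add(pos)
            right_part = {rename(c, suffix_right): v for c, v in right[pos].items()}
            joined.append({**left_part, **right_part})
        if not positions and join_type in ("left", "outer"):
            joined.append(left_part)
    if join_type in ("right", "outer"):
        for pos, rrow in enumerate(right):
            if pos not in matched:
                joined.append({rename(c, suffix_right): v for c, v in rrow.items()})
    # Fill columns missing from unmatched rows with None (SQL NULL)
    return [{c: row.get(c) for c in columns} for row in joined]
```

With `join_type='left'`, an order whose `customer_id` has no match keeps its own columns and gets `None` for every customer column.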

3. execute_sql_block(sql_query, dataframes)

Purpose: Execute complex SQL queries on DataFrames using Spark SQL engine.

Use Cases:

  • Complex analytical queries with window functions
  • Data aggregation and summarization
  • Multi-table analytical processing
  • Advanced filtering and transformation logic

Parameters:

  • sql_query: Standard SQL query string
  • dataframes: Dict mapping table names to DataFrames

Example:

dataframes = {'orders': orders_df, 'customers': customers_df, 'products': products_df}
result = transformer.execute_sql_block("""
    WITH monthly_sales AS (
        SELECT
            customer_id,
            DATE_TRUNC('month', order_date) AS month,
            SUM(amount) AS monthly_total,
            ROW_NUMBER() OVER (
                PARTITION BY customer_id
                ORDER BY SUM(amount) DESC
            ) AS rank
        FROM orders o
        JOIN products p ON o.product_id = p.product_id
        GROUP BY customer_id, DATE_TRUNC('month', order_date)
    )
    SELECT c.customer_name, ms.month, ms.monthly_total
    FROM monthly_sales ms
    JOIN customers c ON ms.customer_id = c.customer_id
    WHERE ms.rank <= 3
""", dataframes)

4. perform_union(dataframes, union_type)

Purpose: Union multiple DataFrames with deduplication options.

Use Cases:

  • Combining historical and current data
  • Merging data from multiple sources
  • Consolidating partitioned datasets
  • Data lake aggregation scenarios

Parameters:

  • dataframes: List of DataFrames to union
  • union_type: 'union', 'union_all', 'union_distinct'

Example:

combined_df = transformer.perform_union(
    dataframes=[current_df, historical_df, archive_df],
    union_type='union_distinct'  # union, union_all, union_distinct
)
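The union options differ in how duplicate rows are treated. The sketch below models `union_all` (keep every row) and `union_distinct` (drop exact duplicates) over lists of dict rows, matching columns by name like Spark's `unionByName`. The plain `union` option is left out on purpose: this page does not say whether it deduplicates, and conventions differ (SQL `UNION` removes duplicates, whereas Spark's `DataFrame.union` keeps them).

```python
def perform_union(datasets, union_type="union_all"):
    """Illustrative union of lists of dict rows (not the Toolkit's code).

    Values must be hashable for 'union_distinct'.
    """
    if union_type not in ("union_all", "union_distinct"):
        raise ValueError(f"union_type not modelled in this sketch: {union_type}")
    # Take the reference schema from the first non-empty dataset
    columns = next((list(rows[0]) for rows in datasets if rows), [])
    combined, seen = [], set()
    for rows in datasets:
        for row in rows:
            if set(row) != set(columns):
                raise ValueError(f"schema mismatch: {sorted(row)} != {sorted(columns)}")
            signature = tuple(row[c] for c in columns)
            if union_type == "union_distinct":
                if signature in seen:
                    continue  # exact duplicate of a row already kept
                seen.add(signature)
            combined.append(row)
    return combined
```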

5. generate_uid_column(df, uid_column_name, uid_type)

Purpose: Generate unique identifier columns for data lineage and tracking.

Use Cases:

  • Record-level tracking and auditing
  • Deduplication key generation
  • Distributed processing coordination
  • Data lineage and provenance tracking

Parameters:

  • df: Input DataFrame
  • uid_column_name: Name for the new UID column
  • uid_type: 'uuid', 'monotonic', 'timestamp_based'

Example:

df_with_uuid = transformer.generate_uid_column(df, 'record_uuid', 'uuid')
df_with_sequence = transformer.generate_uid_column(df, 'sequence_id', 'monotonic')
df_with_timestamp = transformer.generate_uid_column(df, 'batch_id', 'timestamp_based')
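The three `uid_type` options trade off different properties. The following sketch shows one plausible reading of each on a list of dict rows; the `timestamp_based` format in particular (a shared batch timestamp plus the row position) is an assumption, since this page does not specify it.

```python
import uuid
from datetime import datetime, timezone


def generate_uid_column(rows, uid_column_name, uid_type):
    """Illustrative only: return copies of rows with a new identifier column."""
    if uid_type == "uuid":
        # Random version-4 UUIDs: unique without any coordination between workers
        ids = [str(uuid.uuid4()) for _ in rows]
    elif uid_type == "monotonic":
        # Increasing integers; consecutive here because a single process assigns them
        ids = list(range(len(rows)))
    elif uid_type == "timestamp_based":
        # Assumption: one batch timestamp shared by the run plus the row position
        stamp = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S%f")
        ids = [f"{stamp}-{i:06d}" for i in range(len(rows))]
    else:
        raise ValueError(f"unsupported uid_type: {uid_type}")
    return [{**row, uid_column_name: uid} for row, uid in zip(rows, ids)]
```

In Spark, the usual building block for the monotonic case is `pyspark.sql.functions.monotonically_increasing_id()`, whose values are guaranteed to be increasing and unique but not consecutive; whether the Toolkit uses it is not stated here.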

6. perform_lookup(main_df, lookup_df, lookup_key, lookup_value_column, main_key, result_column_name, default_value)

Purpose: High-performance lookup transformations for foreign key resolution.

Use Cases:

  • Adding descriptive names to ID-based data
  • Currency code to currency name mapping
  • Status code to status description enrichment
  • Reference data integration

Parameters:

  • main_df: Main DataFrame to enrich
  • lookup_df: Reference DataFrame with lookup mappings
  • lookup_key: Key column in lookup DataFrame
  • lookup_value_column: Value column to retrieve from lookup
  • main_key: Key column in main DataFrame
  • result_column_name: Name for new enriched column
  • default_value: Value when lookup fails

Example:

enriched_df = transformer.perform_lookup(
    main_df=orders_df,
    lookup_df=customer_lookup_df,
    lookup_key='customer_id',
    lookup_value_column='customer_name',
    main_key='customer_id',
    result_column_name='customer_name',
    default_value='Unknown Customer'
)
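A lookup behaves like a left join that returns a single value column with a fallback. The sketch below (plain Python, not the Toolkit's code) builds a hash map from the lookup rows once and then resolves each main row in constant time; keeping the first value for a duplicated lookup key is an assumption.

```python
def perform_lookup(main_rows, lookup_rows, lookup_key, lookup_value_column,
                   main_key, result_column_name, default_value=None):
    """Illustrative only: add result_column_name to each main row via a hash lookup."""
    # Build the mapping once: O(len(lookup_rows)), then O(1) per main row
    mapping = {}
    for row in lookup_rows:
        mapping.setdefault(row[lookup_key], row[lookup_value_column])  # first match wins
    return [
        {**row, result_column_name: mapping.get(row[main_key], default_value)}
        for row in main_rows
    ]
```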

7. clean_column_names(df, remove_special_chars, to_lowercase)

Purpose: Clean and standardize column names for system compatibility.

Use Cases:

  • Preparing data for Iceberg/Snowflake ingestion
  • Standardizing column names across data sources
  • Removing problematic characters for SQL compatibility
  • Data warehouse schema standardization

Parameters:

  • df: Input DataFrame
  • remove_special_chars: Whether to remove special characters
  • to_lowercase: Whether to convert to lowercase

Example:

df_clean = transformer.clean_column_names(
    df=raw_df,
    remove_special_chars=True,
    to_lowercase=True
)
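The exact cleaning rules are not spelled out on this page, so the following sketch uses a hypothetical rule set: runs of characters outside `[A-Za-z0-9_]` become a single underscore, leading and trailing underscores are trimmed, a leading digit is prefixed with `_`, and names that collide after cleaning get a numeric suffix. The Toolkit's actual rules may differ.

```python
import re


def clean_column_name(name, remove_special_chars=True, to_lowercase=True):
    """Normalize one column name for SQL-friendly targets (hypothetical rules)."""
    cleaned = name.strip()
    if remove_special_chars:
        cleaned = re.sub(r"[^0-9A-Za-z_]+", "_", cleaned)  # runs of invalid chars -> "_"
        cleaned = re.sub(r"_+", "_", cleaned).strip("_")     # collapse and trim underscores
        if cleaned and cleaned[0].isdigit():
            cleaned = "_" + cleaned                          # identifiers shouldn't start with a digit
    if to_lowercase:
        cleaned = cleaned.lower()
    return cleaned


def clean_column_names(columns, remove_special_chars=True, to_lowercase=True):
    """Clean a list of column names, adding _1, _2, ... to names that collide."""
    result, used = [], set()
    for col in columns:
        base = clean_column_name(col, remove_special_chars, to_lowercase)
        name, n = base, 1
        while name in used:
            name = f"{base}_{n}"
            n += 1
        used.add(name)
        result.append(name)
    return result
```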

8. validate_dataframe_schema(df, expected_columns, strict_mode)

Purpose: Validate DataFrame schema against expected structure.

Use Cases:

  • Data quality validation in ETL pipelines
  • Contract testing between data sources
  • Schema evolution monitoring
  • Data governance compliance checking

Parameters:

  • df: DataFrame to validate
  • expected_columns: List of required column names
  • strict_mode: If True, the DataFrame must contain exactly these columns; if False, additional columns are allowed

Example:

expected_columns = ['customer_id', 'order_date', 'amount', 'status']
is_valid = transformer.validate_dataframe_schema(
    df=orders_df,
    expected_columns=expected_columns,
    strict_mode=True  # Must have exactly these columns
)
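The difference between strict and non-strict validation comes down to two set differences. This sketch works on plain lists of column names and, unlike the Toolkit method (which returns a single boolean), also returns the missing and unexpected columns so the reason for a failure is visible.

```python
def validate_schema(actual_columns, expected_columns, strict_mode=False):
    """Illustrative only: return (is_valid, missing, unexpected)."""
    actual, expected = set(actual_columns), set(expected_columns)
    missing = sorted(expected - actual)
    # Extra columns only matter in strict mode
    unexpected = sorted(actual - expected) if strict_mode else []
    return (not missing and not unexpected), missing, unexpected
```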

Specialized Utilities

Schema Management

Schema Fetcher

Purpose: Fetch database schemas from multiple sources (AS/400, MariaDB, Generic JDBC) for automated table creation and schema validation.

Access: schema_fetcher = transformer.get_utility('schema_fetcher')

Use Cases:

  • Automated schema discovery from legacy systems
  • Schema migration from AS/400 to modern data platforms
  • Schema validation before data loading
  • Dynamic table creation based on source schemas
  • Metadata extraction for data cataloging

Key Methods:

fetch_as400_schema(table_name, config)

Purpose: Fetch AS/400 (DB2) table schema with column metadata.

Parameters:

  • table_name: Name of the AS/400 table (case-sensitive)
  • config: Dictionary with connection details:
    • host: AS/400 server hostname
    • library: AS/400 library name
    • user: Database username
    • password: Database password

Returns: Schema object with column definitions and data types

Example:

schema_fetcher = transformer.get_utility('schema_fetcher')
as400_config = {
    'host': 'as400.company.com',
    'library': 'PRODLIB',
    'user': 'username',
    'password': 'password'
}
schema = schema_fetcher.fetch_as400_schema('CUSTOMER', as400_config)

fetch_mariadb_schema(table_name, config)

Purpose: Fetch MariaDB/MySQL table schema with column metadata.

Parameters:

  • table_name: Name of the MariaDB table
  • config: Dictionary with connection details:
    • host: MariaDB server hostname
    • port: Database port (default: 3306)
    • database: Database name
    • user: Database username
    • password: Database password

Returns: Schema object with column definitions and data types

Example:

mariadb_config = {
    'host': 'mariadb.company.com',
    'port': 3306,
    'database': 'production',
    'user': 'username',
    'password': 'password'
}
schema = schema_fetcher.fetch_mariadb_schema('customers', mariadb_config)

fetch_generic_schema(table_name, jdbc_url, properties, schema_query)

Purpose: Fetch schema from any JDBC-compatible database using custom SQL query.

Parameters:

  • table_name: Name of the table
  • jdbc_url: JDBC connection URL
  • properties: Dictionary of JDBC connection properties (user, password, driver, etc.)
  • schema_query: SQL query to fetch schema information

Returns: Schema object with column definitions and data types

Example:

schema = schema_fetcher.fetch_generic_schema(
    table_name='products',
    jdbc_url='jdbc:postgresql://localhost:5432/mydb',
    properties={
        'user': 'user',
        'password': 'pass',
        'driver': 'org.postgresql.Driver'
    },
    schema_query="SELECT column_name, data_type FROM information_schema.columns WHERE table_name = 'products'"
)

Table Creator

Purpose: Create tables in target systems (Iceberg, Snowflake) based on schema definitions.

Access: table_creator = transformer.get_utility('table_creator')

Use Cases:

  • Automated table creation from source schemas
  • Data lake table setup (Bronze, Silver, Gold layers)
  • Schema evolution and migration
  • Multi-target table creation for data replication
  • Initial data warehouse setup

Key Methods:

create_iceberg_table_direct(table_name, schema, catalog, database)

Purpose: Create Iceberg table directly in AWS Glue Data Catalog.

Parameters:

  • table_name: Name of the table to create
  • schema: Schema object from schema fetcher
  • catalog: Glue catalog name (typically 'glue_catalog')
  • database: Database name in Glue catalog

Returns: Boolean indicating success

Example:

table_creator = transformer.get_utility('table_creator')
success = table_creator.create_iceberg_table_direct(
    table_name='customers',
    schema=schema,
    catalog='glue_catalog',
    database='bronze'
)

create_snowflake_table(table_name, schema, config, schema_name)

Purpose: Create Snowflake table with DDL generation.

Parameters:

  • table_name: Name of the table to create
  • schema: Schema object from schema fetcher
  • config: Snowflake connection configuration dictionary:
    • sf_url: Snowflake account URL
    • sf_user: Snowflake username
    • sf_password: Snowflake password
    • sf_database: Snowflake database name
    • sf_warehouse: Snowflake warehouse name
    • sf_role: Snowflake role name
  • schema_name: Snowflake schema name (e.g., 'BRONZE', 'GOLD')

Returns: Boolean indicating success

Example:

snowflake_config = {
    'sf_url': 'https://account.snowflakecomputing.com',
    'sf_user': 'username',
    'sf_password': 'password',
    'sf_database': 'PROD_DB',
    'sf_warehouse': 'COMPUTE_WH',
    'sf_role': 'DATA_ENGINEER'
}
success = table_creator.create_snowflake_table(
    table_name='customers',
    schema=schema,
    config=snowflake_config,
    schema_name='BRONZE'
)
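`create_snowflake_table` is described as generating DDL from a schema. The sketch below shows what such generation might look like, assuming the schema is a list of `(column_name, data_type)` pairs. The structure of the Toolkit's schema object and its type mapping are not documented here, so `SNOWFLAKE_TYPES` is a hypothetical mapping; the Snowflake type names themselves (`NUMBER`, `VARCHAR`, `FLOAT`, `BOOLEAN`, `DATE`, `TIMESTAMP_NTZ`) are real.

```python
# Hypothetical mapping from generic source types to Snowflake column types
SNOWFLAKE_TYPES = {
    "string": "VARCHAR",
    "varchar": "VARCHAR",
    "integer": "NUMBER(38,0)",
    "int": "NUMBER(38,0)",
    "bigint": "NUMBER(38,0)",
    "double": "FLOAT",
    "boolean": "BOOLEAN",
    "date": "DATE",
    "timestamp": "TIMESTAMP_NTZ",
}


def build_snowflake_ddl(table_name, schema, schema_name):
    """Illustrative only: render CREATE TABLE from (column_name, data_type) pairs."""
    if not schema:
        raise ValueError("schema must contain at least one column")
    column_lines = []
    for column, data_type in schema:
        sf_type = SNOWFLAKE_TYPES.get(data_type.lower(), "VARCHAR")  # unknown types fall back to VARCHAR
        # Quoted upper-case names match Snowflake's default for unquoted identifiers
        column_lines.append(f'    "{column.upper()}" {sf_type}')
    body = ",\n".join(column_lines)
    return f"CREATE TABLE IF NOT EXISTS {schema_name}.{table_name.upper()} (\n{body}\n)"
```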

Change Data Capture (CDC)

Delta Processor

Purpose: Process CDC delta files with deduplication and operation separation for efficient change data capture workflows.

Access: delta_processor = transformer.get_utility('delta_processor')

Use Cases:

  • Processing incremental updates from source systems
  • Handling CDC files from database replication
  • Deduplicating records based on primary keys
  • Separating inserts/updates from deletes for merge operations
  • Processing batch delta files from multiple sources

Key Methods:

set_primary_key_mapping(table_name, primary_keys)

Purpose: Configure primary key mappings for table-specific deduplication.

Parameters:

  • table_name: Name of the table
  • primary_keys: List of column names that form the primary key

Example:

delta_processor = transformer.get_utility('delta_processor')
delta_processor.set_primary_key_mapping('CUSTOMER', ['customer_id'])
delta_processor.set_primary_key_mapping('ORDER', ['order_id', 'line_item_id'])

process_table_batch(s3_bucket, prefix, table_name, rc_list, add_rccode)

Purpose: Process complete table batch with automatic deduplication and operation separation.

Parameters:

  • s3_bucket: S3 bucket name containing delta files
  • prefix: S3 prefix path (e.g., 'bronze', 'raw/cdc')
  • table_name: Name of the table being processed
  • rc_list: List of record codes to process (e.g., ['RC1', 'RC2'])
  • add_rccode: Boolean to add record code column to output

Returns: Tuple of (upserts DataFrame, deletes DataFrame, statistics dictionary)

Example:

delta_processor = transformer.get_utility('delta_processor')
delta_processor.set_primary_key_mapping('CUSTOMER', ['customer_id'])
df_upserts, df_deletes, stats = delta_processor.process_table_batch(
    s3_bucket='my-data-bucket',
    prefix='bronze',
    table_name='CUSTOMER',
    rc_list=['RC1', 'RC2'],
    add_rccode=True
)
# stats contains: {'upserts': count, 'deletes': count, 'total': count}

read_delta_files(s3_bucket, prefix, table_name, rc_list)

Purpose: Read and combine delta files from S3 into a single DataFrame.

Parameters:

  • s3_bucket: S3 bucket name
  • prefix: S3 prefix path
  • table_name: Name of the table
  • rc_list: List of record codes to read

Returns: DataFrame with all delta records

Example:

df_raw = delta_processor.read_delta_files(
    'my-bucket',
    'bronze',
    'CUSTOMER',
    ['RC1', 'RC2']
)

deduplicate_records(df, table_name)

Purpose: Deduplicate records based on configured primary key mappings.

Parameters:

  • df: Input DataFrame with potential duplicates
  • table_name: Table name for primary key lookup

Returns: Deduplicated DataFrame

Example:

df_deduped = delta_processor.deduplicate_records(df_raw, 'CUSTOMER')

separate_operations(df)

Purpose: Separate insert/update operations from delete operations based on operation codes.

Parameters:

  • df: Input DataFrame with operation codes

Returns: Tuple of (upserts DataFrame, deletes DataFrame)

Example:

df_upserts, df_deletes = delta_processor.separate_operations(df_deduped)
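The order of these steps matters: deduplicating before separating operations ensures that a key which was inserted and then deleted within the same batch surfaces only as a delete. The following plain-Python sketch assumes each delta record carries an operation code column `op` (with `'D'` meaning delete) and an ordering column `seq`, such as a commit sequence number; both column names are hypothetical.

```python
def deduplicate_records(rows, primary_keys, order_column="seq"):
    """Illustrative only: keep the latest record per primary key, ranked by order_column."""
    latest = {}
    for row in rows:
        key = tuple(row[k] for k in primary_keys)
        if key not in latest or row[order_column] > latest[key][order_column]:
            latest[key] = row
    return list(latest.values())


def separate_operations(rows, op_column="op", delete_codes=("D",)):
    """Illustrative only: split rows into (upserts, deletes) by operation code."""
    upserts = [r for r in rows if r[op_column] not in delete_codes]
    deletes = [r for r in rows if r[op_column] in delete_codes]
    return upserts, deletes
```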

SCD Type 2 Processor

Purpose: Implement Slowly Changing Dimension Type 2 with effective dating and historical record preservation.

Access: scd_processor = transformer.get_utility('scd_processor')

Use Cases:

  • Maintaining historical data changes in dimension tables
  • Tracking customer or product attribute changes over time
  • Implementing effective date ranges for temporal queries
  • Preserving audit trails for regulatory compliance
  • Building data warehouse dimension tables with versioning

Key Methods:

complete_scd_processing(df, business_key_columns, date_columns, time_columns, create_date_col, create_time_col, update_date_col, update_time_col)

Purpose: Complete end-to-end SCD Type 2 processing with all transformations.

Parameters:

  • df: Source DataFrame with dimension data
  • business_key_columns: List of columns forming the business key
  • date_columns: List of date column names to format
  • time_columns: List of time column names to format
  • create_date_col: Name of creation date column
  • create_time_col: Name of creation time column
  • update_date_col: Name of update date column
  • update_time_col: Name of update time column

Returns: DataFrame with SCD Type 2 columns (effective_date, end_date, is_current, etc.)

Example:

scd_processor = transformer.get_utility('scd_processor')
df_scd = scd_processor.complete_scd_processing(
    df=source_df,
    business_key_columns=['customer_id'],
    date_columns=['create_date', 'update_date'],
    time_columns=['create_time', 'update_time'],
    create_date_col='create_date',
    create_time_col='create_time',
    update_date_col='update_date',
    update_time_col='update_time'
)

format_date_columns(df, columns, format)

Purpose: Format date columns to standard format for SCD processing.

Parameters:

  • df: Input DataFrame
  • columns: List of date column names
  • format: Source date format (e.g., 'yyyyMMdd', 'yyyy-MM-dd')

Returns: DataFrame with formatted date columns

Example:

df_with_dates = scd_processor.format_date_columns(
    df,
    ['order_date'],
    'yyyyMMdd'
)

format_time_columns(df, columns)

Purpose: Format time columns to standard format for SCD processing.

Parameters:

  • df: Input DataFrame
  • columns: List of time column names

Returns: DataFrame with formatted time columns

Example:

df_with_times = scd_processor.format_time_columns(df, ['order_time'])

calculate_effective_date(df, create_date_col, update_date_col)

Purpose: Calculate effective date for SCD Type 2 records.

Parameters:

  • df: Input DataFrame
  • create_date_col: Name of creation date column
  • update_date_col: Name of update date column

Returns: DataFrame with effective_date column

Example:

df_with_effective = scd_processor.calculate_effective_date(
    df,
    create_date_col='create_date',
    update_date_col='update_date'
)

add_scd_ranking(df, business_keys)

Purpose: Add ranking for SCD logic to determine current vs historical records.

Parameters:

  • df: Input DataFrame
  • business_keys: List of business key column names

Returns: DataFrame with ranking columns

Example:

df_with_ranking = scd_processor.add_scd_ranking(df, ['customer_id'])

apply_scd_type2_logic(df, business_keys)

Purpose: Apply SCD Type 2 logic to determine record versions and effective dates.

Parameters:

  • df: Input DataFrame
  • business_keys: List of business key column names

Returns: DataFrame with SCD Type 2 columns applied

Example:

df_final = scd_processor.apply_scd_type2_logic(df, ['customer_id'])

get_current_records_only(df)

Purpose: Filter DataFrame to return only current (active) SCD Type 2 records.

Parameters:

  • df: SCD Type 2 DataFrame

Returns: DataFrame with only current records (is_current = True)

Example:

current_records = scd_processor.get_current_records_only(df_final)
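The core of SCD Type 2 is that every version of a business key stays valid until the next version takes effect, and only the latest version is current. The sketch below applies that rule to dict rows that already carry an `effective_date`. Using the next version's effective date as an exclusive `end_date`, and `9999-12-31` as the open-ended marker for current rows, are common conventions assumed here; the Toolkit's exact conventions are not documented on this page.

```python
from datetime import date
from itertools import groupby
from operator import itemgetter

OPEN_END = date(9999, 12, 31)  # assumed "no end" marker for current versions


def apply_scd_type2(rows, business_keys, effective_col="effective_date"):
    """Illustrative only: attach end_date and is_current to every version of each key."""
    key_of = itemgetter(*business_keys)
    ordered = sorted(rows, key=lambda r: (key_of(r), r[effective_col]))
    result = []
    for _, versions in groupby(ordered, key=key_of):
        versions = list(versions)
        # Pair each version with the one that supersedes it (None for the latest)
        for current, following in zip(versions, versions[1:] + [None]):
            result.append({
                **current,
                "end_date": following[effective_col] if following else OPEN_END,
                "is_current": following is None,
            })
    return result


def get_current_records_only(rows):
    return [r for r in rows if r["is_current"]]
```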

Database Connections

Database Connector

Purpose: Manage database connections (AS/400, MariaDB, Generic JDBC) with connection pooling and query execution.

Access: db_connector = transformer.get_utility('database_connector')

Use Cases:

  • Connecting to legacy AS/400 systems
  • Reading data from MariaDB/MySQL databases
  • Querying any JDBC-compatible database
  • Bulk data extraction from source systems
  • Schema discovery and metadata extraction

Key Methods:

configure_as400_connection(name, host, library, user, password)

Purpose: Configure and register an AS/400 (DB2) database connection.

Parameters:

  • name: Connection name identifier
  • host: AS/400 server hostname or IP
  • library: AS/400 library name
  • user: Database username
  • password: Database password

Example:

db_connector = transformer.get_utility('database_connector')
db_connector.configure_as400_connection(
    name='prod_as400',
    host='as400.company.com',
    library='PRODLIB',
    user='username',
    password='password'
)

configure_mariadb_connection(name, host, port, database, user, password)

Purpose: Configure and register a MariaDB/MySQL database connection.

Parameters:

  • name: Connection name identifier
  • host: MariaDB server hostname or IP
  • port: Database port (default: 3306)
  • database: Database name
  • user: Database username
  • password: Database password

Example:

db_connector.configure_mariadb_connection(
    name='prod_mariadb',
    host='mariadb.company.com',
    port=3306,
    database='production',
    user='username',
    password='password'
)

test_connection(name)

Purpose: Test database connection to verify connectivity and credentials.

Parameters:

  • name: Connection name to test

Returns: Boolean indicating connection success

Example:

is_connected = db_connector.test_connection('prod_as400')

execute_query(connection_name, query)

Purpose: Execute SQL query and return results as DataFrame.

Parameters:

  • connection_name: Name of configured connection
  • query: SQL query string

Returns: DataFrame with query results

Example:

df_result = db_connector.execute_query(
    'prod_as400',
    "SELECT * FROM CUSTOMER WHERE STATUS = 'ACTIVE'"
)

read_table(connection_name, table_name, columns, where_clause)

Purpose: Read entire table or filtered subset into DataFrame.

Parameters:

  • connection_name: Name of configured connection
  • table_name: Name of table to read
  • columns: List of column names to select (optional, None for all)
  • where_clause: SQL WHERE clause for filtering (optional)

Returns: DataFrame with table data

Example:

df_customers = db_connector.read_table(
    connection_name='prod_mariadb',
    table_name='customers',
    columns=['customer_id', 'name', 'email'],
    where_clause="status = 'ACTIVE'"
)

get_table_schema(connection_name, table_name)

Purpose: Get table schema information including columns and data types.

Parameters:

  • connection_name: Name of configured connection
  • table_name: Name of table

Returns: DataFrame with schema information

Example:

schema_df = db_connector.get_table_schema('prod_as400', 'CUSTOMER')

Snowflake Connector

Purpose: Manage Snowflake-specific connections and operations with optimized query execution and data loading.

Access: sf_connector = transformer.get_utility('snowflake_connector')

Use Cases:

  • Loading data into Snowflake data warehouse
  • Executing analytical queries in Snowflake
  • Managing Snowflake table DDL operations
  • Bulk data loading with staging tables
  • Multi-schema operations across Snowflake databases

Key Methods:

configure_connection(name, sf_url, sf_user, sf_password, sf_database, sf_warehouse, sf_role, sf_schema)

Purpose: Configure and register a Snowflake connection with full account details.

Parameters:

  • name: Connection name identifier
  • sf_url: Snowflake account URL
  • sf_user: Snowflake username
  • sf_password: Snowflake password
  • sf_database: Snowflake database name
  • sf_warehouse: Snowflake warehouse name
  • sf_role: Snowflake role name
  • sf_schema: Default schema name

Example:

sf_connector = transformer.get_utility('snowflake_connector')
sf_connector.configure_connection(
    name='prod_snowflake',
    sf_url='https://account.snowflakecomputing.com',
    sf_user='username',
    sf_password='password',
    sf_database='PROD_DB',
    sf_warehouse='COMPUTE_WH',
    sf_role='DATA_ENGINEER',
    sf_schema='BRONZE'
)

test_connection(name, method)

Purpose: Test Snowflake connection using specified method (JDBC or native).

Parameters:

  • name: Connection name to test
  • method: Connection method ('jdbc' or 'native', default: 'jdbc')

Returns: Boolean indicating connection success

Example:

is_connected = sf_connector.test_connection('prod_snowflake', method='jdbc')

execute_query(connection_name, query)

Purpose: Execute SQL query in Snowflake and return results as DataFrame.

Parameters:

  • connection_name: Name of configured connection
  • query: Snowflake SQL query string

Returns: DataFrame with query results

Example:

df_result = sf_connector.execute_query(
    'prod_snowflake',
    "SELECT * FROM BRONZE.CUSTOMERS WHERE STATUS = 'ACTIVE'"
)

write_dataframe(df, connection_name, table_name, schema_name, mode)

Purpose: Write DataFrame to Snowflake table with optimized bulk loading.

Parameters:

  • df: DataFrame to write
  • connection_name: Name of configured connection
  • table_name: Target table name
  • schema_name: Snowflake schema name
  • mode: Write mode ('append', 'overwrite', 'merge')

Returns: Boolean indicating success

Example:

success = sf_connector.write_dataframe(
    df=customers_df,
    connection_name='prod_snowflake',
    table_name='CUSTOMERS_STAGING',
    schema_name='BRONZE',
    mode='overwrite'
)

execute_ddl(connection_name, ddl_statement)

Purpose: Execute DDL statements in Snowflake (CREATE, ALTER, DROP, etc.).

Parameters:

  • connection_name: Name of configured connection
  • ddl_statement: DDL SQL statement

Returns: Boolean indicating success

Example:

success = sf_connector.execute_ddl(
    'prod_snowflake',
    "CREATE OR REPLACE TABLE BRONZE.TEMP_TABLE AS SELECT * FROM BRONZE.CUSTOMERS"
)

Data Transformations

Data Formatter

Purpose: Advanced data formatting operations (dates, times, numerics, strings) with data quality rule enforcement.

Access: data_formatter = transformer.get_utility('data_formatter')

Use Cases:

  • Standardizing date/time formats across sources
  • Enforcing data quality rules and constraints
  • Formatting numeric values with precision control
  • Handling NULL values with default replacements
  • Data cleansing and normalization

Key Methods:

apply_data_quality_rules(df, quality_rules)

Purpose: Apply data quality rules to DataFrame columns with validation and default value handling.

Parameters:

  • df: Input DataFrame
  • quality_rules: Dictionary mapping column names to quality rules:
    • not_null: Boolean to enforce non-null values
    • default_value: Default value for NULLs
    • min_value: Minimum value for numeric columns
    • max_value: Maximum value for numeric columns
    • pattern: Regex pattern for string validation

Returns: DataFrame with quality rules applied

Example:

data_formatter = transformer.get_utility('data_formatter')
df_formatted = data_formatter.apply_data_quality_rules(df_upserts, {
    'customer_id': {'not_null': True, 'default_value': 'UNKNOWN'},
    'amount': {'not_null': True, 'default_value': 0.0, 'min_value': 0},
    'email': {'pattern': r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'}
})
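How the Toolkit reports rule violations is not described on this page. The following sketch assumes one reasonable behaviour: NULLs are replaced with `default_value` when one is given, and every remaining violation is recorded in a hypothetical `_dq_errors` list column instead of dropping the row.

```python
import re


def apply_data_quality_rules(rows, quality_rules, error_column="_dq_errors"):
    """Illustrative only: fill NULLs with defaults and record rule violations per row."""
    compiled = {col: re.compile(rule["pattern"])
                for col, rule in quality_rules.items() if "pattern" in rule}
    checked = []
    for row in rows:
        row, errors = dict(row), []
        for col, rule in quality_rules.items():
            value = row.get(col)
            if value is None and "default_value" in rule:
                value = row[col] = rule["default_value"]  # fill NULL with the default
            if value is None:
                if rule.get("not_null"):
                    errors.append(f"{col}: null")
                continue  # range and pattern checks only apply to non-null values
            if "min_value" in rule and value < rule["min_value"]:
                errors.append(f"{col}: below {rule['min_value']}")
            if "max_value" in rule and value > rule["max_value"]:
                errors.append(f"{col}: above {rule['max_value']}")
            if col in compiled and not compiled[col].fullmatch(str(value)):
                errors.append(f"{col}: pattern mismatch")
        row[error_column] = errors
        checked.append(row)
    return checked
```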
format_date_columns(df, columns, format)

Purpose: Format date columns to standard format with pattern matching.

Parameters:

  • df: Input DataFrame
  • columns: List of date column names
  • format: Source date format pattern (e.g., 'yyyyMMdd', 'MM/dd/yyyy', 'yyyy-MM-dd')

Returns: DataFrame with formatted date columns

Example:

df_formatted = data_formatter.format_date_columns(
    df,
    ['order_date', 'ship_date'],
    'yyyyMMdd'
)
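Date patterns such as `'yyyyMMdd'` and `'MM/dd/yyyy'` use Java/Spark datetime pattern letters, which differ from Python's `strptime` directives (note that `MM` is month while `mm` is minute). The sketch below translates a small subset of those letters and re-renders dates as ISO `yyyy-MM-dd`; treating ISO as the "standard format" is an assumption, and quoted literals or other pattern letters are not handled.

```python
import re
from datetime import datetime

# Subset of Java/Spark datetime pattern letters mapped to strptime directives
_TOKENS = {"yyyy": "%Y", "yy": "%y", "MM": "%m", "dd": "%d",
           "HH": "%H", "mm": "%M", "ss": "%S"}


def java_to_strptime(pattern):
    """Translate e.g. 'MM/dd/yyyy' into '%m/%d/%Y' (only the tokens above)."""
    return re.sub(r"yyyy|yy|MM|dd|HH|mm|ss", lambda m: _TOKENS[m.group(0)], pattern)


def format_date_columns(rows, columns, source_format, target_format="%Y-%m-%d"):
    """Illustrative only: re-render date strings from source_format into target_format."""
    directive = java_to_strptime(source_format)
    out = []
    for row in rows:
        row = dict(row)
        for col in columns:
            if row.get(col):  # leave NULL/empty values untouched
                row[col] = datetime.strptime(row[col], directive).strftime(target_format)
        out.append(row)
    return out
```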
format_time_columns(df, columns)

Purpose: Format time columns to standard time format.

Parameters:

  • df: Input DataFrame
  • columns: List of time column names

Returns: DataFrame with formatted time columns

Example:

df_formatted = data_formatter.format_time_columns(df, ['order_time', 'process_time'])

format_numeric_columns(df, columns, precision, scale)

Purpose: Format numeric columns with specified precision and scale.

Parameters:

  • df: Input DataFrame
  • columns: List of numeric column names
  • precision: Total number of digits
  • scale: Number of decimal places

Returns: DataFrame with formatted numeric columns

Example:

df_formatted = data_formatter.format_numeric_columns(
    df,
    ['amount', 'price'],
    precision=10,
    scale=2
)
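Precision and scale follow the usual DECIMAL(precision, scale) convention: with precision=10 and scale=2, a value keeps two decimal places and at most 10 - 2 = 8 digits before the decimal point. The stdlib sketch below applies that rule to a single value with Python's decimal module. The function name format_numeric, HALF_UP rounding, and returning None on overflow are assumptions for illustration only.

```python
from decimal import ROUND_HALF_UP, Decimal
from typing import Optional, Union


def format_numeric(value: Union[int, float, str, None], precision: int, scale: int) -> Optional[Decimal]:
    """Round value to `scale` decimals; None if it does not fit DECIMAL(precision, scale)."""
    if value is None:
        return None
    quantum = Decimal(1).scaleb(-scale)  # scale=2 -> exponent -2, i.e. 0.01
    # str() avoids binary float artefacts such as 19.999 -> 19.99899999...
    rounded = Decimal(str(value)).quantize(quantum, rounding=ROUND_HALF_UP)
    # Only precision - scale digits are available before the decimal point
    if abs(rounded) >= Decimal(10) ** (precision - scale):
        return None  # assumption: overflowing values become NULL
    return rounded
```

For example, format_numeric(19.999, 10, 2) rounds to Decimal('20.00'), while 123456789 needs nine integer digits and is rejected.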

Column Sanitizer

Purpose: Sanitize and clean column names for compatibility with target systems.

Access: column_sanitizer = transformer.get_utility('column_sanitizer')

Use Cases:

  • Preparing column names for Iceberg/Snowflake ingestion
  • Removing special characters incompatible with SQL
  • Standardizing naming conventions across systems
  • Handling reserved keywords in target databases

Note: Column sanitization is also available via the core clean_column_names() method, which provides similar functionality.
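The page lists the goals of sanitization but not the exact rules. The sketch below shows one common approach under stated assumptions: lower-case output, runs of non-alphanumeric characters collapsed to a single underscore, a leading underscore for names that start with a digit, a _col suffix for reserved words, and numeric suffixes for collisions. The function name sanitize_column_names and the tiny reserved-word set are hypothetical; the column_sanitizer utility and clean_column_names() may apply different rules.

```python
import re
from typing import Iterable, List

# Illustrative subset only; real reserved-word lists are specific to each target system
RESERVED = {'select', 'from', 'where', 'order', 'group', 'table'}


def sanitize_column_names(names: Iterable[str]) -> List[str]:
    """Hypothetical sanitizer producing SQL-safe, unique, lower-case identifiers."""
    result, seen = [], set()
    for name in names:
        # Collapse unsupported characters to '_' and trim leading/trailing underscores
        clean = re.sub(r'[^0-9a-zA-Z_]+', '_', name.strip()).strip('_').lower() or 'col'
        if clean[0].isdigit():
            clean = f'_{clean}'  # most SQL dialects reject identifiers starting with a digit
        if clean in RESERVED:
            clean = f'{clean}_col'
        base, n = clean, 1
        while clean in seen:  # de-duplicate collisions such as 'a b' and 'a-b'
            n += 1
            clean = f'{base}_{n}'
        seen.add(clean)
        result.append(clean)
    return result
```

For example, ['Customer ID', 'order', 'customer id'] becomes ['customer_id', 'order_col', 'customer_id_2'].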

Data Writers

Iceberg Writer

Purpose: Write data to Iceberg tables with CDC support, merge operations, and partition management.

Access: iceberg_writer = transformer.get_utility('iceberg_writer')

Use Cases:

  • Loading data into data lake Bronze/Silver/Gold layers
  • Performing CDC merge operations with upserts and deletes
  • Incremental data loading with partition management
  • Schema evolution and table maintenance
  • Optimized writes for large datasets

Key Methods:

merge_cdc_data(df_upserts, df_deletes, table_name, database, primary_key_columns)

Purpose: Merge CDC data into an Iceberg table, handling both inserts/updates and deletes.

Parameters:

  • df_upserts: DataFrame with insert/update records
  • df_deletes: DataFrame with delete records
  • table_name: Target Iceberg table name
  • database: Database name in Glue catalog
  • primary_key_columns: List of primary key column names

Returns: Dictionary with merge statistics (rows_written, rows_deleted, etc.)

Example:

iceberg_writer = transformer.get_utility('iceberg_writer')
iceberg_stats = iceberg_writer.merge_cdc_data(
    df_upserts=df_quality,
    df_deletes=df_deletes,
    table_name='customer',
    database='gold',
    primary_key_columns=['customer_id']
)
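Conceptually, a CDC merge keyed on primary_key_columns replaces matching rows, inserts new ones, and removes rows listed in the delete set. On Iceberg tables this is typically expressed as a Spark SQL MERGE INTO statement. The in-memory sketch below models only those semantics on lists of dicts; the function name merge_cdc, the statistics keys, and applying deletes after upserts are assumptions, not the iceberg_writer implementation.

```python
from typing import Any, Dict, List, Sequence, Tuple

Row = Dict[str, Any]


def merge_cdc(target: List[Row], upserts: List[Row], deletes: List[Row],
              primary_key_columns: Sequence[str]) -> Tuple[List[Row], Dict[str, int]]:
    """Hypothetical in-memory model of a CDC merge keyed on primary_key_columns."""
    def key(row: Row) -> Tuple[Any, ...]:
        return tuple(row[c] for c in primary_key_columns)

    table = {key(row): row for row in target}
    stats = {'rows_inserted': 0, 'rows_updated': 0, 'rows_deleted': 0}
    # Upserts: replace the row when the key exists, otherwise insert it
    for row in upserts:
        k = key(row)
        stats['rows_updated' if k in table else 'rows_inserted'] += 1
        table[k] = row
    # Deletes: remove by key (assumption: deletes are applied after upserts)
    for row in deletes:
        if table.pop(key(row), None) is not None:
            stats['rows_deleted'] += 1
    return list(table.values()), stats
```

Keying on a tuple keeps the same code working for composite primary keys.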
append_data(df, table_name, database)

Purpose: Append data to existing Iceberg table.

Parameters:

  • df: DataFrame to append
  • table_name: Target Iceberg table name
  • database: Database name in Glue catalog

Returns: Boolean indicating success

Example:

success = iceberg_writer.append_data(
    df=df_new_records,
    table_name='customer',
    database='gold'
)
overwrite_data(df, table_name, database)

Purpose: Overwrite data in Iceberg table (replaces all existing data).

Parameters:

  • df: DataFrame to write
  • table_name: Target Iceberg table name
  • database: Database name in Glue catalog

Returns: Boolean indicating success

Example:

success = iceberg_writer.overwrite_data(
    df=df_full_refresh,
    table_name='customer',
    database='gold'
)

Snowflake Writer

Purpose: Write data to Snowflake tables with optimized bulk loading and merge operations.

Access: snowflake_writer = transformer.get_utility('snowflake_writer')

Use Cases:

  • Loading data into Snowflake data warehouse
  • Incremental data updates with merge operations
  • Bulk data loading with staging tables
  • Multi-schema data distribution
  • High-performance data ingestion

Key Methods:

write_dataframe(df, table_name, schema_name, connection_config, mode)

Purpose: Write DataFrame to Snowflake table with optimized bulk loading.

Parameters:

  • df: DataFrame to write
  • table_name: Target Snowflake table name
  • schema_name: Snowflake schema name
  • connection_config: Snowflake connection configuration dictionary
  • mode: Write mode ('append', 'overwrite', 'error')

Returns: Boolean indicating success

Example:

snowflake_writer = transformer.get_utility('snowflake_writer')
sf_success = snowflake_writer.write_dataframe(
    df=df_quality,
    table_name='CUSTOMER',
    schema_name='GOLD',
    connection_config=snowflake_config,
    mode='append'
)
merge_data(df, table_name, schema_name, connection_config, merge_keys)

Purpose: Merge data into Snowflake table using specified merge keys.

Parameters:

  • df: DataFrame to merge
  • table_name: Target Snowflake table name
  • schema_name: Snowflake schema name
  • connection_config: Snowflake connection configuration dictionary
  • merge_keys: List of column names for merge matching

Returns: Boolean indicating success

Example:

sf_success = snowflake_writer.merge_data(
    df=df_updates,
    table_name='CUSTOMER',
    schema_name='GOLD',
    connection_config=snowflake_config,
    merge_keys=['customer_id']
)
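A merge on merge_keys usually comes down to a Snowflake MERGE statement run from a staging table into the target, which fits the staging-table use case listed above. The sketch below only builds such a statement as a string. The helper name build_merge_sql and the staging-table name in the example are hypothetical, and this is not necessarily the SQL that snowflake_writer generates.

```python
from typing import Sequence


def build_merge_sql(target: str, staging: str, columns: Sequence[str], merge_keys: Sequence[str]) -> str:
    """Hypothetical helper: build a Snowflake MERGE from a staging table into the target."""
    # Rows match when every merge key is equal
    on_clause = ' AND '.join(f't.{k} = s.{k}' for k in merge_keys)
    # Matched rows: overwrite all non-key columns with the staged values
    updates = ', '.join(f't.{c} = s.{c}' for c in columns if c not in merge_keys)
    matched = f' WHEN MATCHED THEN UPDATE SET {updates}' if updates else ''
    # Unmatched rows: insert them as new records
    insert_cols = ', '.join(columns)
    insert_vals = ', '.join(f's.{c}' for c in columns)
    return (
        f'MERGE INTO {target} t USING {staging} s ON {on_clause}'
        f'{matched}'
        f' WHEN NOT MATCHED THEN INSERT ({insert_cols}) VALUES ({insert_vals})'
    )
```

Identifiers are interpolated without quoting, so this pattern is only safe with trusted, pre-sanitized table and column names.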

Configuration & Monitoring

Configuration Manager

Purpose: Centralized configuration management with validation, type conversion, and multiple source support.

Access: config = transformer.get_utility('config_manager')

Use Cases:

  • Loading configuration from AWS Glue job parameters
  • Managing environment-specific settings
  • Validating configuration before job execution
  • Type-safe parameter retrieval with defaults
  • Centralized connection configuration management

Key Methods:

load_glue_job_args(required_args, optional_args)

Purpose: Load configuration parameters from AWS Glue job arguments.

Parameters:

  • required_args: List of required parameter names
  • optional_args: List of optional parameter names

Example:

config = transformer.get_utility('config_manager')
config.load_glue_job_args(
    required_args=["SF_URL", "SF_USER", "SF_PASSWORD"],
    optional_args=["S3_BUCKET", "TABLE_LIST", "DEBUG_MODE"]
)
load_environment_variables(mapping)

Purpose: Load configuration from environment variables with custom naming.

Parameters:

  • mapping: Dictionary mapping internal names to environment variable names

Example:

config.load_environment_variables({
    'db_host': 'DB_HOST',
    'db_port': 'DB_PORT',
    'api_key': 'API_KEY'
})
load_json_config(path)

Purpose: Load configuration from JSON file.

Parameters:

  • path: Path to JSON configuration file (local or S3)

Example:

config.load_json_config('s3://my-bucket/config/production.json')
get_param(name, default, type)

Purpose: Get parameter value with type conversion and default fallback.

Parameters:

  • name: Parameter name
  • default: Default value if parameter not found
  • type: Type to convert to (int, str, bool, list, dict, etc.)

Returns: Typed parameter value

Example:

batch_size = config.get_param('BATCH_SIZE', 1000, int)
debug_mode = config.get_param('DEBUG_MODE', False, bool)
table_list = config.get_param('TABLE_LIST', [], list)
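Glue job arguments read with awsglue.utils.getResolvedOptions arrive as strings, so type conversion matters, especially for booleans: bool('false') is True in Python. The sketch below shows one way to convert such strings. The helper names, the accepted boolean spellings, and parsing lists as either JSON arrays or comma-separated values are assumptions; the parameter is called target_type to avoid shadowing the built-in type.

```python
import json
from typing import Any, Dict


def convert_param(raw: str, target_type: type) -> Any:
    """Hypothetical string-to-type conversion for job parameters."""
    if target_type is bool:
        # Parse explicitly, because bool('false') would be True
        lowered = raw.strip().lower()
        if lowered in ('true', '1', 'yes', 'y'):
            return True
        if lowered in ('false', '0', 'no', 'n'):
            return False
        raise ValueError(f'not a boolean: {raw!r}')
    if target_type is list:
        # Assumption: lists are passed as JSON arrays or comma-separated strings
        stripped = raw.strip()
        if stripped.startswith('['):
            return json.loads(stripped)
        return [item.strip() for item in stripped.split(',') if item.strip()]
    if target_type is dict:
        return json.loads(raw)
    return target_type(raw)  # int, float, str, ...


def get_param(params: Dict[str, str], name: str, default: Any = None, target_type: type = str) -> Any:
    """Return the converted parameter, or default when it is absent."""
    raw = params.get(name)
    return default if raw is None else convert_param(raw, target_type)
```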
get_connection_config(connection_type)

Purpose: Get connection configuration for specified connection type.

Parameters:

  • connection_type: Connection type ('snowflake', 'as400', 'mariadb', etc.)

Returns: Dictionary with connection configuration

Example:

snowflake_config = config.get_connection_config("snowflake")
# Returns: {'sf_url': ..., 'sf_user': ..., etc.}
get_s3_config()

Purpose: Get S3 configuration (bucket, prefix, etc.).

Returns: Dictionary with S3 configuration

Example:

s3_config = config.get_s3_config()
# Returns: {'s3_bucket': ..., 's3_prefix': ..., etc.}
validate_all(type_mapping)

Purpose: Validate all configuration parameters against expected types.

Parameters:

  • type_mapping: Dictionary mapping parameter names to expected types

Returns: Tuple of (is_valid: bool, errors: list)

Example:

is_valid, errors = config.validate_all({
    'BATCH_SIZE': int,
    'DEBUG_MODE': bool,
    'TABLE_LIST': list
})
if not is_valid:
    raise ValueError(f"Configuration errors: {errors}")
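Returning (is_valid, errors) lets the caller report every configuration problem at once instead of failing on the first one. The sketch below shows that pattern over a plain dict of already-converted values. The function body, the error-message format, and treating a missing parameter as an error are assumptions, not the config_manager implementation.

```python
from typing import Any, Dict, List, Tuple


def validate_all(params: Dict[str, Any], type_mapping: Dict[str, type]) -> Tuple[bool, List[str]]:
    """Hypothetical validator: collect every type mismatch instead of stopping at the first."""
    errors: List[str] = []
    for name, expected in type_mapping.items():
        if name not in params:
            errors.append(f'{name}: missing')
            continue
        value = params[name]
        # bool is a subclass of int, so reject True/False explicitly where an int is expected
        if not isinstance(value, expected) or (expected is int and isinstance(value, bool)):
            errors.append(f'{name}: expected {expected.__name__}, got {type(value).__name__}')
    return (not errors, errors)
```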

Performance Monitor

Purpose: Comprehensive job performance tracking with memory monitoring, operation timing, and detailed reporting.

Access: monitor = transformer.get_utility('performance_monitor') or monitor = transformer.get_performance_monitor()

Use Cases:

  • Tracking ETL job execution time
  • Identifying performance bottlenecks
  • Monitoring memory usage during processing
  • Tracking DataFrame operation statistics
  • Generating performance reports for optimization

Key Methods:

time_operation(name)

Purpose: Context manager for timing operations with automatic start/stop.

Parameters:

  • name: Operation name identifier

Example:

monitor = transformer.get_performance_monitor()
with monitor.time_operation("data_processing"):
    df_result = transformer.execute_sql_block(complex_query, dataframes)
start_timer(name) / stop_timer(name)

Purpose: Manually start and stop timers for operations.

Parameters:

  • name: Operation name identifier

Returns: Duration in seconds (for stop_timer)

Example:

monitor.start_timer('data_extraction')
# ... perform operation ...
duration = monitor.stop_timer('data_extraction')
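The context manager and the manual timers are two interfaces to the same mechanism. The sketch below shows one plausible way time_operation can be layered on start_timer/stop_timer with contextlib; the TimingMonitor class is a hypothetical stand-in, not the performance monitor's actual code. The finally clause records the duration even when the timed block raises.

```python
import time
from contextlib import contextmanager
from typing import Dict, Iterator


class TimingMonitor:
    """Hypothetical minimal stand-in for the monitor's timing features."""

    def __init__(self) -> None:
        self.durations: Dict[str, float] = {}
        self._starts: Dict[str, float] = {}

    def start_timer(self, name: str) -> None:
        # perf_counter is monotonic, so wall-clock adjustments do not skew durations
        self._starts[name] = time.perf_counter()

    def stop_timer(self, name: str) -> float:
        duration = time.perf_counter() - self._starts.pop(name)
        self.durations[name] = duration
        return duration

    @contextmanager
    def time_operation(self, name: str) -> Iterator[None]:
        self.start_timer(name)
        try:
            yield
        finally:
            # Stop the timer even if the timed block raises
            self.stop_timer(name)
```

Because timers are keyed by name, nesting two operations with the same name would overwrite one another; distinct names, such as the per-table names used in the pipeline example, avoid this.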
track_dataframe_operation(operation_name, df_before, df_after, operation_type)

Purpose: Track DataFrame operations with before/after statistics.

Parameters:

  • operation_name: Name of the operation
  • df_before: DataFrame before operation
  • df_after: DataFrame after operation
  • operation_type: Type of operation (e.g., 'JOIN', 'FILTER', 'TRANSFORM')

Example:

monitor.track_dataframe_operation(
    "customer_join",
    df_before=orders_df,
    df_after=joined_df,
    operation_type="JOIN"
)
record_data_stats(name, stats)

Purpose: Record custom metrics and statistics.

Parameters:

  • name: Metric name identifier
  • stats: Dictionary with metric values

Example:

monitor.record_data_stats("final_output", {
    "rows_processed": len(df_result),
    "tables_updated": 3,
    "error_count": 0,
    "processing_time_minutes": 15.5
})
generate_summary_report()

Purpose: Generate comprehensive performance summary report.

Returns: Dictionary with summary statistics

Example:

summary = monitor.generate_summary_report()
# Returns: {'total_time': ..., 'operations': ..., 'memory_usage': ..., etc.}
get_operation_breakdown()

Purpose: Get detailed breakdown of all timed operations.

Returns: Dictionary with operation timings

Example:

breakdown = monitor.get_operation_breakdown()
# Returns: {'operation1': duration1, 'operation2': duration2, ...}
get_slowest_operations(top_n)

Purpose: Get list of slowest operations for bottleneck identification.

Parameters:

  • top_n: Number of slowest operations to return

Returns: List of (operation_name, duration) tuples

Example:

slowest_ops = monitor.get_slowest_operations(top_n=5)
# Returns: [('complex_join', 120.5), ('data_validation', 45.2), ...]
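Given an operation breakdown shaped like the dictionary that get_operation_breakdown() is documented to return (operation name to duration), picking the slowest operations is a top-N selection. The function below is a hypothetical sketch of that step, not the monitor's code.

```python
import heapq
from typing import Dict, List, Tuple


def get_slowest_operations(durations: Dict[str, float], top_n: int = 5) -> List[Tuple[str, float]]:
    """Return up to top_n (operation_name, seconds) pairs, slowest first."""
    # heapq.nlargest avoids sorting the whole breakdown when only a few entries are needed
    return heapq.nlargest(top_n, durations.items(), key=lambda item: item[1])
```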
export_metrics(path)

Purpose: Export all metrics to JSON file.

Parameters:

  • path: File path for metrics export

Returns: Path to exported file

Example:

export_path = monitor.export_metrics('/tmp/job_metrics.json')
finalize_job()

Purpose: Finalize job monitoring and generate comprehensive final report.

Returns: Dictionary with complete job statistics and report

Example:

final_report = monitor.finalize_job()
# Returns comprehensive report with all metrics, timings, and statistics

Complete Usage Patterns

1. Full ETL Pipeline with CDC

from etlToolkit import GlueTransformationsUtil

# Initialize once with the Spark session to enable all utilities
transformer = GlueTransformationsUtil(spark_session)

# Configuration management
config = transformer.get_utility('config_manager')
config.load_glue_job_args(['S3_BUCKET', 'TABLE_LIST', 'SF_URL', 'SF_USER', 'SF_PASSWORD'])

# Performance monitoring
monitor = transformer.get_performance_monitor()

# Retrieve the utilities once and reuse them for every table
delta_processor = transformer.get_utility('delta_processor')
data_formatter = transformer.get_utility('data_formatter')
iceberg_writer = transformer.get_utility('iceberg_writer')
snowflake_writer = transformer.get_utility('snowflake_writer')

with monitor.time_operation('full_etl_pipeline'):
    # Get configurations
    s3_config = config.get_s3_config()
    snowflake_config = config.get_connection_config('snowflake')
    table_list = config.get_param('TABLE_LIST', [], list)

    # Process each table
    for table_name in table_list:
        with monitor.time_operation(f'process_table_{table_name}'):
            # Delta processing
            df_upserts, df_deletes, stats = delta_processor.process_table_batch(
                s3_bucket=s3_config['s3_bucket'],
                prefix='bronze',
                table_name=table_name,
                rc_list=['RC1', 'RC2']
            )

            # Data transformations
            df_clean = transformer.clean_column_names(df_upserts)
            df_typed = transformer.change_column_types(df_clean, {
                'customer_id': 'string',
                'order_date': 'date',
                'amount': 'double'
            })

            # Data quality
            df_quality = data_formatter.apply_data_quality_rules(df_typed, {
                'customer_id': {'not_null': True, 'default_value': 'UNKNOWN'},
                'amount': {'not_null': True, 'default_value': 0.0}
            })

            # Write to Iceberg (this example assumes every table is keyed on customer_id)
            iceberg_stats = iceberg_writer.merge_cdc_data(
                df_upserts=df_quality,
                df_deletes=df_deletes,
                table_name=table_name,
                database='gold',
                primary_key_columns=['customer_id']
            )

            # Write to Snowflake
            sf_success = snowflake_writer.write_dataframe(
                df=df_quality,
                table_name=table_name.upper(),
                schema_name='GOLD',
                connection_config=snowflake_config,
                mode='append'
            )

            # Record statistics
            monitor.record_data_stats(f'{table_name}_processing', {
                'upserts': stats['upserts'],
                'deletes': stats['deletes'],
                'iceberg_success': bool(iceberg_stats),
                'snowflake_success': sf_success
            })

# Generate final report
final_report = monitor.finalize_job()

2. Pandas Data Processing

import pandas as pd
from etlToolkit import GlueTransformationsUtil

# Works without a Spark session
transformer = GlueTransformationsUtil()

# Check capabilities
if not transformer.is_spark_available():
    print("Using Pandas-only mode")

# Read data
df = pd.read_csv('input.csv')

# All core transformations work with Pandas
df_typed = transformer.change_column_types(df, {'date_col': 'date'})
df_clean = transformer.clean_column_names(df_typed)

# Generate unique identifiers
df_with_id = transformer.generate_uid_column(df_clean, 'record_uuid', 'uuid')

# Join operations
lookup_df = pd.read_csv('lookup.csv')
df_final = transformer.perform_join(df_with_id, lookup_df, 'key', join_type='left')

# Union multiple DataFrames (historical_df is an existing DataFrame with the same columns)
combined_df = transformer.perform_union([df_final, historical_df], union_type='union_distinct')

Supported Data Types & Operations

Data Types

  • string - Text/varchar data
  • integer - 32-bit integers
  • long - 64-bit integers
  • double - Double precision floating point
  • boolean - True/false values
  • date - Date only (YYYY-MM-DD)
  • timestamp - Date and time with timezone
  • decimal - Fixed precision decimal numbers

Join Types

  • inner - Returns only matching records from both DataFrames
  • left - Returns all records from left DataFrame, matched from right
  • right - Returns all records from right DataFrame, matched from left
  • outer - Returns all records from both DataFrames, with nulls where a record has no match on the other side

Union Types

  • union - Standard union (duplicate handling depends on the engine; prefer union_all or union_distinct when it matters)
  • union_all - Union all rows (preserves duplicates)
  • union_distinct - Union with explicit duplicate removal
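In SQL terms, UNION ALL keeps duplicates and UNION removes them. In Spark, DataFrame.union behaves like UNION ALL, and duplicates are dropped only with an explicit .distinct(); in pandas the equivalent is pd.concat(...) followed by drop_duplicates(). The stdlib sketch below illustrates the difference on lists of tuples. It is not perform_union itself, and preserving first-seen order is a property of this sketch, not something Spark guarantees.

```python
from typing import List, Sequence, Tuple

Row = Tuple


def union_all(*datasets: Sequence[Row]) -> List[Row]:
    """Keep every row, including duplicates (SQL UNION ALL, Spark DataFrame.union)."""
    return [row for rows in datasets for row in rows]


def union_distinct(*datasets: Sequence[Row]) -> List[Row]:
    """Drop duplicate rows, keeping first-seen order (set semantics of SQL UNION)."""
    # dict.fromkeys de-duplicates while preserving insertion order
    return list(dict.fromkeys(union_all(*datasets)))
```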

UID Types

  • uuid - UUID4 format (e.g., '550e8400-e29b-41d4-a716-446655440000')
  • monotonic - Monotonically increasing integers
  • timestamp_based - Timestamp combined with sequence number
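The three UID types can be sketched in plain Python as generators. The function names and the timestamp_based layout (epoch milliseconds, a hyphen, a six-digit zero-padded sequence number) are assumptions for illustration. Note also that in Spark, pyspark.sql.functions.monotonically_increasing_id() produces values that are unique and increasing but not consecutive, so "monotonic" IDs should not be expected to form a gap-free sequence there.

```python
import itertools
import time
import uuid
from typing import Iterator


def uuid_ids() -> Iterator[str]:
    """Random UUID4 strings such as '550e8400-e29b-41d4-a716-446655440000'."""
    while True:
        yield str(uuid.uuid4())


def monotonic_ids(start: int = 0) -> Iterator[int]:
    """Consecutive increasing integers (single-process sketch)."""
    return itertools.count(start)


def timestamp_based_ids() -> Iterator[str]:
    """Assumed format: <epoch milliseconds>-<6-digit sequence number>."""
    for seq in itertools.count():
        yield f'{int(time.time() * 1000)}-{seq:06d}'
```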

Compatibility & Requirements

AWS Glue Version Support

  • AWS Glue 3.0: Fully supported (Python 3.7+)
  • AWS Glue 4.0: Fully supported (Python 3.8+) - Recommended
  • AWS Glue 5.0: Fully supported (Python 3.9+)

Dependencies

  • Required: Python 3.7+, pandas
  • Optional: pyspark, awsglue, snowflake-connector-python, requests, psutil

Best Practices

  1. Initialize once and reuse the transformer instance
  2. Use performance monitoring for production jobs
  3. Validate configurations before processing
  4. Handle errors gracefully with proper logging
  5. Take advantage of specialized utilities for complex operations
  6. Use appropriate data types for optimal performance
  7. Monitor resource usage in distributed environments
  8. Follow camelCase naming conventions for consistency

How to use

  1. Attach the library to the required job.
  2. In the job script, initialize the library as follows:
# Import
from etlToolkit import GlueTransformationsUtil

# Initialize with Spark session - enables all utilities
# spark_session should be initialized to load the corresponding tools and utilities
transformer = GlueTransformationsUtil(spark_session)
# Or initialize without a Spark session for Python shell jobs
transformer_pythonboot = GlueTransformationsUtil()

Tip

You can also call Python's built-in help() function on the module in the script to print a comprehensive user guide:

import etlToolkit
help(etlToolkit)