
Overview

Amorphic Data Pipelines let you orchestrate and visualize complex analytical processes by combining components such as ETL jobs, machine learning model inference tasks, notifications, and AWS AI services like Textract, Translate, Comprehend, and Comprehend Medical.

The Data Pipelines feature provides management, execution, and monitoring capabilities for all pipeline components. It allows you to define dependencies between different types of tasks as Directed Acyclic Graphs (DAGs), enabling sophisticated analytical workflows.

The Data Pipelines interface provides intuitive options to view existing pipelines, create new ones, and filter pipelines based on criteria such as name, creator, and creation time.

Data Pipelines Overview

Creating Data Pipelines

Pipeline nodes are built from pre-configured Amorphic modules. The following attributes are required to create a node:

| Attribute | Description |
| --- | --- |
| Module Type | A pre-configured building block that supports various task types, including ETL Jobs, ML model inference, notifications, AWS AI services, S3 sync, and File Load Validation. |
| Resource | Available resources based on the selected module type. For example, selecting the ETL Job module type displays all accessible ETL jobs. |
| Node Name | A unique identifier for easy reference and management. |
| Input Configurations | Parameters that can be utilized within the job execution. |
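
For illustration only, these attributes can be pictured together as a single record. This is a minimal sketch with a hypothetical structure; the field names below are not an Amorphic API or file format:

# Hypothetical sketch of the attributes a node combines;
# not an actual Amorphic API or configuration format.
etl_node = {
    "ModuleType": "ETL Job",                    # pre-configured building block
    "Resource": "customer-data-cleansing-job",  # an accessible ETL job
    "NodeName": "CleanseCustomerData",          # unique, easy-to-reference name
    "InputConfigurations": {                    # parameters usable during execution
        "source_dataset": "raw_customers",
        "target_dataset": "clean_customers",
    },
}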

Pipeline Execution Properties

Pipeline execution properties are key-value pairs that can be defined during pipeline creation or modification. These properties are accessible during execution and can be used to dynamically adjust pipeline behavior.

Pipeline execution properties

Retrieving pipeline execution properties:

import sys
import boto3
from awsglue.utils import getResolvedOptions

# WORKFLOW_NAME and WORKFLOW_RUN_ID are passed to every job that runs
# as part of a pipeline execution.
glue_client = boto3.client("glue")
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'WORKFLOW_NAME', 'WORKFLOW_RUN_ID'])
workflow_name = args['WORKFLOW_NAME']
workflow_run_id = args['WORKFLOW_RUN_ID']

# Fetch the key-value properties defined on the pipeline.
workflow_params = glue_client.get_workflow_run_properties(
    Name=workflow_name, RunId=workflow_run_id)["RunProperties"]

email_to = workflow_params['email_to']
email_body = workflow_params['email_body']
email_subject = workflow_params['email_subject']
file_name_ml_model_inference = workflow_params['file_name_ml_model_inference']

Modifying pipeline execution properties:

import sys
import boto3
from awsglue.utils import getResolvedOptions

glue_client = boto3.client("glue")
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'WORKFLOW_NAME', 'WORKFLOW_RUN_ID'])
workflow_name = args['WORKFLOW_NAME']
workflow_run_id = args['WORKFLOW_RUN_ID']
workflow_params = glue_client.get_workflow_run_properties(
    Name=workflow_name, RunId=workflow_run_id)["RunProperties"]

# Update a property; nodes that run later in the same execution see the new value.
workflow_params['email_subject'] = 'Coupon: Grab and go!'
glue_client.put_workflow_run_properties(
    Name=workflow_name, RunId=workflow_run_id, RunProperties=workflow_params)

Pipeline Nodes

View list of pipeline nodes

In Amorphic, nodes are the building blocks of data pipelines, constructed using pre-defined modules. Each node represents a specific task type and can be configured, monitored, and managed independently. Nodes can be connected to form complex pipelines with defined dependencies.

| Node Type | Description |
| --- | --- |
| ETL Job | Executes data transformation jobs with configurable arguments |
| ML Model | Runs machine learning model inference tasks |
| Email | Sends notifications with customizable recipient, subject, and body |
| Textract | Extracts text from documents and images |
| Rekognition | Analyzes images and videos to detect objects, people, and text |
| Translate | Performs language translation |
| Comprehend | Extracts insights from text content |
| Medical Comprehend | Processes medical text for specialized insights |
| Transcribe | Converts audio to text |
| Medical Transcribe | Specialized audio-to-text conversion for medical content |
| Pipeline | Enables nested pipeline execution |
| Sync to S3 | Manages data synchronization with S3 storage |
| File Load Validation | Performs data validation before ingestion |

Data pipeline nodes

Pipeline Execution

Data pipelines can be executed immediately or scheduled for future execution. The scheduling feature allows for automated pipeline runs at specified intervals, with the flexibility to enable or disable schedules as needed.
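
Amorphic runs on AWS, so a reasonable assumption (not stated on this page) is that schedule intervals follow AWS-style cron syntax. A purely illustrative expression for a daily 06:00 UTC run:

# AWS-style cron fields: minute hour day-of-month month day-of-week year
# Hypothetical example: trigger the pipeline every day at 06:00 UTC.
schedule_expression = "cron(0 6 * * ? *)"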

To stop an active pipeline execution, use the Stop Execution option available in the pipeline's context menu (three dots).

Trigger Pipeline execution

Conditional Node Execution

Amorphic supports conditional node execution based on the success or failure of predecessor nodes. This powerful feature enables complex use cases such as error handling and branching logic within pipelines.

Example scenario:

Pipeline demonstrating conditional execution

In this example:

  • The "SendPromotionalEmails" ETL job executes only after successful completion of "ReadCustomerDetails"
  • If "ReadCustomerDetails" fails, the "FailureAlertGenerator" node sends notification emails

Pipeline execution with conditional triggers

Complex ETL example: Pipeline demonstrating complex ETL process

Execution rules:

  • node_one: Executes at pipeline start
  • node_two: Executes when node_one succeeds
  • node_three: Executes when node_two succeeds
  • node_four: Executes when node_six succeeds AND node_two succeeds AND node_three fails
  • node_five: Executes at pipeline start
  • node_six: Executes when node_five fails AND node_one succeeds
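
One way to picture the rules above is as a mapping from each node to the conditions on its predecessors. This is a hypothetical representation for illustration only, not Amorphic's configuration format:

# Hypothetical representation of the trigger rules above;
# a node runs once ALL of its listed conditions hold.
triggers = {
    "node_one":   [],                             # runs at pipeline start
    "node_two":   [("node_one", "SUCCEEDED")],
    "node_three": [("node_two", "SUCCEEDED")],
    "node_four":  [("node_six", "SUCCEEDED"),
                   ("node_two", "SUCCEEDED"),
                   ("node_three", "FAILED")],
    "node_five":  [],                             # runs at pipeline start
    "node_six":   [("node_five", "FAILED"),
                   ("node_one", "SUCCEEDED")],
}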

Existing pipelines maintain their functionality without requiring modifications. Users can edit pipeline configurations and update node triggers as needed.

Example Use Case

Consider a document processing pipeline with the following sequential tasks:

  1. Text extraction from images (Textract node)
  2. Text analysis and data extraction (Comprehend node)
  3. Completion notification (Email node)

The pipeline ensures sequential execution, where each node depends on the successful completion of its predecessor, maintaining data processing integrity.
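
Each of these nodes wraps a call to the corresponding AWS service. As a standalone sketch of what the sequence does (Amorphic wires this up for you; the bucket, key, and email addresses below are placeholders):

import boto3

textract = boto3.client("textract")
comprehend = boto3.client("comprehend")
ses = boto3.client("ses")

# 1. Extract text from an image stored in S3 (placeholder bucket/key).
response = textract.detect_document_text(
    Document={"S3Object": {"Bucket": "my-bucket", "Name": "scans/invoice.png"}})
text = "\n".join(
    block["Text"] for block in response["Blocks"] if block["BlockType"] == "LINE")

# 2. Analyze the extracted text for entities.
entities = comprehend.detect_entities(Text=text, LanguageCode="en")["Entities"]

# 3. Send a completion notification (placeholder addresses).
ses.send_email(
    Source="pipeline@example.com",
    Destination={"ToAddresses": ["analyst@example.com"]},
    Message={
        "Subject": {"Data": "Document processing complete"},
        "Body": {"Text": {"Data": f"Extracted {len(entities)} entities."}},
    },
)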

Execution Logs

Amorphic provides comprehensive logging for ETL Job and Datasource nodes. Access logs through the execution details tab using the following options:

Preview Logs

You can preview logs for a specified time frame to analyze the status and details of a node execution. By default, logs are displayed from the execution start time to the current time (if the execution is still running) or to the execution end time.

Preview logs
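
Behind the preview, ETL node logs are standard AWS Glue job logs stored in CloudWatch. If you need the same time-windowed view programmatically, here is a minimal sketch (the log group varies with Glue version, the group and stream names below are placeholders, and only the first page of events is read):

import time
import boto3

logs = boto3.client("logs")

end = int(time.time() * 1000)   # CloudWatch expects epoch milliseconds
start = end - 60 * 60 * 1000    # the last hour

# Placeholder log group/stream; Glue job runs typically log to
# /aws-glue/jobs/output with the job run ID as the stream name.
for event in logs.filter_log_events(
        logGroupName="/aws-glue/jobs/output",
        logStreamNames=["jr_0123456789abcdef"],
        startTime=start,
        endTime=end)["events"]:
    print(event["message"])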

Generate Logs

If there is a large volume of logs, you can trigger log generation for a specified time frame to analyze the status and details of a node execution. The logs are generated in the background and can be downloaded from the execution details tab once available.

Generate logs

Download Logs

You can download the logs from the execution details tab once they are available. Triggering log generation for a different time frame replaces the previously generated logs.

Download logs

Note

Pipeline execution logs are retained for 90 days. Attempts to access older execution logs will result in an error.