Streams Datasource
You can import real-time data from external sources and stream it into Amorphic with a Stream Datasource. Amorphic uses Kinesis Streams to collect and process large streams of data in real time. You can also use the streamed data for ETL and analytics.
To use the streamed data, you need to create a Dataflow that delivers the data to datasets. A stream supports one-to-many connections, so you can share the same data with multiple dataflows. You can also apply a Data Transformation before the data is delivered to the datasets.
Kinesis Streams
- To use the data stream, create a dataflow to transfer data into datasets. A stream can connect to multiple dataflows, and data can be transformed before it is delivered.
- To create a Kinesis data stream in Amorphic, use the '+' icon in the Datasource section. The table below shows the required fields for stream datasource creation:
Attribute | Description |
---|---|
Datasource Name | Name for the datasource. |
Datasource Type | Only Kinesis is supported. |
Stream Mode (Optional) | Type of stream, e.g. On-Demand or Provisioned. Default is Provisioned. |
Data Retention Period (Hrs) | Time period for which the data stream retains data records. |
Shard Count | Number of shards for the stream. Each shard ingests up to 1 MiB/second and 1000 records/second. |
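The per-shard limits in the table suggest a quick way to estimate a starting Shard Count for a provisioned stream. The following is a minimal sketch, not part of Amorphic: the function name `estimate_shard_count` and the sample workload numbers are hypothetical.

```python
import math

# Per-shard ingest limits quoted in the table above.
MIB_PER_SECOND_PER_SHARD = 1.0
RECORDS_PER_SECOND_PER_SHARD = 1000


def estimate_shard_count(mib_per_second: float, records_per_second: float) -> int:
    """Return the smallest shard count that satisfies both ingest limits."""
    by_throughput = math.ceil(mib_per_second / MIB_PER_SECOND_PER_SHARD)
    by_records = math.ceil(records_per_second / RECORDS_PER_SECOND_PER_SHARD)
    # A stream needs at least one shard even when traffic is negligible.
    return max(1, by_throughput, by_records)


if __name__ == "__main__":
    # Hypothetical workload: 2.5 MiB/s made up of 4200 records/s.
    print(estimate_shard_count(2.5, 4200))
```

In this hypothetical workload the record rate, not the byte rate, is the limiting factor, so the estimate is 5 shards. Treat the result as a starting point and leave headroom for traffic spikes.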
- After creating a stream datasource, Amorphic provides AWS access keys to push data to the stream. Refer to the AWS documentation for different ways to push data.
- To update a stream datasource, use the edit option on the details page. Only metadata and configuration can be updated.
- To retrieve all datasets associated with a stream, use the following API call with the GET method:
  /datasources/{datasource_id}?request_type=get_dependent_resources
- Dataflows use Kinesis Data Firehose delivery streams to continuously collect and load data into specified destinations. Each dataflow is attached to a dataset as the final destination of collected data.
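As an illustration of the dependent-resources call mentioned in the list above, the following sketch builds the GET request with Python's standard library. The base URL, the `Authorization` header, and the datasource ID are placeholders assumed for this example; use the values and authentication scheme of your own Amorphic deployment.

```python
from urllib.parse import quote, urlencode
from urllib.request import Request

# Hypothetical placeholder; replace with your Amorphic API base URL.
BASE_URL = "https://amorphic.example.com/api"


def build_dependent_resources_request(datasource_id: str, token: str) -> Request:
    """Build (but do not send) the GET request for a stream's dependent resources."""
    query = urlencode({"request_type": "get_dependent_resources"})
    url = f"{BASE_URL}/datasources/{quote(datasource_id, safe='')}?{query}"
    # The Authorization header is an assumption; your deployment may need other headers.
    return Request(url, headers={"Authorization": token}, method="GET")


if __name__ == "__main__":
    request = build_dependent_resources_request("my-datasource-id", "MY_TOKEN")
    print(request.get_method(), request.full_url)
    # To send it, pass the request to urllib.request.urlopen and parse the JSON body.
```

Building the request separately from sending it makes the URL easy to inspect before any call reaches the API.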
Dataflow
Dataflows in Amorphic are built on the Kinesis Data Firehose service, which continuously collects and loads streaming data into specified destinations. Each dataflow is attached to a dataset, which serves as the final destination of the data collected from the stream.
To create a dataflow in Amorphic, you will need to provide the following information:
Attribute | Description |
---|---|
Dataflow Name | Name to be used for the dataflow. |
Buffer Size | Buffer incoming data to the specified size, in MiBs, before delivering it to the destination. |
Buffer Interval | Buffer incoming data for the specified period of time, in seconds, before delivering it to the destination. |
Target Location | Final destination where the streaming data is to be stored. |
Create Dataset | Option to create a new dataset or use an existing dataset. |
Dataset Configuration | Refer to Create Dataset in Datasets. |
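Kinesis Data Firehose delivers a batch when either buffering limit is reached, whichever comes first. The sketch below models that rule for the Buffer Size and Buffer Interval settings. The function and parameter names are hypothetical, and the code only illustrates the decision, not how Amorphic or Firehose implement it.

```python
MIB = 1024 * 1024


def should_deliver(buffered_bytes: int, elapsed: float,
                   buffer_size_mib: int, buffer_interval: float) -> bool:
    """Return True when either buffering limit has been reached.

    `elapsed` and `buffer_interval` must be expressed in the same time unit.
    """
    size_reached = buffered_bytes >= buffer_size_mib * MIB
    interval_reached = elapsed >= buffer_interval
    return size_reached or interval_reached


if __name__ == "__main__":
    # Hypothetical settings: buffer size 5 MiB, buffer interval 300 time units.
    print(should_deliver(6 * MIB, 10, 5, 300))   # size limit reached first
    print(should_deliver(1 * MIB, 300, 5, 300))  # interval elapsed first
    print(should_deliver(1 * MIB, 10, 5, 300))   # neither limit reached yet
```

A larger buffer produces fewer, bigger files in the target dataset, while a shorter interval reduces the delay before data becomes available.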
The dataset file types supported for each dataflow target location are:
- Redshift: CSV, PARQUET
- Lakeformation: CSV, PARQUET
- S3 Athena: CSV, PARQUET
- S3: CSV, OTHERS
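If you create dataflows from scripts, the combinations listed above can be checked before a request is made. This is a small sketch under the assumption that the display names in the list are used as keys; the names `SUPPORTED_FILE_TYPES` and `is_supported` are hypothetical, and the values your deployment's API expects may differ.

```python
# Supported dataset file types per dataflow target location, as listed above.
SUPPORTED_FILE_TYPES = {
    "Redshift": {"CSV", "PARQUET"},
    "Lakeformation": {"CSV", "PARQUET"},
    "S3 Athena": {"CSV", "PARQUET"},
    "S3": {"CSV", "OTHERS"},
}


def is_supported(target_location: str, file_type: str) -> bool:
    """Check a target location and file type pair against the list above."""
    return file_type.upper() in SUPPORTED_FILE_TYPES.get(target_location, set())


if __name__ == "__main__":
    print(is_supported("Redshift", "parquet"))
    print(is_supported("S3", "PARQUET"))
```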
For more detailed information on how Kinesis Data Firehose delivery streams are created and configured, refer to the AWS documentation.
Data Transformations
You can use Entities to customize how data is processed as it streams into Amorphic. These user-defined functions are defined on a per-stream basis and can be applied to any dataflow of that stream datasource.
Data is temporarily held in a buffer before the Lambda function (user-defined processing) is applied. This buffer size can be configured per dataflow, independently of the dataflow's own delivery buffer settings. For more information, refer to the AWS documentation.
Attribute | Description |
---|---|
Entity Name | Display name for the user-defined entity. |
Lambda Handler | The complete (unabridged) Lambda handler for the user-defined function. |
Memory Size (Optional) | The memory to be allocated for the user-defined function. Default: 128 MB. |
The number of invocations of the lambda function is directly proportional to the number of shards in the data stream and the number of dataflows attached to the entity.
Therefore, Amorphic limits this functionality to a maximum of 20 overall entities and 5 dataflows per entity.
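The limits above can be checked with a few lines of code when planning entities and dataflows. This is a sketch only: the function names are hypothetical, and `relative_invocation_load` is one simple reading of "proportional to shards and attached dataflows", not a formula published by Amorphic or AWS.

```python
MAX_ENTITIES = 20             # overall entity limit stated above
MAX_DATAFLOWS_PER_ENTITY = 5  # per-entity dataflow limit stated above


def check_entity_limits(dataflows_per_entity: dict) -> list:
    """Return limit violations for a mapping of entity name to dataflow count."""
    problems = []
    if len(dataflows_per_entity) > MAX_ENTITIES:
        problems.append(
            f"{len(dataflows_per_entity)} entities exceed the limit of {MAX_ENTITIES}")
    for name, count in dataflows_per_entity.items():
        if count > MAX_DATAFLOWS_PER_ENTITY:
            problems.append(
                f"{name}: {count} dataflows exceed the limit of {MAX_DATAFLOWS_PER_ENTITY}")
    return problems


def relative_invocation_load(shard_count: int, attached_dataflows: int) -> int:
    """Unitless scaling factor that grows with both shards and attached dataflows."""
    return shard_count * attached_dataflows


if __name__ == "__main__":
    print(check_entity_limits({"Technology-DataTransformation": 6}))
    print(relative_invocation_load(4, 3))
```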
Stream Datasource use case
Let's look at a use case where you feed in streaming data and consume it for analytical purposes.
To start, connect to the stream Stream-Datasource using the access key and secret access key given in the DatasourceConfig details.
Here, data is sent to the stream using a Python boto3 script.
import boto3

data_records = []
# Replace the placeholders with the access keys provided for the stream datasource.
client = boto3.client(
    'kinesis',
    region_name='eu-west-1',
    aws_access_key_id='ACCESS_KEY_ID',
    aws_secret_access_key='SECRET_ACCESS_KEY',
)
counter = 0

with open('./data.csv', encoding='utf-8') as csvf:
    # Send every line of the CSV file to the stream as one record.
    for line in csvf:
        data_records.append(line)
        response = client.put_record(
            StreamName="Stream-Datasource",
            Data=line.encode('utf-8'),
            PartitionKey=str(hash(line)),
        )
        counter += 1
        # If the message was not successfully sent, print an error message.
        if response['ResponseMetadata']['HTTPStatusCode'] != 200:
            print('Error!')
            print(response)
        else:
            print('Message sent #' + str(counter))

print(data_records)
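The script above sends one record per API call. For larger files, the Kinesis `PutRecords` operation accepts up to 500 records per request, which reduces the number of calls. The following sketch shows one way to batch lines. The helper names are hypothetical, the client is passed in so that the batching logic can be tried without AWS access, and request size limits are not checked here. It also derives the partition key from a SHA-256 digest, because Python's built-in `hash()` for strings changes between interpreter runs.

```python
import hashlib

MAX_RECORDS_PER_CALL = 500  # Kinesis PutRecords accepts at most 500 records per request


def stable_partition_key(line: str) -> str:
    """Derive a partition key that is identical across runs (unlike built-in hash())."""
    return hashlib.sha256(line.encode("utf-8")).hexdigest()


def to_batches(lines, batch_size=MAX_RECORDS_PER_CALL):
    """Yield lists of PutRecords entries, each list no longer than batch_size."""
    batch = []
    for line in lines:
        batch.append({"Data": line.encode("utf-8"),
                      "PartitionKey": stable_partition_key(line)})
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        yield batch


def send_lines(client, stream_name, lines):
    """Send lines in batches and return the total number of failed records."""
    failed = 0
    for batch in to_batches(lines):
        response = client.put_records(StreamName=stream_name, Records=batch)
        # PutRecords can partially fail; the response reports how many records failed.
        failed += response.get("FailedRecordCount", 0)
    return failed
```

With a boto3 Kinesis client created as in the script above, a call such as `send_lines(client, "Stream-Datasource", open("./data.csv", encoding="utf-8"))` would send the file in batches. A nonzero return value means some records were rejected and should be retried.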
The Amorphic Stream-Datasource then receives the data and passes it on to the dataflows attached to the stream. Here, the attached dataflows are:
- Technology-DataFlow
- Financial-DataFlow
Both dataflows attached to the Stream-Datasource receive the same records. However, if Entities are linked to these dataflows, they filter the respective records and allow only those specified by the custom code within the Entities.
Here, the Entities are:
- Technology-DataTransformation, which keeps only the technology sector records
- Financial-DataTransformation, which keeps only the financial sector records
After the records have been processed by the Entities of the respective dataflows, the datasets associated with those dataflows will contain only the records filtered by the respective Dataflow's Entity.
Here, the dataset Financial-Data associated with the dataflow Financial-DataFlow will contain only the financial records, and the dataset Technology-Data associated with the dataflow Technology-DataFlow will contain only the technology records.
Thus, the resulting datasets can be used for further analysis with the analytical services offered on the Amorphic platform.
Data Transformation code for filtering technology records.
"""
File defines logic for transforming records.
"""
import base64
import sys

OTHER_SECTORS = ["HEALTHCARE", "ENERGY", "RETAIL", "FINANCIAL"]


def lambda_handler(event, context):
    """
    Entry point for the transformation lambda.
    """
    print("input size: ", sys.getsizeof(event["records"]))
    output = []
    for record in event["records"]:
        payload = base64.b64decode(record["data"]).decode('utf-8')
        record_info = payload.split(",")
        # The sector is expected in the fourth column of each CSV record.
        sector = record_info[3].strip() if len(record_info) > 3 else None
        if sector == "TECHNOLOGY":
            result = "Ok"  # keep the record
        elif sector in OTHER_SECTORS:
            result = "Dropped"  # valid record, but not for this dataflow
        else:
            result = "ProcessingFailed"  # unknown or malformed record
        output.append({
            "recordId": record["recordId"],
            "result": result,
            # b64encode returns bytes; decode so the response is JSON serializable.
            "data": base64.b64encode(payload.encode('utf-8')).decode('utf-8')
        })
    print("Processed {} records.".format(len(event["records"])))
    print("output size: ", sys.getsizeof(output))
    return {"records": output}
Data Transformation code for filtering financial records.
"""
File defines logic for transforming records.
"""
import base64
import sys

OTHER_SECTORS = ["HEALTHCARE", "ENERGY", "RETAIL", "TECHNOLOGY"]


def lambda_handler(event, context):
    """
    Entry point for the transformation lambda.
    """
    print("input size: ", sys.getsizeof(event["records"]))
    output = []
    for record in event["records"]:
        payload = base64.b64decode(record["data"]).decode('utf-8')
        record_info = payload.split(",")
        # The sector is expected in the fourth column of each CSV record.
        sector = record_info[3].strip() if len(record_info) > 3 else None
        if sector == "FINANCIAL":
            result = "Ok"  # keep the record
        elif sector in OTHER_SECTORS:
            result = "Dropped"  # valid record, but not for this dataflow
        else:
            result = "ProcessingFailed"  # unknown or malformed record
        output.append({
            "recordId": record["recordId"],
            "result": result,
            # b64encode returns bytes; decode so the response is JSON serializable.
            "data": base64.b64encode(payload.encode('utf-8')).decode('utf-8')
        })
    print("Processed {} records.".format(len(event["records"])))
    print("output size: ", sys.getsizeof(output))
    return {"records": output}
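The two handlers above differ only in the sector they keep, so they can be exercised locally before being uploaded as Entities. The sketch below is one way to do that. `make_sector_handler` and `make_event` are hypothetical helpers, the sample rows are invented, and the event contains only the `records` fields the handlers read, not the full event Firehose sends.

```python
import base64

KNOWN_SECTORS = {"HEALTHCARE", "ENERGY", "RETAIL", "FINANCIAL", "TECHNOLOGY"}


def make_sector_handler(wanted_sector):
    """Build a transformation handler that keeps records of one sector only."""
    def lambda_handler(event, context):
        output = []
        for record in event["records"]:
            payload = base64.b64decode(record["data"]).decode("utf-8")
            fields = payload.split(",")
            sector = fields[3].strip() if len(fields) > 3 else None
            if sector == wanted_sector:
                result = "Ok"
            elif sector in KNOWN_SECTORS:
                result = "Dropped"
            else:
                result = "ProcessingFailed"
            # The payload is not modified, so the original base64 string is returned.
            output.append({"recordId": record["recordId"], "result": result,
                           "data": record["data"]})
        return {"records": output}
    return lambda_handler


def make_event(lines):
    """Wrap CSV lines in the record shape that the handlers above read."""
    return {"records": [
        {"recordId": str(i),
         "data": base64.b64encode(line.encode("utf-8")).decode("utf-8")}
        for i, line in enumerate(lines)
    ]}


if __name__ == "__main__":
    # Hypothetical sample rows; only the fourth column (sector) matters here.
    event = make_event(["1,ACME,10.5,TECHNOLOGY", "2,BANKCO,7.2,FINANCIAL", "bad-row"])
    handler = make_sector_handler("TECHNOLOGY")
    for rec in handler(event, None)["records"]:
        print(rec["recordId"], rec["result"])
```

Generating both handlers from one factory keeps the filtering rules in a single place, at the cost of a slightly less explicit entry point than the two standalone files.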