SaaS Datasource
SaaS Datasource is a feature in Amorphic that allows users to ingest data from SaaS applications into Amorphic Datasets for further ETL/analytics. Amorphic uses Amazon AppFlow in the backend to transfer the data. AppFlow supports both AWS-managed connectors and custom connectors created by the user. Using a SaaS Datasource involves three steps:
- Choose an inbuilt connector, or create your own Custom Connector.
- Create a SaaS Datasource using the selected connector.
- Define a Datasource Flow on the Datasource and run it.
Amorphic currently supports Slack, Zendesk, Salesforce, Google Analytics, and ServiceNow as AWS-managed connectors and also allows the use of Custom Connectors. The Custom Connector section below covers all functions related to custom connectors; if you are using an AWS-managed connector, you can skip ahead to the SaaS Datasources section.
Use Case: Ingest data from Slack
The use case is to ingest data from Slack for further downstream analysis. The SaaS Datasource UI relies on uploading API payloads as JSON files.
SaaS Datasource has two components: Datasources and Datasource Flows.
SaaS Datasources are used to authenticate with third-party services like Slack and establish a connection with them. Once a SaaS Datasource is created, any number of Datasource Flows can be defined using it.
A Datasource Flow defines the specific data points to be ingested from the third-party service, such as User, Timestamp, Text, Threads, Replies, and Reactions for Slack. A flow can have three types of triggers: on-demand, scheduled, or event-based (only for Salesforce).
To create a SaaS Datasource, follow these steps:
- Go to Datasources.
- Click on + Create Datasource.
- Upload a JSON file containing the payload to create a SaaS Datasource.
- The required parameters vary slightly for each SaaS app; refer to the specific SaaS App's documentation for more information. For Slack, the payload looks like the example below:
```json
{
  "DatasourceName": "slack_test_9",
  "Description": "This is a test datasource to ingest data from slack",
  "DatasourceType": "saas",
  "IngestionType": "slack",
  "DatasourceConfig": {
    "clientId": "3567161620994.3567146437507",
    "clientSecret": "80673273e291fc237f2a5dd41c64d07c",
    "accessToken": "xoxp-3567161620994-3564254695269-3579807622033-c452d46e9e8fbac855852e7926e2b9cf",
    "instanceUrl": "https://testorg-jvq2634.slack.com"
  },
  "Keywords": ["test", "slack", "post"],
  "CostTags": []
}
```
Once the SaaS Datasource is created, you can create a Datasource Flow by following these steps:
- Go to the Flows tab.
- Click on + Create Flow.
- Upload a JSON file containing the payload to create a Flow.
- Define the TriggerType, the dataset to write to, and the file format, and specify the actual data points to read from the SaaS app. These data points are specified in the Tasks parameter, which is specific to that SaaS app. For example, in this case, the data points are ts, type, and text.
```json
{
  "DataflowName": "slack_flow_11",
  "CreateDataset": false,
  "DatasetDetails": {
    "DatasetId": "bb2d571e-65b3-4f22-b18a-39b8dc7f2ce0"
  },
  "DataFormat": "csv",
  "DataflowConfig": {
    "TriggerType": "scheduled",
    "TriggerProperties": {
      "scheduleStartTime": "2022-12-25T01:45:00",
      "scheduleEndTime": "2022-12-26T02:10:00",
      "scheduleExpression": "rate(3days)",
      "dataPullMode": "Complete"
    },
    "SourceConnectorProperties": {
      "Object": "conversations/C03HCTC8B0Q"
    },
    "Tasks": [
      {
        "taskType": "Filter",
        "sourceFields": ["ts", "type", "text"],
        "taskProperties": {},
        "connectorOperator": { "Slack": "PROJECTION" }
      },
      {
        "taskType": "Map",
        "sourceFields": ["ts"],
        "taskProperties": {
          "SOURCE_DATA_TYPE": "DateTime",
          "DESTINATION_DATA_TYPE": "DateTime"
        },
        "destinationField": "ts",
        "connectorOperator": { "Slack": "NO_OP" }
      },
      {
        "taskType": "Map",
        "sourceFields": ["type"],
        "taskProperties": {
          "SOURCE_DATA_TYPE": "String",
          "DESTINATION_DATA_TYPE": "String"
        },
        "destinationField": "type",
        "connectorOperator": { "Slack": "NO_OP" }
      },
      {
        "taskType": "Map",
        "sourceFields": ["text"],
        "taskProperties": {
          "SOURCE_DATA_TYPE": "String",
          "DESTINATION_DATA_TYPE": "String"
        },
        "destinationField": "text",
        "connectorOperator": { "Slack": "NO_OP" }
      },
      {
        "taskType": "Validate",
        "sourceFields": ["ts"],
        "taskProperties": {
          "VALIDATION_ACTION": "DropRecord"
        },
        "connectorOperator": { "Slack": "VALIDATE_NON_NULL" }
      }
    ],
    "tags": {},
    "IncrementalPullConfig": "ts"
  }
}
```
- Click on the Dataflow to go to the Datasource Flow Runs page and check the status.
- Check the status of the Datasource Flow execution.

Once the Datasource Flow execution is complete, the data will be available in the dataset specified in step 4. This dataset can be used to perform further analysis, such as in an ETL job.
Custom Connector
A Custom Connector allows ingestion of data from any third-party SaaS application. This is enabled by the AppFlow Custom Connector SDK. The connector developer needs to implement three main interfaces from the SDK:
| Interface | Description |
| --- | --- |
| Configuration Handler | Defines all functionality related to authentication and configuration. |
| Metadata Handler | Retrieves metadata and parses it into an AppFlow-specific format. |
| Record Handler | Defines all functionality for record-related CRUD operations. |
The connector developer is therefore essentially writing the wiring code to translate from the vendor's API to the generic AppFlow custom connector interface. The custom connector implementation code, along with the SDK and any third-party dependencies, needs to be bundled into a zip file. This zip file can be used to register a custom connector in Amorphic.
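As a rough illustration, the bundle can be produced with a short script like the minimal sketch below. The directory name custom_connector_example follows the sample template's naming used later on this page; the requirements.txt file listing the SDK and other dependencies is an assumption of this sketch.

```python
# Minimal sketch: bundle connector code plus dependencies into a zip for upload.
# Assumptions: the handler package lives in ./custom_connector_example (sample
# template naming) and requirements.txt lists the SDK and other dependencies.
import pathlib
import shutil
import subprocess
import zipfile

build_dir = pathlib.Path("build")
build_dir.mkdir(exist_ok=True)

# Vendor the SDK and any third-party dependencies into the build directory.
subprocess.check_call(
    ["pip", "install", "-r", "requirements.txt", "--target", str(build_dir)]
)

# Copy the connector implementation itself.
shutil.copytree(
    "custom_connector_example",
    build_dir / "custom_connector_example",
    dirs_exist_ok=True,
)

# Zip everything with paths relative to the bundle root.
with zipfile.ZipFile("connector.zip", "w", zipfile.ZIP_DEFLATED) as zf:
    for path in build_dir.rglob("*"):
        if path.is_file():
            zf.write(path, path.relative_to(build_dir))
```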
Below is the list of APIs and their corresponding methods used to create, update, and delete Custom Connectors in Amorphic:
APIs List
- /datasources-connectors
  - GET -- Returns details about all custom & inbuilt connectors.
  - POST -- Saves custom connector metadata & returns a signed URL for file upload.
- /datasources-connectors/{id}
  - GET -- Returns details about a single custom connector.
  - POST -- Starts the asynchronous process for custom connector creation/update.
  - PUT -- Updates custom connector metadata & returns a signed URL for file upload.
  - DELETE -- Deletes a custom connector.
Please replace {id} with the DatasourceConnectorId received from the backend response.
API Request Payload Details
- To retrieve details about all custom/inbuilt connectors, as well as to download the Custom Connector template

/datasources-connectors & GET method

The response depends on the query parameters (if any):

| Query parameter | Response |
| --- | --- |
| None | Returns details about all inbuilt connectors. |
| connector_type = custom | Returns details about all custom connectors. |
| download_template = yes | Returns a download link for a Custom Connector sample template. |
- To create a custom connector

This is a multi-step process: save the metadata, upload the zip file, and then run the asynchronous process that registers the custom connector.

/datasources-connectors & POST method
Request payload for saving custom connector metadata:

```json
{
  "DatasourceConnectorName": <String>, (must be unique across an AWS account)
  "IngestionType": "customconnector",
  "Description": <String>,
  "LambdaHandler": <String>,
  "Keywords": [String]
}
```

- LambdaHandler: The fully qualified name of the lambda handler used in the Custom Connector code. For example, in the provided Sample Connector Template, it will be: custom_connector_example.handlers.lambda_handler.app_lambda_handler
Note: The response will contain a signed URL for uploading the custom connector zip file. Make a PUT request using this link. Please make the next API request only after this file is uploaded.
/datasources-connectors/{id} & POST method

Request payload for registering the custom connector:

```json
{
  "ConnectorAction": "create-connector"
}
```
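The three steps can also be scripted. Below is a minimal sketch using Python's requests library; the API base URL, the authentication attached to the session, and the PresignedURL response field name are assumptions of this sketch, while DatasourceConnectorId is the id returned by the backend as noted above.

```python
# Sketch of the three-step custom connector registration flow.
# BASE and the session's auth headers are assumptions about your deployment;
# verify the response field names against your API responses.
import requests

BASE = "https://<amorphic-api-endpoint>"  # assumption: your API base URL
session = requests.Session()              # assumption: auth headers already set

# 1. Save the connector metadata; the response carries an id and a signed URL.
meta = session.post(f"{BASE}/datasources-connectors", json={
    "DatasourceConnectorName": "my-custom-connector",
    "IngestionType": "customconnector",
    "Description": "Example custom connector",
    "LambdaHandler": "custom_connector_example.handlers.lambda_handler.app_lambda_handler",
    "Keywords": ["example"],
}).json()

# 2. Upload the zip bundle via the signed URL (plain PUT, no extra headers).
with open("connector.zip", "rb") as f:
    requests.put(meta["PresignedURL"], data=f)  # assumption: field name

# 3. Only after the upload completes, start the asynchronous registration.
connector_id = meta["DatasourceConnectorId"]
session.post(f"{BASE}/datasources-connectors/{connector_id}",
             json={"ConnectorAction": "create-connector"})
```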
- To update a custom connector

Similar to create, this is also a multi-step process: update the metadata, upload the zip file, and then run the asynchronous process that registers the custom connector.
/datasources-connectors/{id} & PUT method

Request payload for saving custom connector metadata:

```json
{
  "Description": <String>,
  "LambdaHandler": <String>,
  "Keywords": [String]
}
```

- LambdaHandler: The fully qualified name of the lambda handler used in the Custom Connector code. For example, in the provided Sample Connector Template, it will be: custom_connector_example.handlers.lambda_handler.app_lambda_handler
Note: The response will contain a signed URL for uploading the custom connector zip file. Make a PUT request using this link. Please make the next API request only after this file is uploaded.
/datasources-connectors/{id} & POST method

Request payload for registering the custom connector:

```json
{
  "ConnectorAction": "update-connector"
}
```

Note: If the update fails for any reason, the connector will automatically roll back to the previous implementation.
SaaS Datasources
Below is the list of APIs and their corresponding methods used to create, update, and delete SaaS Datasources and their corresponding datasource flows:
APIs List
- /datasources
  - POST -- Create a SaaS Datasource
  - GET -- Returns all SaaS Datasources that a user has access to
- /datasources/{datasource_id}
  - GET -- Returns details of a single SaaS Datasource
  - PUT -- Edit a single SaaS Datasource
  - DELETE -- Delete a single SaaS Datasource
- /datasources/{datasource_id}/dataflows
  - POST -- Create a datasource flow
  - GET -- Returns all datasource flows associated with a SaaS Datasource
- /datasources/{datasource_id}/dataflows/{id}
  - POST -- Start or stop a datasource flow
  - PUT -- Edit a datasource flow
  - GET -- Returns details of a single datasource flow
  - DELETE -- Delete a datasource flow
- /datasources/{datasource_id}/dataflows/{id}/executions
  - GET -- Returns the execution history of a single datasource flow
- /datasources/{id}?resourceid
  - GET -- Returns all authorized users and tags of a SaaS Datasource
  - POST -- Grant access to a user/tag
  - DELETE -- Revoke access from a user/tag
Please replace variables enclosed in {} with the corresponding values (e.g. the DatasourceId or DataflowId received from the backend response).
API Request Payload Details
- To create a SaaS Datasource

/datasources & POST method
Request payload to create a SaaS Datasource:

```json
{
  "DatasourceName": <String>,
  "Description": <String>,
  "DatasourceConnectorName": <String>, (Applicable only for customconnector)
  "DatasourceType": <String>, (saas in case of SaaS Datasources)
  "IngestionType": <String>, (slack, zendesk, salesforce, googleanalytics, servicenow, or customconnector)
  "DatasourceConfig": { # For Inbuilt Connectors
    "clientId": <String>, (Applicable only for slack, zendesk, or googleanalytics)
    "clientSecret": <String>, (Applicable only for slack, zendesk, or googleanalytics)
    "instanceUrl": <String>, (Applicable only for slack, zendesk, salesforce, or servicenow)
    "accessToken": <String>, (Applicable only for slack, zendesk, salesforce, or googleanalytics)
    "refreshToken": <String>, (Applicable only for salesforce or googleanalytics)
    "username": <String>, (Applicable only for servicenow)
    "password": <String> (Applicable only for servicenow)
  },
  "DatasourceConfig": { # For Custom Connectors
    "authenticationType": "OAUTH2"|"APIKEY"|"BASIC"|"CUSTOM", (required field; the corresponding details need to be provided)
    "basic": { (optional field, required if authenticationType is "BASIC")
      "username": <String>, (required field)
      "password": <String> (required field)
    },
    "oauth2": { (optional field, required if authenticationType is "OAUTH2")
      "clientId": <String>,
      "clientSecret": <String>,
      "accessToken": <String>,
      "refreshToken": <String>
    },
    "apiKey": { (optional field, required if authenticationType is "APIKEY")
      "apiKey": <String>, (required field)
      "apiSecretKey": <String>
    },
    "custom": { (optional field, required if authenticationType is "CUSTOM")
      "customAuthenticationType": <String>, (required field)
      "credentialsMap": {
        "customProperty1": <String>,
        "customProperty2": <String>
      }
    },
    "profileProperties": { (optional runtime parameters)
      "runtimeProperty1": <String>,
      "runtimeProperty2": <String>
    },
    "oAuth2Properties": { (optional field, required if authenticationType is "OAUTH2")
      "tokenUrl": <String>, (required field)
      "oAuth2GrantType": <String> (required field, 'CLIENT_CREDENTIALS' or 'AUTHORIZATION_CODE')
    }
  },
  "Keywords": ["test", "new"]
}
```

Provide only one DatasourceConfig block, matching the connector type (inbuilt or custom) in use.
The above payload can also be uploaded through the UI as shown below:
The following picture depicts the SaaS Datasource details page in Amorphic:
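For API-driven creation, a minimal sketch is shown below, reusing the BASE and session conventions from the custom connector sketch above; the credentials are placeholders and the DatasourceId response field name is an assumption.

```python
# Sketch: create a Slack SaaS Datasource via the API.
# Placeholder credentials; verify the response field name in your deployment.
payload = {
    "DatasourceName": "slack-conn-profile",
    "Description": "Slack datasource created via the API",
    "DatasourceType": "saas",
    "IngestionType": "slack",
    "DatasourceConfig": {
        "clientId": "<client-id>",
        "clientSecret": "<client-secret>",
        "instanceUrl": "https://<workspace>.slack.com",
        "accessToken": "<access-token>",
    },
    "Keywords": ["slack"],
}
resp = session.post(f"{BASE}/datasources", json=payload)
datasource_id = resp.json()["DatasourceId"]  # assumption: field name
```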
- To create a datasource flow

/datasources/{datasource_id}/dataflows & POST method

Request payload to create a SaaS datasource flow:

```json
{
  "DataflowName": <String>,
  "DataflowConfig": {
    "TriggerType": <String>, (scheduled, ondemand, or event (applicable only for salesforce))
    "TriggerProperties": {
      "scheduleStartTime": <String>, (Timestamp in the format YYYY-MM-DDThh:mm:ss)
      "scheduleEndTime": <String>, (Timestamp in the format YYYY-MM-DDThh:mm:ss)
      "scheduleExpression": <String>, (cron and rate expressions are supported, e.g. rate(2days), cron(0 12 * * ? *))
      "scheduleOffset": <String>, (Specifies the optional offset that is added to the time interval, in the format hh:mm:ss)
      "dataPullMode": <String> (Incremental or Complete)
    },
    "IncrementalPullConfig": <String>, (optional field for customconnector)
    "SourceConnectorProperties": {
      "Object": <String>, (Object from which the data is transferred)
      "ApiVersion": <String> (optional field for customconnector)
    },
    "Tasks": <List of objects>
  },
  "CreateDataset": Boolean, (true or false)
  "DatasetDetails": {
    "DatasetName": <String>,
    "Description": <String>,
    "Keywords": <List of strings>,
    "Domain": <String>,
    "FileType": <String>,
    "TargetLocation": <String>,
    "DatasetId": <String>,
    "SkipLZProcess": Boolean (true or false)
  },
  "DataFormat": <String> (Only csv, parquet, or others is supported)
}
```

- TriggerProperties: This field is applicable only when the TriggerType is scheduled.
- ApiVersion: An optional field for custom connectors that specifies the SaaS application's API version.
- IncrementalPullConfig: An optional field for custom connectors that specifies which field should be used to identify records that have changed since the last flow run, for incremental flows.
- CreateDataset: A Boolean key with possible values of true or false.
- DatasetDetails: The fields required in this object are determined by the CreateDataset key. When it is set to true, all the metadata related to dataset creation is required in the input body, as stated in the above payload; otherwise only DatasetId is required.
- SourceConnectorProperties: This is where to specify what data from the source needs to be transferred to the target (e.g. conversations/C0234C1JTUP).
- DataFormat: The format in which the data needs to be transferred ("csv", "parquet", and "others" are supported).
- Tasks: A list of objects, one per column needed from the source, each specifying the source field and data type together with the target name and data type. An example is provided at the end of this page.
The above payload can also be uploaded through the UI as shown below:
The following picture depicts the Flow listing page in Amorphic, along with the various options:
- To start or stop a datasource flow

/datasources/{datasource_id}/dataflows/{id} & POST method

Request payload to start or stop a datasource flow:

```json
{
  "DataflowAction": <String> (Allowed values are start, stop)
}
```

  - For an ondemand datasource flow, start will start a run and stop will stop the datasource flow.
  - For a scheduled datasource flow, start will activate the datasource flow if it is in a suspended state; the stop action is not supported.
The following picture depicts the Dataflow Executions details page in Amorphic:
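To automate a run end to end, the flow can be started and its execution history polled through the executions endpoint documented above. A minimal sketch follows, assuming datasource_id and dataflow_id were captured from earlier responses and reusing the session/BASE conventions; the Executions and Status response field names are assumptions.

```python
# Sketch: start an on-demand flow, then poll its execution history.
# The Executions/Status field names are assumptions; adjust to the real schema.
import time

flow_url = f"{BASE}/datasources/{datasource_id}/dataflows/{dataflow_id}"
session.post(flow_url, json={"DataflowAction": "start"})

while True:
    history = session.get(f"{flow_url}/executions").json()
    runs = history.get("Executions", [])  # assumption: response field name
    if runs and all(r.get("Status") != "InProgress" for r in runs):
        break
    time.sleep(30)  # poll every 30 seconds until no run is in progress
```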
- To grant Flow access to a tag

share/datasources grants & POST method

Grant access to a tag:

```json
{
  "ResourceIds": [String],
  "AccessTags": [{"TagKey": <String>, "TagValue": <String>}],
  "AccessType": <String> (Allowed values are "owner", "editor", or "read-only")
}
```
- To grant access to an individual user

share/datasources grants & POST method

Grant access to a user:

```json
{
  "ResourceIds": [String],
  "AccessTags": [{"TagKey": "user", "TagValue": <user_id>}],
  "AccessType": <String> (Allowed values are "owner", "editor", or "read-only")
}
```

A share/datasources grants & DELETE method call will revoke the access from the user.
- To retrieve metadata about objects

/datasources/{datasource_id}?request_type=list_connector_entities & GET method

  - The above call will return all the top-level objects for that connector.
  - If the response for the desired object contains hasNestedEntities = true, then there are lower-level objects.
  - In this case, make a second request with an additional query parameter, entity_type = "top_level_object".
  - Include a query parameter api_version = "version" when using Custom Connectors. This specifies the API version of the underlying SaaS application.
- To retrieve metadata about the attributes of an object

/datasources/{datasource_id}?request_type=describe_connector_entity&entity_type="object_name" & GET method

  - The above call will return all the fields associated with the object.
  - Include a query parameter api_version = "version" when using Custom Connectors. This specifies the API version of the underlying SaaS application.
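A minimal sketch of this discovery flow, under the same session/BASE assumptions as the earlier examples; the object names passed as entity_type are illustrative (the second reuses the example object from this page).

```python
# Sketch: discover objects and fields using the documented query parameters.
# List all top-level objects for the connector.
entities = session.get(
    f"{BASE}/datasources/{datasource_id}",
    params={"request_type": "list_connector_entities"},
).json()

# Drill down one level when an object reports hasNestedEntities = true.
nested = session.get(
    f"{BASE}/datasources/{datasource_id}",
    params={"request_type": "list_connector_entities",
            "entity_type": "conversations"},  # assumption: example object name
).json()

# Describe the fields of one object (example entity taken from this page).
fields = session.get(
    f"{BASE}/datasources/{datasource_id}",
    params={"request_type": "describe_connector_entity",
            "entity_type": "conversations/C0234C1JTUP"},
).json()
```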
- To retrieve all datasets associated with a Datasource

/datasources/{datasource_id}?request_type=get_dependent_resources & GET method

  - The above call will return all the datasets associated with the Datasource.
  - Dataset details include Id, Name, and Domain.
Sample payloads
Sample payload to create a Slack SaaS Datasource:

```json
{
  "DatasourceName": "slack-conn-profile",
  "Description": "This is a test datasource to ingest data from slack",
  "DatasourceType": "saas",
  "IngestionType": "slack",
  "DatasourceConfig": {
    "clientId": "214567891798.2144988765890",
    "clientSecret": "21216f9185b2ff73905aa5676589a",
    "instanceUrl": "https://abcd-hq.slack.com",
    "accessToken": "xoxp-2114398501798-2144980258432-456788970-cc3b194d3f0e05620bafdbdc77870285"
  },
  "Keywords": ["test", "new"]
}
```
{ "DatasourceName": "custom-conn-profile",
"DatasourceConnectorName": "custom-connector-name",
"Description": "This is a test datasource to ingest data from custom connector",
"DatasourceType": "saas",
"IngestionType": "customconnector",
"DatasourceConfig": {
"authenticationType": "oauth2",
"oauth2": {
"clientId": "3MVG9pRzvMkjMb6mEZ2PKzbg7vJrNdV3PGMXkTNi1Oh4O1AXhPY6htzxylytj9W.ZsqCneFQZLEC3JIbIAd",
"clientSecret": "996205D1E0E9C830D0B2A951BF4243EA43F20A6452455EF2B04D042376AF5FB4",
"accessToken": "00D5j00000AFMHW!ARgAQKvyyO2i0hXUV163vl00JOcPlKhI0tOexI7mXiLVzvoo6YKvZ.oORG.CnwWqRu9pFfbM8v__KKdyFeYdIrNxsjxlN",
"refreshToken": "5Aep861mdLLi91HqFfChWtIcJFTpP_yPr4TdBUsgTw2Cw7smtNx9KIxIOU874xUg7QgwsNkNmOd1eJrTbTk0Uma"
},
"profileProperties": {
"api_version": "v51.0",
"instanceUrl":"https://cloudwick-dev-dev-ed.my.salesforce.com"
},
"oAuth2Properties":{
"tokenUrl":"https://cloudwick-dev-dev-ed.my.salesforce.com/services/oauth2/token",
"oAuth2GrantType":"AUTHORIZATION_CODE"
}
},
"Keywords": ["test", "custom","post"]
}
Sample payload to create an on-demand Slack datasource flow:

```json
{
  "DataflowName": "slack-data-flow-ondemand",
  "DatasourceType": "slack",
  "CreateDataset": true,
  "DatasetDetails": {
    "DatasetName": "slack_dataset",
    "Description": "This is a test dataset to ingest slack data",
    "Keywords": ["Test", "SlackData"],
    "Domain": "testapr22",
    "FileType": "csv",
    "TargetLocation": "s3",
    "SkipLZProcess": true
  },
  "DataFormat": "csv",
  "DataflowConfig": {
    "TriggerType": "ondemand",
    "SourceConnectorProperties": {
      "Object": "conversations/C0234C1JTUP"
    },
    "Tasks": [
      {
        "connectorOperator": { "Slack": "GREATER_THAN" },
        "sourceFields": ["ts"],
        "taskProperties": {
          "DATA_TYPE": "date",
          "VALUE": "1592159400000"
        },
        "taskType": "Filter"
      },
      {
        "destinationField": "attachments",
        "connectorOperator": { "Slack": "NO_OP" },
        "sourceFields": ["attachments"],
        "taskProperties": {
          "DESTINATION_DATA_TYPE": "object",
          "SOURCE_DATA_TYPE": "object"
        },
        "taskType": "Map"
      },
      {
        "destinationField": "bot_id",
        "connectorOperator": { "Slack": "NO_OP" },
        "sourceFields": ["bot_id"],
        "taskProperties": {
          "DESTINATION_DATA_TYPE": "string",
          "SOURCE_DATA_TYPE": "string"
        },
        "taskType": "Map"
      },
      {
        "destinationField": "blocks",
        "connectorOperator": { "Slack": "NO_OP" },
        "sourceFields": ["blocks"],
        "taskProperties": {
          "DESTINATION_DATA_TYPE": "object",
          "SOURCE_DATA_TYPE": "object"
        },
        "taskType": "Map"
      },
      {
        "destinationField": "client_msg_id",
        "connectorOperator": { "Slack": "NO_OP" },
        "sourceFields": ["client_msg_id"],
        "taskProperties": {
          "DESTINATION_DATA_TYPE": "string",
          "SOURCE_DATA_TYPE": "string"
        },
        "taskType": "Map"
      },
      {
        "destinationField": "is_starred",
        "connectorOperator": { "Slack": "NO_OP" },
        "sourceFields": ["is_starred"],
        "taskProperties": {
          "DESTINATION_DATA_TYPE": "boolean",
          "SOURCE_DATA_TYPE": "boolean"
        },
        "taskType": "Map"
      },
      {
        "destinationField": "last_read",
        "connectorOperator": { "Slack": "NO_OP" },
        "sourceFields": ["last_read"],
        "taskProperties": {
          "DESTINATION_DATA_TYPE": "string",
          "SOURCE_DATA_TYPE": "string"
        },
        "taskType": "Map"
      },
      {
        "destinationField": "latest_reply",
        "connectorOperator": { "Slack": "NO_OP" },
        "sourceFields": ["latest_reply"],
        "taskProperties": {
          "DESTINATION_DATA_TYPE": "string",
          "SOURCE_DATA_TYPE": "string"
        },
        "taskType": "Map"
      },
      {
        "destinationField": "reactions",
        "connectorOperator": { "Slack": "NO_OP" },
        "sourceFields": ["reactions"],
        "taskProperties": {
          "DESTINATION_DATA_TYPE": "object",
          "SOURCE_DATA_TYPE": "object"
        },
        "taskType": "Map"
      },
      {
        "destinationField": "replies",
        "connectorOperator": { "Slack": "NO_OP" },
        "sourceFields": ["replies"],
        "taskProperties": {
          "DESTINATION_DATA_TYPE": "object",
          "SOURCE_DATA_TYPE": "object"
        },
        "taskType": "Map"
      },
      {
        "destinationField": "reply_count",
        "connectorOperator": { "Slack": "NO_OP" },
        "sourceFields": ["reply_count"],
        "taskProperties": {
          "DESTINATION_DATA_TYPE": "integer",
          "SOURCE_DATA_TYPE": "integer"
        },
        "taskType": "Map"
      },
      {
        "destinationField": "reply_users",
        "connectorOperator": { "Slack": "NO_OP" },
        "sourceFields": ["reply_users"],
        "taskProperties": {
          "DESTINATION_DATA_TYPE": "object",
          "SOURCE_DATA_TYPE": "object"
        },
        "taskType": "Map"
      },
      {
        "destinationField": "reply_users_count",
        "connectorOperator": { "Slack": "NO_OP" },
        "sourceFields": ["reply_users_count"],
        "taskProperties": {
          "DESTINATION_DATA_TYPE": "integer",
          "SOURCE_DATA_TYPE": "integer"
        },
        "taskType": "Map"
      },
      {
        "destinationField": "subscribed",
        "connectorOperator": { "Slack": "NO_OP" },
        "sourceFields": ["subscribed"],
        "taskProperties": {
          "DESTINATION_DATA_TYPE": "boolean",
          "SOURCE_DATA_TYPE": "boolean"
        },
        "taskType": "Map"
      },
      {
        "destinationField": "subtype",
        "connectorOperator": { "Slack": "NO_OP" },
        "sourceFields": ["subtype"],
        "taskProperties": {
          "DESTINATION_DATA_TYPE": "string",
          "SOURCE_DATA_TYPE": "string"
        },
        "taskType": "Map"
      },
      {
        "destinationField": "text",
        "connectorOperator": { "Slack": "NO_OP" },
        "sourceFields": ["text"],
        "taskProperties": {
          "DESTINATION_DATA_TYPE": "string",
          "SOURCE_DATA_TYPE": "string"
        },
        "taskType": "Map"
      },
      {
        "destinationField": "team",
        "connectorOperator": { "Slack": "NO_OP" },
        "sourceFields": ["team"],
        "taskProperties": {
          "DESTINATION_DATA_TYPE": "string",
          "SOURCE_DATA_TYPE": "string"
        },
        "taskType": "Map"
      },
      {
        "destinationField": "thread_ts",
        "connectorOperator": { "Slack": "NO_OP" },
        "sourceFields": ["thread_ts"],
        "taskProperties": {
          "DESTINATION_DATA_TYPE": "string",
          "SOURCE_DATA_TYPE": "string"
        },
        "taskType": "Map"
      },
      {
        "destinationField": "ts",
        "connectorOperator": { "Slack": "NO_OP" },
        "sourceFields": ["ts"],
        "taskProperties": {
          "DESTINATION_DATA_TYPE": "date",
          "SOURCE_DATA_TYPE": "date"
        },
        "taskType": "Map"
      },
      {
        "destinationField": "type",
        "connectorOperator": { "Slack": "NO_OP" },
        "sourceFields": ["type"],
        "taskProperties": {
          "DESTINATION_DATA_TYPE": "string",
          "SOURCE_DATA_TYPE": "string"
        },
        "taskType": "Map"
      },
      {
        "destinationField": "user",
        "connectorOperator": { "Slack": "NO_OP" },
        "sourceFields": ["user"],
        "taskProperties": {
          "DESTINATION_DATA_TYPE": "string",
          "SOURCE_DATA_TYPE": "string"
        },
        "taskType": "Map"
      },
      {
        "connectorOperator": { "Slack": "VALIDATE_NUMERIC" },
        "sourceFields": ["client_msg_id"],
        "taskProperties": {
          "VALIDATION_ACTION": "DropRecord"
        },
        "taskType": "Validate"
      }
    ]
  }
}
```
For a scheduled datasource flow, the TriggerProperties will look like the following:

```json
{
  "TriggerProperties": {
    "scheduleStartTime": "2021-08-25T01:45:00",
    "scheduleEndTime": "2021-08-26T02:10:00",
    "scheduleExpression": "rate(1days)",
    "dataPullMode": "Complete"
  }
}
```
Sample payload to create a scheduled Slack datasource flow with incremental pull:

```json
{
  "DataflowName": "slack-custom-flow",
  "CreateDataset": false,
  "DatasetDetails": {
    "DatasetId": "2de2497c-8b96-47c1-b0b0-aabd80cb524e"
  },
  "DataFormat": "csv",
  "DataflowConfig": {
    "TriggerType": "scheduled",
    "TriggerProperties": {
      "scheduleStartTime": "2022-05-17T09:44:00",
      "scheduleEndTime": "2022-05-17T09:50:00",
      "scheduleExpression": "rate(2minutes)",
      "dataPullMode": "Incremental"
    },
    "SourceConnectorProperties": {
      "Object": "conversations/C035Y07CFJQ"
    },
    "IncrementalPullConfig": "ts",
    "Tasks": [
      {
        "taskType": "Filter",
        "sourceFields": ["ts", "type", "text"],
        "taskProperties": {},
        "connectorOperator": { "Slack": "PROJECTION" }
      },
      {
        "taskType": "Map",
        "sourceFields": ["ts"],
        "taskProperties": {
          "SOURCE_DATA_TYPE": "DateTime",
          "DESTINATION_DATA_TYPE": "DateTime"
        },
        "destinationField": "ts",
        "connectorOperator": { "Slack": "NO_OP" }
      },
      {
        "taskType": "Map",
        "sourceFields": ["type"],
        "taskProperties": {
          "SOURCE_DATA_TYPE": "String",
          "DESTINATION_DATA_TYPE": "String"
        },
        "destinationField": "type",
        "connectorOperator": { "Slack": "NO_OP" }
      },
      {
        "taskType": "Map",
        "sourceFields": ["text"],
        "taskProperties": {
          "SOURCE_DATA_TYPE": "String",
          "DESTINATION_DATA_TYPE": "String"
        },
        "destinationField": "text",
        "connectorOperator": { "Slack": "NO_OP" }
      },
      {
        "taskType": "Validate",
        "sourceFields": ["ts"],
        "taskProperties": {
          "VALIDATION_ACTION": "DropRecord"
        },
        "connectorOperator": { "Slack": "VALIDATE_NON_NULL" }
      }
    ],
    "tags": {}
  }
}
```
Sample payload to start a datasource flow:

```json
{
  "DataflowAction": "start"
}
```