Transform

Transform stages apply a single transformation to one or more incoming datasets.

Transformers should meet these criteria:

DiffTransform

Since: 1.0.8 - Supports Streaming: False

The DiffTransform stage calculates the difference between two input datasets and produces three datasets:

  • An intersection dataset - rows that exist and are the same in both input datasets.
  • A left dataset - rows that only exist in the left input dataset (inputLeftView).
  • A right dataset - rows that only exist in the right input dataset (inputRightView).
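
The three outputs can be illustrated with a small sketch in plain Python. This is not how Arc implements the stage (Arc operates on Spark datasets); it assumes rows are compared as whole tuples and that duplicate rows are ignored. The function name diff_rows and the sample data are hypothetical.

```python
# Illustrative only: Arc performs this on Spark datasets, not Python lists.
def diff_rows(left_rows, right_rows):
    """Return (intersection, left_only, right_only), comparing whole rows."""
    left, right = set(left_rows), set(right_rows)
    intersection = sorted(left & right)  # rows identical in both inputs
    left_only = sorted(left - right)     # rows only in the left input
    right_only = sorted(right - left)    # rows only in the right input
    return intersection, left_only, right_only


yesterday = [(1, "alice"), (2, "bob"), (3, "carol")]
today = [(1, "alice"), (2, "robert"), (4, "dave")]

unchanged, removed, added = diff_rows(yesterday, today)
print(unchanged)  # [(1, 'alice')]
print(removed)    # [(2, 'bob'), (3, 'carol')]
print(added)      # [(2, 'robert'), (4, 'dave')]
```

Under this whole-row comparison, a row whose values changed (customer 2 here) appears in both the left-only and the right-only output.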

Persistence

This stage performs the ‘diffing’ operation in a single pass, so if more than one of the output views is going to be used it is a good idea to set persist = true to avoid recomputing the difference multiple times.

Parameters

Attribute | Type | Required | Description
name | String | true | Name of the stage for logging.
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation.
inputLeftView | String | true | Name of first incoming Spark dataset.
inputRightView | String | true | Name of second incoming Spark dataset.
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging.
outputIntersectionView | String | false | Name of output intersection view.
outputLeftView | String | false | Name of output left view.
outputRightView | String | false | Name of output right view.
persist | Boolean | false | Whether to persist dataset to Spark cache.

Examples

Minimal

{
  "type": "DiffTransform",
  "name": "calculate the difference between the yesterday and today datasets",
  "environments": [
    "production",
    "test"
  ],
  "inputLeftView": "customer_20180501",
  "inputRightView": "customer_20180502",
  "outputIntersectionView": "customer_unchanged"
}

Complete

{
  "type": "DiffTransform",
  "name": "calculate the difference between the yesterday and today datasets",
  "description": "calculate the difference between the yesterday and today datasets",
  "environments": [
    "production",
    "test"
  ],
  "inputLeftView": "customer_20180501",
  "inputRightView": "customer_20180502",
  "outputIntersectionView": "customer_unchanged",
  "outputLeftView": "customer_removed",
  "outputRightView": "customer_added",
  "persist": true
}

HTTPTransform

Since: 1.0.9 - Supports Streaming: True

The HTTPTransform stage transforms the incoming dataset by POSTing the value of the column named value (which must be of type string or bytes) to an external API and appending the response body as a column named body.

Parameters

Attribute | Type | Required | Description
name | String | true | Name of the stage for logging.
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation.
inputView | String | true | Name of incoming Spark dataset.
outputView | String | true | Name of outgoing Spark dataset after processing.
uri | URI | true | URI of the HTTP server.
batchSize | Integer | false | The number of records to send in each HTTP request to reduce the cost of HTTP overhead. Default: 1.
delimiter | String | false | When using a batchSize greater than one this option allows the specification of a delimiter so that the receiving HTTP service can split the request body into records and Arc can split the response body back into records. Default: \n (newline).
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging.
failMode | String | false | Either permissive or failfast. permissive will process all rows in the dataset and collect HTTP response values (statusCode, reasonPhrase, contentType, responseTime) into a response column; rules can then be applied in a SQLValidate stage if required. failfast will fail the Arc job on the first response with a statusCode not in the validStatusCodes array. Default: failfast.
headers | Map[String, String] | false | HTTP headers to set for the HTTP request. These are not limited to the Internet Engineering Task Force standard headers.
inputField | String | false | The field to pass to the endpoint. JSON encoding can be used to pass multiple values (tuples). Default: value.
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism.
partitionBy | Array[String] | false | Columns to partition the data by.
persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false.
validStatusCodes | Array[Integer] | false | A list of valid status codes which will result in a successful stage if the list contains the HTTP server response code. If not provided the default values are [200, 201, 202]. Note: all request response codes must be contained in this list for the stage to be successful if failMode is set to failfast.
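
The interaction between batchSize and delimiter can be sketched as follows. This is a rough illustration in plain Python, not Arc's implementation; the helper names make_request_bodies and split_response_body are hypothetical, and the sketch assumes the remote service returns one response entry per record, joined with the same delimiter.

```python
def make_request_bodies(values, batch_size=1, delimiter="\n"):
    """Group values into request bodies of up to batch_size records each."""
    return [
        delimiter.join(values[i:i + batch_size])
        for i in range(0, len(values), batch_size)
    ]


def split_response_body(body, delimiter="\n"):
    """Split a batched response body back into one entry per record."""
    return body.split(delimiter)


bodies = make_request_bodies(["a", "b", "c"], batch_size=2, delimiter=",")
print(bodies)                               # ['a,b', 'c']
print(split_response_body("0.9,0.1", ","))  # ['0.9', '0.1']
```

In a scheme like this the delimiter must not occur inside any individual record, otherwise the request or response cannot be split back into the original number of records.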

Examples

Minimal

{
  "type": "HTTPTransform",
  "name": "look up customer retention score",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "outputView": "customer_enriched",
  "uri": "http://internalserver/api/customer_retention"
}

Complete

{
  "type": "HTTPTransform",
  "name": "look up customer retention score",
  "description": "look up customer retention score",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "outputView": "customer_enriched",
  "uri": "http://internalserver/api/customer_retention",
  "batchSize": 10,
  "delimiter": ",",
  "headers": {
    "Authorization": "Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ==",
    "custom-header": "payload"
  },
  "inputField": "value",
  "numPartitions": 10,
  "partitionBy": [
    "customerId"
  ],
  "persist": false,
  "validStatusCodes": [
    200,
    201
  ],
  "failMode": "failfast"
}
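
The difference between the two failMode values can be sketched in plain Python. This is an assumption-laden illustration of the documented behaviour, not Arc's code: the function check_status_codes is hypothetical, and only the statusCode part of the response column is modelled.

```python
DEFAULT_VALID_STATUS_CODES = [200, 201, 202]  # documented defaults


def check_status_codes(status_codes, fail_mode="failfast", valid_status_codes=None):
    """Collect status codes; in failfast mode stop at the first invalid one."""
    if fail_mode not in ("permissive", "failfast"):
        raise ValueError(f"unknown failMode: {fail_mode}")
    valid = valid_status_codes or DEFAULT_VALID_STATUS_CODES
    collected = []
    for code in status_codes:
        if fail_mode == "failfast" and code not in valid:
            # failfast: the whole job fails on the first unexpected response
            raise RuntimeError(f"unexpected status code {code}")
        # permissive: keep the value so later validation rules can inspect it
        collected.append({"statusCode": code})
    return collected


print(check_status_codes([200, 500], fail_mode="permissive"))
# [{'statusCode': 200}, {'statusCode': 500}]
```

With fail_mode="failfast" the same input raises an error at the 500 response instead of returning a result.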

JSONTransform

Since: 1.0.0 - Supports Streaming: True

The JSONTransform stage transforms the incoming dataset to rows of JSON strings with the column name value. It is intended to be used before stages like HTTPLoad or HTTPTransform to prepare the data for sending externally.
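
The shape of the result can be pictured with a short Python sketch. It is only an analogy for the Spark operation: the helper to_json_rows and the sample record are hypothetical, and the exact JSON formatting produced by Arc may differ.

```python
import json


def to_json_rows(rows):
    """Turn each record into a single-column row holding its JSON encoding."""
    return [{"value": json.dumps(row, separators=(",", ":"))} for row in rows]


customers = [{"customerId": 1, "name": "alice"}]
print(to_json_rows(customers))
# [{'value': '{"customerId":1,"name":"alice"}'}]
```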

Parameters

Attribute | Type | Required | Description
name | String | true | Name of the stage for logging.
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation.
inputView | String | true | Name of incoming Spark dataset.
outputView | String | true | Name of outgoing Spark dataset after processing.
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging.
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism.
partitionBy | Array[String] | false | Columns to partition the data by.
persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false.

Examples

Minimal

{
  "type": "JSONTransform",
  "name": "convert customer to json for sending to external api",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "outputView": "customer_json"
}

Complete

{
  "type": "JSONTransform",
  "name": "convert customer to json for sending to external api",
  "description": "convert customer to json for sending to external api",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "outputView": "customer_json",
  "numPartitions": 10,
  "partitionBy": [
    "customerId"
  ],
  "persist": false
}

MetadataFilterTransform

Since: 1.0.9 - Supports Streaming: True

Experimental

The MetadataFilterTransform is currently in experimental state whilst the requirements become clearer.

This means this API is likely to change.

The MetadataFilterTransform stage transforms the incoming dataset by filtering columns using the embedded column metadata.

Underneath, Arc will register a table called metadata which contains the metadata of the inputView. This allows complex SQL statements to be executed which return the columns to retain from the inputView in the outputView. The available columns in the metadata table are:

Field | Description
name | The field name.
type | The field type.
metadata | The field metadata.

This can be used like:

-- only select columns which are not personally identifiable information
SELECT 
    name 
FROM metadata 
WHERE metadata.pii = false

This will produce an outputView which contains only the columns of inputView whose column metadata contains a key pii with the value false.
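
The effect of such a statement can be sketched in plain Python. This is a simplified model, not Arc's implementation: the schema is represented as a list of dictionaries with the name, type and metadata fields described above, and the function filter_columns and the sample data are hypothetical.

```python
def filter_columns(schema, rows, keep_field):
    """Keep only the columns whose schema entry satisfies keep_field."""
    kept = [field["name"] for field in schema if keep_field(field)]
    return [{name: row[name] for name in kept} for row in rows]


schema = [
    {"name": "customerId", "type": "integer", "metadata": {"pii": False}},
    {"name": "email", "type": "string", "metadata": {"pii": True}},
]
rows = [{"customerId": 1, "email": "alice@example.com"}]

# Equivalent in spirit to: SELECT name FROM metadata WHERE metadata.pii = false
safe = filter_columns(schema, rows, lambda f: f["metadata"].get("pii") is False)
print(safe)  # [{'customerId': 1}]
```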

If sqlParams contains a boolean parameter pii_authorized that indicates whether the job is authorised to use personally identifiable information, it could be used like:

-- only select columns which job is authorised to access based on ${pii_authorized}
SELECT 
    name 
FROM metadata 
WHERE metadata.pii = (
    CASE 
        WHEN ${pii_authorized} = true 
        THEN metadata.pii   -- this will allow both true and false metadata.pii values if pii_authorized = true
        ELSE false          -- else if pii_authorized = false only allow metadata.pii = false values
    END
)
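
The CASE expression above reduces to a small truth table, sketched here in Python. The function column_allowed is hypothetical, and the handling of None assumes that a column without a pii key surfaces as NULL, which never compares equal in SQL.

```python
def column_allowed(pii, pii_authorized):
    """Mirror WHERE metadata.pii = (CASE WHEN authorized THEN metadata.pii ELSE false END)."""
    if pii is None:
        # Assumption: a missing pii key is NULL, and NULL = x is never true in SQL
        return False
    expected = pii if pii_authorized else False
    return pii == expected


for authorized in (True, False):
    for pii in (True, False):
        print(authorized, pii, column_allowed(pii, authorized))
# True True True
# True False True
# False True False
# False False True
```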

The inputView and outputView can be set to the same name so that downstream stages have no way of accidentally accessing the pre-filtered data.

Parameters

Attribute | Type | Required | Description
name | String | true | Name of the stage for logging.
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation.
inputURI | URI | true | URI of the input file containing the SQL statement. This statement must be written to query against a table called metadata and must return at least the name column or an error will be raised.
inputView | String | true | Name of incoming Spark dataset.
outputView | String | true | Name of outgoing Spark dataset after processing.
authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation.
persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false.
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging.
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism.
partitionBy | Array[String] | false | Columns to partition the data by.
sqlParams | Map[String, String] | false | Parameters to inject into the SQL statement before executing. The parameters use the ${} format.

Examples

Minimal

{
  "type": "MetadataFilterTransform",
  "name": "filter out Personally identifiable information (pii) fields",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/sql/filter_pii.sql",
  "inputView": "customer",
  "outputView": "customer_safe"
}

Complete

{
  "type": "MetadataFilterTransform",
  "name": "filter out Personally identifiable information (pii) fields",
  "description": "filter out Personally identifiable information (pii) fields",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/sql/filter_pii_dynamic.sql",
  "inputView": "customer",
  "outputView": "customer_safe",
  "authentication": {},
  "numPartitions": 10,
  "partitionBy": [
    "customerId"
  ],
  "persist": false,
  "sqlParams": {
    "pii_authorized": "true"
  }
}

MLTransform

Since: 1.0.0 - Supports Streaming: True

The MLTransform stage transforms the incoming dataset with a pretrained Spark ML (Machine Learning) model. This will append one or more predicted columns to the incoming dataset. The incoming model must be a PipelineModel or CrossValidatorModel produced using Spark’s Scala, Java, PySpark or SparkR API.

Parameters

Attribute | Type | Required | Description
name | String | true | Name of the stage for logging.
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation.
inputURI | URI | true | URI of the input PipelineModel or CrossValidatorModel.
inputView | String | true | Name of incoming Spark dataset.
outputView | String | true | Name of outgoing Spark dataset after processing.
authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation.
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism.
partitionBy | Array[String] | false | Columns to partition the data by.
persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false. MLTransform will also log percentiles of prediction probabilities for classification models if this option is enabled.
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging.

Examples

Minimal

{
  "type": "MLTransform",
  "name": "apply machine learning model",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/ml/machineLearningPipelineModel",
  "inputView": "customer",
  "outputView": "customer_scored"
}

Complete

{
  "type": "MLTransform",
  "name": "apply machine learning model",
  "description": "apply machine learning model",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/ml/machineLearningPipelineModel",
  "inputView": "customer",
  "outputView": "customer_scored",
  "authentication": {},
  "numPartitions": 10,
  "partitionBy": [
    "customerId"
  ],
  "persist": false
}

SQLTransform

Since: 1.0.0 - Supports Streaming: True

The SQLTransform stage transforms the incoming dataset with a Spark SQL statement. This stage relies on previous stages to load and register the dataset views (outputView) and will execute arbitrary SQL statements against those datasets.

All the inbuilt Spark SQL functions are available and have been extended with some additional functions.

Please be aware that in streaming mode not all join operations are available. See: Support matrix for joins in streaming queries.

CAST vs TypingTransform

It is strongly recommended to use the TypingTransform for reproducible, repeatable results.

Whilst SQL is capable of converting data types using the CAST function (e.g. CAST(dateColumn AS DATE)) be very careful. ANSI SQL specifies that any failure to convert raises an exception condition (data exception-invalid character value for cast), whereas Spark SQL will return a null value and suppress any exceptions: try s.toString.toInt catch { case _: NumberFormatException => null }. If you used a cast in a financial scenario, for example bill aggregation, the silent NULLing of values could result in errors being suppressed and bills being incorrectly calculated.
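
The risk can be reproduced in a few lines of plain Python. This is an analogy rather than Spark code: lenient_cast_int imitates the null-on-failure behaviour described above, strict_cast_int imitates a cast that raises, and the amounts are made-up sample values.

```python
def lenient_cast_int(value):
    """Imitate a cast that returns null (None) instead of failing."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return None


def strict_cast_int(value):
    """Imitate a cast that raises on any value it cannot convert."""
    return int(value)


amounts = ["10", "20", "3O"]  # the last value contains the letter O, not a zero

# Like SQL SUM, skip nulls: the bad value silently disappears from the total.
lenient_total = sum(v for v in map(lenient_cast_int, amounts) if v is not None)
print(lenient_total)  # 30

try:
    strict_total = sum(strict_cast_int(v) for v in amounts)
except ValueError as error:
    print("conversion failed:", error)
```

The lenient version reports a total with no indication that a value was dropped, while the strict version surfaces the bad input immediately.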

Parameters

Attribute | Type | Required | Description
name | String | true | Name of the stage for logging.
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation.
inputURI | URI | true | URI of the input file containing the SQL statement.
outputView | String | true | Name of outgoing Spark dataset after processing.
authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation.
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging.
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism.
partitionBy | Array[String] | false | Columns to partition the data by.
persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false.
sqlParams | Map[String, String] | false | Parameters to inject into the SQL statement before executing. The parameters use the ${} format. For example if the sqlParams contains parameter current_timestamp of value 2018-11-24 14:48:56 then this statement would execute in a deterministic way: SELECT * FROM customer WHERE expiry > FROM_UNIXTIME(UNIX_TIMESTAMP('${current_timestamp}', 'uuuu-MM-dd HH:mm:ss')) (so would be testable).
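
Parameter injection of this kind amounts to textual substitution before the statement is executed. A minimal Python sketch follows; the function inject_sql_params, the signup_date column and the choice to raise an error for a missing parameter are assumptions for illustration, not a description of Arc's internals.

```python
import re

PLACEHOLDER = re.compile(r"\$\{(\w+)\}")


def inject_sql_params(sql, sql_params):
    """Replace each ${name} placeholder with its value from sql_params."""
    def replace(match):
        name = match.group(1)
        if name not in sql_params:
            raise KeyError(f"no value supplied for ${{{name}}}")
        return sql_params[name]

    return PLACEHOLDER.sub(replace, sql)


sql = "SELECT * FROM customer WHERE signup_date = '${current_date}'"
print(inject_sql_params(sql, {"current_date": "2018-11-24"}))
# SELECT * FROM customer WHERE signup_date = '2018-11-24'
```

Because the value is fixed by the caller rather than read from the clock, the resulting statement returns the same rows on every run, which is what makes it testable.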

The SQL statement is a plain Spark SQL statement, for example:

SELECT 
    customer.customer_id
    ,customer.first_name
    ,customer.last_name
    ,account.account_id
    ,account.account_name
FROM customer
LEFT JOIN account ON account.customer_id = customer.customer_id

Examples

Minimal

{
  "type": "SQLTransform",
  "name": "standardise customer fields",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/sql/customer.sql",
  "outputView": "customer"
}

Complete

{
  "type": "SQLTransform",
  "name": "standardise customer fields",
  "description": "standardise customer fields",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/sql/customer_dynamic.sql",
  "outputView": "customer",
  "authentication": {},
  "numPartitions": 10,
  "partitionBy": [
    "customerId"
  ],
  "persist": false,
  "sqlParams": {
    "current_date": "2018-11-24",
    "current_timestamp": "2018-11-24 14:48:56"
  }
}

The current_date and current_timestamp can easily be passed in as environment variables using $(date "+%Y-%m-%d") and $(date "+%Y-%m-%d %H:%M:%S") respectively.

TensorFlowServingTransform

Since: 1.0.0 - Supports Streaming: True

Experimental

The TensorFlowServingTransform is currently in experimental state whilst the requirements become clearer.

This means this API is likely to change.

The TensorFlowServingTransform stage transforms the incoming dataset by calling a TensorFlow Serving service. Because each call is atomic the TensorFlow Serving instances could be behind a load balancer to increase throughput.

Parameters

Attribute | Type | Required | Description
name | String | true | Name of the stage for logging.
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation.
inputView | String | true | Name of incoming Spark dataset.
outputView | String | true | Name of outgoing Spark dataset after processing.
uri | String | true | The URI of the TensorFlow Serving REST end point.
batchSize | Integer | false | The number of records to send to TensorFlow Serving in each call. A higher number will decrease the number of calls to TensorFlow Serving which may be more efficient.
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging.
inputField | String | false | The field to pass to the model. JSON encoding can be used to pass multiple values (tuples). Default: value.
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism.
params | Map[String, String] | false | Map of configuration parameters. Currently unused.
partitionBy | Array[String] | false | Columns to partition the data by.
persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false.
responseType | String | false | The type returned by the TensorFlow Serving API. Expected to be integer, double or object (which may present as a string depending on how the model has been built). Default: object.
signatureName | String | false | The name of the TensorFlow Serving signature.
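
For orientation, the TensorFlow Serving REST predict API accepts a JSON body with an instances array and an optional signature_name. The sketch below shows how batched request bodies of that shape could be assembled in Python. How Arc actually builds its requests is not stated here, so the helper build_predict_bodies, the assumption that each input value is a JSON-encoded string, and the sample values are all hypothetical.

```python
import json


def build_predict_bodies(values, batch_size=1, signature_name=None):
    """Build one TensorFlow Serving predict body per batch of JSON-encoded values."""
    bodies = []
    for start in range(0, len(values), batch_size):
        batch = values[start:start + batch_size]
        payload = {"instances": [json.loads(value) for value in batch]}
        if signature_name is not None:
            payload["signature_name"] = signature_name
        bodies.append(json.dumps(payload))
    return bodies


values = ["[1.0, 2.0]", "[3.0, 4.0]", "[5.0, 6.0]"]
bodies = build_predict_bodies(values, batch_size=2, signature_name="serving_default")
for body in bodies:
    print(body)
```

With three records and a batch size of two this produces two request bodies, so a larger batchSize means fewer calls to the service.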

Examples

Minimal

{
  "type": "TensorFlowServingTransform",
  "name": "call the customer segmentation model",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "outputView": "customer_segmented",
  "uri": "http://tfserving:9001/v1/models/customer_segmentation/versions/1:predict"
}

Complete

{
  "type": "TensorFlowServingTransform",
  "name": "call the customer segmentation model",
  "description": "call the customer segmentation model",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "outputView": "customer_segmented",
  "uri": "http://tfserving:9001/v1/models/customer_segmentation/versions/1:predict",
  "signatureName": "serving_default",
  "batchSize": 100,
  "numPartitions": 10,
  "partitionBy": [
    "customerId"
  ],
  "persist": true,
  "responseType": "integer"
}

TypingTransform

Since: 1.0.0 - Supports Streaming: True

The TypingTransform stage transforms the incoming dataset based on metadata defined in the metadata format.

The logical process that is applied to perform the typing on a field-by-field basis is shown below.

Parameters

Attribute | Type | Required | Description
name | String | true | Name of the stage for logging.
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation.
inputURI | URI | true | URI of the input file containing the metadata.
inputView | String | true | Name of incoming Spark dataset.
outputView | String | true | Name of outgoing Spark dataset after processing.
authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation.
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging.
failMode | String | false | Either permissive or failfast. permissive will process all rows in the dataset and collect any errors for each row in the _errors column; rules can then be applied in a SQLValidate stage if required. failfast will fail the Arc job on the first row containing at least one error. Default: permissive.
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism.
partitionBy | Array[String] | false | Columns to partition the data by.
persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false.

Examples

Minimal

{
  "type": "TypingTransform",
  "name": "apply data types to customer records",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/metadata/customer.json",
  "inputView": "customer_untyped",
  "outputView": "customer"
}

Complete

{
  "type": "TypingTransform",
  "name": "apply data types to customer records",
  "description": "apply data types to customer records",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/metadata/customer.json",
  "inputView": "customer_untyped",
  "outputView": "customer",
  "authentication": {},
  "failMode": "failfast",
  "numPartitions": 10,
  "partitionBy": [
    "customerId"
  ],
  "persist": false
}

The following demonstrates how the TypingTransform behaves. Assume the input has been read by a stage like DelimitedExtract, which reads a dataset where all the columns are strings:

+-------------------------+---------------------+
|startTime                |endTime              |
+-------------------------+---------------------+
|2018-09-26 07:17:43      |2018-09-27 07:17:43  |
|2018-09-25 08:25:51      |2018-09-26 08:25:51  |
|2018-02-30 01:16:40      |2018-03-01 01:16:40  |
|30 February 2018 01:16:40|2018-03-2018 01:16:40|
+-------------------------+---------------------+

In this case the goal is to safely convert the values from strings like "2018-09-26 07:17:43" to a proper timestamp object so that we can ensure the timestamp is valid (e.g. not on a date that does not exist, such as the 30th day of February) and can easily perform date operations such as subtracting 1 week. To do so a metadata file could be constructed to look like:

[
  {
    "id": "8e42c8f0-22a8-40db-9798-6dd533c1de36",
    "name": "startTime",
    "description": "The startTime field.",
    "type": "timestamp",
    "trim": true,
    "nullable": true,
    "nullableValues": [
        "",
        "null"
    ],
    "formatters": [
        "uuuu-MM-dd HH:mm:ss"
    ],
    "timezoneId": "UTC"
  },
  {
    "id": "2e7553cf-2748-49cd-a291-8918823e706a",
    "name": "endTime",
    "description": "The endTime field.",
    "type": "timestamp",
    "trim": true,
    "nullable": true,
    "nullableValues": [
        "",
        "null"
    ],
    "formatters": [
        "uuuu-MM-dd HH:mm:ss"
    ],
    "timezoneId": "UTC"
  }   
]

Here is the output of the TypingTransform when applied to the input dataset.

+-------------------+-------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|startTime          |endTime            |_errors                                                                                                                                                                                                                                                             |
+-------------------+-------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|2018-09-26 17:17:43|2018-09-27 17:17:43|[]                                                                                                                                                                                                                                                                  |
|2018-09-25 18:25:51|2018-09-26 18:25:51|[]                                                                                                                                                                                                                                                                  |
|null               |2018-03-01 12:16:40|[[startTime, Unable to convert '2018-02-30 01:16:40' to timestamp using formatters ['uuuu-MM-dd HH:mm:ss'] and timezone 'UTC']]                                                                                                                                     |
|null               |null               |[[startTime, Unable to convert '30 February 2018 01:16:40' to timestamp using formatters ['uuuu-MM-dd HH:mm:ss'] and timezone 'UTC'], [endTime, Unable to convert '2018-03-2018 01:16:40' to timestamp using formatters ['uuuu-MM-dd HH:mm:ss'] and timezone 'UTC']]|
+-------------------+-------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

  • Because the conversion succeeded for both values on the first two rows, there are no errors for those rows.
  • On the third row the value '2018-02-30 01:16:40' cannot be converted as the 30th day of February is not a valid date, so the value is set to null. If nullable in the metadata for field startTime were set to false the job would fail as it would be unable to continue.
  • On the fourth row both values are invalid as the formatter and date values are both wrong.

The SQLValidate stage is a good way to use this data to enforce data quality constraints.
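
The permissive behaviour shown in the output table can be approximated in plain Python. This is a sketch under stated assumptions, not Arc's implementation: strptime with "%Y-%m-%d %H:%M:%S" stands in for the 'uuuu-MM-dd HH:mm:ss' formatter, the function type_row is hypothetical, and the error text is shortened.

```python
from datetime import datetime, timezone

FORMAT = "%Y-%m-%d %H:%M:%S"  # rough stand-in for 'uuuu-MM-dd HH:mm:ss'


def type_row(row, fields, fail_mode="permissive"):
    """Convert string fields to UTC timestamps, collecting errors per row."""
    typed, errors = {}, []
    for name in fields:
        raw = row[name]
        try:
            parsed = datetime.strptime(raw.strip(), FORMAT)  # strip mimics trim
            typed[name] = parsed.replace(tzinfo=timezone.utc)
        except ValueError:
            if fail_mode == "failfast":
                raise  # failfast: stop at the first bad value
            typed[name] = None
            errors.append((name, f"Unable to convert '{raw}' to timestamp"))
    typed["_errors"] = errors
    return typed


fields = ["startTime", "endTime"]
third = type_row({"startTime": "2018-02-30 01:16:40", "endTime": "2018-03-01 01:16:40"}, fields)
fourth = type_row({"startTime": "30 February 2018 01:16:40", "endTime": "2018-03-2018 01:16:40"}, fields)

print(third["startTime"], len(third["_errors"]))  # None 1
print(len(fourth["_errors"]))                     # 2
```

A later validation step can then decide what to do with rows whose _errors list is not empty.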

Logical Flow

The sequence in which these fields are converted from string fields to typed fields follows this flow chart. Each value and its typing metadata is passed into this logical process. For each row the values are returned as standard table columns and the returned error values are grouped into a field called _errors on a row-by-row basis. Patterns for consuming the _errors array are demonstrated in the SQLValidate stage.

[Figure: Logical Flow for Data Typing]