Transform
*Transform stages apply a single transformation to one or more incoming datasets.
Transformers should meet this criteria:
- Be logically pure.
- Perform only a single function.
- Utilise Spark internal functionality where possible.
DiffTransform
Since: 1.0.8 - Supports Streaming: False
The DiffTransform stage calculates the difference between two input datasets and produces three datasets:
- A dataset of the
intersectionof the two datasets - or rows that exist and are the same in both datasets. - A dataset of the
leftdataset - or rows that only exist in the left input dataset (inputLeftView). - A dataset of the
rightdataset - or rows that only exist in the right input dataset (inputRightView).
Persistence
This stage performs this ‘diffing’ operation in a single pass so if multiple of the output views are going to be used then it is a good idea to set persist = true to reduce the cost of recomputing the difference multiple times.
Parameters
| Attribute | Type | Required | Description |
|---|---|---|---|
| name | String | true | Name of the stage for logging. |
| environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
| inputLeftView | String | true | Name of first incoming Spark dataset. |
| inputRightView | String | true | Name of second incoming Spark dataset. |
| description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
| outputIntersectionView | String | false | Name of output intersection view. |
| outputLeftView | String | false | Name of output left view. |
| outputRightView | String | false | Name of output right view. |
| persist | Boolean | false | Whether to persist dataset to Spark cache. |
Examples
Minimal
{
"type": "DiffTransform",
"name": "calculate the difference between the yesterday and today datasets",
"environments": [
"production",
"test"
],
"inputLeftView": "customer_20180501",
"inputRightView": "customer_20180502",
"outputIntersectionView": "customer_unchanged"
}Complete
{
"type": "DiffTransform",
"name": "calculate the difference between the yesterday and today datasets",
"description": "calculate the difference between the yesterday and today datasets",
"environments": [
"production",
"test"
],
"inputLeftView": "customer_20180501",
"inputRightView": "customer_20180502",
"outputIntersectionView": "customer_unchanged",
"outputLeftView": "customer_removed",
"outputRightView": "customer_added",
"persist": true
}HTTPTransform
Since: 1.0.9 - Supports Streaming: True
The HTTPTransform stage transforms the incoming dataset by POSTing the value in the incoming dataset with column name value (must be of type string or bytes) and appending the response body from an external API as body.
Parameters
| Attribute | Type | Required | Description |
|---|---|---|---|
| name | String | true | Name of the stage for logging. |
| environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
| inputView | String | true | Name of incoming Spark dataset. |
| outputView | String | true | Name of outgoing Spark dataset after processing. |
| uri | URI | true | URI of the HTTP server. |
| batchSize | Integer | false | The number of records to send in each HTTP request to reduce the cost of HTTP overhead. Default: 1. |
| delimiter | String | false | When using a batchSize greater than one this option allows the specification of a delimiter so that the receiving HTTP service can split the request body into records and Arc can split the response body back into records.Default: \n (newline). |
| description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
| failMode | String | false | Either permissive or failfast:permissive will process all rows in the dataset and collect HTTP response values (statusCode, reasonPhrase, contentType, responseTime) into a response column. Rules can then be applied in a SQLValidate stage if required.failfast will fail the Arc job on the first reponse with a statusCode not in the validStatusCodes array.Default: failfast. |
| headers | Map[String, String] | false | HTTP Headers to set for the HTTP request. These are not limited to the Internet Engineering Task Force standard headers. |
| inputField | String | false | The field to pass to the endpoint. JSON encoding can be used to pass multiple values (tuples). Default: value. |
| numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
| partitionBy | Array[String] | false | Columns to partition the data by. |
| persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false. |
| validStatusCodes | Array[Integer] | false | A list of valid status codes which will result in a successful stage if the list contains the HTTP server response code. If not provided the default values are [200, 201, 202]. Note: all request response codes must be contained in this list for the stage to be successful if failMode is set to failfast. |
Examples
Minimal
{
"type": "HTTPTransform",
"name": "look up customer retention score",
"environments": [
"production",
"test"
],
"inputView": "customer",
"outputView": "customer_enriched",
"uri": "http://internalserver/api/customer_retention"
}Complete
{
"type": "HTTPTransform",
"name": "look up customer retention score",
"description": "look up customer retention score",
"environments": [
"production",
"test"
],
"inputView": "customer",
"outputView": "customer_enriched",
"uri": "http://internalserver/api/customer_retention",
"batchSize": 10,
"delimiter": ",",
"headers": {
"Authorization": "Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ==",
"custom-header": "payload"
},
"inputField": "value",
"numPartitions": 10,
"partitionBy": [
"customerId"
],
"persist": false,
"validStatusCodes": [
200,
201
],
"failMode": "failfast"
}JSONTransform
Since: 1.0.0 - Supports Streaming: True
The JSONTransform stage transforms the incoming dataset to rows of json strings with the column name value. It is intended to be used before stages like HTTPLoad or HTTPTransform to prepare the data for sending externally.
Parameters
| Attribute | Type | Required | Description |
|---|---|---|---|
| name | String | true | Name of the stage for logging. |
| environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
| inputView | String | true | Name of incoming Spark dataset. |
| outputView | String | true | Name of outgoing Spark dataset after processing. |
| description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
| numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
| partitionBy | Array[String] | false | Columns to partition the data by. |
| persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false. |
Examples
Minimal
{
"type": "JSONTransform",
"name": "convert customer to json for sending to eternal api",
"environments": [
"production",
"test"
],
"inputView": "cutomer",
"outputView": "customer_json"
}Complete
{
"type": "JSONTransform",
"name": "convert customer to json for sending to eternal api",
"description": "convert customer to json for sending to eternal api",
"environments": [
"production",
"test"
],
"inputView": "cutomer",
"outputView": "customer_json",
"numPartitions": 10,
"partitionBy": [
"customerId"
],
"persist": false
}MetadataFilterTransform
Since: 1.0.9 - Supports Streaming: True
Experimental
The MetadataFilterTransform is currently in experimental state whilst the requirements become clearer.
This means this API is likely to change.
The MetadataFilterTransform stage transforms the incoming dataset by filtering columns using the embedded column metadata.
Underneath Arc will register a table called metadata which contains the metadata of the inputView. This allows complex SQL statements to be executed which returns which columns to retain from the inputView in the outputView. The available columns in the metadata table are:
| Field | Description |
|---|---|
| name | The field name. |
| type | The field type. |
| metadata | The field metadata. |
This can be used like:
-- only select columns which are not personally identifiable information
SELECT
name
FROM metadata
WHERE metadata.pii = false
Will produce an outputView which only contains the columns in inputView where the inputView column metadata contains a key pii which has the value equal to false.
If the sqlParams contains boolean parameter pii_authorized if the job is authorised to use Personally identifiable information or not then it could be used like:
-- only select columns which job is authorised to access based on ${pii_authorized}
SELECT
name
FROM metadata
WHERE metadata.pii = (
CASE
WHEN ${pii_authorized} = true
THEN metadata.pii -- this will allow both true and false metadata.pii values if pii_authorized = true
ELSE false -- else if pii_authorized = false only allow metadata.pii = false values
END
)
The inputView and outputView can be set to the same name so that downstream stages have no way of accessing the pre-filtered data accidentially.
Parameters
| Attribute | Type | Required | Description |
|---|---|---|---|
| name | String | true | Name of the stage for logging. |
| environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
| inputURI | URI | true | URI of the input file containing the SQL statement. This statement must be written to query against a table called metadata and must return at least the name column or an error will be raised. |
| inputView | String | true | Name of incoming Spark dataset. |
| outputView | String | true | Name of outgoing Spark dataset after processing. |
| authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
| persist | Boolean | true | Whether to persist dataset to Spark cache. Will also log row count. Default: false. |
| description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
| numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
| partitionBy | Array[String] | false | Columns to partition the data by. |
| sqlParams | Map[String, String] | false | Parameters to inject into the SQL statement before executing. The parameters use the ${} format. |
Examples
Minimal
{
"type": "MetadataFilterTransform",
"name": "filter out Personally identifiable information (pii) fields",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://datalake/sql/filter_pii.sql",
"inputView": "customer",
"outputView": "customer_safe"
}Complete
{
"type": "MetadataFilterTransform",
"name": "filter out Personally identifiable information (pii) fields",
"description": "filter out Personally identifiable information (pii) fields",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://datalake/sql/filter_pii_dynamic.sql",
"inputView": "customer",
"outputView": "customer_safe",
"authentication": {},
"numPartitions": 10,
"partitionBy": [
"customerId"
],
"persist": false,
"sqlParams": {
"pii_authorized": "true"
}
}MLTransform
Since: 1.0.0 - Supports Streaming: True
The MLTransform stage transforms the incoming dataset with a pretrained Spark ML (Machine Learning) model. This will append one or more predicted columns to the incoming dataset. The incoming model must be a PipelineModel or CrossValidatorModel produced using Spark’s Scala, Java, PySpark or SparkR API.
Parameters
| Attribute | Type | Required | Description |
|---|---|---|---|
| name | String | true | Name of the stage for logging. |
| environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
| inputURI | URI | true | URI of the input PipelineModel or CrossValidatorModel. |
| inputView | String | true | Name of incoming Spark dataset. |
| outputView | String | true | Name of outgoing Spark dataset after processing. |
| authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
| numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
| partitionBy | Array[String] | false | Columns to partition the data by. |
| persist | Boolean | true | Whether to persist dataset to Spark cache. Will also log row count. Default: false. MLTransform will also log percentiles of prediction probabilities for classification models if this option is enabled. |
| description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
Examples
Minimal
{
"type": "MLTransform",
"name": "apply machine learning model",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://datalake/ml/machineLearningPipelineModel",
"inputView": "customer",
"outputView": "customer_scored"
}Complete
{
"type": "MLTransform",
"name": "apply machine learning model",
"description": "apply machine learning model",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://datalake/ml/machineLearningPipelineModel",
"inputView": "customer",
"outputView": "customer_scored",
"authentication": {},
"numPartitions": 10,
"partitionBy": [
"customerId"
],
"persist": false
}SQLTransform
Since: 1.0.0 - Supports Streaming: True
The SQLTransform stage transforms the incoming dataset with a Spark SQL statement. This stage relies on previous stages to load and register the dataset views (outputView) and will execute arbitrary SQL statements against those datasets.
All the inbuilt Spark SQL functions are available and have been extended with some additional functions.
Please be aware that in streaming mode not all join operations are available. See: Support matrix for joins in streaming queries.
CAST vs TypingTransform
It is strongly recommended to use the TypingTransform for reproducible, repeatable results.
Whilst SQL is capable of converting data types using the CAST function (e.g. CAST(dateColumn AS DATE)) be very careful. ANSI SQL specifies that any failure to convert then an exception condition is raised: data exception-invalid character value for cast whereas Spark SQL will return a null value and suppress any exceptions: try s.toString.toInt catch { case _: NumberFormatException => null }. If you used a cast in a financial scenario, for example bill aggregation, the silent NULLing of values could result in errors being suppressed and bills incorrectly calculated.
Parameters
| Attribute | Type | Required | Description |
|---|---|---|---|
| name | String | true | Name of the stage for logging. |
| environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
| inputURI | URI | true | URI of the input file containing the SQL statement. |
| outputView | String | true | Name of outgoing Spark dataset after processing. |
| authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
| description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
| numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
| partitionBy | Array[String] | false | Columns to partition the data by. |
| persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false. |
| sqlParams | Map[String, String] | false | Parameters to inject into the SQL statement before executing. The parameters use the ${} format.For example if the sqlParams contains parameter current_timestamp of value 2018-11-24 14:48:56 then this statement would execute in a deterministic way: SELECT * FROM customer WHERE expiry > FROM_UNIXTIME(UNIX_TIMESTAMP('${current_timestamp}', 'uuuu-MM-dd HH:mm:ss')) (so would be testable). |
The SQL statement is a plain Spark SQL statement, for example:
SELECT
customer.customer_id
,customer.first_name
,customer.last_name
,account.account_id
,account.account_name
FROM customer
LEFT JOIN account ON account.customer_id = customer.customer_id
Examples
Minimal
{
"type": "SQLTransform",
"name": "standardise customer fields",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://datalake/sql/customer.sql",
"outputView": "customer"
}Complete
{
"type": "SQLTransform",
"name": "standardise customer fields",
"description": "standardise customer fields",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://datalake/sql/customer_dynamic.sql",
"outputView": "customer",
"authentication": {},
"numPartitions": 10,
"partitionBy": [
"customerId"
],
"persist": false,
"sqlParams": {
"current_date": "2018-11-24",
"current_timestamp": "2018-11-24 14:48:56"
}
}The current_date and current_timestamp can easily be passed in as environment variables using $(date "+%Y-%m-%d") and $(date "+%Y-%m-%d %H:%M:%S") respectively.
TensorFlowServingTransform
Since: 1.0.0 - Supports Streaming: True
Experimental
The TensorFlowServingTransform is currently in experimental state whilst the requirements become clearer.
This means this API is likely to change.
The TensorFlowServingTransform stage transforms the incoming dataset by calling a TensorFlow Serving service. Because each call is atomic the TensorFlow Serving instances could be behind a load balancer to increase throughput.
Parameters
| Attribute | Type | Required | Description |
|---|---|---|---|
| name | String | true | Name of the stage for logging. |
| environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
| inputView | String | true | Name of incoming Spark dataset. |
| outputView | String | true | Name of outgoing Spark dataset after processing. |
| uri | String | true | The URI of the TensorFlow Serving REST end point. |
| batchSize | Int | false | The number of records to sent to TensorFlow Serving in each call. A higher number will decrease the number of calls to TensorFlow Serving which may be more efficient. |
| description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
| inputField | String | false | The field to pass to the model. JSON encoding can be used to pass multiple values (tuples). Default: value. |
| numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
| params | Map[String, String] | false | Map of configuration parameters. Currently unused. |
| partitionBy | Array[String] | false | Columns to partition the data by. |
| persist | Boolean | true | Whether to persist dataset to Spark cache. Will also log row count. Default: false. |
| responseType | String | false | The type returned by the TensorFlow Serving API. Expected to be integer, double or object (which may present as a string depending on how the model has been built).Default: object. |
| signatureName | String | false | The name of the TensorFlow Serving signature. |
Examples
Minimal
{
"type": "TensorFlowServingTransform",
"name": "call the customer segmentation model",
"environments": [
"production",
"test"
],
"inputView": "customer",
"outputView": "customer_segmented",
"uri": "http://tfserving:9001/v1/models/customer_segmentation/versions/1:predict"
}Complete
{
"type": "TensorFlowServingTransform",
"name": "call the customer segmentation model",
"description": "call the customer segmentation model",
"environments": [
"production",
"test"
],
"inputView": "customer",
"outputView": "customer_segmented",
"uri": "http://tfserving:9001/v1/models/customer_segmentation/versions/1:predict",
"signatureName": "serving_default",
"batchSize": 100,
"numPartitions": 10,
"partitionBy": [
"customerId"
],
"persist": true,
"responseType": "integer"
}TypingTransform
Since: 1.0.0 - Supports Streaming: True
The TypingTransform stage transforms the incoming dataset with based on metadata defined in the metadata format.
The logical process that is applied to perform the typing on a field-by-field basis is shown below.
Parameters
| Attribute | Type | Required | Description |
|---|---|---|---|
| name | String | true | Name of the stage for logging. |
| environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
| inputURI | URI | true | URI of the input file containing the SQL statement. |
| inputView | String | true | Name of incoming Spark dataset. |
| outputView | String | true | Name of outgoing Spark dataset after processing. |
| authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
| description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
| failMode | String | false | Either permissive or failfast:permissive will process all rows in the dataset and collect any errors for each row in the _errors column. Rules can then be applied in a SQLValidate stage if required.failfast will fail the Arc job on the first row containing at least one error.Default: permissive. |
| numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
| partitionBy | Array[String] | false | Columns to partition the data by. |
| persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false. |
Examples
Minimal
{
"type": "TypingTransform",
"name": "apply data types to customer records",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://datalake/metadata/customer.json",
"inputView": "customer_untyped",
"outputView": "customer"
}Complete
{
"type": "TypingTransform",
"name": "apply data types to customer records",
"description": "apply data types to customer records",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://datalake/metadata/customer.json",
"inputView": "customer_untyped",
"outputView": "customer",
"authentication": {},
"failMode": "failfast",
"numPartitions": 10,
"partitionBy": [
"customerId"
],
"persist": false
}A demonstration of how the TypingTransform behaves. Assuming you have read an input like a DelimitedExtract which will read a dataset where all the columns are read as strings:
+-------------------------+---------------------+
|startTime |endTime |
+-------------------------+---------------------+
|2018-09-26 07:17:43 |2018-09-27 07:17:43 |
|2018-09-25 08:25:51 |2018-09-26 08:25:51 |
|2018-02-30 01:16:40 |2018-03-01 01:16:40 |
|30 February 2018 01:16:40|2018-03-2018 01:16:40|
+-------------------------+---------------------+
In this case the goal is to safely convert the values from strings like "2018-09-26 07:17:43" to a proper timestamp object so that we can ensure the timestamp is valid (e.g. not on a date that does not exist e.g. the 30 day of February) and can easily perform date operations such as subtracting 1 week. To do so a metadata file could be constructed to look like:
[
{
"id": "8e42c8f0-22a8-40db-9798-6dd533c1de36",
"name": "startTime",
"description": "The startTime field.",
"type": "timestamp",
"trim": true,
"nullable": true,
"nullableValues": [
"",
"null"
],
"formatters": [
"uuuu-MM-dd HH:mm:ss"
],
"timezoneId": "UTC"
},
{
"id": "2e7553cf-2748-49cd-a291-8918823e706a",
"name": "endTime",
"description": "The endTime field.",
"type": "timestamp",
"trim": true,
"nullable": true,
"nullableValues": [
"",
"null"
],
"formatters": [
"uuuu-MM-dd HH:mm:ss"
],
"timezoneId": "UTC"
}
]
Here is the output of the TypingTransformation when applied to the input dataset.
+-------------------+-------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|startTime |endTime |_errors |
+-------------------+-------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|2018-09-26 17:17:43|2018-09-27 17:17:43|[] |
|2018-09-25 18:25:51|2018-09-26 18:25:51|[] |
|null |2018-03-01 12:16:40|[[startTime, Unable to convert '2018-02-30 01:16:40' to timestamp using formatters ['uuuu-MM-dd HH:mm:ss'] and timezone 'UTC']] |
|null |null |[[startTime, Unable to convert '28 February 2018 01:16:40' to timestamp using formatters ['uuuu-MM-dd HH:mm:ss'] and timezone 'UTC'], [endTime, Unable to convert '2018-03-2018 01:16:40' to timestamp using formatters ['uuuu-MM-dd HH:mm:ss'] and timezone 'UTC']]|
+-------------------+-------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
- Because the conversion happened successfully for both values on the first two rows there are no errors for those rows.
- On the third row the value
'2018-02-30 01:16:40'cannot be converted as the 30th day of February is not a valid date and the value is set tonull. If thenullablein the metadata for fieldstartTimewas set tofalsethe job would fail as it would be unable to continue. - On the forth row both rows are invalid as the
formatteranddatevalues are both wrong.
The SQLValidate stage is a good way to use this data to enforce data quality constraints.
Logical Flow
The sequence that these fields are converted from string fields to typed fields is per this flow chart. Each value and its typing metadata is passed into this logical process. For each row the values are returned as standard table columns and the returned error values are groupd into a field called _errors on a row-by-row basis. Patterns for consuming the _errors array is are demonstrated in the SQLValidate stage.
