Extract

*Extract stages read data from a database or file system.

*Extract stages should meet these criteria:

File-based *Extract stages can accept glob patterns as input filenames, which can be very useful for loading just a subset of data, for example for delta processing:

Pattern Description
* Matches zero or more characters.
? Matches any single character.
[abc] Matches a single character in the set {a, b, c}.
[a-b] Matches a single character from the character range {a...b}.
[^a-b] Matches a single character that is not from character set or range {a...b}.
{a,b} Matches either expression a or b.
\c Removes (escapes) any special meaning of character c.
{ab,c{de,fg}} Matches a string from the string set {ab, cde, cfg}.

Spark will automatically detect the file extensions .zip, .bz2, .deflate and .gz and decompress those files without any further configuration.
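
For example, a DelimitedExtract could use a character-range pattern to load only the delta files for a given month. This is a minimal sketch; the date-stamped file naming convention shown here is illustrative:

{
  "type": "DelimitedExtract",
  "name": "load customer delta files for January 2019",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://input_data/customer/customer_201901[0-3][0-9].csv",
  "outputView": "customer"
}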

AvroExtract

Since: 1.0.0 - Supports Streaming: False

The AvroExtract stage reads one or more Apache Avro files and returns a DataFrame.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputURI URI true* URI/Glob of the input Avro files. If not present inputView is required.
inputView String true* Name of the incoming Spark dataset. If not present inputURI is required.
outputView String true Name of outgoing Spark dataset after processing.
authentication Map[String, String] false An authentication map for authenticating with a remote service. See authentication documentation.
basePath URI false The base path that partition discovery should start with.
contiguousIndex Boolean false When loading a file two additional metadata fields are added to each record: _filename and _index (row number in the file). These fields are automatically included as they are very useful when trying to understand where certain data came from when consuming the data downstream.

The computational cost of adding the _index column in a distributed execution engine like Spark means that sometimes it is not worth the time/expense of precisely resolving the row number. By setting contiguousIndex equal to false Spark will include a different field _monotonically_increasing_id which is a non-sequential/non-contiguous identifier from which _index can be derived later but will not incur the same cost penalty of resolving _index.

Default: true.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
partitionBy Array[String] false Columns to partition the data by.
persist Boolean false Whether to persist dataset to Spark cache. Will also log row count.

Default: false.
schemaURI URI false

Used for multiple purposes:

  • Can be used to set metadata on the extracted DataFrame. Note this will overwrite the existing metadata if it exists.

  • Can be used to specify a schema in case of no input files. This stage will create an empty DataFrame with this schema so any downstream logic that depends on the columns in this dataset, e.g. SQLTransform, is still able to run. This feature can be used to allow deployment of business logic that depends on a dataset which has not been enabled by an upstream sending system.

schemaView URI false Similar to schemaURI but allows the schema to be passed in as another DataFrame. It takes precedence over schemaURI if provided.
inputField String false If using inputView this option allows you to specify the name of the field which contains the Avro binary data.
avroSchemaURI URI false* If using inputView this option allows you to specify the Avro schema URI. It has been tested to work with the Kafka Schema Registry, with a URI like http://kafka-schema-registry:8081/schemas/ids/1, as well as with standalone *.avsc files.
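
As noted against contiguousIndex above, setting it to false avoids the cost of resolving _index at extract time while still allowing a contiguous row number to be derived downstream. One way to do this is a SQLTransform over the extracted view; this is a sketch only, assuming the view is named customer and that a 1-based row number per file is acceptable:

SELECT
  *,
  ROW_NUMBER() OVER (PARTITION BY _filename ORDER BY _monotonically_increasing_id) AS _index
FROM customer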

Examples

Minimal

{
  "type": "AvroExtract",
  "name": "load customer avro extract",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://input_data/customer/*.avro",
  "outputView": "customer"
}

Complete

{
  "type": "AvroExtract",
  "name": "load customer avro extract",
  "description": "load customer avro extract",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://input_data/customer/*.avro",
  "outputView": "customer",
  "authentication": {},
  "contiguousIndex": true,
  "numPartitions": 10,
  "partitionBy": [
    "country"
  ],
  "persist": false,
  "schemaURI": "hdfs://datalake/metadata/customer.json",
  "schemaView": "customer_schema",
  "basePath": "hdfs://input_data/customer/*.avro",
  "inputField": "value",
  "avroSchemaURI": "hdfs://datalake/metadata/user.avsc"
}

AzureCosmosDBExtract

Since: 1.13.0 - Supports Streaming: True

Experimental

The AzureCosmosDBExtract is currently in experimental state whilst the requirements become clearer.

This means this API is likely to change.

The AzureCosmosDBExtract stage reads data from an Azure Cosmos DB instance and returns a DataFrame.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
outputView String true Name of outgoing Spark dataset after processing.
config Map[String, String] false The Cosmos DB configuration options.
authentication Map[String, String] false An authentication map for authenticating with a remote service. See authentication documentation.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
partitionBy Array[String] false Columns to partition the data by.
persist Boolean false Whether to persist dataset to Spark cache. Will also log row count.

Default: false.
schemaURI URI false

Used for multiple purposes:

  • Can be used to set metadata on the extracted DataFrame. Note this will overwrite the existing metadata if it exists.

  • Can be used to specify a schema in case of no input files. This stage will create an empty DataFrame with this schema so any downstream logic that depends on the columns in this dataset, e.g. SQLTransform, is still able to run. This feature can be used to allow deployment of business logic that depends on a dataset which has not been enabled by an upstream sending system.

schemaView URI false Similar to schemaURI but allows the schema to be passed in as another DataFrame. It takes precedence over schemaURI if provided.

Examples

Minimal

{
  "type": "AzureCosmosDBExtract",
  "name": "load customer extract",
  "environments": [
    "production",
    "test"
  ],
  "outputView": "customer",
  "config": {
    "Endpoint": "https://doctorwho.documents.azure.com:443/",
    "Masterkey": "YOUR-KEY-HERE",
    "Database": "DepartureDelays",
    "Collection": "flights_pcoll",
    "query_custom": "SELECT c.date, c.delay, c.distance, c.origin, c.destination FROM c WHERE c.origin = 'SEA'"
  }
}

Complete

{
  "type": "AzureCosmosDBExtract",
  "name": "load customer extract",
  "description": "load customer extract",
  "environments": [
    "production",
    "test"
  ],
  "outputView": "customer",
  "authentication": {},
  "numPartitions": 10,
  "partitionBy": [
    "country"
  ],
  "persist": false,
  "schemaURI": "hdfs://datalake/metadata/customer.json",
  "schemaView": "customer_schema",
  "config": {
    "Endpoint": "https://doctorwho.documents.azure.com:443/",
    "Masterkey": "YOUR-KEY-HERE",
    "Database": "DepartureDelays",
    "Collection": "flights_pcoll",
    "query_custom": "SELECT c.date, c.delay, c.distance, c.origin, c.destination FROM c WHERE c.origin = 'SEA'"
  }
}

BytesExtract

Since: 1.0.9 - Supports Streaming: False

The BytesExtract stage reads one or more binary files and returns a DataFrame containing an Array[Byte] of the file content (named value) and the file path (named _filename).

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputView String true* Name of the incoming Spark dataset containing a list of URI/Globs to extract from. If not present inputURI is required.
inputURI URI true* URI/Glob of the input binary files. If not present inputView is required.
outputView String true Name of outgoing Spark dataset after processing.
authentication Map[String, String] false An authentication map for authenticating with a remote service. See authentication documentation.
contiguousIndex Boolean false When loading a file two additional metadata fields are added to each record: _filename and _index (row number in the file). These fields are automatically included as they are very useful when trying to understand where certain data came from when consuming the data downstream.

The computational cost of adding the _index column in a distributed execution engine like Spark means that sometimes it is not worth the time/expense of precisely resolving the row number. By setting contiguousIndex equal to false Spark will include a different field _monotonically_increasing_id which is a non-sequential/non-contiguous identifier from which _index can be derived later but will not incur the same cost penalty of resolving _index.

Default: true.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
failMode String false Either permissive or failfast:

permissive will create an empty DataFrame of [value, _filename] in case of no files.

failfast will fail the Arc job if no files are found.

Default: failfast.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
persist Boolean false Whether to persist dataset to Spark cache. Will also log row count.

Default: false.

Examples

Minimal

{
  "type": "BytesExtract",
  "name": "load images from the customer vehicle photos directory",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://input_data/customer/vehicles/*.jpg",
  "outputView": "customer_vehicles_photos"
}

Complete

{
  "type": "BytesExtract",
  "name": "load images from the customer vehicle photos directory",
  "description": "load images from the customer vehicle photos directory",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://input_data/customer/vehicles/*.jpg",
  "outputView": "customer_vehicles_photos",
  "persist": false,
  "numPartitions": 10,
  "contiguousIndex": false,
  "authentication": {},
  "failMode": "permissive"
}

DatabricksDeltaExtract

Since: 1.8.0 - Supports Streaming: True

Experimental

The DatabricksDeltaExtract is currently in experimental state whilst the requirements become clearer.

This means this API is likely to change.

The DatabricksDeltaExtract stage reads one or more Databricks Delta files and returns a DataFrame.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputURI URI true URI/Glob of the input Databricks Delta files.
outputView String true Name of outgoing Spark dataset after processing.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
partitionBy Array[String] false Columns to partition the data by.
persist Boolean false Whether to persist dataset to Spark cache. Will also log row count.

Default: false.

Examples

Minimal

{
  "type": "DatabricksDeltaExtract",
  "name": "load customers",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "/delta/customers",
  "outputView": "customer"
}

Complete

{
  "type": "DatabricksDeltaExtract",
  "name": "load customers",
  "description": "load customers",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "/delta/customers",
  "outputView": "customer",
  "numPartitions": 10,
  "partitionBy": [
    "country"
  ],
  "persist": false,
}

DelimitedExtract

Since: 1.0.0 - Supports Streaming: True

The DelimitedExtract stage reads either one or more delimited text files or an input Dataset[String] and returns a DataFrame. DelimitedExtract will always set the underlying Spark configuration option of inferSchema to false to ensure consistent results.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputView String true* Name of the incoming Spark dataset. If not present inputURI is required.
inputURI URI true* URI/Glob of the input delimited text files. If not present inputView is required.
outputView String true Name of outgoing Spark dataset after processing.
authentication Map[String, String] false An authentication map for authenticating with a remote service. See authentication documentation.
basePath URI false The base path that partition discovery should start with.
contiguousIndex Boolean false When loading a file two additional metadata fields are added to each record: _filename and _index (row number in the file). These fields are automatically included as they are very useful when trying to understand where certain data came from when consuming the data downstream.

The computational cost of adding the _index column in a distributed execution engine like Spark means that sometimes it is not worth the time/expense of precisely resolving the row number. By setting contiguousIndex equal to false Spark will include a different field _monotonically_increasing_id which is a non-sequential/non-contiguous identifier from which _index can be derived later but will not incur the same cost penalty of resolving _index.

Default: true.
delimiter String false The type of delimiter in the file. Supported values: Comma, Pipe, DefaultHive, Custom. DefaultHive is ASCII character 1, the default delimiter for Apache Hive extracts.

Default: Comma.
customDelimiter String true* A custom string to use as delimiter. Required if delimiter is set to Custom.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
header Boolean false Whether or not the dataset contains a header row. If available the output dataset will have named columns, otherwise columns will be named _col1, _col2 ... _colN.

Default: false.
inputField String false If using inputView this option allows you to specify the name of the field which contains the delimited data.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
partitionBy Array[String] false Columns to partition the data by.
persist Boolean false Whether to persist dataset to Spark cache. Will also log row count.

Default: false.
quote String false The type of quoting in the file. Supported values: None, SingleQuote, DoubleQuote.

Default: DoubleQuote.
schemaURI URI false

Used for multiple purposes:

  • Can be used to set metadata on the extracted DataFrame. Note this will overwrite the existing metadata if it exists.

  • Can be used to specify a schema in case of no input files. This stage will create an empty DataFrame with this schema so any downstream logic that depends on the columns in this dataset, e.g. SQLTransform, is still able to run. This feature can be used to allow deployment of business logic that depends on a dataset which has not been enabled by an upstream sending system.

schemaView URI false Similar to schemaURI but allows the schema to be passed in as another DataFrame. It takes precedence over schemaURI if provided.

Examples

Minimal

{
  "type": "DelimitedExtract",
  "name": "load customer extract",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://input_data/customer/*.csv",
  "outputView": "customer"
}

Complete

{
  "type": "DelimitedExtract",
  "name": "load customer csv extract",
  "description": "load customer csv extract",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://input_data/customer/*.csv",
  "outputView": "customer",
  "authentication": {},
  "contiguousIndex": true,
  "delimiter": "Custom",
  "customDelimiter": "#",
  "header": false,
  "inputField": "csvdata",
  "numPartitions": 10,
  "partitionBy": [
    "country"
  ],
  "persist": false,
  "quote": "DoubleQuote",
  "schemaURI": "hdfs://input_data/schema/customer.json",
  "schemaView": "customer_schema",
  "basePath": "hdfs://input_data/customer/"
}

ElasticsearchExtract

Since: 1.9.0 - Supports Streaming: False

Experimental

The ElasticsearchExtract is currently in experimental state whilst the requirements become clearer.

This means this API is likely to change.

The ElasticsearchExtract stage reads from an Elasticsearch cluster and returns a DataFrame.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
input String true The name of the source Elasticsearch index.
outputView String true Name of outgoing Spark dataset after processing.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
params Map[String, String] false Map of configuration parameters. Parameters for connecting to the Elasticsearch cluster are detailed here.
partitionBy Array[String] false Columns to partition the data by.
persist Boolean false Whether to persist dataset to Spark cache. Will also log row count.

Default: false.

Examples

Minimal

{
  "type": "ElasticsearchExtract",
  "name": "load customer extract",
  "environments": [
    "production",
    "test"
  ],
  "input": "customer",
  "outputView": "customer",
  "params": {
    "es.nodes": "<my>.elasticsearch.com",
    "es.port": "443",
    "es.nodes.wan.only": "true",
    "es.net.ssl": "true"
  }
}

Complete

{
  "type": "ElasticsearchExtract",
  "name": "load customer extract",
  "environments": [
    "production",
    "test"
  ],
  "input": "customer",
  "outputView": "customer",
  "params": {
    "es.nodes": "<my>.elasticsearch.com",
    "es.port": "443",
    "es.nodes.wan.only": "true",
    "es.net.ssl": "true"
  },
  "numPartitions": 10,
  "partitionBy": [
    "country"
  ],
  "persist": false
}

HTTPExtract

Since: 1.0.0 - Supports Streaming: False

The HTTPExtract stage executes either a GET or POST request against a remote HTTP service and returns a DataFrame which will have a single row and single column holding the value of the HTTP response body.

This stage would typically be used with a JSONExtract stage by specifying inputView instead of inputURI (setting multiLine=true allows processing of JSON array responses).
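
For example, a minimal sketch of that pattern: the HTTP response body is captured into an intermediate view and then parsed by a JSONExtract via inputView. The endpoint and the customer_response view name are illustrative, and depending on the name of the column holding the response body the JSONExtract may also need inputField set:

{
  "type": "HTTPExtract",
  "name": "load customer response from api",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "https://endpoint:9000/customers",
  "outputView": "customer_response"
},
{
  "type": "JSONExtract",
  "name": "parse customer response",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer_response",
  "multiLine": true,
  "outputView": "customer"
}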

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputView String true* Name of the incoming Spark dataset containing the list of URIs in the value field. If not present inputURI is required.
inputURI URI true* URI of the HTTP server. If not present inputView is required.
outputView String true Name of outgoing Spark dataset after processing.
body String false The request body/entity that is sent with a POST request.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
headers Map[String, String] false HTTP Headers to set for the HTTP request. These are not limited to the Internet Engineering Task Force standard headers.
method String false The request type with valid values GET or POST.

Default: GET.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
partitionBy Array[String] false Columns to partition the data by.
persist Boolean false Whether to persist dataset to Spark cache. Will also log row count.

Default: false.
validStatusCodes Array[Integer] false A list of valid status codes which will result in a successful stage if the list contains the HTTP server response code. If not provided the default values are [200, 201, 202].

Examples

Minimal

{
  "type": "HTTPExtract",
  "name": "load customer extract from api",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "https://endpoint:9000/customers",
  "outputView": "customer"
}

Complete

{
  "type": "HTTPExtract",
  "name": "load customer extract from api",
  "description": "load customer extract from api",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "https://endpoint:9000/customers",
  "outputView": "customer",
  "body": "",
  "headers": {
    "Authorization": "Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ==",
    "custom-header": "payload"
  },
  "method": "GET",
  "numPartitions": 10,
  "partitionBy": [
    "country"
  ],
  "persist": false,
  "validStatusCodes": [
    200
  ]
}

ImageExtract

Since: 1.4.1 - Supports Streaming: True

The ImageExtract stage reads one or more image files and returns a DataFrame which has one column: image, containing image data (jpeg, png, gif, bmp, wbmp) stored with the schema:

Field Type Description
origin String The file path of the image.
height Integer The height of the image.
width Integer The width of the image.
nChannels Integer The number of image channels.
mode Integer OpenCV-compatible type.
data Binary Image bytes in OpenCV-compatible order: row-wise BGR in most cases.

This means the image data can be accessed like:

SELECT image.height FROM dataset

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputURI URI true URI/Glob of the input images.
outputView String true Name of outgoing Spark dataset after processing.
authentication Map[String, String] false An authentication map for authenticating with a remote service. See authentication documentation.
basePath URI false The base path that partition discovery should start with.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
dropInvalid Boolean false Whether to drop any invalid image files.

Default: true.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
partitionBy Array[String] false Columns to partition the data by.
persist Boolean false Whether to persist dataset to Spark cache. Will also log row count.

Default: false.

Examples

Minimal

{
  "type": "ImageExtract",
  "name": "load customer images",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://input_data/customer/*.jpg",
  "outputView": "customer"
}

Complete

{
  "type": "ImageExtract",
  "name": "load customer images",
  "description": "load customer images",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://input_data/customer/*.jpg",
  "outputView": "customer",
  "authentication": {},
  "dropInvalid": true,
  "numPartitions": 10,
  "partitionBy": [
    "image.width"
  ],
  "persist": false,
  "basePath": "hdfs://input_data/customer/"
}

JDBCExtract

Since: 1.0.0 - Supports Streaming: False

The JDBCExtract stage reads directly from a JDBC database and returns a DataFrame. See the Spark JDBC documentation.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
jdbcURL String true The JDBC URL to connect to. e.g., jdbc:mysql://localhost:3306.
tableName String true The JDBC table that should be read. Note that anything that is valid in a FROM clause of a SQL query can be used, e.g. (SELECT * FROM sourcetable WHERE key=value) sourcetable or just sourcetable.
outputView String true Name of outgoing Spark dataset after processing.
authentication Map[String, String] false An authentication map for authenticating with a remote service. See authentication documentation.
contiguousIndex Boolean false When loading a file two additional metadata fields are added to each record: _filename and _index (row number in the file). These fields are automatically included as they are very useful when trying to understand where certain data came from when consuming the data downstream.

The computational cost of adding the _index column in a distributed execution engine like Spark means that sometimes it is not worth the time/expense of precisely resolving the row number. By setting contiguousIndex equal to false Spark will include a different field _monotonically_increasing_id which is a non-sequential/non-contiguous identifier from which _index can be derived later but will not incur the same cost penalty of resolving _index.

Default: true.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
fetchsize Integer false The JDBC fetch size, which determines how many rows to fetch per round trip. This can help performance on JDBC drivers which default to a low fetch size (e.g. Oracle with 10 rows).
numPartitions Integer false The number of partitions that will be used for controlling parallelism. This also determines the maximum number of concurrent JDBC connections.
params Map[String, String] false Map of configuration parameters. Currently requires user and password to be set here - see example below.
partitionBy Array[String] false Columns to partition the data by.
partitionColumn String false The name of a numeric column from the table in question which defines how to partition the table when reading in parallel from multiple workers. If set numPartitions must also be set.
persist Boolean false Whether to persist dataset to Spark cache. Will also log row count.

Default: false.
predicates Array[String] false A list of expressions suitable for inclusion in WHERE clauses; each one defines one partition of the DataFrame to allow explicit parallel reads.

e.g. ['id=1', 'id=2', 'id=3', 'id=4'] would create 4 parallel readers.
schemaURI URI false

Used for multiple purposes:

  • Can be used to set metadata on the extracted DataFrame. Note this will overwrite the existing metadata if it exists.

  • Can be used to specify a schema in case of no input files. This stage will create an empty DataFrame with this schema so any downstream logic that depends on the columns in this dataset, e.g. SQLTransform, is still able to run. This feature can be used to allow deployment of business logic that depends on a dataset which has not been enabled by an upstream sending system.

schemaView URI false Similar to schemaURI but allows the schema to be passed in as another DataFrame. It takes precedence over schemaURI if provided.

Examples

Minimal

{
  "type": "JDBCExtract",
  "name": "load active customers from postgres",
  "environments": [
    "production",
    "test"
  ],
  "jdbcURL": "jdbc:postgresql://localhost:5432/customer",
  "tableName": "(SELECT * FROM customer WHERE active=TRUE) customer",
  "outputView": "customer"
}

Complete

{
  "type": "JDBCExtract",
  "name": "load active customers from postgresql",
  "description": "load active customers from postgresql",
  "environments": [
    "production",
    "test"
  ],
  "jdbcURL": "jdbc:postgresql://localhost:5432/customer",
  "tableName": "(SELECT * FROM customer WHERE active=TRUE) customer",
  "outputView": "customer",
  "authentication": {},
  "contiguousIndex": true,
  "fetchsize": 1000,
  "numPartitions": 10,
  "params": {
    "user": "mydbuser",
    "password": "mydbpassword"
  },
  "partitionBy": [
    "country"
  ],
  "partitionColumn": "id",
  "persist": true,
  "predicates": [
    "id=1",
    "id=2",
    "id=3",
    "id=4"
  ],
  "schemaURI": "hdfs://input_data/schema/customer.json",
  "schemaView": "customer_schema"
}

JSONExtract

Since: 1.0.0 - Supports Streaming: True

The JSONExtract stage reads either one or more JSON files or an input Dataset[String] and returns a DataFrame.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputView String true* Name of the incoming Spark dataset. If not present inputURI is required.
inputURI URI true* URI/Glob of the input JSON files. If not present inputView is required.
outputView String true Name of outgoing Spark dataset after processing.
authentication Map[String, String] false An authentication map for authenticating with a remote service. See authentication documentation.
basePath URI false The base path that partition discovery should start with.
contiguousIndex Boolean false When loading a file two additional metadata fields are added to each record: _filename and _index (row number in the file). These fields are automatically included as they are very useful when trying to understand where certain data came from when consuming the data downstream.

The computational cost of adding the _index column in a distributed execution engine like Spark means that sometimes it is not worth the time/expense of precisely resolving the row number. By setting contiguousIndex equal to false Spark will include a different field _monotonically_increasing_id which is a non-sequential/non-contiguous identifier from which _index can be derived later but will not incur the same cost penalty of resolving _index.

Default: true.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
inputField String false If using inputView this option allows you to specify the name of the field which contains the JSON data.
multiLine Boolean false Whether the input directory contains a single JSON object per file or multiple JSON records in a single file, one per line (see JSONLines).

Default: true.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
partitionBy Array[String] false Columns to partition the data by.
persist Boolean false Whether to persist dataset to Spark cache. Will also log row count.

Default: false.
schemaURI URI false

Used for multiple purposes:

  • Can be used to set metadata on the extracted DataFrame. Note this will overwrite the existing metadata if it exists.

  • Can be used to specify a schema in case of no input files. This stage will create an empty DataFrame with this schema so any downstream logic that depends on the columns in this dataset, e.g. SQLTransform, is still able to run. This feature can be used to allow deployment of business logic that depends on a dataset which has not been enabled by an upstream sending system.



Additionally, by specifying the schema here, the underlying data source can skip the schema inference step, and thus speed up data loading.
schemaView URI false Similar to schemaURI but allows the schema to be passed in as another DataFrame. It takes precedence over schemaURI if provided.

Examples

Minimal

{
  "type": "JSONExtract",
  "name": "load customers",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://input_data/customer/*.json",
  "outputView": "customer"
}

Complete

{
  "type": "JSONExtract",
  "name": "load customers",
  "description": "load customers",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://input_data/customer/*.json",
  "outputView": "customer",
  "authentication": {},
  "contiguousIndex": true,
  "inputField": "jsondata",
  "multiLine": false,
  "numPartitions": 10,
  "partitionBy": [
    "country"
  ],
  "persist": false,
  "schemaURI": "hdfs://input_data/schema/customer.json",
  "schemaView": "customer_schema",
  "basePath": "hdfs://input_data/customer/"
}

KafkaExtract

Since: 1.0.8 - Supports Streaming: True

Experimental

The KafkaExtract is currently in experimental state whilst the requirements become clearer.

This means this API is likely to change to better handle failures.

The KafkaExtract stage reads records from a Kafka topic and returns a DataFrame. It requires a unique groupID to be set which, on first run, will consume from the earliest offset available in Kafka. Each subsequent run will use the offset as recorded against that groupID. This means that if a job fails before properly processing the data then processing may need to be restarted from the earliest offset by creating a new groupID.

The returned DataFrame has the schema:

Field Type Description
topic String The Kafka Topic.
partition Integer The partition ID.
offset Long The record offset.
timestamp Long The record timestamp.
key Binary The record key as a byte array.
value Binary The record value as a byte array.

Can be used in conjunction with KafkaCommitExecute to allow quasi-transactional behaviour (with autoCommit set to false) - in that the offset commit can be deferred until certain dependent stages are successfully executed.
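
A rough sketch of that pattern is shown below. The KafkaExtract attributes are taken from this page; the KafkaCommitExecute attributes (inputView, bootstrapServers, groupID) are assumptions based on its purpose, so consult the KafkaCommitExecute documentation for the exact parameters. In a real job the dependent stages would sit between these two entries:

{
  "type": "KafkaExtract",
  "name": "load customers without committing offsets",
  "environments": [
    "production",
    "test"
  ],
  "outputView": "customer",
  "bootstrapServers": "kafka:29092",
  "topic": "customers",
  "groupID": "spark-customer-extract-job",
  "autoCommit": false
},
{
  "type": "KafkaCommitExecute",
  "name": "commit offsets after dependent stages succeed",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "bootstrapServers": "kafka:29092",
  "groupID": "spark-customer-extract-job"
}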

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
outputView String true Name of outgoing Spark dataset after processing.
bootstrapServers String true A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. e.g. host1:port1,host2:port2,...
topic String true The target Kafka topic.
groupID String true A string that uniquely identifies the group of consumer processes to which this consumer belongs. This will retain the offset of the job between executions.
autoCommit Boolean false Whether to update the offsets in Kafka automatically. To be used in conjunction with KafkaCommitExecute to allow quasi-transactional behaviour.

If autoCommit is set to false this stage will force persist equal to true so that Spark will not execute the Kafka extract process twice with a potentially different result (e.g. new messages added between extracts).

Default: false.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
maxPollRecords Integer false The maximum number of records returned in a single call to Kafka. Arc will then continue to poll until all records have been read.

Default: 10000.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
partitionBy Array[String] false Columns to partition the data by.
persist Boolean false Whether to persist dataset to Spark cache. Will also log row count.

Default: false.
timeout Long false The time, in milliseconds, spent waiting in poll if data is not available in Kafka. Default: 10000.

Examples

Minimal

{
  "type": "KafkaExtract",
  "name": "load customers",
  "environments": [
    "production",
    "test"
  ],
  "outputView": "customer",
  "bootstrapServers": "kafka:29092",
  "topic": "customers",
  "groupID": "spark-customer-extract-job"
}

Complete

{
  "type": "KafkaExtract",
  "name": "load customers",
  "description": "load customers",
  "environments": [
    "production",
    "test"
  ],
  "outputView": "customer",
  "bootstrapServers": "kafka:29092",
  "topic": "customers",
  "groupID": "spark-customer-extract-job",
  "autoCommit": false,
  "maxPollRecords": 10000,
  "numPartitions": 10,
  "partitionBy": [
    "country"
  ],
  "persist": false,
  "timeout": 10000
}

ORCExtract

Since: 1.0.0 - Supports Streaming: True

The ORCExtract stage reads one or more Apache ORC files and returns a DataFrame.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputURI URI true URI/Glob of the input ORC files.
outputView String true Name of outgoing Spark dataset after processing.
authentication Map[String, String] false An authentication map for authenticating with a remote service. See authentication documentation.
basePath URI false The base path that partition discovery should start with.
contiguousIndex Boolean false When loading a file two additional metadata fields are added to each record: _filename and _index (row number in the file). These fields are automatically included as they are very useful when trying to understand where certain data came from when consuming the data downstream.

The computational cost of adding the _index column in a distributed execution engine like Spark means that sometimes it is not worth the time/expense of precisely resolving the row number. By setting contiguousIndex equal to false Spark will include a different field _monotonically_increasing_id which is a non-sequential/non-contiguous identifier from which _index can be derived later but will not incur the same cost penalty of resolving _index.

Default: true.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
partitionBy Array[String] false Columns to partition the data by.
persist Boolean false Whether to persist dataset to Spark cache. Will also log row count.

Default: false.
schemaURI URI false

Used for multiple purposes:

  • Can be used to set metadata on the extracted DataFrame. Note this will overwrite the existing metadata if it exists.

  • Can be used to specify a schema in case of no input files. This stage will create an empty DataFrame with this schema so any downstream logic that depends on the columns in this dataset, e.g. SQLTransform, is still able to run. This feature can be used to allow deployment of business logic that depends on a dataset which has not been enabled by an upstream sending system.

schemaView URI false Similar to schemaURI but allows the schema to be passed in as another DataFrame. It takes precedence over schemaURI if provided.

Examples

Minimal

{
  "type": "ORCExtract",
  "name": "load customers",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://input_data/customer/*.orc",
  "outputView": "customer"
}

Complete

{
  "type": "ORCExtract",
  "name": "load customers",
  "description": "load customers",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://input_data/customer/*.orc",
  "outputView": "customer",
  "authentication": {},
  "contiguousIndex": true,
  "numPartitions": 10,
  "partitionBy": [
    "country"
  ],
  "persist": false,
  "schemaURI": "hdfs://input_data/schema/customer.json",
  "schemaView": "customer_schema",
  "basePath": "hdfs://input_data/customer/"
}

ParquetExtract

Since: 1.0.0 - Supports Streaming: True

The ParquetExtract stage reads one or more Apache Parquet files and returns a DataFrame.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputURI URI true URI/Glob of the input Parquet files.
outputView String true Name of outgoing Spark dataset after processing.
authentication Map[String, String] false An authentication map for authenticating with a remote service. See authentication documentation.
basePath URI false The base path that partition discovery should start with.
contiguousIndex Boolean false When loading a file two additional metadata fields are added to each record: _filename and _index (row number in the file). These fields are automatically included as they are very useful when trying to understand where certain data came from when consuming the data downstream.

The computational cost of adding the _index column in a distributed execution engine like Spark means that sometimes it is not worth the time/expense of precisely resolving the row number. By setting contiguousIndex equal to false Spark will include a different field _monotonically_increasing_id which is a non-sequential/non-contiguous identifier from which _index can be derived later but will not incur the same cost penalty of resolving _index.

Default: true.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
partitionBy Array[String] false Columns to partition the data by.
persist Boolean false Whether to persist dataset to Spark cache. Will also log row count.

Default: false.
schemaURI URI false

Used for multiple purposes:

  • Can be used to set metadata on the extracted DataFrame. Note this will overwrite the existing metadata if it exists.

  • Can be used to specify a schema in case of no input files. This stage will create an empty DataFrame with this schema so any downstream logic that depends on the columns in this dataset, e.g. SQLTransform, is still able to run. This feature can be used to allow deployment of business logic that depends on a dataset which has not been enabled by an upstream sending system.

schemaView URI false Similar to schemaURI but allows the schema to be passed in as another DataFrame. It takes precedence over schemaURI if provided.

Examples

Minimal

{
  "type": "ParquetExtract",
  "name": "load customers",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://input_data/customer/*.parquet",
  "outputView": "customer"
}

Complete

{
  "type": "ParquetExtract",
  "name": "load customers",
  "description": "load customers",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://input_data/customer/*.parquet",
  "outputView": "customer",
  "authentication": {},
  "contiguousIndex": true,
  "numPartitions": 10,
  "partitionBy": [
    "country"
  ],
  "persist": false,
  "schemaURI": "hdfs://input_data/schema/customer.json",
  "schemaView": "customer_schema",
  "basePath": "hdfs://input_data/customer/"
}

RateExtract

Since: 1.2.0 - Supports Streaming: True

The RateExtract stage creates a streaming data source which generates rows in a streaming DataFrame with the signature [timestamp: timestamp, value: long].

This stage has been included for testing Structured Streaming jobs as it can be very difficult to generate test data. Generally this stage would only be included when Arc is run in a test mode (i.e. the environment is set to test).

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
outputView String true Name of outgoing Spark dataset after processing.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
rampUpTime Integer false How long to ramp up before the generating speed becomes rowsPerSecond. Using finer granularities than seconds will be truncated to integer seconds.

Default: 0.
rowsPerSecond Integer false How many rows should be generated per second.

Default: 1.

Examples

Minimal

{
  "type": "RateExtract",
  "name": "create a streaming source",
  "environments": [
    "production",
    "test"
  ],
  "outputView": "stream"
}

Complete

{
  "type": "RateExtract",
  "name": "create a streaming source",
  "description": "create a streaming source",
  "environments": [
    "production",
    "test"
  ],
  "outputView": "stream",
  "rowsPerSecond": 2,
  "rampUpTime": 0,
  "numPartitions": 10
}

TextExtract

Since: 1.2.0 - Supports Streaming: True

The TextExtract stage reads one or more text files and returns a DataFrame.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputURI URI true URI/Glob of the input text files.
outputView String true Name of outgoing Spark dataset after processing.
authentication Map[String, String] false An authentication map for authenticating with a remote service. See authentication documentation.
basePath URI false The base path that partition discovery should start with.
contiguousIndex Boolean false When loading a file two additional metadata fields are added to each record: _filename and _index (row number in the file). These fields are automatically included as they are very useful when trying to understand where certain data came from when consuming the data downstream.

The computational cost of adding the _index column in a distributed execution engine like Spark means that sometimes it is not worth the time/expense of precisely resolving the row number. By setting contiguousIndex equal to false Spark will include a different field _monotonically_increasing_id which is a non-sequential/non-contiguous identifier from which _index can be derived later but will not incur the same cost penalty of resolving _index.

Default: true.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
multiLine Boolean false Whether to load each file as a single record or as individual records split by newline.

Default: false.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
persist Boolean false Whether to persist dataset to Spark cache. Will also log row count.

Default: false.
schemaURI URI false

Used for multiple purposes:

  • Can be used to set metadata on the extracted DataFrame. Note this will overwrite the existing metadata if it exists.

  • Can be used to specify a schema in case of no input files. This stage will create an empty DataFrame with this schema so any downstream logic that depends on the columns in this dataset, e.g. SQLTransform, is still able to run. This feature can be used to allow deployment of business logic that depends on a dataset which has not been enabled by an upstream sending system.

schemaView URI false Similar to schemaURI but allows the schema to be passed in as another DataFrame. It takes precedence over schemaURI if provided.

Examples

Minimal

{
  "type": "TextExtract",
  "name": "load customers",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://input_data/customer/*.txt",
  "outputView": "customer"
}

Complete

{
  "type": "TextExtract",
  "name": "load customers",
  "description": "load customers",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://input_data/customer/*.txt",
  "outputView": "customer",
  "authentication": {},
  "contiguousIndex": true,
  "multiLine": true,
  "numPartitions": 10,
  "persist": false,
  "schemaURI": "hdfs://input_data/schema/customer.json",
  "schemaView": "customer_schema",
  "basePath": "hdfs://input_data/customer/"
}

XMLExtract

Since: 1.0.0 - Supports Streaming: False

The XMLExtract stage reads one or more XML files or an input Dataset[String] and returns a DataFrame.

This extract works slightly differently from the spark-xml package. To access the data you can use a SQLTransform query like this, which will create a new row for each element of the bk:books array:

SELECT EXPLODE(`bk:books`).*
FROM books_xml

The backtick character (`) can be used to address fields with non-alphanumeric names.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputURI URI true* URI/Glob of the input XML files. If not present inputView is required.
inputView String true* Name of the incoming Spark dataset. If not present inputURI is required.
outputView String true Name of outgoing Spark dataset after processing.
authentication Map[String, String] false An authentication map for authenticating with a remote service. See authentication documentation.
contiguousIndex Boolean false When loading a file two additional metadata fields are added to each record: _filename and _index (row number in the file). These fields are automatically included as they are very useful when trying to understand where certain data came from when consuming the data downstream.

The computational cost of adding the _index column in a distributed execution engine like Spark means that sometimes it is not worth the time/expense of precisely resolving the row number. By setting contiguousIndex equal to false Spark will include a different field _monotonically_increasing_id which is a non-sequential/non-contiguous identifier from which _index can be derived later but will not incur the same cost penalty of resolving _index.

Default: true.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
partitionBy Array[String] false Columns to partition the data by.
persist Boolean false Whether to persist dataset to Spark cache. Will also log row count.

Default: false.
schemaURI URI false

Used for multiple purposes:

  • Can be used to set metadata on the extracted DataFrame. Note this will overwrite the existing metadata if it exists.

  • Can be used to specify a schema in case of no input files. This stage will create an empty DataFrame with this schema so any downstream logic that depends on the columns in this dataset, e.g. SQLTransform, is still able to run. This feature can be used to allow deployment of business logic that depends on a dataset which has not been enabled by an upstream sending system.



Additionally, by specifying the schema here, the underlying data source can skip the schema inference step, and thus speed up data loading.
schemaView URI false Similar to schemaURI but allows the schema to be passed in as another DataFrame. It takes precedence over schemaURI if provided.

Examples

Minimal

{
  "type": "XMLExtract",
  "name": "load customers",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://input_data/customer/*.xml",
  "outputView": "customer"
}

Complete

{
  "type": "XMLExtract",
  "name": "load customers",
  "description": "load customers",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://input_data/customer/*.xml",
  "outputView": "customer",
  "authentication": {},
  "contiguousIndex": true,
  "numPartitions": 10,
  "partitionBy": [
    "country"
  ],
  "persist": false,
  "schemaURI": "hdfs://input_data/schema/customer.json",
  "schemaView": "customer_schema"
}