Extract
*Extract stages read in data from a database or file system.
*Extract stages should meet these criteria:
- Read data from local or remote filesystems and return a DataFrame.
- Do not transform/mutate the data.
- Allow for Predicate Pushdown depending on data source.
File based *Extract stages can accept glob patterns as input filenames, which is useful for loading just a subset of data (for example, for delta processing):
Pattern | Description |
---|---|
* | Matches zero or more characters. |
? | Matches any single character. |
[abc] | Matches a single character in the set {a, b, c}. |
[a-b] | Matches a single character from the character range {a...b}. |
[^a-b] | Matches a single character that is not from character set or range {a...b}. |
{a,b} | Matches either expression a or b. |
\c | Removes (escapes) any special meaning of character c. |
{ab,c{de, fg}} | Matches a string from the string set {ab, cde, cfg}. |
Spark will automatically match file extensions of .zip, .bz2, .deflate and .gz and perform decompression.
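As an illustration of glob-based delta processing, the sketch below loads only the files for the first three days of a month. The directory layout and file names are hypothetical; the attributes used are those documented for DelimitedExtract further down.

```json
{
  "type": "DelimitedExtract",
  "name": "load first three daily customer deltas",
  "description": "illustrative only: the file naming scheme is an assumption",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://input_data/customer/customer_2019-01-0[1-3].csv.gz",
  "outputView": "customer_delta"
}
```

Under these assumptions the [1-3] character range matches customer_2019-01-01.csv.gz through customer_2019-01-03.csv.gz, and the .gz extension means the files are decompressed on read.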
AvroExtract
Since: 1.0.0 - Supports Streaming: False
The AvroExtract stage reads one or more Apache Avro files and returns a DataFrame.
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputURI | URI | true* | URI/Glob of the input Avro files. If not present inputView is required. |
inputView | String | true* | Name of the incoming Spark dataset. If not present inputURI is required. |
outputView | String | true | Name of outgoing Spark dataset after processing. |
authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
basePath | URI | false | The base path that partition discovery should start with. |
contiguousIndex | Boolean | false | When loading a file two additional metadata fields are added to each record: _filename and _index (row number in the file). These fields are automatically included as they are very useful when trying to understand where certain data came from when consuming the data downstream. The computational cost of adding the _index column in a distributed execution engine like Spark means that sometimes it is not worth the time/expense of precisely resolving the row number. By setting contiguousIndex equal to false Spark will include a different field _monotonically_increasing_id which is a non-sequential/non-contiguous identifier from which _index can be derived later but will not incur the same cost penalty of resolving _index. Default: true. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
partitionBy | Array[String] | false | Columns to partition the data by. |
persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false. |
schemaURI | URI | false | Used for multiple purposes: |
schemaView | URI | false | Similar to schemaURI but allows the schema to be passed in as another DataFrame. It takes precedence over schemaURI if provided. |
inputField | String | false | If using inputView this option allows you to specify the name of the field which contains the Avro binary data. |
avroSchemaURI | URI | false* | If using inputView this option allows you to specify the Avro schema URI. Has been tested to work with the Kafka Schema Registry with a URI like http://kafka-schema-registry:8081/schemas/ids/1 as well as standalone *.avsc files. |
Examples
Minimal
{
"type": "AvroExtract",
"name": "load customer avro extract",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://input_data/customer/*.avro",
"outputView": "customer"
}
Complete
{
"type": "AvroExtract",
"name": "load customer avro extract",
"description": "load customer avro extract",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://input_data/customer/*.avro",
"outputView": "customer",
"authentication": {},
"contiguousIndex": true,
"numPartitions": 10,
"partitionBy": [
"country"
],
"persist": false,
"schemaURI": "hdfs://datalake/metadata/customer.json",
"schemaView": "customer_schema",
"basePath": "hdfs://input_data/customer/",
"inputField": "value",
"avroSchemaURI": "hdfs://datalake/metadata/user.avsc"
}
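When the Avro data arrives as a binary column in an existing view rather than as files, inputView and inputField can be combined with avroSchemaURI (the attribute name used in the Complete example above). A minimal sketch, assuming a hypothetical upstream view named customer_kafka whose value field holds Avro-encoded bytes; the schema registry URL is the illustrative one from the parameter description:

```json
{
  "type": "AvroExtract",
  "name": "decode customer avro records from an existing view",
  "description": "illustrative only: the view name and schema location are assumptions",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer_kafka",
  "inputField": "value",
  "avroSchemaURI": "http://kafka-schema-registry:8081/schemas/ids/1",
  "outputView": "customer"
}
```

Because inputView is supplied, inputURI is omitted; the parameter table marks the two as alternatives.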
AzureCosmosDBExtract
Since: 1.13.0 - Supports Streaming: True
Experimental
The AzureCosmosDBExtract is currently in experimental state whilst the requirements become clearer. This means this API is likely to change.
The AzureCosmosDBExtract stage reads data from an Azure Cosmos DB instance and returns a DataFrame.
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
outputView | String | true | Name of outgoing Spark dataset after processing. |
config | Map[String, String] | false | The Cosmos DB configuration options. |
authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
partitionBy | Array[String] | false | Columns to partition the data by. |
persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false. |
schemaURI | URI | false | Used for multiple purposes: |
schemaView | URI | false | Similar to schemaURI but allows the schema to be passed in as another DataFrame. It takes precedence over schemaURI if provided. |
Examples
Minimal
{
"type": "AzureCosmosDBExtract",
"name": "load customer extract",
"environments": [
"production",
"test"
],
"outputView": "customer",
"config": {
"Endpoint": "https://doctorwho.documents.azure.com:443/",
"Masterkey": "YOUR-KEY-HERE",
"Database": "DepartureDelays",
"Collection": "flights_pcoll",
"query_custom": "SELECT c.date, c.delay, c.distance, c.origin, c.destination FROM c WHERE c.origin = 'SEA'"
}
}
Complete
{
"type": "AzureCosmosDBExtract",
"name": "load customer extract",
"description": "load customer extract",
"environments": [
"production",
"test"
],
"outputView": "customer",
"authentication": {},
"numPartitions": 10,
"partitionBy": [
"country"
],
"persist": false,
"schemaURI": "hdfs://datalake/metadata/customer.json",
"schemaView": "customer_schema",
"config": {
"Endpoint": "https://doctorwho.documents.azure.com:443/",
"Masterkey": "YOUR-KEY-HERE",
"Database": "DepartureDelays",
"Collection": "flights_pcoll",
"query_custom": "SELECT c.date, c.delay, c.distance, c.origin, c.destination FROM c WHERE c.origin = 'SEA'"
}
}
BytesExtract
Since: 1.0.9 - Supports Streaming: False
The BytesExtract stage reads one or more binary files and returns a DataFrame containing an Array[Byte] of the file content (named value) and the file path (named _filename).
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputView | String | true* | Name of the incoming Spark dataset containing a list of URI/Globs to extract from. If not present inputURI is required. |
inputURI | URI | true* | URI/Glob of the input binary files. If not present inputView is required. |
outputView | String | true | Name of outgoing Spark dataset after processing. |
authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
contiguousIndex | Boolean | false | When loading a file two additional metadata fields are added to each record: _filename and _index (row number in the file). These fields are automatically included as they are very useful when trying to understand where certain data came from when consuming the data downstream. The computational cost of adding the _index column in a distributed execution engine like Spark means that sometimes it is not worth the time/expense of precisely resolving the row number. By setting contiguousIndex equal to false Spark will include a different field _monotonically_increasing_id which is a non-sequential/non-contiguous identifier from which _index can be derived later but will not incur the same cost penalty of resolving _index. Default: true. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
failMode | String | false | Either permissive or failfast: permissive will create an empty dataframe of [value, _filename] in case of no files. failfast will fail the Arc job if no files are found. Default: failfast. |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false. |
Examples
Minimal
{
"type": "BytesExtract",
"name": "load images from the customer vehicle photos directory",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://input_data/customer/vehicles/*.jpg",
"outputView": "customer_vehicles_photos"
}
Complete
{
"type": "BytesExtract",
"name": "load images from the customer vehicle photos directory",
"description": "load images from the customer vehicle photos directory",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://input_data/customer/vehicles/*.jpg",
"outputView": "customer_vehicles_photos",
"persist": false,
"numPartitions": 10,
"contiguousIndex": false,
"authentication": {},
"failMode": "permissive"
}
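BytesExtract can also take its list of files from another view via inputView instead of inputURI. A sketch, assuming a hypothetical view named vehicle_photo_uris that an earlier stage has produced and that contains the URI/Globs to read:

```json
{
  "type": "BytesExtract",
  "name": "load images listed in an upstream view",
  "description": "illustrative only: the input view name is an assumption",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "vehicle_photo_uris",
  "outputView": "customer_vehicles_photos",
  "failMode": "permissive"
}
```

With failMode set to permissive, a run in which no files are found yields an empty [value, _filename] DataFrame rather than failing the job, which is the default failfast behaviour.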
DatabricksDeltaExtract
Since: 1.8.0 - Supports Streaming: True
Experimental
The DatabricksDeltaExtract is currently in experimental state whilst the requirements become clearer. This means this API is likely to change.
The DatabricksDeltaExtract stage reads one or more Databricks Delta files and returns a DataFrame.
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputURI | URI | true | URI/Glob of the input Databricks Delta files. |
outputView | String | true | Name of outgoing Spark dataset after processing. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
partitionBy | Array[String] | false | Columns to partition the data by. |
persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false. |
Examples
Minimal
{
"type": "DatabricksDeltaExtract",
"name": "load customers",
"environments": [
"production",
"test"
],
"inputURI": "/delta/customers",
"outputView": "customer"
}
Complete
{
"type": "DatabricksDeltaExtract",
"name": "load customers",
"description": "load customers",
"environments": [
"production",
"test"
],
"inputURI": "/delta/customers",
"outputView": "customer",
"numPartitions": 10,
"partitionBy": [
"country"
],
"persist": false
}
DelimitedExtract
Since: 1.0.0 - Supports Streaming: True
The DelimitedExtract stage reads either one or more delimited text files or an input Dataset[String] and returns a DataFrame. DelimitedExtract will always set the underlying Spark configuration option of inferSchema to false to ensure consistent results.
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputView | String | true* | Name of the incoming Spark dataset. If not present inputURI is required. |
inputURI | URI | true* | URI/Glob of the input delimited text files. If not present inputView is required. |
outputView | String | true | Name of outgoing Spark dataset after processing. |
authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
basePath | URI | false | The base path that partition discovery should start with. |
contiguousIndex | Boolean | false | When loading a file two additional metadata fields are added to each record: _filename and _index (row number in the file). These fields are automatically included as they are very useful when trying to understand where certain data came from when consuming the data downstream. The computational cost of adding the _index column in a distributed execution engine like Spark means that sometimes it is not worth the time/expense of precisely resolving the row number. By setting contiguousIndex equal to false Spark will include a different field _monotonically_increasing_id which is a non-sequential/non-contiguous identifier from which _index can be derived later but will not incur the same cost penalty of resolving _index. Default: true. |
delimiter | String | false | The type of delimiter in the file. Supported values: Comma, Pipe, DefaultHive, Custom. DefaultHive is ASCII character 1, the default delimiter for Apache Hive extracts. Custom uses the string given in customDelimiter. Default: Comma. |
customDelimiter | String | true* | A custom string to use as delimiter. Required if delimiter is set to Custom. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
header | Boolean | false | Whether or not the dataset contains a header row. If available the output dataset will have named columns otherwise columns will be named _col1, _col2 … _colN. Default: false. |
inputField | String | false | If using inputView this option allows you to specify the name of the field which contains the delimited data. |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
partitionBy | Array[String] | false | Columns to partition the data by. |
persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false. |
quote | String | false | The type of quoting in the file. Supported values: None, SingleQuote, DoubleQuote. Default: DoubleQuote. |
schemaURI | URI | false | Used for multiple purposes: |
schemaView | URI | false | Similar to schemaURI but allows the schema to be passed in as another DataFrame. It takes precedence over schemaURI if provided. |
Examples
Minimal
{
"type": "DelimitedExtract",
"name": "load customer extract",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://input_data/customer/*.csv",
"outputView": "customer"
}
Complete
{
"type": "DelimitedExtract",
"name": "load customer csv extract",
"description": "load customer csv extract",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://input_data/customer/*.csv",
"outputView": "customer",
"authentication": {},
"contiguousIndex": true,
"delimiter": "Custom",
"customDelimiter": "#",
"header": false,
"inputField": "csvdata",
"numPartitions": 10,
"partitionBy": [
"country"
],
"persist": false,
"quote": "DoubleQuote",
"schemaURI": "hdfs://input_data/schema/customer.json",
"schemaView": "customer_schema",
"basePath": "hdfs://input_data/customer/"
}
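The Complete example above uses a Custom delimiter. For comparison, a sketch of one of the built-in delimiter values on a pipe-delimited file with a header row; the .psv file extension and path are hypothetical:

```json
{
  "type": "DelimitedExtract",
  "name": "load pipe-delimited customer extract",
  "description": "illustrative only: the input path is an assumption",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://input_data/customer/*.psv",
  "outputView": "customer",
  "delimiter": "Pipe",
  "header": true,
  "quote": "None"
}
```

With header set to true the output columns take their names from the first row; with the default of false they would be named _col1, _col2 and so on. No customDelimiter is needed because delimiter is not set to Custom.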
ElasticsearchExtract
Since: 1.9.0 - Supports Streaming: False
Experimental
The ElasticsearchExtract is currently in experimental state whilst the requirements become clearer. This means this API is likely to change.
The ElasticsearchExtract stage reads from an Elasticsearch cluster and returns a DataFrame.
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
input | String | true | The name of the source Elasticsearch index. |
outputView | String | true | Name of outgoing Spark dataset after processing. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
params | Map[String, String] | false | Map of configuration parameters. Parameters for connecting to the Elasticsearch cluster are detailed here. |
partitionBy | Array[String] | false | Columns to partition the data by. |
persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false. |
Examples
Minimal
{
"type": "ElasticsearchExtract",
"name": "load customer extract",
"environments": [
"production",
"test"
],
"input": "customer",
"outputView": "customer",
"params": {
"es.nodes": "<my>.elasticsearch.com",
"es.port": "443",
"es.nodes.wan.only": "true",
"es.net.ssl": "true"
}
}
Complete
{
"type": "ElasticsearchExtract",
"name": "load customer extract",
"environments": [
"production",
"test"
],
"input": "customer",
"outputView": "customer",
"params": {
"es.nodes": "<my>.elasticsearch.com",
"es.port": "443",
"es.nodes.wan.only": "true",
"es.net.ssl": "true"
},
"numPartitions": 10,
"partitionBy": [
"country"
],
"persist": false
}
HTTPExtract
Since: 1.0.0 - Supports Streaming: False
The HTTPExtract stage executes either a GET or POST request against a remote HTTP service and returns a DataFrame which will have a single row and single column holding the value of the HTTP response body.
This stage would typically be used with a JSONExtract stage by specifying inputView instead of inputURI (setting multiLine=true allows processing of JSON array responses).
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputView | String | true* | Name of the incoming Spark dataset containing the list of URIs in value field. If not present inputURI is required. |
inputURI | URI | true* | URI of the HTTP server. If not present inputView is required. |
outputView | String | true | Name of outgoing Spark dataset after processing. |
body | String | false | The request body/entity that is sent with a POST request. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
headers | Map[String, String] | false | HTTP Headers to set for the HTTP request. These are not limited to the Internet Engineering Task Force standard headers. |
method | String | false | The request type with valid values GET or POST. Default: GET. |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
partitionBy | Array[String] | false | Columns to partition the data by. |
persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false. |
validStatusCodes | Array[Integer] | false | A list of valid status codes which will result in a successful stage if the list contains the HTTP server response code. If not provided the default values are [200, 201, 202]. |
Examples
Minimal
{
"type": "HTTPExtract",
"name": "load customer extract from api",
"environments": [
"production",
"test"
],
"inputURI": "https://endpoint:9000/customers",
"outputView": "customer"
}
Complete
{
"type": "HTTPExtract",
"name": "load customer extract from api",
"description": "load customer extract from api",
"environments": [
"production",
"test"
],
"inputURI": "https://endpoint:9000/customers",
"outputView": "customer",
"body": "",
"headers": {
"Authorization": "Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ==",
"custom-header": "payload"
},
"method": "GET",
"numPartitions": 10,
"partitionBy": [
"country"
],
"persist": false,
"validStatusCodes": [
200
]
}
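The pairing with JSONExtract described at the top of this section can be sketched as two consecutive stages. The view name customer_response is hypothetical, and the two stage objects are shown as a plain JSON array purely for illustration; how stages are listed in a job file is not covered in this section.

```json
[
  {
    "type": "HTTPExtract",
    "name": "call customer api",
    "description": "illustrative only: the endpoint is the placeholder used in the examples above",
    "environments": [
      "production",
      "test"
    ],
    "inputURI": "https://endpoint:9000/customers",
    "outputView": "customer_response"
  },
  {
    "type": "JSONExtract",
    "name": "parse customer api response",
    "environments": [
      "production",
      "test"
    ],
    "inputView": "customer_response",
    "outputView": "customer",
    "multiLine": true
  }
]
```

The first stage leaves the response body in a single-row view; the second reads that view through inputView rather than inputURI, with multiLine set to true so that a JSON array response can be processed.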
ImageExtract
Since: 1.4.1 - Supports Streaming: True
The ImageExtract stage reads one or more image files and returns a DataFrame which has one column: image, containing image data (jpeg, png, gif, bmp, wbmp) stored with the schema:
Field | Type | Description |
---|---|---|
origin | String | The file path of the image. |
height | Integer | The height of the image. |
width | Integer | The width of the image. |
nChannels | Integer | The number of image channels. |
mode | Integer | OpenCV-compatible type. |
data | Binary | Image bytes in OpenCV-compatible order: row-wise BGR in most cases. |
This means the image data can be accessed like:
SELECT image.height FROM dataset
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputURI | URI | true | URI/Glob of the input images. |
outputView | String | true | Name of outgoing Spark dataset after processing. |
authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
basePath | URI | false | The base path that partition discovery should start with. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
dropInvalid | Boolean | false | Whether to drop any invalid image files. Default: true. |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
partitionBy | Array[String] | false | Columns to partition the data by. |
persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false. |
Examples
Minimal
{
"type": "ImageExtract",
"name": "load customer images",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://input_data/customer/*.jpg",
"outputView": "customer"
}
Complete
{
"type": "ImageExtract",
"name": "load customer images",
"description": "load customer images",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://input_data/customer/*.jpg",
"outputView": "customer",
"authentication": {},
"dropInvalid": true,
"numPartitions": 10,
"partitionBy": [
"image.width"
],
"persist": false,
"basePath": "hdfs://input_data/customer/"
}
JDBCExtract
Since: 1.0.0 - Supports Streaming: False
The JDBCExtract stage reads directly from a JDBC database and returns a DataFrame. See Spark JDBC documentation.
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
jdbcURL | String | true | The JDBC URL to connect to, e.g. jdbc:mysql://localhost:3306. |
tableName | String | true | The JDBC table that should be read. Note that anything that is valid in a FROM clause of a SQL query can be used, e.g. (SELECT * FROM sourcetable WHERE key=value) sourcetable or just sourcetable. |
outputView | String | true | Name of outgoing Spark dataset after processing. |
authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
contiguousIndex | Boolean | false | When loading a file two additional metadata fields are added to each record: _filename and _index (row number in the file). These fields are automatically included as they are very useful when trying to understand where certain data came from when consuming the data downstream. The computational cost of adding the _index column in a distributed execution engine like Spark means that sometimes it is not worth the time/expense of precisely resolving the row number. By setting contiguousIndex equal to false Spark will include a different field _monotonically_increasing_id which is a non-sequential/non-contiguous identifier from which _index can be derived later but will not incur the same cost penalty of resolving _index. Default: true. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
fetchsize | Integer | false | The JDBC fetch size, which determines how many rows to fetch per round trip. This can help performance on JDBC drivers which default to a low fetch size (e.g. Oracle with 10 rows). |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. This also determines the maximum number of concurrent JDBC connections. |
params | Map[String, String] | false | Map of configuration parameters. Currently requires user and password to be set here - see example below. |
partitionBy | Array[String] | false | Columns to partition the data by. |
partitionColumn | String | false | The name of a numeric column from the table in question which defines how to partition the table when reading in parallel from multiple workers. If set numPartitions must also be set. |
persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false. |
predicates | Array[String] | false | A list of expressions suitable for inclusion in WHERE clauses; each one defines one partition of the DataFrame to allow explicit parallel reads, e.g. ['id=1', 'id=2', 'id=3', 'id=4'] would create 4 parallel readers. |
schemaURI | URI | false | Used for multiple purposes: |
schemaView | URI | false | Similar to schemaURI but allows the schema to be passed in as another DataFrame. It takes precedence over schemaURI if provided. |
Examples
Minimal
{
"type": "JDBCExtract",
"name": "load active customers from postgres",
"environments": [
"production",
"test"
],
"jdbcURL": "jdbc:postgresql://localhost:5432/customer",
"tableName": "(SELECT * FROM customer WHERE active=TRUE) customer",
"outputView": "customer"
}
Complete
{
"type": "JDBCExtract",
"name": "load active customers from postgresql",
"description": "load active customers from postgresql",
"environments": [
"production",
"test"
],
"jdbcURL": "jdbc:postgresql://localhost:5432/customer",
"tableName": "(SELECT * FROM customer WHERE active=TRUE) customer",
"outputView": "customer",
"authentication": {},
"contiguousIndex": true,
"fetchsize": 1000,
"numPartitions": 10,
"params": {
"user": "mydbuser",
"password": "mydbpassword"
},
"partitionBy": [
"country"
],
"partitionColumn": "id",
"persist": true,
"predicates": [
"id=1",
"id=2",
"id=3",
"id=4"
],
"schemaURI": "hdfs://input_data/schema/customer.json",
"schemaView": "customer_schema"
}
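As a narrower illustration of the parallel-read attributes, the sketch below partitions the read on a single numeric column rather than listing explicit predicates. It assumes, hypothetically, that the customer table has a numeric id column; the connection details are the placeholders used in the examples above.

```json
{
  "type": "JDBCExtract",
  "name": "load customers from postgres in parallel",
  "description": "illustrative only: assumes customer has a numeric id column",
  "environments": [
    "production",
    "test"
  ],
  "jdbcURL": "jdbc:postgresql://localhost:5432/customer",
  "tableName": "customer",
  "outputView": "customer",
  "fetchsize": 1000,
  "numPartitions": 4,
  "partitionColumn": "id",
  "params": {
    "user": "mydbuser",
    "password": "mydbpassword"
  }
}
```

numPartitions is set because the parameter table requires it whenever partitionColumn is used; as it also determines the maximum number of concurrent JDBC connections, a value of 4 here means at most four connections to the database.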
JSONExtract
Since: 1.0.0 - Supports Streaming: True
The JSONExtract stage reads either one or more JSON files or an input Dataset[String] and returns a DataFrame.
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputView | String | true* | Name of the incoming Spark dataset. If not present inputURI is required. |
inputURI | URI | true* | URI/Glob of the input JSON files. If not present inputView is required. |
outputView | String | true | Name of outgoing Spark dataset after processing. |
authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
basePath | URI | false | The base path that partition discovery should start with. |
contiguousIndex | Boolean | false | When loading a file two additional metadata fields are added to each record: _filename and _index (row number in the file). These fields are automatically included as they are very useful when trying to understand where certain data came from when consuming the data downstream. The computational cost of adding the _index column in a distributed execution engine like Spark means that sometimes it is not worth the time/expense of precisely resolving the row number. By setting contiguousIndex equal to false Spark will include a different field _monotonically_increasing_id which is a non-sequential/non-contiguous identifier from which _index can be derived later but will not incur the same cost penalty of resolving _index. Default: true. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
inputField | String | false | If using inputView this option allows you to specify the name of the field which contains the JSON data. |
multiLine | Boolean | false | Whether the input directory contains a single JSON object per file (true) or multiple JSON records in a single file, one per line (false, see JSONLines). Default: true. |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
partitionBy | Array[String] | false | Columns to partition the data by. |
persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false. |
schemaURI | URI | false | Used for multiple purposes: Additionally, by specifying the schema here, the underlying data source can skip the schema inference step, and thus speed up data loading. |
schemaView | URI | false | Similar to schemaURI but allows the schema to be passed in as another DataFrame. It takes precedence over schemaURI if provided. |
Examples
Minimal
{
"type": "JSONExtract",
"name": "load customers",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://input_data/customer/*.json",
"outputView": "customer"
}
Complete
{
"type": "JSONExtract",
"name": "load customers",
"description": "load customers",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://input_data/customer/*.json",
"outputView": "customer",
"authentication": {},
"contiguousIndex": true,
"inputField": "jsondata",
"multiLine": false,
"numPartitions": 10,
"partitionBy": [
"country"
],
"persist": false,
"schemaURI": "hdfs://input_data/schema/customer.json",
"schemaView": "customer_schema",
"basePath": "hdfs://input_data/customer/"
}
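For files that hold one JSON record per line, a sketch that turns multiLine off and supplies a schema so that schema inference can be skipped. The .jsonl file extension is hypothetical; the schema location is the one used in the Complete example above.

```json
{
  "type": "JSONExtract",
  "name": "load customer json lines",
  "description": "illustrative only: assumes one JSON record per line in each file",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://input_data/customer/*.jsonl",
  "outputView": "customer",
  "multiLine": false,
  "schemaURI": "hdfs://input_data/schema/customer.json"
}
```

multiLine has to be set explicitly here because its documented default is true.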
KafkaExtract
Since: 1.0.8 - Supports Streaming: True
Experimental
The KafkaExtract is currently in experimental state whilst the requirements become clearer. This means this API is likely to change to better handle failures.
The KafkaExtract stage reads records from a Kafka topic and returns a DataFrame. It requires a unique groupID to be set which on first run will consume from the earliest offset available in Kafka. Each subsequent run will use the offset as recorded against that groupID. This means that if a job fails before properly processing the data then processing may need to be restarted from the earliest offset by creating a new groupID.
The returned DataFrame has the schema:
Field | Type | Description |
---|---|---|
topic | String | The Kafka Topic. |
partition | Integer | The partition ID. |
offset | Long | The record offset. |
timestamp | Long | The record timestamp. |
key | Binary | The record key as a byte array. |
value | Binary | The record value as a byte array. |
KafkaExtract can be used in conjunction with KafkaCommitExecute to allow quasi-transactional behaviour (with autoCommit set to false), in that the offset commit can be deferred until certain dependent stages are successfully executed.
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
outputView | String | true | Name of outgoing Spark dataset after processing. |
bootstrapServers | String | true | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. e.g. host1:port1,host2:port2,... |
topic | String | true | The target Kafka topic. |
groupID | String | true | A string that uniquely identifies the group of consumer processes to which this consumer belongs. This will retain the offset of the job between executions. |
autoCommit | Boolean | false | Whether to update the offsets in Kafka automatically. To be used in conjunction with KafkaCommitExecute to allow quasi-transactional behaviour. If autoCommit is set to false this stage will force persist equal to true so that Spark will not execute the Kafka extract process twice with a potentially different result (e.g. new messages added between extracts). Default: false. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
maxPollRecords | Integer | false | The maximum number of records returned in a single call to Kafka. Arc will then continue to poll until all records have been read. Default: 10000. |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
partitionBy | Array[String] | false | Columns to partition the data by. |
persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false. |
timeout | Long | false | The time, in milliseconds, spent waiting in poll if data is not available in Kafka. Default: 10000. |
Examples
Minimal
{
"type": "KafkaExtract",
"name": "load customers",
"environments": [
"production",
"test"
],
"outputView": "customer",
"bootstrapServers": "kafka:29092",
"topic": "customers",
"groupID": "spark-customer-extract-job"
}
Complete
{
"type": "KafkaExtract",
"name": "load customers",
"description": "load customers",
"environments": [
"production",
"test"
],
"outputView": "customer",
"bootstrapServers": "kafka:29092",
"topic": "customers",
"groupID": "spark-customer-extract-job",
"autoCommit": false,
"maxPollRecords": 10000,
"numPartitions": 10,
"partitionBy": [
"country"
],
"persist": false,
"timeout": 10000
}
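Before submitting a job it can be useful to check a stage definition against the required attributes in the parameters table. The following is a minimal sketch of such a check. The `missing_attributes` helper is hypothetical and is not part of Arc, and the sample stage deliberately omits groupID.

```python
import json

# Required attributes taken from the KafkaExtract parameters table.
REQUIRED = {"name", "environments", "outputView", "bootstrapServers", "topic", "groupID"}


def missing_attributes(stage_json):
    """Return the required KafkaExtract attributes absent from a stage definition."""
    stage = json.loads(stage_json)
    return sorted(REQUIRED - set(stage))


# Sample stage definition with groupID left out on purpose.
stage_json = """
{
  "type": "KafkaExtract",
  "name": "load customers",
  "environments": ["production", "test"],
  "outputView": "customer",
  "bootstrapServers": "kafka:29092",
  "topic": "customers"
}
"""

missing = missing_attributes(stage_json)
```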
ORCExtract
Since: 1.0.0 - Supports Streaming: True
The ORCExtract stage reads one or more Apache ORC files and returns a DataFrame.
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputURI | URI | true | URI/Glob of the input ORC files. |
outputView | String | true | Name of outgoing Spark dataset after processing. |
authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
basePath | URI | false | The base path that partition discovery should start with. |
contiguousIndex | Boolean | false | When loading a file two additional metadata fields are added to each record: _filename and _index (row number in the file). These fields are automatically included as they are very useful when trying to understand where certain data came from when consuming the data downstream. The computational cost of adding the _index column in a distributed execution engine like Spark means that sometimes it is not worth the time/expense of precisely resolving the row number. By setting contiguousIndex equal to false Spark will include a different field _monotonically_increasing_id which is a non-sequential/non-contiguous identifier from which _index can be derived later but will not incur the same cost penalty of resolving _index. Default: true. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
partitionBy | Array[String] | false | Columns to partition the data by. |
persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false. |
schemaURI | URI | false | Used for multiple purposes: |
schemaView | URI | false | Similar to schemaURI but allows the schema to be passed in as another DataFrame. It takes precedence over schemaURI if provided. |
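The contiguousIndex description says that _index can be derived later from _monotonically_increasing_id. One way to do that is to rank the rows of each file by the identifier, sketched here in plain Python. The sample rows, the `add_index` helper and the choice of a 1-based index are assumptions for illustration, and the sketch also assumes that the identifier order follows the row order within each file.

```python
from itertools import groupby
from operator import itemgetter

# Sample rows as they might look with contiguousIndex set to false.
# The id values are invented; only their relative order matters.
rows = [
    {"_filename": "a.orc", "_monotonically_increasing_id": 8589934592, "name": "Bo"},
    {"_filename": "a.orc", "_monotonically_increasing_id": 0, "name": "Al"},
    {"_filename": "b.orc", "_monotonically_increasing_id": 17179869184, "name": "Cy"},
    {"_filename": "a.orc", "_monotonically_increasing_id": 1, "name": "Di"},
]


def add_index(rows):
    """Number rows 1..n within each file, ordered by the monotonic id."""
    ordered = sorted(rows, key=itemgetter("_filename", "_monotonically_increasing_id"))
    result = []
    for _, group in groupby(ordered, key=itemgetter("_filename")):
        for index, row in enumerate(group, start=1):
            result.append({**row, "_index": index})
    return result


indexed = add_index(rows)
```

In Spark the same idea would typically be expressed as a window function partitioned by _filename and ordered by the identifier, which is where the cost of a contiguous index comes from.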
Examples
Minimal
{
"type": "ORCExtract",
"name": "load customers",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://input_data/customer/*.orc",
"outputView": "customer"
}
Complete
{
"type": "ORCExtract",
"name": "load customers",
"description": "load customers",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://input_data/customer/*.orc",
"outputView": "customer",
"authentication": {},
"contiguousIndex": true,
"numPartitions": 10,
"partitionBy": [
"country"
],
"persist": false,
"schemaURI": "hdfs://input_data/schema/customer.json",
"schemaView": "customer_schema",
"basePath": "hdfs://input_data/customer/"
}
ParquetExtract
Since: 1.0.0 - Supports Streaming: True
The ParquetExtract stage reads one or more Apache Parquet files and returns a DataFrame.
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputURI | URI | true | URI/Glob of the input Parquet files. |
outputView | String | true | Name of outgoing Spark dataset after processing. |
authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
basePath | URI | false | The base path that partition discovery should start with. |
contiguousIndex | Boolean | false | When loading a file two additional metadata fields are added to each record: _filename and _index (row number in the file). These fields are automatically included as they are very useful when trying to understand where certain data came from when consuming the data downstream. The computational cost of adding the _index column in a distributed execution engine like Spark means that sometimes it is not worth the time/expense of precisely resolving the row number. By setting contiguousIndex equal to false Spark will include a different field _monotonically_increasing_id which is a non-sequential/non-contiguous identifier from which _index can be derived later but will not incur the same cost penalty of resolving _index. Default: true. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
partitionBy | Array[String] | false | Columns to partition the data by. |
persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false. |
schemaURI | URI | false | Used for multiple purposes: |
schemaView | URI | false | Similar to schemaURI but allows the schema to be passed in as another DataFrame. It takes precedence over schemaURI if provided. |
Examples
Minimal
{
"type": "ParquetExtract",
"name": "load customers",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://input_data/customer/*.parquet",
"outputView": "customer"
}
Complete
{
"type": "ParquetExtract",
"name": "load customers",
"description": "load customers",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://input_data/customer/*.parquet",
"outputView": "customer",
"authentication": {},
"contiguousIndex": true,
"numPartitions": 10,
"partitionBy": [
"country"
],
"persist": false,
"schemaURI": "hdfs://input_data/schema/customer.json",
"schemaView": "customer_schema",
"basePath": "hdfs://input_data/customer/"
}
RateExtract
Since: 1.2.0 - Supports Streaming: True
The RateExtract stage creates a streaming data source which generates rows into a streaming DataFrame with the signature [timestamp: timestamp, value: long].
This stage has been included for testing Structured Streaming jobs, as it can be very difficult to generate test data. Generally this stage would only be included when Arc is run in a test mode (i.e. the environment is set to test).
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
outputView | String | true | Name of outgoing Spark dataset after processing. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
rampUpTime | Integer | false | How long to ramp up before the generating speed becomes rowsPerSecond. Using finer granularities than seconds will be truncated to integer seconds. Default: 0. |
rowsPerSecond | Integer | false | How many rows should be generated per second. Default: 1. |
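To make the shape of the generated data concrete, the sketch below produces rows with the same [timestamp, value] signature in plain Python. It is an approximation for illustration only: `rate_rows` is a hypothetical helper, the rows are spread evenly within each second by assumption, and rampUpTime is not modelled.

```python
from datetime import datetime, timedelta, timezone


def rate_rows(start, seconds, rows_per_second=1):
    """Generate (timestamp, value) pairs, rows_per_second for each second."""
    rows = []
    value = 0
    for second in range(seconds):
        for i in range(rows_per_second):
            # Assumption: rows are spread evenly across each second.
            ts = start + timedelta(seconds=second + i / rows_per_second)
            rows.append((ts, value))
            value += 1   # value is a simple increasing counter
    return rows


start = datetime(2019, 1, 1, tzinfo=timezone.utc)
rows = rate_rows(start, seconds=3, rows_per_second=2)
```

With rowsPerSecond set to 2, three seconds of output contain six rows whose value column counts up from 0.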
Examples
Minimal
{
"type": "RateExtract",
"name": "create a streaming source",
"environments": [
"production",
"test"
],
"outputView": "stream"
}
Complete
{
"type": "RateExtract",
"name": "create a streaming source",
"description": "create a streaming source",
"environments": [
"production",
"test"
],
"outputView": "stream",
"rowsPerSecond": 2,
"rampUpTime": 0,
"numPartitions": 10
}
TextExtract
Since: 1.2.0 - Supports Streaming: True
The TextExtract stage reads one or more text files and returns a DataFrame.
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputURI | URI | true | URI/Glob of the input text files. |
outputView | String | true | Name of outgoing Spark dataset after processing. |
authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
basePath | URI | false | The base path that partition discovery should start with. |
contiguousIndex | Boolean | false | When loading a file two additional metadata fields are added to each record: _filename and _index (row number in the file). These fields are automatically included as they are very useful when trying to understand where certain data came from when consuming the data downstream. The computational cost of adding the _index column in a distributed execution engine like Spark means that sometimes it is not worth the time/expense of precisely resolving the row number. By setting contiguousIndex equal to false Spark will include a different field _monotonically_increasing_id which is a non-sequential/non-contiguous identifier from which _index can be derived later but will not incur the same cost penalty of resolving _index. Default: true. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
multiLine | Boolean | false | Whether to load the file as a single record or as individual records split by newline. Default: false. |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false. |
schemaURI | URI | false | Used for multiple purposes: |
schemaView | URI | false | Similar to schemaURI but allows the schema to be passed in as another DataFrame. It takes precedence over schemaURI if provided. |
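The effect of the multiLine attribute can be sketched with a small stand-in reader. This is not how Arc reads files: `read_text` is a hypothetical helper that takes a mapping of file name to content, and it only illustrates the difference between one record per file and one record per line.

```python
def read_text(files, multi_line=False):
    """Return (filename, text) rows from a mapping of filename -> content."""
    rows = []
    for filename, content in files.items():
        if multi_line:
            rows.append((filename, content))        # whole file as a single record
        else:
            for line in content.splitlines():       # one record per line
                rows.append((filename, line))
    return rows


files = {"a.txt": "first\nsecond", "b.txt": "third"}
per_line = read_text(files)                    # multiLine: false
per_file = read_text(files, multi_line=True)   # multiLine: true
```

Loading a whole file as a single record is useful when line breaks are part of the content, for example a document that will be parsed as a unit by a later stage.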
Examples
Minimal
{
"type": "TextExtract",
"name": "load customers",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://input_data/customer/*.txt",
"outputView": "customer"
}
Complete
{
"type": "TextExtract",
"name": "load customers",
"description": "load customers",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://input_data/customer/*.txt",
"outputView": "customer",
"authentication": {},
"contiguousIndex": true,
"multiLine": true,
"numPartitions": 10,
"persist": false,
"schemaURI": "hdfs://input_data/schema/customer.json",
"schemaView": "customer_schema",
"basePath": "hdfs://input_data/customer/"
}
XMLExtract
Since: 1.0.0 - Supports Streaming: False
The XMLExtract stage reads one or more XML files or an input Dataset[String] and returns a DataFrame.
This extract works slightly differently from the spark-xml package. To access the data you can use a SQLTransform query like this, which will create a new row for each element of the bk:books array:
SELECT EXPLODE(`bk:books`).*
FROM books_xml
The backtick character (`) can be used to address fields with non-alphanumeric names.
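For readers unfamiliar with EXPLODE, the following sketch mimics what the query above does, using plain Python data in place of a DataFrame. The nested record, its field names (`title`, `price`) and the `explode_star` helper are invented for illustration.

```python
# One parsed XML document represented as a nested record. The field names
# inside "bk:books" are hypothetical sample data.
books_xml = [
    {
        "bk:books": [
            {"title": "First", "price": 10.0},
            {"title": "Second", "price": 12.5},
        ]
    }
]


def explode_star(rows, column):
    """Emit one flat row per element of the array held in `column`."""
    return [dict(element) for row in rows for element in row[column]]


exploded = explode_star(books_xml, "bk:books")
```

Each element of the array becomes its own row and the fields of the element become top-level columns, which is the effect of the `.*` in the SQL query.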
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputURI | URI | true* | URI/Glob of the input XML files. If not present inputView is required. |
inputView | String | true* | Name of the incoming Spark dataset. If not present inputURI is required. |
outputView | String | true | Name of outgoing Spark dataset after processing. |
authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
contiguousIndex | Boolean | false | When loading a file two additional metadata fields are added to each record: _filename and _index (row number in the file). These fields are automatically included as they are very useful when trying to understand where certain data came from when consuming the data downstream. The computational cost of adding the _index column in a distributed execution engine like Spark means that sometimes it is not worth the time/expense of precisely resolving the row number. By setting contiguousIndex equal to false Spark will include a different field _monotonically_increasing_id which is a non-sequential/non-contiguous identifier from which _index can be derived later but will not incur the same cost penalty of resolving _index. Default: true. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
partitionBy | Array[String] | false | Columns to partition the data by. |
persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false. |
schemaURI | URI | false | Used for multiple purposes. Additionally, by specifying the schema here, the underlying data source can skip the schema inference step, and thus speed up data loading. |
schemaView | URI | false | Similar to schemaURI but allows the schema to be passed in as another DataFrame. It takes precedence over schemaURI if provided. |
Examples
Minimal
{
"type": "XMLExtract",
"name": "load customers",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://input_data/customer/*.xml",
"outputView": "customer"
}
Complete
{
"type": "XMLExtract",
"name": "load customers",
"description": "load customers",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://input_data/customer/*.xml",
"outputView": "customer",
"authentication": {},
"contiguousIndex": true,
"numPartitions": 10,
"partitionBy": [
"country"
],
"persist": false,
"schemaURI": "hdfs://input_data/schema/customer.json",
"schemaView": "customer_schema"
}