Extract
*Extract stages read data from a database or file system.
*Extract stages should meet these criteria:
- Read data from local or remote filesystems and return a DataFrame.
- Do not transform/mutate the data.
- Allow for Predicate Pushdown depending on the data source.
File based *Extract stages can accept glob patterns as input filenames, which can be very useful for loading just a subset of data, for example for delta processing:
| Pattern | Description |
|---|---|
| * | Matches zero or more characters. |
| ? | Matches any single character. |
| [abc] | Matches a single character in the set {a, b, c}. |
| [a-b] | Matches a single character from the character range {a...b}. |
| [^a-b] | Matches a single character that is not from the character set or range {a...b}. |
| {a,b} | Matches either expression a or b. |
| \c | Removes (escapes) any special meaning of character c. |
| {ab,c{de, fg}} | Matches a string from the string set {ab, cde, cfg}. |
Spark will automatically recognise the file extensions .zip, .bz2, .deflate and .gz and decompress these files when reading them.
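For instance, a delta-processing job could combine a date-partitioned directory layout with a glob so that only newly arrived files are read. The sketch below is illustrative only: the directory layout and paths are assumptions, not part of any stage's contract.
{
"type": "ParquetExtract",
"name": "load only the latest customer delta",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://input_data/customer/2019-06-2[0-9]/*.parquet",
"outputView": "customer_delta"
}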
AvroExtract
Since: 1.0.0 - Supports Streaming: False
The AvroExtract stage reads one or more Apache Avro files and returns a DataFrame.
Parameters
| Attribute | Type | Required | Description |
|---|---|---|---|
| name | String | true | Name of the stage for logging. |
| environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
| inputURI | URI | true* | URI/Glob of the input Avro files. If not present inputView is required. |
| inputView | String | true* | Name of the incoming Spark dataset. If not present inputURI is required. |
| outputView | String | true | Name of outgoing Spark dataset after processing. |
| authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
| basePath | URI | false | The base path that partition discovery should start with. |
| contiguousIndex | Boolean | false | When loading a file two additional metadata fields are added to each record: _filename and _index (the row number in the file). These fields are included automatically as they are very useful for understanding where data came from when consuming it downstream. The computational cost of adding the _index column in a distributed execution engine like Spark means that it is sometimes not worth the time/expense of precisely resolving the row number. By setting contiguousIndex to false Spark will instead include a field _monotonically_increasing_id, a non-sequential/non-contiguous identifier from which _index can be derived later without incurring the same cost penalty. Default: true. |
| description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
| numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
| partitionBy | Array[String] | false | Columns to partition the data by. |
| persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false. |
| schemaURI | URI | false | Used for multiple purposes, primarily to supply the schema to apply to the extracted dataset. |
| schemaView | URI | false | Similar to schemaURI but allows the schema to be passed in as another DataFrame. It takes precedence over schemaURI if provided. |
| inputField | String | false | If using inputView this option allows you to specify the name of the field which contains the Avro binary data. |
| avroSchemaURI | URI | false* | If using inputView this option allows you to specify the Avro schema URI. Has been tested to work with the Kafka Schema Registry with a URI like http://kafka-schema-registry:8081/schemas/ids/1 as well as standalone *.avsc files. |
Examples
Minimal
{
"type": "AvroExtract",
"name": "load customer avro extract",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://input_data/customer/*.avro",
"outputView": "customer"
}
Complete
{
"type": "AvroExtract",
"name": "load customer avro extract",
"description": "load customer avro extract",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://input_data/customer/*.avro",
"outputView": "customer",
"authentication": {},
"contiguousIndex": true,
"numPartitions": 10,
"partitionBy": [
"country"
],
"persist": false,
"schemaURI": "hdfs://datalake/metadata/customer.json",
"schemaView": "customer_schema",
"basePath": "hdfs://input_data/customer/*.avro",
"inputField": "value",
"avroSchemaURI": "hdfs://datalake/metadata/user.avsc"
}
AzureCosmosDBExtract
Since: 1.13.0 - Supports Streaming: True
Experimental
The AzureCosmosDBExtract is currently in an experimental state whilst the requirements become clearer.
This means this API is likely to change.
The AzureCosmosDBExtract stage reads data from an Azure Cosmos DB instance and returns a DataFrame.
Parameters
| Attribute | Type | Required | Description |
|---|---|---|---|
| name | String | true | Name of the stage for logging. |
| environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
| outputView | String | true | Name of outgoing Spark dataset after processing. |
| config | Map[String, String] | false | The Cosmos DB configuration options. |
| authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
| description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
| numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
| partitionBy | Array[String] | false | Columns to partition the data by. |
| persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false. |
| schemaURI | URI | false | Used for multiple purposes, primarily to supply the schema to apply to the extracted dataset. |
| schemaView | URI | false | Similar to schemaURI but allows the schema to be passed in as another DataFrame. It takes precedence over schemaURI if provided. |
Examples
Minimal
{
"type": "AzureCosmosDBExtract",
"name": "load customer extract",
"environments": [
"production",
"test"
],
"outputView": "customer",
"config": {
"Endpoint": "https://doctorwho.documents.azure.com:443/",
"Masterkey": "YOUR-KEY-HERE",
"Database": "DepartureDelays",
"Collection": "flights_pcoll",
"query_custom": "SELECT c.date, c.delay, c.distance, c.origin, c.destination FROM c WHERE c.origin = 'SEA'"
}
}
Complete
{
"type": "AzureCosmosDBExtract",
"name": "load customer extract",
"description": "load customer extract",
"environments": [
"production",
"test"
],
"outputView": "customer",
"authentication": {},
"numPartitions": 10,
"partitionBy": [
"country"
],
"persist": false,
"schemaURI": "hdfs://datalake/metadata/customer.json",
"schemaView": "customer_schema",
"config": {
"Endpoint": "https://doctorwho.documents.azure.com:443/",
"Masterkey": "YOUR-KEY-HERE",
"Database": "DepartureDelays",
"Collection": "flights_pcoll",
"query_custom": "SELECT c.date, c.delay, c.distance, c.origin, c.destination FROM c WHERE c.origin = 'SEA'"
}
}
BytesExtract
Since: 1.0.9 - Supports Streaming: False
The BytesExtract stage reads one or more binary files and returns a DataFrame containing an Array[Byte] of the file content (named value) and the file path (named _filename).
Parameters
| Attribute | Type | Required | Description |
|---|---|---|---|
| name | String | true | Name of the stage for logging. |
| environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
| inputView | String | true* | Name of the incoming Spark dataset containing a list of URI/Globs to extract from. If not present inputURI is required. |
| inputURI | URI | true* | URI/Glob of the input binary files. If not present inputView is required. |
| outputView | String | true | Name of outgoing Spark dataset after processing. |
| authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
| contiguousIndex | Boolean | false | When loading a file two additional metadata fields are added to each record: _filename and _index (the row number in the file). These fields are included automatically as they are very useful for understanding where data came from when consuming it downstream. The computational cost of adding the _index column in a distributed execution engine like Spark means that it is sometimes not worth the time/expense of precisely resolving the row number. By setting contiguousIndex to false Spark will instead include a field _monotonically_increasing_id, a non-sequential/non-contiguous identifier from which _index can be derived later without incurring the same cost penalty. Default: true. |
| description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
| failMode | String | false | Either permissive or failfast: permissive will create an empty dataframe of [value, _filename] in case of no files; failfast will fail the Arc job if no files are found. Default: failfast. |
| numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
| persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false. |
Examples
Minimal
{
"type": "BytesExtract",
"name": "load images from the customer vehicle photos directory",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://input_data/customer/vehicles/*.jpg",
"outputView": "customer_vehicles_photos"
}
Complete
{
"type": "BytesExtract",
"name": "load images from the customer vehicle photos directory",
"description": "load images from the customer vehicle photos directory",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://input_data/customer/vehicles/*.jpg",
"outputView": "customer_vehicles_photos",
"persist": false,
"numPartitions": 10,
"contiguousIndex": false,
"authentication": {},
"failMode": "permissive"
}
DatabricksDeltaExtract
Since: 1.8.0 - Supports Streaming: True
Experimental
The DatabricksDeltaExtract is currently in an experimental state whilst the requirements become clearer.
This means this API is likely to change.
The DatabricksDeltaExtract stage reads one or more Databricks Delta files and returns a DataFrame.
Parameters
| Attribute | Type | Required | Description |
|---|---|---|---|
| name | String | true | Name of the stage for logging. |
| environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
| inputURI | URI | true | URI/Glob of the input Databricks Delta files. |
| outputView | String | true | Name of outgoing Spark dataset after processing. |
| description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
| numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
| partitionBy | Array[String] | false | Columns to partition the data by. |
| persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false. |
Examples
Minimal
{
"type": "DatabricksDeltaExtract",
"name": "load customers",
"environments": [
"production",
"test"
],
"inputURI": "/delta/customers",
"outputView": "customer"
}
Complete
{
"type": "DatabricksDeltaExtract",
"name": "load customers",
"description": "load customers",
"environments": [
"production",
"test"
],
"inputURI": "/delta/customers",
"outputView": "customer",
"numPartitions": 10,
"partitionBy": [
"country"
],
"persist": false,
}DelimitedExtract
Since: 1.0.0 - Supports Streaming: True
The DelimitedExtract stage reads either one or more delimited text files or an input Dataset[String] and returns a DataFrame. DelimitedExtract will always set the underlying Spark configuration option of inferSchema to false to ensure consistent results.
Parameters
| Attribute | Type | Required | Description |
|---|---|---|---|
| name | String | true | Name of the stage for logging. |
| environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
| inputView | String | true* | Name of the incoming Spark dataset. If not present inputURI is required. |
| inputURI | URI | true* | URI/Glob of the input delimited text files. If not present inputView is required. |
| outputView | String | true | Name of outgoing Spark dataset after processing. |
| authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
| basePath | URI | false | The base path that partition discovery should start with. |
| contiguousIndex | Boolean | false | When loading a file two additional metadata fields are added to each record: _filename and _index (the row number in the file). These fields are included automatically as they are very useful for understanding where data came from when consuming it downstream. The computational cost of adding the _index column in a distributed execution engine like Spark means that it is sometimes not worth the time/expense of precisely resolving the row number. By setting contiguousIndex to false Spark will instead include a field _monotonically_increasing_id, a non-sequential/non-contiguous identifier from which _index can be derived later without incurring the same cost penalty. Default: true. |
| delimiter | String | false | The type of delimiter in the file. Supported values: Comma, Pipe, DefaultHive, Custom. DefaultHive is ASCII character 1, the default delimiter for Apache Hive extracts. Default: Comma. |
| customDelimiter | String | true* | A custom string to use as delimiter. Required if delimiter is set to Custom. |
| description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
| header | Boolean | false | Whether or not the dataset contains a header row. If available the output dataset will have named columns, otherwise columns will be named _col1, _col2 … _colN. Default: false. |
| inputField | String | false | If using inputView this option allows you to specify the name of the field which contains the delimited data. |
| numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
| partitionBy | Array[String] | false | Columns to partition the data by. |
| persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false. |
| quote | String | false | The type of quoting in the file. Supported values: None, SingleQuote, DoubleQuote. Default: DoubleQuote. |
| schemaURI | URI | false | Used for multiple purposes, primarily to supply the schema to apply to the extracted dataset. |
| schemaView | URI | false | Similar to schemaURI but allows the schema to be passed in as another DataFrame. It takes precedence over schemaURI if provided. |
Examples
Minimal
{
"type": "DelimitedExtract",
"name": "load customer extract",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://input_data/customer/*.csv",
"outputView": "customer"
}
Complete
{
"type": "DelimitedExtract",
"name": "load customer csv extract",
"description": "load customer csv extract",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://input_data/customer/*.csv",
"outputView": "customer",
"authentication": {},
"contiguousIndex": true,
"delimiter": "Custom",
"customDelimiter": "#",
"header": false,
"inputField": "csvdata",
"numPartitions": 10,
"partitionBy": [
"country"
],
"persist": false,
"quote": "DoubleQuote",
"schemaURI": "hdfs://input_data/schema/customer.json",
"schemaView": "customer_schema",
"basePath": "hdfs://input_data/customer/"
}
ElasticsearchExtract
Since: 1.9.0 - Supports Streaming: False
Experimental
The ElasticsearchExtract is currently in an experimental state whilst the requirements become clearer.
This means this API is likely to change.
The ElasticsearchExtract stage reads from an Elasticsearch cluster and returns a DataFrame.
Parameters
| Attribute | Type | Required | Description |
|---|---|---|---|
| name | String | true | Name of the stage for logging. |
| environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
| input | String | true | The name of the source Elasticsearch index. |
| outputView | String | true | Name of outgoing Spark dataset after processing. |
| description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
| numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
| params | Map[String, String] | false | Map of configuration parameters. Parameters for connecting to the Elasticsearch cluster are detailed here. |
| partitionBy | Array[String] | false | Columns to partition the data by. |
| persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false. |
Examples
Minimal
{
"type": "ElasticsearchExtract",
"name": "load customer extract",
"environments": [
"production",
"test"
],
"input": "customer",
"outputView": "customer",
"params": {
"es.nodes": "<my>.elasticsearch.com",
"es.port": "443",
"es.nodes.wan.only": "true",
"es.net.ssl": "true"
}
}
Complete
{
"type": "ElasticsearchExtract",
"name": "load customer extract",
"environments": [
"production",
"test"
],
"input": "customer",
"outputView": "customer",
"params": {
"es.nodes": "<my>.elasticsearch.com",
"es.port": "443",
"es.nodes.wan.only": "true",
"es.net.ssl": "true"
},
"numPartitions": 10,
"partitionBy": [
"country"
],
"persist": false
}
HTTPExtract
Since: 1.0.0 - Supports Streaming: False
The HTTPExtract stage executes either a GET or POST request against a remote HTTP service and returns a DataFrame with a single row and a single column holding the value of the HTTP response body.
This stage would typically be used with a JSONExtract stage by specifying inputView instead of inputURI (setting multiLine=true allows processing of JSON array responses).
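For example, the two stages could be chained via a view as in the sketch below. This is illustrative only: the endpoint and view names are made up, and depending on the name of the response column an inputField may also need to be set on the JSONExtract stage.
{
"type": "HTTPExtract",
"name": "load customer api response",
"environments": [
"production",
"test"
],
"inputURI": "https://endpoint:9000/customers",
"outputView": "customer_response"
},
{
"type": "JSONExtract",
"name": "parse customer api response",
"environments": [
"production",
"test"
],
"inputView": "customer_response",
"outputView": "customer",
"multiLine": true
}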
Parameters
| Attribute | Type | Required | Description |
|---|---|---|---|
| name | String | true | Name of the stage for logging. |
| environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
| inputView | String | true* | Name of the incoming Spark dataset containing the list of URIs in the value field. If not present inputURI is required. |
| inputURI | URI | true* | URI of the HTTP server. If not present inputView is required. |
| outputView | String | true | Name of outgoing Spark dataset after processing. |
| body | String | false | The request body/entity that is sent with a POST request. |
| description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
| headers | Map[String, String] | false | HTTP Headers to set for the HTTP request. These are not limited to the Internet Engineering Task Force standard headers. |
| method | String | false | The request type with valid values GET or POST. Default: GET. |
| numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
| partitionBy | Array[String] | false | Columns to partition the data by. |
| persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false. |
| validStatusCodes | Array[Integer] | false | A list of valid status codes which will result in a successful stage if the list contains the HTTP server response code. If not provided the default values are [200, 201, 202]. |
Examples
Minimal
{
"type": "HTTPExtract",
"name": "load customer extract from api",
"environments": [
"production",
"test"
],
"inputURI": "https://endpoint:9000/customers",
"outputView": "customer"
}
Complete
{
"type": "HTTPExtract",
"name": "load customer extract from api",
"description": "load customer extract from api",
"environments": [
"production",
"test"
],
"inputURI": "https://endpoint:9000/customers",
"outputView": "customer",
"body": "",
"headers": {
"Authorization": "Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ==",
"custom-header": "payload"
},
"method": "GET",
"numPartitions": 10,
"partitionBy": [
"country"
],
"persist": false,
"validStatusCodes": [
200
]
}
ImageExtract
Since: 1.4.1 - Supports Streaming: True
The ImageExtract stage reads one or more image files and returns a DataFrame which has one column: image, containing image data (jpeg, png, gif, bmp, wbmp) stored with the schema:
| Field | Type | Description |
|---|---|---|
| origin | String | The file path of the image. |
| height | Integer | The height of the image. |
| width | Integer | The width of the image. |
| nChannels | Integer | The number of image channels. |
| mode | Integer | OpenCV-compatible type. |
| data | Binary | Image bytes in OpenCV-compatible order: row-wise BGR in most cases. |
This means the image data can be accessed like:
SELECT image.height FROM dataset
Parameters
| Attribute | Type | Required | Description |
|---|---|---|---|
| name | String | true | Name of the stage for logging. |
| environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
| inputURI | URI | true | URI/Glob of the input images. |
| outputView | String | true | Name of outgoing Spark dataset after processing. |
| authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
| basePath | URI | false | The base path that partition discovery should start with. |
| description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
| dropInvalid | Boolean | false | Whether to drop any invalid image files. Default: true. |
| numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
| partitionBy | Array[String] | false | Columns to partition the data by. |
| persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false. |
Examples
Minimal
{
"type": "ImageExtract",
"name": "load customer images",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://input_data/customer/*.jpg",
"outputView": "customer"
}
Complete
{
"type": "ImageExtract",
"name": "load customer images",
"description": "load customer images",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://input_data/customer/*.jpg",
"outputView": "customer",
"authentication": {},
"dropInvalid": true,
"numPartitions": 10,
"partitionBy": [
"image.width"
],
"persist": false,
"basePath": "hdfs://input_data/customer/"
}
JDBCExtract
Since: 1.0.0 - Supports Streaming: False
The JDBCExtract stage reads directly from a JDBC database and returns a DataFrame. See the Spark JDBC documentation.
Parameters
| Attribute | Type | Required | Description |
|---|---|---|---|
| name | String | true | Name of the stage for logging. |
| environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
| jdbcURL | String | true | The JDBC URL to connect to. e.g., jdbc:mysql://localhost:3306. |
| tableName | String | true | The JDBC table that should be read. Note that anything that is valid in a FROM clause of a SQL query can be used, e.g. (SELECT * FROM sourcetable WHERE key=value) sourcetable or just sourcetable. |
| outputView | String | true | Name of outgoing Spark dataset after processing. |
| authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
| contiguousIndex | Boolean | false | When loading a file two additional metadata fields are added to each record: _filename and _index (the row number in the file). These fields are included automatically as they are very useful for understanding where data came from when consuming it downstream. The computational cost of adding the _index column in a distributed execution engine like Spark means that it is sometimes not worth the time/expense of precisely resolving the row number. By setting contiguousIndex to false Spark will instead include a field _monotonically_increasing_id, a non-sequential/non-contiguous identifier from which _index can be derived later without incurring the same cost penalty. Default: true. |
| description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
| fetchsize | Integer | false | The JDBC fetch size, which determines how many rows to fetch per round trip. This can help performance on JDBC drivers which default to a low fetch size (e.g. Oracle with 10 rows). |
| numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. This also determines the maximum number of concurrent JDBC connections. |
| params | Map[String, String] | false | Map of configuration parameters. Currently requires user and password to be set here - see example below. |
| partitionBy | Array[String] | false | Columns to partition the data by. |
| partitionColumn | String | false | The name of a numeric column from the table in question which defines how to partition the table when reading in parallel from multiple workers. If set numPartitions must also be set. |
| persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false. |
| predicates | Array[String] | false | A list of expressions suitable for inclusion in WHERE clauses; each one defines one partition of the DataFrame to allow explicit parallel reads, e.g. ['id=1', 'id=2', 'id=3', 'id=4'] would create 4 parallel readers. |
| schemaURI | URI | false | Used for multiple purposes, primarily to supply the schema to apply to the extracted dataset. |
| schemaView | URI | false | Similar to schemaURI but allows the schema to be passed in as another DataFrame. It takes precedence over schemaURI if provided. |
Examples
Minimal
{
"type": "JDBCExtract",
"name": "load active customers from postgres",
"environments": [
"production",
"test"
],
"jdbcURL": "jdbc:postgresql://localhost:5432/customer",
"tableName": "(SELECT * FROM customer WHERE active=TRUE) customer",
"outputView": "customer"
}
Complete
{
"type": "JDBCExtract",
"name": "load active customers from postgresql",
"description": "load active customers from postgresql",
"environments": [
"production",
"test"
],
"jdbcURL": "jdbc:postgresql://localhost:5432/customer",
"tableName": "(SELECT * FROM customer WHERE active=TRUE) customer",
"outputView": "customer",
"authentication": {},
"contiguousIndex": true,
"fetchsize": 1000,
"numPartitions": 10,
"params": {
"user": "mydbuser",
"password": "mydbpassword"
},
"partitionBy": [
"country"
],
"partitionColumn": "id",
"persist": true,
"predicates": [
"id=1",
"id=2",
"id=3",
"id=4"
],
"schemaURI": "hdfs://input_data/schema/customer.json",
"schemaView": "customer_schema"
}
JSONExtract
Since: 1.0.0 - Supports Streaming: True
The JSONExtract stage reads either one or more JSON files or an input Dataset[String] and returns a DataFrame.
Parameters
| Attribute | Type | Required | Description |
|---|---|---|---|
| name | String | true | Name of the stage for logging. |
| environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
| inputView | String | true* | Name of the incoming Spark dataset. If not present inputURI is required. |
| inputURI | URI | true* | URI/Glob of the input JSON files. If not present inputView is required. |
| outputView | String | true | Name of outgoing Spark dataset after processing. |
| authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
| basePath | URI | false | The base path that partition discovery should start with. |
| contiguousIndex | Boolean | false | When loading a file two additional metadata fields are added to each record: _filename and _index (the row number in the file). These fields are included automatically as they are very useful for understanding where data came from when consuming it downstream. The computational cost of adding the _index column in a distributed execution engine like Spark means that it is sometimes not worth the time/expense of precisely resolving the row number. By setting contiguousIndex to false Spark will instead include a field _monotonically_increasing_id, a non-sequential/non-contiguous identifier from which _index can be derived later without incurring the same cost penalty. Default: true. |
| description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
| inputField | String | false | If using inputView this option allows you to specify the name of the field which contains the JSON data. |
| multiLine | Boolean | false | Whether the input directory contains a single JSON object per file or multiple JSON records in a single file, one per line (see JSONLines). Default: true. |
| numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
| partitionBy | Array[String] | false | Columns to partition the data by. |
| persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false. |
| schemaURI | URI | false | Used for multiple purposes, primarily to supply the schema to apply to the extracted dataset. Additionally, by specifying the schema here, the underlying data source can skip the schema inference step, and thus speed up data loading. |
| schemaView | URI | false | Similar to schemaURI but allows the schema to be passed in as another DataFrame. It takes precedence over schemaURI if provided. |
Examples
Minimal
{
"type": "JSONExtract",
"name": "load customers",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://input_data/customer/*.json",
"outputView": "customer"
}
Complete
{
"type": "JSONExtract",
"name": "load customers",
"description": "load customers",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://input_data/customer/*.json",
"outputView": "customer",
"authentication": {},
"contiguousIndex": true,
"inputField": "jsondata",
"multiLine": false,
"numPartitions": 10,
"partitionBy": [
"country"
],
"persist": false,
"schemaURI": "hdfs://input_data/schema/customer.json",
"schemaView": "customer_schema",
"basePath": "hdfs://input_data/customer/"
}
KafkaExtract
Since: 1.0.8 - Supports Streaming: True
Experimental
The KafkaExtract is currently in an experimental state whilst the requirements become clearer.
This means this API is likely to change to better handle failures.
The KafkaExtract stage reads records from a Kafka topic and returns a DataFrame. It requires a unique groupID to be set which, on the first run, will consume from the earliest offset available in Kafka. Each subsequent run will use the offset as recorded against that groupID. This means that if a job fails before properly processing the data then processing may need to be restarted from the earliest offset by creating a new groupID.
The returned DataFrame has the schema:
| Field | Type | Description |
|---|---|---|
| topic | String | The Kafka topic. |
| partition | Integer | The partition ID. |
| offset | Long | The record offset. |
| timestamp | Long | The record timestamp. |
| key | Binary | The record key as a byte array. |
| value | Binary | The record value as a byte array. |
Can be used in conjunction with KafkaCommitExecute to allow quasi-transactional behaviour (with autoCommit set to false) - in that the offset commit can be deferred until certain dependent stages are successfully executed.
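A hedged sketch of that pattern is shown below: the KafkaExtract stage runs with autoCommit disabled, the dependent stages run, and only then are the offsets committed. The KafkaCommitExecute parameter names shown here are assumptions and should be checked against that stage's own documentation.
{
"type": "KafkaExtract",
"name": "load customers",
"environments": [
"production",
"test"
],
"outputView": "customer",
"bootstrapServers": "kafka:29092",
"topic": "customers",
"groupID": "spark-customer-extract-job",
"autoCommit": false
},
... dependent transform and load stages ...
{
"type": "KafkaCommitExecute",
"name": "commit offsets after successful load",
"environments": [
"production",
"test"
],
"inputView": "customer",
"bootstrapServers": "kafka:29092",
"groupID": "spark-customer-extract-job"
}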
Parameters
| Attribute | Type | Required | Description |
|---|---|---|---|
| name | String | true | Name of the stage for logging. |
| environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
| outputView | String | true | Name of outgoing Spark dataset after processing. |
| bootstrapServers | String | true | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. e.g. host1:port1,host2:port2,... |
| topic | String | true | The target Kafka topic. |
| groupID | String | true | A string that uniquely identifies the group of consumer processes to which this consumer belongs. This will retain the offset of the job between executions. |
| autoCommit | Boolean | false | Whether to update the offsets in Kafka automatically. To be used in conjunction with KafkaCommitExecute to allow quasi-transactional behaviour. If autoCommit is set to false this stage will force persist to true so that Spark will not execute the Kafka extract process twice with a potentially different result (e.g. new messages added between extracts). Default: false. |
| description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
| maxPollRecords | Integer | false | The maximum number of records returned in a single call to Kafka. Arc will then continue to poll until all records have been read. Default: 10000. |
| numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
| partitionBy | Array[String] | false | Columns to partition the data by. |
| persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false. |
| timeout | Long | false | The time, in milliseconds, spent waiting in poll if data is not available in Kafka. Default: 10000. |
Examples
Minimal
{
"type": "KafkaExtract",
"name": "load customers",
"environments": [
"production",
"test"
],
"outputView": "customer",
"bootstrapServers": "kafka:29092",
"topic": "customers",
"groupID": "spark-customer-extract-job"
}
Complete
{
"type": "KafkaExtract",
"name": "load customers",
"description": "load customers",
"environments": [
"production",
"test"
],
"outputView": "customer",
"bootstrapServers": "kafka:29092",
"topic": "customers",
"groupID": "spark-customer-extract-job",
"autoCommit": false,
"maxPollRecords": 10000,
"numPartitions": 10,
"partitionBy": [
"country"
],
"persist": false,
"timeout": 10000
}
ORCExtract
Since: 1.0.0 - Supports Streaming: True
The ORCExtract stage reads one or more Apache ORC files and returns a DataFrame.
Parameters
| Attribute | Type | Required | Description |
|---|---|---|---|
| name | String | true | Name of the stage for logging. |
| environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
| inputURI | URI | true | URI/Glob of the input ORC files. |
| outputView | String | true | Name of outgoing Spark dataset after processing. |
| authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
| basePath | URI | false | The base path that partition discovery should start with. |
| contiguousIndex | Boolean | false | When loading a file two additional metadata fields are added to each record: _filename and _index (the row number in the file). These fields are included automatically as they are very useful for understanding where data came from when consuming it downstream. The computational cost of adding the _index column in a distributed execution engine like Spark means that it is sometimes not worth the time/expense of precisely resolving the row number. By setting contiguousIndex to false Spark will instead include a field _monotonically_increasing_id, a non-sequential/non-contiguous identifier from which _index can be derived later without incurring the same cost penalty. Default: true. |
| description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
| numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
| partitionBy | Array[String] | false | Columns to partition the data by. |
| persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false. |
| schemaURI | URI | false | Used for multiple purposes, primarily to supply the schema to apply to the extracted dataset. |
| schemaView | URI | false | Similar to schemaURI but allows the schema to be passed in as another DataFrame. It takes precedence over schemaURI if provided. |
Examples
Minimal
{
"type": "ORCExtract",
"name": "load customers",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://input_data/customer/*.orc",
"outputView": "customer"
}
Complete
{
"type": "ORCExtract",
"name": "load customers",
"description": "load customers",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://input_data/customer/*.orc",
"outputView": "customer",
"authentication": {},
"contiguousIndex": true,
"numPartitions": 10,
"partitionBy": [
"country"
],
"persist": false,
"schemaURI": "hdfs://input_data/schema/customer.json",
"schemaView": "customer_schema",
"basePath": "hdfs://input_data/customer/"
}
ParquetExtract
Since: 1.0.0 - Supports Streaming: True
The ParquetExtract stage reads one or more Apache Parquet files and returns a DataFrame.
Parameters
| Attribute | Type | Required | Description |
|---|---|---|---|
| name | String | true | Name of the stage for logging. |
| environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
| inputURI | URI | true | URI/Glob of the input Parquet files. |
| outputView | String | true | Name of outgoing Spark dataset after processing. |
| authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
| basePath | URI | false | The base path that partition discovery should start with. |
| contiguousIndex | Boolean | false | When loading a file two additional metadata fields are added to each record: _filename and _index (the row number in the file). These fields are included automatically as they are very useful for understanding where data came from when consuming it downstream. The computational cost of adding the _index column in a distributed execution engine like Spark means that it is sometimes not worth the time/expense of precisely resolving the row number. By setting contiguousIndex to false Spark will instead include a field _monotonically_increasing_id, a non-sequential/non-contiguous identifier from which _index can be derived later without incurring the same cost penalty. Default: true. |
| description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
| numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
| partitionBy | Array[String] | false | Columns to partition the data by. |
| persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false. |
| schemaURI | URI | false | Used for multiple purposes, primarily to supply the schema to apply to the extracted dataset. |
| schemaView | URI | false | Similar to schemaURI but allows the schema to be passed in as another DataFrame. It takes precedence over schemaURI if provided. |
Examples
Minimal
{
"type": "ParquetExtract",
"name": "load customers",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://input_data/customer/*.parquet",
"outputView": "customer"
}
Complete
{
"type": "ParquetExtract",
"name": "load customers",
"description": "load customers",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://input_data/customer/*.parquet",
"outputView": "customer",
"authentication": {},
"contiguousIndex": true,
"numPartitions": 10,
"partitionBy": [
"country"
],
"persist": false,
"schemaURI": "hdfs://input_data/schema/customer.json",
"schemaView": "customer_schema",
"basePath": "hdfs://input_data/customer/"
}
RateExtract
Since: 1.2.0 - Supports Streaming: True
The RateExtract stage creates a streaming data source which generates rows into a streaming DataFrame with the signature [timestamp: timestamp, value: long].
This stage is included for testing Structured Streaming jobs, as it can be very difficult to generate test data. Generally this stage would only be included when Arc is run in test mode (i.e. the environment is set to test).
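For instance, restricting the stage to the test environment keeps the synthetic source out of production runs (an illustrative sketch; the view name is made up):
{
"type": "RateExtract",
"name": "create a streaming test source",
"environments": [
"test"
],
"outputView": "stream",
"rowsPerSecond": 1
}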
Parameters
| Attribute | Type | Required | Description |
|---|---|---|---|
| name | String | true | Name of the stage for logging. |
| environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
| outputView | String | true | Name of outgoing Spark dataset after processing. |
| description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
| numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
| rampUpTime | Integer | false | How long to ramp up before the generating speed becomes rowsPerSecond. Using finer granularities than seconds will be truncated to integer seconds. Default: 0. |
| rowsPerSecond | Integer | false | How many rows should be generated per second. Default: 1. |
Examples
Minimal
{
"type": "RateExtract",
"name": "create a streaming source",
"environments": [
"production",
"test"
],
"outputView": "stream"
}
Complete
{
"type": "RateExtract",
"name": "create a streaming source",
"description": "create a streaming source",
"environments": [
"production",
"test"
],
"outputView": "stream",
"rowsPerSecond": 2,
"rampUpTime": 0,
"numPartitions": 10
}
TextExtract
Since: 1.2.0 - Supports Streaming: True
The TextExtract stage reads one or more text files and returns a DataFrame.
Parameters
| Attribute | Type | Required | Description |
|---|---|---|---|
| name | String | true | Name of the stage for logging. |
| environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
| inputURI | URI | true | URI/Glob of the input text files. |
| outputView | String | true | Name of outgoing Spark dataset after processing. |
| authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
| basePath | URI | false | The base path that partition discovery should start with. |
| contiguousIndex | Boolean | false | When loading a file two additional metadata fields are added to each record: _filename and _index (the row number in the file). These fields are included automatically as they are very useful for understanding where data came from when consuming it downstream. The computational cost of adding the _index column in a distributed execution engine like Spark means that it is sometimes not worth the time/expense of precisely resolving the row number. By setting contiguousIndex to false Spark will instead include a field _monotonically_increasing_id, a non-sequential/non-contiguous identifier from which _index can be derived later without incurring the same cost penalty. Default: true. |
| description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
| multiLine | Boolean | false | Whether to load each file as a single record or as individual records split by newline. Default: false. |
| numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
| persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false. |
| schemaURI | URI | false | Used for multiple purposes, primarily to supply the schema to apply to the extracted dataset. |
| schemaView | URI | false | Similar to schemaURI but allows the schema to be passed in as another DataFrame. It takes precedence over schemaURI if provided. |
Examples
Minimal
{
"type": "TextExtract",
"name": "load customers",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://input_data/customer/*.txt",
"outputView": "customer"
}
Complete
{
"type": "TextExtract",
"name": "load customers",
"description": "load customers",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://input_data/customer/*.txt",
"outputView": "customer",
"authentication": {},
"contiguousIndex": true,
"multiLine": true,
"numPartitions": 10,
"persist": false,
"schemaURI": "hdfs://input_data/schema/customer.json",
"schemaView": "customer_schema",
"basePath": "hdfs://input_data/customer/"
}
XMLExtract
Since: 1.0.0 - Supports Streaming: False
The XMLExtract stage reads one or more XML files or an input Dataset[String] and returns a DataFrame.
This extract works slightly differently from the spark-xml package. To access the data you can use a SQLTransform query like this, which will create a new row for each element of the bk:books array:
SELECT EXPLODE(`bk:books`).*
FROM books_xml
The backtick character (`) can be used to address fields with non-alphanumeric names.
Parameters
| Attribute | Type | Required | Description |
|---|---|---|---|
| name | String | true | Name of the stage for logging. |
| environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
| inputURI | URI | true* | URI/Glob of the input XML files. If not present inputView is required. |
| inputView | String | true* | Name of the incoming Spark dataset. If not present inputURI is required. |
| outputView | String | true | Name of outgoing Spark dataset after processing. |
| authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
| contiguousIndex | Boolean | false | When loading a file two additional metadata fields are added to each record: _filename and _index (the row number in the file). These fields are included automatically as they are very useful for understanding where data came from when consuming it downstream. The computational cost of adding the _index column in a distributed execution engine like Spark means that it is sometimes not worth the time/expense of precisely resolving the row number. By setting contiguousIndex to false Spark will instead include a field _monotonically_increasing_id, a non-sequential/non-contiguous identifier from which _index can be derived later without incurring the same cost penalty. Default: true. |
| description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
| numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
| partitionBy | Array[String] | false | Columns to partition the data by. |
| persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false. |
| schemaURI | URI | false | Used for multiple purposes, primarily to supply the schema to apply to the extracted dataset. Additionally, by specifying the schema here, the underlying data source can skip the schema inference step, and thus speed up data loading. |
| schemaView | URI | false | Similar to schemaURI but allows the schema to be passed in as another DataFrame. It takes precedence over schemaURI if provided. |
Examples
Minimal
{
"type": "XMLExtract",
"name": "load customers",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://input_data/customer/*.xml",
"outputView": "customer"
}
Complete
{
"type": "XMLExtract",
"name": "load customers",
"description": "load customers",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://input_data/customer/*.xml",
"outputView": "customer",
"authentication": {},
"contiguousIndex": true,
"numPartitions": 10,
"partitionBy": [
"country"
],
"persist": false,
"schemaURI": "hdfs://input_data/schema/customer.json",
"schemaView": "customer_schema"
}