Patterns
Database Inconsistency
Writing data to targets like databases using the JDBCLoad stage raises a risk of ‘stale reads’, where a client reads a dataset which is either old or is in the process of being updated and so is internally inconsistent.
Example
- create a new table each run using a JDBCLoad stage with a dynamic destination table specified as the ${JOB_RUN_DATE} environment variable (easily created with GNU date like: $(date +%Y-%m-%d))
- the JDBCLoad will only complete successfully once the record count of source and target data have been confirmed to match
- execute a JDBCExecute stage to perform a change to a view on the database to point to the new version of the table in a transaction-safe manner
- if the job fails during any of these stages then the users will be unaware and will continue to consume the customers view which has the latest successful data
{
"type": "JDBCLoad",
"name": "load active customers to web server database",
"environments": [
"production",
"test"
],
"inputView": "active_customers",
"jdbcURL": "jdbc:postgresql://localhost:5432/customer",
"tableName": "customers_"${JOB_RUN_DATE},
"params": {
"user": "mydbuser",
"password": "mydbpassword"
}
},
{
"type": "JDBCExecute",
"name": "update the current view to point to the latest version of the table",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://datalake/sql/update_customer_view.sql",
"jdbcURL": "jdbc:postgresql://localhost:5432/customer",
"sqlParams": {
"JOB_RUN_DATE": ${JOB_RUN_DATE}
},
"password": "mypassword",
"user": "myuser"
}
Where the update_customer_view.sql
statement is:
CREATE OR REPLACE VIEW customers AS
SELECT * FROM customers_${JOB_RUN_DATE}
Each of the main SQL databases behaves slightly differently and has slightly different syntax, but most can repoint a view to a different table in an atomic operation (as it is a single statement).
Note that this method will require some cleanup activity to be performed or the number of tables will grow with each execution. A second JDBCExecute stage could be added to clean up older versions of the underlying customers_ tables after successful ‘rollover’ execution.
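The cleanup step could be driven by a small helper that decides which dated tables are safe to drop. The sketch below is an illustration only: the function name tables_to_drop, the retention count, and the assumption that every table follows the customers_YYYY-MM-DD naming convention are hypothetical and not part of the framework.

```python
import datetime


def tables_to_drop(table_names, prefix="customers_", keep=3):
    """Return dated tables that are older than the `keep` most recent ones.

    Hypothetical helper: assumes names look like customers_2019-02-07.
    Names that do not match the convention are ignored, never dropped.
    """
    dated = []
    for name in table_names:
        if not name.startswith(prefix):
            continue
        try:
            run_date = datetime.datetime.strptime(name[len(prefix):], "%Y-%m-%d").date()
        except ValueError:
            continue  # not a dated table, leave it alone
        dated.append((run_date, name))
    dated.sort(reverse=True)  # newest first
    return [name for _, name in dated[keep:]]


existing = [
    "customers_2019-02-04",
    "customers_2019-02-05",
    "customers_2019-02-06",
    "customers_2019-02-07",
    "customers",
]
for name in tables_to_drop(existing, keep=3):
    # the identifier is quoted because the date suffix contains hyphens
    print('DROP TABLE IF EXISTS "{}";'.format(name))
```

The generated statements could then be placed in the SQL file executed by the second JDBCExecute stage. Keeping a few previous versions rather than only the latest leaves room to repoint the view back if a problem is found later.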
Delta Processing
A common pattern is to process only new files, reducing the amount of processing (and therefore cost) of expensive operations like the TypingTransform.
A simple way to do this is to use the glob
capabilities of Spark to extract a subset of files and then use a SQLTransform to merge them with a previous state stored in something like Parquet. It is suggested to have a large date overlap with the previous state dataset to avoid missed data. Be careful with this pattern as it assumes that the previous state is correct/complete and that no input files are late arriving.
Example
Assuming an input file structure like:
hdfs://datalake/input/customer/customers_2019-02-01.csv
hdfs://datalake/input/customer/customers_2019-02-02.csv
hdfs://datalake/input/customer/customers_2019-02-03.csv
hdfs://datalake/input/customer/customers_2019-02-04.csv
hdfs://datalake/input/customer/customers_2019-02-05.csv
hdfs://datalake/input/customer/customers_2019-02-06.csv
hdfs://datalake/input/customer/customers_2019-02-07.csv
Add an additional environment variable to the docker run
command which will calculate the delta processing period. By using GNU date the date maths of crossing month/year boundaries is easy and the formatting can be changed to suit your file naming convention.
-e ETL_CONF_DELTA_PERIOD="$(date --date='3 days ago' +%Y-%m-%d),$(date --date='2 days ago' +%Y-%m-%d),$(date --date='1 days ago' +%Y-%m-%d),$(date +%Y-%m-%d),$(date --date='1 days' +%Y-%m-%d)"
Which will expose an environment variable that looks like ETL_CONF_DELTA_PERIOD=2019-02-04,2019-02-05,2019-02-06,2019-02-07,2019-02-08.
Alternatively, a Dynamic Configuration Plugin could be used to generate a similar list of dates.
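Where GNU date is not available, the same list can be produced with a few lines of Python and passed in as the environment variable. This is a minimal sketch; the delta_period function and its parameters are hypothetical names chosen for illustration, not part of the framework.

```python
import datetime


def delta_period(today, days_back=3, days_forward=1):
    """Build a comma-separated list of dates around `today` for use in a glob."""
    offsets = range(-days_back, days_forward + 1)
    return ",".join(
        (today + datetime.timedelta(days=offset)).strftime("%Y-%m-%d")
        for offset in offsets
    )


# a fixed date is used here so the result is reproducible;
# a real job would pass datetime.date.today()
print(delta_period(datetime.date(2019, 2, 7)))
# 2019-02-04,2019-02-05,2019-02-06,2019-02-07,2019-02-08
```

Because timedelta arithmetic is used, month and year boundaries are handled without any special cases.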
This can then be used to read just the files which match the glob
pattern:
{
"type": "DelimitedExtract",
"name": "load customer extract deltas",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://datalake/input/customer/customers_{"${ETL_CONF_DELTA_PERIOD}"}.csv",
"outputView": "customer_delta_untyped"
},
{
"type": "TypingTransform",
"name": "apply data types to only the delta records",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://datalake/metadata/customer.json",
"inputView": "customer_delta_untyped",
"outputView": "customer_delta"
},
{
"type": "ParquetExtract",
"name": "load customer snapshot",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://datalake/output/customer/customers.parquet",
"outputView": "customer_snapshot"
},
{
"type": "SQLTransform",
"name": "merge the two datasets",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://datalake/sql/select_most_recent_customer.sql",
"outputView": "customer"
}
In this case the files for dates 2019-02-01,2019-02-02,2019-02-03
will not be read as they are not in the ${ETL_CONF_DELTA_PERIOD}
input array.
A SQL WINDOW function can then be used to find the most recent record:
-- select only the most recent update record for each 'customer_id'
SELECT *
FROM (
-- rank the dataset by the 'last_updated' timestamp for each primary key of the table ('customer_id')
-- column names must not be single-quoted, otherwise they are treated as string literals
SELECT
*
,ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY COALESCE(last_updated, CAST('1970-01-01 00:00:00' AS TIMESTAMP)) DESC) AS row_number
FROM (
SELECT *
FROM customer_snapshot
UNION ALL
SELECT *
FROM customer_delta
) customers
) customers
WHERE row_number = 1
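The merge logic can be easier to reason about outside SQL. The following is a plain Python sketch of the same idea (union the two datasets, then keep the latest row per key); the most_recent function and the sample rows are invented for illustration and are not how Spark executes the query.

```python
import datetime

# stand-in for records with no last_updated value (the COALESCE default)
EPOCH = datetime.datetime(1970, 1, 1)


def most_recent(snapshot, delta):
    """Keep one row per customer_id, preferring the latest last_updated."""
    latest = {}
    for row in snapshot + delta:  # equivalent of UNION ALL
        key = row["customer_id"]
        updated = row["last_updated"] or EPOCH
        if key not in latest or updated > (latest[key]["last_updated"] or EPOCH):
            latest[key] = row
    return sorted(latest.values(), key=lambda r: r["customer_id"])


snapshot = [
    {"customer_id": 1, "name": "old", "last_updated": datetime.datetime(2019, 2, 1)},
    {"customer_id": 2, "name": "unchanged", "last_updated": None},
]
delta = [
    {"customer_id": 1, "name": "new", "last_updated": datetime.datetime(2019, 2, 6)},
]
for row in most_recent(snapshot, delta):
    print(row["customer_id"], row["name"])
```

Note that when two rows for the same key share an identical timestamp this sketch keeps the first one seen, whereas ROW_NUMBER makes no guarantee about which is chosen; a secondary ordering column would be needed for a deterministic result.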
Duplicate Keys
To find duplicate keys and stop the job so that any issues are not propagated, use a SQLValidate stage which will fail with a list of invalid customer_id values if any key occurs more than once.
Example
SELECT
COUNT(*) = 0
,TO_JSON(NAMED_STRUCT('duplicate_customer_count', COUNT(*), 'duplicate_customer', CAST(COLLECT_LIST(DISTINCT customer_id) AS STRING)))
FROM (
SELECT
customer_id
,COUNT(customer_id) AS customer_id_count
FROM customer
GROUP BY customer_id
) valid
WHERE customer_id_count > 1
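The shape of the check (a boolean outcome plus a message listing the offending keys) can be sketched in plain Python. The duplicate_keys function and the sample rows below are hypothetical and only illustrate the logic of the query.

```python
from collections import Counter


def duplicate_keys(rows, key="customer_id"):
    """Return the sorted key values that occur more than once."""
    counts = Counter(row[key] for row in rows)
    return sorted(value for value, count in counts.items() if count > 1)


customers = [
    {"customer_id": 1},
    {"customer_id": 2},
    {"customer_id": 2},
    {"customer_id": 3},
]
duplicates = duplicate_keys(customers)
valid = len(duplicates) == 0
print(valid, duplicates)  # False [2]
```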
Fixed Width Input Formats
It is also quite common to receive fixed width formats from older systems like IBM Mainframes, for example:
data |
---|
detail2016-12-1914.23 |
detail2016-12-20-3.98 |
detail2016-12-2118.20 |
Example
- Use a DelimitedExtract stage with a delimiter that will not be found in the data like DefaultHive to return a dataset of many rows but a single column.
- Use a SQLTransform stage to split the data into columns.
SELECT
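-- SUBSTRING(str, pos, len) uses 1-based positions: type is 6 characters, date is 10 and total is 5
SUBSTRING(data, 1, 6) AS _type
,SUBSTRING(data, 7, 10) AS date
,SUBSTRING(data, 17, 5) AS total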
FROM fixed_width_demo
- Use a TypingTransform stage to apply data types to the string columns returned by SQLTransform.
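Column boundaries are easy to get wrong by one, so it can help to check them against the sample rows before committing them to SQL. The sketch below does this in plain Python using zero-based, end-exclusive slices; FIELDS and split_fixed_width are illustrative names, and the widths (6, 10 and 5 characters) are read off the sample data above.

```python
# (column name, start index, end index) using zero-based, end-exclusive slices
FIELDS = [("_type", 0, 6), ("date", 6, 16), ("total", 16, 21)]


def split_fixed_width(line):
    """Split one fixed width record into named string columns."""
    return {name: line[start:end] for name, start, end in FIELDS}


rows = [
    "detail2016-12-1914.23",
    "detail2016-12-20-3.98",
    "detail2016-12-2118.20",
]
for row in rows:
    print(split_fixed_width(row))
```

Each value is still a string at this point, which mirrors the pipeline: the typing step is responsible for converting the date and total columns.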
Foreign Key Constraint
Another common data quality check is to check Foreign Key integrity, for example ensuring a customer record exists when loading an accounts dataset.
Example
Customers
customer_id | customer_name |
---|---|
29728375 | Eleazar Stehr |
69752261 | Lisette Roberts |
Accounts
customer_id | account_id | account_name |
---|---|---|
29728375 | 44205457 | Checking Account |
51805256 | 25102441 | Credit Card Account |
69752261 | 80393015 | Savings Account |
69752261 | 81704186 | Credit Card Account |
44953646 | 75082852 | Personal Loan Account |
This can be done using a SQLValidate
stage which will fail with a list of invalid accounts if any customer records are missing (be careful of not overloading your logging solution with long messages).
SELECT
SUM(invalid_customer_id) = 0
,TO_JSON(NAMED_STRUCT('customers', COUNT(DISTINCT customer_id), 'invalid_account_numbers_count', SUM(invalid_customer_id), 'invalid_account_numbers', CAST(collect_list(DISTINCT invalid_account_numbers) AS STRING)))
FROM (
SELECT
account.account_id
,customer.customer_id
,CASE
WHEN customer.customer_id IS NULL THEN account.account_id
ELSE null
END AS invalid_account_numbers
,CASE
WHEN customer.customer_id IS NULL THEN 1
ELSE 0
END AS invalid_customer_id
FROM account
LEFT JOIN customer ON account.customer_id = customer.customer_id
) valid
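To see which rows of the sample data would fail, the LEFT JOIN logic can be reproduced in a few lines of plain Python. This is only a sketch of the check; the orphaned_accounts function is a hypothetical name and the account_name column is omitted for brevity.

```python
customers = [
    {"customer_id": 29728375, "customer_name": "Eleazar Stehr"},
    {"customer_id": 69752261, "customer_name": "Lisette Roberts"},
]
accounts = [
    {"customer_id": 29728375, "account_id": 44205457},
    {"customer_id": 51805256, "account_id": 25102441},
    {"customer_id": 69752261, "account_id": 80393015},
    {"customer_id": 69752261, "account_id": 81704186},
    {"customer_id": 44953646, "account_id": 75082852},
]


def orphaned_accounts(accounts, customers):
    """Return account ids whose customer_id has no matching customer record."""
    known = {customer["customer_id"] for customer in customers}
    return [a["account_id"] for a in accounts if a["customer_id"] not in known]


invalid = orphaned_accounts(accounts, customers)
print(len(invalid) == 0, invalid)  # False [25102441, 75082852]
```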
Header/Trailer Load Assurance
It is common to see formats like the one below, where the input dataset contains multiple record types with a trailer used for some sort of load assurance/validation. This pattern allows processing this sort of data while ensuring all records were loaded successfully.
col0 | col1 | col2 | col3 |
---|---|---|---|
header | 2016-12-21 | daily totals | |
detail | 2016-12-19 | daily total | 14.23 |
detail | 2016-12-20 | daily total | -3.98 |
detail | 2016-12-21 | daily total | 18.20 |
trailer | 3 | 28.45 | |
Example
- First use a DelimitedExtract stage to load the raw data without headers.
- Use two SQLTransform stages to split the input dataset into two new DataFrames using SQL WHERE statements.
detail
SELECT
col0 AS _type
,col1 AS date
,col2 AS description
,col3 AS total
FROM raw
WHERE col0 = 'detail'
_type | date | description | total |
---|---|---|---|
detail | 2016-12-19 | daily total | 14.23 |
detail | 2016-12-20 | daily total | -3.98 |
detail | 2016-12-21 | daily total | 18.20 |
trailer
SELECT
col0 AS _type
,col1 AS trailer_records
,col2 AS trailer_balance
FROM raw
WHERE col0 = 'trailer'
_type | trailer_records | trailer_balance |
---|---|---|
trailer | 3 | 28.45 |
- Use two TypingTransform stages to apply the correct data types to the two datasets.
- Use a SQLValidate stage to ensure that the count and sum of the detail dataset equal those of the trailer dataset.
SELECT
sum_total = trailer_balance AND records_total = trailer_records
,TO_JSON(NAMED_STRUCT('expected_count', trailer_records, 'actual_count', records_total, 'expected_balance', trailer_balance, 'actual_balance', sum_total))
FROM (
(SELECT COUNT(total) AS records_total, SUM(total) AS sum_total FROM detail) detail
CROSS JOIN
(SELECT trailer_records, trailer_balance FROM trailer) trailer
) valid
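The whole split, type and validate sequence can be sketched end to end in plain Python using the sample rows. The validate_trailer function is a hypothetical illustration of the check, not framework code; Decimal is used so the balance comparison is exact rather than subject to floating point rounding.

```python
from decimal import Decimal

raw = [
    ["header", "2016-12-21", "daily totals", ""],
    ["detail", "2016-12-19", "daily total", "14.23"],
    ["detail", "2016-12-20", "daily total", "-3.98"],
    ["detail", "2016-12-21", "daily total", "18.20"],
    ["trailer", "3", "28.45"],
]


def validate_trailer(rows):
    """Compare the detail record count and sum against the trailer values."""
    totals = [Decimal(row[3]) for row in rows if row[0] == "detail"]
    trailer = next(row for row in rows if row[0] == "trailer")
    expected_count, expected_balance = int(trailer[1]), Decimal(trailer[2])
    ok = len(totals) == expected_count and sum(totals) == expected_balance
    message = {
        "expected_count": expected_count,
        "actual_count": len(totals),
        "expected_balance": str(expected_balance),
        "actual_balance": str(sum(totals)),
    }
    return ok, message


print(validate_trailer(raw))
```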
Machine Learning Prediction Thresholds
When used for classification the MLTransform
stage will add a probability
column which exposes the highest probability score from the Spark ML probability vector which led to the predicted value. This can then be used as a boundary to prevent low probability predictions being sent to other systems if, for example, a change in input data resulted in a major change in predictions.
id | input | prediction | probability |
---|---|---|---|
4 | spark i j k | 1.0 | 0.8403592261212589 |
5 | l m n | 0.0 | 0.8378325685476612 |
6 | spark hadoop spark | 1.0 | 0.9307336686702373 |
7 | apache hadoop | 0.0 | 0.9821575333444208 |
Example
SELECT
SUM(low_probability) = 0
,TO_JSON(NAMED_STRUCT('probability_below_threshold', SUM(low_probability), 'threshold', 0.8))
FROM (
SELECT
CASE
WHEN customer_churn.probability < 0.8 THEN 1
ELSE 0
END AS low_probability
FROM customer_churn
) valid
The threshold could easily be passed in as a sqlParams parameter and referenced as ${CUSTOMER_CHURN_PROBABILITY_THRESHOLD} in the SQL code.
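The effect of moving the threshold can be explored against the sample predictions with a short Python sketch. The below_threshold function is a hypothetical name used only to mirror the CASE/SUM logic of the query.

```python
predictions = [
    {"id": 4, "prediction": 1.0, "probability": 0.8403592261212589},
    {"id": 5, "prediction": 0.0, "probability": 0.8378325685476612},
    {"id": 6, "prediction": 1.0, "probability": 0.9307336686702373},
    {"id": 7, "prediction": 0.0, "probability": 0.9821575333444208},
]


def below_threshold(rows, threshold):
    """Count predictions whose probability is under the threshold."""
    return sum(1 for row in rows if row["probability"] < threshold)


# all sample rows pass at 0.8; raising the threshold to 0.84 rejects id 5
for threshold in (0.8, 0.84):
    count = below_threshold(predictions, threshold)
    print(threshold, count == 0, count)
```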
Nested Data
Because the SQL language was not really designed for nested data of the kind Spark allows, it can be difficult to convert nested data into normal table structures. The EXPLODE and POSEXPLODE SQL functions are very useful for this conversion.
Example
Assuming a nested input structure like a JSON response that has been parsed via JSONExtract
:
{
"result": "success",
"data": [
{
"customerId": 1,
"active": true
},
{
"customerId": 2,
"active": false
},
{
"customerId": 3,
"active": true
}
]
}
To flatten the data
array use a SQL subquery and a POSEXPLODE to extract the data. EXPLODE
and POSEXPLODE
will both produce a field called col
which can be used from the parent query. POSEXPLODE
will include an additional field pos
to indicate the index of the value in the input array (which can be useful if array order is important for business logic).
SELECT
result
,pos
,col.*
FROM (
SELECT
result
,POSEXPLODE(data)
FROM result
) result
To produce:
+-------+---+------+----------+
|result |pos|active|customerId|
+-------+---+------+----------+
|success|0 |true |1 |
|success|1 |false |2 |
|success|2 |true |3 |
+-------+---+------+----------+
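For intuition, the behaviour of POSEXPLODE followed by col.* can be imitated in plain Python: each array element becomes one output row carrying its index and the parent fields. This is a sketch of the semantics only; the posexplode generator below is a hypothetical helper, not a Spark API.

```python
import json

response = json.loads("""
{
  "result": "success",
  "data": [
    {"customerId": 1, "active": true},
    {"customerId": 2, "active": false},
    {"customerId": 3, "active": true}
  ]
}
""")


def posexplode(record, array_field="data"):
    """Yield one flat row per array element, with its position in the array."""
    for pos, col in enumerate(record[array_field]):
        row = {"result": record["result"], "pos": pos}
        row.update(col)  # equivalent of selecting col.*
        yield row


rows = list(posexplode(response))
for row in rows:
    print(row)
```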
Testing with Parquet
If you want to manually create test data to compare against a Spark DataFrame, a good option is to use the Apache Arrow library and its Python API to create a correctly typed Parquet file. This file can then be loaded and compared with the EqualityValidate stage.
Example
Using the publicly available Docker Conda image:
docker run -it -v $(pwd)/data:/tmp/data conda/miniconda3 python
Then with Python (of course normally you would install the required libraries into your own Docker image):
# install pyarrow - build your docker image so this is already installed
import subprocess
subprocess.call(['conda', 'install', '-y', '-c', 'conda-forge', 'pyarrow'])
# imports
import decimal
import datetime
import pytz
import pyarrow as pa
import pyarrow.parquet as pq
# create two rows (columnar) of each core data type corresponding with the metadata format
# be careful with null type here as it will be silently converted to a null IntegerType and will not match Spark's NullType
booleanDatum = pa.array([True, False], type=pa.bool_())
dateDatum = pa.array([datetime.date(2016, 12, 18), datetime.date(2016, 12, 19)])
decimalDatum = pa.array([decimal.Decimal('54.321'), decimal.Decimal('12.345')], type=pa.decimal128(38, 18))
doubleDatum = pa.array([42.4242, 21.2121], type=pa.float64())
integerDatum = pa.array([17, 34], type=pa.int32())
longDatum = pa.array([1520828868, 1520828123], type=pa.int64())
stringDatum = pa.array(['test,breakdelimiter', 'breakdelimiter,test'], type=pa.string())
timestampDatum = pa.array([datetime.datetime(2017, 12, 20, 21, 46, 54, 0, tzinfo=pytz.UTC), datetime.datetime(2017, 12, 29, 17, 21, 49, 0, tzinfo=pytz.UTC)])
timeDatum = pa.array(['12:34:56', '23:45:16'], type=pa.string())
nullDatum = pa.array([None, None], type=pa.null())
# create the arrow table
# we are using an arrow table rather than a dataframe to correctly align with spark datatypes
table = pa.Table.from_arrays([booleanDatum, dateDatum, decimalDatum, doubleDatum, integerDatum, longDatum, stringDatum, timestampDatum, timeDatum, nullDatum],
['booleanDatum', 'dateDatum', 'decimalDatum', 'doubleDatum', 'integerDatum', 'longDatum', 'stringDatum', 'timestampDatum', 'timeDatum', 'nullDatum'])
# write table to disk
pq.write_table(table, '/tmp/data/example.parquet', flavor='spark')
The suggestion then is to use the environments
key to only execute the EqualityValidate
stage whilst in testing mode:
{
"type": "ParquetExtract",
"name": "load customers",
"environments": [
"test"
],
"inputURI": "hdfs://input_data/customer/*.parquet",
"outputView": "customers_known_correct"
},
{
"type": "EqualityValidate",
"name": "verify calculated customer data equals preprepared customer data (test only)",
"environments": [
"test"
],
"leftView": "customers_calculated",
"rightView": "customers_known_correct"
}