Spark can create the illusion of ACID transactions with an open source library from Databricks called Delta Lake [GitHub]. Just bundle the package with your code, use format("delta") on your reads and writes, and you're good to go. Data is stored as Parquet as usual, but now we also have associated metadata stored in JSON files. These files indicate what state the data was in at each incremental step.
Do note, however, that for clients of this data to take advantage of these ACID properties, they too need to be 'delta aware'.
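For example, reading and writing look roughly like this (a minimal sketch: df, spark and the path are placeholders, and the delta-core package is assumed to be on the classpath):

import org.apache.spark.sql.SaveMode

// write a DataFrame as a Delta table: Parquet files plus a _delta_log directory
df.write.format("delta").mode(SaveMode.Append).save("/path/to/table")

// read it back; only 'delta aware' clients see the transactional view
val reloaded = spark.read.format("delta").load("/path/to/table")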
What's happening?
The idea behind Delta Lake is very simple. Alongside the Parquet, a JSON file records which data files are relevant. These files are immutable, but each change adds another JSON file.
Note that even if the directory has nested folders (say, from a partitionBy), all the JSON files still seem to live at the top level.
Working example
The code for this example lives in my GitHub repository and can be run out-of-the-box on a single JVM. It saves the files to MiniHDFS, that is, an in-memory Hadoop emulator.
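The test harness is roughly the following (a sketch only; it assumes Hadoop's MiniDFSCluster from the hadoop-minicluster test artifact, which is what I take "MiniHDFS" to be):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hdfs.MiniDFSCluster

// spin up an in-memory HDFS cluster for the duration of the test
val conf    = new Configuration()
val cluster = new MiniDFSCluster.Builder(conf).build()
val hdfsUri = "hdfs://127.0.0.1:" + cluster.getNameNodePort

// ... point the Spark job at hdfsUri and run the steps below ...

cluster.shutdown()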
In the example we:
- save the initial data
- append some more data to it
- save some more data, this time overwriting the first batches
- update the data
- vacuum
Save the initial data
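The first write is nothing special (a sketch; firstBatch and filename are placeholder names for the test's DataFrame and HDFS path):

import org.apache.spark.sql.SaveMode

// first batch: a plain append in the Delta format
firstBatch.write
  .format("delta")
  .mode(SaveMode.Append)
  .save(filename)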
After persisting the first batch to HDFS in the Delta format with SaveMode.Append, the directory contains:
hdfs://127.0.0.1:36089/DeltaPersistSpec/_delta_log/00000000000000000000.json
hdfs://127.0.0.1:36089/DeltaPersistSpec/part-00000-5a70a0c7-2421-4fda-bf1e-fe584576eac5-c000.snappy.parquet
...
With the judicious help of the Linux CLI tool, jq [SO], the JSON file looks like this:
{
  "commitInfo": {
    "timestamp": 1585046814345,
    "operation": "WRITE",
    "operationParameters": {
      "mode": "Append",
      "partitionBy": "[]"
    },
    "isBlindAppend": true
  }
}
{
  "protocol": {
    "minReaderVersion": 1,
    "minWriterVersion": 2
  }
}
Append the data
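Appending is just another write in SaveMode.Append to the same path (again a sketch with placeholder names):

// second batch: appended to the same Delta table
secondBatch.write
  .format("delta")
  .mode(SaveMode.Append)
  .save(filename)

The directory now contains a second log file: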
hdfs://127.0.0.1:36089/DeltaPersistSpec/_delta_log/00000000000000000000.json
hdfs://127.0.0.1:36089/DeltaPersistSpec/_delta_log/00000000000000000001.json
hdfs://127.0.0.1:36089/DeltaPersistSpec/part-00000-5a70a0c7-2421-4fda-bf1e-fe584576eac5-c000.snappy.parquet
...
The first JSON file is as before but the second looks like:
{
  "commitInfo": {
    "timestamp": 1585046817232,
    "operation": "WRITE",
    "operationParameters": {
      "mode": "Append",
      "partitionBy": "[]"
    },
    "readVersion": 0,
    "isBlindAppend": true
  }
  ...
}
{
  "add": {
    "path": "part-00000-fe3fba1f-5167-49a3-9a13-25d07ebd7fb6-c000.snappy.parquet",
    "partitionValues": {},
    "size": 350,
    "modificationTime": 1585046817219,
    "dataChange": true
  }
}
{
  "add": {
    "path": "part-00001-f1a3eb5f-d392-4f7d-bf16-8aac55517c21-c000.snappy.parquet",
    ...
}
Overwrite the first batches
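This time the write uses SaveMode.Overwrite (sketch, placeholder names as before):

// third batch: logically replaces everything written so far
thirdBatch.write
  .format("delta")
  .mode(SaveMode.Overwrite)
  .save(filename)

A third JSON file appears in _delta_log that looks like this: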
{
  "commitInfo": {
    "timestamp": 1585046819611,
    "operation": "WRITE",
    "operationParameters": {
      "mode": "Overwrite",
      "partitionBy": "[]"
    },
    "readVersion": 1,
    "isBlindAppend": false
  }
}
{
  "add": {
    "path": "part-00001-79f7a25d-c87a-4c5b-983e-1696e9f4b165-c000.snappy.parquet",
    ...
{
  "remove": {
    "path": "part-00000-fe3fba1f-5167-49a3-9a13-25d07ebd7fb6-c000.snappy.parquet",
    "deletionTimestamp": 1585046819610,
    "dataChange": true
  }
}
...
As you can see, our original files are marked as removed. However, they stay exactly where they are on disk.
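If you want to see this bookkeeping without reading the JSON by hand, DeltaTable exposes the commit history (a sketch; spark and filename are placeholders):

import io.delta.tables._

// one row per commit: version, timestamp, operation, operationParameters, ...
DeltaTable.forPath(spark, filename).history().show(false)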
Update the data
Now, we treat the file like a SQL table and update it with something like:
import io.delta.tables._
import org.apache.spark.sql.functions.{col, lit}

// set 'word' to "wotcha" in every row where value == 1
val deltaTable = DeltaTable.forPath(filename)
deltaTable.update(col("value") === 1, Map("word" -> lit("wotcha")))
and a fourth JSON file appears that looks like this:
{
  "commitInfo": {
    "timestamp": 1585046822731,
    "operation": "UPDATE",
    "operationParameters": {
      "predicate": "(value#1417=1)"
    },
    "readVersion": 2,
    "isBlindAppend": false
  }
}
{
  "remove": {
    "path": "part-00008-0b439cc3-a2bc-4d4e-b4a8-4b2a719b4edf-c000.snappy.parquet",
    "deletionTimestamp": 1585046822195,
    "dataChange": true
  }
}
{
  "remove": {
    ...
{
  "add": {
    ...
Vacuum
Vacuuming does to the Parquet files what a vacuum does to a database (that is, garbage collection).
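In code it looks something like this (a sketch; the default retention period is 7 days, so vacuuming everything straight away needs Delta's retention check relaxed):

import io.delta.tables._

// allow a retention period shorter than the default 7 days
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")

// delete Parquet files no longer referenced by the latest version
val deltaTable = DeltaTable.forPath(spark, filename)
deltaTable.vacuum(0.0)   // retention in hours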
No new JSON file is created for this step. The only change is that Parquet files disappear. In this example in my GitHub repository, the number of parquet files is reduced from 39 to 10.
Only the Parquet files that are no longer referenced in the later JSON metadata files disappear. Any transaction still using the old metadata is now in trouble, so this may impact long-running transactions.
Conclusion
Delta Lake is definitely an exciting step in the right direction, but apparently it can be slow [Medium]. The version numbers and dates in the JSON metadata allow you to see how the data looked at particular points in time [databricks.com].
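For instance, travelling back to the very first version is just a reader option (a sketch with the same placeholder names as above):

// read the table as it looked at version 0, i.e. after the initial write
val asOfVersionZero = spark.read
  .format("delta")
  .option("versionAsOf", 0)
  .load(filename)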