Wednesday, March 25, 2020

Spark on ACID


Spark can create the illusion of ACID transactions with an open source library from Databricks called Delta Lake [GitHub]. Just bundle the package with your code and use format("delta") on your reads and writes and you're good to go. Data is stored as Parquet as usual but now we also have associated metadata stored in JSON files. These files indicate what state the data was in at each incremental step.
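
For example, a minimal sketch (the path and column names below are made up for illustration, and the io.delta:delta-core package is assumed to be on the classpath):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("DeltaSketch").getOrCreate()
import spark.implicits._

val path = "/tmp/delta_sketch"  // hypothetical location

// Writing is exactly like a Parquet write, just with format("delta")...
Seq((1, "hello"), (2, "world")).toDF("value", "word")
  .write.format("delta").mode("append").save(path)

// ...and so is reading.
spark.read.format("delta").load(path).show()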

Do note, however, that for clients of this data to take advantage of these ACID properties, they too need to be 'delta aware'.

What's happening?

The idea behind Delta Lake is very simple. Along with the Parquet, JSON files record which data files are relevant at each version. Both kinds of file are immutable, but each change adds a new JSON file.

Note that even if the directory has nested folders (say, from a partitionBy), all the JSON files still seem to live at the top level of the table's directory, under _delta_log.
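
To convince yourself of this, write a partitioned table and list its _delta_log directory. A sketch, continuing the hypothetical session above (the path and partition column are again my own):

import org.apache.hadoop.fs.{FileSystem, Path}

val partitionedPath = "/tmp/delta_partitioned"  // hypothetical location

// The Parquet lands in word=... subdirectories...
Seq((1, "hello"), (2, "world")).toDF("value", "word")
  .write.format("delta").partitionBy("word").mode("append").save(partitionedPath)

// ...but the JSON transaction log stays at the top level, under _delta_log.
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
fs.listStatus(new Path(partitionedPath + "/_delta_log")).foreach(s => println(s.getPath))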

Working example

The code for this example lives in my GitHub repository and can be run out-of-the-box on a single JVM. It saves the files to MiniHDFS, that is, an in-memory Hadoop emulator.
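
MiniHDFS here presumably means Hadoop's MiniDFSCluster, which ships in the hadoop-hdfs test jar. Spinning one up looks roughly like this (a sketch, not the repository's exact code):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hdfs.MiniDFSCluster

// Runs a NameNode and a DataNode inside the current JVM; nothing to install.
val cluster = new MiniDFSCluster.Builder(new Configuration()).build()
val hdfsUri = cluster.getFileSystem().getUri.toString  // e.g. hdfs://127.0.0.1:36089

// ... point the Delta reads and writes at hdfsUri + "/DeltaPersistSpec" ...

cluster.shutdown()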

In the example we:

  1. save the initial data
  2. append some more data to it
  3. save some more data, this time overwriting the first batches
  4. update the data
  5. vacuum

Save the initial data

After persisting the first batch to HDFS in the delta format with SaveMode.Append, we see this in the table's directory:

hdfs://127.0.0.1:36089/DeltaPersistSpec/_delta_log/00000000000000000000.json
hdfs://127.0.0.1:36089/DeltaPersistSpec/part-00000-5a70a0c7-2421-4fda-bf1e-fe584576eac5-c000.snappy.parquet
...

With the judicious help of the Linux CLI tool, jq [SO], the JSON file looks like this:

{
  "commitInfo": {
    "timestamp": 1585046814345,
    "operation": "WRITE",
    "operationParameters": {
      "mode": "Append",
      "partitionBy": "|[|]"
    },
    "isBlindAppend": true
  }
}
{
  "protocol": {
    "minReaderVersion": 1,
    "minWriterVersion": 2
  }
}
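
jq needs the file locally; since the log here lives in (Mini)HDFS, another option is to let Spark itself parse it (a sketch, reusing the hypothetical hdfsUri from earlier):

// The log is newline-delimited JSON, one action (commitInfo, protocol, add,
// remove, ...) per line, so spark.read.json returns one row per action.
spark.read
  .json(hdfsUri + "/DeltaPersistSpec/_delta_log/00000000000000000000.json")
  .show(truncate = false)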

Append the data

After writing a second batch again with SaveMode.Append, we see:

hdfs://127.0.0.1:36089/DeltaPersistSpec/_delta_log/00000000000000000000.json
hdfs://127.0.0.1:36089/DeltaPersistSpec/_delta_log/00000000000000000001.json
hdfs://127.0.0.1:36089/DeltaPersistSpec/part-00000-5a70a0c7-2421-4fda-bf1e-fe584576eac5-c000.snappy.parquet
...

The first JSON file is as before but the second looks like:

{
  "commitInfo": {
    "timestamp": 1585046817232,
    "operation": "WRITE",
    "operationParameters": {
      "mode": "Append",
      "partitionBy": "|[|]"
    },
    "readVersion": 0,
    "isBlindAppend": true

  }
...
}
{
  "add": {
    "path": "part-00000-fe3fba1f-5167-49a3-9a13-25d07ebd7fb6-c000.snappy.parquet",
    "partitionValues": {},
    "size": 350,
    "modificationTime": 1585046817219,
    "dataChange": true
  }
}
{
  "add": {
    "path": "part-00001-f1a3eb5f-d392-4f7d-bf16-8aac55517c21-c000.snappy.parquet",
...
}

Overwrite the first batches

Next we save some more data, this time with SaveMode.Overwrite. Now we see a third JSON file (along with the first two) and it looks like this:

{
  "commitInfo": {
    "timestamp": 1585046819611,
    "operation": "WRITE",
    "operationParameters": {
      "mode": "Overwrite",
      "partitionBy": "|[|]"
    },
    "readVersion": 1,
    "isBlindAppend": false
  }
}
{
  "add": {
    "path": "part-00001-79f7a25d-c87a-4c5b-983e-1696e9f4b165-c000.snappy.parquet",
...
{
  "remove": {
    "path": "part-00000-fe3fba1f-5167-49a3-9a13-25d07ebd7fb6-c000.snappy.parquet",
    "deletionTimestamp": 1585046819610,
    "dataChange": true
  }
}
...

As you can see, our original files are marked as removed. However, they stay exactly where they are on disk.
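
That is deliberate: those old files are what make Delta Lake's time travel work. A sketch of reading the table as it was before the overwrite, using the versionAsOf read option (again with the hypothetical hdfsUri):

// Version numbers line up with the JSON files in _delta_log (0, 1, 2, ...).
// Version 1 is served from the Parquet files the overwrite marked as removed
// but left sitting on disk.
val beforeOverwrite = spark.read
  .format("delta")
  .option("versionAsOf", 1)
  .load(hdfsUri + "/DeltaPersistSpec")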

Update the data

Now, we treat the file like a SQL table and update it with something like:

import io.delta.tables._
import org.apache.spark.sql.functions.{col, lit}

val deltaTable = DeltaTable.forPath(filename)
deltaTable.update(col("value") === 1, Map("word" -> lit("wotcha")))

and a fourth JSON file appears that looks like this:

{
  "commitInfo": {
    "timestamp": 1585046822731,
    "operation": "UPDATE",
    "operationParameters": {
      "predicate": "(value#1417=1)"
    },
    "readVersion": 2,
    "isBlindAppend": false
  }
}
{
  "remove": {
    "path": "part-00008-0b439cc3-a2bc-4d4e-b4a8-4b2a719b4edf-c000.snappy.parquet",
    "deletionTimestamp": 1585046822195,
    "dataChange": true
  }
}
{
  "remove": {
...
{
  "add": {
...

Vacuum

Vacuuming does to the Parquet files what a DB vacuum does to a database (that is, it garbage collects).
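
With the DeltaTable API this is a one-liner; a sketch (the retention period here is my choice, not the repository's):

import io.delta.tables._

// Deletes Parquet files no longer referenced by the current table version and
// older than the retention period (in hours). A retention of 0 hours is only
// allowed if spark.databricks.delta.retentionDurationCheck.enabled is false;
// the default retention is 7 days.
DeltaTable.forPath(filename).vacuum(0.0)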

No new JSON file is created for this step. The only change is that Parquet files disappear. In this example in my GitHub repository, the number of Parquet files is reduced from 39 to 10.

Only the Parquet files that are no longer referenced in the most recent JSON metadata disappear. Any readers still working from older metadata are now in trouble, which may impact long-running transactions.

Conclusion

Delta Lake is definitely an exciting step in the right direction, but apparently it can be slow [Medium]. The version numbers and dates in the JSON metadata allow you to see how the data looked at particular points in time [databricks.com].
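
For instance, a read pinned to a timestamp is just another option on the reader (a sketch; the timestamp is illustrative and must fall within the table's commit history):

// Delta resolves the timestamp against the commit timestamps recorded in the
// _delta_log JSON files and serves the matching version.
spark.read
  .format("delta")
  .option("timestampAsOf", "2020-03-24 11:00:00")
  .load(hdfsUri + "/DeltaPersistSpec")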

