Friday, May 26, 2023

Spark and Iceberg

Here are some notes I took while playing with Apache Iceberg plugged into Spark. (Update: Iceberg was already supported by Google but is now also supported by AWS's Athena for Apache Spark - see here.)

I'm running Spark 3.3 and Iceberg 1.2.1 with:

./spark-shell --packages org.apache.iceberg:iceberg-spark-runtime-3.3_2.13:1.2.1 \
    --conf spark.driver.extraJavaOptions=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 \
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
    --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
    --conf spark.sql.catalog.spark_catalog.type=hive \
    --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.local.type=hadoop \
    --conf spark.sql.catalog.local.warehouse=/tmp/warehouse \
    --conf spark.sql.defaultCatalog=local

This does a few things.

First, notice that the iceberg-spark-runtime artifact must match the major/minor version of the Spark you're running against (the 3.3 in the artifact name) as well as its Scala version (the 2.13). Spark will download the library automatically if you don't already have it.

Data is saved in the directory defined by spark.sql.catalog.local.warehouse. Inside, each table gets two directories: data and metadata.
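
For example, creating and writing to a table from the shell (the table local.db.t here is my own, purely illustrative) produces both directories:

spark.sql("CREATE TABLE local.db.t (id BIGINT, label STRING) USING iceberg")
spark.sql("INSERT INTO local.db.t VALUES (1, 'a'), (2, 'b')")
// /tmp/warehouse/db/t/data now holds the Parquet data files, and
// /tmp/warehouse/db/t/metadata the Iceberg metadata and manifest files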

Spark code

This took me a few hours to get a good feel for, since there are lots of thunks passed around and functionality is only executed when lazy values are touched. This is just the gist of what's going on:

Code-wise, the interesting functionality appears to be in RuleExecutor.execute, as each Rule[_] is executed via its apply method. The interesting code in the Iceberg Spark extension is applied here, as its rules all extend Rule[LogicalPlan].
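
As a rough sketch of the shape of these rules (this NoOpLoggingRule is mine, not one of Iceberg's): a rule receives a LogicalPlan from RuleExecutor.execute and returns a possibly rewritten plan, and the executor keeps applying its batches of rules until the plan stops changing:

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// A do-nothing rule: logs the plan it is handed and returns it unchanged.
object NoOpLoggingRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = {
    logInfo(s"visiting:\n${plan.treeString}")
    plan
  }
}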

LogicalPlan is a type of QueryPlan. A QueryPlan can be either logical or physical, the physical plans extending SparkPlan. You can spot these leaves in an AST quite easily, as by naming convention they end with Exec. And it appears to be V2TableWriteExec where the driver hands control over to the executors.
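
You can poke at both flavours of plan from the shell. For example (table name again illustrative):

val df = spark.sql("SELECT * FROM local.db.t WHERE id > 1")
// a tree of LogicalPlans:
println(df.queryExecution.optimizedPlan.treeString)
// a tree of SparkPlans, with node names ending in Exec:
println(df.queryExecution.executedPlan.treeString)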

The tree of LogicalPlans is traversed in CheckAnalysis.checkAnalysis. But it's the instantiation of a Dataset, whose logicalPlan references the lazy QueryExecution.commandExecuted, that causes eagerlyExecuteCommands(analyzed) to be invoked.

A notable sub-type of LogicalPlan is Command. This represents "a non-query command to be executed by the system".
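
For instance, parsing (without running) a DDL statement yields a Command subtype rather than a query plan - a quick check from the shell, with a hypothetical table name:

val parsed = spark.sessionState.sqlParser.parsePlan("DROP TABLE local.db.t")
println(parsed.getClass.getName) // a Command subtype, e.g. DropTable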

Since Spark 3, an interface is available that is "responsible for creating and initializing the actual data writer at executor side. Note that, the writer factory will be serialized and sent to executors, then the data writer will be created on executors and do the actual writing." [DataWriterFactory docs]
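
A bare-bones sketch of that interface (mine, not Spark's or Iceberg's - it simply discards every row) looks like:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.write.{DataWriter, DataWriterFactory, WriterCommitMessage}

// The factory is serialized on the driver and shipped to the executors;
// createWriter is then called per task to build the DataWriter that does
// the actual writing.
class DiscardingWriterFactory extends DataWriterFactory {
  override def createWriter(partitionId: Int, taskId: Long): DataWriter[InternalRow] =
    new DataWriter[InternalRow] {
      override def write(record: InternalRow): Unit = ()  // drop the row
      override def commit(): WriterCommitMessage = new WriterCommitMessage {}
      override def abort(): Unit = ()
      override def close(): Unit = ()
    }
}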

Finally, making sense of the AST was problematic when the flow disappeared into whole-stage code generation (WholeStageCodegenExec), as Spark then rolls the tree up and compiles it to JVM bytecode.

Iceberg code

All the Iceberg initialization happens in IcebergSparkSessionExtensions. This subverts Spark's usual functionality and injects Iceberg-specific functionality. One of the things it can inject is an IcebergSparkSqlExtensionsParser, which visits the AST as it parses a SQL String to create a LogicalPlan.
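
The injection mechanism itself is just Spark's SparkSessionExtensions hooks, registered via the spark.sql.extensions config in the spark-shell command above. A skeletal extension in the same mould (mine, purely illustrative) would be:

import org.apache.spark.sql.SparkSessionExtensions

// Spark instantiates the class named in spark.sql.extensions and calls
// apply with a hook object; the inject* methods register custom behaviour.
class MyExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(ext: SparkSessionExtensions): Unit = {
    // wrap or replace the delegate parser here, as Iceberg does with
    // its IcebergSparkSqlExtensionsParser
    ext.injectParser((_, delegateParser) => delegateParser)
  }
}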

Iceberg also provides its own implementation of DataWriterFactory, so it can use its own Table implementation under the Spark covers, which allows (for instance) its own configuration for TableScans.

It's Iceberg's SnapshotProducer.commit(), injected into the Spark machinery, that creates the manifest files.
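
After the earlier INSERT, the metadata directory shows roughly what a commit produces (names abbreviated; the exact file names will differ):

$ ls /tmp/warehouse/db/t/metadata
v1.metadata.json       <- table metadata before the insert
v2.metadata.json       <- table metadata after the insert
snap-....avro          <- the manifest list for the new snapshot
....-m0.avro           <- a manifest pointing at the data files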

