Wednesday, May 31, 2023

Spark Catalysts

I've been trying to see how Spark works under the covers. The TL;DR is that it services your queries by dynamically writing Java code on the driver that it then compiles with Janino before sending it over the wire to the executors.

Let's take this Scala code on the Spark CLI:

val ds = spark.createDataFrame(List(("a", 1), ("b", 2), ("c", 4)))
ds.writeTo("spark_file_test_writeTo").create()
spark.sqlContext.sql("update spark_file_test_writeTo set _2=42")

Pretty simple, but what goes on deep down is complex. First, Spark uses an ANTLR lexer and parser (a lexer tokenizes; a parser builds an AST) to turn that ugly SQL statement into a tree of Scala case classes. Then it generates the Java code in WholeStageCodegenExec (source). In that Java, you'll see a subclass of BufferedRowIterator that looks something like:

columnartorow_mutableStateArray_3[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(4, 64);
...
columnartorow_mutableStateArray_3[1].write(1, 42);

in a method called processNext. That 42 is Spark setting our value for a row. If we added a where clause to our SQL, you'd see the generated code branching on the row's other fields - the generated code has access to all of them.
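
You can see the tree it builds without digging through Spark's source. This is just a sketch using the table from above; explain(true) prints each stage of the plan:

// Prints "== Parsed Logical Plan ==", "== Analyzed Logical Plan ==",
// "== Optimized Logical Plan ==" and "== Physical Plan ==" - the last being
// the tree that whole-stage codegen collapses into generated Java.
spark.sql("select * from spark_file_test_writeTo where _2 > 1").explain(true)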

If you import the implicits in org.apache.spark.sql.execution.debug, you can run debugCodegen() in the shell to see the code more easily.
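
A sketch of what that looks like in the spark-shell session above:

// The debug package object adds debugCodegen() (and debug()) to a Dataset.
import org.apache.spark.sql.execution.debug._

// Dumps each WholeStageCodegen subtree along with its generated Java,
// including the processNext method shown above.
spark.sql("select * from spark_file_test_writeTo where _2 > 1").debugCodegen()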


Friday, May 26, 2023

Spark and Iceberg

Here are some notes I took when playing with Apache Iceberg plugged into Spark. (Update: Iceberg was already supported by Google and is now also supported by AWS's Athena for Apache Spark - see here.)

I'm running Spark 3.3 and Iceberg 1.2.1 with:

./spark-shell --packages org.apache.iceberg:iceberg-spark-runtime-3.3_2.13:1.2.1 \
    --conf spark.driver.extraJavaOptions=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 \
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
    --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
    --conf spark.sql.catalog.spark_catalog.type=hive \
    --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.local.type=hadoop \
    --conf spark.sql.catalog.local.warehouse=/tmp/warehouse \
    --conf spark.sql.defaultCatalog=local

This does a few things.

First, notice that the version of iceberg-spark-runtime must match the major/minor version of the Spark you're running against. Spark will automatically download the library if you don't already have it.

Data is saved in the directory defined by spark.sql.catalog.local.warehouse. Inside, each table gets two directories: data and metadata.
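
For example, writing a small DataFrame to a made-up table (db.demo is just a name invented for illustration) leaves a layout like this on disk:

// Assumes the spark-shell session configured above.
import spark.implicits._

Seq(("a", 1), ("b", 2)).toDF("label", "amount")
  .writeTo("local.db.demo")
  .create()

// /tmp/warehouse/db/demo/data      <- the Parquet data files
// /tmp/warehouse/db/demo/metadata  <- metadata JSON, manifest lists and manifests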

Spark code

This took me a few hours to get a good feel for since there are lots of thunks passed around, and functionality is only executed when lazy values are touched. This is just the gist of what's going on:

Code-wise, the interesting functionality appears to be in RuleExecutor.execute, where each Rule[_] is executed via its apply method. The interesting code in the Iceberg Spark extension is applied here as its rules all extend Rule[LogicalPlan].
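
The shape of such a rule, and how an extension registers it via spark.sql.extensions (as Iceberg does), looks roughly like this - the names here are made up and the rule does nothing:

import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// RuleExecutor.execute walks its batches, calling apply on each rule until a fixed point.
object DoNothingRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

// Registered the same way as IcebergSparkSessionExtensions, via spark.sql.extensions.
class MyExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit =
    extensions.injectOptimizerRule(_ => DoNothingRule)
}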

LogicalPlan is a type of QueryPlan. A QueryPlan can be either logical or physical, the latter being an extension of SparkPlan. You can spot the physical nodes in an AST quite easily as, by naming convention, they end with Exec. And it appears to be V2TableWriteExec where the driver hands control over to the executors.
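
You can see that naming convention by printing a physical plan (again using the made-up table from earlier):

// executedPlan is the physical (SparkPlan) tree; note the *Exec node names.
println(spark.sql("select * from local.db.demo where amount > 1")
  .queryExecution.executedPlan)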

The tree of LogicalPlans is traversed in CheckAnalysis.checkAnalysis. But it's the instantiation of a Dataset, whose logicalPlan references the lazy QueryExecution.commandExecuted, that causes eagerlyExecuteCommands(analyzed) to be invoked.

A notable sub-type of LogicalPlan is Command. This represents "a non-query command to be executed by the system".

Since Spark 3, an interface is available that is "responsible for creating and initializing the actual data writer at executor side. Note that, the writer factory will be serialized and sent to executors, then the data writer will be created on executors and do the actual writing." [DataWriterFactory docs]
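
The interface itself is small. A bare-bones, purely illustrative implementation might look like this (it is not Iceberg's writer, just a sketch of the contract):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.write.{DataWriter, DataWriterFactory, WriterCommitMessage}

// Serialized on the driver; createWriter is then called per task on the executors.
class LoggingWriterFactory extends DataWriterFactory {
  override def createWriter(partitionId: Int, taskId: Long): DataWriter[InternalRow] =
    new DataWriter[InternalRow] {
      override def write(record: InternalRow): Unit =
        println(s"partition $partitionId, task $taskId wrote $record")
      override def commit(): WriterCommitMessage = new WriterCommitMessage {}
      override def abort(): Unit = ()
      override def close(): Unit = ()
    }
}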

Finally, making sense of the AST became difficult when the flow disappeared into WholeStageCodegen, as Spark then rolls the tree up and converts it to JVM bytecode.

Iceberg code

All the Iceberg initialization happens in IcebergSparkSessionExtensions. This subverts Spark's usual functionality and injects Iceberg-specific functionality. One of the things it can inject is an IcebergSparkSqlExtensionsParser that visits the AST as it parses a SQL string to create a LogicalPlan.

Iceberg also provides its own implementation of DataWriterFactory so that, under the Spark covers, it can use its own Table implementation, which allows (for instance) its own configuration of TableScans.

It's Iceberg's SnapshotProducer.commit(), plugged into the Spark machinery, that creates the manifest files.
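
You can inspect the results of those commits from the Spark side via Iceberg's metadata tables (again, db.demo is the made-up table from above):

// Each commit adds a snapshot pointing at a manifest list in the metadata directory.
spark.sql("select snapshot_id, manifest_list from local.db.demo.snapshots").show(false)
spark.sql("select path, added_data_files_count from local.db.demo.manifests").show(false)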


Sunday, May 7, 2023

The Joy of Sets?

The standard Scala Set violates basic algebra. Let's take two sets, x and y:

@ val x = Set(1, 2, 3) 
x: Set[Int] = Set(1, 2, 3)

@ val y = Set(3, 2, 1) 
y: Set[Int] = Set(3, 2, 1)

@ x == y 
res2: Boolean = true

If they're equal, we should be able to substitute one for another. Let's see how that goes:

@ def giveHead(s: Set[Int]): Int = s.head 
defined function giveHead

@ giveHead(x) 
res6: Int = 1

@ giveHead(y) 
res7: Int = 3

So, I get different results depending on which of two equal objects I call the method on - crazy.

topkek
Isn’t Map.toList impure or something like that?
tpolecat
It's pure but non-congruent with equality. m1 === m2 doesn't imply m1.toList === m2.toList because they could differ in their iteration order. That's why it's not Foldable. You can get those instances from alleycats I believe.
The unordered business is there to guarantee that Eq and Foldable are congruent (i.e., that a === b implies a.toList === b.toList) which is not necessarily true of data types like Set and Map: these are only UnorderedFoldable which requires you to fold into a commutative monoid which destroys any information you might get from observing iteration order. Which is to say "unordered" doesn't necessarily say anything about execution order, it just says any ordering that might be there won't be observable. 
Although often forgotten, sets are not functors.
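
tpolecat's point about folding into a commutative monoid is easy to see in plain Cats (a tiny sketch):

import cats.syntax.all._

// Set is only UnorderedFoldable, so you fold into a commutative monoid;
// addition can't reveal iteration order, so no ordering is observable.
Set(1, 2, 3).unorderedFoldMap(_ * 2)   // 12

// Set(1, 2, 3).foldMap(_ * 2)         // won't compile: no Foldable[Set] in cats core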

You cannot parTraverse a Set (or a Map) in Cats. It just won't compile. This is a property of that particular data structure, but it's not limited to Sets:
Fabio Labella @SystemFw Nov 16 16:55
You cannot traverse an [FS2] Stream, because it implies it's finite. Look at evalMap instead if the use case is "evaluate an action on each element". 
Instead, you must parUnorderedTraverse a Set.
Adam Rosien @arosien Nov 10 20:17
every Traverse is also an UnorderedTraverse, so they should act the same in that case (and since every CommutativeApplicative is also an Applicative, you can still call unorderedTraverse on something that has a Traverse)
To traverse a Set (albeit unordered), UnorderedTraverse will put the Set into a higher-kinded type G[_]. If we want to take advantage of this for, say, List[_], we need a concrete instance of Parallel[List]. Among other things, this defines how to map from List[_] to Set[_] and back again. This brings us back to the problems with Set's toList function - but at least now we're explicit about how we want this handled.
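
Putting that together, here's a small sketch of the compile-time difference (assuming Cats Effect 3 for the effect type; lookup is a made-up stand-in for real work):

import cats.effect.{IO, IOApp}
import cats.effect.implicits._   // commutative applicative for IO.Par
import cats.syntax.all._

object UnorderedTraverseDemo extends IOApp.Simple {

  def lookup(id: Int): IO[String] = IO.pure(s"row-$id")   // stand-in for real work

  val ids: Set[Int] = Set(1, 2, 3)

  // ids.parTraverse(lookup)           // won't compile: Set has no Traverse instance
  val run: IO[Unit] =
    ids.parUnorderedTraverse(lookup)   // IO[Set[String]] - the results come back as a Set
      .flatMap(rows => IO.println(rows))
}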