Wednesday, May 31, 2023

Spark Catalysts

I've been trying to see how Spark works under the covers. The TL;DR is that it services your queries by dynamically generating Java source code on the driver, checking that it compiles with Janino, and shipping that source over the wire to the executors, which compile and run it.
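
If you want to confirm that whole-stage code generation is what's doing the work, the spark.sql.codegen.wholeStage flag controls it and is on by default. A quick check from the shell:

// Whole-stage code generation is enabled by default
spark.conf.get("spark.sql.codegen.wholeStage")   // returns "true"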

Let's take this Scala code on the Spark CLI:

// A two-column DataFrame; the default column names for tuples are _1 and _2
val ds = spark.createDataFrame(List(("a", 1), ("b", 2), ("c", 4)))
// Create a table through the DataFrameWriterV2 API
ds.writeTo("spark_file_test_writeTo").create()
// Row-level UPDATE (needs a catalog/table format that supports it)
spark.sqlContext.sql("update spark_file_test_writeTo set _2=42")

Pretty simple, but what goes on deep down is complex. First, Spark uses an ANTLR lexer and parser (a lexer tokenizes; a parser builds an AST) to turn that ugly SQL statement into a tree of Scala case classes, the logical plan. Catalyst then analyzes and optimizes that tree and turns it into a physical plan, and it's WholeStageCodegenExec (source) that generates the Java code. In this Java, you'll see a subclass of BufferedRowIterator that looks something like:

columnartorow_mutableStateArray_3[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(4, 64);
...
columnartorow_mutableStateArray_3[1].write(1, 42);

in a method called processNext. That 42 is Spark setting our value for a row. If we added a WHERE clause to our SQL, you'd see the generated code branch on the predicate, since the generated code has access to all the other fields of the row.
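
You can poke at both ends of this pipeline from the shell. A minimal sketch, assuming the table created above and a Spark 3.x shell: parsePlan hands back the raw tree of case classes straight out of the parser, and explain("codegen") dumps the Java that whole-stage codegen produces for a query.

// The raw parse tree (a LogicalPlan of Scala case classes), before analysis
spark.sessionState.sqlParser.parsePlan("update spark_file_test_writeTo set _2=42")

// The generated Java for a query with a predicate; the WHERE clause shows up
// as a branch inside processNext()
spark.sql("select * from spark_file_test_writeTo where _2 > 1").explain("codegen")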

If you import an implicit (see the sketch below), you can call debugCodegen() on the CLI to see the generated code more easily.
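
A minimal sketch, assuming a Spark 3.x shell: the implicit lives in the org.apache.spark.sql.execution.debug package, and importing it gives every Dataset a debugCodegen() method.

import org.apache.spark.sql.execution.debug._

// Prints each WholeStageCodegen subtree in the physical plan together with
// the Java source generated for it
spark.sql("select * from spark_file_test_writeTo where _2 > 1").debugCodegen()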

