Thursday, December 6, 2018

More Spark Query Plans


The second part in discovering what is going on in query plans (the first is here). I imagine there will be more...

Predicate Push Down

Seeing PushedFilters in the DataFrame.explain output means predicate push down is at work. For example:

*(1) Project [id#21, text#22, intkey#23, partitionkey#24]
+- *(1) Filter (isnotnull(intkey#23) && (intkey#23 = 3))
   +- *(1) FileScan parquet [id#21,text#22,intkey#23,partitionkey#24] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://127.0.0.1:41684/1544107487759], PartitionCount: 2, PartitionFilters: [], PushedFilters: [IsNotNull(intkey), EqualTo(intkey,3)], ReadSchema: struct

The code to generate this lives here on GitHub.
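
A minimal sketch (not the linked GitHub code; the path, app name and data are assumed, the column names come from the plan above) that produces a similar plan:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("pushdown-demo").getOrCreate()
import spark.implicits._

// write a small Parquet dataset partitioned on partitionkey (assumed layout)
val path = "/tmp/pushdown_demo"
(1 to 100).map(i => (i, s"text$i", i % 10, i % 2))
  .toDF("id", "text", "intkey", "partitionkey")
  .write.partitionBy("partitionkey").mode("overwrite").parquet(path)

// filtering on intkey should surface IsNotNull(intkey) and EqualTo(intkey,3) as PushedFilters
spark.read.parquet(path).filter($"intkey" === 3).explain()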

"Predicate push down is a feature of Spark and Parquet that can help increase the performance of your queries by reducing the amount of data read based on metadata stored in Parquet about the minimum and maximum values of stored in columns and aggregated per row group. However this feature has limitations, notably, it can only be used with certain data types and operators as implemented in Parquet and Spark. Moreover, even when filters are pushed down, the actual reduction of I/O and relative increase in performance vary: the results depend on the provided filter values and data distribution in the source table." (from an excellent blog at CERN).

Asterisks

"With whole-stage code generation, all the physical plan nodes in a plan tree work together to generate Java code in a single function for execution. This Java code is then turned into JVM bytecode using Janino, a fast Java compiler... Where ever you see *, it means that wholestagecodegen has generated hand written code prior to the aggregation. Exchange means the Shuffle Exchange between jobs.Exchange does not have whole-stage code generation because it is sending data across the network. " (StackOverflow)

HashAggregate

"When spark is doing dataframe operation, it does first compute partial counts for every partition and then having another stage to sum those up together." These need to be sent across the network triggering an Exchange.  (StackOverflow).

Note, you might see:

...
  +- *HashAggregate(COLUMN_FOR_GROUPBY#INDEX, 200
...

The 200 is the number of shuffle partitions and is defined by spark.sql.shuffle.partitions (whose default is 200).

If you just try to change it with .repartition(NEW_NUMBER), the number in the plan doesn't appear to change. However, .repartition(NEW_NUMBER, COLUMN) does seem to change it (see the sketch below).
See the Mastering Spark SQL GitHub page.
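
A sketch of both observations (reusing the SparkSession and Parquet path from the first sketch; the partition counts are arbitrary):

// the 200 in the plan follows spark.sql.shuffle.partitions
spark.conf.set("spark.sql.shuffle.partitions", "50")
spark.read.parquet(path).groupBy("intkey").count().explain()
// the aggregation's Exchange should now read hashpartitioning(intkey#..., 50)

// repartitioning by the grouping column puts its own hashpartitioning Exchange in the plan
spark.read.parquet(path).repartition(10, $"intkey").groupBy("intkey").count().explain()
// expect Exchange hashpartitioning(intkey#..., 10); a bare repartition(10) instead adds a
// RoundRobinPartitioning Exchange and the aggregation still shuffles with the configured count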

Parentheses

"Everything with the same index number is in one stage. So, stage boundaries can be recognized by exchange operations that involve a shuffle." [SO]

BatchScan

"BatchScan non empty filters shows that your predicates have been pushed down to the datasources by spark." [Iceberg Slack]. Note that it is not a given your predicate will be pushed down. If it's the result of a function, how would Spark know the predicate was true without running the function?
