Monday, December 17, 2018

Spark, Parquet, Stats


If you're using Spark to write data to HDFS, you might want to give a thought to how it is going to be read. How you write a Dataframe can make a huge difference to how efficiently it can be read back.

This test creates a Dataframe with, let's say, columns X, Y and Z. We sort by X, partition by Y and then save it (a sketch of the write follows).
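Something like this, where the output path is hypothetical but the column names are the ones used later in this post ("intkey" is X, "partitionkey" is Y):

df.sort("intkey")                 // sort by X so each file covers a narrow range of X values
  .write
  .partitionBy("partitionkey")    // partition by Y: one directory per Y-value
  .parquet("hdfs:///some/path")   // hypothetical output location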

This results in |Y| directories under our root (one for each Y-value) and at least as many Parquet files as the Cartesian product of X and Y, up to a maximum of 400. I'm not currently sure where this maximum comes from (it appears to be spark.sql.shuffle.partitions multiplied by the number of partition-key values; the arithmetic below seems to fit).
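A quick sanity check in the shell, assuming the default configuration:

scala> spark.conf.get("spark.sql.shuffle.partitions")   // "200" by default
// 200 shuffle partitions * 2 partition-key values = 400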

Only when we exceed this 400 number do we start finding multiple values of X in a single Parquet file.

Interestingly, although 6 constituent Parquet files are created for our test (|X| * |Y| = 3 * 2) when we save our Dataframe, the Dataframe that comes from reading the data back from HDFS has 4 partitions. That is, a partition can straddle more than one Parquet file.
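You can check the partition count after reading the data back (again, the path is hypothetical):

scala> spark.read.parquet("hdfs:///some/path").rdd.getNumPartitions   // 4 in our test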

Predicate Pushdown

The reason we sort before we save is so that each Parquet file has a chance of containing only one value of X (or at worst a small subset of them). This makes searching much easier, as Parquet can advertise the min/max range for a column in a file's metadata. If the value you seek is not in that range, the file is not read at all.

To demonstrate this, we take a Dataframe that's sorted by "intkey" (our X) and partitioned by "partitionkey" (our Y).

scala> val query = dataframe.where(s"$intKey == 3")
scala> query.explain()

== Physical Plan ==
*(1) Project [id#21, text#22, intkey#23, partitionkey#24]
+- *(1) Filter (isnotnull(intkey#23) && (intkey#23 = 3))
   +- *(1) FileScan parquet [id#21,text#22,intkey#23,partitionkey#24] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://127.0.0.1:41684/1544107487759], PartitionCount: 2, PartitionFilters: [], PushedFilters: [IsNotNull(intkey), EqualTo(intkey,3)], ReadSchema: struct

All good but note this caveat from CERN: "this feature has limitations, notably, it can only be used with certain data types and operators as implemented in Parquet and Spark. [Also] the results depend on the provided filter values and data distribution in the source table."

(If we were to search on partitionkey instead, that would be an example of partition pruning; see the sketch below.)
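For instance, filtering on the partition column (the value 1 is just an illustrative partition-key value):

scala> dataframe.where("partitionkey == 1").explain()

In the resulting plan the predicate shows up under PartitionFilters rather than PushedFilters, and Spark only lists the matching directories in the first place, so the files in the other partitions are never touched.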

Parquet tools


I did try to use the Hadoop Parquet tools but I could not find the min/max values in the Statistics object, even when I set parquet.strings.signed-min-max.enabled to true in the config for my ParquetFileReader.
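For reference, this is roughly the sort of thing I was attempting; the file path is hypothetical and the exact API surface varies a little between parquet-hadoop versions:

import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile

val conf = new Configuration()
conf.setBoolean("parquet.strings.signed-min-max.enabled", true)

// open a single Parquet file and dump the per-column-chunk statistics from its footer
val file   = HadoopInputFile.fromPath(new Path("hdfs:///some/path/part-00000.parquet"), conf)
val reader = ParquetFileReader.open(file)
try {
  reader.getFooter.getBlocks.asScala.foreach { block =>
    block.getColumns.asScala.foreach { col =>
      val stats = col.getStatistics
      println(s"${col.getPath}: min=${stats.genericGetMin}, max=${stats.genericGetMax}")
    }
  }
} finally {
  reader.close()
}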

Not to worry, Spark can show us these statistics using dataframe.describe (StackOverflow). But instead of loading the whole Dataframe, we just load an isolated Parquet file via the usual means.
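For example (the path to the individual part-file is hypothetical; any one of the files written above will do):

scala> val singleFile = spark.read.parquet("hdfs:///some/path/partitionkey=1/part-00000.parquet")
scala> singleFile.describe().show()

When we do, we see something like: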

+-------+-----------------+--------------------+------+
|summary|               id|                text|intkey|
+-------+-----------------+--------------------+------+
|  count|             1666|                1666|  1666|
|   mean|           5001.0|            Infinity|   0.0|
| stddev|2886.462540896729|                 NaN|   0.0|
|    min|                6|65656565656565656...|     0|
|    max|             9996|89898989898989898...|     0|
+-------+-----------------+--------------------+------+

(Note the stats for the text field: even a string column gets min/max values, although its mean and stddev are nonsense because describe treats the strings as numbers.)

Now we can see that, for this particular Parquet file (which is just one piece of the whole Dataframe we started with), the intkey field has a min/max of 0/0. So Spark knows that if it is looking for intkey = 3, it need not even read this file.

Buckets

Note that you can't call bucketBy when saving a Dataframe with save; only with saveAsTable can you do this (see a previous post on this). You can then run ANALYZE TABLE on it, as sketched below.
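Roughly like this, where the table name, bucket column and bucket count are arbitrary choices for illustration:

df.write
  .bucketBy(4, "intkey")       // hash rows into 4 buckets on intkey
  .sortBy("intkey")            // keep each bucket sorted
  .saveAsTable("my_table")     // save() would throw here; only saveAsTable supports bucketing

spark.sql("ANALYZE TABLE my_table COMPUTE STATISTICS")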
