Thursday, June 7, 2018

Spark Query Plans (Gotcha!)


I was exploring why Spark was taking hours to run a support vector machine on my data when other ML algorithms were taking minutes. I called .explain() on my DataFrame, but I found surprisingly little information on the web about how to interpret Spark query plans.

The best source I found was this one from Simon Fraser University. The take-away points are:

  • "Read [query plans] from the bottom up."
  • Exchanges eg "Exchange hashpartitioning(..." are shuffles.
  • InMemoryRelation and InMemoryTableScan "will look in memory for data, calculating and caching if necessary". 
  • Range and Project are from select()s, withColumn() etc
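
As an aside, the point about Exchanges is easy to see for yourself. Here is a quick sketch (not a verbatim transcript; the exact plan varies with Spark version): any wide operation such as a groupBy forces a shuffle, which appears in the plan as an Exchange hashpartitioning(...) node.

scala> val grouped = sc.range(0, 100000).toDF.groupBy("value").count()
scala> grouped.explain()
// Reading from the bottom up, expect something like:
//   Range/Scan, then a partial HashAggregate,
//   then Exchange hashpartitioning(value#..., 200)   <- the shuffle
//   then the final HashAggregate.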

Playing around, I found InMemoryRelation and InMemoryTableScan can be generated by adding a .cache().

scala> val df = sc.range(1, 100000).toDF
scala> df.explain()
== Physical Plan ==
*SerializeFromObject [input[0, bigint, false] AS value#2L]
+- Scan ExternalRDDScan[obj#1L]

scala> val add1 = df.map { r => r.getLong(0) + 1 }
scala> add1.explain()
== Physical Plan ==
*SerializeFromObject [input[0, bigint, false] AS value#10L]
+- *MapElements <function1>, obj#9: bigint
   +- *DeserializeToObject createexternalrow(value#2L, StructField(value,LongType,false)), obj#8: org.apache.spark.sql.Row
      +- *SerializeFromObject [input[0, bigint, false] AS value#2L]
         +- Scan ExternalRDDScan[obj#1L]

scala> val cachedAdd1 = add1.cache()
scala> cachedAdd1.explain()
== Physical Plan ==
InMemoryTableScan [value#10L]
   +- InMemoryRelation [value#10L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *SerializeFromObject [input[0, bigint, false] AS value#10L]
            +- *MapElements <function1>, obj#9: bigint
               +- *DeserializeToObject createexternalrow(value#2L, StructField(value,LongType,false)), obj#8: org.apache.spark.sql.Row
                  +- *SerializeFromObject [input[0, bigint, false] AS value#2L]
                     +- Scan ExternalRDDScan[obj#1L]

So, that explained the last two steps in my plan, but I was no nearer to understanding my problem. However, I did notice that there were lots of User Defined Functions in my DataFrame, presumably from the Spark Transformers that took my raw textual data and helped me turn it into vectors.

Could this be my problem? There appear to have been several problems with UDFs (see SPARK-17728, for example) and no shortage of people warning against them (for example, here: "Using a UDF implies deserialization to process the data in classic Scala and then reserialize it. UDFs can be replaced by Spark SQL functions, there are already a lot of them and new ones are regularly added.").
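
To illustrate the advice in that quote, here is a quick sketch (the column and variable names are made up for the example): a hand-rolled UDF is opaque to Catalyst, whereas the equivalent built-in function from org.apache.spark.sql.functions is not, and avoids the deserialize/reserialize round trip.

scala> import org.apache.spark.sql.functions.{udf, upper}
scala> val words = Seq("hello", "world").toDF("word")
scala> val upperUDF = udf { s: String => s.toUpperCase }   // opaque to Catalyst
scala> words.select(upperUDF('word)).explain()             // the Project shows UDF(word#...)
scala> words.select(upper('word)).explain()                // the Project shows upper(word#...) - no UDF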

But I could not replicate the issue by testing in the Spark shell:

scala> val df = sc.range(0, 5).toDF
scala> val noisyAdd1UDF = udf{ x: Long => println("noisyAdd1UDF = " + x); x + 1 } // not a pure function
scala> val dfWithUdf = df.withColumn("added", noisyAdd1UDF('value))
scala> dfWithUdf.explain()
== Physical Plan ==
*Project [value#283L, UDF(value#283L) AS added#287L]
+- *SerializeFromObject [input[0, bigint, false] AS value#283L]
   +- Scan ExternalRDDScan[obj#282L]

Now, let's show() it twice. Each time, the UDF is re-calculated:

scala> dfWithUdf.show()
noisyAdd1UDF = 0
...
noisyAdd1UDF = 4
+-----+-----+
|value|added|
+-----+-----+
|    0|    1|
...
|    4|    5|
+-----+-----+

scala> dfWithUdf.show()
noisyAdd1UDF = 0
...
noisyAdd1UDF = 2
+-----+-----+
|value|added|
+-----+-----+
|    0|    1|
...
|    4|    5|
+-----+-----+

So, no surprises there. And calling cache() stopped subsequent show()s from calling the UDF again, as expected.
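
For completeness, the cached case looks something like this (a sketch rather than a verbatim transcript): once the DataFrame with the UDF is cached, the side-effecting println only fires while the cache is being populated.

scala> val cachedWithUdf = dfWithUdf.cache()
scala> cachedWithUdf.show()   // noisyAdd1UDF = ... lines appear as the cache is populated
scala> cachedWithUdf.show()   // no noisyAdd1UDF output: the rows come straight from the cache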

The Gotcha

It turns out that the problem is not directly with a UDF but with the DataFrame that is created when we add the UDF. The new DataFrame does not inherit its parent's StorageLevel (note that other operations like randomSplit will do the same thing).

We can see this by doing:

scala> val df = sc.range(0, 100000).toDF
df: org.apache.spark.sql.DataFrame = [value: bigint]

scala> df.storageLevel
res67: org.apache.spark.storage.StorageLevel = StorageLevel(1 replicas)

scala> df.cache()
res68: df.type = [value: bigint]

scala> df.storageLevel
res69: org.apache.spark.storage.StorageLevel = StorageLevel(disk, memory, deserialized, 1 replicas)

scala> val dfWithUdf = df.withColumn("value", noisyAdd1UDF('value))
dfWithUdf: org.apache.spark.sql.DataFrame = [value: bigint]

scala> dfWithUdf.storageLevel
res70: org.apache.spark.storage.StorageLevel = StorageLevel(1 replicas)
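
The same applies to the randomSplit mentioned above (again, a sketch rather than a transcript): the DataFrames it returns start life uncached, whatever the StorageLevel of their parent.

scala> val Array(train, test) = df.randomSplit(Array(0.8, 0.2))
scala> train.storageLevel     // StorageLevel(1 replicas), even though df itself is cached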

Now, while other Spark classifiers might also use withColumn, they discard the other columns that would call the UDF and so cause the DataFrame to be re-calculated. OneVsRest, however, does not do this. Indeed, it cannot, as it does not know whether those columns will be used by the classifiers it wraps.

And we can see this if we again look at the Query Plans. Our cached DataFrame will use InMemory plans:

scala> df.explain()
== Physical Plan ==
InMemoryTableScan [value#744L]
   +- InMemoryRelation [value#744L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *SerializeFromObject [input[0, bigint, false] AS value#744L]
            +- Scan ExternalRDDScan[obj#743L]

But although it sits on top of this cached DataFrame, our non-cached DataFrame has, as its top layer, a Project that calls our UDF:

scala> dfWithUdf.explain()
== Physical Plan ==
*Project [UDF(value#744L) AS value#753L]
+- InMemoryTableScan [value#744L]
      +- InMemoryRelation [value#744L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *SerializeFromObject [input[0, bigint, false] AS value#744L]
               +- Scan ExternalRDDScan[obj#743L]

In turn, caching this puts another layer of InMemoryRelation/InMemoryTableScan on top:

scala> dfWithUdf.cache()
scala> dfWithUdf.explain()
== Physical Plan ==
InMemoryTableScan [value#753L]
   +- InMemoryRelation [value#753L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *Project [UDF(value#744L) AS value#753L]
            +- InMemoryTableScan [value#744L]
                  +- InMemoryRelation [value#744L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
                        +- *SerializeFromObject [input[0, bigint, false] AS value#744L]
                           +- Scan ExternalRDDScan[obj#743L]

The solution is to use checkpoint(). This collapses the Query Plan, freezing the results and obviating the need to recalculate anything.
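
Roughly, it looks like this (a sketch; the checkpoint directory is just a placeholder): checkpoint() needs a checkpoint directory, eagerly materialises the DataFrame there, and returns a new DataFrame whose plan no longer drags the UDF lineage around.

scala> sc.setCheckpointDir("/tmp/spark-checkpoints")   // placeholder path; use HDFS on a real cluster
scala> val checkpointed = dfWithUdf.checkpoint()       // eager by default: computes and saves immediately
scala> checkpointed.explain()
// The plan collapses to a scan over the checkpointed RDD; the Project/UDF and InMemory layers are gone.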
