Wednesday, February 9, 2022

Mini Driver

You can easily blow up the driver in Spark by calling collect() on a particularly large dataset. But is there any other way to do so?
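The collect() case is easy to reproduce. Here's a minimal sketch (the session settings, sizes and partition count are assumptions for illustration, not from my actual job):

    import org.apache.spark.sql.SparkSession

    // Sketch of the classic failure mode.
    val spark = SparkSession.builder()
      .appName("driver-blowup-sketch")
      .master("local[*]")
      // The driver refuses to accumulate more than this in serialized
      // task results (default 1g; 0 disables the check).
      .config("spark.driver.maxResultSize", "1g")
      .getOrCreate()

    // Many partitions, each sending its serialized results back to the driver.
    val big = spark.range(0L, 2000000000L, 1L, 4607)

    // collect() pulls every partition's rows to the driver, tripping maxResultSize.
    val rows = big.collect()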

This was the problem facing me this week: perfectly sensible-looking code that resulted in:

Job aborted due to stage failure: Total size of serialized results of 4607 tasks (1024.1 MiB) is bigger than spark.driver.maxResultSize (1024.0 MiB)

Well, the answer is yes: innocuous joins can overwhelm the driver if they're broadcast, because the broadcast table is distributed to the executors via the driver.
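A sketch of how that can happen without any explicit broadcast in your code (the table names and threshold here are hypothetical, continuing from the session above):

    // If Spark's size estimator puts `dim` under this threshold, it may
    // pick BroadcastHashJoin on its own.
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10L * 1024 * 1024) // 10 MiB, the default

    val facts = spark.table("facts")
    val dim   = spark.table("dim") // estimated, perhaps wrongly, to be small

    // No broadcast() call anywhere, yet the build side of this join can
    // still be collected to the driver before being shipped to every executor.
    val joined = facts.join(dim, "id")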

"This is due to a limitation with Spark’s size estimator. If the estimated size of one of the DataFrames is less than the autoBroadcastJoinThreshold, Spark may use BroadcastHashJoin to perform the join. If the available nodes do not have enough resources to accommodate the broadcast DataFrame, your job fails due to an out of memory error." [Databricks]

The solution is to call explain() and "review the physical plan. If the broadcast join returns BuildLeft, cache the left side table. If the broadcast join returns BuildRight, cache the right side table."
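In practice that looks something like this, carrying on with the hypothetical join above (the plan snippet is illustrative):

    joined.explain()
    // == Physical Plan ==
    // *(2) BroadcastHashJoin [id#0L], [id#12L], Inner, BuildRight, false
    // ...

    // BuildRight: the right-hand side (`dim`) is the one being broadcast,
    // so cache it, per the Databricks advice.
    dim.cache()
    dim.count() // materialise the cache

The idea, as I understand it, is that a cached table's size is known rather than estimated, so the broadcast decision is made on real numbers.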

This can be achieved by calling the Dataset.hint(...) method with the values you can find in JoinStrategyHint.hintAliases.
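For example, to steer the planner away from a broadcast (hint aliases as of Spark 3.x, using the hypothetical tables above):

    // "merge" forces a sort-merge join; "shuffle_hash" is another
    // broadcast-free alternative.
    val merged = facts.join(dim.hint("merge"), "id")
    merged.explain() // the plan should now show SortMergeJoin rather than BroadcastHashJoin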
