We were trying to join two rather large DataFrames in Spark but kept running into horrible memory problems. Using my trusty jstat, I noticed that the driver was using all of its heap.
Trying to find the problem, I looked for the largest number of Rows sharing a single ID_ACCOUNT:
import org.apache.spark.sql.functions.{count, max}
val counted = csvAccount.groupBy('ID_ACCOUNT).agg(count("*").as("co")).select(max('co))
but with little success.
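Looking at the query plan, I saw a broadcast join. What is this? “With a broadcast join one side of the join equation is being materialized and send to all mappers” (from here). Note that all of that data “needs to fit into the memory of the Driver!” (ibid). Spark picks this strategy automatically when it estimates that one side of the join is smaller than spark.sql.autoBroadcastJoinThreshold (10 MB by default).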
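To make that description concrete, here is a minimal sketch in plain Scala of the idea behind a broadcast hash join. It assumes in-memory collections stand in for DataFrames and a `Seq` of `Seq`s stands in for the partitions of the large side; the names `BroadcastJoinSketch`, `buildSide` and `broadcastHashJoin` are hypothetical. It is a simplified single-machine model, not Spark's implementation. The point is that the entire small side becomes one hash table that every partition probes, so that table has to fit in memory.

```scala
// Simplified single-machine model of a broadcast hash join (hypothetical names,
// NOT Spark's code). In Spark the small side is first collected to the driver
// and then shipped to every executor; here it is just one in-memory Map.
object BroadcastJoinSketch {

  // Build phase: materialize the small side as a hash table keyed on the join key.
  // The whole table must fit in memory, which is the requirement quoted above.
  def buildSide[K, V](small: Seq[(K, V)]): Map[K, Seq[V]] =
    small.groupBy(_._1).map { case (k, rows) => k -> rows.map(_._2) }

  // Probe phase: each partition of the large side is joined locally against the
  // same table, so the large side never needs to be shuffled.
  def broadcastHashJoin[K, A, B](
      largePartitions: Seq[Seq[(K, A)]],
      small: Seq[(K, B)]): Seq[(K, A, B)] = {
    val table = buildSide(small) // built once, conceptually "broadcast" to all partitions
    largePartitions.flatMap { partition =>
      partition.flatMap { case (k, a) =>
        // Inner join: keys missing from the small side produce no output rows.
        table.getOrElse(k, Seq.empty[B]).map(b => (k, a, b))
      }
    }
  }
}
```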
Apparently, you can disable automatic broadcast joins with:
sqlContext.sql("SET spark.sql.autoBroadcastJoinThreshold = -1")
“Broadcast joins don't work well for cartesian products because the workers get so much broadcast data they get stuck in an infinite garbage collection loop and never finish. Remember to turn this back on when the query finishes” (from here).
This certainly sounds like what I was seeing.
As it happens, the problem was dirty data: the keys we were joining on were being truncated (due to bad CSV parsing), so there were far more Rows for a given key than there should have been. If a key appeared M times in one DataFrame and N times in the other, the join produced M × N Rows for that key. It's easy to see how this Cartesian product explodes and consumes far too much memory.
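A minimal sketch of that arithmetic in plain Scala, with no Spark dependency. The `truncate` helper is a hypothetical stand-in for the bad CSV parsing, and the account IDs are made up for illustration. `joinedRowCount` sums M × N over every key the two sides share, which is how many rows an inner equi-join emits. With 1,000 distinct IDs on each side the join yields 1,000 rows; truncate every ID to the same 4-character prefix and it yields 1,000,000.

```scala
// Illustrates how duplicate join keys multiply rows in an inner equi-join.
// The truncation rule and IDs are hypothetical stand-ins for the dirty data.
object KeyExplosionSketch {

  // Hypothetical "bad parser": keeps only the first `width` characters of a key.
  def truncate(key: String, width: Int): String = key.take(width)

  // Rows an inner equi-join produces: for each key present on both sides,
  // (occurrences on the left) * (occurrences on the right).
  def joinedRowCount(left: Seq[String], right: Seq[String]): Long = {
    val l = left.groupBy(identity).map { case (k, v) => k -> v.size.toLong }
    val r = right.groupBy(identity).map { case (k, v) => k -> v.size.toLong }
    l.collect { case (k, m) if r.contains(k) => m * r(k) }.sum
  }

  // Largest number of rows sharing a single key (0 for empty input).
  def maxKeyMultiplicity(keys: Seq[String]): Int =
    if (keys.isEmpty) 0 else keys.groupBy(identity).values.map(_.size).max
}
```

`maxKeyMultiplicity` computes the same figure as the `groupBy`/`count`/`max` query earlier in this post, which makes it a cheap check to run on both sides before attempting a large join.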