Spark can process more data than can fit into memory. So why does it sometimes fail with OutOfMemoryExceptions when joining unskewed data sets?
An interesting way to counter OOMEs in a large join can be found here [SO]: rows are given a random integer salt that is used in the join condition in addition to the usual keys. In theory, this breaks the data down into more manageable chunks.
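As a rough illustration of that idea (the column name, the bucket count and the helper itself are my own assumptions, not taken from the linked answer), something like this:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

// Hypothetical helper: joins `big` to `small` on a column called "id",
// spreading each key over `nBuckets` random buckets.
def saltedJoin(big: DataFrame, small: DataFrame, nBuckets: Int): DataFrame = {
  // Tag each row on the large side with a random bucket in [0, nBuckets).
  val saltedBig = big.withColumn("salt", (rand() * nBuckets).cast("int"))

  // Replicate the small side once per bucket so every salted key still finds its match.
  val buckets     = big.sparkSession.range(nBuckets).select(col("id").cast("int").as("salt"))
  val saltedSmall = small.crossJoin(buckets)

  // Join on the original key plus the salt, then throw the salt away.
  saltedBig.join(saltedSmall, Seq("id", "salt")).drop("salt")
}
```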
Another standard exercise is to repartition the data. But this causes a shuffle, and it may actually be the repartition itself that causes an OOME.
In practice, I've found that persisting the data frame to disk and reading it back yields better results. The number of partitions written is rarely the number read back. That is, you get a more natural partitioning for free (or almost free: obviously, some time is spent writing to disk). And there is no repartition that could throw an OOME.
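A minimal sketch of that round trip (the helper name and path argument are just illustrative):

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Write the frame out and read it back rather than repartitioning it in memory.
def checkpointToDisk(df: DataFrame, path: String)(implicit spark: SparkSession): DataFrame = {
  // No repartition here, so no extra shuffle that could itself throw an OOME.
  df.write.mode("overwrite").parquet(path)

  // The frame we read back typically has a different, more natural number of
  // partitions, driven by the file sizes on disk rather than the previous shuffle.
  spark.read.parquet(path)
}
```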
This question came up on Discord where somebody was trying to crossJoin a huge amount of data. I suggested a solution that uses mapPartitions (a sketch follows below). The nice thing about this method is that your code is passed a lazy data structure. As long as you don't call something like toList on it, it will pull data into memory as needed and garbage collect it after it's written out.
By using a lazy Iterator, Spark can write far more data to disk than it can hold in memory. As Spark consumes from the Iterator, it monitors its memory usage; when memory starts looking a bit full, it flushes to disk. Here is the memory usage of this code that uses mapPartitions to write to /tmp/results_parquet a data set that is much larger than the JVM's heap:
[Figure: memory usage of Spark with a 0.5GB heap writing 1.3GB of files]
[Figure: output of watch "du -sh /tmp/results_parquet"]
We can see that upon each GC, more is written to disk.
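The code from that run isn't reproduced here, but a minimal sketch of the same technique might look like this (the row count, the blow-up factor and the padding string are arbitrary choices of mine):

```scala
import org.apache.spark.sql.SparkSession

object GenerateBigDataset extends App {
  // Run with a deliberately small heap (e.g. -Xmx512m) to see the spill-to-disk behaviour.
  val spark = SparkSession.builder().master("local[*]").appName("lazy-writer").getOrCreate()
  import spark.implicits._

  val blowUpFactor = 100

  spark.range(0, 1000000).as[Long]
    .mapPartitions { ids =>
      // `ids` is a lazy Iterator. flatMap keeps it lazy: each input row is expanded
      // into many output rows only as Spark's writer pulls them, so nothing like
      // toList ever materialises the whole partition in memory.
      ids.flatMap { i =>
        Iterator.range(0, blowUpFactor).map(j => (i, j, s"some padding to make the rows bigger: $i/$j"))
      }
    }
    .write
    .mode("overwrite")
    .parquet("/tmp/results_parquet")
}
```

Running something like this with a small heap while watching the output directory (as in the screenshot above) is a cheap way to see the Iterator being drained and flushed incrementally.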
The result is that a huge dataframe that could not fit into memory can now be joined with another.
As an aside: Uber has been doing some work on dealing with OOMEs in Spark. See their article here. TL;DR: they propose that in the event of an OOME, Spark adapts by asking some cores to step down before it re-attempts the failed stage, increasing the memory-to-CPU ratio. Ergo, each compute unit has more memory than before.