Wednesday, November 2, 2016

Spark and Vector Multiplication

More Spark shenanigans. I'm having all sorts of problems while attempting to make the run stable. I'm seeing FetchFailedExceptions that (according to this) are "almost guaranteed to be caused by memory issues on your executors" and FileNotFoundExceptions that mysteriously appear when an executor throws an OutOfMemoryError in the initializer of org.apache.spark.shuffle.sort.ShuffleInMemorySorter.

Thanks for the memory

So, the first thing is to review how Spark manages its memory. The areas are broken down as:
  • Reserved Memory
  • User Memory
  • Spark Memory, broken down into:
    • Storage Memory
    • Execution Memory
where (from here)

Reserved Memory "is the memory reserved by the system, and its size is hardcoded. As of Spark 1.6.0, its value is 300MB"

User Memory "is the memory pool that remains after the allocation of Spark Memory, and it is completely up to you to use it in a way you like. You can store your own data structures there that would be used in RDD transformations".

Storage Memory "is used for both storing Apache Spark cached data and for temporary space serialized". It also stores broadcast variables.

Execution Memory "is used for storing the objects required during the execution of Spark tasks."

Why heap space isn't the whole story

Here is an example.

Given "a matrix A that has a large number of columns (a short and fat matrix) and a matrix B that has a large number of rows (a tall and thin matrix)" that we want to multiply, there's a good chance the multiplication will blow up.

The reason is that this can be very memory intensive, and the data structures your own code builds inside a transformation come out of User Memory (see above), not out of the pools Spark manages for you.

To get around this, we can "apply the outer product to each row of the RDD, a column of A with a row of B. This will create an RDD of small matrices size m*k." We then "use the reduce method to sum the all the small matrices."

Neat. So the trick is to break down each step into smaller steps. In this example, it's done by making an RDD that pairs small vectors together so they can be multiplied with very little overhead.
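To make the outer-product trick concrete, here is a minimal sketch in plain Python rather than Spark. The function names are hypothetical, and an ordinary list of (column, row) pairs stands in for the RDD; the point is only that each step touches one small m*k matrix at a time.

```python
from functools import reduce

def outer(col, row):
    """Outer product of a column of A (length m) and a row of B (length k)."""
    return [[a * b for b in row] for a in col]

def add(x, y):
    """Element-wise sum of two equally sized small matrices."""
    return [[p + q for p, q in zip(rx, ry)] for rx, ry in zip(x, y)]

def multiply_by_outer_products(A, B):
    """Compute A * B as the sum of (column i of A) outer (row i of B)."""
    columns_of_a = list(zip(*A))      # transpose A to get its columns
    pairs = zip(columns_of_a, B)      # stands in for the RDD of paired vectors
    return reduce(add, (outer(c, r) for c, r in pairs))

A = [[1, 2, 3],
     [4, 5, 6]]                       # 2 x 3
B = [[7, 8],
     [9, 10],
     [11, 12]]                        # 3 x 2
print(multiply_by_outer_products(A, B))   # [[58, 64], [139, 154]]
```

In Spark the same shape would presumably be a map over the paired vectors followed by a reduce, so no single task ever has to hold more than a couple of the small matrices.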

Refactor the problem

Realizing that my vector multiplication was probably the culprit, I gave myself more User Memory by setting spark.memory.fraction much lower (about 0.01) since

User Memory ~ (1.0 - spark.memory.fraction)

This improved things insofar as my run ran for longer but it still ultimately blew up.
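As a rough illustration of what that setting does to the regions listed earlier, here is a back-of-the-envelope sketch. The 4096MB heap and the 0.75 comparison value are assumptions picked for the example, and the 0.5 storage split is my understanding of the default for spark.memory.storageFraction; none of these numbers come from my actual run.

```python
RESERVED_MB = 300  # the hardcoded Reserved Memory quoted above for Spark 1.6.0

def memory_pools(heap_mb, memory_fraction, storage_fraction=0.5):
    """Approximate the size in MB of each memory region for one executor.

    heap_mb and storage_fraction are illustrative assumptions.
    """
    usable = heap_mb - RESERVED_MB
    spark_memory = usable * memory_fraction
    storage = spark_memory * storage_fraction
    return {
        "reserved": RESERVED_MB,
        "user": usable - spark_memory,        # ~ (1.0 - spark.memory.fraction)
        "storage": storage,
        "execution": spark_memory - storage,
    }

for fraction in (0.75, 0.01):
    pools = memory_pools(heap_mb=4096, memory_fraction=fraction)
    print(fraction, {name: round(mb, 1) for name, mb in pools.items()})
```

With these assumed numbers, dropping the fraction from 0.75 to 0.01 moves User Memory from roughly 950MB to roughly 3760MB, at the cost of leaving Spark itself almost nothing for storage and execution.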

The problem was that my sparse matrix was not sparse enough. It could have rows with 2000 non-zero elements. Since a row could have about 100 million elements overall, this is still probably considered sparse. But each non-zero element is multiplied with every other non-zero element when I calculate Cosine Similarities. This scales as O(N^2).

OK, ~2000^2 results is not the end of the world but what if we get a few of these in the same JVM? We quickly exhaust the memory. Increasing User Memory reduces the likelihood but does not eliminate it. The solution (like the matrix multiplication example above) is to break down the problem into a new RDD that presents the data in smaller chunks.
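What "smaller chunks" might look like is sketched below in plain Python. This is not the code from my job: the function names, the dict representation of a sparse row and the chunk size are all hypothetical. The idea is to generate the pairwise products of a row lazily and hand them on in bounded batches instead of materializing every pair at once.

```python
from itertools import combinations, islice

def pairwise_products(row):
    """Lazily yield ((i, j), x_i * x_j) for each pair of non-zero entries.

    `row` is a sparse row held as a dict of column index -> value.
    """
    for (i, x), (j, y) in combinations(sorted(row.items()), 2):
        yield (i, j), x * y

def in_chunks(iterable, size):
    """Group any iterable into lists of at most `size` items."""
    it = iter(iterable)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk

row = {3: 1.0, 17: 2.0, 42: 3.0, 99: 4.0}    # 4 non-zero elements -> 6 pairs
for chunk in in_chunks(pairwise_products(row), size=4):
    print(len(chunk), chunk[0])
```

Only one chunk is alive at a time, so the peak footprint depends on the chunk size rather than on the square of the number of non-zero elements. In Spark, each chunk could become its own element of a new RDD so that the work for one heavy row is spread over several tasks.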
