DIMSUM is an algorithm (described here) that efficiently finds similar columns in a matrix. You get it for free in Spark via the RowMatrix.columnSimilarities method.
The problem essentially involves taking each row and multiplying every combination of its values together. Given a matrix with m rows and n columns, this is an O(mn²) problem. But if the matrix is sparse, with at most L non-zero values per row, it becomes an O(mL²) problem. DIMSUM promises to improve on this even further (at the cost of a small loss of accuracy).
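Calling it is straightforward (a minimal sketch; the vectors and the threshold are my own illustration, and sc is assumed to be a SparkContext):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix

// Build a RowMatrix from an RDD of sparse row vectors.
val vectors = sc.parallelize(Seq(
  Vectors.sparse(5, Seq((0, 1.0), (3, 2.0))),
  Vectors.sparse(5, Seq((1, 4.0), (3, 1.0)))))
val mat = new RowMatrix(vectors)

val exact = mat.columnSimilarities()      // brute force: all-pairs cosine similarity
val approx = mat.columnSimilarities(0.1)  // DIMSUM: approximate, with guarantees above this threshold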
The headache for us was that our matrix was huge: 100 million by 100 million. This was pushing the envelope, as DIMSUM does not appear to be best suited to matrices this wide [see here for this comment: "Column based similarities work well if the columns are mild (10K, 100K, we actually scaled it to 1.5M columns but it really stress tests the shuffle and it needs to tune the shuffle parameters)"].
What we were seeing in the driver logs looked like this:
16/01/28 11:05:56 ERROR TaskSchedulerImpl: Lost executor 3 on ip-172-30-0-65.eu-west-1.compute.internal: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
16/01/28 11:05:56 INFO DAGScheduler: Resubmitted ShuffleMapTask(7, 44), so marking it as still running
16/01/28 11:05:56 WARN TaskSetManager: Lost task 72.0 in stage 7.0 (TID 6100, ip-172-30-0-65.eu-west-1.compute.internal): ExecutorLostFailure (executor 3 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
.
.
The Spark documentation on memory tuning was also a good read but didn't help. I was using Kryo for serializing objects, and I reduced spark.memory.fraction from the default of 75% to 25% to maximize the space "reserved for user data structures, internal metadata in Spark, and safeguarding against OOM errors in the case of sparse and unusually large records". All to no avail.
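For the record, those settings look something like this (a sketch; only the values are the ones described above):

import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // Kryo instead of Java serialization
  .set("spark.memory.fraction", "0.25")                                  // down from the 0.75 default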
This StackOverflow post was informative. I was using Spark 1.6, so I didn't need to worry about spark.storage.memoryFraction; I wasn't using broadcast variables; I was already using thousands of partitions; and the "task serialized as XXX bytes" messages in the logs showed that XXX was small.
So, I gave Spark about as much memory as the box could offer (55GB out of 60GB; I was also running Hadoop, and the kernel needs some memory too). Running jmap -histo on the worker boxes gave output like this:
num     #instances         #bytes  class name
----------------------------------------------
  1:            72    42400003008  [D
  2:         20222      174418080  [B
  3:       2070414       99379872  scala.Tuple2$mcID$sp
  4:        403387       69434872  [C
  5:        382592       36728832  io.netty.channel.ChannelOutboundBuffer$Entry
  6:        769266       30770640  io.netty.util.Recycler$DefaultHandle
  7:        245337       29440440  io.netty.buffer.PooledHeapByteBuf
That first line is 72 arrays of doubles ([D) consuming some 42GB between them. Looking more closely at the Spark code that calculates the column similarities, we see:
def computeColumnSummaryStatistics(): MultivariateStatisticalSummary = {
  val summary = rows.treeAggregate(new MultivariateOnlineSummarizer)(
    (aggregator, data) => aggregator.add(data),
    (aggregator1, aggregator2) => aggregator1.merge(aggregator2))
  updateNumRows(summary.count)
  summary
}
This runs

(aggregator, data) => aggregator.add(data)

over each partition and then aggregates the per-partition results together with:

(aggregator1, aggregator2) => aggregator1.merge(aggregator2)
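If treeAggregate is unfamiliar, this toy example (my own, not Spark's code) shows the same pattern:

// Sum integers across 16 partitions: the first function folds values into an
// accumulator within each partition; the second merges accumulators up a tree.
val total = sc.parallelize(1 to 1000, 16).treeAggregate(0L)(
  (acc, x) => acc + x,
  (a, b) => a + b)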
So what is this MultivariateOnlineSummarizer class? Well, it contains seven arrays of doubles. Hmm. And what's this in the add method (which is called on each row in a partition)?
private[spark] def add(instance: Vector, weight: Double): this.type = {
  .
  .
  // seven arrays are allocated, each as long as the vector, i.e. the number of columns
  n = instance.size
  currMean = Array.ofDim[Double](n)
  currM2n = Array.ofDim[Double](n)
  currM2 = Array.ofDim[Double](n)
  currL1 = Array.ofDim[Double](n)
  nnz = Array.ofDim[Double](n)
  currMax = Array.fill[Double](n)(Double.MinValue)
  currMin = Array.fill[Double](n)(Double.MaxValue)
  .
  .
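A back-of-envelope calculation (mine, using our matrix's dimensions) shows why that is fatal:

val n = 100000000L           // the number of columns in our matrix
val arrays = 7L              // currMean, currM2n, currM2, currL1, nnz, currMax, currMin
val bytes = arrays * n * 8L  // 8 bytes per Double: roughly 5.6GB per summarizer

At about 5.6GB per MultivariateOnlineSummarizer, this tallies with the 42GB of double arrays that jmap reported.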
My first attempt at a solution involved reducing the number of partitions with RDD.coalesce. This is inexpensive as it just logically groups the 2000 partitions into 16 (I chose 16 because I have 4 boxes and each box could just about hold 4 MultivariateOnlineSummarizers in memory at once).
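In code, that looks something like this (a sketch, assuming rows is the RDD of vectors backing the RowMatrix):

// shuffle = false means partitions are just logically grouped; no data is moved.
val coalesced = rows.coalesce(16, shuffle = false)
val matrix = new RowMatrix(coalesced)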
This helped the job progress further, but then Kryo complained that it didn't have enough buffer space:
16/02/02 13:52:28 WARN TaskSetManager: Lost task 3.3 in stage 4.0 (TID 4050, ip-172-30-0-64.eu-west-1.compute.internal): org.apache.spark.SparkException: Kryo serialization failed: Buffer overflow. Available: 6, required: 8
Serialization trace:
currL1 (org.apache.spark.mllib.stat.MultivariateOnlineSummarizer). To avoid this, increase spark.kryoserializer.buffer.max value.
So I increased the maximum Kryo buffer size:

sparkConf.setExecutorEnv("spark.kryoserializer.buffer.max", "20G")

But the next run then failed during the shuffle:
16/02/02 14:05:23 WARN TaskSetManager: Lost task 1.0 in stage 4.1 (TID 6149, ip-172-30-0-65.eu-west-1.compute.internal): FetchFailed(BlockManagerId(0, ip-172-30-0-63.eu-west-1.compute.internal, 44256), shuffleId=2, mapId=7, reduceId=1, message=
org.apache.spark.shuffle.FetchFailedException: Too large frame: 3251949143
at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:323)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:300)
.
.
Caused by: java.lang.IllegalArgumentException: Too large frame: 3251949143
at org.spark-project.guava.base.Preconditions.checkArgument(Preconditions.java:119)
at org.apache.spark.network.util.TransportFrameDecoder.decodeNext(TransportFrameDecoder.java:134)
.
.
This corresponds to a hard-coded check in Spark's TransportFrameDecoder:

Preconditions.checkArgument(frameSize < MAX_FRAME_SIZE, "Too large frame: %s", frameSize);
(see here for more information on Spark limits). Indeed the app in the Spark GUI looks like this:
We can see a job in the process of retrying, having already failed once. The interesting thing to note is that all 16 tasks of stage 3 succeeded; that is the first part of treeAggregate, so our coalesce hack worked. However, the second part of treeAggregate failed: none of the 4 tasks in stage 4 (corresponding to my 4 boxes trying to aggregate their individual results) passed.
Workaround
Well, there isn't one that I am aware of, at least as far as DIMSUM is concerned. However, since my matrix is very sparse, a brute-force routine I coded up ran in about 45 minutes, so maybe I don't need the efficiency DIMSUM promises anyway.
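For what it's worth, the brute-force routine is roughly shaped like this (a sketch of the idea rather than my exact code; each row is represented by its non-zero (column, value) pairs):

import org.apache.spark.rdd.RDD

// Every pair of non-zero entries within a row contributes to one column-pair
// dot product - this is exactly the O(mL²) work described at the top of the post.
def columnDotProducts(rows: RDD[Seq[(Int, Double)]]): RDD[((Int, Int), Double)] =
  rows.flatMap { row =>
    for ((i, vi) <- row; (j, vj) <- row if i < j) yield ((i, j), vi * vj)
  }.reduceByKey(_ + _)

Crucially, nothing here allocates a structure proportional to the full number of columns.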