Spark is memory-hungry, which means you need to consider the garbage collector when tuning.
There are a number of GCs to choose from and they may or may not be appropriate for your job. Some say that G1 may give you (soft) guarantees on maximum pause times but is not well suited to batch jobs. Others point out that CMS may give lower pauses than ParallelGC (the default on 64-bit machines) but that ParallelGC has better overall throughput, and since we're running a batch job rather than a web server, ParallelGC may suit Spark.
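If you want to experiment, the executors' collector can be switched with a JVM flag. Something like this should do it (a sketch, assuming the options are set through SparkConf rather than on the spark-submit command line):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("gc-tuning")
  // ParallelGC is the default on 64-bit server JVMs; swap in -XX:+UseG1GC or
  // -XX:+UseConcMarkSweepGC to compare collectors.
  .set("spark.executor.extraJavaOptions", "-XX:+UseParallelGC")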
More logs than a lumberjack
Whichever GC you use, you'll need to turn logging on and analyse the logs.
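For the executors, something like this turns the logging on (a sketch using the JDK 8 flags; the output ends up in each executor's stdout, which you can get at through the web UI):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  // Print a detailed, timestamped line for every collection.
  .set("spark.executor.extraJavaOptions",
    "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps")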
You can see objects being promoted from the young to the old generation (see here and here), but it's inferred rather than stated explicitly. At a young GC, you'll see something like:
[PSYoungGen: BEFORE_YOUNG->AFTER_YOUNG(CAPACITY_YOUNG)] BEFORE_HEAP->AFTER_HEAP(CAPACITY_HEAP)
This describes the memory usage in the young generation before the collection (BEFORE_YOUNG), after it (AFTER_YOUNG), and the young generation's capacity (CAPACITY_YOUNG).
It also shows the usage of the heap as a whole before (BEFORE_HEAP) and after (AFTER_HEAP), and the heap's total capacity (CAPACITY_HEAP).
Note that the amount freed from the young generation is not necessarily the same as the amount freed from the heap as a whole, i.e. it may be that:
Δ = (BEFORE_YOUNG - AFTER_YOUNG) - (BEFORE_HEAP - AFTER_HEAP) != 0
This is because not all of the young generation was actually collected; Δ is the amount that was promoted to the old generation.
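For example, plugging some made-up numbers into the template above:

[PSYoungGen: 600M->100M(700M)] 900M->500M(2048M)

Here the young generation shrank by 500M but the heap as a whole only shrank by 400M, so the other 100M wasn't collected at all: it was promoted to the old generation.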
Because the logs showed a lot of young-generation GC but not much old, I tried to avoid premature promotion: on JVMs with 8GB of memory, I set -XX:MaxNewSize=6G -XX:SurvivorRatio=6. But instead of taking 5 hours, the job took 6.
Trying G1GC took just over 6 hours, and a run with -XX:MaxNewSize=1G -XX:SurvivorRatio=3 had to be killed as it looked like it was going to take days. So, I had to look elsewhere.
Disks
You can also check your disk write speed using a script taken from here, namely:
dd if=/dev/zero of=/tmp/output.img bs=8k count=256k   # write 256k blocks of 8k (2GB) of zeros; dd reports the throughput
rm /tmp/output.img                                    # clean up the test file
A decent HDD will typically give you about 300MB/s, a bog-standard SSD (my NUC at home) about 500MB/s.
The best way to avoid Garbage Collection...
... is not to create any garbage. This might not always be possible, but you can minimize GC. Kryo will shrink serialized objects amazingly well by replacing the verbose String that represents their FQNs with just a byte or two. However, it only does this for classes you explicitly register; registration does not extend to a class's components. For instance, if you register org.apache.spark.mllib.linalg.SparseVector, you'd do well to also register Array[Int] and Array[Double] as these are what hog the memory.
It is not sufficient to just register Array[_].
You can put something like sparkConf.set("spark.kryo.registrationRequired", "true") in your code to see what needs to be explicitly added. This will cause runtime exceptions to be thrown until everything that gets (de)serialized has been explicitly registered.
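The registration itself looks something like this (a sketch; the exact class list depends on what your job actually shuffles):

import org.apache.spark.SparkConf
import org.apache.spark.mllib.linalg.SparseVector

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Blow up at runtime on anything serialized but not registered.
  .set("spark.kryo.registrationRequired", "true")
  .registerKryoClasses(Array(
    classOf[SparseVector],
    // The arrays inside the vectors are what actually hog the memory.
    classOf[Array[Int]],
    classOf[Array[Double]]
  ))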
After these changes, the shuffle went from some 2.3TB to 300GB and consequently the time for the job dropped from about 5 hours to less than 45 minutes. With less memory being churned there was less GC, and that makes a huge difference to how long your Spark job takes.