Monday, September 18, 2017

Spark Partitions

Some miscellaneous Spark/GraphX partitioning notes that I have made.

Co-partitioned and co-located are not the same thing

"Even if the same partitioner is used for an RDD that needs to be partitioned within the current job as was used to partition another RDD that is needed in the current job but whose partitioning was persisted in a prior job, then the RDDs will still be co-partitioned but won't necessarily be co-located."

"There are two things here. One is data co-partition, and then data co-location.

"As long as you specify the same hash partitioner, a1 and b1 will be co-partitioned (i.e. partitioned the same way). The partitions might not be co-located, so your job will still incur network traffic, but you do reduce a whole round of shuffle.

"If a1 and b1 are put into memory by different jobs, there is no guarantee their partitions will be co-located, even though they are co-partitioned.

"What if you add a node to the cluster, or a node dies? Or what if a node is busy and can never be run anything - are you going to block the entire program forever so you could put a partition on that node?

"You can still write your program in a way that does mostly local joins. And even if the joins need to fetch blocks from remote nodes, the joins still avoid an extra round of shuffle, which is much more expensive than just fetching data."

From a Google Groups discussion (here).
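To convince myself of the co-partitioning point, here is a minimal plain-Python sketch (not Spark code). The names partition_of, partition_by and copartitioned_join are my own, and zlib.crc32 is only a deterministic stand-in for a hash partitioner. It shows that when both datasets are bucketed with the same function, bucket i of one only ever needs bucket i of the other, so nothing has to be re-bucketed. Where those buckets physically live is a separate question, which is the co-location part.

```python
import zlib

def partition_of(key: str, num_partitions: int) -> int:
    """Deterministic stand-in for a hash partitioner (an assumption, not Spark's)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions

def partition_by(pairs, num_partitions):
    """Split (key, value) pairs into num_partitions buckets by key hash."""
    buckets = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        buckets[partition_of(key, num_partitions)].append((key, value))
    return buckets

def copartitioned_join(left_parts, right_parts):
    """Join bucket i of the left side only with bucket i of the right side."""
    joined = []
    for left, right in zip(left_parts, right_parts):
        lookup = {}
        for key, value in right:
            lookup.setdefault(key, []).append(value)
        for key, value in left:
            for other in lookup.get(key, []):
                joined.append((key, value, other))
    return joined

# Both sides use the same partitioner and the same number of partitions.
a1 = partition_by([("x", 1), ("y", 2), ("z", 3)], 4)
b1 = partition_by([("x", "X"), ("z", "Z"), ("w", "W")], 4)
result = sorted(copartitioned_join(a1, b1))
print(result)
```

If the two sides used different partitioners or partition counts, matching keys could land in different bucket indices and the per-bucket join would miss them. That is the case where a real engine has to shuffle.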


Partitioning graphs presents another interesting challenge. Edge cuts (i.e., partitioning vertices) are a viable but generally inefficient way to partition a graph.

Many real-world graphs follow a power law, for example something like Zipf's law if you're processing natural language. ("Zipf's law states that given some corpus of natural language utterances, the frequency of any word is inversely proportional to its rank in the frequency table." from Wikipedia). Consequently, a few vertices own a large share of the edges and the partitioning of vertices will be 'lumpy'.
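A rough way to see the lumpiness is the toy model below. It is plain Python with made-up numbers, not a real graph: I assume the vertex of rank r has about NUM_VERTICES / r edges (a Zipf-like shape) and assign vertices to partitions by rank modulo the partition count. The names and constants are all my own.

```python
NUM_VERTICES = 1000   # illustrative size, not taken from any real data set
NUM_PARTITIONS = 8

def degree(rank: int) -> int:
    """Zipf-like degree: the vertex of rank r has roughly NUM_VERTICES / r edges."""
    return NUM_VERTICES // rank

def edges_per_partition(num_partitions: int) -> list:
    """Assign each vertex to a partition by rank modulo and total its edges there."""
    loads = [0] * num_partitions
    for rank in range(1, NUM_VERTICES + 1):
        loads[rank % num_partitions] += degree(rank)
    return loads

loads = edges_per_partition(NUM_PARTITIONS)
heaviest = loads.index(max(loads))
print(loads, "heaviest partition:", heaviest)
```

Every partition holds the same number of vertices, yet the one that receives the top-ranked vertex carries more than twice the edges of the lightest one. Balancing vertices is not the same as balancing work.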

GraphX allows you to choose your partitioning strategy. Its PartitionStrategy offers EdgePartition1D, EdgePartition2D, RandomVertexCut and CanonicalRandomVertexCut:

From "Spark GraphX in Action", p214.
You can find my experience of EdgePartition2D here.
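The idea behind a 2D edge partitioning can be sketched in a few lines. This is a simplified plain-Python model, not the GraphX source: it assumes the number of partitions is a perfect square, and the function name and the mixing constant are my own choices for illustration. Edges are placed in a square grid, with the source vertex picking the column and the destination vertex picking the row, so all edges touching one vertex fall in a single column plus a single row.

```python
import math

# A large prime used to scramble vertex ids (illustrative constant, an assumption).
MIXING_PRIME = 1125899906842597

def edge_partition_2d(src: int, dst: int, num_parts: int) -> int:
    """Map an edge to a cell of a side x side grid; num_parts must be a perfect square."""
    side = math.isqrt(num_parts)
    if side * side != num_parts:
        raise ValueError("this sketch only handles perfect-square partition counts")
    col = abs(src * MIXING_PRIME) % side   # column chosen by the source vertex
    row = abs(dst * MIXING_PRIME) % side   # row chosen by the destination vertex
    return col * side + row

# A hub vertex with 1000 outgoing and 1000 incoming edges, spread over 16 partitions.
hub = 7
out_edges = [(hub, d) for d in range(1000)]
in_edges = [(s, hub) for s in range(1000)]
touched = {edge_partition_2d(s, d, 16) for s, d in out_edges + in_edges}
print(len(touched), "partitions hold the hub's edges")
```

With a 4 x 4 grid the hub's edges can reach at most 2 * 4 - 1 = 7 partitions, however many edges it has. That bound on vertex replication is what makes this kind of vertex cut attractive for power-law graphs.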

"Introducing a .repartition will increase the amount of work that the Spark engine has to do, but the benefit can significantly outweigh that cost." (from here). In this example, the data was "lumpy", causing a single executor to do most of the work.

I've also seen repartitioning on a given field used to optimize joins. See my previous post where repartitioning a DataFrame by field X and writing it to a file was used to make joining on X more efficient.


"join has the same semantics as in sql join, e.g. every row after the join is (k, v1, v2), where v1 is from rdd1, and v2 is from rdd2.

"cogroup just groups values of the same key together, and every row looks like (k, seq1, seq2), where seq1 contains all the values having the same key from rdd1."

(From another Google Groups discussion).
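The difference is easy to see on a tiny example. The sketch below is plain Python over lists of (key, value) pairs, not the RDD API; the functions join and cogroup are my own minimal models of the semantics described in the quote.

```python
from collections import defaultdict

def join(rdd1, rdd2):
    """Inner join: one (k, v1, v2) row per matching pair of values."""
    right = defaultdict(list)
    for k, v in rdd2:
        right[k].append(v)
    return [(k, v1, v2) for k, v1 in rdd1 for v2 in right.get(k, [])]

def cogroup(rdd1, rdd2):
    """One (k, seq1, seq2) row per key that appears in either input."""
    groups = defaultdict(lambda: ([], []))
    for k, v in rdd1:
        groups[k][0].append(v)
    for k, v in rdd2:
        groups[k][1].append(v)
    return [(k, seq1, seq2) for k, (seq1, seq2) in sorted(groups.items())]

rdd1 = [("a", 1), ("a", 2), ("b", 3)]
rdd2 = [("a", "x"), ("c", "y")]

print(join(rdd1, rdd2))     # [('a', 1, 'x'), ('a', 2, 'x')]
print(cogroup(rdd1, rdd2))  # [('a', [1, 2], ['x']), ('b', [3], []), ('c', [], ['y'])]
```

Note that cogroup keeps keys that exist on only one side (with an empty sequence for the other), so the various outer joins can be derived from it, whereas the inner join drops them.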

Note that Datasets have slightly different semantics. Outer joins on RDDs return RDDs whose values contain Options. Dataset's join returns a DataFrame whose rows can be accessed as usual, while its @Experimental joinWith returns a Dataset[(T, U)]. Note that T and U can be null. No nice Options for us here.


"In Apache Hadoop, the grouping is done in two places - the partitioner, which routes map outputs to reduce tasks, and the grouping comparator, which groups data within a reduce task.  Both of these are pluggable per-job.  The sorting is pluggable by setting the output key comparator...

"The order of the values within a reduce function call, however, is typically unspecified and can vary between runs. Secondary sort is a technique that allows the MapReduce programmer to control the order that the values show up within a reduce function call." (from here). This can then help the next stage.
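Here is a small plain-Python model of secondary sort (no Hadoop involved; the function name and the sample records are mine). The three pluggable pieces from the quote show up as three steps: partition on the natural key only, sort on the composite (natural key, secondary key), then group on the natural key only.

```python
import zlib
from itertools import groupby
from operator import itemgetter

def secondary_sort(records, num_reducers):
    """records are (natural_key, secondary_key, value) triples."""
    # Partitioner: route on the natural key only, so a key's records stay together.
    reducers = [[] for _ in range(num_reducers)]
    for rec in records:
        index = zlib.crc32(rec[0].encode("utf-8")) % num_reducers
        reducers[index].append(rec)

    output = []
    for bucket in reducers:
        # Sort comparator: order by the composite (natural, secondary) key.
        bucket.sort(key=itemgetter(0, 1))
        # Grouping comparator: one "reduce call" per natural key.
        for natural_key, group in groupby(bucket, key=itemgetter(0)):
            output.append((natural_key, [value for _, _, value in group]))
    return sorted(output)

records = [
    ("sensor-b", 2, "b2"),
    ("sensor-a", 3, "a3"),
    ("sensor-a", 1, "a1"),
    ("sensor-b", 1, "b1"),
    ("sensor-a", 2, "a2"),
]
grouped = secondary_sort(records, num_reducers=3)
print(grouped)  # [('sensor-a', ['a1', 'a2', 'a3']), ('sensor-b', ['b1', 'b2'])]
```

Each reduce call now sees its values in secondary-key order without having to buffer and sort them itself.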

Sorting also helped us when writing Parquet files to HDFS.

An interesting trick to improve sort performance can be found here: "Sorting in general has good cache hit rate due to the sequential scan access pattern. Sorting a list of pointers, however, has a poor cache hit rate because each comparison operation requires dereferencing two pointers that point to randomly located records in memory. So how do we improve the cache locality of sorting? A very simple approach is to store the sort key of each record side by side with the pointer. For example, if the sort key is a 64-bit integer, then we use 128-bit (64-bit pointer and 64-bit key) to store each record in the pointers array."
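The layout being described can be sketched as below. Python will not show the cache effect, so treat this purely as an illustration of the data structure; the function names and the sample records are my own. The first version sorts bare "pointers" (list indices here) and has to dereference two records on every comparison. The second copies each sort key next to its pointer once and then sorts the pairs without touching the records again.

```python
from functools import cmp_to_key

def sort_by_pointer(records):
    """Sort indices only; every comparison dereferences two records."""
    def compare(i, j):
        a, b = records[i][0], records[j][0]   # two lookups per comparison
        return (a > b) - (a < b)
    return sorted(range(len(records)), key=cmp_to_key(compare))

def sort_by_key_prefix(records):
    """Store (key, pointer) side by side, then sort that array alone."""
    prefixed = [(rec[0], i) for i, rec in enumerate(records)]  # one pass over the records
    prefixed.sort()                                            # compares only the pairs
    return [i for _, i in prefixed]

# Each record is (sort_key, payload); the payload stands in for a large row.
records = [(30, "c"), (10, "a"), (20, "b"), (10, "d")]
print(sort_by_pointer(records))     # [1, 3, 2, 0]
print(sort_by_key_prefix(records))  # [1, 3, 2, 0]
```

Both give the same order. The second trades extra memory per entry (key plus pointer, the 128 bits in the quote) for comparisons that stay within one contiguous array.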


You may see the DAG in the GUI not being as linear as you'd expect. It is Spark's prerogative to do this.  "Stages that are not interdependent may be submitted to the cluster for execution in parallel: this maximizes the parallelization capability on the cluster. So if operations in our dataflow can happen simultaneously we will expect to see multiple stages launched" (from here).
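As a rough model of why the DAG fans out, the sketch below groups stages into "waves" of stages whose parents have all finished. It uses Python's standard graphlib module (Python 3.9+) and has nothing to do with Spark's actual scheduler; the stage names and the submission_waves helper are hypothetical.

```python
from graphlib import TopologicalSorter

def submission_waves(stage_parents):
    """Group stages into waves; stages in the same wave do not depend on each other."""
    sorter = TopologicalSorter(stage_parents)   # maps each stage to its parent stages
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorter.get_ready()              # every stage whose parents are all done
        waves.append(sorted(ready))
        sorter.done(*ready)
    return waves

# Hypothetical job: two independent loads feeding a join, then a write.
stages = {
    "load_a": [],
    "load_b": [],
    "join": ["load_a", "load_b"],
    "write": ["join"],
}
waves = submission_waves(stages)
print(waves)  # [['load_a', 'load_b'], ['join'], ['write']]
```

The two loads have no dependency on each other, so they appear side by side rather than in a single line. A real scheduler need not wait for a whole wave to finish before submitting the next ready stage, but the fan-out is the same.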
