Sunday, March 13, 2016

GraphX and the Pregel algorithm

GraphX is Spark's graph-processing module, and I'm using it to find (typically) small sub-graphs in a huge collection of vertices. I've only got a prototype running, but I can attest that it is indeed very fast.

GraphX does this using the Pregel algorithm. Pregel is named after the river that flows through Königsberg, the setting of the Seven Bridges of Königsberg problem. The problem asks: can you walk through the city crossing each of its seven bridges once and only once? You can't, and Euler's proof of that was the start of graph theory.


A GraphX graph is made of two RDDs: one of vertices and one of edges.

The VertexRDD is just an RDD of tuples of a vertex ID and an attribute.

An EdgeRDD is just an RDD of Edges, where an Edge is just a triple of source vertex ID, destination vertex ID and attribute.

What these vertex and edge attributes are is entirely up to you.
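To make those shapes concrete without a Spark cluster, here is a minimal plain-Scala stand-in that assumes ordinary `Seq`s in place of RDDs. `VertexId` and `Edge` mirror GraphX's names (GraphX aliases `VertexId` to `Long`, and its `Edge` holds a source ID, a destination ID and an attribute), but `SimpleGraph` and the sample data are hypothetical.

```scala
object GraphShapes {
  // GraphX aliases VertexId to Long; this sketch does the same.
  type VertexId = Long

  // Stand-in for GraphX's Edge: source ID, destination ID and an arbitrary attribute.
  final case class Edge[ED](srcId: VertexId, dstId: VertexId, attr: ED)

  // Hypothetical stand-in for a GraphX Graph: Seqs instead of a VertexRDD and an EdgeRDD.
  final case class SimpleGraph[VD, ED](vertices: Seq[(VertexId, VD)], edges: Seq[Edge[ED]])

  // Attributes are arbitrary: here vertices carry names and edges carry a relationship label.
  val people: SimpleGraph[String, String] = SimpleGraph(
    vertices = Seq(1L -> "alice", 2L -> "bob", 3L -> "carol", 7L -> "dave"),
    edges = Seq(Edge(1L, 2L, "follows"), Edge(2L, 3L, "follows"), Edge(7L, 3L, "likes"))
  )
}
```

Swapping `String` for any other type (a case class, a `Double` weight, even `Unit`) works the same way, which is the point of keeping the attributes generic.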


The Pregel algorithm was inspired by a model that looks a lot like how a multi-core processor works: each core does its own computation, then communicates with the others over the bus.

(This particular abstract model of a computer is called Bulk Synchronous Parallel, which in turn resembles the Parallel Random Access Machine, a shared-memory abstract machine.)

In our analogy, each processor core is represented by a Spark partition and each message on the bus is delivered by a Spark join between RDDs.

I thought the simplest explanation was the second one at this link, by Shagun Sodhani. Basically, we repeatedly iterate over all the vertices, and each iteration is called a superstep. In each superstep a user-defined function runs on each vertex. This function can read the messages sent to its vertex in the previous superstep, send messages to other vertices and/or mutate its own vertex.
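A superstep loop can be sketched in plain Scala, assuming an in-memory map of vertex values instead of partitions. This follows the original vertex-centric style: superstep 0 runs every vertex, later supersteps run only the vertices that received messages, messages sent in one superstep are delivered in the next, and the run halts when nobody sends anything. `runSupersteps` is a hypothetical helper, and the example (every vertex learning the largest value in its component) is a classic toy problem, not connected components.

```scala
object Supersteps {
  type VertexId = Long

  /** Hypothetical BSP loop. `compute` sees a vertex's ID, its current value and the
    * messages delivered to it this superstep, and returns its new value plus the
    * (destination, message) pairs it wants to send. Returns final values and the
    * number of supersteps executed. */
  def runSupersteps[V, M](
      values: Map[VertexId, V],
      compute: (VertexId, V, Seq[M]) => (V, Seq[(VertexId, M)])
  ): (Map[VertexId, V], Int) = {
    var current = values
    // Superstep 0: every vertex is active, with an empty inbox.
    var inbox: Map[VertexId, Seq[M]] = values.keys.map(_ -> Seq.empty[M]).toMap
    var superstep = 0
    while (inbox.nonEmpty) {
      val outgoing = Seq.newBuilder[(VertexId, M)]
      for ((id, msgs) <- inbox) {
        val (newValue, sent) = compute(id, current(id), msgs)
        current = current.updated(id, newValue)
        outgoing ++= sent
      }
      // Barrier: messages produced now are only delivered in the next superstep.
      inbox = outgoing.result().groupMap(_._1)(_._2)
      superstep += 1
    }
    (current, superstep)
  }

  // Toy example: every vertex ends up with the largest value in its connected component.
  def maxValue(values: Map[VertexId, Int], neighbours: Map[VertexId, Seq[VertexId]]): Map[VertexId, Int] =
    runSupersteps[Int, Int](values, { (id, value, msgs) =>
      val newValue = (value +: msgs).max
      // Send on superstep 0 (empty inbox) or whenever the value grew; otherwise stay quiet.
      val changed = msgs.isEmpty || newValue > value
      val out = if (changed) neighbours.getOrElse(id, Seq.empty).map(_ -> newValue) else Seq.empty
      (newValue, out)
    })._1
}
```

A vertex that learns nothing new sends nothing, so quiet vertices drop out on their own and the loop ends once every component has settled.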

Spark's Implementation

Spark modifies this slightly. "Unlike the original Pregel API, the GraphX Pregel API factors the sendMessage computation over edges, enables the message sending computation to read both vertex attributes, and constrains messages to the graph structure." (from the docs)

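You invoke all this magic with a single call to connectedComponents() on the graph. As its name suggests, this finds the connected components, though not as a list: it returns a new graph in which each vertex's attribute is the lowest vertex ID in its component. Edge direction is ignored, so on a directed graph these are the weakly connected components.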
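Because the result labels vertices rather than listing components, you will usually still need to group vertices by label, and for small sub-graphs, filter by size. In Spark you would do this on the resulting vertex RDD itself; the sketch below does the same grouping over a plain `Seq` of (vertex ID, component label) pairs. The helper names and the sample labels are hypothetical.

```scala
object ComponentGrouping {
  type VertexId = Long

  /** Hypothetical helper: turn (vertex, lowest ID in its component) pairs into
    * a map from component label to the set of member vertices. */
  def componentsFromLabels(labels: Seq[(VertexId, VertexId)]): Map[VertexId, Set[VertexId]] =
    labels.groupMap(_._2)(_._1).map { case (label, members) => label -> members.toSet }

  /** Keep only the components with at most `maxSize` members. */
  def smallComponents(labels: Seq[(VertexId, VertexId)], maxSize: Int): Map[VertexId, Set[VertexId]] =
    componentsFromLabels(labels).filter { case (_, members) => members.size <= maxSize }
}
```

With labels `1 -> 1, 2 -> 1, 3 -> 1, 7 -> 7, 9 -> 7`, this yields two components keyed by 1 and 7, and a size cap of 2 keeps only `{7, 9}`.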

(This is not to be confused with strongly connected components, in which every member can reach every other member along directed edges, though not necessarily the rest of the graph. For that problem you might use Kosaraju's linear-time algorithm.)
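For comparison, here is a minimal single-machine sketch of Kosaraju's algorithm, assuming the directed graph fits in memory as an adjacency map. It makes two depth-first passes: one over the graph to record the order in which vertices finish, and one over the reversed graph in reverse finishing order, where each search tree is exactly one strongly connected component. `Kosaraju.scc` is a hypothetical name, and the recursion would need to become an explicit stack for very deep graphs.

```scala
import scala.collection.mutable

object Kosaraju {
  /** Strongly connected components of a directed graph given as adjacency lists. */
  def scc(adj: Map[Long, Seq[Long]]): Set[Set[Long]] = {
    val vertices = (adj.keySet ++ adj.values.flatten).toSeq
    // Reverse every edge u -> v into v -> u.
    val reversed: Map[Long, Seq[Long]] =
      adj.toSeq.flatMap { case (u, vs) => vs.map(v => v -> u) }.groupMap(_._1)(_._2)

    // Pass 1: DFS on the original graph, appending each vertex once it finishes.
    val visited = mutable.Set.empty[Long]
    val finished = mutable.ArrayBuffer.empty[Long]
    def dfs1(u: Long): Unit = {
      visited += u
      for (v <- adj.getOrElse(u, Seq.empty) if !visited(v)) dfs1(v)
      finished += u
    }
    vertices.foreach(u => if (!visited(u)) dfs1(u))

    // Pass 2: DFS on the reversed graph, latest finisher first; each tree is one SCC.
    val assigned = mutable.Set.empty[Long]
    def dfs2(u: Long, component: mutable.Set[Long]): Unit = {
      assigned += u
      component += u
      for (v <- reversed.getOrElse(u, Seq.empty) if !assigned(v)) dfs2(v, component)
    }
    finished.reverseIterator.flatMap { u =>
      if (assigned(u)) None
      else {
        val component = mutable.Set.empty[Long]
        dfs2(u, component)
        Some(component.toSet)
      }
    }.toSet
  }
}
```

On the edges `1 -> 2 -> 3 -> 1`, `3 -> 4`, `4 <-> 5` plus an isolated vertex 6, this gives `{1, 2, 3}`, `{4, 5}` and `{6}`, whereas connected components would lump 1 to 5 together.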

The best description I found of how Spark implements Pregel is here:

"It's fairly simple - each node just broadcasts its VertexId to its neighbors, and its neighbors do the same. Any node that receives a VertexId lower than its own broadcasts that the next round; if not the Vertex goes silent", and it stays silent unless a neighbor later sends it an even lower ID.

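The broadcast scheme in that quote can be traced in plain Scala. This is a hypothetical simulation over an undirected adjacency map (every vertex must appear as a key), not GraphX's code: in each round only the vertices whose label just dropped broadcast it, and the run stops when a round lowers nothing. It returns the set of broadcasting vertices per round so you can watch vertices fall silent.

```scala
object MinIdBroadcast {
  type VertexId = Long

  /** Returns the final labels and, for each round, the set of vertices that broadcast. */
  def trace(neighbours: Map[VertexId, Seq[VertexId]]): (Map[VertexId, VertexId], List[Set[VertexId]]) = {
    // Every vertex starts labelled with its own ID and broadcasts it in round 0.
    var labels: Map[VertexId, VertexId] = neighbours.keys.map(id => id -> id).toMap
    var broadcasting: Set[VertexId] = labels.keySet
    val rounds = List.newBuilder[Set[VertexId]]
    while (broadcasting.nonEmpty) {
      rounds += broadcasting
      // Each broadcaster sends its current label to all of its neighbours;
      // a receiver only cares about the lowest label it heard.
      val inbox: Map[VertexId, VertexId] =
        broadcasting.toSeq
          .flatMap(id => neighbours(id).map(n => n -> labels(id)))
          .groupMapReduce(_._1)(_._2)(_ min _)
      // Re-broadcast next round only if a lower ID arrived; otherwise go silent.
      val lowered = inbox.collect { case (id, msg) if msg < labels(id) => id -> msg }
      labels = labels ++ lowered
      broadcasting = lowered.keySet
    }
    (labels, rounds.result())
  }
}
```

On the path 3 - 1 - 2 - 5 plus a separate edge 7 - 8, vertex 1 already holds its component's lowest ID, so it never broadcasts after round 0, while vertex 5 is still broadcasting in round 2.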

Before Spark calls its Pregel implementation, it must do four things.

First, it points every vertex at itself: each vertex's attribute becomes its own vertex ID, which is a Long.

    val ccGraph = graph.mapVertices { case (vid, _) => vid }

Second, it defines the function to run on a vertex when it receives a message. In our case, that's simply picking the minimum of the vertex's current attribute and the incoming message. This is the user-defined vertex program that the Pregel definition mentions.

Third, it defines the function to be executed on each edge. Its output depends on which endpoint has the lower attribute. If it's the source vertex, the output maps the destination vertex ID to the source's attribute; if it's the destination vertex, the output maps the source vertex ID to the destination's attribute; if the two attributes are equal, no message is sent. These are the Pregel messages that will be sent to other vertices: the key is the receiving vertex's ID and the value is the message payload. We call this function the map function.
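Here is that map function as a plain-Scala sketch. `Triplet` is a hypothetical stand-in for GraphX's `EdgeTriplet`, which exposes both endpoint IDs and both vertex attributes; returning an `Iterator` of (recipient ID, message) pairs matches the shape GraphX's `Pregel` expects from its `sendMsg` argument.

```scala
object SendMessage {
  type VertexId = Long

  // Hypothetical stand-in for GraphX's EdgeTriplet: endpoint IDs plus their current labels.
  final case class Triplet(srcId: VertexId, srcAttr: VertexId, dstId: VertexId, dstAttr: VertexId)

  /** Map function: tell the endpoint with the higher label about the lower one.
    * Returns (recipient ID, message) pairs; equal labels produce no message. */
  def sendMessage(t: Triplet): Iterator[(VertexId, VertexId)] =
    if (t.srcAttr < t.dstAttr) Iterator(t.dstId -> t.srcAttr)
    else if (t.srcAttr > t.dstAttr) Iterator(t.srcId -> t.dstAttr)
    else Iterator.empty
}
```

The equal-labels case is what eventually silences a component: once both ends of every edge agree, no edge produces a message.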

Finally, it defines a function that, given two messages bound for the same vertex (each one a vertex attribute), picks the minimum. We call this the reduce function.
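The vertex program and the reduce function are both just `min`. The sketch below writes them as plain Scala functions in the shapes GraphX's `Pregel` takes for `vprog` and `mergeMsg`. One detail not mentioned above: GraphX's connected-components code starts Pregel with an initial message of `Long.MaxValue`, which every vertex receives before the first round of edge messages; since the minimum of any ID and `Long.MaxValue` is the ID itself, that step leaves every attribute unchanged. `CcFunctions` is a hypothetical container.

```scala
object CcFunctions {
  type VertexId = Long

  // Vertex program: keep the smaller of the current label and the incoming message.
  val vprog: (VertexId, VertexId, VertexId) => VertexId =
    (_, attr, msg) => math.min(attr, msg)

  // Reduce function: combine two messages bound for the same vertex.
  val mergeMsg: (VertexId, VertexId) => VertexId =
    (a, b) => math.min(a, b)

  // Initial message delivered to every vertex first; it can never lower a label.
  val initialMessage: VertexId = Long.MaxValue
}
```

Because `min` is associative and commutative, messages can be reduced in any order and on any partition, which is what lets Spark combine them before the join.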


The Pregel implementation repeatedly runs the given map function over every edge of the graph, generating messages that are then combined using the given reduce function. For our use case, this boils down to mapping each edge to a (vertex ID, attribute) tuple and then reducing all the messages for each recipient vertex to the one with the minimum attribute.
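One round of that map-then-reduce step can be sketched over an in-memory edge list. This is a hypothetical helper, not Spark code: `labels` plays the role of the vertex attributes and must contain every vertex, and the reduce is a `groupMapReduce` keyed by the recipient's ID.

```scala
object OneRound {
  type VertexId = Long
  final case class Edge(srcId: VertexId, dstId: VertexId)

  /** Hypothetical single message round: run the map function on every edge, then
    * reduce the messages per recipient with min. */
  def messages(labels: Map[VertexId, VertexId], edges: Seq[Edge]): Map[VertexId, VertexId] =
    edges
      .flatMap { e =>
        val (s, d) = (labels(e.srcId), labels(e.dstId))
        // Map: the endpoint with the higher label receives the lower one.
        if (s < d) Some(e.dstId -> s)
        else if (s > d) Some(e.srcId -> d)
        else None
      }
      // Reduce: keep only the smallest message per recipient.
      .groupMapReduce(_._1)(_._2)(_ min _)
}
```

With identity labels on vertices 1 to 4 and edges 2→1, 2→3, 4→3 and 1→3, vertex 3 hears both 2 and 1 and keeps 1, while vertex 1 hears nothing because none of its neighbours has a lower label.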

We join these messages with the vertices they're destined for (as we would join any RDDs) and run our user-defined vertex program on the result. Now all our ducks are lined up for another iteration, which generates yet more messages.

Only when we have no more messages do we halt the iterations.
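Putting the loop together, here is an in-memory sketch of the whole iteration, assuming the graph fits in a `Map` and a `Seq`. Each pass joins the reduced messages with their vertices, applies the `min` vertex program, then regenerates messages; it halts when no messages remain or when an optional `maxIterations` cap is reached (GraphX's `Pregel` takes a parameter of that name). One simplification: this rescans every edge on each pass, whereas GraphX only runs the map function on edges next to vertices that received a message in the previous round. `PregelCc` is a hypothetical name.

```scala
object PregelCc {
  type VertexId = Long
  final case class Edge(srcId: VertexId, dstId: VertexId)

  /** Hypothetical in-memory version of the loop. Returns final labels and passes run. */
  def connectedComponents(vertexIds: Seq[VertexId], edges: Seq[Edge],
                          maxIterations: Int = Int.MaxValue): (Map[VertexId, VertexId], Int) = {
    // Every vertex starts labelled with its own ID.
    var labels: Map[VertexId, VertexId] = vertexIds.map(id => id -> id).toMap
    var iterations = 0

    // Map function: the endpoint with the higher label receives the lower one.
    def sendMessage(e: Edge): Option[(VertexId, VertexId)] = {
      val (s, d) = (labels(e.srcId), labels(e.dstId))
      if (s < d) Some(e.dstId -> s) else if (s > d) Some(e.srcId -> d) else None
    }

    // Map over all edges, then reduce per recipient with min.
    def roundMessages(): Map[VertexId, VertexId] =
      edges.flatMap(e => sendMessage(e)).groupMapReduce(_._1)(_._2)(_ min _)

    var messages = roundMessages()
    while (messages.nonEmpty && iterations < maxIterations) {
      // Join messages with their vertices and apply the vertex program (min).
      labels = labels ++ messages.map { case (id, msg) => id -> math.min(labels(id), msg) }
      iterations += 1
      messages = roundMessages()
    }
    (labels, iterations)
  }
}
```

On the chain 1-2-3-4, the label 1 needs three passes to reach vertex 4, so the number of iterations grows with how far the lowest ID has to travel.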
