Implementing your own distributed Pregel algorithm in GraphX is surprisingly simple, but there are a few things worth knowing that will help you.
First, what is GraphX's Pregel implementation? Well, it takes three functions:
1. one that merges messages of type T, that is (T, T) => T.
2. one that runs on each vertex with an attribute type of VD and that takes that message and creates a new state. Its type is (VertexId, VD, T) => VD.
3. one that runs on each edge triplet (an edge together with its source and destination vertices) and produces messages to be sent to vertices. Its type is (EdgeTriplet[VD, ED]) => Iterator[(VertexId, T)], where ED is the edge's attribute type.
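To make the three functions concrete, here is a hedged sketch of wiring them into GraphX's Pregel API (the Spark 1.x-era `graph.pregel` signature). The graph, the `Set[Long]` vertex state, the `Int` edge attribute, and the message type are all illustrative assumptions, not code from the original job:

```scala
import org.apache.spark.graphx._

object PregelSketch {
  // Assumed message type for illustration: a set of vertex ids.
  type Msg = Set[Long]

  def run(graph: Graph[Set[Long], Int]): Graph[Set[Long], Int] =
    graph.pregel(initialMsg = Set.empty[Long])(
      // 2. vertex program: fold the merged message into the vertex state
      vprog = (id: VertexId, state: Set[Long], msg: Msg) => state ++ msg,
      // 3. send messages along each edge triplet
      sendMsg = (t: EdgeTriplet[Set[Long], Int]) => Iterator((t.dstId, t.srcAttr)),
      // 1. merge two messages bound for the same vertex
      mergeMsg = (a: Msg, b: Msg) => a ++ b
    )
}
```

GraphX calls `mergeMsg` to combine all messages destined for a vertex, then `vprog` once per vertex that received a message, and iterates until no messages are sent.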
TL;DR: the vertex holds its state and the edges send it messages. If this starts sounding like the Actor Model to you, you're not alone:
"It's a bit similar to the actor model if you think of each vertex as an actor, except that vertex state and messages between vertices are fault tolerant and durable, and communication proceeds in fixed rounds: at every iteration the framework delivers all messages sent in the previous iteration. Actors normally have no such timing guarantee." - Martin Kleppmann, Designing Data-Intensive Applications.
So, off I went and wrote my code where all the paths through a vertex are stored as state at that vertex.
And it ran like a dog. After five hours of processing, it died with OutOfMemoryErrors.
Judicious jstack-ing of the JVMs in the cluster showed that threads were spending their time in Seq.containsSlice (we're using Scala 2.10.6). This Scala method was being used to find sub-sequences of VertexIds (which is just an alias for Long) in the paths that had already been seen.
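The slow check looked roughly like this (a hypothetical sketch; the names are mine, not the original code). `containsSlice` does a linear scan with per-element generic equality on boxed Longs, which is what the jstack traces kept landing in:

```scala
// Has this candidate sub-path already been seen inside any known path?
// Seq.containsSlice scans element by element, boxing and comparing
// each Long generically - cheap-looking code, expensive in a hot loop.
object SlowPathCheck {
  def seen(paths: Seq[Seq[Long]], candidate: Seq[Long]): Boolean =
    paths.exists(_.containsSlice(candidate))
}
```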
Desperate, I converted the Seqs of Longs to Strings, used String.contains instead, and the whole thing ran in under 25 minutes.
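A hedged sketch of that workaround (again, names and details are illustrative). One subtlety worth encoding: wrapping each path in delimiters guards against false positives from digits running together, e.g. the path 1,12 must not appear to contain the sub-path 1,1:

```scala
object FastPathCheck {
  // Encode a path once as a delimited String, e.g. Seq(1L, 12L) -> ",1,12,".
  // The leading/trailing delimiters stop "1,1" matching inside "1,12".
  def encode(path: Seq[Long]): String = path.mkString(",", ",", ",")

  // String.contains delegates to an intrinsified char-array search,
  // which is why it so dramatically outruns Seq.containsSlice here.
  def seen(encodedPaths: Seq[String], candidate: Seq[Long]): Boolean = {
    val needle = encode(candidate)
    encodedPaths.exists(_.contains(needle))
  }
}
```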
This is not the first time I've seen innocent looking code bring a cluster to its knees. Curious, I wrote some micro-benchmarking code comparing these two methods using JMH and got this:
```
Benchmark                                   Mode  Cnt          Score          Error  Units
ContainsSliceBenchmark.scalaContainsSlice  thrpt    5     594519.717 ±    33463.038  ops/s
ContainsSliceBenchmark.stringContains      thrpt    5  307051356.098 ± 44642542.440  ops/s
```
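The benchmark compared the two approaches along these lines (a sketch assuming JMH's annotation API; the fixture data is illustrative, not the original benchmark code):

```scala
import org.openjdk.jmh.annotations._

// JMH throughput benchmark: the same membership test expressed as a
// Seq.containsSlice scan versus a String.contains search.
@State(Scope.Benchmark)
class ContainsSliceBenchmark {
  val path: Seq[Long]    = (1L to 20L).toSeq
  val needle: Seq[Long]  = Seq(5L, 6L, 7L)
  val pathStr: String    = path.mkString(",", ",", ",")
  val needleStr: String  = needle.mkString(",", ",", ",")

  @Benchmark
  def scalaContainsSlice: Boolean = path.containsSlice(needle)

  @Benchmark
  def stringContains: Boolean = pathStr.contains(needleStr)
}
```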
That's a factor of roughly 500 - nearly three orders of magnitude - in favour of String.contains.
Although it's a hack, this approach gives great performance. And it shows that taking jstack snapshots across your cluster is one of the most effective ways to understand why your big data application is so slow.