Tuesday, May 30, 2017

Rocha Thatte Cycle Detection Algorithm

Flows of money and ownership in a network of businesses and individuals can indicate fraudulent behaviour. For instance, if there is a cycle in the network such that X owns Y who owns Z and Z audits X, you can quickly see that there is a conflict of interest. Such suspicions are of interest to us.

GraphX is very good at sniffing out these networks but you don't get cycle detection out-of-the-box. So, I rolled-my-own that happened to be similar to an algorithm somebody else has already discovered, the Rocha Thatte algorithm.

The algorithm

The Wikipedia page gives an excellent overview so I won't bore you with the details. Suffice to say that each vertex passes all the new paths going through it to its neighbouring vertex at each super-step.

The code is quite simple since GraphX does all the heavy lifting. Let's introduce a few type aliases:

import org.apache.spark.graphx.{VertexId, EdgeTriplet}

  type VertexPrg[T]   = (VertexId, T, T) => T
  type EdgePrg[T, ED] = (EdgeTriplet[T, ED]) => Iterator[(VertexId, T)]

No matter which algorithm we create, they have to implement these functions.

Now, we'll introduce some-domain specific aliases:

  type Path[T]  = Seq[T]
  type Paths[T] = Set[Path[T]]

and finally, the GraphX merge function (which is just (U,U)=>U) for us would look like:

  type MergeFn[T] = (Paths[T], Paths[T]) => Paths[T]

then the implementation of Rocha Thatte looks like this. The 'program' that runs on the vertex can be created here:

def vertexPrg[T](merge: MergeFn[T]): VertexPrg[Paths[T]] = { case (myId, myAttr, message) =>
  merge(myAttr, message)

and the 'program' running on edges looks like this:

type AddStepFn[T, ED] = (EdgeTriplet[Paths[T], ED]) = Paths[T]
def edgePrg[T, ED](add: AddStepFn[T, ED]): EdgePrg[Paths[T], ED] = { case edge =>
  import edge._
  val allPaths = add(edge)
  // TODO check attributes here for whatever business reasons you like
  if (allPaths == dstAttr) Iterator.empty else Iterator((dstId, allPaths))

(note: I've simplified the implementation for illustrative purposes. This code performs no checks.)

The problem

The shape of the graph is important. For instance, I ran this algorithm on a (disconnected) network with about 200 million edges, 400 million vertices and sub-graphs with a maximum diameter of 6. It ran in about 20 minutes on a cluster with 19 beefy boxes.

However, I ran it on a much smaller (connected) network of 26 thousand vertices, 175 thousand edges and a diameter of 10 with little success. I found that I could manage only 3 iterations before Spark executors started to die with (apparently) memory problems.

The problem was that this graph had regions that were highly interconnected (it actually represented all the business entities we had that were related to BlackRock Investment Management and Merrill Lynch, of which there are many). Let's say that a particular vertex has 100 immediate neighbours each with 100 of their own and each of them had 100. This quickly explodes into many possible paths through the original vertex (about 1 million) after only 3 super-steps.

It's not too surprising that this is an issue for us. After all, Spark's ScalaDocs do say "ideally the size of [the merged messages] should not increase."

For such a dense graph, super nodes are almost inevitable. For our purposes, we could ignore them but YMMV depending on your business requirements.

Monday, May 29, 2017

Good Hash

Since the feature hashing code that comes with Spark is based on a 32-bit hash giving us lots of collisions, we went for a 64-bit implementation. But what makes a good hash algorithm?

Rather than roll my own, I used this code. But how do I make sure it's good? This Stack Exchange answer gave me some clues by using the chi-squared test. Here, we basically compare with some fairly simple maths, our expected answer to our actual answer to indicate whether there is an unusually high number of collisions that our algorithm generates.

"The chi-squared test of goodness of fit is used to test the hypothesis that the distribution of a categorical variable within a population follows a specific pattern of proportions, while the alternative hypothesis is that the distribution of the variable follows some other pattern." [1]

The Chi-Square Test

"The values of column and rows totals are called marginals because they are on the margin of the table... The numbers within the table ... are called joint frequencies...

"If the two variables are not related, we would expect that the frequency of each cell would be the product of its marginals, divided by the sample size." [1]

This is because the probabilities of A and B are:

P(A ∩ B) = P(A) P(B) iff A ⊥ B

In other words, given a sample size of N, the probability of A is

P(A) = NA / N


P(B) = NB / N


P(A ∩ B) = NB NA / N2

so the expected frequency would be

N P(A ∩ B) = NB NA / N

exactly as [1] says.

The Chi-Square distribution then looks like this:

χ = Σi,j=1 (Oij - Eij)2/ Eij

where i and j are being summed over all the rows and columns.

We can then perform the chi-squared test. This involves looking up the critical value in a table (or calculating it) from the degrees of freedom and the observed chi-squared value. "Basically, if the chi-square you calculated was bigger than the critical value in the table, then the data did not fit the model, which means you have to reject the null hypothesis." (from here)

The Results

Using this new hashing algorithm for our feature hashing resulted in no collisions while hashing all words in the corpus of text where previously we were getting about 70 million collisions with the 32-bit version.

Consequently, the number of records we then had to compare dropped by about 20 million (or about 2% of the total).

Unfortunately, although I am able to execute a chi-square test on the 32-bit algorithm, the 64-bit algorithm has such a large possible number of hash values, it's not practically possible to test it in such a manner.

[1] Statistics in a Nutshell, Sarah Boslaugh

Friday, May 26, 2017

The Probability Monad (part 1)

This is an interesting idea: probability distributions can be modeled as monads. The canonical description lives here but it's very Haskell-heavy. So, in an attempt to grok the probability monad, you might like to look at a Scala implementation here.

[Aside. I tried looking at the Haskell code but had to run
cabal install --dependencies-only --enable-tests
see here for more information.]

The monad in this Scala library is Distribution[T] where T can be, say, a Double such as in a Gaussian distribution:

  object normal extends Distribution[Double] {
    override def get = rand.nextGaussian()

It could be something more interesting, for instance, here the Distribution monad in this particular case is parameterized with a List[Int].

   * You roll a 6-sided die and keep a running sum. What is the probability the
   * sum reaches exactly 30?
  def dieSum(rolls: Int): Distribution[List[Int]] = {
    always(List(0)).markov(rolls)(runningSum => for {
      d <- die
    } yield (d + runningSum.head) :: runningSum)
  def runDieSum = dieSum(30).pr(_ contains 30)

Simulation, simulation, simulation

The method pr will create a simulation where we sample an arbitrary number of monads (default of 10 000). We then filter them for those that contain a score of exactly 30 and calculate the subsequent probability.

Filtering the monads means that traversing the list of 10 000 and calling filter on each one to find ones with a score of 30. Each monad in the list is actually a recursive structure 30 deep (the number of  rolls of the dice; any more is pointless as the total will necessarily be greater than 30).

That's the high-level description. Let's drill down.

State monads again

This recursive structure is a state monad. The monads are created by the recursive calls to markov(). This method creates a new monad by calling flatMap on itself. The get method of this new, inner monad takes the value of its outer monad, passes it to the function that flatMap takes as an argument and in turn calls get on the result.

Having created this inner monad, markov() is called on it and we start the next level of recursion until we have done so 30 times. It is this chain of get calling get when the time comes that will build up the state.

Consequently, we have the outermost monad being a constant Distribution that holds List(0). This is what a call to the outermost get will return. However, get is not publicly accessible. We can only indirectly access it by calling the monad functions.

In short, we have what is a little like a doubly-linked list. The outermost monad contains the "seed", List(0), and a reference to the next monad. The inner monads contain a reference to the next monad (if there is one) and a reference to its outer monad's value via get.

Note that it is the innermost monad that is passed back to the call site calling dieSum, in effect turning the structure inside out.

Anyway, the next job is to filter the structure. This creates a new monad (referencing the erstwhile innermost monad) to do the job but remember monads are lazy so nothing happens yet. It's only when we call a sample method on this monad that something starts to happen. At this point, get is called and we work our way up the get-chain until we reach the outermost monad that contains List(0). Then we "pop" each monad, executing the runningSum => function on the results of the monad before. This is where we roll the die and add append the cumulative result to the List.

If the given of the filter monad is not met, then we keep trying the whole thing again until it is.

Finally, we count the results that meet our predicate dividing by total number of runs. Evidently, we've taken a frequentist approach to probabilities here.

Sunday, May 21, 2017

Lazy Scala

This might be elementary but when a simple error trips me more than once, I blog it.

You probably know that an Iterator can only be accessed once as it's stateful. "An Iterator can only be used once because it is a traversal pointer into a collection, and not a collection in itself" from here. (Iterable just means the implementing class can generate an Iterator).

The simple mistake I made was to check the size of the iterator in a temporary log statement before mapping over it. What was a little more interesting was that other collections behave the same way if you call them via their TraversableOnce interface.

To demonstrate, say we have a Set

    val aSet = Set(1, 2, 3)

and two functions that are identical other than the type of their argument:

  def mapSet[T](xs: Set[T]): TraversableOnce[String] = {
    val mapped = xs.map(_.toString)

  def mapTraversableOnce[T](xs: TraversableOnce[T]): TraversableOnce[String] = {
    val mapped = xs.map(_.toString)

then mapTraversableOnce will return an empty iterator while mapSet will return a Set of Strings. This will come as a surprise to anybody expecting Object Oriented behaviour.

Iterator also violates the functor laws. Take two otherwise identical methods:

  def isTraversableFunctor[T, U, V](xs: Traversable[T], f: T => U, g: U => V): Boolean = {
    val lhs = xs.map(f).map(g)
    val rhs = xs.map(g compose f)
    (lhs == rhs)

  def isTraversableOnceFunctor[T, U, V](xs: TraversableOnce[T], f: T => U, g: U => V): Boolean = {
    val lhs = xs.map(f).map(g)
    val rhs = xs.map(g compose f)
    (lhs == rhs)

and pass them aSet. The first will say it's a functor, the second says it's not.

This is somewhat trivial as the reason it fails is that the iterator has not been materialized. "TraversableOnce's map is not a functor map, but, then again, it never pretended to be one. It's design goals specifically precluded it from being a functor map" from Daniel C. Sobral.


Iterator comes into its own when we want access to the underlying elements lazily. But there are other ways to do this like Stream "which implements all its transformer methods lazily."

Almost all collections are eager by default. Use the view method to make them lazy. Use force on these to make them eager again.

Finally, some methods are naturally lazy. For instance, exists terminates quickly if it finds what it's looking for.

Tuesday, May 16, 2017

Playing with Pregel

Implementing your own distributed Pregel algorithm in GraphX is surprisingly simple but there are a few things to know that will help you.

First, what is GraphX's Pregel implementation? Well, it takes three functions:

1. one that merges messages of type T, that is (T, T) => T.

2. one that runs on each vertex with an attribute type of VD and that takes that message and creates a new state. Its type is (VertexId, VD, T) => VD.

3. one that runs on each edge/vertex/vertex combination and produces messages to be sent to the vertices. Its type is (EdgeTriplet[VD, ED]) => Iterator[(VertexId, VD)] where ED is the edge's attribute type.

TL;DR: the vertex holds its state and the edges send it messages. If this starts sounding like the Actor Model pattern to you, you're not alone

"It's a bit similar to the actor mode if you think of each vertex as an actor, except that vertex state and messages between vertices are fault tolerant and durable, and communication proceeds in fixed rounds: at every iteration the framework delivers all messages sent in the previous iteration. Actors normally have no such timing guarantee." - Martin Kleppmann.

So, off I went and wrote my code where all the paths through a vertex are stored as state at that vertex.

And it ran like a dog. After five hours of processing, it died with out of memory exceptions.

Judicious jstack-ing the JVMs in the cluster showed that threads were hanging around in Seq.containsSlice (we're using Scala 2.10.6). This Scala method was being used to find sub-sequences of VertexIds (which are just an alias for Longs) in the paths that had already been seen.

Desperate, I turned the Seq of Longs to Strings and then used String.contains and the whole thing ran in less than 25 minutes.

This is not the first time I've seen innocent looking code bring a cluster to its knees. Curious, I wrote some micro-benchmarking code comparing these two methods using JMH and got this:

     Benchmark                                   Mode  Cnt          Score          Error  Units
     ContainsSliceBenchmark.scalaContainsSlice  thrpt    5     594519.717 ±    33463.038  ops/s
     ContainsSliceBenchmark.stringContains      thrpt    5  307051356.098 ± 44642542.440  ops/s

That's three orders of magnitude slower.

Although it's a hack, this approach gives great performance. And it shows that taking jstack snapshots of your cluster is the best way to understand why your big data application is so slow.

Friday, May 5, 2017

Spark OutOfMemoryErrors.

Spark can deal with data sets that are much larger than the available physical memory. So, you never have problems with OutOfMemoryErrors, right? Well, no. It depends what you're doing.

When asked how to avoid OOMEs, the first thing people generally say is increase the number of partitions so that there is never too much data in memory at one time. However, this doesn't always work and you're better off having a look at the root cause.

This very week was one of those times. I was getting FetchFailedException. "This error is almost guaranteed to be caused by memory issues on your executors" (from here). Increasing the number of partitions didn't help.

The problem was that I had to find similar entities and declare there was a relationship between them. Foolishly, I tried to do a Cartesian product of all entities that were similar. This is fine 99.99% of the time but scales very badly - O(n2) - for outlying data points where n is large.

The symptoms were that everything was running tickety-boo until the very end where the last few partitions were taking tens of minutes and then the fatal FetchFailedException was thrown.

These pairs were going to be passed to GraphX's Connected Components algorithm so instead of creating a complete graph (which is what the Cartesian product of the entities would give you) you need to a different graph topology. This is not as simple as it sounds as you have to be a little clever in choosing an alternative. For instance, a graph that is just a line will take ages for GraphX to process if it is sufficiently large. For me, my driver ran out of memory after tens of thousands of Pregel supersteps.

A more efficient approach is a star topology that has branches of length 1. This graph is very quickly explored by Pregel/ConnectedComponents and has the same effect as a complete graph or a line since all vertices are part of the same connected component and we don't care at this point how they got there.

The result was the whole graph was processed in about 20 minutes.