Tuesday, May 30, 2017

Rocha Thatte Cycle Detection Algorithm

Flows of money and ownership in a network of businesses and individuals can indicate fraudulent behaviour. For instance, if there is a cycle in the network such that X owns Y who owns Z and Z audits X, you can quickly see that there is a conflict of interest. Such suspicions are of interest to us.

GraphX is very good at sniffing out these networks but you don't get cycle detection out-of-the-box. So I rolled my own, which turned out to be similar to an algorithm somebody else had already discovered: the Rocha Thatte algorithm.

The algorithm

The Wikipedia page gives an excellent overview so I won't bore you with the details. Suffice to say that, at each super-step, each vertex passes all the new paths going through it to its neighbouring vertices.
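To make the super-step idea concrete, here is a minimal single-machine sketch in plain Scala (no Spark). It is not the GraphX code from this post: the names CycleSketch and findCycles are made up for illustration, and the rule of reporting a cycle only from its smallest vertex id is my reading of how the algorithm is usually described.

```scala
object CycleSketch {
  type VertexId = Long            // stand-in for GraphX's VertexId alias
  type Path     = Vector[VertexId]

  /** Runs the message-passing rounds over a plain edge list and returns the cycles found. */
  def findCycles(edges: Seq[(VertexId, VertexId)], maxSteps: Int): Set[Path] = {
    val out: Map[VertexId, Seq[VertexId]] =
      edges.groupBy(_._1).map { case (src, es) => src -> es.map(_._2) }

    // First round: every vertex sends the one-element path (itself) along its out-edges.
    var inbox: Seq[(VertexId, Path)] =
      edges.map { case (src, dst) => dst -> Vector(src) }
    var cycles = Set.empty[Path]
    var step   = 0

    while (inbox.nonEmpty && step < maxSteps) {
      // A path arriving back at its own first vertex has closed a cycle.
      val (closed, open) = inbox.partition { case (v, path) => path.head == v }
      // Every member of a cycle sees it once; keep only the copy starting at the smallest id.
      cycles ++= closed.collect { case (v, path) if v == path.min => path }

      // Otherwise append this vertex and forward, dropping paths that revisit a vertex.
      inbox = for {
        (v, path) <- open
        if !path.contains(v)
        next <- out.getOrElse(v, Seq.empty)
      } yield next -> (path :+ v)
      step += 1
    }
    cycles
  }
}
```

Fed the X, Y, Z example from the introduction as edges 1 -> 2 -> 3 -> 1, findCycles should report the single cycle Vector(1, 2, 3). Note that the inbox holds every live path, which is exactly the quantity that blows up on dense graphs.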

The code for GraphX is quite simple since the library does all the heavy lifting. Let's introduce a few type aliases:

import org.apache.spark.graphx.{VertexId, EdgeTriplet}

  type VertexPrg[T]   = (VertexId, T, T) => T
  type EdgePrg[T, ED] = (EdgeTriplet[T, ED]) => Iterator[(VertexId, T)]

Whichever algorithm we write, it has to implement these functions.

Now, we'll introduce some domain-specific aliases:

  type Path[T]  = Seq[T]
  type Paths[T] = Set[Path[T]]

Finally, the GraphX merge function (which is just (U, U) => U) would look like this for us:

  type MergeFn[T] = (Paths[T], Paths[T]) => Paths[T]

The implementation of Rocha Thatte then looks like this. The 'program' that runs on each vertex can be created here:

def vertexPrg[T](merge: MergeFn[T]): VertexPrg[Paths[T]] = { case (myId, myAttr, message) =>
  // fold the incoming paths into the paths this vertex already knows about
  merge(myAttr, message)
}

and the 'program' running on edges looks like this:

type AddStepFn[T, ED] = (EdgeTriplet[Paths[T], ED]) => Paths[T]
def edgePrg[T, ED](add: AddStepFn[T, ED]): EdgePrg[Paths[T], ED] = { case edge =>
  import edge._
  // the paths the destination would hold after taking this step along the edge
  val allPaths = add(edge)
  // TODO check attributes here for whatever business reasons you like
  // only send a message if the destination would learn something new
  if (allPaths == dstAttr) Iterator.empty else Iterator((dstId, allPaths))
}

(note: I've simplified the implementation for illustrative purposes. This code performs no checks.)

The problem

The shape of the graph is important. For instance, I ran this algorithm on a (disconnected) network with about 200 million edges, 400 million vertices and sub-graphs with a maximum diameter of 6. It ran in about 20 minutes on a cluster with 19 beefy boxes.

However, I ran it on a much smaller (connected) network of 26 thousand vertices, 175 thousand edges and a diameter of 10 with little success. I found that I could manage only 3 iterations before Spark executors started to die with (apparently) memory problems.

The problem was that this graph had regions that were highly interconnected (it actually represented all the business entities we had that were related to BlackRock Investment Management and Merrill Lynch, of which there are many). Let's say that a particular vertex has 100 immediate neighbours, each of those has 100 of its own, and each of those has 100 more. After only 3 super-steps this explodes into about 1 million (100 x 100 x 100) possible paths through the original vertex.
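The arithmetic is easy to check with a back-of-the-envelope helper. This is only an illustration with made-up names (PathGrowth, pathsThrough), and it assumes a uniform fan-out at every vertex, which a real graph will not have:

```scala
object PathGrowth {
  /** Paths fanning out from one vertex after `steps` super-steps,
    * assuming every vertex has exactly `fanOut` neighbours. */
  def pathsThrough(fanOut: Int, steps: Int): BigInt =
    BigInt(fanOut).pow(steps)
}
```

PathGrowth.pathsThrough(100, 3) gives 1000000, the figure quoted above. Under the same (pessimistic) assumption, 10 super-steps would give 10 to the power 20, which no longer fits in a Long, let alone in an executor's heap.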

It's not too surprising that this is an issue for us. After all, Spark's ScalaDocs do say "ideally the size of [the merged messages] should not increase."

For such a dense graph, super nodes are almost inevitable. For our purposes, we could ignore them but YMMV depending on your business requirements.
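One way to ignore super nodes is to drop every edge touching a vertex whose degree is above some cut-off before running the algorithm. The sketch below does this on a plain edge list so that it runs without Spark. The names SuperNodes and dropSuperNodes and the maxDegree threshold are hypothetical, and this is not necessarily what we did in production. On GraphX the same idea could presumably be expressed with degrees and subgraph.

```scala
object SuperNodes {
  type VertexId = Long            // stand-in for GraphX's VertexId alias

  /** Removes every edge that touches a vertex whose degree (in + out) exceeds maxDegree. */
  def dropSuperNodes(edges: Seq[(VertexId, VertexId)],
                     maxDegree: Int): Seq[(VertexId, VertexId)] = {
    // count how many edge endpoints land on each vertex
    val degree: Map[VertexId, Int] =
      edges.flatMap { case (src, dst) => Seq(src, dst) }
        .groupBy(identity)
        .map { case (v, occurrences) => v -> occurrences.size }

    val superNodes: Set[VertexId] =
      degree.collect { case (v, d) if d > maxDegree => v }.toSet

    edges.filterNot { case (src, dst) => superNodes(src) || superNodes(dst) }
  }
}
```

The right threshold is a business decision: set it too low and you throw away genuine cycles that happen to pass through a well-connected entity.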
