Tuesday, November 29, 2016

Spark matrix multiplication example

Here's an example of cosine similarities in Spark using the previously described algorithm.

Imagine we have a matrix where the rows are vectors:


In Spark, this is represented by an RDD of SparseVectors. Each SparseVector represents one row in the matrix.

Recall that for each vector, we attach the corresponding row and column index to each non-zero element. We then reduceByKey using the column index as our key.

We do this to all columns but as an example, let's take the second column (k=2). 


Giving us the tuples { (1, 0.5), (2, 0.5), (3, 0.8), (4, 0) } corresponding to k=2. Note that the first element of each tuple is the row index and the second the value in the highlighted cell.

Since the this tuple is keyed on k, all these values will end up in the same Spark partition.

Partition A(k=2) -> { (1, 0.5), (2, 0.5), (3, 0.8)  }
Partition B(k=3) -> ...
Partition C...

Note that we're not interested in values that are zero so we can discard the last tuple in the original set (zeroes simply disappear). For all the others, we want a Cartesian product of this set with itself and then we want to multiply out the values for each pair. Furthermore, we're not interested in the product of a tuple with itself and we only care about co-ordinates (i,j) when i< j because we don't want to do the same work twice.

So the result will look like this:

{ ((1,2), 0.25), ((1,3), 0.4), ((2,3), 0.4) }

At this point, we're no longer interested in k and can discard it. Each element of the above set moves to a partition according to the first element of the tuple. So, we'd expect something like:

Partition A
Partition B((1,2), 0.25), ((1,3), 0.4)
Partition C((2,3), 0.4)

Note that all the interesting information in the example so far was (arbitrarily) on Partition A. Now, it's been shuffled to Partitions B and C. This is just an example. The shuffled data could have ended up in any partition.

If we do the same again for, say, the last column, we'd expect the results of multiplying rows 1 and 3. The other rows have zero in that column so we're not interested in them.

Now, our Spark partitions would look something like:

Partition A
Partition B((1,2), 0.25), ((1,3), 0.4), ((1,3), 0.1)
Partition C((2,3), 0.4)

When we're done with all the columns, our partitions will look something like:

Partition A
Partition B((1,2), 0.25), ((1,3), 0.4), ((1,3), 0.1), ((1,4) 0.5)
Partition C((2,3), 0.4), ((2,3), 0.1)

Finally, if we reduce on each partition by the co-ordinates key, we'll get:

Partition A
Partition B((1,2), 0.25), ((1,3), 0.5), ((1,4) 0.5)
Partition C((2,3), 0.5)

which if we imagine is a matrix looks like this:


But, wait, that's not what our matrix multiplied with its transpose should look like. If we take the matrix at the top of this page and multiply it by its transpose we get:


The reason, of course, is that we're not interested in the diagonal as that's the similarity of a vector with itself (in real life, the diagonals would each have value 1.0 but I wanted to keep the numbers simple). What's more, we're only interested in half the matrix (the upper half) as the lower half is the same and we don't want to do the same work twice (in fact, any matrix multiplied by its transpose will be symmetric like this).

Consequently, only the upper half (the highlighted cells) are the same as the matrix we calculated. The rest of it is of no use to us.

An Alternative Approach to Matrices in Spark

I've been writing some code that performs TF-IDF on a corpus of text made up of M number of records. Given the frequency of each term, I can build a normalized vector for each record made up of the terms in it. To reduce complexity, I ignore terms that appear more than W times.

Cosine Similarities and Matrix Multiplication

(Note: since Spark 1.6.0, there is a method in IndexedRowMatrix.columnSimilarities to compute cosine similarities. However, this only delegates to an old 1.2.0 method that uses DIMSUM. This blows up on anything remotely large with OOMEs. See here for more information).

If we have a set of vectors that we want to multiply together to find their cosine similarities, we can imagine them as a matrix where each row in that matrix is one of our vectors.

So, imagine we have M vectors { v1, v2, v3, ... vM }. Each vector is made up of N elements so, for instance, vector vcan be expressed as { v11, v12, v13 ... v1N }.

We can express this as a matrix that looks like:


(You'll notice that each row represents a record and each column represents a term. If a term appears in that record then vij is non-zero).

What we want is a matrix that holds every vector multiplied by every other vector. For instance, for v1 and v2 we want

v11v21+ v12v22+ v13v23 +... v1Nv2N

Well, this is the same as multiplying the matrix by its own transpose:
and looking at element (1, 2) of that matrix for this particular pair in this particular example.

So, using the brevity of summation notation, if this matrix is called A then:

Aij  = Σvinvjn


Or, using Einstein notation, we can state it even more succinctly:

Aij = vinvjn

since the n index is repeated.

Given two matrices, one m x n the other n x p, the complexity of multiplying them is O(mnp). In our case, we're multiplying a matrix by its transpose so p = m. Therefore, the complexity is O(m2n) - pretty bad.

The good news is that our matrix is sparse. How sparse depends on our variable W that determines at what point we start discarding terms in our TF-IDF stage.

Spark Implementation

Since we are just multiplying a matrix by its transpose, we might want to use BlockMatrix.multiply. However, the documentation tells us:
"The output BlockMatrix will only consist of blocks of DenseMatrix. This may cause some performance issues until support for multiplying two sparse matrices is added."
Since M and N for us is typically hundreds of millions, this is a problem.

So, we might want to just manually multiply any two vectors, i and j, in our equation for Aij. The trouble is that if we have two vectors in a Spark RDD, vi and vj, they may very well live in different partitions. And since we want to calculate vi . vj for all i,j then we'll definitely be hitting all partition combinations and there will be O(M2) multiplications.

We want to reduce the amount of inter-partition shuffling and avoid O(M2) multiplications of N-sized vectors. One solution might be to take each vector and for each non-zero element attach it's column ID which we'll call k. We can then reduceByKey (where the k is the key) and start multiplying out the set of values for each key.

That is, associated with every column, k, there will be a set of vectors that have non-zero elements for their k-th element. There will be a maximum of W of them since that's the limit we imposed on how often a term appears before we consider it insignificant. We'll call this set K where in set-builder notation:

K = { vxk ∈ A | vxk ≠ 0 and |K| ≤ W}

We then take a Cartesian product of this set and attach the tuple (i,j), that is, its co-ordinates such that we have a set, D, where:

Dk = { ((i,j), vik vjk) | vikvjk ∈ K and i < j}

We ignore the case where i=j since we're not interested in the cosine similarities of a vector with itself. We know it's 1 as they're unit vectors.

Note, that this can be computationally expensive as it scales as O(W2).

Now we can reduceByKey on the co-ordinates (i,j) and we'll have obtained the cosine similarity for all i, j.

Validity of this approach

Mathematically, if our column index is k and we call the contribution of the term corresponding to k to any cosine similarity between any pair of vectors (i,j), Di,jk then:

Di,jk = vikvjk

which is just our equation above for Aij (with k=n) thus proving that this technique is mathematically correct at least. How good its performance is something else...

By making the key the first element of a tuple, all values will live in the same partition. Spark will partition Tuple2s according to the first element in the tuple.


We can demonstrate this so:

    type Pair = (Long, Long)

    val pairs = (1 to 100).map(x => (x.toLong, Random.nextLong()))
    val rdd   = sparkContext.parallelize(pairs)

which gives us an RDD of random pairs of Longs, the first being between 1 and 100. Now we create a function that can map over the RDD assigning each element a number unique to the partition it's in:

  type Enumerated[T] = (Int, T)

  val idGenerator = new AtomicInteger(0)
  def partitionMapper[T]: Iterator[T] => Iterator[Enumerated[T]] = { pairIterator =>
    val partitionId = idGenerator.getAndIncrement()
    val enumerated  = ArrayBuffer[Enumerated[T]]()
    pairIterator.foreach { pair =>
      enumerated += ((partitionId, pair))

Finally, we see the output:

    val partitioned  = rdd mapPartitions partitionMapper[Pair]
    val inMem        = partitioned.collect()
    val groupedPairs = inMem.groupBy(_._1).toSeq.sortBy(_._1)
    groupedPairs foreach { case (partitionId, enumerated) =>
      val vals = enumerated.map(_._2._1)
      println(s"partition #$partitionId: ${vals.min}  to ${vals.max} ")

Which produces:

partition #0: 51  to 75
partition #1: 1  to 25
partition #2: 26  to 50
partition #3: 76  to 100

Very neat.


It looks like I am not the only person who is rolling-his-own distributed matrix multiplication solution using Spark (see this link here). It seems others have found you don't get it out-of-the-box. There appear to be moves afoot to address this.

Sunday, November 27, 2016


Spark tries to abstract you from the data so you can write beautiful, functional code. The truth is that you have to get your hands dirty if you don't want it to run like a dog.

I'm processing a graph of over 100 million edges and about 20 million vertices. It's been a learning experience to make it perform. It first took over 24 hours to execute the Connected Components algorithm. I finally managed to run it in less than an hour. But it wasn't easy.

Cluster configuration

I didn't find GraphX very memory hungry, at least not for the executors (the entire graph was only some 4gb on HDFS). After 24 hours on my first attempt, the job looked far from over. Using jstat, I noticed that the driver was GCing a lot. It had 8gb but I increased it to 20 and it was much more happy.

Although one is often recommended to run a larger number of executors with fewer cores each, for GraphX this might not be the best configuration. Facebook found:

"Even though both systems can run multiple workers on a single physical machine, we found that it is more efficient when we start only one worker process per machine and allocate all the available memory to it. In the following experiments, the number of workers is also the number of physical machines used for each job. We allocated 80 GB of RAM to each worker."

The shape of the graph

What your graph looks like makes a difference to the performance of GraphX. Looking at the maximum number of "Shuffle write Size / Records" in each super step (mapPartitions at GraphImpl.scala:207 in Spark 2.0.2) in the Spark web GUI, we see the numbers steadily decreasing.

This depends on the shape of the graph.

"GraphFrames and GraphX both use an algorithm which runs in d iterations, where d is the largest diameter of any connected component (i.e., the max number of hops between any 2 nodes in a component). So the running time will vary significantly depending on the your graph's structure. Tuning the vertices and edges DataFrames by making sure to cache them and possibly adjust the partitioning beforehand can help." (from DataBricks forum)

As somebody pointed out on StackOverflow:

"If you have a cluster where each of the vertexes is connected to every other vertex, then after one round of messages each one knows who the lowest VertexID is, and they all go silent the next round. One sequential step, the entire cluster.

"If, on the other hand, each vertex in the cluster is only connected to at most 2 other vertices, then it could take N sequential steps before all the vertices know who what the minimum VertexID is."

I tested this by making a graph that was just a chain of numbers 1, 2, 3, 4 ... 20. I found that after 10 super-steps, there were 10 resolved connected components: {1 to 11}, {12}, {13}, {14} ... {20}.


How do you partition a graph? Facebook said that they got good results using EdgePartition2D. The trick here is to imagine the graph as matrix where for a given vertex X, all non-zero elements in row X indicate an outgoing edge and all non-zero elements in row X indicate an incoming edge.

Therefore, all the details for a given vertex will either be in single row and a single column. So, for an NxN matrix, all the data will be in 2N elements (ie one row plus one column). Equivalently, if there are E edges, the upper bound on the number of partitions where we store the data for X is O(√E) assuming a maximum of one edge per pair of vertices (which is not a problem for the connected components algorithm).

But the biggest saving came from reducing the number of partitions. The previous stage of the pipeline had deliberately and explicitly increased the number of partitions to avoid running out of memory when performing matrix multiplication. This can be a disaster for GraphX.

The previous job had increased the number of partitions to some 18k. What's going on is described here:
As the graph algorithm progresses, it is common for less and less [sic] of the vertices to remain active. Therefore a full scan of all triplets during a map-reduce becomes less and less effective. “For example, in the last iteration of connected components on the Twitter graph, only a few of the vertices are still active. However, to execute mrTriplets we still must sequentially scan 1.5 billion edges and check whether their vertices are in the active set.”
Although GraphX has clever optimizations here, there's was still a huge amount of overhead in constantly mapping and reducing.

Coalescing the 18 000 partitions to a mere 360 on a cluster with 8 boxes and 24 cores per box, the time to run was reduced to a healthy 10-40 minutes depending on sub-graph structures.

Saturday, November 19, 2016

Probabilistic Programming - Part 2

In part 1, I looked at the first chapter of Bayesian Methods for Hackers. In this post, I'll conduct the same analysis on the same data but instead of using Python I'll be using Scala and Figaro.

To recap: we have somebody's text usage and we want to know when (if at all) there was a change in usage. All we have is the number of texts sent each day over a 74 day period.

We suppose that the distribution of texts per day is a Poisson distribution as we assume the text messages are independent of each other, it's a discrete distribution and it's got the nice property that the one constant it depends on (λ) is also its mean.

Unfortunately, we don't know what this parameter is so we need to model that stochastically too. This λ parameter is not discrete but continuous, so we choose the exponential distribution to model as it too has nice properties. It too depends on a constant parameter (α) and it has the nice property that its mean is α-1.

Because of these nice properties the distributions have, we are helped in our initial guess at what their values are. In this case, we can take the mean number of texts (let's call it m) and then we know λ will (roughly) be that value. However, λ is not a fixed value, it's a stochastic variable modeled by an exponential function. That's OK though since we know that the mean of an exponential distribution is α-1 then α=m-1.

Finally, we are assuming that on some day indicated by a value tau, we switch from one Poisson distribution to another so we need two of everything. Regarding the s, "because they’re parameters of a parameter, they’re sometimes called hyper-parameters." [1]

OK, enough of the maths, here's the code:

    val data: List[Int] = // load the data from a file

    val mean  = data.sum.toDouble / data.length.toDouble
    val alpha = 1.0 / mean

    val lambda1: Element[Double] = Exponential(alpha)
    val lambda2: Element[Double] = Exponential(alpha)

    val poisson1  = Poisson(lambda1)
    val poisson2  = Poisson(lambda2)

We don't know the initial value of tau so we guess it's initially likely to be any value in the 74 day period. We express that in Figaro thus:

    val tau: AtomicUniform[Int] = Uniform(0 to n_count_data: _*)

And then we build our model to describe the situation outlined in the first few paragraphs thus:

    def pDay(day: Int): Element[Int] = If (tau.map(value => value > day), poisson1, poisson2)

    val modelData: Seq[Element[Int]] = (1 to n_count_data).map(d => pDay(d))

Finally, we tell the model what actual data is to help it know what it's shooting for:

    modelData.zip(data).foreach { case(test, real) =>
      test.observe(real) // WARNING! This does not play nicely with MH (see below)

Then we hand it off to an algorithm that solves the equation for us, Metropolis-Hastings.

    val mh = MetropolisHastings(numSamples = 40000,
      ProposalScheme(tau, lambda1, lambda2),
      burnIn = 10000,

Oops. It gets stuck. Even if I reduce the number of samples from 40 000 to 10. Hmm. Well, it prompted me to ask Avi Pfeffer, the creator of Figaro, what the problem was.

"You have 70+ variables representing different days with 70+ possible values for each variable. When you observe all of these, you get an extremely unlikely observation. I think MH is struggling with finding any state that causes the observations to be satisfied."

That is, the chances of getting the actual data given the model we have built is staggeringly small. Annoyingly, I already knew this as I had read Dr Pfeffer's book where he built a probabilistic model of movie actors winning awards. Naturally, only one person can win the Oscar for best actor, so the model gets stuck as MH cannot explore any other actor winning the Oscar as it's a two step process to get there: before we select a new actor the original actor is no longer the winner (zero actors win the award - that's impossible) or we choose another actor who wins the award but before we say the original is not the winner (two actors win the award - also impossible).

There are two solutions. "The first is to use proposal schemes that are different from the default scheme. These custom schemes can, for example, propose to change two elements at once. The second idea is to avoid hard conditions that can never be violated." [1]

I chose the latter and wrote this:

  def fitnessFnPowered(real: Int, power: Double): (Int) => Double = x => 1.0 / pow((abs(real - x) + 1), power)

That is, we calculate a value in the range [0,1] where 1 indicates a direct hit and anything else is less than this depending on how much off target it was. How quickly wrong proposals deviate from 1.0 is determined by the value of power.

    modelData.zip(data).foreach { case(test, real) =>
      test.addConstraint(fitnessFnPowered(real, ???))

And now my algorithm runs to completion. Well, it does if I give it a power rather than ???. What is a good value? By trial and error, I got:

And the probability became 1.0 (ie, certain) when the power value was set to 5.0. Although the distributions differ to the BM4H book, both frameworks agree on the most probable value for the day the usage change: day 45.

[1] Practical Probabilistic Programming, Avi Pfeffer.

Friday, November 11, 2016

Conjugate Priors in Bayesian Probability

Apropos the recent election, I was reading Allen Downey's blog about Nate Silver and Bayesian forecasting. This piqued my interest in probability distributions (that I forgot about a quarter century ago). Here are some notes.

Binomial Distribution

This is a distribution that describes a sequence of n independent yes/no experiments each with a probability of p. It is a discrete since you obviously can't have, say, 3.25 successes.

The formula for the binomial probability mass function (PMF) is:

nCk pk (1 - p)n-k


n is the number of trials
k is the number of successes
p is the probability of success.

Beta Distribution

"The beta distribution is defined on the interval from 0 to 1 (including both), so it is a natural choice for describing proportions and probabilities.". It is (obviously) a continuous function.

The probability density function (PDF) for the beta distribution is:

xα - 1(1 - x)β - 1

    B(α, β)

where α and β are hyperparameters and B(α, β) is:

Γ(α) Γ(β)

Γ(α + β)

where Γ(n) is the gamma function and it's simply defined as:

Γ(n) = (n - 1)!

OK, that sounds a lot more complicated than it is but any decent coder could write all this up in a few lines of their favourite language.

One last thing, the sum of the hyperparameters is the total number of trials. That is

α + β = n

Bayes: Binomial and Beta distribution

And this is where it gets interesting. Recall that the posterior distribution is proportional to the likelihood function and the prior. Or, in maths speak:

P(A | X) ∝ P(X | A) P (A)

"It turns out that if you do a Bayesian update with a binomial likelihood function ... the beta distribution is a conjugate prior. That means that if the prior distribution for x is a beta distribution, the posterior is also a beta distribution." [1]

How do we know this? Well take the equation for the binomial's PMF and multiply it by the beta's PDF. But, to make things easier, note that nCk and B(α, β) are constants and can be ignored in a "proportional to" relationship. So, it becomes:

P(a | x) ∝ xk (1 - x)n-k xα - 1(1 - x)β - 1

which simplifies to:

P(a | x) ∝ xα+k-1 (1 - x)n-k+β - 1

Which is another beta distribution! Only this time

α' = α + k - 1


β' = n - k + β - 1

Proofs for other combinations can be found here, more information here, a better explanation by somebody smarter than myself is here, and possibly the most informative answer on StackExchange ever here.

[1] Think Bayes, Downey

Recursive Data Types

I was pointed in the direction of this lecture. These are notes I made.

Imagine you want to build a hierarchy of professors and their students who too go on to be professors. You might like to model it like this:

case class ProfF[A](
  name: String,
  year: Int,
  students: List[A]

And have case classes to allow us to make recursive types because "type aliases can't be recursive but classes can" (using recursive type aliases result in "types [that] are infinite").

What's more, with ProfF "depending on how we wrap it we can represent just the pure data or the data annotated with database id" thus:

case class Prof(value: ProfF[Prof])


case class IdProf(id: Int, value: ProfF[IdProf])

But this is too tightly coupled with ProfF so let's refactor them like so:

case class Prof[F[_]](value: F[Prof])
case class IdProf[F[_]](id: Int, value: F[IdProf])

and too tightly coupled to the type of the id, so let's refactor that too:

case class Prof[F[_]](value: F[Prof[F]])
case class IdProf[F[_], A](id: A, value: F[IdProf[F, A]])

But there is nothing new under the sun as actually these useful data structures already exist in Scalaz.

case class Fix[F[_]](unfix: F[Fix[F]])
case class Cofree[F[_], A](head: A, tail: F[Cofree[F, A]])

and let's also throw in:

case class Free[F[_], A](resume: A \/ F[Free[F, A]])

Which is a type that says resume is either A or F[Free[F, A]]. It's like an Either is Scala. "\/[A, B] is isomorphic to scala.Either[A, B], but \/ is right-biased, so methods such as map and flatMap apply only in the context of the right case" (from here).

"In the same way you think of Cofree as a structure with labels at each node, Free is a structure with labels at the leaves". That is, where Cofree will always have an AFree can either have an A but no further branches (ie, it's a leaf) or it has branches and no A (ie,  it's a branch to another Free).


If is a functor, Free is the free monad. 

Also, if is a functor, Cofree is a comonad. More on this later.

"If we have a Fix we can always get an F out by calling unfix, and we can do that with Cofree by calling tail. The general name for this operation is project and data types that do this are called recursive types. And note that we can't do it with Free because you might not have an F; you might have an A

"But you can always go the other way. If you have the F you can construct a Free. Same with Fix. But you can't do this with Cofree because you also need an A. So this operation (the dual) is called embed and data types with this behavior are corecursive types."

This is used in the wonderfully named Matryoshka library.

What is it good for?

Well, we can build a recursive data structure to be sent to a persistence layer with this:

type Prof = Fix[ProfF]

and receive a similar (technically corecursive) data structure back but that now contains DB-assigned primary keys with this:

type IdProf = Cofree[ProfF, Int]

These are the basics you need to understand Doobie, a functional library for accessing JDBC.

Wednesday, November 2, 2016

Spark and Vector Multiplication

More Spark shenanigans. I'm having all sorts of problems while attempting to make the run stable. I'm seeing FetchFailedExceptions that (according to this) are "almost guaranteed to be caused by memory issues on your executors" and FileNotFoundExceptions that mysteriously appear when an executor throws an OutOfMemoryError in the initializer of org.apache.spark.shuffle.sort.ShuffleInMemorySorter.

Thanks for the memory

So, the first thing is to review how Spark manages its memory. The areas are broken down as:
  • Reserved Memory
  • User Memory
  • Spark Memory -broken down into:
    • Storage Memory
    • Executor Memory
where (from here)

Reserved Memory "is the memory reserved by the system, and its size is hardcoded. As of Spark 1.6.0, its value is 300MB"

User Memory "is the memory pool that remains after the allocation of Spark Memory, and it is completely up to you to use it in a way you like. You can store your own data structures there that would be used in RDD transformations".

Storage Memory "is used for both storing Apache Spark cached data and for temporary space serialized". It also stores broadcast variables.

Execution Memory "is used for storing the objects required during the execution of Spark tasks."

Why heap space isn't the whole story

Here is an example.

Given "a matrix A that has a large number of columns (a short and fat matrix) and a matrix B that has a large number of rows (a tall and thin matrix)" that we want to multiply, then there's a good chance multiplying them may blow up.

The reason is that this can be very memory intensive (see the User Memory section above).

To get around this, we can "apply the outer product to each row of the RDD, a column of A with a row of B. This will create an RDD of small matrices size m*k." We then "use the reduce method to sum the all the small matrices."

Neat. So the trick is to break down each step into smaller steps. In this example, it's done by making an RDD that pairs small vectors together so they can be multiplied with very little overhead.

Refactor the problem

Realizing that my vector multiplication was probably the culprit, I gave myself more user memory but setting spark.memory.fraction much lower (about 0.01) since

User Memory ~ (1.0 - spark.memory.fraction)

This improved things insofar as my run ran for longer but it still ultimately blew up.

The problem was that my sparse matrix was not sparse enough. It could have rows with 2000 non-zero elements. Since the row could have about 100 million elements overall, this is still probably considered sparse. But each element is multiplied with each other element while I calculated Cosine Similarities. This scales as O(N2).

OK, ~20002 results is not the end of the world but what if we get a few of these in the same JVM? We quickly exhaust the memory. Increasing User Memory reduces the likelihood but does not eliminate it. The solution (like the matrix multiplication example above) is to break down the problem into a new RDD that presents the data in smaller chunks.