Thursday, December 22, 2016

Spark and the GC


Spark is memory hungry and that means you need to consider the garbage collector when tuning.

There are a number of GCs to choose from that may or may not be appropriate for your job. Some say that G1 may give you (soft) guarantees for maximum latency but is not good for batch jobs. Others point out that CMS may give lower pauses than ParallelGC (the default on 64-bit machines) but that ParallelGC has better overall throughput, and since we're running a batch job rather than a web server, ParallelGC may suit Spark.

More logs than a lumberjack

Whichever GC you use, you'll need to turn logging on and analyse the logs.
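
For example, one way to do that in Spark is to pass the JVM's GC-logging flags to each executor. This is only a sketch using the Java 8 era flags; spark.executor.extraJavaOptions is a real Spark conf key, but the log path is illustrative:

    import org.apache.spark.SparkConf

    // Ask each executor JVM to write a GC log that we can analyse later.
    val conf = new SparkConf().set(
      "spark.executor.extraJavaOptions",
      "-XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -Xloggc:/tmp/executor_gc.log"
    )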

You can see objects being promoted from young to old generation (see here and here) but it's inferred rather than stated explicitly. At a young GC, you'll see something like:

[PSYoungGen: BEFORE_YOUNG->AFTER_YOUNG(CAPACITY_YOUNG)] BEFORE_HEAP->AFTER_HEAP(CAPACITY_HEAP)

This describes the memory usage in the young generation before the GC (BEFORE_YOUNG), after it (AFTER_YOUNG) and the capacity of the young generation (CAPACITY_YOUNG).

It also shows the usage of the heap as a whole before (BEFORE_HEAP), after (AFTER_HEAP) and the heap's total capacity.

Note that the young generation does not necessarily shrink by the same amount as the whole heap, that is:

Δ = (BEFORE_YOUNG - AFTER_YOUNG) - (BEFORE_HEAP - AFTER_HEAP)

can be non-zero. This is because not all of the young generation was actually collected; some of it was promoted to the old generation.

In an attempt to avoid premature promotion, and with JVMs that had 8GB of memory, I set -XX:MaxNewSize=6G -XX:SurvivorRatio=6, but instead of taking 5 hours the job took 6. I tried this because there was a lot of young generation GC and not much old generation GC.

Trying G1GC took just over 6 hours. And setting -XX:MaxNewSize=1G -XX:SurvivorRatio=3 just had to be killed as it looked like it was going to take days. So, I had to look elsewhere.

Disks

You can also check your disk access times by using a script taken from here, namely:

# write 2GB of zeros in 8KB blocks to measure sequential write throughput
dd if=/dev/zero of=/tmp/output.img bs=8k count=256k
# clean up the test file
rm /tmp/output.img

A decent HDD will typically give you about 300MB/s, a bog-standard SSD (my NUC at home) about 500MB/s.

The best way to avoid Garbage Collection...

... is not to create any garbage. This might not always be possible but you can minimize GC. Kryo will compress objects amazingly well by replacing the verbose String that represents their FQNs with just a byte or two. However, it will only compress components of the class you register with it, not a deep tree of components. For instance, if you register org.apache.spark.mllib.linalg.SparseVector, you'd do well to also register Array[Int] and Array[Double] as these are what hog memory.

It is not sufficient to just register Array[_].

You can put something like sparkConf.set("spark.kryo.registrationRequired", "true") in your code to see what needs to be explicitly added. This will cause runtime exceptions to be thrown until everything to be (un)compressed has been explicitly registered.
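
For instance, a minimal sketch of registering the SparseVector case mentioned above along with the arrays inside it (registerKryoClasses is the standard SparkConf method; the variable names here are illustrative):

    import org.apache.spark.SparkConf
    import org.apache.spark.mllib.linalg.SparseVector

    val sparkConf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrationRequired", "true") // fail fast on anything unregistered
      .registerKryoClasses(Array(
        classOf[SparseVector],
        classOf[Array[Int]],    // the indices inside a SparseVector
        classOf[Array[Double]]  // the values inside a SparseVector
      ))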

After these changes, the shuffle went from some 2.3TB to 300GB and consequently the time for the job dropped from about 5 hours to less than 45 minutes. With less memory to churn there was less GC and this makes a huge difference to how long your Spark job takes.

Friday, December 16, 2016

More Spark tuning


I've spent some time trying to tune matrix multiplication in Spark. I had 32 executors each with 4 threads and sampled the stacks with jstack over a few minutes with a handy little script:

> cat ~/whatsEveryoneDoing.sh
for BOX in `cat ~/boxes.txt` ; do { echo $BOX ; ssh $BOX "for PID in \`ps -ef | grep $1 | grep jdk8 | grep -v bash | awk '{print \$2}'\` ; do { sudo -u yarn jstack \$PID | grep -A45 ^\\\"Exec ; } done ; echo " 2>/dev/null ; } done

where ~/boxes.txt is just a file with the addresses of all the boxes in the cluster and the argument with which you invoke it is the application ID of the job.

I noticed that of my 128 executor threads, in each sample 50 or so were contending for the lock at:

org.apache.spark.storage.memory.MemoryStore.reserveUnrollMemoryForThisTask(MemoryStore.scala:570)

(This is Spark 2.0.2).

So, reducing --executor-cores from 4 to 2 while only increasing the number of executors from 32 to 48 had a big impact. The runtime was reduced from about 7.5 hours to 5 hours.
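
Expressed as configuration keys rather than spark-submit flags, the change was roughly this (a sketch; spark.executor.instances only takes effect when dynamic allocation is off):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.executor.instances", "48") // was 32
      .set("spark.executor.cores", "2")      // was 4: fewer threads fighting over MemoryStore's lock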

Wednesday, December 14, 2016

Bayesian Fun


The Model

Dr Bristol has tea with statistician Sir Ronald Fisher. She claims she can tell the difference between when the tea is poured first then the milk or milk first then the tea. Sir Ronald doesn't quite believe this and tests her. Sure enough, she gets it right five out of six times. Is this statistically significant?

Sir Ronald "arrived at a method that consists of calculating the total probability of the observed result plus the probability of any more extreme results possible under the null hypothesis (i.e., the probability that she would correctly identify 5 or 6 cups by sheer guessing). This probability is the p-value. If it is less than .05, then Fisher would declare the result significant and reject the null hypothesis of guessing." (from here).

The p-value debate has been rumbling for a while ("for example, a 0.05 p-value does not imply the false positive rate is 5.0%, it can be much higher."). Briefly, if there are 100 000 hypotheses and only 10% are true, then of the 90 000 that are not true, 4 500 will randomly fall within the 5% (p-value = 0.05) range. So, 4 500 of the 14 500 hypotheses that we think are true are actually false - and that's if we correctly identify all of the 10%! That means 31% of our knowledge is rubbish, and that's the best-case scenario!

Devout Bayesian Prof Dennis Lindley discusses in this article why p-values are inadequate and proposes a Bayesian solution.

The Problem

Lindley notes that when using p-values, the probabilities change depending on your experiment and not the results. To illustrate:

"If the experiment consisted of 6 pairs of cups being tested and the result was RRRRRW, the relevant probability is .109", that is 6C5 ½6C6 ½6 and here, R means 'right' and W means 'wrong'.

"If the experiment consisted of pairs being tested until the first error, with the same result, the relevant probability is .031", that is ½5.

This "leads to absurdities because it hinges upon the nonexistent ability to define what other unobserved results would count as “more extreme” than the actual observations. That is, if Fisher had set out to serve Dr. Bristol 6 cups (and only 6 cups) and she is correct 5 times, then we get a p-value of .1, which is not statistically significant. According to Fisher, in this case we should not reject the null hypothesis that Dr. Bristol is guessing. But had he set out to keep giving her additional cups until she was correct 5 times, which incidentally required 6 cups, we get a p-value of .03, which is statistically significant. According to Fisher, we should now reject the null hypothesis. Even though the data observed in both cases are exactly the same, we reach different conclusions because our definition of “more extreme” results (that did not occur) changes depending on which sampling plan we use." (Etz et al)

Bayesian Analysis

Bayes defined the posterior probability as:

posterior = K x prior x likelihood

"where K is a number chosen to make the integral of the right-hand side 1." (Lindley)

"Likelihood is a funny concept. It’s not a probability, but it is proportional to a probability ... Since a likelihood isn’t actually a probability it doesn’t obey various rules of probability. For example, likelihood need not sum to 1.

"A critical difference between probability and likelihood is in the interpretation of what is fixed and what can vary. In the case of a conditional probability, P(D|H), the hypothesis is fixed and the data are free to vary. Likelihood, however, is the opposite. The likelihood of a hypothesis, L(H|D), conditions on the data as if they are fixed while allowing the hypotheses to vary.

"Bayes factors are simple extensions of likelihood ratios. A Bayes factor is a weighted average likelihood ratio based on the prior distribution specified for the hypotheses." [from Etz's blog]

The Priors

Lindley considers the case of wine and tea tasting and people who claim they can tell you about the process with which they were made. The author believes that wine experts really know what they're tasting while tea experts do not.

Lindley encodes these beliefs in the following equations:

Belief that the wine taster can correctly identify wines with probability P:

48(1-P)(P-½) for ½ < P < 1

"This expresses the fact that I think that she can discriminate but can make mistakes. The value 48 makes the total probability 1," he says.

Belief that the tea taster can correctly identify the preparation process with probability P:

1.6(1 - P) for ½ ≤ P ≤ 1

Note that 1.6(1 - P) integrates to only 0.2 over this range; in Lindley's prior the remaining 0.8 sits on P = ½ itself (the belief that she is simply guessing), which is the ".8" quoted below. Together they make the total probability 1 (the area under the curve cannot exceed 1.0 as that's the highest a probability can go).

The Data

Given 6 trials, the probability that the taster correctly identifies all 6 is P⁶. The posterior in this case for tea is:

normalizer . prior . likelihood = K (1 - P) P⁶ for ½ < P < 1

The probability that the taster identifies the first 5 correctly but the last incorrectly is P⁵(1 - P).

"The prior value of this probability was .8, which drops to .59 when 1 error is made in 6 pairs", that is, P at the maximum posterior at RRRRRW is about 0.63.

"...and to .23 with no errors", that is, P at the maximum posterior for RRRRRR is about 0.86. So, the prior for this is 1.6 x (1 - 0.86) = 0.23

[Incidentally, we know this because the posterior ∝ P⁶(1 - P). So, if we differentiate to find the maximum:

d/dP [P⁶(1 - P)] = 0

or

6P⁵ - 7P⁶ = 0

which means P = 6/7. This is the probability of the taster having special powers. Plug this into our equation representing our belief and it becomes 1.6(1 - 6/7), which is the 0.23 Professor Lindley is talking about.]
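
A quick numerical check of that algebra (plain Scala; nothing here comes from Lindley's article):

    // Brute-force the maximum of P^6 (1 - P) over (0.5, 1).
    val grid = (500 to 999).map(_ / 1000.0)
    val pMax = grid.maxBy(p => math.pow(p, 6) * (1 - p))
    println(pMax)              // ≈ 0.857, ie 6/7
    println(1.6 * (1 - pMax))  // ≈ 0.23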

Naturally, the graph for wine tasting is different as our prior is quadratic, not linear, in P. Its peaks are somewhat closer to 1.0 than the tea taster's (as we'd expect given the prior expresses greater confidence). But it's also worth noting the shape of the graphs: there is greater certainty when we see 6 Rs than with RRRRRW as the curve is sharper.

Conclusions

"It is typically true that the posterior probability of the null hypothesis exceeds the significance level, though there is no logical connection between the two values." (Lindley)

"Lindley’s Bayesian approach depends only on the observed data, so the results are interpretable regardless of whether the sampling plan was rigid or flexible or even known at all. Another key point is that the Bayesian approach is inherently comparative: Hypotheses are tested against one another and never in isolation" (Etz et al).


Tuesday, November 29, 2016

Spark matrix multiplication example


Here's an example of cosine similarities in Spark using the previously described algorithm.

Imagine we have a matrix where the rows are vectors:

0     0.5   0.5   0.4
0.25  0.5   0     0
0.4   0.8   0     0.25
0     0     1.0   0

In Spark, this is represented by an RDD of SparseVectors. Each SparseVector represents one row in the matrix.

Recall that for each vector, we attach the corresponding row and column index to each non-zero element. We then reduceByKey using the column index as our key.

We do this to all columns but as an example, let's take the second column (k=2). 

0     [0.5]   0.5   0.4
0.25  [0.5]   0     0
0.4   [0.8]   0     0.25
0     [0]     1.0   0

Giving us the tuples { (1, 0.5), (2, 0.5), (3, 0.8), (4, 0) } corresponding to k=2. Note that the first element of each tuple is the row index and the second is the value in the highlighted (bracketed) column.

Since these tuples are keyed on k, all these values will end up in the same Spark partition.

Partition A: (k=2) -> { (1, 0.5), (2, 0.5), (3, 0.8) }
Partition B: (k=3) -> ...
Partition C: ...

Note that we're not interested in values that are zero so we can discard the last tuple in the original set (zeroes simply disappear). For all the others, we want a Cartesian product of this set with itself and then we want to multiply out the values for each pair. Furthermore, we're not interested in the product of a tuple with itself and we only care about co-ordinates (i,j) when i < j because we don't want to do the same work twice.

So the result will look like this:

{ ((1,2), 0.25), ((1,3), 0.4), ((2,3), 0.4) }

At this point, we're no longer interested in k and can discard it. Each element of the above set moves to a partition according to the first element of the tuple. So, we'd expect something like:

Partition A:
Partition B: ((1,2), 0.25), ((1,3), 0.4)
Partition C: ((2,3), 0.4)

Note that all the interesting information in the example so far was (arbitrarily) on Partition A. Now, it's been shuffled to Partitions B and C. This is just an example. The shuffled data could have ended up in any partition.

If we do the same again for, say, the last column, we'd expect the results of multiplying rows 1 and 3. The other rows have zero in that column so we're not interested in them.

Now, our Spark partitions would look something like:

Partition A:
Partition B: ((1,2), 0.25), ((1,3), 0.4), ((1,3), 0.1)
Partition C: ((2,3), 0.4)

When we're done with all the columns, our partitions will look something like:

Partition A:
Partition B: ((1,2), 0.25), ((1,3), 0.4), ((1,3), 0.1), ((1,4), 0.5)
Partition C: ((2,3), 0.4), ((2,3), 0.1)

Finally, if we reduce on each partition by the co-ordinates key, we'll get:

Partition A:
Partition B: ((1,2), 0.25), ((1,3), 0.5), ((1,4), 0.5)
Partition C: ((2,3), 0.5)

which if we imagine is a matrix looks like this:

0   0.25   0.5   0.5
0   0      0.5   0
0   0      0     0
0   0      0     0

But, wait, that's not what our matrix multiplied with its transpose should look like. If we take the matrix at the top of this page and multiply it by its transpose we get:

0.66   0.25     0.5      0.5
0.25   0.3125   0.5      0
0.5    0.5      0.8625   0
0.5    0        0        1.0

The reason, of course, is that we're not interested in the diagonal as that's the similarity of a vector with itself (in real life, the diagonals would each have value 1.0 but I wanted to keep the numbers simple). What's more, we're only interested in half the matrix (the upper half) as the lower half is the same and we don't want to do the same work twice (in fact, any matrix multiplied by its transpose will be symmetric like this).

Consequently, only the upper half (the cells above the diagonal) is the same as the matrix we calculated. The rest of it is of no use to us.
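
The bookkeeping above can be sanity-checked locally with plain Scala collections (a toy check of the worked example, not the Spark job itself):

    // The rows of the matrix at the top of this post, keyed by row index.
    val rows = Map(
      1 -> Vector(0.0, 0.5, 0.5, 0.4),
      2 -> Vector(0.25, 0.5, 0.0, 0.0),
      3 -> Vector(0.4, 0.8, 0.0, 0.25),
      4 -> Vector(0.0, 0.0, 1.0, 0.0)
    )

    // For every column k, multiply each pair of non-zero entries (i < j only).
    val contributions = for {
      k       <- 0 until 4
      (i, vi) <- rows
      (j, vj) <- rows
      if i < j && vi(k) != 0.0 && vj(k) != 0.0
    } yield ((i, j), vi(k) * vj(k))

    // Reduce on the (i, j) co-ordinates.
    val reduced = contributions.groupBy(_._1).map { case (ij, vs) => ij -> vs.map(_._2).sum }
    println(reduced)
    // ≈ Map((1,2) -> 0.25, (1,3) -> 0.5, (1,4) -> 0.5, (2,3) -> 0.5)
    // (modulo ordering and floating-point noise)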

An Alternative Approach to Matrices in Spark


I've been writing some code that performs TF-IDF on a corpus of text made up of M records. Given the frequency of each term, I can build a normalized vector for each record made up of the terms in it. To reduce complexity, I ignore terms that appear more than W times.

Cosine Similarities and Matrix Multiplication

(Note: since Spark 1.6.0, there is a method in IndexedRowMatrix.columnSimilarities to compute cosine similarities. However, this only delegates to an old 1.2.0 method that uses DIMSUM. This blows up on anything remotely large with OOMEs. See here for more information).

If we have a set of vectors that we want to multiply together to find their cosine similarities, we can imagine them as a matrix where each row in that matrix is one of our vectors.

So, imagine we have M vectors { v1, v2, v3, ... vM }. Each vector is made up of N elements so, for instance, vector v1 can be expressed as { v11, v12, v13 ... v1N }.

We can express this as a matrix that looks like:

v11  v12  v13  ...  v1N
v21  v22  v23  ...  v2N
v31  v32  v33  ...  v3N
...
vM1  vM2  vM3  ...  vMN

(You'll notice that each row represents a record and each column represents a term. If a term appears in that record then vij is non-zero).

What we want is a matrix that holds every vector multiplied by every other vector. For instance, for v1 and v2 we want

v11·v21 + v12·v22 + v13·v23 + ... + v1N·v2N

Well, this is the same as multiplying the matrix by its own transpose:
v11  v12  v13  ...  v1N        v11  v21  v31  ...  vM1
v21  v22  v23  ...  v2N        v12  v22  v32  ...  vM2
v31  v32  v33  ...  v3N    ×   v13  v23  v33  ...  vM3
...                            ...
vM1  vM2  vM3  ...  vMN        v1N  v2N  v3N  ...  vMN
and looking at element (1, 2) of that matrix for this particular pair in this particular example.

So, using the brevity of summation notation, if this matrix is called A then:


        N
Aij  =  Σ  vin·vjn
       n=1

Or, using Einstein notation, we can state it even more succinctly:

Aij = vin·vjn

since the n index is repeated.

Given two matrices, one m x n and the other n x p, the complexity of multiplying them is O(mnp). In our case, we're multiplying a matrix by its transpose so p = m. Therefore, the complexity is O(m²n) - pretty bad.

The good news is that our matrix is sparse. How sparse depends on our variable W that determines at what point we start discarding terms in our TF-IDF stage.

Spark Implementation

Since we are just multiplying a matrix by its transpose, we might want to use BlockMatrix.multiply. However, the documentation tells us:
"The output BlockMatrix will only consist of blocks of DenseMatrix. This may cause some performance issues until support for multiplying two sparse matrices is added."
Since M and N for us is typically hundreds of millions, this is a problem.

So, we might want to just manually multiply any two vectors, i and j, in our equation for Aij. The trouble is that if we have two vectors in a Spark RDD, vi and vj, they may very well live in different partitions. And since we want to calculate vi · vj for all i,j then we'll definitely be hitting all partition combinations and there will be O(M²) multiplications.

We want to reduce the amount of inter-partition shuffling and avoid O(M²) multiplications of N-sized vectors. One solution might be to take each vector and, for each non-zero element, attach its column ID, which we'll call k. We can then reduceByKey (where k is the key) and start multiplying out the set of values for each key.

That is, associated with every column, k, there will be a set of vectors that have non-zero elements for their k-th element. There will be a maximum of W of them since that's the limit we imposed on how often a term appears before we consider it insignificant. We'll call this set K where in set-builder notation:

K = { vxk ∈ A | vxk ≠ 0 and |K| ≤ W}

We then take a Cartesian product of this set and attach the tuple (i,j), that is, its co-ordinates such that we have a set, D, where:

Dk = { ((i,j), vik·vjk) | vik, vjk ∈ K and i < j }

We ignore the case where i=j since we're not interested in the cosine similarities of a vector with itself. We know it's 1 as they're unit vectors.

Note that this can be computationally expensive as it scales as O(W²).

Now we can reduceByKey on the co-ordinates (i,j) and we'll have obtained the cosine similarity for all i, j.
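
Here's a minimal sketch of that approach in Spark (assuming an RDD pairing each row index with its normalized SparseVector; the function and variable names are illustrative and there's no attempt at the W cap):

    import org.apache.spark.mllib.linalg.SparseVector
    import org.apache.spark.rdd.RDD

    def similarities(rows: RDD[(Long, SparseVector)]): RDD[((Long, Long), Double)] = {
      // Attach the column index k to every non-zero element and key by it.
      val byColumn: RDD[(Int, (Long, Double))] = rows.flatMap { case (i, vec) =>
        vec.indices.zip(vec.values).map { case (k, value) => (k, (i, value)) }
      }

      // Within each column, take the Cartesian product of its non-zero entries,
      // keep i < j only, and multiply the values. This is the set Dk above.
      val contributions: RDD[((Long, Long), Double)] =
        byColumn.groupByKey().flatMap { case (_, entries) =>
          val es = entries.toSeq
          for {
            (i, vi) <- es
            (j, vj) <- es
            if i < j
          } yield ((i, j), vi * vj)
        }

      // Finally, reduce on the (i, j) co-ordinates.
      contributions.reduceByKey(_ + _)
    }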

Validity of this approach

Mathematically, if our column index is k, and we call Di,jk the contribution of the term corresponding to k to the cosine similarity between the pair of vectors (i,j), then:

Di,jk = vik·vjk

which is just our equation above for Aij (with k=n), showing that this technique is at least mathematically correct. How good its performance is, is something else...

By making the key the first element of a tuple, all values will live in the same partition. Spark will partition Tuple2s according to the first element in the tuple.

Partitions

We can demonstrate this so:

    import scala.util.Random

    type Pair = (Long, Long)

    val pairs = (1 to 100).map(x => (x.toLong, Random.nextLong()))
    val rdd   = sparkContext.parallelize(pairs)

which gives us an RDD of random pairs of Longs, the first being between 1 and 100. Now we create a function that can map over the RDD assigning each element a number unique to the partition it's in:

  import java.util.concurrent.atomic.AtomicInteger
  import scala.collection.mutable.ArrayBuffer

  type Enumerated[T] = (Int, T)

  val idGenerator = new AtomicInteger(0)
  def partitionMapper[T]: Iterator[T] => Iterator[Enumerated[T]] = { pairIterator =>
    val partitionId = idGenerator.getAndIncrement()
    val enumerated  = ArrayBuffer[Enumerated[T]]()
    pairIterator.foreach { pair =>
      enumerated += ((partitionId, pair))
    }
    enumerated.toIterator
  }

Finally, we see the output:

    val partitioned  = rdd mapPartitions partitionMapper[Pair]
    val inMem        = partitioned.collect()
    val groupedPairs = inMem.groupBy(_._1).toSeq.sortBy(_._1)
    groupedPairs foreach { case (partitionId, enumerated) =>
      val vals = enumerated.map(_._2._1)
      println(s"partition #$partitionId: ${vals.min}  to ${vals.max} ")
    }

Which produces:

partition #0: 51  to 75
partition #1: 1  to 25
partition #2: 26  to 50
partition #3: 76  to 100

Very neat.

Conclusion

It looks like I am not the only person who is rolling his own distributed matrix multiplication solution using Spark (see this link here). It seems others have found you don't get it out of the box. There appear to be moves afoot to address this.

Sunday, November 27, 2016

GraphX


Spark tries to abstract you from the data so you can write beautiful, functional code. The truth is that you have to get your hands dirty if you don't want it to run like a dog.

I'm processing a graph of over 100 million edges and about 20 million vertices. It's been a learning experience to make it perform. It first took over 24 hours to execute the Connected Components algorithm. I finally managed to run it in less than an hour. But it wasn't easy.

Cluster configuration

I didn't find GraphX very memory hungry, at least not for the executors (the entire graph was only some 4GB on HDFS). After 24 hours on my first attempt, the job looked far from over. Using jstat, I noticed that the driver was GCing a lot. It had 8GB but I increased it to 20GB and it was much happier.

Although it is often recommended to run a larger number of executors with fewer cores each, for GraphX this might not be the best configuration. Facebook found:

"Even though both systems can run multiple workers on a single physical machine, we found that it is more efficient when we start only one worker process per machine and allocate all the available memory to it. In the following experiments, the number of workers is also the number of physical machines used for each job. We allocated 80 GB of RAM to each worker."

The shape of the graph

What your graph looks like makes a difference to the performance of GraphX. Looking at the maximum number of "Shuffle write Size / Records" in each super step (mapPartitions at GraphImpl.scala:207 in Spark 2.0.2) in the Spark web GUI, we see the numbers steadily decreasing.

This depends on the shape of the graph.

"GraphFrames and GraphX both use an algorithm which runs in d iterations, where d is the largest diameter of any connected component (i.e., the max number of hops between any 2 nodes in a component). So the running time will vary significantly depending on the your graph's structure. Tuning the vertices and edges DataFrames by making sure to cache them and possibly adjust the partitioning beforehand can help." (from DataBricks forum)

As somebody pointed out on StackOverflow:

"If you have a cluster where each of the vertexes is connected to every other vertex, then after one round of messages each one knows who the lowest VertexID is, and they all go silent the next round. One sequential step, the entire cluster.

"If, on the other hand, each vertex in the cluster is only connected to at most 2 other vertices, then it could take N sequential steps before all the vertices know who what the minimum VertexID is."

I tested this by making a graph that was just a chain of numbers 1, 2, 3, 4 ... 20. I found that after 10 super-steps, there were 10 resolved connected components: {1 to 11}, {12}, {13}, {14} ... {20}.

Partitioning

How do you partition a graph? Facebook said that they got good results using EdgePartition2D. The trick here is to imagine the graph as an adjacency matrix where, for a given vertex X, all non-zero elements in row X indicate an outgoing edge and all non-zero elements in column X indicate an incoming edge.

Therefore, all the details for a given vertex will be in a single row and a single column. So, for an NxN matrix, all the data will be in 2N elements (ie, one row plus one column). Equivalently, if there are E edges, the upper bound on the number of partitions where we store the data for X is O(√E), assuming a maximum of one edge per pair of vertices (which is not a problem for the connected components algorithm).

But the biggest saving came from reducing the number of partitions. The previous stage of the pipeline had deliberately and explicitly increased the number of partitions to avoid running out of memory when performing matrix multiplication. This can be a disaster for GraphX.

The previous job had increased the number of partitions to some 18k. What's going on is described here:
As the graph algorithm progresses, it is common for less and less [sic] of the vertices to remain active. Therefore a full scan of all triplets during a map-reduce becomes less and less effective. “For example, in the last iteration of connected components on the Twitter graph, only a few of the vertices are still active. However, to execute mrTriplets we still must sequentially scan 1.5 billion edges and check whether their vertices are in the active set.”
Although GraphX has clever optimizations here, there was still a huge amount of overhead in constantly mapping and reducing.

Coalescing the 18 000 partitions to a mere 360 on a cluster with 8 boxes and 24 cores per box, the time to run was reduced to a healthy 10-40 minutes depending on sub-graph structures.
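
Putting those two changes together, the relevant GraphX calls look roughly like this (a sketch: graph is assumed to be the already-built Graph, and 360 is just the number that worked here):

    import org.apache.spark.graphx.PartitionStrategy

    // Repartition the edges with the 2D scheme and a modest number of partitions,
    // then run connected components on the result.
    val repartitioned = graph.partitionBy(PartitionStrategy.EdgePartition2D, 360)
    val components    = repartitioned.connectedComponents()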


Saturday, November 19, 2016

Probabilistic Programming - Part 2


In part 1, I looked at the first chapter of Bayesian Methods for Hackers. In this post, I'll conduct the same analysis on the same data but instead of using Python I'll be using Scala and Figaro.

To recap: we have somebody's text usage and we want to know when (if at all) there was a change in usage. All we have is the number of texts sent each day over a 74 day period.

We suppose that the distribution of texts per day is a Poisson distribution as we assume the text messages are independent of each other, it's a discrete distribution and it's got the nice property that the one constant it depends on (λ) is also its mean.

Unfortunately, we don't know what this parameter is so we need to model that stochastically too. This λ parameter is not discrete but continuous, so we choose the exponential distribution to model it as it too has nice properties. It too depends on a constant parameter (α) and it has the nice property that its mean is α⁻¹.

Because of these nice properties the distributions have, we are helped in our initial guess at what their values are. In this case, we can take the mean number of texts (let's call it m) and then we know λ will (roughly) be that value. However, λ is not a fixed value, it's a stochastic variable modeled by an exponential function. That's OK though: since we know that the mean of an exponential distribution is α⁻¹, then α = m⁻¹.

Finally, we are assuming that on some day indicated by a value tau, we switch from one Poisson distribution to another so we need two of everything. Regarding the αs, "because they’re parameters of a parameter, they’re sometimes called hyper-parameters." [1]

OK, enough of the maths, here's the code:

    val data: List[Int] = // load the data from a file

    val mean  = data.sum.toDouble / data.length.toDouble
    val alpha = 1.0 / mean

    val lambda1: Element[Double] = Exponential(alpha)
    val lambda2: Element[Double] = Exponential(alpha)

    val poisson1  = Poisson(lambda1)
    val poisson2  = Poisson(lambda2)

We don't know the initial value of tau so we guess it's initially likely to be any value in the 74 day period. We express that in Figaro thus:

    val tau: AtomicUniform[Int] = Uniform(0 to n_count_data: _*)

And then we build our model to describe the situation outlined in the first few paragraphs thus:

    def pDay(day: Int): Element[Int] = If (tau.map(value => value > day), poisson1, poisson2)

    val modelData: Seq[Element[Int]] = (1 to n_count_data).map(d => pDay(d))

Finally, we tell the model what actual data is to help it know what it's shooting for:

    modelData.zip(data).foreach { case(test, real) =>
      test.observe(real) // WARNING! This does not play nicely with MH (see below)
    }

Then we hand it off to an algorithm that solves the equation for us, Metropolis-Hastings.

    val mh = MetropolisHastings(numSamples = 40000,
      ProposalScheme(tau, lambda1, lambda2),
      burnIn = 10000,
      tau)
    mh.start()

Oops. It gets stuck. Even if I reduce the number of samples from 40 000 to 10. Hmm. Well, it prompted me to ask Avi Pfeffer, the creator of Figaro, what the problem was.

"You have 70+ variables representing different days with 70+ possible values for each variable. When you observe all of these, you get an extremely unlikely observation. I think MH is struggling with finding any state that causes the observations to be satisfied."

That is, the chance of getting the actual data given the model we have built is staggeringly small. Annoyingly, I already knew this as I had read Dr Pfeffer's book, where he builds a probabilistic model of movie actors winning awards. Naturally, only one person can win the Oscar for best actor, so the model gets stuck: MH cannot explore any other actor winning the Oscar because it's a two-step process to get there. Either we first remove the award from the original actor before selecting a new winner (momentarily zero actors win the award - impossible), or we first choose another actor to win before removing it from the original (momentarily two actors win the award - also impossible).

There are two solutions. "The first is to use proposal schemes that are different from the default scheme. These custom schemes can, for example, propose to change two elements at once. The second idea is to avoid hard conditions that can never be violated." [1]

I chose the latter and wrote this:

  import scala.math.{abs, pow}

  def fitnessFnPowered(real: Int, power: Double): (Int) => Double = x => 1.0 / pow((abs(real - x) + 1), power)

That is, we calculate a value in the range [0,1] where 1 indicates a direct hit and anything else is less than this depending on how much off target it was. How quickly wrong proposals deviate from 1.0 is determined by the value of power.
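
To get a feel for how quickly wrong proposals are penalised, here's a quick check using fitnessFnPowered as defined above (the numbers are just illustrative):

    val fitness = fitnessFnPowered(real = 10, power = 5.0)
    println(fitness(10))  // 1.0       : a direct hit
    println(fitness(11))  // 0.03125   : one off  (1 / 2^5)
    println(fitness(15))  // ≈ 0.00013 : way off  (1 / 6^5)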

    modelData.zip(data).foreach { case(test, real) =>
      test.addConstraint(fitnessFnPowered(real, ???))
    }

And now my algorithm runs to completion. Well, it does if I give it a power rather than ???. What is a good value? By trial and error (the chart of results is not reproduced here), I found the probability became 1.0 (ie, certain) when the power value was set to 5.0. Although the distributions differ from those in the BM4H book, both frameworks agree on the most probable value for the day the usage changed: day 45.

[1] Practical Probabilistic Programming, Avi Pfeffer.

Friday, November 11, 2016

Conjugate Priors in Bayesian Probability


Apropos the recent election, I was reading Allen Downey's blog about Nate Silver and Bayesian forecasting. This piqued my interest in probability distributions (that I forgot about a quarter century ago). Here are some notes.

Binomial Distribution

This is a distribution that describes a sequence of n independent yes/no experiments, each with a probability of success p. It is a discrete distribution since you obviously can't have, say, 3.25 successes.

The formula for the binomial probability mass function (PMF) is:

nCk p^k (1 - p)^(n-k)

where

n is the number of trials
k is the number of successes
p is the probability of success.

Beta Distribution

"The beta distribution is defined on the interval from 0 to 1 (including both), so it is a natural choice for describing proportions and probabilities.". It is (obviously) a continuous function.

The probability density function (PDF) for the beta distribution is:


x^(α - 1) (1 - x)^(β - 1)
_________________________

        B(α, β)

where α and β are hyperparameters and B(α, β) is:

Γ(α) Γ(β)
_________

Γ(α + β)

where Γ(n) is the gamma function which, for positive integers n, is simply defined as:

Γ(n) = (n - 1)!

OK, that sounds a lot more complicated than it is but any decent coder could write all this up in a few lines of their favourite language.
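
For instance, a minimal sketch in Scala (integer α and β only, so the factorial definition of Γ above applies; none of this code is from the original post):

    // Γ(n) = (n - 1)! for positive integers n.
    def gamma(n: Int): Double = (1 until n).map(_.toDouble).product

    def betaFunction(alpha: Int, beta: Int): Double =
      gamma(alpha) * gamma(beta) / gamma(alpha + beta)

    def betaPdf(x: Double, alpha: Int, beta: Int): Double =
      math.pow(x, alpha - 1) * math.pow(1 - x, beta - 1) / betaFunction(alpha, beta)

    println(betaPdf(0.5, 2, 2))  // 1.5 : Beta(2, 2) peaks at x = 0.5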

One last thing, the sum of the hyperparameters is the total number of trials. That is

α + β = n

Bayes: Binomial and Beta distribution

And this is where it gets interesting. Recall that the posterior distribution is proportional to the likelihood function and the prior. Or, in maths speak:

P(A | X) ∝ P(X | A) P(A)

"It turns out that if you do a Bayesian update with a binomial likelihood function ... the beta distribution is a conjugate prior. That means that if the prior distribution for x is a beta distribution, the posterior is also a beta distribution." [1]

How do we know this? Well take the equation for the binomial's PMF and multiply it by the beta's PDF. But, to make things easier, note that nCk and B(α, β) are constants and can be ignored in a "proportional to" relationship. So, it becomes:

P(a | x) ∝ x^k (1 - x)^(n-k) x^(α - 1) (1 - x)^(β - 1)

which simplifies to:

P(a | x) ∝ x^(α + k - 1) (1 - x)^(n - k + β - 1)

Which is another beta distribution! Only this time

α' = α + k

and

β' = β + n - k

(so that the new exponents are α' - 1 = α + k - 1 and β' - 1 = n - k + β - 1, matching the expression above).
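
As a toy check of that result (not from the post): start with a Beta(2, 2) prior, observe k = 7 successes in n = 10 trials, and the posterior is Beta(9, 5):

    // Conjugate update for a binomial likelihood with a beta prior.
    case class BetaDist(alpha: Double, beta: Double) {
      def mean: Double = alpha / (alpha + beta)
      def observe(k: Int, n: Int): BetaDist = BetaDist(alpha + k, beta + (n - k))
    }

    val prior     = BetaDist(2, 2)
    val posterior = prior.observe(k = 7, n = 10)
    println(posterior)       // BetaDist(9.0, 5.0)
    println(posterior.mean)  // ≈ 0.643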

Proofs for other combinations can be found here, more information here, a better explanation by somebody smarter than myself is here, and possibly the most informative answer on StackExchange ever here.

[1] Think Bayes, Downey

Recursive Data Types


I was pointed in the direction of this lecture (and these slides). These are notes I made.

Imagine you want to build a hierarchy of professors and their students who too go on to be professors. You might like to model it like this:

case class ProfF[A](
  name: String,
  year: Int,
  students: List[A]
)

And have case classes to allow us to make recursive types because "type aliases can't be recursive but classes can" (using recursive type aliases results in "types [that] are infinite").

What's more, with ProfF "depending on how we wrap it we can represent just the pure data or the data annotated with database id" thus:

case class Prof(value: ProfF[Prof])

or

case class IdProf(id: Int, value: ProfF[IdProf])

But this is too tightly coupled with ProfF so let's refactor them like so:

case class Prof[F[_]](value: F[Prof])
case class IdProf[F[_]](id: Int, value: F[IdProf])

and too tightly coupled to the type of the id, so let's refactor that too:

case class Prof[F[_]](value: F[Prof[F]])
case class IdProf[F[_], A](id: A, value: F[IdProf[F, A]])

But there is nothing new under the sun as actually these useful data structures already exist in Scalaz.

case class Fix[F[_]](unfix: F[Fix[F]])
case class Cofree[F[_], A](head: A, tail: F[Cofree[F, A]])

and let's also throw in:

case class Free[F[_], A](resume: A \/ F[Free[F, A]])

Which is a type that says resume is either A or F[Free[F, A]]. It's like an Either in Scala. "\/[A, B] is isomorphic to scala.Either[A, B], but \/ is right-biased, so methods such as map and flatMap apply only in the context of the right case" (from here).

"In the same way you think of Cofree as a structure with labels at each node, Free is a structure with labels at the leaves". That is, where Cofree will always have an AFree can either have an A but no further branches (ie, it's a leaf) or it has branches and no A (ie,  it's a branch to another Free).

Terminology

If F is a functor, Free is the free monad.

Also, if F is a functor, Cofree is a comonad. More on this later.

"If we have a Fix we can always get an F out by calling unfix, and we can do that with Cofree by calling tail. The general name for this operation is project and data types that do this are called recursive types. And note that we can't do it with Free because you might not have an F; you might have an A

"But you can always go the other way. If you have the F you can construct a Free. Same with Fix. But you can't do this with Cofree because you also need an A. So this operation (the dual) is called embed and data types with this behavior are corecursive types."

This is used in the wonderfully named Matryoshka library.

What is it good for?

Well, we can build a recursive data structure to be sent to a persistence layer with this:

type Prof = Fix[ProfF]

and receive a similar (technically corecursive) data structure back but that now contains DB-assigned primary keys with this:

type IdProf = Cofree[ProfF, Int]
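
For example, a small sketch using the simplified Fix and Cofree definitions above (the names and ids are made up):

    // The pure data we send to the persistence layer...
    val alice: Prof = Fix(ProfF("Alice", 1970, Nil))
    val bob:   Prof = Fix(ProfF("Bob", 1995, List(alice)))

    // ...and the same shape coming back, now annotated with database ids at every node.
    val aliceWithId: IdProf = Cofree(1, ProfF("Alice", 1970, Nil))
    val bobWithId:   IdProf = Cofree(2, ProfF("Bob", 1995, List(aliceWithId)))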

These are the basics you need to understand Doobie, a functional library for accessing JDBC.


Wednesday, November 2, 2016

Spark and Vector Multiplication


More Spark shenanigans. I'm having all sorts of problems while attempting to make the run stable. I'm seeing FetchFailedExceptions that (according to this) are "almost guaranteed to be caused by memory issues on your executors" and FileNotFoundExceptions that mysteriously appear when an executor throws an OutOfMemoryError in the initializer of org.apache.spark.shuffle.sort.ShuffleInMemorySorter.

Thanks for the memory

So, the first thing is to review how Spark manages its memory. The areas are broken down as:
  • Reserved Memory
  • User Memory
  • Spark Memory - broken down into:
    • Storage Memory
    • Execution Memory
where (from here)

Reserved Memory "is the memory reserved by the system, and its size is hardcoded. As of Spark 1.6.0, its value is 300MB"

User Memory "is the memory pool that remains after the allocation of Spark Memory, and it is completely up to you to use it in a way you like. You can store your own data structures there that would be used in RDD transformations".

Storage Memory "is used for both storing Apache Spark cached data and for temporary space serialized". It also stores broadcast variables.

Execution Memory "is used for storing the objects required during the execution of Spark tasks."


Why heap space isn't the whole story

Here is an example.

Given "a matrix A that has a large number of columns (a short and fat matrix) and a matrix B that has a large number of rows (a tall and thin matrix)" that we want to multiply, then there's a good chance multiplying them may blow up.

The reason is that this can be very memory intensive (see the User Memory section above).

To get around this, we can "apply the outer product to each row of the RDD, a column of A with a row of B. This will create an RDD of small matrices size m*k." We then "use the reduce method to sum the all the small matrices."

Neat. So the trick is to break down each step into smaller steps. In this example, it's done by making an RDD that pairs small vectors together so they can be multiplied with very little overhead.
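
A sketch of that outer-product trick with plain arrays and no linear algebra library (the RDD is assumed to already pair each column of A with the matching row of B; the names are illustrative):

    import org.apache.spark.rdd.RDD

    def multiply(colsOfAWithRowsOfB: RDD[(Array[Double], Array[Double])]): Array[Array[Double]] =
      colsOfAWithRowsOfB
        .map { case (colOfA, rowOfB) =>
          // The outer product: a small m x k matrix from one column of A and one row of B.
          colOfA.map(a => rowOfB.map(b => a * b))
        }
        .reduce { (m1, m2) =>
          // Sum the small matrices element-wise; the total is A * B.
          m1.zip(m2).map { case (r1, r2) => r1.zip(r2).map { case (x, y) => x + y } }
        }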


Refactor the problem

Realizing that my vector multiplication was probably the culprit, I gave myself more user memory by setting spark.memory.fraction much lower (about 0.01) since

User Memory ~ (1.0 - spark.memory.fraction)

This improved things insofar as my run ran for longer but it still ultimately blew up.

The problem was that my sparse matrix was not sparse enough. It could have rows with 2000 non-zero elements. Since the row could have about 100 million elements overall, this is still probably considered sparse. But each element is multiplied with every other element when I calculate cosine similarities. This scales as O(N²).

OK, ~2000² results is not the end of the world but what if we get a few of these in the same JVM? We quickly exhaust the memory. Increasing User Memory reduces the likelihood but does not eliminate it. The solution (like the matrix multiplication example above) is to break down the problem into a new RDD that presents the data in smaller chunks.

Saturday, October 22, 2016

State monads


State is generally an anathema to functional programmers but necessary to the rest of us. So, how are these forces reconciled? Perhaps not surprisingly it requires monads.

"We'll say that a stateful computation is a function that takes some state and returns a value along with some new state." (from LYAHFGG). So, if you had a random number generator (RNG), for each call you'd receive not just a random number but the generator with a new state for the next call. Calling the same generator repeatedly will give you the same value which makes testing easier.

Scalaz gives you a state out of the box. As an example, let's say we're modelling a stack data structure. The pop would look like this:

import scalaz._
import Scalaz._

    type Stack = List[Int]

    val pop = State[Stack, Int] { // (f: S => (S, A)) where S is Stack and A is Int
      case x::xs =>
        println("Popping...")
        (xs, x)
    }

Note that pop is a val not a def. Like the RNG mentioned earlier, it returns the popped value and the new state. We'll ignore error conditions like popping an empty stack for now.

Also note "the important thing to note is that unlike the general monads we've seen, State specifically wraps functions." (Learning Scalaz)

Let's now define what a push is:

    def push(a: Int) = State[Stack, Unit] {
      case xs =>
        println(s"Pushing $a")
        (a :: xs, ())
    }

Once more, the function returns the new state and a value - only this time the value isn't actually defined. What value can a push to a stack return that you didn't have in the first place?

Because State is a monad, "the powerful part is the fact that we can monadically chain each operations using for syntax without manually passing around the Stack values". That is, we access the contents of State via map and flatMap.

So, let's say we want to pop the value at the top of the stack then push two values onto it, 3 and 4.

    val popx1Push2: State[Stack, Int] = for {
      a <- pop
      _ <- push(3)
      _ <- push(4)
    } yield a 

which does absolutely nothing. What is to be popped? Onto what do we push 3 and 4? We need to run it and then map (or foreach or whatever) over the result to access the values. "Essentially, the reader monad lets us pretend the value is already there." (from Learning Scalaz)
   
    popx1Push2.run(List(0, 1, 2)).foreach { case (state, popped) =>
      println(s"New state = $state, popped value = $popped") // New state = List(4, 3, 1, 2), popped value = 0
    }

But we don't need Scalaz. We can roll our own state (with code liberally stolen from here).

case class State [S, +A](runS: S => (A, S)) {

  def map[B](f: A => B) =
    State [S, B]( s => {
      val (a, s1) = runS(s)
      (f(a), s1)
    })
    
  def flatMap[B](f: A => State [S, B]) =
    State [S, B]( s => {
      val (a, s1) = runS(s)
      f(a).runS(s1)
    })
}

Now it becomes clearer what State actually represents. It is a monad (see map and flatMap) that contains a function that given one state, S, can give you the next plus an accompanying value, A.

So, manipulating the state for our stack example looks like this:

  def getState[S]: State [S, S] =
    State (s => (s, s))
     
where the function says: given a state, return that state.

  def setState[S](s: S): State [S, Unit] =
    State (_ => ((), s))

where the function says: I don't care which state you give me, the next one is s. Oh, and I don't care about the accompanying value.

  def pureState[S, A](a: A): State[S, A] =
    State(s => (a , s))

where the function says the next state is the last state and the accompanying value is what I was given. Pure in this sense refers to the construction of a state, sometimes called unit, and called point in Scalaz.

Now, we do a stateful computation. It's very simple, it just maps over the state monads to generate a unit of work that adds 1 to whatever is given.

    val add1: State[Int, Unit] = for {
      n   <- getState
      b   <- setState(n + 1)
    } yield (b)

    println(add1.runS(7)) // ((),8) - the new state is 8

This is trivial but you can do more complicated operations like:

  def zipWithIndex[A](as: List [A]): List [(Int , A)] =
    as.foldLeft (
      pureState[Int, List[(Int , A)]](List())
    )((acc, a) => for {
      xs  <- acc
      n   <- getState
      _   <- setState(n + 1)
    } yield (n , a) :: xs ).runS(0)._1.reverse

Yoiks! What's this doing? Well, reading it de-sugared and pretending we've given it an array of Chars may make it easier:

    type IndexedChars = List[(Int, Char)]

    val accumulated: State[Int, IndexedChars] = as.foldLeft(
      pureState(List[(Int, Char)]())      // "the next state is the last state and the accompanying value is an empty list"
    )((acc: State[Int, IndexedChars], a: Char) => {
      acc.flatMap { xs: IndexedChars =>   // xs is the accompanying value that the accumulator wraps
        getState.flatMap { n: Int =>      // "Given a state, return that state." (n is fed in via runS)
          setState(n + 1).map(ignored =>  // "I don't care which state you give me, the next one is n+1"
            ((n, a) +: xs)                // the accompanying value is now the old plus the (n,a) tuple.
          )
        }
      }
    })

Which is a bit more understandable.
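
As a quick sanity check of zipWithIndex as defined above:

    println(zipWithIndex(List('a', 'b', 'c')))  // List((0,a), (1,b), (2,c))
    println(zipWithIndex(List[Char]()))         // List()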