Saturday, February 27, 2016

Locality Sensitive Hashing in Tweets


Spark's DIMSUM algorithm might have a limit to how much data it can process, but the code for it came from Twitter, who process gazillions of tweets every day. So how do you apply it to huge amounts of data?

Well, one way is to break the data down into smaller chunks, and one way to do that without losing its meaningfulness is to use locality sensitive hashing (LSH). The best (but rather verbose) description of LSH I've found is here, a chapter from a free book on data mining.

(Note: Ullman uses the term "characteristic matrix" in the document differently from how perhaps a mathematician would use it. A characteristic matrix in maths is λI - A where A is an n x n matrix, I is the n x n identity matrix and λ is an eigenvalue. Solving the characteristic equation, |λI - A| = 0, gives you at most n real roots and allows you to calculate the eigenvectors, v, by substituting each discovered eigenvalue into the homogeneous system of equations, (λI - A)v = 0.)

To summarise, the key points follow.

Jaccard Similarity

First, we define something called the Jaccard similarity between two sets. This is simply:

s = (number of shared elements between the sets) / (total number of elements in the sets)

Next, we convert the two sets into a matrix where the columns represent the sets and the rows represent the elements. The cell value where a row and column meet simply records whether that element is in that set (a value of 1) or not (a value of 0).
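As a concrete toy example in Python (the sets, element ordering and function name are mine, purely for illustration), here is the Jaccard similarity and the 0/1 matrix just described:

def jaccard(a, b):
    """Jaccard similarity: shared elements over total distinct elements."""
    return len(a & b) / float(len(a | b))

set_a = {"the", "cat", "sat"}
set_b = {"the", "dog", "sat", "down"}

print(jaccard(set_a, set_b))  # 2 shared / 5 distinct = 0.4

# Characteristic matrix: one column per set, one row per element,
# cell = 1 if the element is in the set, 0 otherwise.
elements = sorted(set_a | set_b)
sets = [set_a, set_b]
matrix = [[1 if e in s else 0 for s in sets] for e in elements]
for e, row in zip(elements, matrix):
    print(e, row)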

Minhashing

Next, we define a function that gives us the row of the first element in a given column that has the value 1 (going from top to bottom). We call this minhashing. Typically, we've first reordered the rows of the original matrix randomly.
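A minimal sketch of that idea, assuming a toy 0/1 matrix like the one above and a random permutation of its rows (the matrix and function name are mine, for illustration only):

import random

def minhash(matrix, column, permutation):
    """Return the position (within the permutation) of the first row
    whose cell in the given column is 1."""
    for position, row_index in enumerate(permutation):
        if matrix[row_index][column] == 1:
            return position

matrix = [[1, 0],   # toy characteristic matrix: 2 sets, 5 elements
          [0, 1],
          [1, 1],
          [0, 1],
          [1, 0]]

permutation = list(range(len(matrix)))
random.shuffle(permutation)   # the random reordering of the rows
print(minhash(matrix, 0, permutation), minhash(matrix, 1, permutation))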

The Minhash and Jaccard relationship

The chances that two columns have the same minhash result is actually the Jaccard similarity. The proof of this follows.

Given two columns, say they both have a 1 in x rows; exactly one of them has a 1 in y rows; and we ignore the rows in which both have a 0. If we pick any of these non-zero rows at random then the probability that both sets have a 1 is unsurprisingly:

x / (x + y)

which is actually exactly the same as the Jaccard similarity, s.
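A quick, self-contained simulation of that claim (the toy sets are mine, chosen only for illustration): over many random permutations of the elements, the fraction of permutations in which the two sets share the same first element should approach their Jaccard similarity.

import random

set_a = {"the", "cat", "sat"}
set_b = {"the", "dog", "sat", "down"}
elements = sorted(set_a | set_b)

def first_hit(permutation, s):
    """Index of the first element (in permutation order) that is in the set."""
    return next(i for i, e in enumerate(permutation) if e in s)

trials = 100000
matches = 0
for _ in range(trials):
    permutation = random.sample(elements, len(elements))   # a random reordering
    if first_hit(permutation, set_a) == first_hit(permutation, set_b):
        matches += 1

print(matches / float(trials))                          # roughly 0.4 ...
print(len(set_a & set_b) / float(len(set_a | set_b)))   # ... the Jaccard similarity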

Minhash Signatures

Now, let's make another matrix with a column per set (as before) but this time each row represents one of n random functions. The function simply takes the row index and returns a deterministic value. The cell values are all set to positive infinity but that will change.

For each function, we use the index of the row in the original matrix to generate a result. For each set that has a non-zero value in that row of the original matrix, we place the function's value in the corresponding cell of our new matrix if it is less than what is already there.

If there are m sets, this results in an n x m matrix that is our (much reduced) signature.
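Here is a minimal sketch of building such a signature matrix. One possible choice for the "random function of the row index" (my assumption, not prescribed above) is a hash of the form (a * i + b) mod n_rows; the toy matrix and names are mine:

import random

def signature_matrix(matrix, n_hashes):
    """Build an n_hashes x n_sets matrix of minhash signatures."""
    n_rows = len(matrix)
    n_sets = len(matrix[0])
    # each "hash" maps a row index to a deterministic, pseudo-random value
    hashes = [(random.randrange(1, n_rows), random.randrange(n_rows))
              for _ in range(n_hashes)]
    sig = [[float("inf")] * n_sets for _ in range(n_hashes)]
    for i, row in enumerate(matrix):            # for each row of the original matrix...
        values = [(a * i + b) % n_rows for a, b in hashes]
        for j, cell in enumerate(row):          # ...and each set with a 1 in that row...
            if cell == 1:
                for h in range(n_hashes):       # ...keep the smallest value seen so far
                    sig[h][j] = min(sig[h][j], values[h])
    return sig

matrix = [[1, 0], [0, 1], [1, 1], [0, 1], [1, 0]]
for row in signature_matrix(matrix, n_hashes=4):
    print(row)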

Banding

Now let's break our signature matrix into b bands of r rows each.

The probability that all r rows within a band have the same value is s^r.

The probability that at least one row is not the same is (1 - s^r).

The probability that all b bands have at least one row that is not the same is (1 - s^r)^b.

Therefore, the probability that at least one of the b bands has all r rows the same is 1 - (1 - s^r)^b, which is the probability that we flag a pair of sets as possibly similar.
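Plugging some numbers into that formula (b = 10 and r = 4, matching one of the curves plotted below; the function name is mine) shows how sharply it separates probably-similar from probably-dissimilar pairs:

def candidate_probability(s, r=4, b=10):
    """Probability that at least one band has all r rows equal."""
    return 1 - (1 - s ** r) ** b

for s in [0.2, 0.3, 0.5, 0.8, 0.9]:
    print(s, round(candidate_probability(s), 3))
# s = 0.3 gives roughly 0.08 while s = 0.8 gives roughly 0.995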

The nice thing about this is that it describes an S-shaped curve no matter what the values of s, r and b are. This is good because most points on the curve are either clearly probable or clearly improbable.

For b = 10, the probabilities for r = [2, 4, 6, 8] look like this:



Distance functions

There are many types of distance measurements. Euclidean distance is the one we're taught most in school but there are others. The only rule is that for any distance function, d, that gives the distance between x and y, the following must hold:
d(x, y) >= 0 
d(x, y) = 0 iff x == y 
d(x, y) = d(y, x) [they're symmetric] 
d(x, y) <= d(x, z) + d(z, y)  [the triangle inequality]
Nothing too clever there. They're just common sense if you were navigating around town. The thing to bear in mind is that they apply to more than just the 3D space in which we live.
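For example, the Jaccard distance (1 minus the Jaccard similarity) is one such distance function. Below is a small spot check on some toy sets of my own choosing; an empirical sanity check of the four properties, not a proof:

from itertools import permutations

def jaccard_distance(x, y):
    """1 - Jaccard similarity; a distance of 0 means the sets are identical."""
    if not x and not y:
        return 0.0
    return 1 - len(x & y) / float(len(x | y))

sets = [{"a", "b"}, {"b", "c"}, {"c", "d"}, {"a", "b", "c", "d"}]

for x, y in permutations(sets, 2):
    d = jaccard_distance(x, y)
    assert d >= 0                                  # non-negative
    assert (d == 0) == (x == y)                    # zero iff identical
    assert d == jaccard_distance(y, x)             # symmetric

for x, y, z in permutations(sets, 3):              # triangle inequality
    assert jaccard_distance(x, y) <= jaccard_distance(x, z) + jaccard_distance(z, y)

print("all four distance properties hold on these sets")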

Family of functions

We define a family of functions as a collection of functions that say "yes, make x and y a candidate pair" or "no, do not make x and y a candidate pair unless some other function concludes we should do so". Minhashing is one such family.

Furthermore, we define the term (d1, d2, p1, p2)-sensitive for such families to describe the curve they generate. We require
d1 < d2 
p1 > p2
where d1 and d2 are values of a given distance function d(x, y); p1 is the minimum probability that a function in the family declares x and y a candidate pair when d(x, y) ≤ d1, and p2 is the maximum probability that it does so when d(x, y) ≥ d2.

This gives us a nice short-hand way to describe a complicated graph.

Amplifying the effect

It is desirable to maximize the steepness of the S-curve to reduce the number of false negatives and false positives. We can do this by applying a second family of functions to the first. We could demand that all the bands agree for two points (x, y): an AND operator. Or we could demand that at least one of the bands agrees: an OR operator. Each changes the curve in a different way, depending on what we want.

This is analogous to the banding (see above) and the maths is the same, only this time instead of s we're dealing with p, the probability that two functions give the same answer for a row.
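A small sketch of the effect (the r and b values and function names are mine, picked purely for illustration): the AND construction drives low probabilities towards 0, the OR construction drives high ones towards 1, and composing them, as banding does, gives the S-curve.

def and_construction(p, r):
    """All r functions must agree: pushes low probabilities towards 0."""
    return p ** r

def or_construction(p, b):
    """At least one of b functions must agree: pushes high probabilities towards 1."""
    return 1 - (1 - p) ** b

for p in [0.2, 0.8]:
    amplified = or_construction(and_construction(p, r=4), b=10)   # AND then OR, as in banding
    print(p, round(amplified, 3))
# 0.2 stays low (about 0.016) while 0.8 is pushed close to 1 (about 0.995)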

The downside to this is that we increase the demand on our CPUs. In large datasets, a small increase in CPU cost is magnified in proportion to the amount of data we have to run the algorithm over, so it's worth bearing this in mind.

Code

I stole the Python code from here to generate the graphs and slightly modified it, as below:

from mpl_toolkits.mplot3d import Axes3D
from matplotlib.collections import PolyCollection
from matplotlib.colors import colorConverter
import matplotlib.pyplot as plt
import numpy as np

# from http://matplotlib.org/mpl_toolkits/mplot3d/tutorial.html#line-plots

fig = plt.figure()
ax = fig.gca(projection='3d')


def cc(arg):
    return colorConverter.to_rgba(arg, alpha=0.6)


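# probability that at least one of the b (here 10) bands has all r rows equal: 1 - (1 - s^r)^b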
def p(s, r):
    b = 10
    return 1 - ((1 - s**r) ** b)


xs = np.arange(0, 1, 0.04)
verts = []
zs = [2.0, 4.0, 6.0, 8.0]
for z in zs:
    ys = [p(x, z) for x in xs]  # a list (not a lazy map) so we can zero the endpoints below
    ys[0], ys[-1] = 0, 0
    verts.append(list(zip(xs, ys)))

poly = PolyCollection(verts, facecolors=[cc('r'), cc('g'), cc('b'),
                                         cc('y')])
poly.set_alpha(0.7)
ax.add_collection3d(poly, zs=zs, zdir='y')

ax.set_xlabel('s')
ax.set_xlim3d(0, 1)
ax.set_ylabel('r')
ax.set_ylim3d(min(zs), max(zs))
ax.set_zlabel('p')
ax.set_zlim3d(0, 1)

plt.show()


Monday, February 15, 2016

A Case for Scala


Here are three interesting things I came across this week with Scala's case idiom.

Backticks

The first is this snippet:

    val b = "b"
    "a" match {
      case b => println("'a' matches b. WTF?")
      case _ => println("no match")
    }

which prints the first line ("... WTF?"). It's a simple gotcha: the b in the case clause shadows the outer b that equals the string "b". You need to use backticks to make Scala match against the outer b (see here), as in:

    "a" match {
      case `b` => println("'a' matches b. WTF?")
      case _   => println("no match") // <-- printed
    }

Regex

The case keyword can also be used in conjunction with regular expressions, for example:

    val hello = "hello.*".r
    val goodbye = "goodbye.*".r

    "goodbye, cruel world" match {
      case hello()   => println("g'day!")
      case goodbye() => println("thanks for all the fish") // this is printed
      case _         => println("no match")
    }

(see here for further information)

Implicits

The third case for the case keyword is no case at all, as in this code from Elastic4S where the Type Class Pattern is employed. In a nutshell, there is an implementation out in the implicit ether that can fulfil our request. In the Elastic4S (1.5.1) code, it looks like this:

  def execute[T, R](t: T)(implicit executable: Executable[T, R]): Future[R] = executable(client, t)

where the executable in question is any Executable that matches the type of our argument of type T. It's a little harder to read (where in the ether is this Executable?) but is much more extensible.


Saturday, February 6, 2016

Dimsum fills you up


DIMSUM is an algorithm (described here) that efficiently finds similar columns in a matrix. You get it for free in Spark in the RowMatrix.columnSimilarities method.

The problem is basically taking each row and multiplying all combinations of values together. Given a matrix with m rows and n columns, this is an O(mn^2) problem. But if the matrix is sparse, with a maximum of L non-zero values per row, the problem becomes O(mL^2). DIMSUM promises to improve this efficiency even further (with a small loss in accuracy).

The headache for us was that our matrix was huge - 100 million by 100 million. This was pushing the envelope, as it appears DIMSUM is not best suited to matrices this wide [see here for this comment: "Column based similarities work well if the columns are mild (10K, 100K, we actually scaled it to 1.5M columns but it really stress tests the shuffle and it needs to tune the shuffle parameters)"].

What we were seeing in the driver logs looked like this:

16/01/28 11:05:56 ERROR TaskSchedulerImpl: Lost executor 3 on ip-172-30-0-65.eu-west-1.compute.internal: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
16/01/28 11:05:56 INFO DAGScheduler: Resubmitted ShuffleMapTask(7, 44), so marking it as still running
16/01/28 11:05:56 WARN TaskSetManager: Lost task 72.0 in stage 7.0 (TID 6100, ip-172-30-0-65.eu-west-1.compute.internal): ExecutorLostFailure (executor 3 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
.
.

It seemed like the workers were dying with OutOfMemoryErrors. This led to a scramble to tune the cluster and the usual suspects were called in.

The Spark documentation on memory tuning was also a good read but didn't help. I was using Kryo for serializing objects; I reduced spark.memory.fraction to 25% from the 75% default to maximize the space "reserved for user data structures, internal metadata in Spark, and safeguarding against OOM errors in the case of sparse and unusually large records." All to no avail.

This StackOverflow post was informative. I was using Spark 1.6 so didn't need to worry about spark.storage.memoryFraction; I wasn't using broadcast variables; I was using thousands of partitions; and the "task serialized as XXX bytes" mentioned in the logs indicated that XXX was small.

So, I used about as much memory as the box could give me (55gb out of 60gb - I was also running Hadoop and the kernel needs some memory too). Running jmap on the worker boxes gave output like this:

 num     #instances         #bytes  class name
----------------------------------------------
   1:            72    42400003008  [D
   2:         20222      174418080  [B
   3:       2070414       99379872  scala.Tuple2$mcID$sp
   4:        403387       69434872  [C
   5:        382592       36728832  io.netty.channel.ChannelOutboundBuffer$Entry
   6:        769266       30770640  io.netty.util.Recycler$DefaultHandle
   7:        245337       29440440  io.netty.buffer.PooledHeapByteBuf

That's 39.5gb of arrays of doubles! Huh?

Looking more closely at the code in Spark for calculating the column similarities, we see:

  def computeColumnSummaryStatistics(): MultivariateStatisticalSummary = {
    val summary = rows.treeAggregate(new MultivariateOnlineSummarizer)(
      (aggregator, data) => aggregator.add(data),
      (aggregator1, aggregator2) => aggregator1.merge(aggregator2))
    updateNumRows(summary.count)
    summary
  }

which basically says call

      (aggregator, data) => aggregator.add(data),

over each partition and then aggregate the results all together with:

      (aggregator1, aggregator2) => aggregator1.merge(aggregator2))

What is this MultivariateOnlineSummarizer class? Well, it contains seven arrays of doubles. Hmm. And what's this in the add method (which is called on each row in a partition)?

  private[spark] def add(instance: Vector, weight: Double): this.type = {
.
.
      n = instance.size

      currMean = Array.ofDim[Double](n)
      currM2n = Array.ofDim[Double](n)
      currM2 = Array.ofDim[Double](n)
      currL1 = Array.ofDim[Double](n)
      nnz = Array.ofDim[Double](n)
      currMax = Array.fill[Double](n)(Double.MinValue)
      currMin = Array.fill[Double](n)(Double.MaxValue)
.
.

Yikes, Scooby, this is 10^8 doubles per array, and seven such arrays per partition's summarizer (roughly 7 x 10^8 x 8 bytes ≈ 5.6GB each), as DIMSUM calculates summaries used in the next part of the algorithm. Well, that's going to cause problems (note: it doesn't matter how many rows there are, this is a columns-only limitation).

My first attempt at a solution involved reducing the number of partitions with RDD.coalesce. This is inexpensive as it just logically groups the 2000 partitions into 16 (I chose 16 as I have 4 boxes and each box could just about hold 4 MultivariateOnlineSummarizers in memory at once).

This helped the job to progress further but then Kryo was complaining that it didn't have enough buffer space.

16/02/02 13:52:28 WARN TaskSetManager: Lost task 3.3 in stage 4.0 (TID 4050, ip-172-30-0-64.eu-west-1.compute.internal): org.apache.spark.SparkException: Kryo serialization failed: Buffer overflow. Available: 6, required: 8
Serialization trace:
currL1 (org.apache.spark.mllib.stat.MultivariateOnlineSummarizer). To avoid this, increase spark.kryoserializer.buffer.max value.

Following its advice, this was quickly fixed with:

sparkConf.setExecutorEnv("spark.kryoserializer.buffer.max", "20G")

only for me to run into:

16/02/02 14:05:23 WARN TaskSetManager: Lost task 1.0 in stage 4.1 (TID 6149, ip-172-30-0-65.eu-west-1.compute.internal): FetchFailed(BlockManagerId(0, ip-172-30-0-63.eu-west-1.compute.internal, 44256), shuffleId=2, mapId=7, reduceId=1, message=
org.apache.spark.shuffle.FetchFailedException: Too large frame: 3251949143
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:323)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:300)
.
.
Caused by: java.lang.IllegalArgumentException: Too large frame: 3251949143
        at org.spark-project.guava.base.Preconditions.checkArgument(Preconditions.java:119)

        at org.apache.spark.network.util.TransportFrameDecoder.decodeNext(TransportFrameDecoder.java:134)
.
.

Well now, this seems to be the end of the road, as the code in TransportFrameDecoder is:

Preconditions.checkArgument(frameSize < MAX_FRAME_SIZE, "Too large frame: %s", frameSize);

(see here for more information on Spark limits). Indeed the app in the Spark GUI looks like this:



We can see a job that is retrying, having already failed. The interesting thing to note is that 16 out of 16 tasks of stage 3 succeeded - that is the first part of treeAggregate, so our hack using coalesce worked! However, the second part of treeAggregate failed: none of the 4 tasks in stage 4 (corresponding to my 4 boxes trying to aggregate their individual results) passed.

Workaround

Well, there isn't one that I am aware of, at least as far as DIMSUM is concerned. However, since my matrix is very sparse, a brute-force routine I coded up actually ran in about 45 minutes, so maybe I don't need the efficiency DIMSUM promises.
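For what it's worth, the brute-force idea is straightforward when the matrix is sparse. Here is a minimal, single-machine Python sketch of it (the sparse-row representation, the cosine measure and all names are my assumptions for illustration, not the code I actually ran): only the non-zero entries of each row are multiplied, which is where the O(mL^2) cost comes from.

import math
from collections import defaultdict
from itertools import combinations

def column_similarities(rows):
    """Cosine similarity between columns of a sparse matrix.

    rows: an iterable of {column_index: value} dicts, one per row."""
    dot_products = defaultdict(float)   # (col_i, col_j) -> sum of products
    norms = defaultdict(float)          # col -> sum of squares
    for row in rows:
        for col, value in row.items():
            norms[col] += value * value
        # only the non-zero columns of this row contribute, so the work
        # per row is O(L^2) where L is the number of non-zero entries
        for (i, a), (j, b) in combinations(sorted(row.items()), 2):
            dot_products[(i, j)] += a * b
    return {cols: dot / math.sqrt(norms[cols[0]] * norms[cols[1]])
            for cols, dot in dot_products.items()}

rows = [{0: 1.0, 2: 3.0}, {1: 2.0, 2: 1.0}, {0: 4.0, 2: 1.0}]
print(column_similarities(rows))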