Saturday, February 27, 2016

Locality Sensitive Hashing in Tweets


Spark's DIMSUM algorithm might have a limit to how much data it can process, but the code for it came from Twitter, who process gazillions of tweets every day. So how do you apply it to huge amounts of data?

Well, one way is to break the data down into smaller chunks and one way to do that without losing its meaningfulness is to use locality sensitive hashing. The best (but rather verbose) description I've found of LSH is here, a chapter from a free book on data mining.

(Note: Ullman uses the term "characteristic matrix" in the document differently from how a mathematician might use it. A characteristic matrix in maths is λI - A, where A is an n x n matrix, I is the n x n identity matrix and λ is an eigenvalue. Solving the characteristic equation, |λI - A| = 0, gives you at most n real roots and allows you to calculate the eigenvectors, v, by substituting the discovered eigenvalues into the homogeneous system of equations, (λI - A)v = 0.)

To summarise, the key points follow.

Jaccard Similarity

First, we define something called the Jaccard similarity between two sets. This is simply:

s = (number of elements shared by the two sets) / (total number of distinct elements across the two sets)

Next, we convert the two sets into a matrix where each column represents a set and each row represents an element. The cell where a row and column meet is 1 if that element is in that set and 0 if it is not.
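As a concrete sketch (my own toy Scala, not code from the chapter), the Jaccard similarity and the characteristic matrix of two small sets look like this:

import scala.collection.immutable.SortedSet

object JaccardDemo extends App {
  def jaccard[T](a: Set[T], b: Set[T]): Double =
    (a intersect b).size.toDouble / (a union b).size

  val set1 = Set("the", "cat", "sat")
  val set2 = Set("the", "cat", "ran")

  println(jaccard(set1, set2)) // 2 shared / 4 distinct = 0.5

  // the characteristic matrix: rows are elements, columns are sets,
  // a cell is 1 if the element is in that set and 0 otherwise
  val elements = SortedSet[String]() ++ set1 ++ set2
  elements.foreach { e =>
    val row = Seq(set1, set2).map(s => if (s.contains(e)) 1 else 0)
    println(f"$e%-4s ${row.mkString(" ")}")
  }
}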

Minhashing

Next, we define a function that, for a given column, gives us the index of the first row (going from top to bottom) that has the value 1. We call this minhashing. Typically, we first reorder the rows of the original matrix randomly.
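A minimal sketch of a single minhash (again my own code), assuming the columns of the characteristic matrix are arrays of 0s and 1s:

import scala.util.Random

object MinhashDemo extends App {
  // columns of the characteristic matrix, one per set
  val column1 = Array(1, 0, 1, 1, 0)
  val column2 = Array(0, 0, 1, 1, 1)

  // a random reordering of the rows
  val permutation = Random.shuffle((0 until column1.length).toList)

  // the minhash of a column is the position, in the permuted order, of the
  // first row whose cell is 1
  def minhash(column: Array[Int]): Int =
    permutation.indices.find(i => column(permutation(i)) == 1).get

  println(s"minhash of column 1 = ${minhash(column1)}")
  println(s"minhash of column 2 = ${minhash(column2)}")
}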

The Minhash and Jaccard relationship

The chance that two columns have the same minhash value is in fact the Jaccard similarity. The proof of this follows.

Given two columns, say they both have a 1 in x rows; exactly one of them has a 1 in y rows; and we ignore the rows where both are 0. If we pick any of these x + y rows at random, then the probability that both columns have a 1 is unsurprisingly:

x / (x + y)

which is actually exactly the same as the Jaccard similarity, s.

Minhash Signatures

Now, let's make another matrix with a column per set (as before), but this time each row represents one of n random hash functions. Each function simply takes a row index and returns a deterministic value. The cells are all initially set to positive infinity, but that will change.

For each function, we use the index of each row in the original matrix to generate a hash value. For each set that has a 1 in that row of the original matrix, we place the function's value in the corresponding cell of our new matrix if it is less than what is already there.

If there are m sets, this results in an n x m matrix that is our (much reduced) signature.
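Sketched in Scala, with a couple of arbitrary hash functions of my own standing in for the random row orderings:

object MinhashSignature extends App {
  // the characteristic matrix: one column (of 0s and 1s) per set
  val sets = Seq(Array(1, 0, 1, 1, 0), Array(0, 0, 1, 1, 1))
  val numRows = 5

  // n hash functions simulating n random orderings of the rows
  val hashFns: Seq[Int => Int] = Seq(r => (2 * r + 1) % numRows,
                                     r => (3 * r + 2) % numRows)

  // every cell starts at "infinity"; we keep the minimum hash value seen
  // over all rows in which the set has a 1
  val signature = Array.fill(hashFns.length, sets.length)(Int.MaxValue)
  for {
    row      <- 0 until numRows
    (fn, i)  <- hashFns.zipWithIndex
    (set, j) <- sets.zipWithIndex
    if set(row) == 1
  } signature(i)(j) = math.min(signature(i)(j), fn(row))

  signature.foreach(row => println(row.mkString(" "))) // an n x m signature matrix
}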

Banding

Now let's break our signature matrix into b bands of r rows each.
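In code, banding amounts to grouping the rows of the signature and bucketing each column by its slice of every band; columns that collide in at least one band become candidate pairs. A rough sketch with toy data (names are mine):

object Banding extends App {
  // a toy n x m signature matrix: n rows (hash functions), m columns (sets)
  val signature = Array(
    Array(1, 1, 4),
    Array(2, 2, 0),
    Array(3, 0, 3),
    Array(0, 0, 1)
  )
  val r = 2 // rows per band, so b = 2 bands here

  // columns whose slices agree in at least one band become candidate pairs
  val candidates = signature.grouped(r).flatMap { band =>
    band.transpose // one slice of this band per column
      .zipWithIndex
      .groupBy { case (slice, _) => slice.toSeq }
      .values
      .filter(_.length > 1)
      .flatMap(group => group.map(_._2).toVector.combinations(2))
  }.toSet

  println(candidates) // Set(Vector(0, 1)): columns 0 and 1 agree in the first band
}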

The probability that all r rows within a band have the same value is s^r.

The probability that at least one row is not the same is (1 - s^r).

The probability that all b bands have at least one row that is not the same is (1 - s^r)^b.

Therefore, the probability that at least one of the b bands has all r rows the same is 1 - (1 - s^r)^b.

This is the probability that the pair of sets becomes a candidate pair - that is, a pair that may be similar.

The nice thing about this is that it describes an S-shaped curve no matter what the values of s, r and b are. This is good because points on the curve tend to be either clearly probable or clearly improbable.

For b = 10, the probabilities for r = [2, 4, 6, 8] look like this:



Distance functions

There are many types of distance measurements. Euclidean distance is the one we're taught most in school but there are others. The only rule is that for any distance function, d, that gives the distance between x and y, the following must hold:
d(x, y) >= 0 
d(x, y) = 0 iff x == y 
d(x, y) = d(y, x) [they're symmetric] 
d(x, y) <= d(x, z) + d(z, y)  [the triangle inequality]
Nothing too clever there. They're just common sense if you were navigating around town. The thing to bear in mind is that they apply to more than just the 3D space in which we live.

Family of functions

We define a family of functions as a collection of functions that say "yes, make x and y a candidate pair" or "no, do not make x and y a candidate pair unless some other function concludes we should do so". Minhashing is one such family.

Furthermore, we define the term (d1, d2, p1, p2)-sensitive for such families to describe the curve they generate. We require
d1 < d2
p1 > p2
where d1 and d2 are values of a given distance function d(x, y); p1 is the minimum probability that a function in the family makes x and y a candidate pair when d(x, y) ≤ d1, and p2 is the maximum probability that it does so when d(x, y) ≥ d2.

This gives us a nice short-hand way to describe a complicated graph.
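For example, the minhash family described above is (d1, d2, 1 - d1, 1 - d2)-sensitive for the Jaccard distance (1 minus the Jaccard similarity), since the probability that two sets share a minhash value is exactly their Jaccard similarity, 1 - d(x, y).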

Amplifying the effect

It is desirable to maximize the steepness of the S-curve to reduce the number of false negatives and false positives. We can do this by applying a second family of functions to the first. We could demand that all the bands agree for two points (x, y) - an AND construction. Or we could demand that at least one of the bands agrees - an OR construction. Each changes the curve in a different way, depending on what we want.

This is analogous to the banding (see above) and the maths for this is the same only this time instead of s, we're dealing with p, the probability two functions give the same answer for a row.
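A small sketch (my own code) of how the two constructions compose into the banding formula:

object Amplification extends App {
  // p is the probability that two functions give the same answer for a row
  def andConstruction(p: Double, r: Int): Double = math.pow(p, r)         // all r must agree
  def orConstruction(p: Double, b: Int): Double  = 1 - math.pow(1 - p, b) // at least one of b agrees

  // AND followed by OR is exactly the banding formula, 1 - (1 - p^r)^b
  def andThenOr(p: Double, r: Int, b: Int): Double =
    orConstruction(andConstruction(p, r), b)

  Seq(0.2, 0.4, 0.6, 0.8).foreach { p =>
    println(f"p = $p%.1f -> ${andThenOr(p, 4, 10)}%.4f") // note how the S-curve sharpens
  }
}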

The downside to this is we increase the demand on our CPUs. In large datasets, a small increase in CPU is magnified proportional to the amount of data we have to run the algorithm over so it's worth bearing this in mind.

Code

I stole the Python code from here to generate the graphs and modified it slightly, as below:

from mpl_toolkits.mplot3d import Axes3D
from matplotlib.collections import PolyCollection
from matplotlib.colors import colorConverter
import matplotlib.pyplot as plt
import numpy as np

# from http://matplotlib.org/mpl_toolkits/mplot3d/tutorial.html#line-plots

fig = plt.figure()
ax = fig.gca(projection='3d')


def cc(arg):
    return colorConverter.to_rgba(arg, alpha=0.6)


def p(s, r):
    b = 10
    return 1 - ((1 - s**r) ** b)


xs = np.arange(0, 1, 0.04)
verts = []
zs = [2.0, 4.0, 6.0, 8.0]
for z in zs:
    ys = list(map(lambda x: p(x, z), xs))  # list() so this also works under Python 3
    ys[0], ys[-1] = 0, 0
    verts.append(list(zip(xs, ys)))

poly = PolyCollection(verts, facecolors=[cc('r'), cc('g'), cc('b'),
                                         cc('y')])
poly.set_alpha(0.7)
ax.add_collection3d(poly, zs=zs, zdir='y')

ax.set_xlabel('s')
ax.set_xlim3d(0, 1)
ax.set_ylabel('r')
ax.set_ylim3d(min(zs), max(zs))
ax.set_zlabel('p')
ax.set_zlim3d(0, 1)

plt.show()


Monday, February 15, 2016

A Case for Scala


Here are three interesting things I came across this week with Scala's case idiom.

Backticks

The first is this snippet:

    val b = "b"
    "a" match {
      case b => println("'a' matches b. WTF?")
      case _ => println("no match")
    }

which prints the first line ("... WTF?"). It's a simple gotcha. The b in the case shadows the outside b that equals the string "b". You need to use backticks to make Scala use the outer b (see here) as in:

    "a" match {
      case `b` => println("'a' matches b. WTF?")
      case _   => println("no match") // <-- printed
    }

Regex

The case keyword can also be used in conjunction with regular expressions, for example:

    val hello = "hello.*".r
    val goodbye = "goodbye.*".r

    "goodbye, cruel world" match {
      case hello()   => println("g'day!")
      case goodbye() => println("thanks for all the fish") // this is printed
      case _         => println("no match")
    }

(see here for further information)

Implicits

The third case for the case keyword is no case at all, like this code from Elastic4S where the Type Class Pattern is employed. In a nutshell, there is an implementation in the implicit ether that can fulfil our request. In the Elastic4S (1.5.1) code, it looks like this:

  def execute[T, R](t: T)(implicit executable: Executable[T, R]): Future[R] = executable(client, t)

where the executable in question is any Executable that matches the type of our argument of type T. It's a little harder to read (where in the ether is this Executable?) but is much more extensible.


Saturday, February 6, 2016

Dimsum fills you up


DIMSUM is an algorithm (described here) that efficiently finds similar columns in a matrix. You get it for free in Spark in the RowMatrix.columnSimilarities method.

The problem is basically taking each row and multiplying all combinations of values together. Given a matrix with m rows and n columns, this is an O(mn^2) problem. But if the matrix is sparse, with a maximum of L non-zero values per row, the complexity drops to O(mL^2). DIMSUM promises to improve this efficiency even more (with a small loss in accuracy).
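For reference, invoking it is a one-liner. A sketch (sc is assumed to be a SparkContext and the threshold here is arbitrary; anything greater than zero switches on the DIMSUM sampling):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix

// rows is an RDD[Vector], typically sparse
val rows = sc.parallelize(Seq(
  Vectors.sparse(5, Seq((0, 1.0), (3, 2.0))),
  Vectors.sparse(5, Seq((1, 1.0), (3, 1.0)))
))
val matrix = new RowMatrix(rows)

// with a threshold greater than zero, Spark uses the sampling (DIMSUM) version
val similarities = matrix.columnSimilarities(0.1)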

The headache for us was that our matrix was huge - 100 million by 100 million. This was pushing the envelope as it appears it is not best suited for this [see here for this comment: "Column based similarities work well if the columns are mild (10K, 100K, we actually scaled it to 1.5M columns but it really stress tests the shuffle and it needs to tune the shuffle parameters)"]

What we were seeing in the driver logs looked like this:

16/01/28 11:05:56 ERROR TaskSchedulerImpl: Lost executor 3 on ip-172-30-0-65.eu-west-1.compute.internal: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
16/01/28 11:05:56 INFO DAGScheduler: Resubmitted ShuffleMapTask(7, 44), so marking it as still running
16/01/28 11:05:56 WARN TaskSetManager: Lost task 72.0 in stage 7.0 (TID 6100, ip-172-30-0-65.eu-west-1.compute.internal): ExecutorLostFailure (executor 3 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
.
.

It seemed like the workers were dying with OutOfMemoryErrors. This led to a scramble to tune the cluster and the usual suspects were called in.

The Spark documentation on memory tuning was also a good read but didn't help. I was using Kryo for serializing objects; I reduced spark.memory.fraction from the default of 75% to 25% to maximize the space "reserved for user data structures, internal metadata in Spark, and safeguarding against OOM errors in the case of sparse and unusually large records." All to no avail.

This StackOverflow post was informative. I was using Spark 1.6 so didn't need to worry about spark.storage.memoryFraction; I wasn't using broadcast variables; I was using thousands of partitions; and the "task serialized as XXX bytes" mentioned in the logs indicated that XXX was small.

So, I used about as much of the memory the box could give me (55gb out of 60gb - I was also running Hadoop and the kernel needs some memory too). And running jmap on the worker boxes gave an output like this:

 num     #instances         #bytes  class name
----------------------------------------------
   1:            72    42400003008  [D
   2:         20222      174418080  [B
   3:       2070414       99379872  scala.Tuple2$mcID$sp
   4:        403387       69434872  [C
   5:        382592       36728832  io.netty.channel.ChannelOutboundBuffer$Entry
   6:        769266       30770640  io.netty.util.Recycler$DefaultHandle
   7:        245337       29440440  io.netty.buffer.PooledHeapByteBuf

That's 39.5gb of arrays of doubles! Huh?

Looking more closely at the code in Spark for calculating the column similarities, we see:

  def computeColumnSummaryStatistics(): MultivariateStatisticalSummary = {
    val summary = rows.treeAggregate(new MultivariateOnlineSummarizer)(
      (aggregator, data) => aggregator.add(data),
      (aggregator1, aggregator2) => aggregator1.merge(aggregator2))
    updateNumRows(summary.count)
    summary
  }

which basically says call

      (aggregator, data) => aggregator.add(data),

over each partition and then aggregate them all together with:

      (aggregator1, aggregator2) => aggregator1.merge(aggregator2))

So what is this MultivariateOnlineSummarizer class? Well, it contains seven arrays of doubles. Hmm. And what's this in the add method (which is called on each row in a partition)?

  private[spark] def add(instance: Vector, weight: Double): this.type = {
.
.
      n = instance.size

      currMean = Array.ofDim[Double](n)
      currM2n = Array.ofDim[Double](n)
      currM2 = Array.ofDim[Double](n)
      currL1 = Array.ofDim[Double](n)
      nnz = Array.ofDim[Double](n)
      currMax = Array.fill[Double](n)(Double.MinValue)
      currMin = Array.fill[Double](n)(Double.MaxValue)
.
.

Yikes, Scooby, this is 10^8 doubles per array, per partition, as DIMSUM calculates summaries used in the next part of the algorithm. Well, that's going to cause problems (note: it doesn't matter how many rows there are, this is a columns-only limitation).

My first attempt at a solution involved reducing the number of partitions with RDD.coalesce. This is inexpensive as it just logically groups the 2000 partitions into 16 (I chose 16 as I have 4 boxes and each box could just about hold 4 MultivariateOnlineSummarizers in memory at once).
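The change itself was tiny; a sketch, with rows being the RDD[Vector] behind the RowMatrix:

// group the existing 2000 partitions into 16 without a full shuffle, so each
// executor only ever builds a handful of MultivariateOnlineSummarizers at a time
val coalesced    = rows.coalesce(16)
val matrix       = new RowMatrix(coalesced)
val similarities = matrix.columnSimilarities(0.1)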

This helped the job to progress further but then Kryo was complaining that it didn't have enough buffer space.

16/02/02 13:52:28 WARN TaskSetManager: Lost task 3.3 in stage 4.0 (TID 4050, ip-172-30-0-64.eu-west-1.compute.internal): org.apache.spark.SparkException: Kryo serialization failed: Buffer overflow. Available: 6, required: 8
Serialization trace:
currL1 (org.apache.spark.mllib.stat.MultivariateOnlineSummarizer). To avoid this, increase spark.kryoserializer.buffer.max value.

Following its advice, this was quickly fixed with:

sparkConf.setExecutorEnv("spark.kryoserializer.buffer.max", "20G")

only for me to run into:

16/02/02 14:05:23 WARN TaskSetManager: Lost task 1.0 in stage 4.1 (TID 6149, ip-172-30-0-65.eu-west-1.compute.internal): FetchFailed(BlockManagerId(0, ip-172-30-0-63.eu-west-1.compute.internal, 44256), shuffleId=2, mapId=7, reduceId=1, message=
org.apache.spark.shuffle.FetchFailedException: Too large frame: 3251949143
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:323)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:300)
.
.
Caused by: java.lang.IllegalArgumentException: Too large frame: 3251949143
        at org.spark-project.guava.base.Preconditions.checkArgument(Preconditions.java:119)

        at org.apache.spark.network.util.TransportFrameDecoder.decodeNext(TransportFrameDecoder.java:134)
.
.

Well now, this seems the end of the road as the code in TransportFrameDecoder is:

Preconditions.checkArgument(frameSize < MAX_FRAME_SIZE, "Too large frame: %s", frameSize);

(see here for more information on Spark limits). Indeed the app in the Spark GUI looks like this:



We can see a job that is in the process of retrying having failed already. The interesting things to note are that 16 out of 16 tasks of stage 3 have succeeded - that is the first part of treeAggregate. Our hack using coalesce worked! However, the second part of treeAggregate fails, that is none of the 4 tasks (corresponding to my 4 boxes trying to aggregate their individual results) in stage 4 passed.

Workaround

Well, there isn't one that I am aware of, at least as far as DIMSUM is concerned. However, since my matrix is very sparse, a brute-force routine I coded up actually ran in about 45 minutes, so maybe I don't need the efficiency DIMSUM promises.
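A sketch of the kind of brute force I mean (not the exact code I ran): emit the product of every pair of non-zero values in each row, keyed by the pair of column indices, and add them up. Dividing each total by the norms of the two columns would then give the cosine similarities that columnSimilarities returns.

// rows is an RDD[Vector]; this yields the dot product of every pair of columns
// that ever co-occur in a row
val dotProducts = rows.flatMap { row =>
  val sparse = row.toSparse
  for {
    i <- sparse.indices.indices
    j <- (i + 1) until sparse.indices.length
  } yield ((sparse.indices(i), sparse.indices(j)), sparse.values(i) * sparse.values(j))
}.reduceByKey(_ + _)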


Saturday, January 30, 2016

Cloud Atlas


Note on setting up Spark on AWS

YMMV, but these are some things that were useful to me.

Set up your Amazon account per the instructions page. I put my key in:

~/.ssh/MyTestAwsIreland.id_rsa

I downloaded the latest Spark and ran this from the top level directory:

./spark-ec2 -k MyTestAwsIreland -i ~/.ssh/MyTestAwsIreland.id_rsa -s 4 --instance-type=c4.8xlarge --region=eu-west-1 --vpc-id=MY_VPC_ID --subnet-id=MY_SUBNET_ID --zone=eu-west-1a launch MY_CLUSTER_NAME

(your region and zone may be different. If you see errors, this page may help).

This starts a fairly beefy cluster with one master and 4 slaves using the same version of Spark as you downloaded. Unfortunately, there wasn't much disk space (I was getting "no space left on device" from Spark jobs) so follow the instructions here and if there are problems, this page will almost certainly solve them.

Also, I was having trouble connecting. You must set up a public DNS if you want your local Spark driver to talk to the cluster (see here for more information).

Running Hadoop was/is proving more complicated. I could not use port 9000 as it was being blocked for some reason (my local client socket was left in SYNC_SENT state indicating a firewall issue) so I changed it to 8020. This remains a puzzle.

Also, Spark clients initially talk to the NameNode but then start talking direct to the DataNodes. AWS instances have a public domain name/IP combo and a private domain name/IP combo. Unfortunately, the NameNode was sending the private IP address. This link forces it to send the domain name but at the time of writing, it's only the private domain name. Using the hostname command on the boxes has not solved the problem.

Anyway, it will also help to get rsync set up so you can develop on your laptop and then synch your code with something as simple as

rsync -avz -e "ssh  -i ~/.ssh/MyTestAwsIreland.id_rsa -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null"  --exclude '.git' --exclude '.idea'  --progress LOCAL_PROJECT_DIRECTOR  root@YOUR_MACHINE_NAME:/REMOTE_PROJECT_DIRECTORY

Finally, a simple script to run on all your slave boxes during a job is:

while ( true ) ; do { jstat -gc `jps | grep CoarseGrainedExecutorBackend | awk '{print $1}'` | awk  '{print $8 "/" $7 " " $14}' 2>/dev/null ; uptime ; vmstat 1 2 ; echo ; sleep 10 ; } done

which monitors the JVM giving its old generation heap usage over its capacity, the time taken for full GCs plus the load average and various system stats (from vmstat).

Addendum: this seems to install a Spark instance with a really old Hadoop library (see SPARK_HOME/lib/spark-assembly-1.6.0-hadoop1.2.1.jar). I re-installed a better version and had it talking to Hadoop nicely but not before I wasted time trying to solve this issue.

Sunday, January 17, 2016

The Maths of LSA


At the heart of Latent Semantic Analysis is a mathematical technique called Singular Value Decomposition (SVD). This is an appropriate technique when you have sparse data in many dimensions. It reduces the data to fewer dimensions with (hopefully) little loss of accuracy.

First, a little mathematical refresher.

Eigenvalues and Eigenvectors

An eigenvector of a matrix is a vector that, when multiplied by the matrix, still points in the same direction. Its length may have changed by a (non-zero) factor called the eigenvalue.

Only square matrices can have eigenvalues and eigenvectors. Anything non-square will change the dimension of the vector so the result would never be a multiple of itself. SVD is a hack to work with non-square matrices.

What's more, not all matrices have real-valued eigenvectors and eigenvalues. Say the matrix is a rotation of the plane: no vector (apart from the 0-vector) points in the same direction after the rotation. In Python, a 45-degree rotation looks like this:

import numpy as np


def is_real(x):
    # true only if x has no imaginary component
    return np.all(np.isreal(x))


def print_real_eigenvectors_and_eigenvalues(matrix):
    eigenvalues, eigenvectors = np.linalg.eig(matrix)
    real_indices = [i for i in range(len(eigenvalues)) if is_real(eigenvalues[i])]
    print "real eigenvalues \n", [eigenvalues[i].real for i in real_indices]
    print "real eigenvectors\n", [np.real(eigenvectors[:, i]) for i in real_indices]


if __name__ == "__main__":
    rotation = np.matrix('0.707106781  -0.707106781;'
                         '0.707106781   0.707106781')
    print_real_eigenvectors_and_eigenvalues(rotation)

and it outputs empty lists.

The trolley-line-location problem

In "Coding the Matrix", Philip Klein compares SVD to finding the optimal path for a straight tram line to run through a city. I'll skip most the proofs (they're in the book) but the gist goes like this:

Imagine the tram line must go through the city centre (the origin) while getting as close as possible to fixed landmarks located by vectors a_i.

Make each a_i a row of a matrix, A.

Now, each point a_i can be expressed as the sum of two vectors, one parallel to the tram line's direction v, written a_i∥, and one perpendicular to it, written a_i⊥ (notice that we're choosing a new co-ordinate system). Since

a_i = a_i⊥ + a_i∥

then, squaring:

||a_i||^2 = ||a_i⊥||^2 + ||a_i∥||^2

and re-arranging:

||a_i⊥||^2 = ||a_i||^2 - ||a_i∥||^2

This, of course, is our distance from the tram line squared. If we want to minimize this value over all landmarks, we can express the problem as finding the minimum of:

Σ_i ( ||a_i||^2 - ||a_i∥||^2 )

which, when you again re-arrange, will give you:

||A||_F^2 - ||Av||^2

where the F indicates the Frobenius norm (nothing too scary, just a generalization of a vector multiplied by itself) and where the second term is just us cleaning up all the other terms:

Σ_i <a_i, v>^2

where <a_i, v> is just the dot product of a_i and the unit vector v, which is of course the length of a_i∥.

Seems a lot of work. Don't worry, it's all been high-school maths so far.

Anyway, it's this second term that we want to maximize. As the first term is fixed, maximizing the second will find the overall minimum for the whole equation. We won't worry too much how we do this - there are numerical techniques for doing it - this post is just to give the flavour.

Having chosen a suitable unit vector to maximize the second term, we'll call it v_1 to remind us that this optimizing of distances between landmarks and the tram line can be viewed as a 1-dimensional approximation: covering all the landmarks as best we can using just a straight line. Perhaps we want to place a tram stop at the point on the line closest to each landmark. In that case, we could build a matrix similar to A, only this time each row is the vector to a tram stop. Let's call this matrix Ã. Furthermore, we call ||A v_1|| the first singular value (or σ_1) and v_1 the first right singular vector.

Then the error in our approximation is A - Ã, and each row of this matrix is the vector from a tram stop to its landmark, whose length we have already calculated. So:

||A - Ã||_F^2 = ||A||_F^2 - ||A v_1||^2 = ||A||_F^2 - σ_1^2

Note that the position of each tram stop is just the projection of a_i onto v_1, so each row of Ã can be expressed as:

<a_i, v_1> v_1

Taking this and re-expressing it in matrix form:

Ã = A v_1 v_1^T = σ_1 u_1 v_1^T

where we define σ_1 u_1 = A v_1.

This is starting to look like our familiar equation in SVD. So, it only remains to note that this "trolley-line" example is a specific case. We could expand it to more than a 2-D map.

We do this by finding the unit vector v_2 that is orthogonal to v_1 and maximises ||Av||, and again for v_3, v_4, etc.

It doesn't take a great deal of effort to now show that σ_1, σ_2, σ_3, etc. can be turned into a diagonal matrix and that would give us the SVD equation.
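Written out explicitly (standard SVD notation rather than anything from the book), that equation is:

A = U Σ V^T,  where  Σ = diag(σ_1, σ_2, ..., σ_r),  the columns of V are v_1, v_2, ...,  and the columns of U are u_i = A v_i / σ_i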

It's also not too hard to see that the values of this diagonal matrix are in descending order since at each stage we found the maximum before moving on to the next step.

Thursday, December 31, 2015

Latent Semantic Analysis with Spark


I'm reading the excellent Advanced Analytics with Spark as I need to learn more about natural language processing (NLP).

The method in the book is called Latent Semantic Analysis. Basically, this builds a vector for each document that captures the relevant words and their significance. More specifically, all vectors have the same size and the values are, roughly speaking, the fraction of a given term in the document multiplied by the inverse of how often this term occurs across all documents (with some logs thrown in to dampen outliers). A given index in each vector represents the same term over all documents.

The code for the book can be retrieved with:

git clone https://github.com/sryza/aas.git

The first step is a process of lemmatization which removes stop words and aggregates related words. For this, an NLP library from Stanford University is used. The code looks like this:

    val lemmatized = plainText.mapPartitions(iter => {
      val pipeline = createNLPPipeline()
.
.

Interestingly, we "use mapPartitions so that we only initialize the NLP pipeline once per partition instead of once per document". [1]

(The variable lemmatized is an RDD of documents where the documents are just a Seq of Strings that have had the stop words removed and related words aggregated.)

By mapping over lemmatized, we make an RDD of Map[String, Int] that represents the term counts per document and we call this docTermFreqs. We combine all these for each partition of documents and merge them all together at the end. "When the records being aggregated and the result object have the same type (eg, in sum), reduce is useful, but when the types differ, as they do here, aggregate is a more powerful alternative" [1]

Unfortunately, this approach can lead to OutOfMemoryErrors. So, an alternative is to find the distribution of terms over the documents with this:

    val docFreqs = docTermFreqs.flatMap(_.keySet).map((_, 1)).reduceByKey(_ + _, 15)

(where 15 is our number of partitions). We also limit the number of terms:

    docFreqs.top(numTerms)(ordering)

where numTerms is arbitrary but defaults to 50 000. From this, we have enough information to calculate the Inverse Document Frequencies:

    docFreqs.map{ case (term, count) => (term, math.log(numDocs.toDouble / count))}.toMap

The use of log dampens the effect of outliers.

In turn, with this we can map over all docTermFreqs creating a sparse vector of all the terms and their scores as we go.
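A sketch of that last step (the names are mine, and a real job would broadcast the idfs and term-to-index maps rather than capture them in the closure):

import org.apache.spark.mllib.linalg.Vectors

// assumed to be in scope: idfs: Map[String, Double] and termIds: Map[String, Int],
// both built from the docFreqs seen above
val tfIdfVectors = docTermFreqs.map { termFreqs =>
  val docTotalTerms = termFreqs.values.sum
  val scores = termFreqs.toSeq.collect {
    case (term, freq) if termIds.contains(term) =>
      (termIds(term), idfs(term) * freq / docTotalTerms)
  }
  Vectors.sparse(termIds.size, scores)
}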

The Maths

So much for the Scala, now for the mathematics. LSA depends on a trick called Singular Value Decomposition. This is a very general technique that is used in many diverse maths problems. A good demonstration of it (using Python) is available here. A simple(ish), intuitive explanation of it is here.

One use of SVD is to reduce the dimensions of the problem to something more manageable. One consequence of such a reduction is a loss of accuracy but if this loss is small, the approximation might be acceptable.

A consequence of SVD is that our matrix is broken down into three matrices, each with its own properties:

X = U D V^T

U is an N × p orthogonal matrix (U^T U = I_p) whose columns u_j are called the left singular vectors;
V is a p × p orthogonal matrix (V^T V = I_p) with columns v_j called the right singular vectors;
D is a p × p diagonal matrix, with diagonal elements d_1 ≥ d_2 ≥ · · · ≥ d_p ≥ 0 known as the singular values
(taken from [2]). In English,

X is an (N x p) matrix that holds the observations. In general, N is the number of samples and p the number of features. For our purposes, N is the number of documents and p the number of terms. That is, the rows correspond to documents and the columns to terms. The values are:

(the calculation for a term from docFreqs seen earlier) x (frequency of this term in this document) / (total number of terms in this document).

V is a matrix "where each row corresponds to a term and each column corresponds to a concept. It defines a mapping between term space (the space where each point is an n-dimensional vector holding a weight for each term) and concept space (the space where each point is an n-dimensional vector holding a weight for each concept)."

U is a "matrix where each row corresponds to a document and each column corresponds to a concept. It defines a mapping between document space and concept space." [1]

There's a lot more to the mathematics and I'll post more soon but this is a good starting place to get more familiar with it.

Spark

Now, this is just a refresher on Spark's handling of matrices. Data handled as rows in Spark is easy. But if you want to do anything interesting with a matrix (multiply, take a transpose etc) this works less well. You need to deal with columns and the data in a column might be striped across many machines unlike rows.

At the moment (it's an experimental API), Spark has 4 implementations of DistributedMatrix, each with slightly different abilities. The BlockMatrix implementation is currently the only one that allows one distributed matrix to be multiplied by another. As its name suggests, it breaks the matrix into blocks so that operations like multiplication can be distributed efficiently.
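For example, a sketch of multiplying two distributed matrices, built here from (row, column, value) entries, with sc assumed to be a SparkContext:

import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

val a = new CoordinateMatrix(sc.parallelize(Seq(MatrixEntry(0, 0, 1.0), MatrixEntry(1, 1, 2.0))))
val b = new CoordinateMatrix(sc.parallelize(Seq(MatrixEntry(0, 1, 3.0), MatrixEntry(1, 0, 4.0))))

// BlockMatrix is the only DistributedMatrix that can currently be multiplied
// by another distributed matrix
val product = a.toBlockMatrix().multiply(b.toBlockMatrix())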

Other implementations can be multiplied by a Matrix object (dense or sparse) but this appears to be local to the node on which the code runs.


[1] Advanced Analytics with Spark
[2] Elements of Statistical Learning (free download)

Thursday, December 10, 2015

The Magnet Pattern


...is a variant on the Type Class Pattern where Scala copies a Haskell idiom. A good article is here. The idea is that there is just one method that takes a variety of objects with the same super-trait depending on what parameters were used in calling that method. This replaces having many overloaded methods.

[Just to refresh one's memory: the Type Class pattern is a method of ad hoc polymorphism but it differs from the Pimp My Library pattern. A good comparison of the two can be found here. Essentially, the Pimp My Library pattern appears to add methods to classes that were not there when they were first written. The key thing about the Type Class pattern is that it asks for a witness to evidence that something is possible for a certain type. Think about Scala's TraversableOnce.sum method that asks for evidence that the type it contains has an associated Numeric. In Pimp My Library, you're creating the functionality per type. In the Type Class pattern you're providing the functionality for a more general function to do its work.]
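For example, a home-made sum makes the witness explicit (a sketch, not Scala's actual implementation):

  def total[T](ts: Seq[T])(implicit numeric: Numeric[T]): T =
    ts.foldLeft(numeric.zero)(numeric.plus)

  println(total(Seq(1, 2, 3)))   // 6, witnessed by Numeric[Int]
  println(total(Seq(1.5, 2.5)))  // 4.0, witnessed by Numeric[Double]
  // total(Seq("a", "b")) does not compile: no Numeric[String] witness exists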

Implicits
What is apparent immediately is that you really need to know about implicit rules. Here are some notes but a really good reference lies here.

Let's take a fairly pointless class:

class AContainer

Using the Pimp My Library pattern, we can add a method to it like so:

  class PimpMe {
    def pimpedMethodOnClass: String = "pimpedMethodOnClass"
  }
  implicit def pimpedWithADefCreatingClass(aContainter: AContainer): PimpMe = new PimpMe

Now, we can call this method as if it were on the class itself:

    println(aContainer.pimpedMethodOnClass) // "pimpedMethodOnClass"

More efficiently (see below) we could write:

  implicit class PimpMeEfficiently(val aContainer: AContainer) extends AnyVal {
    def pimpedMethodOnEfficientClass: String = "pimpedMethodOnEfficientClass"
  }
.
.
    println(aContainer.pimpedMethodOnEfficientClass) // "pimpedMethodOnEfficientClass"

The reason for AnyVal is "properly-defined user value classes provide a way to improve performance on user-defined types by avoiding object allocation at runtime, and by replacing virtual method invocations with static method invocations" (from the ScalaDocs).

The Magnet Pattern Simplified
A simplified magnet pattern implementation is a single method. Let's use a slightly more interesting magnet class whose toString method yields a different value. In real life, each subclass would have a more interesting implementation:

  class Magnet(string: String) {
    override def toString() = string
  }

And let's have a very trivial implementation of the method taking the magnet:

  def magnetMethod(magnet: Magnet): Unit = println(magnet)

Now, we can call this method with what look like different arguments, for example:

  implicit def fromStringAndInt(tuple: (String, Int)) = new Magnet(s"Implementation X: (String, Int) = (${tuple._1}, ${tuple._2})")
.
.
    magnetMethod("aString", 1)   // "Implementation X: (String, Int) = (aString, 1)" (thanks to fromStringAndInt)
    magnetMethod(("aString", 1)) // ditto

Note calling the magnet method with a tuple has the same effect as calling it with individual arguments.

Now, we could have a different implementation but let's be simple and just pretend there is different functionality. But we really do have a different call site:

  implicit def fromDouble(double: Double) = new Magnet(s"Implementation Y: double = $double")
.
.
    magnetMethod(1.1d) // "Implementation Y: double = 1.1" (thanks to fromDouble)

We're calling the same method with different arguments (cardinality as well as types)!