Thursday, May 26, 2016

From the Gradle to the Grave


After an annoying week in Classpath Hell, here are some things I've learned.

It all started with an upgrade to Elasticsearch that was forced upon us (1.6 to 2.3.1). This caused all sorts of issues, a good deal of them to do with Guava. The Guava team has a very aggressive deprecation policy: "Deprecated non-beta APIs will be removed two years after the release in which they are first deprecated. You must fix your references before this time. If you don't, any manner of breakage could result (you are not guaranteed a compilation error)." Compare that to the JDK, which has never deleted any deprecated code.

Well, this would be fine: I don't use Guava directly. Unfortunately, both ES and HBase do. The trouble is that my HBase (0.98.18-hadoop2) pulls in Guava 12.0.1 as a transitive dependency while my Elasticsearch (2.3.1) pulls in Guava 18.0. This leads to:

java.lang.NoSuchMethodError: com.google.common.util.concurrent.MoreExecutors.directExecutor()Ljava/util/concurrent/Executor
 at org.elasticsearch.threadpool.ThreadPool.<clinit>(ThreadPool.java:190)
.
.

even though the method clearly existed when I looked at the class in my IDE.

The solution was shading. Shading not only relocates the contentious classes but also updates all the call sites in the artefact. In Maven you'd use the Shade plugin. Unfortunately, I'm using Gradle (which annoys me because I can't control the order of dependencies the way I can in Maven). In Gradle you use the Gradle Shadow plugin, and your build.gradle looks something like:

  apply plugin: 'com.github.johnrengelman.shadow'
.
.
  shadowJar {
    // this moves the classes and the reference to the classes
    relocate 'com.google.common', 'com.example.com.google.common'

    // where this is to address another shadowing problem (see here)
    zip64 true
    transform(ServiceFileTransformer) {
        path = 'META-INF/spring*'
    }
  }
.
.

NOTE! You must delete the uber jar before running this. I was confused for an hour or more wondering why it appeared not to be working. When I cleaned the project, the problem went away.

Checking that the classes had been renamed in the artefact was simple; checking the call sites was a little harder. To make sure the Elasticsearch call site had been changed, I copied the offending class out of the JAR and ran:

$ javap -p org/elasticsearch/threadpool/ThreadPool.class | grep google
private volatile com.example.com.google.common.collect.ImmutableMap<java.lang.String, org.elasticsearch.threadpool.ThreadPool$ExecutorHolder> executors;
private final com.example.com.google.common.collect.ImmutableMap<java.lang.String, org.elasticsearch.common.settings.Settings> defaultExecutorTypeSettings;

Yup, looks good.

All of this wasted a few days that could have been better spent analysing Big Data problems. It made me think that the Paul Phillips talk ("We're doing it all wrong") I watched at the weekend is only a small part of the story we face as developers. It's lovely to have a language that allows code to be "obviously without defects rather than without obvious defects", but most of my unproductive time seems to be spent handling builds and classpaths.


Saturday, May 21, 2016

Lazy Spark


You may want Spark to write to some data store (after all, there are some things that other technologies do better). Given an RDD distributed over many partitions on many nodes, how do you write the data efficiently?

You may choose to do something like this (where I've used println statements rather than explicit calls to the technologies of your choice):

    rdd foreachPartition { iterator =>
      println("create connection")
      iterator foreach { element =>
        println("insert")
      }
      println("close connection")
    }

Naturally, you need to get a connection inside the function (as the function will be serialized and run on other partitions that might be on other machines). 

Note that if the connection is referenced via a lazy val then each copy of the function will have its own connection (even on the same machine), and it will only be instantiated when the function actually runs against its partition.
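Putting those two points together, a minimal sketch might look like this (getConnection and insert here are hypothetical stand-ins for whatever client your data store provides):

    rdd foreachPartition { iterator =>
      // lazy: instantiated on the executor, and only when first referenced
      lazy val connection = getConnection()
      iterator foreach { element =>
        connection.insert(element)
      }
      // note: closing forces the lazy val even if the partition was empty
      connection.close()
    }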

So, how do we know when to close the connection? A common answer is to just use connection pools.

Also note that the return type of foreachPartition is Unit. Since Unit hints at side effects, it's not too surprising that this is executed immediately; a quick look at the code of RDD confirms it.
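For reference, the signature (roughly, from the Spark source) looks like this; a function returning Unit can only be useful for its side effects, so the call has to be an eagerly executed action:

    def foreachPartition(f: Iterator[T] => Unit): Unit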

Great. But what if you want to read data from some store and enhance the RDD? Now we're using mapPartitions, which may very well be lazy. So, with similar code, the function might look like this:

    val mapped = rdd mapPartitions { iterator =>
      lazy val connection = getConnection()

      val mapped = iterator map { element =>
        if (connection.isOpen) select(element)
      }

      connection.close()
      mapped
    }

where I'm pretending to open and close some sort of pseudo-connection thus:

  import java.util.concurrent.atomic.AtomicInteger

  val nSelects        = new AtomicInteger(0)
  val nConnections    = new AtomicInteger(0)

  class Connection {
    @volatile private var open = true
    def close(): Unit = { open = false }
    def isOpen(): Boolean = open
  }

  def getConnection(): Connection = {
    nConnections.incrementAndGet()
    new Connection
  }

  def select(element: Int): Int = nSelects.incrementAndGet()
  
Now, let's run RDD.count(), which should force even the lazy map to do something:

    println("Finished. Count  = " + mapped.count())
    println("nSelects         = " + nSelects.get() + ", nConnections = " + nConnections.get())

giving:

Finished. Count  = 10000
nSelects         = 0, nConnections = 0

What's this? We get the correct number of elements but no selects? What gives?

The issue is that the iterator given to the function passed to mapPartitions is lazy. This has nothing to do with Spark but is due to Scala's lazy collections (this iterator is actually an instance of scala.collection.Iterator$$anon$11). If we ran this code with a real connection, we'd see that it had already been closed by the time the inner function wanted to do something with it. Odd, we might say, since the call to close it comes later in the code.
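You can see the same behaviour in plain Scala, with no Spark involved at all:

    // Iterator.map is lazy: the side effect only happens when the iterator is consumed
    val it     = Iterator(1, 2, 3)
    var calls  = 0
    val mapped = it.map { x => calls += 1; x * 2 }
    println(calls)      // 0 - nothing has run yet
    println(mapped.sum) // 12 - forces the mapping
    println(calls)      // 3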

We could just force the mapping to run by calling size() on the resulting collection (which may or may not be efficient) but one solution suggested to me was:

    val batchSize = 100
    rdd mapPartitions { iterator =>
      val batched = iterator.grouped(batchSize)
      batched.flatMap { grouped =>
        val connection = getConnection()
        val seqMapped  = grouped.map(select(_))
        connection.close()
        seqMapped
      }
    }

This works because each batch handed to flatMap is a strict Seq, so grouped.map runs while the connection is still open, before connection.close() is called. It creates (and closes) a connection for each batch of batchSize elements, but that could be more efficient than the usual contention on a connection pool. Remember, a little contention can make a big difference when dealing with hundreds of millions of elements.


Scala Curry


"All functions are curried in Haskell by default" that is, all functions in Haskell take only one argument. How does it do this? By currying.

Scala has currying built in. To demonstrate, take this method that takes two Ints and returns an Int:

  def aMethodWith2Args(x: Int, y: Int): Int = x + y

Let's lift it:

  val lifted: (Int, Int) => Int = aMethodWith2Args(_, _)

and curry it:

  val curried: Int => Int => Int = lifted.curried
 
So, we too now have a function that only takes one argument. It just returns another function that also takes only one argument. That's currying.

Now we can partially apply it:

  val partial: Int => Int = curried(1)
  println(partial(2)) // prints out '3'

Just like Haskell.
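Scala also lets you declare the curried form directly with multiple parameter lists, which amounts to the same thing:

  def aCurriedMethod(x: Int)(y: Int): Int = x + y

  val partiallyApplied: Int => Int = aCurriedMethod(1) _ // eta-expand the remaining parameter list
  println(partiallyApplied(2))                           // prints '3'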

The math behind it can be found here. Using Haskell notation:

($) :: (a -> b) -> (a -> b)
-- $ takes a function and returns a function. This is the same as ((a -> b), a) -> b,
-- but instead of passing all the arguments at once, we can pass the first argument to
-- the function, receive a partially applied function, and pass the second argument to it.


Sunday, May 15, 2016

Linear Regression in Practice


Here's a sweet little challenge. Given a linear function, write some code that estimates what it is given only the data it produces.

Let's make the function a simple one, say f(x): Double -> Double, such that we can plot a straight line. A first draft might look like this in Scala:

First, generate a random function of the form α x + β

  import scala.util.Random

  def main(args: Array[String]): Unit = {
    val intercept               = Random.nextDouble()
    val rate                    = Random.nextDouble()
    val f                       = generateLinearFn(intercept, rate)
    val (estRate, estIntercept) = guessRateAndIntercept(f)
    println(s"real:     intercept = $intercept, rate = $rate")
    println(s"guessed:  intercept = $estIntercept, rate = $estRate")
  }

  def generateLinearFn(intercept: Double, rate: Double): (Double => Double) = { x =>
    intercept + (rate * x)
  }


Now, generate some values for x and run them through a similarly shaped function where we have guessed the values for α and β. Then we wiggle α and β and compare the results of this ersatz function with the results from the real function. If a wiggle makes the results better, we keep the adjusted values of α and β.

  def guessRateAndIntercept(f: Double => Double): (Double, Double) = {
    val nIterations = 100
    val stepSize    = 1d / nIterations

    var rate        = Random.nextDouble()
    var intercept   = Random.nextDouble()

    val xs          = (1 to 100).map(x => x.toDouble / 100)

    for (i <- 0 to nIterations) {
      intercept = intercept + bestStep(stepSize, delta => generateLinearFn(intercept + delta, rate), f, xs)
      rate      = rate      + bestStep(stepSize, delta => generateLinearFn(intercept, rate  + delta), f, xs)

      println(s"intercept = $intercept, rate = $rate")
    }

    (rate, intercept)
  }

  def bestStep(stepSize: Double, df: Double => Double => Double, actualFn: Double => Double, xs: Seq[Double]): Double = {
    val errorToStep = Seq(stepSize, 0d, -stepSize).map(step => df(step) -> step).map { case (guessFn, step) =>
      (errorOver(actualFn, guessFn, xs), step)
    }
    errorToStep.minBy(_._1)._2
  }

The error is measured simply as the sum of squared differences between the results from our approximate function and those from the real thing.

  def errorOver(f: Double => Double, guessFn: Double => Double, data: Seq[Double]): Double =
    sumOfSquaredErrors(data.map(guessFn), data.map(f))

  import scala.math.pow

  def sumOfSquaredErrors(y1s: Seq[Double], y2s: Seq[Double]): Double =
    y1s.zip(y2s).map{ case(x, y) =>
      val diff = x - y
      pow(diff, 2)
    }.sum


OK, nothing clever here. It's just a simple approximation technique that gives reasonable answers:

real:     intercept = 0.9830623503713442, rate = 0.8187143023851281
guessed:  intercept = 0.9598594405202521, rate = 0.8497631295034207

But it has some obvious flaws. It only handles functions with one independent variable, i.e. it only deals with Double => Double. What about (Double, Double) => Double, or (Double, Double, Double) => Double, etc.?
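For illustration only (the post doesn't go on to implement this), the generator might be generalised to k independent variables by taking a Seq of rates and a Seq of inputs; generateLinearFnK is a hypothetical name:

  def generateLinearFnK(intercept: Double, rates: Seq[Double]): Seq[Double] => Double = { xs =>
    intercept + rates.zip(xs).map { case (rate, x) => rate * x }.sum
  }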

So, how do the big boys do it in maths libraries? One cunning way can be found here in the Apache commons math library. It takes a bit of head-scratching to understand why it works but this is what I found.

First, let's expand on the comments in the JavaDoc. Say we have a set of real data and a proposed linear function that can approximate it: n samples of real data, and a function with k independent variables.

Aside: a note on terms used here: 
"A simple way to describe variables is as either dependent, if they represent some outcome of a study, or independent, if they are presumed to influence the value of the dependent variable(s)... The dependent variable is also referred to as the 'Y variable' and the independent variables as the 'X variables'. Other names for the dependent variable include the outcome variable and the explained variable. Other names for the independent variables include regressors, predictor variables and explanatory variables." [1]
Right, so we have n equations that look something like this:

y = Σi=1..k xi bi + ε

where y is the actual output of a given trial, the xi are the inputs, the bi are the coefficients we're trying to find, and ε is the error between our prediction and the actual value.

This can be expressed in matrix form as:

Y = (X' B') + E

where Y is a vector of all n outputs, X' is the matrix of all n inputs, B' is the vector of all k coefficients, and E is a vector of errors.

By adding an extra row to B' and an extra column to X', we can subsume the E vector into these terms to make the equation look neater and easier to handle, thus:

Y = X B

Now comes the clever bit. A matrix can be represented as the product of two matrices in a process called QR decomposition where Q and R are the two matrices. The interesting thing about this is that R is an upper triangular matrix. Why this is important will become clear later.

QR Factorization

This trick simply states that "a matrix whose columns are v1, v2, ... vn can  be expressed as the product of two matrices: a matrix [Q] whose columns v*1, v*2, ... v*n  are mutually orthogonal and a triangular matrix [R]." [2]

The algorithm goes like this. Take the vectors v1, v2, ... vn one at a time. For each vector, find the component of it that is orthogonal to those that have come before. This is done by subtracting from it its projection onto the space represented by the vectors that have already been processed, and adding the result to the set. Full proofs can be found in [2], but the gist is: imagine shaving off from each vector its projection onto the space defined by the vectors before it.
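As a rough illustration of that process (just a sketch, not the Apache code), with vectors represented as Seq[Double] and a few locally defined helpers:

  def dot(a: Seq[Double], b: Seq[Double]): Double        = a.zip(b).map { case (x, y) => x * y }.sum
  def scale(a: Seq[Double], k: Double): Seq[Double]      = a.map(_ * k)
  def minus(a: Seq[Double], b: Seq[Double]): Seq[Double] = a.zip(b).map { case (x, y) => x - y }
  def normalise(a: Seq[Double]): Seq[Double]             = scale(a, 1 / math.sqrt(dot(a, a)))

  // Gram-Schmidt: each vector has its projections onto the already-processed
  // (unit) vectors shaved off, and the residual joins the set.
  // Assumes the input vectors are linearly independent.
  def orthogonalise(vs: Seq[Seq[Double]]): Seq[Seq[Double]] =
    vs.foldLeft(Seq.empty[Seq[Double]]) { (done, v) =>
      val residual = done.foldLeft(v) { (acc, q) => minus(acc, scale(q, dot(acc, q))) }
      done :+ normalise(residual)
    }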

This is a recursive algorithm that would give you:

v1 = σ11 v*1
v2 = σ12 v*1 + σ22 v*2
...
vn = σ1n v*1 + σ2n v*2 + ... + σnn v*n

where each v*j is vj with its projections onto v*1, ..., v*j-1 subtracted.

or, to express it more neatly in matrix form:

| v11  v12  v13  ... |   | v*11  v*12  v*13  ... | | σ11  σ12  σ13  ... |
| v21  v22  v23  ... | = | v*21  v*22  v*23  ... | | σ21  σ22  σ23  ... |
| v31  v32  v33  ... |   | v*31  v*32  v*33  ... | | σ31  σ32  σ33  ... |
...

where v*11, v*21, v*31, ... are the components of v*1 and σ11, σ21, σ31, ... are the components of σ1.

Now, since v1 is a multiple of v*1 alone (see the expansion of the recursive algorithm above), all the elements σi1 must be 0 for i > 1.

Similarly, since v2 only ever involves v*1 and v*2 (again, see the above expansion), σi2 must be 0 for i > 2, and so on.

So the matrix for σ must be an upper triangular matrix and the matrix product above can better be written as:

| v*11  v*12  v*13  ... | | σ11  σ12  σ13  ... |
| v*21  v*22  v*23  ... | | 0    σ22  σ23  ... |
| v*31  v*32  v*33  ... | | 0    0    σ33  ... |
...

Note that the v* vectors can be chosen to have unit length. This will come in handy later.

Back to linear regression

Knowing this, we can now take our equation

Y = X B

and note that X can be expressed as Q R

Y = Q R B

multiply both sides by Q⁻¹ (on the left of each side, as matrix multiplication is not commutative):

Q⁻¹ Y = Q⁻¹ Q R B

but note that Q⁻¹ Q multiplies every orthonormal column vector by every column vector (for a square matrix whose columns are orthonormal, the inverse is just the transpose). Each such product will be 1 when a column is multiplied by itself (they're unit vectors) and 0 otherwise (from the definition of orthogonal vectors). So the equation becomes:

Q⁻¹ Y = R B = A

because Q⁻¹ Q is the identity matrix (A is just shorthand for Q⁻¹ Y).

Now we can solve it through back-substitution. Since R is an upper triangular matrix, this equation expands to:

rn,n bn = an

rn-1,n-1 bn-1 + rn-1,n bn = an-1
.
.
.

Since we know all the values of A and R (they are derived from the outputs and the v* vectors), we can solve the first equation for bn. We can then substitute this into the second equation and solve for bn-1, and so on: this is back-substitution.
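To make the back-substitution concrete, here is a minimal sketch in Scala (0-based indices, r an upper triangular matrix, a the vector Q⁻¹ Y):

  def backSubstitute(r: Array[Array[Double]], a: Array[Double]): Array[Double] = {
    val n = a.length
    val b = new Array[Double](n)
    for (i <- (n - 1) to 0 by -1) {
      // subtract the contributions of the coefficients we've already solved for
      val known = (i + 1 until n).map(j => r(i)(j) * b(j)).sum
      b(i) = (a(i) - known) / r(i)(i)
    }
    b
  }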

[1] Statistics in a Nutshell (O'Reilly)

[2] Coding the Matrix - Philip Klein