Friday, February 16, 2018

Spark and Neural Nets


Spark supports neural nets but the variety is quite limited. For instance, somebody asked on the Spark mailing list whether Recurrent Neural Networks would soon be implemented, to which the answer was: "The short answer is there is none and highly unlikely to be inside of Spark MLlib any time in the near future."

Deep learning in YARN

Since we're looking at getting serious with neural nets in Spark, we were forced to look at alternatives to MLlib. A Yahoo! offering is the one we will soon try. It promises to run TensorFlow within YARN. This also suits our hardware, as it looks as if we may be able to exploit Remote Direct Memory Access (RDMA) - sharing memory between machines without it going via the (slow) OS. "Infiniband provides faster connectivity and supports direct access to other servers’ memories over RDMA. Current TensorFlow releases, however, only support distributed learning using gRPC over Ethernet. To speed up distributed learning, we have enhanced the TensorFlow C++ layer to enable RDMA over Infiniband", the Yahoo! team claim.

Another alternative is just to deploy a library that is dedicated to exotic types of neural nets and DeepLearning4J looks ideal for that. "Data parallelism shards large datasets and hands those pieces to separate neural networks, say, each on its own core. Deeplearning4j relies on Spark for this, training models in parallel and iteratively averages the parameters they produce in a central model. (Model parallelism ... allows models to specialize on separate patches of a large dataset without averaging.)"



(From "Large Scale Distributed Deep Networks", Dean et al)

Now, regarding GPUs note what the author of netlib-java, the binding that exploits BLAS, says: "Be aware that GPU implementations have severe performance degradation for small arrays."

Pure Spark implementation

So, while we're preparing to install all these libraries, I had time to play with Spark's implementation of neural networks on the public domain newsgroups data I spoke about before. There were some interesting findings.

The first was that a lot of the work is done on the driver. This seems to happen a lot in some of the older Spark code (something I've complained about before). Although there is parallelism in o.a.s.m.o.CostFun, where the gradients of the partitioned features in an RDD are calculated and summed, other calculations appear to be done locally on the driver. For instance, the updating of this potentially very large result from the executors is done in o.a.s.m.ann.ANNUpdater, and the BFGS calculation appears to take place entirely in Breeze (see breeze.optimize.FirstOrderMinimizer.infiniteIterations).

Consequently, I was forced to give the driver 30GB of memory and set spark.driver.maxResultSize to 8192. Only then could I train a MultilayerPerceptronClassifier with layer sizes [262144, 100, 80, 20]. Note that the outer sizes come from my TF-IDF word vectors having 262144 elements and there being only 20 different newsgroups. The 100 and 80 were guessed by slowly increasing them until I hit memory problems.
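For the record, the set-up looked something like this (a rough sketch rather than the exact code; trainingData here stands for a DataFrame of TF-IDF features and newsgroup labels):

import org.apache.spark.ml.classification.MultilayerPerceptronClassifier

// driver launched with --driver-memory 30g and spark.driver.maxResultSize bumped as described above
val mlp = new MultilayerPerceptronClassifier()
  .setLayers(Array(262144, 100, 80, 20)) // input = TF-IDF vector size, output = 20 newsgroups
  .setFeaturesCol("features")
  .setLabelCol("label")
  .setMaxIter(100)                       // an illustrative iteration budget
val model = mlp.fit(trainingData)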

Results

Now, with this neural net architecture I was able to achieve about 81% accuracy in about 30 minutes (an architecture with [262144, 50, 40, 20] was much faster and didn't require giving the driver more memory but had an accuracy of about 77%).

But this does not compare favourably with avoiding neural nets altogether. A naive Bayes classifier took no time at all, didn't require much memory and gave me about 84% accuracy. Simplest thing that works, eh?

Spark OOMEs during Matrix Multiplication


It seems that I am not the only person to suffer from the problem of OutOfMemoryErrors while performing matrix multiplication (and other operations) in Spark. This is because "Spark keeps the partition that it is working on in memory (and does not spill to disk even if it is running OOM)" (from the mailing lists).

For instance, computePrincipalComponentsAndExplainedVariance can't handle more than 65535 columns and does all the work on the driver ("local matrix" in the docs) by delegating to Breeze which in turn appears to use a native library (lapack).

There are various proposed solutions. One is playing with spark.sql.shuffle.partitions, which "configures the number of partitions to use when shuffling data for joins or aggregations" (docs). Tuning this is more interesting than it sounds: "Spark uses a different data structure for shuffle book-keeping when the number of partitions is greater than 2000" (from StackOverflow).
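For example (a sketch assuming a Spark 2.x SparkSession called spark; the right value is entirely workload-dependent):

spark.conf.set("spark.sql.shuffle.partitions", "2001") // anything over 2000 gets the more compact shuffle book-keeping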

But this seems far too fiddly. It would be better if I never had to worry about such things. To this end, I've created my own simple library to perform "truly scalable matrix multiplication on Spark".

Note that this is the general case of multiplying two different matrices. In the special case that you're multiplying a matrix by its own transpose, you might prefer the approach taken in a previous post on this blog.

Tuesday, February 13, 2018

Neural Network Notes

Some miscellaneous neural network terms. I hope to expand on all of them later.

Deep Learning

"Taking complex raw data and creating higher-order features automatically in order to make a simpler classification (or regression) output is a hallmark of deep learning... The best way to take advantage of this power is to match the input data to the appropriate deep network architecture." [1]

Architecture of ANNs

"We can use an arbitrary number of neurons to define a layer and there are no rules about how big or small this number can be. However, how complex of a problem we can model is directly correlated to how many neurons are in the hidden layers of our networks. This might push you to begin with a large number of neurons from the start but these neurons come with a cost... There are also cases in which a larger model will sometimes converge easier because it will simply 'memorize' the training data." [1]

"Hidden layers are concerned with extracting progressively higher-order features from the raw data."

"A more continuous distribution of input data is generally best modelled with a ReLU activation function. Optionally, we'd suggest using the tanh activation function (if the network isn't very deep) in the event that the ReLU did not achieve good results (with the caveat that there could be other hyperparameter-related issues with the network." [1]

"Well if your data is linearly separable (which you often know by the time you begin coding a NN) then you don't need any hidden layers at all…One hidden layer is sufficient for the large majority of problems." (StackOverflow)

In the output layer for binary classification "we'd use a sigmoid output layer with a single neuron to give us a real value in the range of 0.0 to 1.0". [1]

"the best way to build a neural network model: Cause it to overfit and then regularize it to death... Regularization works by adding an extra term to the normal gradient computed."

The right model

"If we have a mutliclass modeling problem yet we only care about the best score across these classes, we'd use a softmax output layer with an arg-max() function to get the highest score of all the classes. The softmax output layer gives us a probability distribution over all the classes" [1]

"If we want to get multiple classifications per output (eg person + car), we do not want softmax as an output layer. Instead, we'd use the sigmoid output layer with n number of neurons, giving us a probability distribution (0.0 to 1.0) for every class independently". [1]

"In certain architectures of deep networks, reconstruction loss functions help the network extract features more effectively when paired with the appropriate activation function. An example of this would be using the multiclass cross-entropy as a loss function in a layer with a softmax activation function for classification output." [1] However, note that cross-entropy is not symmetric so might wrongly favour certain values over others even if they're equally wrong [see this StackOverflow answer].

Optimizations

"We define a hyperparameter as any configuration setting that is free to be chosen by the user that might affect [sic] performance." [1] For example layer size, activation functions, loss functions, epochs etc.

"And so on, until we've exhausted the training inputs, which is said to complete an epoch of training."
http://neuralnetworksanddeeplearning.com/chap1.html

"First-order optimization algorithms calculate the Jacobian matrix... Second-order algorithms calculate the derivative of the Jacobian (ie, the derivative of a matrix of derivatives) by approximating the Hessian." [1]

"A major difference in first- and second-order methods is that second-order methods converge in fewer steps yet take more computation per step". [1]

"The 'vanilla' version of SGD uses gradient directly, and this can be problematic because gradient can be nearly zero fir any parameter. This causes SGD to take tiny steps in some cases, and steps that are too big for situations in which the gradient is too large. To alleviate these issues, we can use the technique such as:

"AdaGrad is monotonically decreasing and never increases the learning rate" [1]

Autoencoders

"An autoencoder is trained to reproduce its own input data."[1]

"The key difference to note between a multilayer perceptron network diagram (from earlier chapters) and an autoencoder diagram is the output layer in an autoencoder has the same number of units as the input layer does." [1]

"Autoencoders are good at powering anomaly detection systems."

Mini-batches

If the cost function is C, then "stochastic gradient descent can be used to speed up learning. The idea is to estimate the gradient ∇C by computing ∇Cx for a small sample of randomly chosen training inputs. By averaging over this small sample it turns out that we can quickly get a good estimate of the true gradient ∇C, and this helps speed up gradient descent, and thus learning."

This "works by randomly picking out a small number mm of randomly chosen training inputs. We'll label those random training inputs X1,X2,…,Xm and refer to them as a mini-batch. Provided the sample size m is large enough we expect that the average value of the ∇CXj will be roughly equal to the average over all  ∇Cx" (from neuralnetworksanddeeplearning.com).

Boltzmann machines

Defined as "a network of symmetrically connected, neuron-like units that make stochastic decisions about whether to be on or off". [1]

"The main difference between RBMs and the more general class of autoencoders is in how they calculate the gradients." [1]

Regularization

"Dropout and DropConnect mute parts of the input to each layer such that the neural network learns other portions".

ReLU

ReLU - rectified linear units. ReLUs "are the current state of the art because they have proven to work in many situations. Because the gradient of a ReLU is either zero or constant, it is possible to rein in the vanishing/exploding gradient issue. ReLU activation functions have shown [sic] to train better in practice than sigmoid functions". [1]

Leaky ReLUs

"Unfortunately, ReLU units can be fragile during training and can “die”. For example, a large gradient flowing through a ReLU neuron could cause the weights to update in such a way that the neuron will never activate on any datapoint again. If this happens, then the gradient flowing through the unit will forever be zero from that point on. That is, the ReLU units can irreversibly die during training since they can get knocked off the data manifold. For example, you may find that as much as 40% of your network can be “dead” (i.e. neurons that never activate across the entire training dataset) if the learning rate is set too high. With a proper setting of the learning rate this is less frequently an issue.

Leaky ReLUs are one attempt to fix the “dying ReLU” problem. Instead of the function being zero when x < 0, a leaky ReLU will instead have a small negative slope (of 0.01, or so)" (from here).
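The two activations side by side, as a plain Scala sketch:

def relu(x: Double): Double = math.max(0.0, x)

def leakyRelu(x: Double, alpha: Double = 0.01): Double =
  if (x >= 0.0) x else alpha * x // a small negative slope instead of a hard zero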

[1] Deep Learning - a practitioner's guide.

Monday, February 12, 2018

Great expectations


Since this took me a few hours to sort out in my head, I thought I would blog it.

The Bessel correction is used when taking a sample with mean x̄ and variance s² from a population whose true mean is μ and true variance is σ².

The standard deviation, s, is:

√(Σᵢⁿ(xᵢ-x̄)²/n)

but we'd be wrong if we thought we could use s² as an estimate for σ². (We actually need to divide by n-1.)

Note that sometimes we don't need to estimate the population. In the case of a country conducting a census, we have all the information. However, for most use cases we need to sample the country's population.

[Aside: Interestingly, “such a basic concept as standard deviation, with an apparently impeccable mathematical pedigree, is socially constructed and a product of history” Leeds University].

The reasoning goes like this.

The expected value of x̄ is E[x̄] = μ, because each xᵢ has expectation μ and x̄ is just their average.

The variance of x̄ is:

E[(x̄ - μ)²] - (E[x̄ - μ])²

also by definition.

The second term is clearly zero when we expand it and substitute in the expected value of x̄ we've just stated above.

We then expand the first term and take the expected value of all the resulting terms. However, we have to be careful here as E[A.B] = E[A].E[B] iff A and B are independent.

To illustrate, the expected value of the product of two fair 6-sided dice is (3.5)², which is 12.25. But the expected value of rolling just one die and squaring the result is (1² + 2² + ... + 6²)/6, which is about 15.2. Clearly they are not the same probability distribution as, for example, p(12) is non-zero in the first case but zero in the second (as you can't get 12 by squaring an integer).

So, let's represent the variance of x̄ as:

E[ (Σᵢⁿ(xᵢ-μ)/n) · (Σⱼⁿ(xⱼ-μ)/n) ]

For the terms where i≠j, xᵢ and xⱼ are independent, so each such cross term vanishes:

E[(xᵢ-μ)(xⱼ-μ)] = E[xᵢxⱼ] - μE[xᵢ] - μE[xⱼ] + μ² = E[xᵢ]E[xⱼ] - 2μ² + μ² = μ² - 2μ² + μ² = 0

but for the terms where i=j (the same variable, so certainly not independent), we have

E[ Σᵢⁿ(xᵢ-μ)²/n² ] = Σᵢⁿ E[(xᵢ-μ)²]/n² = nσ²/n² = σ²/n

Now, we can re-express our definition of

s² = Σᵢⁿ(xᵢ-x̄)²/n

by noting that:

(xᵢ-x̄)² = ((xᵢ-μ) - (x̄-μ))²
        = (xᵢ-μ)² - 2(xᵢ-μ)(x̄-μ) + (x̄-μ)²

The expectation of the first term is σ² + (E[x-μ])² = σ² from our definition of the variance of the whole population; the last term, E[(x̄-μ)²], is σ²/n as we've just shown. As for the middle term, we play the same trick as before: the cross terms where i≠j vanish, but for the remaining 1/n of cases (where i=j) each contributes σ², so the expectation of 2(xᵢ-μ)(x̄-μ) must be 2σ²/n.

Therefore:

E[s²] = (1 - 1/n)σ²

so we can estimate σ² as

σ² ≅ (n/(n-1)) s²
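A quick simulation sketch (plain Scala, nothing to do with Spark) shows the bias in action: with samples of size n drawn from a population with σ² = 1, the naive s² averages out at about 1 - 1/n.

import scala.util.Random

val n      = 5
val trials = 100000
val meanS2 = (1 to trials).map { _ =>
  val xs   = Array.fill(n)(Random.nextGaussian()) // a sample from a population with σ² = 1
  val xBar = xs.sum / n
  xs.map(x => (x - xBar) * (x - xBar)).sum / n    // the biased estimator s²
}.sum / trials
// meanS2 comes out at roughly (1 - 1.0/n), i.e. about 0.8 for n = 5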

So, don't forget your n-1 if you're taking a sample and not calculating values for the whole population.

Saturday, February 10, 2018

Textual Clustering


When it comes to classifying documents, there are many ways to skin a cat. The use case at hand is to classify small amounts of text into 21 pre-defined categories. Since this data is private, for this post I'll use the 20 Newsgroups data. This contains some 18k documents that have already been categorised and each document contains a subject and a body.

First we need to process the data. Spark has some great out-of-the-box transformers that can tokenize (Tokenizer) the words, remove stop words (StopWordsRemover), create n-grams (NGram), generate the TF-IDF vectors (HashingTF and IDF) and finally normalize them (Normalizer).
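Chained together, it looks something like this (a sketch; the column names and the docs DataFrame, holding the raw text and a numeric label, are my own):

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{Tokenizer, StopWordsRemover, HashingTF, IDF, Normalizer}

val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("tokens")
val remover   = new StopWordsRemover().setInputCol("tokens").setOutputCol("filtered")
val tf        = new HashingTF().setInputCol("filtered").setOutputCol("tf") // default is 2^18 = 262144 features
val idf       = new IDF().setInputCol("tf").setOutputCol("tfidf")
val norm      = new Normalizer().setInputCol("tfidf").setOutputCol("features")

val pipeline = new Pipeline().setStages(Array(tokenizer, remover, tf, idf, norm))
val features = pipeline.fit(docs).transform(docs)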

Now, a natural choice for categorising these vectors might be KMeans, since we know exactly how many clusters we are looking for. However, I had appalling results: over 99% of the vectors congregated in one cluster while the remaining 1% was spread over the others.

This had me scuttling back to the code that created my features and checking everything was OK (punctuation removed, all lower case, etc). This Stackoverflow answer had a nice list of what to check. Unfortunately, it didn't help.

Thinking about it, this was not too surprising. KMeans measures the distance between points. That is calculated as:

Σₖ(vᵢₖ - vⱼₖ)ᵖ

where k indexes the elements of the vectors representing documents i and j. Setting p to 2 gives you (squared) Euclidean distances, but that might not always be desirable.
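As a plain Scala sketch of that measure:

def dist(vi: Array[Double], vj: Array[Double], p: Double = 2.0): Double =
  vi.zip(vj).map { case (a, b) => math.pow(math.abs(a - b), p) }.sum // element-wise differences raised to p and summed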

Consequently, “k-means, however, may be a bad fit. Because the means computed will not have a realistic sparsity, but will be much more dense” (from Stackoverflow).

Anyway, for most documents (certainly when we're dealing with just the subject lines), it is highly likely that one document shares no words with another even when they belong to the same category. As a result, the distance between two documents in the same category that share no common words will be much the same as the distance between two documents in different categories. At this point, KMeans is simply confused by the sparse vectors.

So, an alternative was to use Spark's NaiveBayes implementation. The results here were immediately decent - about 85% success in classifying whole documents and 71% success for just classifying them based on their subject text.
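A minimal sketch of that step, assuming the features DataFrame (with its features and label columns) produced by the pipeline above:

import org.apache.spark.ml.classification.NaiveBayes
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

val Array(train, test) = features.randomSplit(Array(0.8, 0.2), seed = 42L)
val nbModel     = new NaiveBayes().setFeaturesCol("features").setLabelCol("label").fit(train)
val predictions = nbModel.transform(test)
val accuracy    = new MulticlassClassificationEvaluator()
  .setLabelCol("label")
  .setMetricName("accuracy")
  .evaluate(predictions)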

The best explanation of this algorithm I found was here on Stanford University's website. The probability of document d being in category c is:

p(c|d) ∝ p(c) ∏ₖ p(tₖ|c)

where p(tₖ|c) is the probability of the k-th term in document d's bag of words occurring in a document of category c. This can be calculated from a histogram of words over categories.

The term p(c) is our Bayesian prior and is easily calculated from the distribution of documents over the categories.

We then choose the most likely category, that is, the one with the highest value of p(c|d).

A non-sparse solution might be to use Spark's Word2Vec implementation. However, since Word2Vec vector elements can be negative, you can't feed them to NaiveBayes (which expects non-negative feature values).


Sunday, February 4, 2018

p-values


The dangers of relying on p-values are well known. "In most science journals, researchers report p-values without apology, and readers interpret them as evidence that the apparent effect is real. The lower the p-value, the higher their confidence in this conclusion." (ThinkStats).

A good example of the problem can be found here. But, in brief, imagine this:

  • we test how effective 1000 different drugs are
  • of which only 100 are truly efficacious and therefore the other 900 are not
  • our significance threshold (the p-value cut-off) is a fairly typical 0.05 (5%)
  • of the 900 useless drugs, 45 (=900*0.05) will appear efficacious when they are not
  • of the 100 efficacious drugs, assuming a similar 5% error rate, 5 (=100*0.05) will appear useless when they are not.

We'll ignore 5 effective cures (false negatives) and think 45 are useful when they are not (false positives). Since only 100 are truly effective, these are large percentages in our errors.

From Machine Learning in Action (Manning):
Precision = TP/(TP+FP). Precision tells us the fraction of records that were positive from the group that the classifier predicted to be positive.
Recall = TP/(TP+FN). Recall measures the fraction of positive examples the classifier got right.

Given that our true positives cannot be better than 95 (as 5 were wrongly deemed ineffective drugs), our precision and recall would not be better than 0.68 and 0.95 in this case.
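Spelling that arithmetic out:

val tp = 95.0                    // truly effective drugs still correctly flagged (100 - 5 false negatives)
val fp = 45.0                    // useless drugs wrongly flagged as effective
val fn = 5.0                     // effective drugs wrongly dismissed
val precision = tp / (tp + fp)   // ≈ 0.68
val recall    = tp / (tp + fn)   // = 0.95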

Spark query plans


We were trying to join two rather large Dataframes in Spark but kept getting horrible memory problems. Using my trusty jstat, I noticed that the driver was using all its heap.

Trying to find what the problem was, I executed:

// needs import org.apache.spark.sql.functions.{count, max} (and spark.implicits._ for the 'col syntax); finds the largest number of rows sharing a single ID_ACCOUNT
val counted = csvAccount.groupBy('ID_ACCOUNT).agg(count("*").as("co")).select(max('co))
 
but with little success.

Looking at the query plan, I saw a broadcast join. What is this? “With a broadcast join one side of the join equation is being materialized and send to all mappers" (from here). Note that all the data "needs to fit into the memory of the Driver!” (ibid).

Apparently, you can disable this functionality with:

sqlContext.sql("SET spark.sql.autoBroadcastJoinThreshold = -1")

“Broadcast joins don't work well for cartesian products because the workers get so much broadcast data they get stuck in an infinite garbage collection loop and never finish. Remember to turn this back on when the query finishes” (from here).
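And to turn it back on afterwards, something like this (10485760 bytes, i.e. 10MB, being the default at the time of writing):

sqlContext.sql("SET spark.sql.autoBroadcastJoinThreshold = 10485760")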

This certainly sounds like what I was seeing.

As it happens, the problem was dirty data - specifically the keys on which we were joining were being truncated (due to bad CSV parsing) and so there were far more Rows for a given 'key' than there should be. If there were M dupes in one Dataframe and N in the other, there were MxN Rows for a given 'key'. It's easy to see how this Cartesian product then explodes and consumes far too much memory.