Saturday, March 30, 2019

FP and Big Data part 2


Functional Programming gives guarantees that Big Data tools can leverage. For example, if an aggregation such as SUM is "expressible as an associative and commutative operator (who said “monoid”?), it can be executed more efficiently, e.g. the framework can push some aggregation into the first map. In Hadoop, such an aggregation is called a combiner." (Eugene Kirpichov at Medium.com)

Note that Spark's reduce [SO] (and indeed Scala's parallel reduce too) effectively requires a commutative semigroup, when a plain (merely associative) semigroup ought to be enough. "The class of commutative semigroups consists of all those semigroups in which the binary operation satisfies the commutativity property that ab = ba for all elements a and b in the semigroup." (Wikipedia).

[In Scala, "reduceLeft and foldLeft apply the operator from left to right (head to tail). reduceRight and foldRight apply it from right to left. As for reduce and fold, the order is unspecified, which is what allows to have parallel implementations for it (see ParIterableLike). As a consequence, you'd better be sure that your operator is associative [and commutative - think String concatenation] when calling reduce, otherwise the result will not be deterministic." (StackOverflow)]

Commutativity clearly does not hold for some obvious semigroups (for instance, string concatenation, whose "zero" element is the empty string). Nor do the laws we rely on necessarily hold for Java primitives: floating point addition, for example, is commutative but not associative.
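To see this in plain Scala: concatenation is associative (so any grouping gives the same result) but not commutative, so any operation that is free to reorder its inputs (such as reduce on a parallel collection) is unsafe for it. A quick illustration:

    val words = Seq("foo", "bar", "baz")
    // Associative: the grouping of operations does not matter...
    println((("foo" + "bar") + "baz") == ("foo" + ("bar" + "baz")))  // true
    // ...but not commutative: reordering the operands changes the result.
    println(("foo" + "bar") == ("bar" + "foo"))                      // false
    // Sequential reduceLeft is deterministic; a parallel reduce that reorders is not safe here.
    println(words.reduceLeft(_ + _))                                 // foobarbaz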

Haskell appears not to have an issue with this particular floating point fold (note that these literals default to Double):

$ ghci
GHCi, version 7.10.3: http://www.haskell.org/ghc/  :? for help
Prelude> foldr (\x y -> x + y) 0 [1.0, 0.05, 0.05]
1.1
Prelude> foldl (\x y -> x + y) 0 [1.0, 0.05, 0.05]
1.1

Whereas Scala, folding single-precision Floats (still IEEE 754 arithmetic, just at lower precision), gives this:

    val floats = Seq(1.0f, 0.05f, 0.05f)
    println(floats.sum)                 // 1.0999999
    println(floats.reduce(_ + _))       // 1.0999999
    println(floats.reduceLeft(_ + _))   // 1.0999999
    println(floats.reduceRight(_ + _))  // 1.1

And it's not just floating point arithmetic that has this problem. If you exceed the maximum value of an Int or a Long, the arithmetic silently wraps around and you will also suffer misery.
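A quick illustration of the wraparound in plain Scala (widening to Long is one way to dodge it, assuming the totals fit):

    println(Int.MaxValue)              // 2147483647
    println(Int.MaxValue + 1)          // -2147483648: silent wraparound, no exception
    println(Int.MaxValue.toLong + 1L)  // 2147483648: widening avoids it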

So be careful. Certain FP rules are not enforced in the JVM.

Friday, March 29, 2019

Hands on tuning a neural net


Introduction

Once again, I am trying to find what a typical domain name looks like using machine learning. For this, I use the top one million domains that Cisco has helpfully made available here. Once I have this, I can then use known malicious domains (provided by Bambenek Consulting here) to see how they deviate from the norm.

The data

I encode my domains by first creating a histogram of character pairs and then turning this into probabilities. Each data point is then a sparse vector of length 1444 (38 x 38, covering the 38 legitimate characters used in domain names).
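The encoding is roughly along these lines (a sketch; the alphabet and helper name are mine, not the actual project code):

    // Turn a domain name into a 1444-element vector of bigram probabilities.
    val alphabet: IndexedSeq[Char] = ('a' to 'z') ++ ('0' to '9') ++ Seq('-', '.')  // 38 characters
    val index: Map[Char, Int]      = alphabet.zipWithIndex.toMap

    def bigramProbabilities(domain: String): Array[Double] = {
      val counts = Array.ofDim[Double](alphabet.size * alphabet.size)  // 38 x 38 = 1444
      domain.toLowerCase.sliding(2).filter(_.length == 2).foreach { pair =>
        for (i <- index.get(pair(0)); j <- index.get(pair(1)))
          counts(i * alphabet.size + j) += 1.0                         // histogram of character pairs...
      }
      val total = counts.sum
      if (total == 0.0) counts else counts.map(_ / total)              // ...turned into probabilities
    }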

Now, I fire up a Variational Autoencoder written using the DL4J framework.

Rig


I have an old GeForce GTX 650 GPU that has only 1GB of memory. "To be blunt, 1GB is really tiny by today's standards, it's not really enough for deep learning especially if you're running a display on it at the same time (OS will use some of the memory)" Alex Black, DL4J Gitter room (Jan 29 00:46 2019)

So, I ran the neural net on the CPU.

I run my code with:

export D=/media/sdb8/Data/CiscoUmbrella/MyData/Real/Results/`date +%d%m%y%H%M` ; mkdir -p $D ; export MAVEN_OPTS="-Xmx32g" ;  taskset 0xFFFCE mvn clean install -DskipTests exec:java -Dexec.mainClass=XXX  -Dexec.args="YYY" | tee $D/mvn.log ; shutter --window=.*firefox.* -o $D/overview.png -e

Note taskset, which restricts the job to all cores but one (15 of my 16) so the OS is not overwhelmed.

Also note shutter, which takes a screenshot of the DL4J console that I am viewing in Firefox.

Learning Rate

On the "Update: Parameter Ratio Chart" tab, the mean magnitude means "the average of the absolute value of the parameters or updates at the current time step". From the DeepLearning4J visualisation guide:
The most important use of this ratio is in selecting a learning rate. As a rule of thumb: this ratio should be around 1:1000 = 0.001. On the log10 chart, this corresponds to a value of -3. Note that is a rough guide only, and may not be appropriate for all networks. It’s often a good starting point, however. 
If the ratio diverges significantly from this (for example, > -2 (i.e., 10^-2 = 0.01) or < -4 (i.e., 10^-4 = 0.0001)), your parameters may be too unstable to learn useful features, or may change too slowly to learn useful features. To change this ratio, adjust your learning rate (or sometimes, parameter initialization).
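As a made-up worked example of that rule of thumb:

    val meanAbsParam  = 0.5                  // average |w| across a layer
    val meanAbsUpdate = 5e-4                 // average |update| at the current iteration
    val ratio         = meanAbsUpdate / meanAbsParam
    println(ratio)                           // 0.001, i.e. the 1:1000 rule of thumb
    println(math.log10(ratio))               // about -3 on the log10 chart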
With a learning rate of 1e-2 and the TANH activation function, this chart was very jagged for me. Changing the learning rate (with the Adam updater) to 1e-4 smoothed it out:
Learning rate 1e-4, activation function TANH
The model seems to be quite sensitive to the learning rate. With a value of 1e-3, performance is already degrading. Note that the Parameter Ratios are touching -2 and above and becoming volatile:
TANH; learning rate 1e-3
The same is true of 1e-5, where this time the ratios are drifting below -4:

TANH; learning rate 1e-5
So, it seems at least for TANH that a learning rate of 1e-4 is the sweet spot.

But with LEAKYRELU, Parameter Ratios started touching -4 and then we started seeing NaNs again. Similarly, for SOFTPLUS, even a learning rate of 1e-5 wasn't enough to save us from NaNs:
SOFTPLUS; learning rate = 1e-5
This came as something of a surprise as LEAKYRELU is often said to be the default activation function.

What's more, my score sucked:

VAE's score with vectors of character bi-gram probabilities
We'll return to the choice of activation function later. In the meantime...

The Data (again)

Maybe representing my data as probability vectors was not a great idea. So, let's try 1-hot encoding the characters:

VAE's score with 1-hot encoded characters
Ah, that's better (if far from optimal). It's also slower to train as the vectors are just as sparse but now have a size of about 4000.

Reconstruction Distribution

But wait - if the data has changed in its nature, maybe we need to re-assess the neural net. Well, the reconstruction distribution was a Gaussian, which might have made sense when we were dealing with probability vectors. However, we're now 1-hot encoding, so let's change it to a Bernoulli since "The Bernoulli distribution is binary, so it assumes that observations may only have two possible outcomes." (StackOverflow). Well, that sounds reasonable, so let's see:
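The change itself is a single line in the VAE layer configuration. A rough sketch of the kind of thing involved (written from memory of the DL4J examples, so treat the exact signatures as approximate; the layer sizes are purely illustrative):

    import org.deeplearning4j.nn.conf.NeuralNetConfiguration
    import org.deeplearning4j.nn.conf.layers.variational.{BernoulliReconstructionDistribution, VariationalAutoencoder}
    import org.deeplearning4j.nn.multilayer.MultiLayerNetwork
    import org.nd4j.linalg.activations.Activation
    import org.nd4j.linalg.learning.config.Adam

    val conf = new NeuralNetConfiguration.Builder()
      .updater(new Adam(1e-4))
      .list()
      .layer(0, new VariationalAutoencoder.Builder()
        .activation(Activation.TANH)                   // encoder/decoder activation
        .encoderLayerSizes(1200, 480)                  // illustrative sizes
        .decoderLayerSizes(480, 1200)
        .pzxActivationFunction(Activation.IDENTITY)
        // The line that matters: Bernoulli for 1-hot data (previously a GaussianReconstructionDistribution)
        .reconstructionDistribution(new BernoulliReconstructionDistribution(Activation.SIGMOID.getActivationFunction()))
        .nIn(4000)                                     // size of the 1-hot encoded input
        .nOut(30)                                      // size of the latent space
        .build())
      .build()

    val net = new MultiLayerNetwork(conf)
    net.init()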

VAE score with 1-hot encoding, now using Bernoulli reconstruction distribution.
Negative scores? Time to ask the DL4J Gitter group:
PhillHenry @PhillHenry Mar 22 16:25
Silly question but I assume a negative "Model Score vs. Iteration" in the "DL4J Training UI" is a bad sign...?

Alex Black @AlexDBlack 00:32
@PhillHenry as a general rule, yes, negative score is bad. It is typically caused by a mismatch between the data, loss function and output layer activation function - for example, something like using tanh activation function for classification instead of softmax.

PhillHenry @PhillHenry 03:02
Are there any circumstances where it's OK? (BTW I'm trying to run a VAE.)

Samuel Audet @saudet 03:34
IIRC, this happens with VAE when it's not able to fit the distribution well.

Alex Black @AlexDBlack 04:14
VAE is an exception, to a degree. You can still have a mismatch between the reconstruction distribution and the actual data, and that will give you rubbish negative results (like incorrectly trying to fit gaussian data with bernoulli reconstruction distribution). However, it is possible (but rare) for negative log likelihood to be negative on very good fit of the data - IIRC we exclude some normalization terms. If that's the case, you should see excellent reconstructions though.
Well, I've already switched to Bernoulli so what's going wrong?

The Data (yet again)


Scratching my head over a coffee, I realised the problem: I was still standardising the data as if it were Gaussian! I am not sure how a unit test could have saved me here, but OK, let's remove the standardisation; after all, why would you standardise a Bernoulli distribution?
VAE score with LEAKYRELU
Wow, that looks almost normal. 
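For reference, the standardisation I had been applying was of this form (a sketch; trainIterator is assumed to be the training DataSetIterator). The fix was simply to drop it for the 1-hot/Bernoulli set-up:

    import org.nd4j.linalg.dataset.api.preprocessor.NormalizerStandardize

    // Standardise to zero mean and unit variance: reasonable for roughly Gaussian
    // inputs, wrong for 1-hot data with a Bernoulli reconstruction distribution.
    val normalizer = new NormalizerStandardize()
    normalizer.fit(trainIterator)               // learn mean and standard deviation from the training data
    trainIterator.setPreProcessor(normalizer)   // ...and apply them to every batch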

The Activation Function

Since we've changed an awful lot, now might be a good time to ditch TANH and try LEAKYRELU again. Sure enough, we don't see any NaNs and the score looks good.

Let's see what the ROC curve looks like with my test data:
ROC curve for VAE with LEAKYRELU, 1-hot encoding, Bernoulli reconstruction
OK, not amazing but it's starting to look better than a monkey score. But how do we make things better?

Architecture


Let's start with the architecture of the net itself.

"Larger networks will always work better than smaller networks, but their higher model capacity must be appropriately addressed with stronger regularization (such as higher weight decay), or they might overfit... The takeaway is that you should not be using smaller networks because you are afraid of overfitting. Instead, you should use as big of a neural network as your computational budget allows, and use other regularization techniques to control overfitting." (Andrej Karpathy)

"How many hidden layers are required to see an improvement where a shallow net underperforms is anyone's guess, but in general more depth would be better - you get more abstract, more general solutions. In practice though, optimization isn't quite so neat, and adding capacity adds risks of the process falling into various pitfalls - local minima, overfitting... And of course then there's the added computational cost." [StackOverflow]

So, let's try an architecture with layers of size [3002, 1200, 480, 192, 76, 30]:

VAE with 5 hidden layers


Here (StackOverflow) is an example of where the latent space is just too big causing overfitting. "You could compress the output further... Autoencoders do exactly that, except they get to pick the features themselves...  So what you do in your model is that you're describing your input image using over sixty-five thousand features. And in a variational autoencoder, each feature is actually a sliding scale between two distinct versions of a feature, e.g. male/female for faces...  Can you think of just a hundred ways to describe the differences between two realistic pictures in a meaningful way? Possible, I suppose, but they'll get increasingly forced as you try to go on.  With so much room to spare, the optimizer can comfortably encode each distinct training image's features in a non-overlapping slice of the latent space rather than learning the features of the training data taken globally."

The takeaway point is not to make the latent space layer too big. We deliberately want to squeeze the data through a small gap. So, let's really squeeze this data with an architecture like [2964, 1185, 474, 189, 75, 2]

VAE with 5 hidden layers and the last layer has 2 units
Hmm, no change, and the same happened with a final layer of 5, 10 and 20 units. So, this looks like a dead-end.

Also, those weights are going crazy...

Weights

"For NNs though, what you want out of your deep layers is non-linearity. That's where the magnitudes of weights start to matter.

"In most activation functions, very small weights get more or less ignored or are treated as 'evidence against activation' so to speak.

"On top of that some activations, like sigmoid and tanh, get saturated. A large enough weight will effectively fix the output of the neuron to either the maximum or the minimum value of the activation function, so the higher the relative weights, the less room is left for subtlety in deciding whether to pass on the activation or not based on the inputs." [StackOverflow]

Regularization

"A model with large weights is more complex than a model with smaller weights. It is a sign of a network that may be overly specialized to training data. In practice, we prefer to choose the simpler models to solve a problem (e.g. Occam’s razor). We prefer models with smaller weights... The L2 approach is perhaps the most used and is traditionally referred to as “weight decay” in the field of neural networks." (MachineLearningMastery)

However, massively increasing the L2 coefficient made the score worse, and no amount of tuning seemed to change that. I was hoping to clip excessive activations, but adding RenormalizeL2PerParamType didn't seem to make any difference either.

Well, since adding more layers can be responsible for overfitting, let's now take them away. With merely two hidden layers of size [1003, 401] the weights look more respectable.
VAE with 2 hidden layers
The Data (last time)

Finally, I went back to the domain names yet again and stripped them not just of the Top Level Domains but also any preceding text. So, for instance, if the domain name is javaagile.blogspot.com it is mapped to simply blogspot.
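The mapping itself is trivial. A hypothetical sketch (the real code may treat multi-part TLDs such as .co.uk differently):

    // Keep only the label immediately before the TLD.
    def secondLevelLabel(domain: String): Option[String] = {
      val parts = domain.toLowerCase.split('.').filter(_.nonEmpty)
      if (parts.length >= 2) Some(parts(parts.length - 2)) else None
    }

    secondLevelLabel("javaagile.blogspot.com")   // Some("blogspot")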

Gratifyingly, the score went from lingering at about 100 down to 40 and the ROC curve now looks decent:


Things learned

- Unit test your data munging code.

- Keep your logs. Print out your hyper-parameters so you capture this in the logs.

- Shuffle your data! I wasted a day testing the neural net on a subset of data but this subset was sorted! Therefore, only the top 10000 data points were being used. You might find the Linux command, shuf, useful for doing this.

- Write tests that test your results, making sure they're sensible.

Goldilocks

Getting the best out of the neural net seems to be a combination of tuning:
  • The data that best represents your problem. This is perhaps the most important.
  • The hyper-parameters.
  • The architecture.
As an aside, take a look at TensorFlow playground. It's a great way to get an intuitive feel for what is going on. It's just a neural net that runs in your browser.

Tuesday, March 19, 2019

Spotting Dodgy Domain Names


These are various approaches employing machine learning to differentiate between good domain names and bad ones. By bad, I mean domains that are used to trick people into thinking they're clicking on a legitimate address (www.goog1e.com, for instance).

Data

The data is the top 1 million domain names as recorded by Cisco. You can find it here.

The data was stripped of the top level domains to remove elements that were not useful.

Then, what was left was converted to character bigrams and one-hot encoded, leading to vectors of length 1444 (that is, 38 x 38 for the 38 permitted characters). The code for this lives here.

This data set was then split in a 95/5 ratio of training to holdout data.

We created test data from this holdout data by deliberately corrupting it. This was done by changing either a single 'o' to a '0' or a single 'l' to a '1'. If there were no such characters to corrupt, the data point was discarded.
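The corruption step amounts to something like this (a hypothetical sketch, not the actual project code):

    // Swap a single 'o' for '0', failing that a single 'l' for '1';
    // domains containing neither character are discarded.
    def corrupt(domain: String): Option[String] =
      if (domain.contains('o'))      Some(domain.replaceFirst("o", "0"))
      else if (domain.contains('l')) Some(domain.replaceFirst("l", "1"))
      else                           None

    corrupt("google.com")   // Some("g0ogle.com")
    corrupt("bbc.uk")       // None: discarded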

Kullback-Leibler Results

We actually use a variant of KL divergence in these results that handles zeros in the data: the Jensen-Shannon metric.
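For reference, the Jensen-Shannon divergence is just KL symmetrised against the mid-point distribution, which is what lets it cope with zeros (the metric proper is the square root of this divergence). A minimal sketch for two already-normalised distributions:

    // KL divergence, with the usual convention that 0 * log(0/q) = 0.
    def kl(p: Array[Double], q: Array[Double]): Double =
      p.zip(q).map { case (pi, qi) => if (pi == 0.0) 0.0 else pi * math.log(pi / qi) }.sum

    // Jensen-Shannon divergence: the mid-point m is non-zero wherever p or q is,
    // so the divisions inside kl are always safe.
    def jensenShannon(p: Array[Double], q: Array[Double]): Double = {
      val m = p.zip(q).map { case (pi, qi) => (pi + qi) / 2 }
      (kl(p, m) + kl(q, m)) / 2
    }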

In the following histograms, red indicates bad domains and green good ones.

The tables represent the entire holdout ("Good") and the entire test ("Bad") data sets with their Jensen-Shannon metric calculated against the training data.

Note that these metrics are calculated by summing the columns of the data sets. This leads to something of an unrepresentative description of the data since the original is one-hot encoded. Therefore, in any subset of 38 elements of a real vector, only one can be 1 and all the rest must be 0. That is, the elements in a vector for a given domain are not independent of each other.

No normalisation

KL Score Histogram with no normalisation
Note that "+4.914e6" in the bottom right hand corner. Indeed the KL scores are close:

Class    KL Score
Good     4 319 810.24
Bad      4 169 380.40

There's a hair's breadth between them, so it is probably going to be hard to differentiate the classes.

L1-Normalise everything

Here, the control group's KL was 6370.17 and the bad domains scored 6370.92 - very close. The histograms unsurprisingly look similar:

KL Score Histogram with all vectors L1-Normalised
Hmm, still not much to work with, so let's try combinations of the two. First:

Normalised Baselines, No Normalisation for Others

In this trial, the baseline is L1-normalised but the other vectors are not.

Class    KL Score
Good     97 285.45
Bad      139 889.62

The histogram for the holdout and bad domains now looks like:

KL Score Histogram with the baseline L1 normalised; all other vectors unnormalised
This is good. There are now two distinct distributions with separate peaks.

L2-normalisation gave very similar KL scores and a graph that looked like:

KL Score Histogram with the baseline L2-normalised; all other vectors unnormalised
Let's try:

Unnormalised Baseline, L1-Normalisation for Others

... and it looks like we're back to square one. The KL scores are amazingly close:

Class    KL Score
Good     4 914 404.73
Bad      4 914 407.38

so, not surprisingly, are the distributions of the holdout and test data:

KL Score Histogram with the baseline unnormalized; all other vectors L1-normalized
Again, note the "+4.9144e6" in the bottom right-hand corner.

So, we seem to be going backwards.

Aside: Normalise then sum baseline, all others unnormalised

I tried a few other variations like normalising then summing, first with L1:

Class    KL Score
Good     773 262.74
Bad      704 254.41

KL Score Histogram with the baseline L2-normalised then summed; all other vectors unnormalised

then L2:

Class    KL Score
Good     94 506.53
Bad      83 559.17

KL Score Histogram with the baseline L1-normalised then summed; all other vectors unnormalised

But their summed distributions didn't give me a difference in KL scores as good as the "Normalised Baselines, No Normalisation for Others" results, so I stuck with those and I discuss just that distribution in the next section.

The ROC

Running our model against the test data, the ROC looks somewhat underwhelming:

ROC for "Normalised Baselines, No Normalisation for Others" KL

Or, in 3D, where we can see the threshold value:


where a threshold value of about 10 brings the curve closest to the top left-hand corner.

It seems clear that our tool cannot determine with great confidence whether a domain name is suspect or not. But then, could a human? Which of these domain names would you say are bogus and which are genuine?

mxa-00133b02.gslb.pphosted.com
m7.mwlzwwr.biz
live800plus.jp
lf.wangsu.cmcdn.cdn.10086.cn
x10.mhtjwmxf.com
mailex1.palomar.edu
3gppnetwork.org
mimicromaxfinallb-1513904418.ap-south-1.elb.amazonaws.com
mkt4137.com
modt1thf4yr7dff-yes29yy7h9.stream
jj40.com

This is of course a trick question. They're all genuine domains that Cisco have logged.

However, if our tool is used as part of a suite of metrics it might identify nefarious activity.

Conclusion

Our tool is definitely better than the monkey score but can we improve it? I have a neural net that looks promising but is computationally very expensive. The KL calculations (and variants of them) are very fast and cheap. I'll compare them to a neural net solution in another post.

Tuesday, March 12, 2019

Everything you needed to know about Spark Structured Streaming


Back Pressure

Backpressure is defined at Wikipedia in the context of routing "as an algorithm for dynamically routing traffic over a multi-hop network by using congestion gradients."

Note that back pressure within Spark was an option in the older, DStream-based Spark Streaming (see the Spark property spark.streaming.backpressure.enabled). However, it appears that back pressure is not necessary in Spark Structured Streaming from Kafka (StackOverflow):
"Structured Streaming cannot do real backpressure, because, such as, Spark cannot tell other applications to slow down the speed of pushing data into Kafka. 
"If you mean dynamically changing the size of each internal batch in Structured Streaming, then NO. ...Generally, Structured Streaming will try to process data as fast as possible by default. There are options in each source to allow to control the processing rate, such as maxFilesPerTrigger in File source, and maxOffsetsPerTrigger in Kafka source."
In general, Kafka consumers don't need back pressure. Note what the creators of the .NET Kafka client write: "The Kafka consumer will only pull from Kafka as fast as you are handling the messages. If you are forwarding the messages consumed from Kafka onto another queue, simply pause before adding more to that queue if it is full... If you question was just to not poll too fast on consumer side (to avoid taking too much memory), then yes, pause will be ok when available. You can also simply not call Poll when your buffer is full"


Partitions and Parallelism

What does the notion of a DataFrame's partitions mean in the world of streams? "When you retrieve the data at first, the number of partitions will be equal to the number of Kafka partition... If your Kafka topic has only 1 partition, that means that prior to groupByKey, your internal stream will contain a single partition, which won't be parallalized but executed on a single executor. As long as your Kafka partition count is greater than 1, your processing will be parallel. After the shuffle boundary, Spark will re-partition the data to contain the amount of partitions specified by the spark.sql.shuffle.partitions." (StackOverflow)
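So, to change the post-shuffle parallelism, it is the SQL shuffle setting that matters, not the Kafka partition count (again, spark is assumed to be an existing SparkSession):

    // Controls the number of partitions after a shuffle boundary, e.g. after groupByKey.
    spark.conf.set("spark.sql.shuffle.partitions", "16")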


Many, Small Files

Note that the nature of streaming means many files may be created (at least one per mapper per trigger duration - "interval between checkpoints"). Too many files can swamp the Hadoop Name Node so you may want to curb this. Evo Eftimov (in his blog) talks of ways you can do this. Amongst his ideas, you can increase the trigger time or coalesce them in a batch job. We do the latter but have not got to the tuning stage yet. The option, maxFilesPerTrigger, appeared to make no difference to us.
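A compaction batch job of the kind described above might look roughly like this (a sketch only; paths and partition counts are illustrative):

    // Read one partition's many small Parquet files and rewrite them as a few larger ones.
    spark.read
      .parquet("/data/landing/date=2019-03-12")   // illustrative path
      .coalesce(4)                                // a handful of larger files
      .write
      .mode("overwrite")
      .parquet("/data/compacted/date=2019-03-12")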

"Note that when writing DataFrame to Parquet even in “Append Mode”, Spark Streaming does NOT append to already existing parquet files – it simply adds new small parquet files to the same output directory... The columns in parquet are stored sequentially one after another and the next such table data segment has to be ready before beginning to write to a parquet file." [ibid]

So, although you might have set the org.apache.spark.sql.streaming.OutputMode to be Append, the files are not actually appended. A new file is created each trigger time.  If the OutputMode for Parquet is Complete, you'll get "Data source parquet does not support Complete output mode" because the structure of a Parquet file is such that its columns are uninterrupted and lie back-to-back.

"In terms of purging old parquet files you may want to partition the data and then periodically delete old partitions as needed. Otherwise you can't just delete rows if all the data is being written to the same output path." (StackOverflow). Consequently, we partition the incoming streams on time and then delete old directories after they have been collated by a Spark batch job.


Unions and watermarks

Streams can be unioned when they are persisted to the same sink. However, at first I was getting the error: "There are [1] sources in the checkpoint offsets and now there are [2] sources requested by the query. Cannot continue." This appeared simply to be remarking that what the checkpoint thought was a single source is now two. Adding withWatermark appears to have fixed it.
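Schematically (the column name and threshold are illustrative; streamA and streamB are the two streaming DataFrames):

    // Union the two streams and declare an event-time watermark on the result.
    val unioned = streamA
      .union(streamB)
      .withWatermark("timestamp", "10 minutes")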

"A streaming query can have multiple input streams that are unioned or joined together... the global watermark will safely move at the pace of the slowest stream and the query output will be delayed accordingly" (the Spark docs).


Caveat

To summarise our architecture:

  1. Reads from a union of multiple streams and writes the data partitioned on a timestamp.
  2. Coalesces each partition one by one, sorting the data on field X as it goes.

It is currently working fine in QA but has yet to meet production levels of data.