Monday, December 24, 2018

Unbalanced


... or imbalanced (apparently, imbalanced is the noun and unbalanced is the verb. Live and learn).

Anyway, in a data scientist job interview I was asked how I would build a network intrusion system using machine learning. I replied that I could build them a system that was 99% accurate. Gasps from my interviewers. I said, I could even give 99.9% accuracy. No! I could give 99.99% accuracy with ease.

I then admitted I could do this by saying that all traffic was fine (since much less than 0.01% was malicious), bill them and then head home. The issue here is imbalanced data.

This is a great summary of what to do in this situation. To recap:

1. If you have plenty of data, ignore a lot from the major class. If you have too little, duplicate the minor class.

2. Generate new data by "wiggling" the original (eg, the SMOTE algorithm).

3. "Decision trees often perform well on imbalanced datasets" [ibid].

4. Penalize mistakes in misclassifying the minor data.

For this last suggestion, you can use a meta algorithm and any model but in ANNs, you get it out of the box. In DL4J, I added something like this:

.lossFunction(new LossNegativeLogLikelihood(Nd4j.create(Array(0.01f, 1f)))

"Ideally, this drives the network away from the poor local optima of always predicting the most commonly ccuring class(es)." [1]

Did it improve my model? Well, first we have to abandon accuracy as our only metric. We introduce two new chaps, precision and recall.

Precision is the ratio of true positives to all the samples my model declared were positive irrespective of being true or false ("what fraction of alerts were correct?"). Mnemonic: PreCision is the sum of the Positive Column. Precision = TP / (TP + FP).

Recall is the ratio of true positives to all the positives in the data ("what fraction of intrusions did we correctly detect?"). Mnemonic: REcall deals with all the REal positives. Recall = TP / (TP + FN).

(There is also the F1 score which "is the 2*precision*recall/(precision+recall). It is also called the F Score or the F Measure. Put another way, the F1 score conveys the balance between the precision and the recall" - Dr Jason Brownlee. This equation comes from the harmonic mean of precision and recall).

What is a desirable results is very use case dependent. But with weights 0.005f, 1f, I got:

Accuracy:  0.9760666666666666
Precision: 0.3192534381139489
Recall:    0.9285714285714286
.
.

=========================Confusion Matrix=========================
     0     1
-------------
 28957   693 | 0 = 0
    25   325 | 1 = 1

Confusion matrix format: Actual (rowClass) predicted as (columnClass) N times
==================================================================

using faux data (6000 observations of a time series 50 data points long and a class imbalance of 100:1)

I got the job to build an intrusion detection system and these results would make the customer very happy - although I can't stress enough that this is a neural net trained on totally fake data. But if we could have 93% of intrusions detected with in only 1 in three investigations yielding a result, the client would be overjoyed. Currently, the false positives are causing them to turn the squelch in their current system too high and consequently they're not spotting real intrusions.

[1] Deep Learning: A Practitioner's Approach


Saturday, December 22, 2018

Faux data and DeepLearning4J


Like most people in corporate data science, I'm having a hard time getting my hands on real but commercially sensitive data within the oganisation. To this end, I'm writing a library to generate generic data. Sure, ersatz data is rarely decent but I figured if my models can't find the signal deliberately planted in the data, they're unlikely to find a signal in the real data.

I'm using the DeepLearning4J framework to see if the data is sensible. DL4J looks like it could be a real game-changer in the near future as it integrates easily with Spark (I ran their example with no major problems but noted it took quite a long time) and can import Keras models.

The first thing I did was try to build an Artificial Neural Net to detect unusual behaviour. One of the challenges I have in the day job is to distinguish between a network intrusion and genuine behaviour. Machines being accessed at night are suspicious but not necessarily compromised (some insomniac wants to do some work, for example). So, could a LSTM neural net (specially designed for time series) distinguish between night time and day time activity?

I stole the code for the ANN from here but came across this bug in the DL4J library, wasting a lot of time. Unfortunately, there has been no release with this bug fix so I am forced to use SNAPSHOTs. But even then, I was unable to detect the signal in the noise and kept seeing "Warning: 1 class was never predicted by the model and was excluded from average precision". Since I only have two classes (hacked/not hacked) this wasn't a great sign. Since the DL4J guys say "at this point you are just looking at a plain old tuning problem", I spent a lot of fruitless time twiddling knobs.

The problem was more profound than this. It appears I was asking an LSTM the wrong question. That is, although the data was a time series, each point in time was entirely unrelated to all the others. Changing the data to being either (1) spread randomly across a year and (2) tightly clustered around date-time gave the neural net a fighting chance of distinguishing the two. In the latter case, the data points bunch around a single date-time (imagine somebody fat-finguring their password a few times per year versus somebody trying to guess it in a small time window). In this case, each timestamp is highly dependent on the others.

Monday, December 17, 2018

Spark, Parquet, Stats


If you're using Spark to write data to HDFS, you might want to give a thought to how it is going to be read. How you write a Dataframe can make a huge difference to the efficiency of how we read.

This test creates a Dataframe with, let's say, columns X, Y and Z. We sort by X, partition by Y then save it.

This results in |Y| directories under our root (one for each Y-value) and at least as many parquet files as a Cartesian product of X and Y up to a maximum of 400. I'm not currently sure what this maximum comes from (it appears to come from spark.sql.shuffle.partitions * the number of partitions).

Only when we exceed this 400 number do we start finding multiple values of X in a single Parquet file.

Interestingly, although for our test, 6 constituent Parquet files are created (=|X| * |Y|= 3*2) when we save our Dataframe, the Dataframe that comes from reading the data back from HDFS has 4 partitions. That is, a partitions straddles more than one parquet file.

Predicate Pushdown

The reason we sort before we save is so each parquet file has a chance of containing only one (or at worst a small subset of) X. This makes searching much easier as Parquet might advertise the range for a column it has inside it. If your sought value is not in that range, the file is not searched.

To demonstrate this, we take a Dataframe that's sorted by "intKey" (X) and we partitionBy "partitionkey" (Y).

scala> val query = dataframe.where(s"$intKey == 3")
scala> query.explain()

== Physical Plan ==
*(1) Project [id#21, text#22, intkey#23, partitionkey#24]
+- *(1) Filter (isnotnull(intkey#23) && (intkey#23 = 3))
   +- *(1) FileScan parquet [id#21,text#22,intkey#23,partitionkey#24] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://127.0.0.1:41684/1544107487759], PartitionCount: 2, PartitionFilters: [], PushedFilters: [IsNotNull(intkey), EqualTo(intkey,3)], ReadSchema: struct

All good but note this caveat from CERN: "this feature has limitations, notably, it can only be used with certain data types and operators as implemented in Parquet and Spark. [Also] the results depend on the provided filter values and data distribution in the source table."

(If we were to search on partitionkey, then this would be an example of partition pruning).

Parquet tools


I did try to use the Hadoop Parquet tools but I could not find the min/max values of the Statistics object even if I set parquet.strings.signed-min-max.enabled as true in the config for my ParquetFileReader.

Not to worry, Spark can show us these statistics using dataframe.describe (StackOverflow). But, instead of loading the whole Dataframe, we instead just load an isolated parquet file via the usual means. When we do, we see something like:

+-------+-----------------+--------------------+------+
|summary|               id|                text|intkey|
+-------+-----------------+--------------------+------+
|  count|             1666|                1666|  1666|
|   mean|           5001.0|            Infinity|   0.0|
| stddev|2886.462540896729|                 NaN|   0.0|
|    min|                6|65656565656565656...|     0|
|    max|             9996|89898989898989898...|     0|
+-------+-----------------+--------------------+------+

(Note the stats for the text field).

Now we can see that for this particular Parquet file (that is just one piece of the whole Dataframe we started with) that the intkey field has a min/max of 0/0. So, Spark knows that if it is looking for intkey=3, not to bother even reading this file.

Buckets

Note that you can't call bucketBy when saveing  a data frame. Only on saveAsTable can you do this (see a previous post on this). You can then call analyze table on it.

Monday, December 10, 2018

Feature Engineering


It's no secret that most of data science is actually data munging. Indeed, since we've only been using a Naive Bayes model, there was only one hyperparameter: that for Laplacian smoothing. Tuning this did indeed make our model better but that was a fairly trivial exercise.  And it was the feature engineering that made the greatest difference.

Most online machine learning tutorials focus on the model not feature engineering (the only book on the subject that I am aware of is being written here). Perhaps because it is an art as much as a science. Well, these are some notes I've made over the last four months when trying to bring a bank transaction classification system to market. YMMV.

Data cleaning and domain knowledge

Some machine learning tool vendors say their products can do this for you auto-magically but practitioners I meet are skeptical. For instance, a colleague was working on a security system for a mobile banking app. If the 'phone had been compromised, some software by an Israeli company would flag the transaction as suspect. Yet, in all his training data not a single transaction was marked as such. Automated machine learning tools would have thought this field useless and discarded it yet it was incredibly important.

My client for this my most recent project was an Austrian bank. Now, there was some fairly trivial things we could do when feature engineering for German textual data. Replacing all German characters with Latin characters improved things a little. As did the fact that Germans often avoid umlauts when online replacing, for instance, "ö" with "oe" (the national rail company is öbb but their website is www.oebb.at).  So cleaning the data this way gave a slight improvement. But some things were not so straight forward.

The data

Our data was not much more than what you typically see on your bank statements. That is, some natural language and some other miscellaneous data. This other data included things you won't see on your statements like "POS K2 14/02/18 11:55" which is saying it's a Point of Service transaction (that is, you were in a store) at cashier desk K2 at a given date/time.

Our first instinct was to just treat the transactions as a corpus of text and for each word calculate its distribution over all characters. But why stick with unigrams?

Are Bigrams Better?


Maybe. They improved our bank transaction classification accuracy by about 4%. There are no a priori reasons for why bigrams are better for us. For the most part, we're not dealing with natural language and there is no reason why "BillaK2" makes more sense than "K2Billa" (order was important - see below).

It did make sense in some fields - for instance the recipient. "Phillip" and "Henry" are common enough names but "PhillipHenry" is sufficiently rare (in the German speaking world at least) for it to identify a single individual.

This unexpected boost in accuracy is not totally unheard of. David Marchette tells how he made a model that used a histogram over the bytes (that is, there were 256 buckets) in a computer virus binary when trying to identify which virus it was. It was surprisingly accurate (86% vs. a monkey-score of about 5%). "Computer scientists laugh when I tell this story" he says, not because binary executables work on word not byte sized units of instructions. So, in theory, it shouldn't work. But in practice, it does.

Field order

This made a remarkable difference. Since we were not using natural language, the order of the text fields was arbitrary. But choosing a wrong permutation could reduce accuracy by a massive 10%.

What an optimal permutation was could only be derived by brute force.

Filtering out noise

Calculating the Kullback-Leibler divergence between the distribution of words in the successfully and unsuccessfully classified data sets identified words that appeared to be over-represented. Ignoring them improved accuracy by about 2% for unigrams but made no difference to bigrams.

Similarly, looking at the entropy of the distribution of a word over all the categories and eliminating those with low entropy did not improve things.

Simply eliminating small words (less than 3 characters) and all strings that were just numerical boosted accuracy by a few percent for both bigrams and unigrams.

Another data masaging technique we used was to bucket transaction amounts rather than use raw figures. However, it raised the question: how best do we organise these buckets? One trick is to use a Random Forest to help you create features. Random Forests don't work well on high dimensional data (which ours would be if we had a field per word - we had a million strings) so we tried all but these features. Accuracy was unsurprisingly an appalling 35% but RFs are easy to read and I used its amount buckets in my model. However, it actually reduced accuracy somewhat. So, instead I stuck with my (roughly) logarithmic buckets.

Splitting the data caveat

Randomly selecting transactions to be used in the training and testing sets consistently resulted in about 95% accuracy. Pretty good. However, this hid a significant problem. If we randomly selected users for the training and testing sets and then partitioned their transactions, accuracy dropped to about 66%.

[Aside: I'm clearly not the first to make a mistake like that. See this data scientist make fall into the exact trap when using time series that this data scientist warns against].

This was because a lot of transactions were peculiar to the individual. For instance, if a tenant pays his landlord every month using the text "Mr Phillip Henry" and marks it as rent, only one transaction in the training set may be needed to train the model. However, if this user was not in the training data (and consequently none of his/her transactions) then the model would have a hard time concluding that a payment was rent purely on the text "Mr Phillip Henry".

This was something of a problem to the business as new users experience a low accuracy in transaction categorization. We never solved this problem and the business had to accept that the model had to learn a user's habits before it could be a great performer.

Future work

Alas, data wrangling never comes to an end. If I had more time, I would have investigated:

- Pointwise mutual information (see this SO answer for more pointers).
- Using n-grams of characters
- Bagging between different n-gram models
- Creating new features from eg, number of words in text, average word size etc
- text lemmatization and stemming (the difference being a word that has been lemmatized is always in the dictionary whereas one that has been stemmed may not be).

Sadly, we ran out time. But in 3 months we had managed to build a pure Java/Scala library that could be passed to the front-end team and give them good enough accuracy that the management team were happy.

Saturday, December 8, 2018

JSON Streaming in Spark


We have a fairly standard architecture where we want to stream JSON logs into HDFS preferably as Parquet. Kafka and Spark Structured Streaming and the obvious choices and here is a test that runs in a single JVM to demonstrate the flow.

Streaming or Structured Streaming


Note Spark has two kinds of streaming:

1. Spark streaming - the old way of doing it with a "Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous sequence of RDDs (of the same type) representing a continuous stream of data " (from the ScalaDocs).

2. Spark Structured Streaming - the new way.

What's new?

The new way is more elegant as we are just dealing with a function over a Dataset rather than a collection of RDDs. As a friend told me: "build your app as a streaming application because it can be extended to a batch easily (a batch is a stream with one big item in it). The opposite is not true, however." Batch is just a special case of streaming.

The new structured streaming has been optimized for low latency. "Since Spark 2.3, we have introduced a new low-latency processing mode called Continuous Processing, which can achieve end-to-end latencies as low as 1 millisecond with at-least-once guarantees. Without changing the Dataset/DataFrame operations in your queries, you will be able to choose the mode based on your application requirements" (from the Spark docs).

There have been machine learning frameworks built on top of it such as this one here where HortonWorks recommends loading the prediction model from storage and calling predict() on all elements in a partition.

Kafka for the unitiated

"Data in Kafka is organized into topics that are split into partitions for parallelism. Each partition is an ordered, immutable sequence of records, and can be thought of as a structured commit log. Producers append records to the tail of these logs and consumers read the logs at their own pace. Multiple consumers can subscribe to a topic and receive incoming records as they arrive. As new records arrive to a partition in a Kafka topic, they are assigned a sequential id number called the offset. A Kafka cluster retains all published records—whether or not they have been consumed—for a configurable retention period, after which they are marked for deletion" (from the DataBricks blog).

Kafka to Spark, Spark to Kafka

Note, data can be written by Spark back to Kafka: "Writing data from any Spark supported data source into Kafka is as simple as calling writeStream on any DataFrame that contains a column named “value”, and optionally a column named “key”. If a key column is not specified, then a null valued key column will be automatically added. A null valued key column may, in some cases, lead to uneven data partitioning in Kafka, and should be used with care" (ibid).

Flume?

Kafka uses a pull paradigm. "On the contrary, Flume sink supports push model.  When event producers suddenly generate a flood of messages, even though flume channel somewhat acts as a buffer between source and sink, the sink endpoints might still be overwhelmed by the write operations.

"Moreoever, the file-based channel does not replicate event data to a different node. It totally depends on the durability of the storage it writes upon. If message durability is crucial, it is recommended to use SAN or RAID. Kafka supports both synchronous and asynchronous replication based on your durability requirement and it uses commodity hard drive" (LinkedIn).

Parsing JSON without JSON parsers

There are (too?) many JSON parsers for Scala(there is an excellent comparison made here). But we don't need them if all we want is to dump the data into Parquet with the same schema as the JSON.

Since hand-coding schemas is tedious this StackOverflow page suggests using a sample piece of JSON, reading it as a DataFrame and borrowing that schema.

The next part of this cunning trick is then to use Spark's from_json function so:

val outgoing = jsonDF.select(from_json($"value", struct))

Finally, we can call:

outgoing.writeStream.format("parquet")....start()

(having set the appropriate options) et voila, our Spark code is pulling JSON from Kafka and writing Parquet to HDFS in just a few lines.


Thursday, December 6, 2018

More Spark Query Plans


The second part in discovering what is going on in query plans (the first is here). I imagine there will be more...

Predicate Push Down

Seeing PushedFilters in the DataFrame.explain output mean predicate push down is at work. For example:

*(1) Project [id#21, text#22, intkey#23, partitionkey#24]
+- *(1) Filter (isnotnull(intkey#23) && (intkey#23 = 3))
   +- *(1) FileScan parquet [id#21,text#22,intkey#23,partitionkey#24] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://127.0.0.1:41684/1544107487759], PartitionCount: 2, PartitionFilters: [], PushedFilters: [IsNotNull(intkey), EqualTo(intkey,3)], ReadSchema: struct

The code to generate this lives here in GitHub.

"Predicate push down is a feature of Spark and Parquet that can help increase the performance of your queries by reducing the amount of data read based on metadata stored in Parquet about the minimum and maximum values of stored in columns and aggregated per row group. However this feature has limitations, notably, it can only be used with certain data types and operators as implemented in Parquet and Spark. Moreover, even when filters are pushed down, the actual reduction of I/O and relative increase in performance vary: the results depend on the provided filter values and data distribution in the source table." (from an excellent blog at CERN).

Asterisks

"With whole-stage code generation, all the physical plan nodes in a plan tree work together to generate Java code in a single function for execution. This Java code is then turned into JVM bytecode using Janino, a fast Java compiler... Where ever you see *, it means that wholestagecodegen has generated hand written code prior to the aggregation. Exchange means the Shuffle Exchange between jobs.Exchange does not have whole-stage code generation because it is sending data across the network. " (StackOverflow)

HashAggregate

"When spark is doing dataframe operation, it does first compute partial counts for every partition and then having another stage to sum those up together." These need to be sent across the network triggering an Exchange.  (StackOverflow).

Note, you might see:

.
.
  +- *HashAggregate(COLUMN_FOR_GROUPBY#INDEX, 200
.
.

The 200 is the number of partitions and is defined by spark.sql.shuffle.partitions.

If you just try to set it with .repartition(NEW_NUMBER), it doesn't appear to change. However, .repartition(NEW_NUMBER, COLUMN) does seem to change it.
See the Mastering Spark SQL GitHub page.

Parenthesis

"Everything with the same index number is in one stage. So, stage boundaries can be recognized by exchange operations that involve a shuffle." [SO]