Monday, December 24, 2018

Unbalanced


... or imbalanced (apparently, imbalance is the noun and unbalance is the verb. Live and learn).

Anyway, in a data scientist job interview I was asked how I would build a network intrusion system using machine learning. I replied that I could build them a system that was 99% accurate. Gasps from my interviewers. I said, I could even give 99.9% accuracy. No! I could give 99.99% accuracy with ease.

I then admitted I could do this by saying that all traffic was fine (since much less than 0.01% was malicious), bill them and then head home. The issue here is imbalanced data.

This is a great summary of what to do in this situation. To recap:

1. If you have plenty of data, ignore a lot from the majority class. If you have too little, duplicate the minority class.

2. Generate new data by "wiggling" the original (eg, the SMOTE algorithm).

3. "Decision trees often perform well on imbalanced datasets" [ibid].

4. Penalize mistakes in misclassifying the minority class.
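
The first suggestion is easy to sketch in plain Scala. This is only an illustration: the Obs case class, the method names and the idea of passing in a seeded Random are my own assumptions, not part of any library.

```scala
import scala.util.Random

object Resample {
  // A labelled observation; which label is the rare class is up to the caller.
  final case class Obs(features: Vector[Double], label: Int)

  // Randomly keep only `keep` observations of the majority class, and all the others.
  def undersample(data: Seq[Obs], majority: Int, keep: Int, rnd: Random): Seq[Obs] = {
    val (major, rest) = data.partition(_.label == majority)
    rnd.shuffle(major).take(keep) ++ rest
  }

  // Repeat every minority observation so that it appears `times` times in total.
  def oversample(data: Seq[Obs], minority: Int, times: Int): Seq[Obs] =
    data.flatMap(o => if (o.label == minority) Seq.fill(times)(o) else Seq(o))
}
```

Undersampling throws information away and oversampling repeats the same few points, so which one is appropriate depends on how much data there is to begin with.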

For this last suggestion, you can use a meta algorithm and any model but in ANNs, you get it out of the box. In DL4J, I added something like this:

.lossFunction(new LossNegativeLogLikelihood(Nd4j.create(Array(0.01f, 1f))))

"Ideally, this drives the network away from the poor local optima of always predicting the most commonly occurring class(es)." [1]

Did it improve my model? Well, first we have to abandon accuracy as our only metric. We introduce two new chaps, precision and recall.

Precision is the ratio of true positives to all the samples my model declared were positive irrespective of being true or false ("what fraction of alerts were correct?"). Mnemonic: PreCision is the sum of the Positive Column. Precision = TP / (TP + FP).

Recall is the ratio of true positives to all the positives in the data ("what fraction of intrusions did we correctly detect?"). Mnemonic: REcall deals with all the REal positives. Recall = TP / (TP + FN).

(There is also the F1 score which "is the 2*precision*recall/(precision+recall). It is also called the F Score or the F Measure. Put another way, the F1 score conveys the balance between the precision and the recall" - Dr Jason Brownlee. This equation comes from the harmonic mean of precision and recall).

What counts as a desirable result is very use-case dependent. But with weights 0.005f, 1f, I got:

Accuracy:  0.9760666666666666
Precision: 0.3192534381139489
Recall:    0.9285714285714286
.
.

=========================Confusion Matrix=========================
     0     1
-------------
 28957   693 | 0 = 0
    25   325 | 1 = 1

Confusion matrix format: Actual (rowClass) predicted as (columnClass) N times
==================================================================

using faux data (6000 observations of a time series 50 data points long and a class imbalance of 100:1).
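
As a sanity check, the headline numbers can be recomputed from the four cells of the confusion matrix using the formulas given earlier. A minimal sketch in plain Scala (Confusion and fromPost are my own names, not DL4J's):

```scala
object Metrics {
  // tp/fp/fn/tn are counts for the positive class (1 = intrusion).
  final case class Confusion(tp: Long, fp: Long, fn: Long, tn: Long) {
    def precision: Double = tp.toDouble / (tp + fp)
    def recall: Double    = tp.toDouble / (tp + fn)
    def accuracy: Double  = (tp + tn).toDouble / (tp + fp + fn + tn)
    def f1: Double        = 2 * precision * recall / (precision + recall)
  }

  // The cells of the confusion matrix printed above.
  val fromPost: Confusion = Confusion(tp = 325, fp = 693, fn = 25, tn = 28957)
}
```

Metrics.fromPost reproduces the accuracy, precision and recall shown, and gives an F1 score of roughly 0.475.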

I got the job to build an intrusion detection system and these results would make the customer very happy - although I can't stress enough that this is a neural net trained on totally fake data. But if we could have 93% of intrusions detected with only one in three investigations yielding a result, the client would be overjoyed. Currently, the false positives are causing them to turn the squelch in their current system too high and consequently they're not spotting real intrusions.

[1] Deep Learning: A Practitioner's Approach


Saturday, December 22, 2018

Faux data and DeepLearning4J


Like most people in corporate data science, I'm having a hard time getting my hands on real but commercially sensitive data within the organisation. To this end, I'm writing a library to generate generic data. Sure, ersatz data is rarely decent but I figured if my models can't find the signal deliberately planted in the data, they're unlikely to find a signal in the real data.

I'm using the DeepLearning4J framework to see if the data is sensible. DL4J looks like it could be a real game-changer in the near future as it integrates easily with Spark (I ran their example with no major problems but noted it took quite a long time) and can import Keras models.

The first thing I did was try to build an Artificial Neural Net to detect unusual behaviour. One of the challenges I have in the day job is to distinguish between a network intrusion and genuine behaviour. Machines being accessed at night are suspicious but not necessarily compromised (some insomniac wants to do some work, for example). So, could an LSTM neural net (specially designed for time series) distinguish between night time and day time activity?

I stole the code for the ANN from here but came across this bug in the DL4J library, wasting a lot of time. Unfortunately, there has been no release with this bug fix so I am forced to use SNAPSHOTs. But even then, I was unable to detect the signal in the noise and kept seeing "Warning: 1 class was never predicted by the model and was excluded from average precision". Since I only have two classes (hacked/not hacked) this wasn't a great sign. Since the DL4J guys say "at this point you are just looking at a plain old tuning problem", I spent a lot of fruitless time twiddling knobs.

The problem was more profound than this. It appears I was asking an LSTM the wrong question. That is, although the data was a time series, each point in time was entirely unrelated to all the others. Changing the data to being either (1) spread randomly across a year or (2) tightly clustered around a date-time gave the neural net a fighting chance of distinguishing the two. In the latter case, the data points bunch around a single date-time (imagine somebody fat-fingering their password a few times per year versus somebody trying to guess it in a small time window). In this case, each timestamp is highly dependent on the others.
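
To make the two kinds of series concrete, here is a rough sketch of how such timestamps might be generated. It is not the code from my library; the names, the use of seconds as the unit and the uniform distributions are all assumptions made for illustration.

```scala
import scala.util.Random

object FauxTimes {
  val SecondsInYear: Long = 365L * 24 * 60 * 60

  // (1) n events spread uniformly at random across a year (offsets in seconds, sorted).
  def spread(n: Int, rnd: Random): Seq[Long] =
    Seq.fill(n)((rnd.nextDouble() * SecondsInYear).toLong).sorted

  // (2) n events bunched inside a window of `windowSecs` starting at a random moment.
  def clustered(n: Int, windowSecs: Long, rnd: Random): Seq[Long] = {
    val start = (rnd.nextDouble() * (SecondsInYear - windowSecs)).toLong
    Seq.fill(n)(start + (rnd.nextDouble() * windowSecs).toLong).sorted
  }
}
```

In the clustered case every timestamp shares the same start, which is exactly the dependence between points that a sequence model can pick up on.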

Monday, December 17, 2018

Spark, Parquet, Stats


If you're using Spark to write data to HDFS, you might want to give a thought to how it is going to be read. How you write a Dataframe can make a huge difference to how efficiently it is read.

This test creates a Dataframe with, let's say, columns X, Y and Z. We sort by X, partition by Y then save it.

This results in |Y| directories under our root (one for each Y-value) and at least as many parquet files as a Cartesian product of X and Y up to a maximum of 400. I'm not currently sure where this maximum comes from (it appears to come from spark.sql.shuffle.partitions * the number of partitions).

Only when we exceed this 400 number do we start finding multiple values of X in a single Parquet file.

Interestingly, although for our test, 6 constituent Parquet files are created (=|X| * |Y|= 3*2) when we save our Dataframe, the Dataframe that comes from reading the data back from HDFS has 4 partitions. That is, a partition straddles more than one parquet file.

Predicate Pushdown

The reason we sort before we save is so each parquet file has a chance of containing only one (or at worst a small subset of) X. This makes searching much easier as Parquet might advertise the range for a column it has inside it. If your sought value is not in that range, the file is not searched.

To demonstrate this, we take a Dataframe that's sorted by "intKey" (X) and we partitionBy "partitionkey" (Y).

scala> val query = dataframe.where(s"$intKey == 3")
scala> query.explain()

== Physical Plan ==
*(1) Project [id#21, text#22, intkey#23, partitionkey#24]
+- *(1) Filter (isnotnull(intkey#23) && (intkey#23 = 3))
   +- *(1) FileScan parquet [id#21,text#22,intkey#23,partitionkey#24] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://127.0.0.1:41684/1544107487759], PartitionCount: 2, PartitionFilters: [], PushedFilters: [IsNotNull(intkey), EqualTo(intkey,3)], ReadSchema: struct

All good but note this caveat from CERN: "this feature has limitations, notably, it can only be used with certain data types and operators as implemented in Parquet and Spark. [Also] the results depend on the provided filter values and data distribution in the source table."

(If we were to search on partitionkey, then this would be an example of partition pruning).

Parquet tools


I did try to use the Hadoop Parquet tools but I could not find the min/max values of the Statistics object even if I set parquet.strings.signed-min-max.enabled as true in the config for my ParquetFileReader.

Not to worry, Spark can show us these statistics using dataframe.describe (StackOverflow). But, rather than loading the whole Dataframe, we just load an isolated parquet file via the usual means. When we do, we see something like:

+-------+-----------------+--------------------+------+
|summary|               id|                text|intkey|
+-------+-----------------+--------------------+------+
|  count|             1666|                1666|  1666|
|   mean|           5001.0|            Infinity|   0.0|
| stddev|2886.462540896729|                 NaN|   0.0|
|    min|                6|65656565656565656...|     0|
|    max|             9996|89898989898989898...|     0|
+-------+-----------------+--------------------+------+

(Note the stats for the text field).

Now we can see that for this particular Parquet file (which is just one piece of the whole Dataframe we started with) the intkey field has a min/max of 0/0. So, Spark knows that if it is looking for intkey=3, it need not bother even reading this file.
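
The idea behind skipping files on their min/max statistics can be modelled in a few lines of plain Scala. To be clear, this is a toy model and not how Spark or Parquet implement it: FileStats is a made-up stand-in, and it simplifies to one range per file.

```scala
object MinMaxSkipping {
  // Hypothetical stand-in for the min/max statistics kept for one column of one file.
  final case class FileStats(path: String, min: Int, max: Int)

  // Only files whose [min, max] range could contain `value` need to be read at all.
  def candidates(files: Seq[FileStats], value: Int): Seq[FileStats] =
    files.filter(f => f.min <= value && value <= f.max)
}
```

This also shows why sorting before saving matters: if every file spanned the whole range of intkey, every file would be a candidate and nothing could be skipped.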

Buckets

Note that you can't call bucketBy when saving a Dataframe. Only on saveAsTable can you do this (see a previous post on this). You can then call analyze table on it.

Monday, December 10, 2018

Feature Engineering


It's no secret that most of data science is actually data munging. Indeed, since we've only been using a Naive Bayes model, there was only one hyperparameter: that for Laplacian smoothing. Tuning this did indeed make our model better but that was a fairly trivial exercise. It was the feature engineering that made the greatest difference.

Most online machine learning tutorials focus on the model not feature engineering (the only book on the subject that I am aware of is being written here). Perhaps because it is an art as much as a science. Well, these are some notes I've made over the last four months when trying to bring a bank transaction classification system to market. YMMV.

Data cleaning and domain knowledge

Some machine learning tool vendors say their products can do this for you auto-magically but practitioners I meet are skeptical. For instance, a colleague was working on a security system for a mobile banking app. If the 'phone had been compromised, some software by an Israeli company would flag the transaction as suspect. Yet, in all his training data not a single transaction was marked as such. Automated machine learning tools would have thought this field useless and discarded it yet it was incredibly important.

My client for my most recent project was an Austrian bank. Now, there were some fairly trivial things we could do when feature engineering for German textual data. Replacing all German characters with Latin characters improved things a little. So did exploiting the fact that Germans often avoid umlauts when online, replacing, for instance, "ö" with "oe" (the national rail company is ÖBB but their website is www.oebb.at). So cleaning the data this way gave a slight improvement. But some things were not so straightforward.
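
A minimal sketch of this kind of cleaning, assuming the usual digraph convention (ä to ae, ö to oe, ü to ue, ß to ss). The exact mapping we used is not shown here, so treat the table below as illustrative.

```scala
object GermanText {
  // Common transliterations of German characters into plain Latin ones.
  private val replacements: Map[Char, String] = Map(
    'ä' -> "ae", 'ö' -> "oe", 'ü' -> "ue",
    'Ä' -> "Ae", 'Ö' -> "Oe", 'Ü' -> "Ue",
    'ß' -> "ss"
  )

  // Replace each mapped character and leave everything else untouched.
  def normalise(text: String): String =
    text.map(c => replacements.getOrElse(c, c.toString)).mkString
}
```

With this, "öbb" and "oebb" end up as the same token, so the model no longer treats them as different words.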

The data

Our data was not much more than what you typically see on your bank statements. That is, some natural language and some other miscellaneous data. This other data included things you won't see on your statements like "POS K2 14/02/18 11:55" which is saying it's a Point of Sale transaction (that is, you were in a store) at cashier desk K2 at a given date/time.

Our first instinct was to just treat the transactions as a corpus of text and for each word calculate its distribution over all categories. But why stick with unigrams?

Are Bigrams Better?


Maybe. They improved our bank transaction classification accuracy by about 4%. There are no a priori reasons for why bigrams are better for us. For the most part, we're not dealing with natural language and there is no reason why "BillaK2" makes more sense than "K2Billa" (order was important - see below).

It did make sense in some fields - for instance the recipient. "Phillip" and "Henry" are common enough names but "PhillipHenry" is sufficiently rare (in the German speaking world at least) for it to identify a single individual.
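
For what it's worth, a bigram here is nothing more than two adjacent tokens glued together. A sketch, assuming simple whitespace tokenization (our real tokenizer did more cleaning than this):

```scala
object NGrams {
  // Split on whitespace and drop empty tokens.
  def unigrams(text: String): Seq[String] =
    text.split("\\s+").filter(_.nonEmpty).toSeq

  // Concatenate each pair of adjacent tokens, e.g. "Billa", "K2" => "BillaK2".
  def bigrams(text: String): Seq[String] =
    unigrams(text).sliding(2).collect { case Seq(a, b) => a + b }.toSeq
}
```

Note that a text with a single token yields no bigrams at all, so in practice the unigrams may still be needed as a fallback.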

This unexpected boost in accuracy is not totally unheard of. David Marchette tells how he made a model that used a histogram over the bytes (that is, there were 256 buckets) in a computer virus binary when trying to identify which virus it was. It was surprisingly accurate (86% vs. a monkey-score of about 5%). "Computer scientists laugh when I tell this story" he says, because binary executables work on word-sized, not byte-sized, units of instruction. So, in theory, it shouldn't work. But in practice, it does.

Field order

This made a remarkable difference. Since we were not using natural language, the order of the text fields was arbitrary. But choosing a wrong permutation could reduce accuracy by a massive 10%.

What an optimal permutation was could only be derived by brute force.

Filtering out noise

Calculating the Kullback-Leibler divergence between the distribution of words in the successfully and unsuccessfully classified data sets identified words that appeared to be over-represented. Ignoring them improved accuracy by about 2% for unigrams but made no difference to bigrams.
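
A sketch of the calculation, in plain Scala. The add-one smoothing (so that no word has zero probability) and all the names are my assumptions for the example; P is the word distribution of the misclassified set and Q that of the correctly classified set.

```scala
object WordDivergence {
  // Relative frequency of each word with add-one smoothing.
  // `vocab` must contain every word that appears in `words`.
  def distribution(words: Seq[String], vocab: Set[String]): Map[String, Double] = {
    val counts = words.groupBy(identity).map { case (w, ws) => w -> ws.size }
    val total  = words.size + vocab.size
    vocab.map(w => w -> (counts.getOrElse(w, 0) + 1.0) / total).toMap
  }

  // Per-word term p(w) * ln(p(w) / q(w)); large positive terms are over-represented in P.
  def contributions(p: Map[String, Double], q: Map[String, Double]): Map[String, Double] =
    p.map { case (w, pw) => w -> pw * math.log(pw / q(w)) }

  // The Kullback-Leibler divergence D(P || Q) is the sum of those terms.
  def kl(p: Map[String, Double], q: Map[String, Double]): Double =
    contributions(p, q).values.sum
}
```

Sorting the contributions in descending order gives a list of candidate words to ignore.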

Similarly, looking at the entropy of the distribution of a word over all the categories and eliminating those with low entropy did not improve things.

Simply eliminating small words (less than 3 characters) and all strings that were just numerical boosted accuracy by a few percent for both bigrams and unigrams.

Another data massaging technique we used was to bucket transaction amounts rather than use raw figures. However, it raised the question: how best do we organise these buckets? One trick is to use a Random Forest to help you create features. Random Forests don't work well on high dimensional data (which ours would be if we had a field per word - we had a million strings) so we tried all but these features. Accuracy was unsurprisingly an appalling 35% but RFs are easy to read and I used its amount buckets in my model. However, it actually reduced accuracy somewhat. So, instead I stuck with my (roughly) logarithmic buckets.
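
By logarithmic buckets I mean something along these lines. The base of 10 and the treatment of amounts below 1 are assumptions for the sake of the example rather than the exact boundaries we used.

```scala
object AmountBuckets {
  // One bucket per order of magnitude of the absolute amount:
  // below 1 => 0, [1, 10) => 1, [10, 100) => 2, [100, 1000) => 3, ...
  def bucket(amount: Double): Int = {
    val magnitude = math.abs(amount)
    if (magnitude < 1.0) 0 else math.floor(math.log10(magnitude)).toInt + 1
  }
}
```

The bucket index can then be fed to the model as just another token, which suits a Naive Bayes model that works on discrete features.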

Splitting the data caveat

Randomly selecting transactions to be used in the training and testing sets consistently resulted in about 95% accuracy. Pretty good. However, this hid a significant problem. If we randomly selected users for the training and testing sets and then partitioned their transactions, accuracy dropped to about 66%.

[Aside: I'm clearly not the first to make a mistake like that. See this data scientist fall into the exact trap when using time series that this data scientist warns against].

This was because a lot of transactions were peculiar to the individual. For instance, if a tenant pays his landlord every month using the text "Mr Phillip Henry" and marks it as rent, only one transaction in the training set may be needed to train the model. However, if this user was not in the training data (and consequently none of his/her transactions) then the model would have a hard time concluding that a payment was rent purely on the text "Mr Phillip Henry".
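
The difference between the two ways of splitting is small in code but large in effect. A sketch of the split by user, with a made-up Txn type and a seeded Random passed in for repeatability:

```scala
import scala.util.Random

object UserSplit {
  final case class Txn(userId: String, text: String)

  // Assign whole users (not individual transactions) to the training or the test set.
  def byUser(txns: Seq[Txn], trainFraction: Double, rnd: Random): (Seq[Txn], Seq[Txn]) = {
    val users      = rnd.shuffle(txns.map(_.userId).distinct)
    val trainUsers = users.take((users.size * trainFraction).toInt).toSet
    txns.partition(t => trainUsers(t.userId))
  }
}
```

Because no user appears on both sides, the test set measures how the model does on somebody it has never seen, which is the situation a new customer is in.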

This was something of a problem to the business as new users experience a low accuracy in transaction categorization. We never solved this problem and the business had to accept that the model had to learn a user's habits before it could be a great performer.

Future work

Alas, data wrangling never comes to an end. If I had more time, I would have investigated:

- Pointwise mutual information (see this SO answer for more pointers).
- Using n-grams of characters
- Bagging between different n-gram models
- Creating new features from eg, number of words in text, average word size etc
- Text lemmatization and stemming (the difference being a word that has been lemmatized is always in the dictionary whereas one that has been stemmed may not be).

Sadly, we ran out of time. But in 3 months we had managed to build a pure Java/Scala library that could be passed to the front-end team and give them good enough accuracy that the management team were happy.

Saturday, December 8, 2018

JSON Streaming in Spark


We have a fairly standard architecture where we want to stream JSON logs into HDFS preferably as Parquet. Kafka and Spark Structured Streaming are the obvious choices and here is a test that runs in a single JVM to demonstrate the flow.

Streaming or Structured Streaming


Note Spark has two kinds of streaming:

1. Spark streaming - the old way of doing it with a "Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous sequence of RDDs (of the same type) representing a continuous stream of data " (from the ScalaDocs).

2. Spark Structured Streaming - the new way.

What's new?

The new way is more elegant as we are just dealing with a function over a Dataset rather than a collection of RDDs. As a friend told me: "build your app as a streaming application because it can be extended to a batch easily (a batch is a stream with one big item in it). The opposite is not true, however." Batch is just a special case of streaming.

The new structured streaming has been optimized for low latency. "Since Spark 2.3, we have introduced a new low-latency processing mode called Continuous Processing, which can achieve end-to-end latencies as low as 1 millisecond with at-least-once guarantees. Without changing the Dataset/DataFrame operations in your queries, you will be able to choose the mode based on your application requirements" (from the Spark docs).

There have been machine learning frameworks built on top of it such as this one here where HortonWorks recommends loading the prediction model from storage and calling predict() on all elements in a partition.

Kafka for the uninitiated

"Data in Kafka is organized into topics that are split into partitions for parallelism. Each partition is an ordered, immutable sequence of records, and can be thought of as a structured commit log. Producers append records to the tail of these logs and consumers read the logs at their own pace. Multiple consumers can subscribe to a topic and receive incoming records as they arrive. As new records arrive to a partition in a Kafka topic, they are assigned a sequential id number called the offset. A Kafka cluster retains all published records—whether or not they have been consumed—for a configurable retention period, after which they are marked for deletion" (from the DataBricks blog).

Kafka to Spark, Spark to Kafka

Note, data can be written by Spark back to Kafka: "Writing data from any Spark supported data source into Kafka is as simple as calling writeStream on any DataFrame that contains a column named “value”, and optionally a column named “key”. If a key column is not specified, then a null valued key column will be automatically added. A null valued key column may, in some cases, lead to uneven data partitioning in Kafka, and should be used with care" (ibid).

Flume?

Kafka uses a pull paradigm. "On the contrary, Flume sink supports push model. When event producers suddenly generate a flood of messages, even though flume channel somewhat acts as a buffer between source and sink, the sink endpoints might still be overwhelmed by the write operations.

"Moreover, the file-based channel does not replicate event data to a different node. It totally depends on the durability of the storage it writes upon. If message durability is crucial, it is recommended to use SAN or RAID. Kafka supports both synchronous and asynchronous replication based on your durability requirement and it uses commodity hard drive" (LinkedIn).

Parsing JSON without JSON parsers

There are (too?) many JSON parsers for Scala (there is an excellent comparison made here). But we don't need them if all we want is to dump the data into Parquet with the same schema as the JSON.

Since hand-coding schemas is tedious, this StackOverflow page suggests using a sample piece of JSON, reading it as a DataFrame and borrowing that schema.

The next part of this cunning trick is then to use Spark's from_json function so:

val outgoing = jsonDF.select(from_json($"value", struct))

Finally, we can call:

outgoing.writeStream.format("parquet")....start()

(having set the appropriate options) et voila, our Spark code is pulling JSON from Kafka and writing Parquet to HDFS in just a few lines.


Thursday, December 6, 2018

More Spark Query Plans


The second part in discovering what is going on in query plans (the first is here). I imagine there will be more...

Predicate Push Down

Seeing PushedFilters in the DataFrame.explain output means predicate push down is at work. For example:

*(1) Project [id#21, text#22, intkey#23, partitionkey#24]
+- *(1) Filter (isnotnull(intkey#23) && (intkey#23 = 3))
   +- *(1) FileScan parquet [id#21,text#22,intkey#23,partitionkey#24] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://127.0.0.1:41684/1544107487759], PartitionCount: 2, PartitionFilters: [], PushedFilters: [IsNotNull(intkey), EqualTo(intkey,3)], ReadSchema: struct

The code to generate this lives here in GitHub.

"Predicate push down is a feature of Spark and Parquet that can help increase the performance of your queries by reducing the amount of data read based on metadata stored in Parquet about the minimum and maximum values stored in columns and aggregated per row group. However this feature has limitations, notably, it can only be used with certain data types and operators as implemented in Parquet and Spark. Moreover, even when filters are pushed down, the actual reduction of I/O and relative increase in performance vary: the results depend on the provided filter values and data distribution in the source table." (from an excellent blog at CERN).

Asterisks

"With whole-stage code generation, all the physical plan nodes in a plan tree work together to generate Java code in a single function for execution. This Java code is then turned into JVM bytecode using Janino, a fast Java compiler... Wherever you see *, it means that wholestagecodegen has generated hand written code prior to the aggregation. Exchange means the Shuffle Exchange between jobs. Exchange does not have whole-stage code generation because it is sending data across the network." (StackOverflow)

HashAggregate

"When spark is doing dataframe operation, it does first compute partial counts for every partition and then having another stage to sum those up together." These need to be sent across the network triggering an Exchange.  (StackOverflow).

Note, you might see:

.
.
  +- *HashAggregate(COLUMN_FOR_GROUPBY#INDEX, 200
.
.

The 200 is the number of partitions and is defined by spark.sql.shuffle.partitions.

If you just try to set it with .repartition(NEW_NUMBER), it doesn't appear to change. However, .repartition(NEW_NUMBER, COLUMN) does seem to change it.
See the Mastering Spark SQL GitHub page.
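
The two-stage aggregation quoted in this section (partial counts per partition, then a second stage to sum them) can be mimicked in plain Scala. This is only a model of the idea with invented names; Spark's HashAggregate is, of course, far more involved.

```scala
object PartialCounts {
  // Stage 1: each partition counts its own keys independently (no data movement).
  def partial(partition: Seq[String]): Map[String, Long] =
    partition.groupBy(identity).map { case (k, vs) => k -> vs.size.toLong }

  // Stage 2: the small per-partition maps are brought together and summed per key.
  def merge(partials: Seq[Map[String, Long]]): Map[String, Long] =
    partials.flatten.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).sum }
}
```

Only the partial maps need to cross the network, not the raw rows, which is why the Exchange sits between the two HashAggregate nodes in a plan.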

Parentheses

"Everything with the same index number is in one stage. So, stage boundaries can be recognized by exchange operations that involve a shuffle." [SO]

BatchScan

"BatchScan non empty filters shows that your predicates have been pushed down to the datasources by spark." [Iceberg Slack]. Note that it is not a given your predicate will be pushed down. If it's the result of a function, how would Spark know the predicate was true without running the function?

Thursday, November 29, 2018

Hive


I'm building a Hadoop ecosystem test harness that will allow a whole slew of applications to be tested in a single JVM, in a single integration test (source).

A test for this harness fired up Zookeeper, Kafka, HDFS, Spark and finally Hive. It feeds Kafka with messages and Spark Structured Streaming processes them and writes them as Parquet to HDFS. Finally, Hive reads this file and checks that what it reads is what Spark processes.

This all worked fine until I decided to partition the Spark DataStreamWriter. Then, Hive didn't see any data and the test failed upon an assertion that what Spark sees, Hive sees. Annoyingly, the test did not fail because of a bug but because of what is essentially misconfiguration. The reason is that Hive keeps a store of metadata about all the things it can see and, although the data is there to be read, Hive has not been told to read it until the metastore is updated.

So, putting a long Thread.sleep in the test code I then fired up Beeline with something like:

beeline> !connect  jdbc:hive2://localhost:40327/default

whereupon we're asked to enter a user name and password. First, we recognize the data:

0: jdbc:hive2://localhost:40327/default> create external table parquet_table_name (key String, value String) PARTITIONED BY (partitionKey Date)  STORED AS PARQUET LOCATION 'hdfs://127.0.0.1:33799/tmp_parquet/';
No rows affected (0.077 seconds)

but we can't see any rows:

0: jdbc:hive2://localhost:40327/default> select count(*) from parquet_table_name;
No rows selected (0.204 seconds)
+------+
| _c0  |
+------+
| 0    |
+------+

This seems to be because there are no partitions:

0: jdbc:hive2://localhost:40327/default> show partitions parquet_table_name;
+------------+
| partition  |
+------------+
+------------+
No rows selected (0.219 seconds)

The solution was to add the partitions manually:

1: jdbc:hive2://localhost:40327/default> ALTER TABLE parquet_table_name ADD PARTITION (partitionKey='2018-11-28') location 'hdfs://127.0.0.1:33799/tmp_parquet/partitionKey=2018-11-28/';
No rows affected (0.155 seconds)
1: jdbc:hive2://localhost:40327/default> show partitions parquet_table_name ;
No rows affected (0.327 seconds)
+--------------------------+
|        partition         |
+--------------------------+
| partitionkey=2018-11-28  |
+--------------------------+

Contrary to the advice I read elsewhere, MSCK REPAIR TABLE parquet_table_name  SYNC PARTITIONS;  did not seem (because of camel-case names) to help me (the command ALTER TABLE table_name RECOVER PARTITIONS; seems to be just for Amazon's version of Hive).

Now, counting the rows gives me (some of) my data:

1: jdbc:hive2://localhost:40327/default> select count(*) from parquet_table_name;
+-------+
|  _c0  |
+-------+
| 2879  |
+-------+
1 row selected (1.939 seconds)

I'm not a Hive expert so there may be a solution that just adds anything in the directory (despite all the advice on forums, I could not get that to work, as the column name on which it's partitioned needs to be lowercase and mine was camel case). However, this hack works for now.
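
If there are many partitions, the ALTER TABLE statements can at least be generated rather than typed. A sketch, assuming the partition directory names (in Spark's column=value layout) have already been listed from HDFS by some other means; the object and method names are mine.

```scala
object HivePartitions {
  // One ALTER TABLE ... ADD PARTITION statement per "column=value" directory name.
  // Anything that is not of that form (e.g. _spark_metadata) is ignored.
  def addPartitionStatements(table: String, root: String, dirs: Seq[String]): Seq[String] =
    dirs.flatMap { dir =>
      dir.split("=", 2) match {
        case Array(column, value) =>
          Some(s"ALTER TABLE $table ADD PARTITION ($column='$value') location '$root$dir/';")
        case _ => None
      }
    }
}
```

The resulting strings have the same shape as the statement I ran by hand above and could be sent over the same JDBC connection that Beeline uses.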

There appears to be a JIRA to make this automatic but there is nothing at the moment.

Thursday, November 8, 2018

Science


Summer seems an age ago but these are some notes I took from my holiday book list. The theme is "what is science?", a deceptively easy question. See Confidence, Credibility, and why Frequentism and Science do not Mix which argues "frequentism is generally answering the wrong question". (Briefly, it says: "frequentists consider model parameters to be fixed and data to be random, while Bayesians consider model parameters to be random and data to be fixed... the Bayesian solution is a statement of probability about the parameter value given fixed bounds. The frequentist solution is a probability about the bounds given a fixed parameter value.")

The Scientific Methodology

Inductive reasoning is where "the premises are viewed as supplying strong evidence for the truth of the conclusion. While the conclusion of a deductive argument is certain, the truth of the conclusion of an inductive argument may be probable, based upon the evidence given." (Wikipedia)

"The scientific procedure for the study of a physical system can be (rather arbitrarily) divided into the following three steps.

i) Parameterization of the system: discovery of a minimal set of model parameters whose values completely characterize the system (from a given point of view).

ii) Forward modeling: discovery of the physical laws allowing us, for given values of the model parameters, to make predictions on the results of measurements on some observable parameters.

iii) Inverse modeling: use of the actual results of some measurements of the observable parameters to infer the actual values of the model parameters.

Strong feedback exists between these steps, and a dramatic advance in one of them is usually followed by advances in the other two. While the first two steps are mainly inductive, the third step is deductive. This means that the rules of thinking that we follow in the first two steps are difficult to make explicit. On the contrary, the mathematical theory of logic (completed with probability theory) seems to apply quite well to the third step, to which this book is devoted."

- Inverse Problem Theory, Albert Tarantola

The Logic of Science

First, terminology. "A syllogism ... is a kind of logical argument that applies deductive reasoning to arrive at a conclusion based on two or more propositions that are asserted or assumed to be true" (Wikipedia). For example, all men are mortal. Socrates is a man. Therefore, Socrates is mortal.

In The Logic of Science, E T Jaynes argues: "our theme is simply: Probability Theory as Extended Logic.

"Deductive reasoning can be analyzed ultimately into the repeated application of two strong syllogisms:

If A is true, then B is true
A is true
Therefore, B is true

and its inverse

If A is true, then B is true
B is false
Therefore, A is false

[However] ... in almost all the situations confronting us we do not have the right kind of information to allow this kind of reasoning. We fall back on weaker syllogisms

If A is true, then B is true
B is true
Therefore, A becomes more plausible

"The rain at 10 AM is not the physical cause of the clouds at 9:45 AM. Nevertheless, the proper logical connection is not in the uncertain causal direction (clouds ⇒ rain), but rather (rain ⇒ clouds) which is certain, although noncausal."

"Suppose some dark night a policeman walks down a street, apparently deserted; but suddenly he hears a burglar alarm, looks across the street, and sees a jewelry store with a broken window. Then a gentleman wearing a mask comes crawling out through the broken window, carrying a bag which turns out to be full of expensive jewelry. The policeman doesn’t hesitate at all in deciding that this gentleman is dishonest [even if] there may have been a perfectly innocent explanation for everything.

"The reasoning of our policeman was not even of the above types. It is best described by a still weaker syllogism:

If A is true, then B becomes more plausible
B is true
Therefore, A becomes more plausible

"But in spite of the apparent weakness of this argument, when stated abstractly in terms of A and B, we recognize that the policeman’s conclusion has a very strong convincing power. There is something which makes us believe that in this particular case, his argument had almost the power of deductive reasoning."
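
One way to read the first weak syllogism quantitatively is through Bayes' rule: if A implies B then P(B | A) = 1, and observing B can only raise the probability of A. A small sketch, where the function name and the numbers in the usage note are invented purely for illustration:

```scala
object Plausibility {
  // Bayes' rule: P(A | B) = P(B | A) P(A) / P(B),
  // where P(B) = P(B | A) P(A) + P(B | not A) (1 - P(A)).
  def posterior(priorA: Double, bGivenA: Double, bGivenNotA: Double): Double = {
    val pB = bGivenA * priorA + bGivenNotA * (1 - priorA)
    bGivenA * priorA / pB
  }
}
```

Taking A as "it rains at 10 AM" and B as "it is cloudy at 9:45 AM", Plausibility.posterior(0.1, 1.0, 0.4) is about 0.22: seeing clouds makes rain more plausible than the prior of 0.1 without making it certain.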

Think Complexity

The rest of this post consists of notes I made from reading Think Complexity by Allen B Downey, which he has generously made free to download (see here).

Science

Determinism. Statements from weak (1) to strong (4):

D1: Deterministic models can make accurate predictions for some physical systems.

D2: Many physical systems can be modelled by deterministic processes, but some are intrinsically random.

D3: All events are caused by prior events, but many physical systems are nevertheless fundamentally unpredictable.

D4: All events are caused by prior events, and can (at least in principle) be predicted.

Wolfram’s demonstration of complex behaviour in simple cellular automata is … disturbing, at least to a deterministic world view.

Models

In general, we expect a model that is more realistic to make better predictions and to provide more believable explanations. Of course, this is only true up to a point. Models that are more detailed are harder to work with, and usually less amenable to analysis. At some point, a model becomes so complex that it is easier to experiment with the system. At the other extreme, simple models can be compelling exactly because they are simple.

Simple models offer a different kind of explanation than detailed models. With a detailed model, the argument goes something like this: “We are interested in physical system S, so we construct a detailed model, M, and show by analysis and simulation that M exhibits a behaviour, B, that is similar (qualitatively or quantitatively) to an observation of the real system, O. So why does O happen? Because S is similar to M, and B is similar to O, and we can prove that M leads to B.”

With simple models we can’t claim that S is similar to M, because it isn’t. Instead, the argument goes like this: “There is a set of models that share a common set of features. Any model that has these features exhibits behaviour B. If we make an observation, O, that resembles B, one way to explain it is to show that the system, S, has the set of features sufficient to produce B.”

For this kind of argument, adding more features doesn’t help. Making the model more realistic doesn’t make the model more reliable; it only obscures the difference between the essential features that cause O and the incidental features that are particular to S. The features x and y are sufficient to produce the behaviour. Adding more detail, like features w and z, might make the model more realistic, but that realism adds no explanatory power.

Scientific Realism

Think Heraclitus and the river.

Note: the gene was postulated 50 years before DNA was discovered. During that time, it was only a postulated entity rather than a “real” one.

SR1: Scientific theories are true or false to the degree that they approximate reality, but no theory is exactly true. Some postulated entities may be real, but there is no principled way to say which ones.

SR2: As science advances, our theories become better approximations of reality. At least some postulated entities are known to be real.

SR3: Some theories are exactly true; others are approximately true. Entities postulated by true theories, and some entities in approximate theories, are real.

SR4: A theory is true if it describes reality correctly, and false otherwise. The entities postulated by true theories are real; others are not.

Instrumentalism

But SR1 is so weak that it verges on instrumentalism, which is the view that we can’t say whether a theory is true or false because we can’t know whether a theory corresponds to reality.

Criticality

“A system is “critical” if it is in transition between two phases; for example, water at its freezing point is a critical system.”

Think sand piles, cellular automata and periodic avalanches. Behaviours common to critical systems are:

“Long-tailed distributions of some physical quantities: for example, in freezing water the distribution of crystal sizes is characterized by a power law.

“Fractal geometries: canonical example is a snowflake. Fractals are characterized by self-similarity; that is, parts of the pattern resemble scaled copies of the whole.”

Pink noise: “Specifically, the power at frequency f is proportional to 1/f.”

Reductionism

“A reductionist model describes a system by describing its parts and their interactions... it depends on an analogy between the components of the model and the components of the system”

An example is the ideal gas law, which approximates reality by ignoring inter-molecular interactions.

Holistic

“Holistic models are more focused on similarities between systems and less interested in analogous parts.

• Identify a kind of behavior that appears in a variety of systems.

• Find the simplest model that demonstrates that behavior.”

For example, memes and genes are not made of analogous parts, but the way they propagate shares the same evolutionary behaviour.

Prediction of SoC (self-organized criticality)

“If Perrow’s “normal accident theory” is correct, there may be no special cause of large failures.”

Agent-based Models

"The characteristics of agent-based models include:

• Agents that model intelligent behavior, usually with a simple set of rules.
• The agents are usually situated in space (or in a network), and interact with each other locally.
• They usually have imperfect, local information.
• Often there is variability between agents.
• Often there are random elements, either among the agents or in the world.

"The Highway is a one-lane road that forms a circle, but it is displayed as a series of rows that spiral down the canvas. Each driver starts with a random position and speed. At each time step, each Driver accelerates or brakes based on the distance between it and the Driver in front. ... If the following distance is too short, the Driver brakes; otherwise it accelerates. [There are] two other constraints: there is a speed limit for each driver, and if the current speed would cause a collision, the Driver comes to a complete stop. ... You will probably see a traffic jam, and the natural question is, “Why?” There is nothing about the Highway or Driver behavior that obviously causes traffic jams."
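
The rules quoted above are simple enough to sketch in a few lines of Scala. This is my own much-simplified take, not the book's code: every parameter value (road length, speed limit, safe distance, acceleration, braking) is a hypothetical number I picked, and there is no display.

```scala
import scala.util.Random

// Drivers on a circular one-lane road. All parameter values are hypothetical.
object Highway {
  final case class Driver(position: Double, speed: Double)

  val roadLength   = 1000.0
  val speedLimit   = 10.0
  val safeDistance = 30.0
  val acceleration = 1.0
  val braking      = 3.0

  // Distance travelling forwards around the circle from `from` to `to`.
  def gap(from: Double, to: Double): Double =
    ((to - from) % roadLength + roadLength) % roadLength

  // One synchronous time step. Driver i + 1 is assumed to be the one in front of driver i
  // (true if the initial vector is sorted by position, since nobody can overtake).
  def step(drivers: Vector[Driver]): Vector[Driver] =
    drivers.indices.toVector.map { i =>
      val me      = drivers(i)
      val ahead   = drivers((i + 1) % drivers.length)
      val dist    = gap(me.position, ahead.position)
      val wanted  = if (dist < safeDistance) me.speed - braking else me.speed + acceleration
      val clamped = math.max(0.0, math.min(speedLimit, wanted))
      val speed   = if (clamped >= dist) 0.0 else clamped // would collide: complete stop
      Driver((me.position + speed) % roadLength, speed)
    }

  def main(args: Array[String]): Unit = {
    val rng     = new Random(42)
    val initial = Vector
      .fill(60)(Driver(rng.nextDouble() * roadLength, rng.nextDouble() * speedLimit))
      .sortBy(_.position)
    val result  = Iterator.iterate(initial)(step).drop(500).next()
    val stopped = result.count(_.speed == 0.0)
    println(f"mean speed ${result.map(_.speed).sum / result.size}%.2f, stopped drivers: $stopped")
  }
}
```

Varying the number of drivers or the safe distance is a quick way to explore when the mean speed drops well below the speed limit, even though no single rule says "form a jam".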

Thursday, October 25, 2018

Parquet tuning


I watched Ryan Blue's excellent "Parquet Performance Tuning" video (here and slides here). But before watching it, it's worth familiarizing yourself with the glossary at the Parquet GitHub page:

Row group: A logical horizontal partitioning of the data into rows. There is no physical structure that is guaranteed for a row group. A row group consists of a column chunk for each column in the dataset.

Column chunk: A chunk of the data for a particular column. They live in a particular row group and are guaranteed to be contiguous in the file.

Page: Column chunks are divided up into pages. A page is conceptually an indivisible unit (in terms of compression and encoding). There can be multiple page types which are interleaved in a column chunk.

Hierarchically, a file consists of one or more row groups. A row group contains exactly one column chunk per column. Column chunks contain one or more pages. The units of parallelization are:

Job                   Granularity
MapReduce             File/Row Group
IO                    Column chunk
Encoding/Compression  Page

Tuning Parquet

My notes from the video are:
  • Parquet builds a dictionary and "just stores pointers into that dictionary ... basically the big trick in Parquet compression". 
  • Dictionaries not only save space but can be used for filtering (no entry means that data page doesn't have it). It is highly recommended to implement data locality as it leads to smaller dictionaries.
  • Be careful of writing too many files (22'18"). Incoming partitions are not the same as outgoing partitions. That is, the partition key on the incoming file need not be that of the outgoing.
  • Look at CPU usage and bytes read rather than wall time, as wall time can vary (just plain stats).
  • Use parquet-tools to see the stats demonstrating that your data really is partitioned as you expect and has sensible splits (eg, A-z is wrong but A-F, f-M etc is better).
  • Every row group has its own dictionary (43'10").
  • Decreasing row group size increases parallelism.
  • "You're going to need a sort anyway if you want to take advantage of stats" (46'10"). See below for a further explanation.
  • Brotli compression "does really really well with text data" (47'20").
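
The dictionary trick from the notes above can be sketched in plain Scala. This only illustrates the idea (distinct values stored once, plus integer pointers, plus a cheap "is it in here at all?" check); it is not Parquet's actual on-disk format, and all the names are mine.

```scala
// Dictionary encoding in miniature: not Parquet's real format, just the idea.
object DictionaryEncoding {
  final case class Encoded(dictionary: Vector[String], pointers: Vector[Int]) {
    // Rebuild the original column by following each pointer into the dictionary
    def decode: Vector[String] = pointers.map(dictionary)
    // If the value is not in the dictionary, no row in this chunk can match a filter on it
    def mightContain(value: String): Boolean = dictionary.contains(value)
  }

  def encode(values: Seq[String]): Encoded = {
    val dictionary = values.distinct.toVector       // each distinct value stored once
    val index      = dictionary.zipWithIndex.toMap  // value -> position in the dictionary
    Encoded(dictionary, values.map(index).toVector)
  }

  def main(args: Array[String]): Unit = {
    val column  = Seq("GB", "GB", "FR", "GB", "FR", "DE")
    val encoded = encode(column)
    println(encoded.dictionary)
    println(encoded.pointers)
    println(encoded.mightContain("US"))
  }
}
```

The fewer distinct values that land in the same chunk, the smaller the dictionary, which is why data locality helps.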
To take advantage of the stats embedded in the files "you need to insert the data sorted on filtering columns. Then you will benefit from min max indexes and in case of orc additional from bloom filters, if you configure them. In any case I recommend also partitioning of files (do not confuse with Spark partitioning)" (see here).
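
Why sorting matters for min/max stats can be shown with a toy model in plain Scala. Here a "row group" is just a fixed-size slice of a column and the sizes are hypothetical; real readers do considerably more, so treat this as a sketch of the principle only.

```scala
import scala.util.Random

// Row-group pruning with min/max statistics, in miniature.
object MinMaxStats {
  final case class Stats(min: Int, max: Int)

  // Split the column into slices of rowGroupSize and record each slice's min and max
  def rowGroupStats(column: Seq[Int], rowGroupSize: Int): Vector[Stats] =
    column.grouped(rowGroupSize).map(g => Stats(g.min, g.max)).toVector

  // Number of row groups that cannot be skipped when filtering on column == value
  def groupsToRead(stats: Seq[Stats], value: Int): Int =
    stats.count(s => s.min <= value && value <= s.max)

  def main(args: Array[String]): Unit = {
    val unsorted = new Random(1).shuffle((1 to 1000).toVector)
    val needle   = 123
    println(s"unsorted: ${groupsToRead(rowGroupStats(unsorted, 100), needle)} row groups to read")
    println(s"sorted:   ${groupsToRead(rowGroupStats(unsorted.sorted, 100), needle)} row groups to read")
  }
}
```

With sorted data each row group covers a narrow, non-overlapping range, so only one group can contain the value. With shuffled data nearly every group's min/max range spans the value and the stats rule out very little.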

Interestingly, the Parquet documentation has something to say about HDFS block sizes: "We recommend large row groups (512MB - 1GB). Since an entire row group might need to be read, we want it to completely fit on one HDFS block. Therefore, HDFS block sizes should also be set to be larger. An optimized read setup would be: 1GB row groups, 1GB HDFS block size, 1 HDFS block per HDFS file."

Wednesday, September 26, 2018

Productionisation


We've played around with machine learning models to automatically categorize customer transactions. However, the accuracy of a model is irrelevant if it can't be taken to production. "One of the main reasons most [Data Science] tutorials won’t go beyond this step is because — productionisation is hard" (from Medium).

There are solutions - for instance Open Scoring which also appears to give you the break-down for the probabilities of a particular classification. This is great for us because if we don't get the prediction spot on, we want to give the user another suggestion.

Open Scoring offers the chance to drop in your PMML and you're good to go (although note "PMML is a great option if you choose to stick with the standard models and transformations. But if you’re interested in more, don’t worry there are other options" - Medium).

The old MLlib models could be turned into PMML easily (model.toPMML). But the new ones will need the Spark-JPMML library. However, this may clash with Spark's dependencies as both depend on JPMML. This appears to be fixed in Spark 2.3. See this Spark issue.

However, the IT department imposed another restriction: they don't have the resources to maintain Open Scoring servers. Instead, they want a Java library they can package with their product. Oh yes, and they're running out of JVM memory so it can't have too large a footprint, please.

So, one option might be H2O which saves the model as an actual piece of Java code that you can compile!

But since we were getting good results with Naive Bayes (both in Spark and in SciKit-Learn) we decided to code our own. Any decent developer can code it in quite a small number of lines.

The truth of the matter is that if you have lots of data and you're not smashing Kaggle, Naive Bayes is generally good enough for most purposes. Working on the principle of a Minimum Viable Product, development went something like this:
  1. Create the basic NB algorithm (accuracy: 81%)
  2. Add smoothing (with a value for alpha of 1E-9, accuracy: 85%)
  3. Use Kullback-Leibler divergence between the correctly and incorrectly classified data to see which features are dissimilar. Eyeballing the largest discrepancies led us to conclude that 3-digit numbers confused matters and ignoring them completely improved things (accuracy: 89%).
  4. Trying n-grams (with n=2, accuracy: 91%).
  5. Trying different combinations of features while using bigrams (accuracy: 96%). Note that the combination of features is not obvious as we're not really processing natural language. We found the best permutation using brute-force.
with the IT team re-deploying our library with each iteration.
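
To give a flavour of how little code is involved, here is a bare-bones multinomial Naive Bayes with additive smoothing and a bigram helper, in plain Scala. It is a sketch of the general technique rather than the library we shipped: the names, the tokenizer and the toy training data are all made up for illustration.

```scala
// Minimal multinomial Naive Bayes with additive smoothing (illustrative sketch only).
object NaiveBayes {
  final case class Model(logPrior: Map[String, Double],
                         wordCounts: Map[String, Map[String, Int]],
                         totalWords: Map[String, Int],
                         vocabularySize: Int,
                         alpha: Double) {

    // log P(word | category), smoothed so an unseen word does not zero out the product
    def logLikelihood(category: String, word: String): Double = {
      val count = wordCounts(category).getOrElse(word, 0)
      math.log((count + alpha) / (totalWords(category) + alpha * vocabularySize))
    }

    // Unnormalised log posterior for every category
    def scores(words: Seq[String]): Map[String, Double] =
      logPrior.map { case (category, prior) =>
        category -> (prior + words.map(logLikelihood(category, _)).sum)
      }

    def classify(words: Seq[String]): String = scores(words).maxBy(_._2)._1
  }

  // Naive whitespace tokenizer (an assumption; real transaction text needs more care)
  def tokens(text: String): Seq[String] =
    text.toLowerCase.split("\\s+").filter(_.nonEmpty).toSeq

  // Adjacent word pairs, usable as extra features
  def bigrams(words: Seq[String]): Seq[String] =
    words.sliding(2).filter(_.size == 2).map(_.mkString(" ")).toSeq

  def train(examples: Seq[(String, Seq[String])], alpha: Double): Model = {
    val byCategory = examples.groupBy(_._1)
    val wordCounts = byCategory.map { case (category, xs) =>
      category -> xs.flatMap(_._2).groupBy(identity).map { case (w, ws) => w -> ws.size }
    }
    Model(
      logPrior       = byCategory.map { case (c, xs) => c -> math.log(xs.size.toDouble / examples.size) },
      wordCounts     = wordCounts,
      totalWords     = wordCounts.map { case (c, counts) => c -> counts.values.sum },
      vocabularySize = wordCounts.values.flatMap(_.keys).toSet.size,
      alpha          = alpha
    )
  }

  def main(args: Array[String]): Unit = {
    val training = Seq(
      "CAR"  -> tokens("car tax renewal"),
      "CAR"  -> tokens("petrol station"),
      "FOOD" -> tokens("supermarket groceries"),
      "FOOD" -> tokens("restaurant dinner")
    )
    val model = train(training, alpha = 1e-9)
    println(model.classify(tokens("petrol")))
    println(model.scores(tokens("petrol")))
  }
}
```

The scores map doubles as the break-down per category, so the runner-up can be offered as a second suggestion, and logLikelihood lets you ask why a given word pushed a transaction towards a category.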

The nice thing about building a simple Machine Learning MVP is that you become familiar with the data very quickly. In my case, what became clear was that I was getting a lot of transactions that were falsely labeled as a misclassification. This was because the categories were not mutually exclusive (for example: is car tax to be classified as CAR or TAXES? It depends on the user).

Also, a simple model (like Naive Bayes) makes it easy to question the data ("why did you classify this transaction as X???") by simply looking at the likelihood vector for each word. Try doing that with a neural net...

[Edit: there is a good discussion of how other people tackle the problem of productionisation at Reddit].


Wednesday, September 19, 2018

Spark and Buckets


Unbalanced Partitions

Deliberately trying to get Spark to have an unbalanced partition

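// Run in spark-shell, where sc and spark.implicits._ are already in scope
val n  = 10000000
val df = sc.range(1, n + 1).toDF("id")  // ids 1 to n inclusive, as Longs
df.groupByKey { r => r.getLong(0) / n }.count().show()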

seems fine but

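import org.apache.spark.sql.functions.max
// Every row gets the same key (1), so repartitioning by key sends everything to one partition
val allKeysIn1Partition = df.map { r => 1 -> (r.getLong(0), "A" * 10000) }
val repartitioned       = allKeysIn1Partition.repartition($"_1")
val pSizes              = repartitioned.mapPartitions { x => Seq(x.size).iterator }
pSizes.agg(max($"value")).show()  // size of the largest partition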

shows rapid progress for all but the last partition and indeed this query shows all the data lives in one partition.

Buckets

"Buckets can help with the predicate pushdown since every value belonging to one value will end up in one bucket. So if you bucket by 31 days and filter for one day Hive will be able to more or less disregard 30 buckets. Obviously this doesn't need to be good since you often WANT parallel execution like aggregations.

"So to summarize buckets are a bit of an older concept and I wouldn't use them unless I have a clear case for it. The join argument is not that applicable anymore..." (from here).

Note that you can't use bucketing for a simple save. It must be a saveAsTable call (see Laskowski's Mastering Spark SQL for more information) which is for Hive interoperability. Otherwise you'll get a "'save' does not support bucketBy and sortBy right now" error.

Partitioning vs. Bucketing

"Bucketing is another technique for decomposing data sets into more manageable parts" (from here). In Hive, for example, "suppose a table using date as the top-level partition and employee_id as the second-level partition leads to too many small partitions. Instead, if we bucket the employee table and use employee_id as the bucketing column, the value of this column will be hashed by a user-defined number into buckets. Records with the same employee_id will always be stored in the same bucket."

In Spark, "partitionBy ... has limited applicability to columns with high cardinality. In contrast bucketBy distributes data across a fixed number of buckets and can be used when a number of unique values is unbounded" (from the Spark documentation).
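
The difference is easy to see in plain Scala, with no Spark or Hive involved. Partitioning creates one group per distinct key, while bucketing hashes each key into a fixed number of groups. The hash used below (hashCode modulo the bucket count) is a stand-in of my own; Spark and Hive use their own hash functions.

```scala
// Partitioning vs bucketing in miniature (plain Scala; the hash function is a stand-in).
object Bucketing {
  // Same key always maps to the same bucket, and the result is always in 0 until numBuckets
  def bucketOf(key: Any, numBuckets: Int): Int =
    Math.floorMod(key.hashCode, numBuckets)

  def main(args: Array[String]): Unit = {
    val employeeIds = (1 to 100000).map(i => s"emp-$i")
    val partitions  = employeeIds.distinct.size            // one partition per distinct value
    val buckets     = employeeIds.groupBy(bucketOf(_, 16)) // at most 16 groups, however many ids
    println(s"partitions: $partitions, buckets: ${buckets.size}")
    println(bucketOf("emp-42", 16) == bucketOf("emp-42", 16))
  }
}
```

The number of partitions grows with the cardinality of the column, whereas the number of buckets stays at whatever you chose up front.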