Friday, October 30, 2020

Odd Characters

The Problem

A data scientist is struggling to get his regex to work in Spark on Databricks. He wants to check the data for British postcodes (a.k.a. zipcodes). His regex looks like:

[A-Z]{1,2}[0-9][A-Z0-9]?\\s?[0-9]  

Everybody agrees that any string in the data that matches this regex can reasonably be considered a British postcode. However, in his notebook he sees many instances that look valid but are marked as invalid.
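A minimal sketch of the setup (not the actual notebook code): the regex wrapped in a Spark UDF, where the DataFrame df and its "postcode" column are hypothetical.

import org.apache.spark.sql.functions.{col, udf}

val postcodeRegex = "[A-Z]{1,2}[0-9][A-Z0-9]?\\s?[0-9]".r
val looksLikePostcode = udf((s: String) => s != null && postcodeRegex.findFirstIn(s).isDefined)

// df is assumed to be an existing DataFrame with a "postcode" column
val flagged = df.withColumn("isPostcode", looksLikePostcode(col("postcode")))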

The Investigation

Despite spending most of the last 25 years using the JVM, I've only recently come to appreciate character encodings. It's something that seems annoying and mostly irrelevant. However, last month I wasted two days because the British pound sterling sign (£) can be represented as 0xa3 in what's commonly known as Extended ASCII as well as U+00A3 in Unicode. That means a difference of one byte - which, if the string is a password, can be the difference between being granted access and being locked out. In this case, the Extended ASCII version came from somebody typing on the Linux CLI and the Unicode came from a Java String derived from a properties file.
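A quick illustration of that one-byte difference, with ISO-8859-1 standing in here for "Extended ASCII":

// The same visible character, two different byte sequences (printed as hex)
val extendedAscii = "£".getBytes("ISO-8859-1")
val unicodeUtf8   = "£".getBytes("UTF-8")

println(extendedAscii.map(b => f"${b & 0xff}%02x").mkString(" "))   // a3
println(unicodeUtf8.map(b => f"${b & 0xff}%02x").mkString(" "))     // c2 a3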

Back to our regex. A Scala function seemed to behave differently when it was part of a UDF running on the Spark executors than when it was called locally in the Spark driver. To demonstrate this, the data scientist ran display(hisDataFrame) on the driver and copy-and-pasted the string that should have matched as a British postcode (but didn't) directly into the regex code. And it matched!

What was going on? Cue some false leads and dead ends concerning character encoding (e.g., "Unicode grows... char is legacy ... use codePointAt" [SO]; it seems Java hasn't always played fairly with Unicode [SO] even though the code here worked fine on my JDK 1.8).

But it was this answer on StackOverflow that let me convert the String to hexadecimal. It takes advantage of the fact that BigInteger is big-endian when constructed with a byte array. Converted to Scala, it looks like:

def toHex(x: String): String = String.format("0x%x", new java.math.BigInteger(1, x.getBytes))
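Feeding it a couple of made-up strings shows why this is handy: the hex dump makes whitespace differences visible that HTML rendering (see below) would hide.

toHex("CB4 1DF")   // 0x43423420314446    -- one space (0x20)
toHex("CB4  1DF")  // 0x4342342020314446  -- two spaces (0x20 0x20)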

Now it became clear that there were rogue spaces in the text. So, why had the data scientist's copy-and-paste not worked? Well, Databricks' display function seems to convert the contents of the DataFrame into HTML, and runs of multiple spaces in HTML are rendered as a single space. So, he was pasting the wrong data into his function.

The Solution

The solution was simply to allow for multiple spaces in the regex. But it raised a bigger issue. Although this Databricks display function is great for rendering data in a more pleasing manner, it opens you up to issues like this. Best to stick with DataFrame.show() if you're doing exploratory data analysis and use display for the execs. The former is not subject to this HTML rendering issue.
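One way to do that (a sketch, not necessarily the exact change that went in) is to swap the single optional whitespace for any run of whitespace:

val postcodeRegex = "[A-Z]{1,2}[0-9][A-Z0-9]?\\s*[0-9]".r

postcodeRegex.findFirstIn("CB4  1DF").isDefined  // true, despite the double space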

Wednesday, October 28, 2020

Streaming architecture


Some miscellaneous notes on streaming.

Streaming Semantics are Difficult

"The assumption 'we receive all messages' is actually challenging for a streaming system (even for a batch processing system, though presumably the problem of late-arriving data is often simply ignored here). You never know whether you actually have received 'all messages' -- because of the possibility of late-arriving data. If you receive a late-arriving message, what do you want to happen? Re-process/re-sort 'all' the messages again (now including the late-arriving message), or ignore the late-arriving message (thus computing incorrect results)? In a sense, any such global ordering achieved by 'let's sort all of them' is either very costly or best effort." [StackOverflow, Michael G. Noll
Technologist, Office of the CTO at Confluent]

Kafka

"Brokers are not interchangeable, and clients will need to communicate directly with the broker that contains the lead replica of each partition they produce to or consume from. You can’t place all brokers behind a single load balancer address. You need a way to route messages to specific brokers." [Confluent Blog]

"But whatever you do, do not try to build an ESB around Kafka—it is an anti-pattern that will create inflexibility and unwanted dependencies." [Kai Waehner, Confluent]

Kafka as a Database

This comes from a talk by Martin Kleppmann (Kafka as a Database).

He takes the canonical example used in SQL databases and extends it to Kafka. Let's send the two parts of a financial transaction as a single message to Kafka. A downstream consumer then splits it into two messages: one to a credit topic, one to a debit topic. We now have atomicity and durability but not isolation. If you think about it, this is the same as read committed.

Note that if you crash halfway between these two operations you will need to dedupe, or use Kafka's exactly-once semantics, but you still have durability.
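Something like the following sketch captures the idea, written in Scala against the plain Java client. The topic names, config and message format are all invented for illustration.

import java.time.Duration
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("group.id", "transaction-splitter")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val consumer = new KafkaConsumer[String, String](props)
val producer = new KafkaProducer[String, String](props)
consumer.subscribe(Collections.singletonList("transactions"))

while (true) {
  consumer.poll(Duration.ofMillis(500)).forEach { record =>
    // The transaction value takes the form "fromAccount,toAccount,amount" (a made-up wire format)
    val Array(from, to, amount) = record.value.split(",")
    producer.send(new ProducerRecord("debits", from, amount))   // fan out the debit half
    producer.send(new ProducerRecord("credits", to, amount))    // and the credit half
  }
}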

Example 2: imagine two users try to register the same username. Can we enforce uniqueness? Yes: partition the topic on username, so one registration comes before the other and all consumers agree on which. We've achieved serializability (i.e., an isolation level) but have lost parallelization.
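Reusing the producer from the sketch above, partitioning on username just means using the username as the record key (the topic and payload here are hypothetical): every claim on "alice" lands in the same partition, so every consumer sees the claims in the same order and can agree that the first one wins.

// Hypothetical topic and payload; the username is the key, which determines the partition
producer.send(new ProducerRecord("username-claims", "alice", """{"requestedBy": "client-42"}"""))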

Kafka Leaders and Followers - Gotcha

Jason Gustafson (Confluent) describes how Kafka logs can be truncated.

Say the leader writes some data locally but then dies before it can replicate that data to all of its ISRs (in-sync replicas). The remaining ISRs nominate a new leader [via the "controller"], but the new leader is an ISR that did not receive the data. Those ISRs that did receive it now have to remove it as they follow the leader, which never had it: their logs are truncated to the high watermark.

Note that clients will not see this data because of the notion of a high watermark: the highest offset that has been replicated to all ISRs. (Slow ISRs can be dropped from the set but this invariant still holds. The dropped ISRs can rejoin later when they have the requisite data.) The leader defines the high watermark, which piggybacks on the responses to fetch requests. Consequently, the high watermark on a follower is not necessarily the same as that on the leader.

Committed data is never lost.

Terminology

Reading documentation, references to out-of-band data kept cropping up so I thought it worth making a note here:

"Out-of-band data is the data transferred through a stream that is independent from the main in-band data stream. An out-of-band data mechanism provides a conceptually independent channel, which allows any data sent via that mechanism to be kept separate from in-band data. 

"But using an out-of-band mechanism, the sending end can send the message to the receiving end out of band. The receiving end will be notified in some fashion of the arrival of out-of-band data, and it can read the out of band data and know that this is a message intended for it from the sending end, independent of the data from the data source." [Wikipedia]


Sunday, October 25, 2020

Covariance and Precision Matrices

My aim is to write an advanced maths book that uses just pictures :) This is my attempt for the concepts of covariance and precision.

Example 1: Student IQs

Let's take a class of 100 students and take 10 measurements for each of them. This measurement data is Gaussian in its distribution. Then, our data looks like this:

Data Heatmap

[Python code lives here].

We can see that each student's measurements roughly conform to a value, but there is a large variety across the students.

Now, we build the covariance matrix (a sketch in code follows the list below) where:

  1. the expected value (mean) for a row is subtracted from each value in that row's vector
  2. each resulting (centred) vector is multiplied with all the other vectors, and these products form the entries of our matrix
  3. each cell is multiplied by the probability of that combination occurring. It's perfectly possible that this is zero in some cases, but for our use case the distribution of which student and which measurement we might choose is uniform.
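The original analysis was done in Python (linked above); here is a rough plain-Scala sketch of steps 1 and 2, with the uniform weighting of step 3 folded into the usual 1/(n-1) sample-covariance factor. The distribution parameters are made up.

import scala.util.Random

val nStudents     = 100
val nMeasurements = 10
val rng           = new Random(42)

// Each student's measurements cluster around that student's own mean (made-up parameters)
val data: Array[Array[Double]] = Array.fill(nStudents) {
  val studentMean = 100 + 15 * rng.nextGaussian()
  Array.fill(nMeasurements)(studentMean + 5 * rng.nextGaussian())
}

// Step 1: centre each row on its mean
val centred = data.map { row =>
  val mean = row.sum / row.length
  row.map(_ - mean)
}

// Step 2: covariance(i)(j) is the dot product of centred rows i and j, divided by n - 1
val covariance = Array.tabulate(nStudents, nStudents) { (i, j) =>
  centred(i).zip(centred(j)).map { case (a, b) => a * b }.sum / (nMeasurements - 1)
}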

Note, regarding covariance as a measurement:

"Covariance is useful in some computations, but it is seldom reported as a summary statistic because it is hard to interpret. Among other problems, its units are the product of the units of X and Y. So the covariance of weight and height might be in units of kilogram-meters, which doesn’t mean much" - Think Stats

Anyway, our covariance matrix looks like this:

Covariance matrix heatmap

It should be immediately obvious that 
  1. each row has a strong covariance with itself (the diagonal)
  2. that the matrix is symmetric (since the covariance formula is symmetric) 
  3. and that all cells other than the diagonal are just noise.

The inverse of the covariance matrix is called the precision matrix. It looks like this:

Precision matrix heatmap

This represents the conditional dependence of one student's measurements on another's and, not surprisingly, in our particular use case the matrix is full of zeroes (plus some noise). In other words, the measurements for all students are independent. And why shouldn't they be? Given the measurements for student X, what does that have to do with the measurements for student Y? This might not be true for other datasets, as we'll see in the next example.
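If you want to reproduce the precision matrix from the Scala sketch above, one option is Breeze (an assumed dependency; the post's own code uses numpy): the precision matrix is just the inverse of the covariance matrix.

import breeze.linalg.{DenseMatrix, inv}

// Build a Breeze matrix from the covariance array computed in the sketch above and invert it
val cov       = DenseMatrix.tabulate(nStudents, nStudents) { (i, j) => covariance(i)(j) }
val precision = inv(cov)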

Example 2: A Stock Price

To illustrate this, I've written some Python code based on this excellent post from Prof Matthew Stephens. He asks us to imagine a random walk where on each step we add the value from a Gaussian distribution. The output might look like this:

Random walk


Note that just adding Gaussians qualifies as a Markov Chain as the next value only depends on the previous. That is:

X₁ = N(σ², μᵢ)
X₂ = X₁ + N(σ², μᵢ)
X₃ = X₂ + N(σ², μᵢ)
...
Xₙ = Xₙ₋₁ + N(σ², μᵢ)

We rewrite this so that our random variable, Z, is depicted as a vector of values generated from N(σ², μᵢ), and note that a random variable is neither random nor a variable in the programming sense of the words. Its values may have been drawn from a random distribution, but once we have them they are fixed, as you can see from our Python code.

Anyway, once we've done this, we can represent the above set of equations as matrices:

X = A Z

where A is just an n x n matrix whose lower-left (triangular) values are 1 and the others are 0 (try it).
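A plain-Scala sketch of that construction, taking μ = 0 and σ = 1 for simplicity:

import scala.util.Random

val n   = 100
val rng = new Random(1)

val z = Array.fill(n)(rng.nextGaussian())                        // the i.i.d. Gaussian steps, Z
val a = Array.tabulate(n, n)((i, j) => if (j <= i) 1.0 else 0.0) // 1s on and below the diagonal

val x = a.map(row => row.zip(z).map { case (aij, zj) => aij * zj }.sum) // X = A Z
// Equivalently, x is just the running (cumulative) sum of z: z.scanLeft(0.0)(_ + _).tail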

The covariance matrix of X tells us how each of the elements is related to the others and looks like this:

Covariance Matrix of the Random Walk


Unsurprisingly, values are similar to their neighbours (the diagonal) and less similar to those further away (bottom-left and top-right corners). We'd expect that with a random walk.

[Just a note on covariance matrices: each row of n elements is centred on its mean. The resulting matrix is then multiplied by its transpose and divided by n - 1 (the -1 is the Bessel correction that comes from the fact that we're sampling rather than using the entire population). But this depends on a certain interpretation of what a row is. If the rows are taken from the same distribution, happy days.]

The precision matrix looks like this (zoomed into the top 10 x 10 sub matrix):

Precision Matrix of a Markov Chain

and this gives us an intuitive feeling for what the precision matrix is all about: "The key property of the precision matrix is that its zeros tell you about conditional independence. Specifically [its values are] 0 if and only if Xᵢ and Xⱼ are conditionally independent given all other coordinates of X." [here] And this is what we see here. Each point is only dependent on its immediate neighbour - the very definition of a Markov process.

Friday, October 23, 2020

Optimizing Spark/DataBricks in Azure

We have about 1.5TB of parquet data in Gen 2 storage on Azure and we're trying to read it with a Databricks cluster of 8 nodes, each with 56GB of RAM and 16 cores. All I'm trying to do is some basic select('aColumn).distinct and where('anotherColumn < X).count()s.
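Roughly, the workload looked like this. This is only a sketch: the path, column names and threshold are placeholders, not the real ones.

import spark.implicits._

// Placeholder Gen2 path and threshold
val df = spark.read.parquet("abfss://container@account.dfs.core.windows.net/some/path")
val X  = 100  // stand-in for the real threshold

df.select('aColumn).distinct.count()
df.where('anotherColumn < X).count()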

Surprisingly, this is running like a dog: the first query takes about 50 minutes and the second about 18. Tuning is an empirical science but this felt wrong as 1.5TB of data is not that much these days.

Ganglia Metrics

Ganglia showed that the cluster was running at high capacity although the network usage seemed low. If this is showing the data being pulled from Gen2 into Spark workers, at 300MB/s it would take about an hour to pull it all out of storage. Indeed, this is roughly how long it took.

However, when I took thread dumps, the executor threads were not waiting on IO. Typically, they were in Spark's CompressibleColumnBuilder (which, according to the docs, "builds optionally compressed byte buffer for a column"). So, it seemed that the cluster really was CPU-bound on parsing the files and garbage collection - which was bad but not horrendous:

Spark GC logs

Calling cache() on the data didn't seem to make queries much faster. Looking at the thread dumps again showed the threads spending a lot of time in CompressibleColumnBuilder for the initial select distinct and in java.io.FileInputStream for the subsequent counts. 

Remembering that persisting with a storage level of MEMORY_AND_DISK_SER had given me a quick win in the past, I thought I'd try it again, but with no success.
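For reference, that experiment is just a one-liner (here df stands for the DataFrame being queried):

import org.apache.spark.storage.StorageLevel

// Serialised in memory, spilling to local disk when it doesn't fit
df.persist(StorageLevel.MEMORY_AND_DISK_SER)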

I tried enabling the Delta Cache. This is supposed to cache the Gen2 data in the cluster:
Storage Tab

Compare this to the storage tab when we're not using Delta Cache and you'll see there won't be any data in the "Parquet IO Cache". (Note that the page says "RDDs" when we're only using the Dataset API. This is standard). Anyway, enabling Delta Cache seemed to make no difference at all.
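For reference, the cache is toggled with a Spark conf setting along these lines. The exact key is my assumption from the Databricks documentation of the time, so check the docs for your runtime.

// Assumed Databricks IO ("Delta") cache switch; verify against your runtime's documentation
spark.conf.set("spark.databricks.io.cache.enabled", "true")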

Using a "Delta Cache Accelerated" cluster actually made the initial distinct.count() take much longer (1.47 hours) with only marginally reduced time for the next query (16.8 minutes).

Disk is incredibly slow compared to memory. Ultimately, I was trying to process too much data with too little RAM, and there's only so far tuning will get you before your metrics plateau. So, I made a cluster of 25 nodes, each with 28GB of RAM and 8 cores. This did help but not as much as I was expecting. The distinct.count() now took 38 minutes and the where(...).count() 12 minutes. This is a 25-33% improvement but still seems a long time in absolute terms.

I want my Data Locality 

The problem appears to be that all the data is pulled to the executors even if we're only interested in just one column [see StackOverflow]. Evidence for this came from another look at Ganglia:

Ganglia Metrics during second query
This was taken after the distinct.count() and during the where(...).count() query. Note that there is no more network activity, suggesting the first query cached the data.

"In Azure really fast networks compensate for having data and compute separated." [ibid] Indeed, the threads seemed to spend all their time on the first call in code to parse the Parquet rather than blocking on IO. But you can't get over the fact that my two queries are doing much the same thing (no joins) but the first takes three times longer than the second. To demonstrate, instead of two different queries, I ran the distinct.count() query twice. The first time took 42 minutes, the second 15. So, it appears that pulling 1.5TB of data from Gen2 into Spark takes about 25 minutes.

Azure is not the only cloud offering that implements this architectural trick. GCP takes all the data out of storage and puts it into an ephemeral HDFS cluster.

Solution

A new day, a new dawn. Let's start the cluster again and go from the beginning. This time, I won't explicitly enable IO cache and I won't even call .cache() on my DataFrame. To my amazement, the same query ran in about 30 seconds today! It seemed that this .cache() call was the root of my problems as re-introducing it returned us to a 50 minute wait. This is pretty much the opposite of what you'd expect with an on-prem Hadoop/YARN cluster.