Wednesday, December 9, 2020

DR in an Azure Cloud Ecosystem

In our topology, we have a Databricks Spark Structured Streaming (SSS) job reading from an HDInsight Kafka cluster that is VNet-injected into our subscription. So, disaster recovery has to take into account two things:

  1. the Kafka cluster
  2. the blob storage where the data is landed by SSS.
Kafka

This Microsoft document outlines the usual ways of securing your data in Kafka (high replication factors for disk writes; high in-sync replica counts for in-memory writes; a high acknowledgement factor, etc). In addition, HDInsight clusters are backed by managed disks that provide "three replicas of your data", each within their own availability zone that's "equipped with independent power, cooling, and networking" [Azure docs].

So, within a region, things look peachy. Now, how do we get these Kafka messages replicating across regions? The HDInsight documentation suggests using Apache MirrorMaker but note one critical thing it says:

"Mirroring should not be considered as a means to achieve fault-tolerance. The offset to items within a topic are different between the primary and secondary clusters, so clients cannot use the two interchangeably."

This is worrying. Indeed, there is a KIP for MirrorMaker 2 to fix this problem and others like it (differences in partitions within topics of the same name, messages entering infinite loops, etc). Confluent is pushing its Replicator, which (it claims) is a more complete solution (there's a Docker trial for it here). And there is Brooklin, but Azure says this would be self-managed.

Spark and Blobs

At the moment, all the data goes into one region. The business is aware that in the event of, say, a terrorist attack on the data centres, data will be lost. But even if the infrastructure team has the data replicated from one region to another, note this caveat in the Azure documentation (emphasis mine):

"Because data is written asynchronously from the primary region to the secondary region, there is always a delay before a write to the primary region is copied to the secondary region. If the primary region becomes unavailable, the most recent writes may not yet have been copied to the secondary region."

But let's say that nothing so dramatic happens. Let's assume our data is there once we get our region back on its feet. In the meantime, what has been landed by SSS in the backup region is incompatible with what came before. This is because Spark stores its Kafka offsets in a folder in Gen2. It's just as well Spark is not writing to the directory that the erstwhile live region was using. If we had been writing to a directory that was common to both regions, some finagling would have to be done as we point the Spark job at another directory, affecting the RTO if not the RPO.
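
To make the dependency concrete, here is a minimal sketch (assuming a SparkSession called spark; the topic, account and paths are hypothetical) of where those offsets live:

    val stream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "primary-kafka:9092") // the primary HDInsight cluster
      .option("subscribe", "events")
      .load()

    stream.writeStream
      .format("parquet")
      .option("path", "abfss://data@account.dfs.core.windows.net/landing")
      // Kafka offsets and commit logs are kept under this checkpoint directory.
      // A failover job pointed at the same checkpoint would be replaying offsets
      // that mean nothing to the secondary Kafka cluster.
      .option("checkpointLocation", "abfss://data@account.dfs.core.windows.net/checkpoints/events")
      .start()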

Aside: A disaster of a different kind

In the old days of on-prem Hadoop clusters, you might see a problem where too many files were created. The consequence would be that the NameNode goes down. Sometimes deciphering the Azure documentation is hard, but this link says the "Maximum number of blob containers, blobs, file shares, tables, queues, entities, or messages per storage account" has "No limit".

Hopefully (caveat: I have not tested this in the wild) this problem has gone away.

Tuesday, December 8, 2020

Cache vs Delta Cache

Continuing the investigation into the lack of cache coherency in Databricks from my previous post, I've raised the issue with the people at Azure.

In the document a representative from Microsoft pointed me to, there are two caches at play: the Delta Cache and the Apache Spark cache. "The Delta cache accelerates data reads by creating copies of remote files in nodes’ local storage" and is enabled by default on certain clusters. In effect, you get it for free when you use Databricks.

The Apache Spark cache is the familiar one that is invoked by calling Dataset.cache().

My first complaint is that these two caches behave inconsistently. In the Delta Cache, updates are immediately available. But if we use the Apache Spark cache in conjunction with Databricks' Delta Lake (not to be confused with the orthogonal Delta Cache), the data is frozen in time.

Now, in the Spark world, Datasets are strictly immutable. But Delta Lake "provides ACID transactions ... on top of your existing data lake and is fully compatible with Apache Spark APIs." Well, once I've invoked .cache() and updated the Delta Lake in the standard way:

    df.write
      .format("delta")
      .mode("overwrite")
      .option("replaceWhere", CONDITION_STRING)
      .save(HDFS_DIRECTORY)

(working example is here) then I cannot see my own updates!

What is, perhaps, even more bizarre can be seen when I try to read the data afresh. A second, totally independent call (but using the same Spark session) to read the data like this:

session.read.format("delta").load(dir)

cannot see the data either! And just to show you that there is nothing up my sleeve, running a completely separate process (code here) to read the data shows that the change has indeed been persisted to HDFS. This appears to be because Spark caches the data plus the Delta Lake metadata too, and cares not for Databricks' new semantics.
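
A minimal sketch of the behaviour described above (using only the standard Spark and Delta APIs; dir is the same directory as in the earlier snippets):

    val df = spark.read.format("delta").load(dir)
    df.cache()
    df.count()                                    // materialises the cache

    // ... the replaceWhere overwrite above happens here ...

    df.count()                                    // still shows the old data
    spark.read.format("delta").load(dir).count()  // a fresh read also shows the old data!

    df.unpersist()                                // or spark.catalog.clearCache()
    spark.read.format("delta").load(dir).count()  // now reflects the update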

This brings me to my next complaint - the leaky abstraction. Databricks is trying to retrofit ACID transactions onto an API that was built with immutability at its core. The claim that it's "fully compatible with Apache Spark APIs" seems not entirely true.

I have a meeting scheduled with the MS representative, who is currently of the opinion that we should just never call .cache(). On Azure Databricks, this does not seem too bad as the Delta Cache seems pretty fast. It sucks for anybody using Delta Lake on plain HDFS, though.

Summary

If you call .cache() on a Spark Dataset while using Databricks Delta Lake format you will not be able to:

  • see updates from any other Spark session
  • see your own updates to this data
  • see the true data even if you re-read it from the underlying data store
unless you unpersist the Dataset.

[Addendum: I spoke to Databricks engineer, Sandeep Katta, on 17 December 2020 and he agreed the documentation is misleading. He says he'll generate a docs PR and submit it to the relevant team to make it clear that one should not use .cache when using Delta Lake]

Monday, November 9, 2020

Cache in the Azure

Gotcha!

What surprised me with the Azure/Databricks/Spark architecture was a lack of cache coherence. Data is pulled from Azure's Gen2 and is stored closer to the Spark executors upon calling cache(). However, if another cluster updates that data, don't expect the first cluster to see it.

In my case, I was using Databricks' Delta format to overwrite data as outlined here in the Delta Lake docs. A business requirement demanded that if we're overwriting anything, an override flag must be set. However, because of the lack of cache coherence, this check indicated that there was nothing to overwrite even though another cluster had written that partition. This caused my code to overwrite what was there irrespective of the flag! This significantly changes the semantics of my Big Data application.

Furthermore, any writes after cache() has been called are not reflected in the original Gen2 storage in Azure...

"The Delta cache automatically detects when data files are created or deleted and updates its content accordingly. You can write, modify, and delete table data with no need to explicitly invalidate cached data." [DataBricks Delta Cache docs] This doesn't appear to be the case, at least if it's referring to the original file(s).

"Multiple writers across multiple clusters can simultaneously modify a table partition and see a consistent snapshot view of the table and there will be a serial order for these writes" [DataBrick Delta docs] Again, this might be true but not after cache() has been called, it appears.

The only interesting piece of architecture is that the Azure Gen2 storage was mounted using:

dbutils.fs.mount(
  source       = s"wasbs://${dst}@${account}.blob.core.windows.net/",
  mountPoint   = x,
  extraConfigs = Map(s"fs.azure.account.key.$account.blob.core.windows.net" -> dbutils.secrets.get(scope = "discope", key = key))
)

Investigations continue, but a friend at a completely different company that uses Databricks confirmed it's not just me to whom this is happening. He too tried to replace a partition with a .option("replaceWhere", ...) and, although the write did change the underlying data store, the problem appeared upon further reading: the Delta log had been cached too and was now stale. You need to run spark.catalog.clearCache() and then reads are OK.

Sunday, November 8, 2020

Statistical Covariance Part 2

I quoted in a previous post somebody who said that covariance matrices always have a determinant of 0. This isn't strictly true. What can I say? I'm not a very good mathematician.

The argument was that the means are always subtracted from each row, and with simple algebra you could demonstrate that the determinant is 0. But this ignores the fact that covariance is the expected value of the product of two rows that have had their expected values subtracted.

Say we have two different distributions from which we draw combinations. However, for reasons peculiar to the use case, certain combinations are not allowed. When we tabulate the probabilities for this state space, we'll have zeros in some cells. Even though the inner product of the corresponding two probability vectors might not be zero, the expected probability of the two together is.

Note another caveat. Bearing in mind that correlation is just covariance divided by the root of the product of both variances, "the correlation between A and B is only a measure of the strength of the linear relationship between A and B. Two random variables can be perfectly related, to the point where one is a deterministic function of the other, but still have zero correlation if that function is non-linear." [Daniel Shiebler's blog]

Positive Definite

A matrix, M, is positive definite if xᵀMx > 0 for all non-zero vectors x (an equivalent definition is "a symmetric matrix whose eigenvalues are all positive real numbers" - Coding the Matrix, Klein, definition 12.6.1).

Note that all matrices that can be expressed as AᵀA are positive semi-definite (see this SO answer). The proof is simple: substitute AᵀA for M above.

xᵀMx = xᵀAᵀAx = (Ax)ᵀ(Ax) = ‖Ax‖² ≥ 0

since any real vector dotted with itself is non-negative. Since our covariance matrix can be expressed as AᵀA, it too is at least positive semi-definite (xᵀMx ≥ 0). But if it is invertible, it must also be positive definite, because then we can't have Mx = 0 for x ≠ 0.

Why this last statement is true can be explained by this elegant SO answer. Basically, if Mx = 0 for some x ≠ 0, then the rows of M must be linearly dependent for the equation

Σⱼ Mᵢⱼ xⱼ = 0 ∀ i

to hold. That is, if you give me (n-1) values in a row, I can tell you the value of the last one. Here's a simple Python/NumPy example where the last value in each row is just the sum of the first two. It's easy to see that, when M is multiplied by the vector [1, 1, -1], the result is zero, so that vector lies in M's null space:

>>> M = np.asmatrix([[1, 2, 3], [4, 5, 9], [6, 7, 13]])
>>> np.linalg.det(M)
0.0

But if a matrix's rows are linearly dependent, its determinant must be 0. And if a matrix's determinant is 0, it cannot be inverted. QED.

What if my Matrix is Singular?

We add a tiny amount to the diagonal to make it non-singular. This is called conditioning. It sounds like a hack but it does have a basis in maths. In a frequentist interpretation, this is ridge regression. In a Bayesian interpretation, it's the prior.
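
In symbols, the conditioning step is nothing more than:

M_{\text{conditioned}} = M + \epsilon I, \qquad \epsilon > 0 \text{ (small)}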

Briefly, the argument goes that for ridge regression we penalize large model parameters. So, instead of minimizing our squared error (Xθ - yactual)², we minimize the error plus a penalty:

L(θ) = (Xθ - yactual)² + λ θᵀθ

Differentiate this with respect to θ and set it to zero. Solve the resulting equation and you'll see an [XᵀX + λI]⁻¹ term.
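
For completeness, here is the algebra being glossed over (standard ridge regression, nothing specific to our problem):

\begin{aligned}
L(\theta) &= (X\theta - y)^\top (X\theta - y) + \lambda\,\theta^\top\theta \\
\frac{\partial L}{\partial \theta} &= 2X^\top X\theta - 2X^\top y + 2\lambda\theta = 0 \\
\Rightarrow \theta &= (X^\top X + \lambda I)^{-1} X^\top y
\end{aligned}

That [XᵀX + λI]⁻¹ is exactly the conditioned matrix above: even if XᵀX is singular, adding λI makes it invertible.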

The argument for the Bayesian prior briefly goes like this: if we take a frequentist view, assume that the error in our data is Gaussian, and plug our estimate into it, we'll see our familiar equation for a Gaussian multiplied by e^(-λθᵀθ). Since the Bayesian posterior, p(θ|Data, Model), must equal the frequentist probability, e^(-λθᵀθ) is the only term that maps to p(θ|Model), simply because it's the only one with θ in it. Therefore, our conditioning manifests itself in the prior.
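
Roughly, the factorisation being appealed to is (sketched here with a single noise variance σ²):

p(\theta \mid \text{Data}) \;\propto\; \underbrace{\exp\!\left(-\frac{(X\theta - y)^\top(X\theta - y)}{2\sigma^2}\right)}_{\text{likelihood } p(\text{Data}\mid\theta)} \;\cdot\; \underbrace{\exp\!\left(-\lambda\,\theta^\top\theta\right)}_{\text{prior } p(\theta)}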

Full derivations appear in Jake VanderPlas' wonderful blog.

Friday, November 6, 2020

Scala Proofs - Part 1

According to the Scaladocs,

An instance of A <:< B witnesses that A is a subtype of B.
Requiring an implicit argument of the type A <:< B encodes the generalized constraint A <: B.
So, for instance:

  class MySuper

  class MySub extends MySuper 

  def demandsProof[A](implicit a: A <:< MySuper): Unit = ???

demands proof that any A is a subclass of MySuper. We can ensure that this is satisfied without calling it with an argument:

  demandsProof[MySub] // OK
Rob Norris @tpolecat Apr 02 22:26:

A <:< B extends A => B and is available implicitly if A <: B
A =:= B extends A => B and is available implicitly if A is the same type as B

This operator comes in handy when I want to enforce at compile time a rule that says one can only invert square matrices [Wikipedia]. My code looks like this:

@typeclass trait Invert[M[_ <: Length, _ <: Length]] {
  @op("inv") def invert[A <: Length, B <: Length](x: M[A, B])(implicit a: A =:= B): M[A, A]
}

(Note the use of Simulacrum to automatically generate type classes.)
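
A rough sketch of how the constraint bites at the call site (the Matrix, Rows3 and Cols2 types below are hypothetical stand-ins for my real Length-indexed types, and the inversion itself is elided):

  sealed trait Length
  trait Rows3 extends Length
  trait Cols2 extends Length

  case class Matrix[R <: Length, C <: Length](values: Array[Array[Double]])

  def invert[A <: Length, B <: Length](m: Matrix[A, B])(implicit ev: A =:= B): Matrix[A, A] =
    Matrix(m.values) // the actual inversion is beside the point; the implicit evidence is

  val square      = Matrix[Rows3, Rows3](Array.fill(3, 3)(1.0))
  val rectangular = Matrix[Rows3, Cols2](Array.fill(3, 2)(1.0))

  invert(square)       // compiles: Rows3 =:= Rows3
  // invert(rectangular) // won't compile: Cannot prove that Rows3 =:= Cols2.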

Adam Rosien describes the difference between A <:< B and B >: A at InnerProduct:
There is an analogous implicitly passed value for subtype bounds, but the subtype bound is not syntactic sugar for it. (I thought it was.) That is, the type signature

def widen[A, B >: A](fa: F[A]): F[B] 
is equivalent to the type signature

def widen[A, B](fa: F[A])(implicit ev: A <:< B): F[B] 
but the former is not converted to the latter by the compiler, as is the case for context bounds. [my emphasis]
We can prove this by decompiling the Java bytecode with javap.  The first gives:

  public <A, B> F widen(F);

and the second, context-bound example:

  public <A, B> F widen(F, scala.Predef$$less$colon$less<A, B>);

This highlights a useful equivalence:
Containers of a subtype can be transformed into containers of its supertype, if you can map over the container. The usual definition of covariance emphasizes subtypes, but the ability to map is a more general, and useful, definition.
Anyway, why would you use one over the other? See the source code for Scala's Option:

sealed trait Option[+A] {
  // def flatten[B, A <: Option[B]]: Option[B] = ...
  // won't work, since the A in flatten shadows the class-scoped A.

  def flatten[B](implicit ev: A <:< Option[B]): Option[B]
    = if(isEmpty) None else ev(get)
  // Because (A <:< Option[B]) <: (A => Option[B]), ev can be called to turn the
  // A from get into an Option[B], and because ev is implicit, that call can be
  // left out and inserted automatically.
}



Friday, October 30, 2020

Odd Characters

The Problem

A data scientist is struggling to get his regex to work in Spark on Databricks. He wants to check the data for British postcodes (a.k.a. zip codes). His regex looks like:

[A-Z]{1,2}[0-9][A-Z0-9]?\\s?[0-9]  

Everybody agrees that any string in the data that matches this regex can reasonably be considered a British postcode. However, he sees in his notebook many instances that look valid but were marked as invalid.

The Investigation

Despite spending most of the last 25 years using the JVM, I've only recently come to appreciate character encodings. It's something that seems annoying and mostly irrelevant. However, last month I wasted two days because the British pound sterling sign (£) can be represented as 0xa3 in what's commonly known as Extended ASCII as well as U+00A3 in Unicode. That means a difference of one byte - which can be the difference between being granted access if it's a password and being locked out if it's not. In this case, the extended ASCII version came from somebody typing on the Linux CLI and the Unicode came from a Java String derived from a properties file. 

Back to our regex. A Scala function seemed to behave differently when it was part of a UDF running on the Spark executor than when it was called locally in the Spark driver. To demonstrate this, the data scientist ran display(hisDataFrame) on the driver and copy-and-pasted the string that should be a British postcode but was not showing up as one into the regex code directly. And it matched! 

What was going on? Cue some false leads and dead ends concerning character encoding (eg, "Unicode grows... char is legacy ... use codePointAt" [SO]; it seems Java hasn't always played fairly with Unicode [SO] even though the code here worked fine on my JDK 1.8). 

But it was this answer on StackOverflow with which I converted the String to hexadecimal. It takes advantage of BigInteger being big-endian when constructed with byte arrays. Converted to Scala, it looks like:

def toHex(x: String): String = String.format("0x%x", new java.math.BigInteger(1, x.getBytes))

Now it became clear that there were rogue spaces in the text. So, why had the data scientist's copy-and-paste not worked? Well, Databricks' display function seems to convert the contents of the DataFrame into HTML, and multiple spaces in HTML are rendered as just a single space. So, he was pasting the wrong data into his function.

The solution

The solution was simply to add the expectation of multiple spaces to the regex. But it raised a bigger issue. Although this Databricks display function is great for rendering data in a more pleasing manner, it opens you up to issues like this. Best to stick with DataFrame.show() if you're doing exploratory data analysis and use display for the execs. The former is not subject to this HTML rendering issue.
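
A sketch of the kind of adjustment we made (the exact pattern differed slightly; the point is just tolerating more than one whitespace character):

  val postcodeRegex = "[A-Z]{1,2}[0-9][A-Z0-9]?\\s*[0-9]".r

  println(postcodeRegex.findFirstIn("SW1A  1AA")) // Some(SW1A  1) despite the double space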

Wednesday, October 28, 2020

Streaming architecture


Some miscellaneous notes on streaming.

Streaming Semantics are Difficult

"The assumption 'we receive all messages' is actually challenging for a streaming system (even for a batch processing system, though presumably the problem of late-arriving data is often simply ignored here). You never know whether you actually have received 'all messages' -- because of the possibility of late-arriving data. If you receive a late-arriving message, what do you want to happen? Re-process/re-sort 'all' the messages again (now including the late-arriving message), or ignore the late-arriving message (thus computing incorrect results)? In a sense, any such global ordering achieved by 'let's sort all of them' is either very costly or best effort." [StackOverflow, Michael G. Noll
Technologist, Office of the CTO at Confluent]

Kafka

"Brokers are not interchangeable, and clients will need to communicate directly with the broker that contains the lead replica of each partition they produce to or consume from. You can’t place all brokers behind a single load balancer address. You need a way to route messages to specific brokers." [Confluent Blog]

"But whatever you do, do not try to build an ESB around Kafka—it is an anti-pattern that will create inflexibility and unwanted dependencies." [Kai Waehner, Confluent]

Kafka as a Database

This comes from a talk by Martin Kleppmann (Kafka as a Database).

He takes the canonical example used in SQL databases and extends it to Kafka. Let's send two parts of a financial transaction as a single message to Kafka. We then have Kafka separate it into two messages: one to a credit topic, one to a debit topic. We now have atomicity and durability but not isolation. If you think about it, this is the same as the ReadCommitted isolation level.

Note, if you crash half way between these two operations, you will need to dedupe or use Kafka's exactly-once semantics, but you still have durability.

Example 2: imagine two users try to register the same user name. Can we enforce uniqueness? Yes: partition the topic on username, and one registration will come before the other and all consumers will agree on that order. We've achieved serializability (ie, an isolation level) but have lost parallelization. (See the sketch below.)
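
A minimal sketch of the idea (topic name and serialisers are hypothetical): keying the record by username means all registration attempts for the same name land on the same partition, in a single order every consumer agrees on.

  import java.util.Properties
  import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.serializer",   "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  val producer = new KafkaProducer[String, String](props)

  // The key (the username) determines the partition, so competing registrations
  // for "alice" are totally ordered on that partition; the first one wins.
  producer.send(new ProducerRecord("user-registrations", "alice", """{"user": "alice", "attempt": 1}"""))
  producer.send(new ProducerRecord("user-registrations", "alice", """{"user": "alice", "attempt": 2}"""))
  producer.close()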

Kafka Leaders and Followers - Gotcha

Jason Gustafson (Confluent) describes how Kafka logs can be truncated.

Say the leader writes some data locally but then dies before it can replicate that data to all of its ISRs (in-sync replicas). The remaining ISR replicas nominate a new leader [via the "controller"] but it's an ISR that did not receive the data. Those ISRs that did receive it now have to remove it as they follow the leader, but the leader never had it. Their logs are truncated to the high watermark.

Note that clients will not see this data, as there is the notion of a high watermark: the highest offset that has been committed across all ISRs. (Slow ISRs can be dropped from the set but this invariant still holds. The dropped ISRs can rejoin later when they have the requisite data.) The leader defines the high watermark. This piggybacks on the response to a fetch request. Consequently, the high watermark on a follower is not necessarily that on the leader.

Committed data is never lost.

Terminology

Reading documentation, references to out-of-bound data kept cropping up so I thought it worth making a note here:

"Out-of-band data is the data transferred through a stream that is independent from the main in-band data stream. An out-of-band data mechanism provides a conceptually independent channel, which allows any data sent via that mechanism to be kept separate from in-band data. 

"But using an out-of-band mechanism, the sending end can send the message to the receiving end out of band. The receiving end will be notified in some fashion of the arrival of out-of-band data, and it can read the out of band data and know that this is a message intended for it from the sending end, independent of the data from the data source." [Wikipedia]


Sunday, October 25, 2020

Covariance and Precision Matrices

My aim is to write an advanced maths book that uses just pictures :) This is my attempt at the concepts of covariance and precision.

Example 1: Student IQs

Let's take a class of 100 students and take 10 measurements for each of them. This measurement data is Gaussian in its distribution. Then, our data looks like this:

Data Heatmap

[Python code lives here].

We can see that each student's measurements roughly conform to a value but there is a large variety over all the students.

Now, we build the covariance matrix where:

  1. the expected value (mean) for a row is subtracted from each value in the vector
  2. the resulting vector is multiplied with all the other vectors and this forms the index for our matrix
  3. each cell is multiplied by the probability of that combination occurring. It's perfectly possible that this is zero in some cases, but for our use case the distribution over which student and which measurement we might choose is uniform. (See the formula below.)
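
In symbols, for rows xᵢ and xⱼ, each entry of the covariance matrix is:

\Sigma_{ij} = \mathbb{E}\big[(x_i - \mathbb{E}[x_i])\,(x_j - \mathbb{E}[x_j])\big]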

Note, regarding covariance as a measurement:

"Covariance is useful in some computations, but it is seldom reported as a summary statistic because it is hard to interpret. Among other problems, its units are the product of the units of X and Y. So the covariance of weight and height might be in units of kilogram-meters, which doesn’t mean much" - Think Stats

Anyway, our covariance matrix looks like this:

Covariance matrix heatmap

It should be immediately obvious that 
  1. each row has a strong covariance with itself (the diagonal)
  2. that the matrix is symmetric (since the covariance formula is symmetric) 
  3. and that all other cells than the diagonal are just noise.

The inverse of the covariance matrix is called the precision matrix. It looks like this:

Precision matrix heatmap

This captures the conditional dependence of one student's measurements on another's and, not surprisingly, in our particular use case the matrix is full of zeroes (plus some noise). In other words, the measurements for all students are independent. And why shouldn't they be? Given the measurements for student X, what does that have to do with the measurements for student Y? This might not be true for other datasets, as we'll see in the next example.

Example 2: A Stock Price

To illustrate this, I've written some Python code based on this excellent post from Prof Matthew Stephens. He asks us to imagine a random walk where on each step we add a value drawn from a Gaussian distribution. The output might look like this:

Random walk


Note that just adding Gaussians qualifies as a Markov chain, as the next value depends only on the previous one. That is:

X₁ = N(μ, σ²)
X₂ = X₁ + N(μ, σ²)
X₃ = X₂ + N(μ, σ²)
...
Xₙ = Xₙ₋₁ + N(μ, σ²)

where each N(μ, σ²) is an independent draw.

We rewrite this so that our random variable, Z, is depicted as a vector of values drawn from N(μ, σ²), and note that a random variable is neither random nor a variable in the programming sense of the words. Its values may have been drawn from a random distribution, but once we have them they are fixed, as you can see from our Python code.

Anyway, once we've done this, we can represent the above set of equations as matrices:

X = A Z

where A is just an n x n lower-triangular matrix whose lower-left values (including the diagonal) are 1 and whose other values are 0 (try it; see below).
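
For n = 4, say, that looks like:

A = \begin{pmatrix} 1 & 0 & 0 & 0 \\ 1 & 1 & 0 & 0 \\ 1 & 1 & 1 & 0 \\ 1 & 1 & 1 & 1 \end{pmatrix},
\qquad X = A\,Z \;\implies\; X_k = \sum_{t=1}^{k} Z_t

which is exactly the running sum that defines the walk.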

The covariance matrix of X tells us how each of its elements is related to the others and looks like this:

Covariance Matrix of the Random Walk


Unsurprisingly, values are similar to their neighbours (the diagonal) and less similar to those further away (bottom-left and top-right corners). We'd expect that with a random walk.

[Just a note on covariance matrices: each row of n elements is centred on its mean. The resulting matrix is then multiplied by its transpose and divided by n - 1 (the -1 is the Bessel correction that comes from the fact that we're sampling rather than using the entire population). But this depends on a certain interpretation of what a row is. If each row is taken from the same distribution, happy days.]

The precision matrix looks like this (zoomed into the top 10 x 10 sub matrix):

Precision Matrix of a Markov Chain

and this gives us an intuitive feeling for what the precision matrix is all about: "The key property of the precision matrix is that its zeros tell you about conditional independence. Specifically [its values are] 0 if and only if Xᵢ and Xⱼ are conditionally independent given all other coordinates of X." [here] And this is what we see here. Each point is only dependent on its immediate neighbour - the very definition of a Markov process.

Friday, October 23, 2020

Optimizing Spark/DataBricks in Azure

We have about 1.5TB of Parquet data in Gen2 storage on Azure and we're trying to read it with a Databricks cluster of 8 nodes, each with 56GB of RAM and 16 cores. All I'm trying to do is a basic select('aColumn).distinct and some where('anotherColumn < X).count()s.

Surprisingly, this is running like a dog: the first query takes about 50 minutes and the second about 18. Tuning is an empirical science but this felt wrong as 1.5TB of data is not that much these days.

Ganglia Metrics

Ganglia showed that the cluster was running at high capacity although the network usage seemed low. If this is showing the data being pulled from Gen2 into the Spark workers, at 300MB/s it would take about an hour to pull it all out of storage. Indeed, this is roughly how long it took.

However, when I took thread dumps, the executor threads were not waiting on IO. Typically, they were in Spark's CompressibleColumnBuilder (which, according to the docs, "builds optionally compressed byte buffer for a column"). So, it seemed that the cluster really was CPU-bound on parsing the files and garbage collection - which was bad but not horrendous:

Spark GC logs

Calling cache() on the data didn't seem to make queries much faster. Looking at the thread dumps again showed the threads spending a lot of time in CompressibleColumnBuilder for the initial select distinct and in java.io.FileInputStream for the subsequent counts. 

Remembering that  persisting with a storage level of MEMORY_AND_DISK_SER had given me a quick win in the past, I thought I'd try it again but with no success.

I tried enabling the Delta Cache. This is supposed to cache the Gen2 data in the cluster:
Storage Tab

Compare this to the storage tab when we're not using Delta Cache and you'll see there won't be any data in the "Parquet IO Cache". (Note that the page says "RDDs" when we're only using the Dataset API. This is standard). Anyway, enabling Delta Cache seemed to make no difference at all.

Using a "Delta Cache Accelerated" cluster actually made the initial distinct.count() take much longer (1.47 hours) with only marginally reduced time for the next query (16.8 minutes).

Disk is incredibly slow compared to memory. Ultimately, I was trying to process too much data with too little RAM and there's only so far tuning will get you before your metrics plateau. So, I made a cluster of 25 nodes each with 28gb of RAM and 8 cores. This did help but not as much as I was expecting. The distinct.count() now took 38 minutes and the where(...).count() 12 minutes. This is between a 25%-33% improvement but still seems a long time in absolute terms.

I want my Data Locality 

The problem appears to be that all the data is pulled to the executors even if we're only interested in just one column [see StackOverflow]. Evidence for this came from another look at Ganglia:

Ganglia Metrics during second query
This was taken after the distinct.count() and during the where(...).count() query. Note that there is no more network activity, suggesting the first query cached the data.

"In Azure really fast networks compensate for having data and compute separated." [ibid] Indeed, the threads seemed to spend all their time on the first call in code to parse the Parquet rather than blocking on IO. But you can't get over the fact that my two queries are doing much the same thing (no joins) but the first takes three times longer than the second. To demonstrate, instead of two different queries, I ran the distinct.count() query twice. The first time took 42 minutes, the second 15. So, it appears that pulling 1.5TB of data from Gen2 into Spark takes about 25 minutes.

Azure is not the only cloud offering that implements this architectural trick. GCP takes all the data out of storage and puts it into an ephemeral HDFS cluster.

Solution

A new day, a new dawn. Let's start the cluster again and go from the beginning. This time, I won't explicitly enable IO cache and I won't even call .cache() on my DataFrame. To my amazement, the same query ran in about 30 seconds today! It seemed that this .cache() call was the root of my problems as re-introducing it returned us to a 50 minute wait. This is pretty much the opposite of what you'd expect with an on-prem Hadoop/YARN cluster.

Tuesday, September 29, 2020

A waste of two days

 This post is just for reference when people ask me what can possibly go wrong with a simple process. In this case, the solution was trivial but the problem was enormous.

Basically, I had some code for which decryption was central. I had all my tests run on each check-in, using Bouncy Castle to decrypt data encrypted by GnuPG. This was also tested in lower environments, but the upper environments were closed to me. Only DevOps could go there. And, lo and behold! Decryption failed there with exactly the same artifact as was tested elsewhere.

The error was "checksum mismatch at 0 of 20". A quick Google turned up this StackOverflow post that indicated there was a problem with the pass phrase (a means of protecting the private key). So I looked at the file in which the passphrase lived. Playing with hexdump [SO], I ran something like:

hexdump -e'"%07.8_ax  " 8/1 "%03d "  " |\n"' passphrase.txt
00000000  049 050 051 -62 -93 052 053 054 |
00000008  055 010                         |

Well, firstly there appears to be a line return (010) in there, but this is a red herring: if you use the command to get this into Key Vault as outlined in a previous post (that is, by cat-ing it), then the newline is removed. This was serendipitous, as the gpg man pages say:

       --passphrase-file file

              Read the passphrase from file file. Only the first line will be read from file file. 

If we'd used the proper az command line (using the -f switch to indicate a file rather than cat-ing it), Key Vault would have contained the newline.

So, after wasting some time on that, I next looked at why my JVM was saying the pass phrase was 21 characters when I could only see 20. Checking each byte of the pass phrase, it became clear that the character "£" (that is, the pound sterling sign) was taking up two bytes. 

Fair enough, I thought. It's not ASCII but I can encrypt and decrypt fine with this pass phrase using gpg. So, why was my code failing?

And this is the kick in the pants. "Unless otherwise specified, the character set for text is the UTF-8 [RFC3629] encoding of Unicode [ISO10646]." (OpenPGP format RFC) But there is a UTF-8 encoding for the pound sterling symbol, so what gives? Well, different implementations are at liberty to map characters to UTF-8 as they see fit, it seems:

"Please note that OpenPGP defines text to be in UTF-8.  An implementation will get best results by translating into and out of UTF-8.  However, there are many instances where this is easier said than done.  Also, there are communities of users who have no need for UTF-8 because they are all happy with a character set like ISO Latin-5 or a Japanese character set.  In such instances, an implementation MAY override the UTF-8 default by using this header key.  An implementation MAY implement this key and any translations it cares to; an implementation MAY ignore it and assume all text is UTF-8." [ibid]

So there is some ambiguity in how to handle these characters, and I don't appear to be the only one to have fallen into this trap, as the Bouncy Castle mailing lists suggest. And it's not just Bouncy Castle: OpenKeychain also appears to have this problem.

Ambiguous specifications are the root of all evil.


Saturday, September 26, 2020

Gaussian Processes - part 1

Gaussian Processes are a massive topic and these are just notes I made while learning about them.


The gist

It's all Bayesian.


Information Theory, Inference, and Learning Algorithms (MacKay)

If you squint, this is just Bayes' formula, but where we're finding the probability of a function, y(x), given the targets, t, and the data, X.

We assume we can model this problem by assigning Gaussians to all states.

Now, a multivariate Gaussian (sometimes called MVN for multivariate normal) looks like:

 MVN (ITILA, MacKay)

ITILA says "where x is the mean of the distribution" and that's the case when our expection is the mean. But if we applied this to a linear regression then x wouldn't be our mean but our linear equation (by definition of what is our expectation). Furthermore, why choose linear regression? Let's use a different function altogether! "This is known as basis function expansion" [MLPP, Murphy, p20].

The matrix, A, in the MVN equation above is just the inverse of the covariance matrix, called the precision or concentration matrix. Just as a reminder, covariance is defined as:

cov[X, Y] = E[(X - E[X])(Y - E[Y])] = E[XY] - E[X]E[Y]

and so the covariance matrix is defined as:

cov[x] = E[(x - E[x])(x - E[x])ᵀ]


Example  

This example is from examples for Figaro. Here, 

y(x) = x² + N(0, 1)

where N is the normal distribution and x is an integer in the range (1, 10). We want to find out: given, say, x = 7.5, what would y most likely be?

To do this, we introduce a covariance function:

exp(-γ (x₁ - x₂)²)

where γ is just a constant and the xs are pairwise actual values. The choice of function just makes the maths easier but suffice to say it penalizes large differences.

Now, given our covariance function, we can apply it to all pair combinations to make the covariance matrix (represented as a heat map here):
Matrix from the covariance function

then we invert it (after we add some noise to the diagonal to ensure it's not ill-conditioned):

Inverted matrix of the covariance function

then element-wise multiply it with the observed y(x) values (ie, all those corresponding to x in the range 1 to 10):
Alpha

This completes the training process.

Now, all the Figaro example does is plug these tensors into pre-packaged formulas for the expected value (μ*) and covariance matrix (Σ*) of the derived Gaussian. These can be found in Kevin Murphy's excellent "Machine Learning: a Probabilistic Perspective", equations 15.8 and 15.9: 

μ* = μ(X*) + K*ᵀ K⁻¹ (f - μ(X))
Σ* = K** - K*ᵀ K⁻¹ K*

where

X is the observed data
f is the mapping from the known inputs (X) to outputs
X* is the data for which we wish to predict f
K is the observed data passed through the kernel function/covariance matrix; it is a function of (X, X)
K* is a mix of the data X and X* passed through the kernel function/covariance matrix; it is a function of (X, X*)
K** is the data X* passed through the kernel function/covariance matrix; it is a function of (X*, X*)

The derivation of these equations is beyond me, as is why the Figaro demo uses 1000 iterations when I get similar results with 1 or 10 000.

Next time. In the meantime, it appears that Murphy's book is going to be available online in a year's time and the code is already being translated to Python and can be found at GitHub here.


Further reading 

Gaussian Processes for Dummies [Kat Bailey's Blog]
The Model Complexity Myth [Jake VanderPlas's Blog]

Tuesday, September 22, 2020

Spark Gotchas


Some miscellaneous notes on Spark I've been making the last few weeks.


Triggers and OutputModes when Aggregating

I was experimenting with aggregating a query that reads from Kafka and writes Parquet to HDFS.

The interplay between watermarking, Trigger and OutputMode looks a bit like this:
  • Queries that aggregate with OutputMode.Append yet with no watermark will always throw "Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;" irrespective of the Trigger.
  • Queries that aggregate with OutputMode.Complete fail with "Data source parquet does not support Complete output mode;". Even text files will barf with "Data source text does not support Complete output mode;".
  • Queries that aggregate with OutputMode.Append but with a watermark behave like this:

  • Continuous: "Unknown type of trigger: ContinuousTrigger(5000)" [1], or "Continuous processing does not support EventTimeWatermark operations.;;"
  • ProcessingTime: no errors but the last batch is not pulled from Kafka.
  • No trigger ("When no trigger is specified, Structured Streaming starts the processing of a new batch as soon as the previous one is finished." - Stream Processing with Apache Spark): no errors but the last batch is not pulled from Kafka.
  • Once: no errors but nothing is pulled from Kafka. Running the query twice with awaitTermination pulls some elements.

The reason Spark leaves a little something on Kafka is (apparently) for efficiency. There appears to be a pull request for making Spark consume the last bit from Kafka, but in its associated Jira somebody mentions my StackOverflow question that talks about Spark's behaviour being unchanged.

So perplexed was I that I asked the Spark mailing lists. The answer from Spark committer, Jungtaek Lim, was illuminating:

"Unfortunately there's no mechanism on SSS to move forward only watermark without actual input, so if you want to test some behavior on OutputMode.Append you would need to add a dummy record to move watermark forward."

This doesn't seem to be because we're writing Parquet. Text files will also behave this way.


Defragging Question

Spark will create a new file per executor thread per ProcessingTime trigger interval (as this test code demonstrates). This can lead to a lot of files, which could ultimately choke your NameNode if you're persisting to HDFS. This may necessitate a roll-your-own fix [SO] - see the sketch below.
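
One roll-your-own approach (not necessarily what the linked answer does; the paths here are hypothetical) is simply to compact the landed data periodically in a separate batch job:

  // Read the many small files and rewrite them as a handful of larger ones.
  spark.read.parquet("/data/landing/2020-09-22")
    .repartition(8)
    .write
    .mode("overwrite")
    .parquet("/data/compacted/2020-09-22")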

If you're using Databricks, you can do this "bin-packing" by running the OPTIMIZE command. Unfortunately, I could not see the code to do this inside the Delta library Databricks have open sourced. (I discuss other ways of dealing with this problem without Delta in this blog post).

Since our use case at work is such that we only want the output of stream processing once a day, we opted for Trigger.Once. Since we're not aggregating, all of the data is pulled and processed. Since this is streaming in a very loose sense of the word (it's more batch processing that gets its data in one short, sharp stream), we avoided having large numbers of small files.


Exceptions in processing DataFrames

Spark can optionally have a "best efforts" approach to exceptions rather than failing the whole process.

spark.range(10000).repartition(7).map { i =>
  if (i == 9999) { Thread.sleep(5000); throw new RuntimeException("oops!") }
  else i
}.write.option("mapreduce.fileoutputcommitter.algorithm.version", ALGO_VERSION).mode("append").parquet(s"/tmp/test_algo_$ALGO_VERSION")

When ALGO_VERSION=1, nothing is written to HDFS. On the other hand, when ALGO_VERSION=2, the partition in which the exception occurred is not written but all the other ones are.

see https://databricks.com/blog/2017/05/31/transactional-writes-cloud-storage.html


DataFrame types

You can see the schema as JSON with:

spark.read.parquet("THE_FILE").schema.json

The type "timestamp" would map the data to a java.sql.Timestamp (the equivalent of a SQL TIMESTAMP type). This Java type is a subclass of java.util.Date.

The type "date" would map to a java.sql.Date (the equivalent of a SQL DATE type). This is also a subclass of java.util.Date.

See this StackOverflow page  for descriptions of the SQL types.

Trying to read from a Parquet file that uses "date" and coercing it to a "timestamp" will result in:

Caused by: java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainIntegerDictionary


Idempotency

DataStreamWriter.foreachBatch is experimental and "is supported only in the micro-batch execution modes (that is, when the trigger is not continuous)". It provides a batchId that "can be used [to] deduplicate and transactionally write the output (that is, the provided Dataset) to external systems. The output Dataset is guaranteed to be exactly the same for the same batchId (assuming all operations are deterministic in the query)" [Spark docs].


[1] "You can set the optional Continuous Trigger in queries that satisfy the following conditions: Read from supported sources like Kafka and write to supported sinks like Kafka, memory, console." SO quoting DataBricks.

Thursday, August 27, 2020

Azure

The Azure web GUI is quite immature. For instance, if you install the Azure Storage Explorer (Windows only) it doesn't show timestamps of files. Fortunately, a lot (everything?) can be done from the command line. This, for instance, mounts a SMB drive in the cloud on my local Linux box where it can be treated as any other directory:

sudo mount -t cifs //XXX.file.core.windows.net/DIRECTORY /mnt/DIRECTORY -o vers=3.0,username=USERNAME,password=GET_THIS_FROM_THE_WEB_GUI,dir_mode=0777,file_mode=0777,serverino 

Also, if you want to put a multi-line value into Microsoft's Key Vault, you'll find you can't do it in the web GUI. You need to put the text with line returns into YOUR_FILE and use:

az keyvault secret set --name YOUR_KEY --vault-name VAULT_NAME --value "`cat YOUR_FILE`"


Docker and K8s in the Azure cloud

First, tag your image with something like:

docker tag 8d2be7e5d4eb XXX.azurecr.io/YYY:1.0

where XXX is your image repository subdomain in Azure and YYY is the name of the artifact. Login with:

az acr login -n XXX

and now you can push your artifact into the Azure infrastructure:

docker push XXX.azurecr.io/YYY

(You might need to run az acr login -n XXX first) 

Let's check it works:

kubectl run -i --tty --attach ARBITRARY_NAME --image=XXX.azurecr.io/YYY:1.0  --command -- /bin/bash

and behold, we are on the CLI of a remote container in the Azure cloud.

But don't forget to clean up after ourselves with:

kubectl delete deployment ARBITRARY_NAME


Network Speeds

By having your image pushed to K8s, you can run your code in Azure as easily as your laptop. The big benefit is network speeds. In my case, I was decrypting an RSA encoded file taken from BLOB storage at about 1mb/s on my (well specced) laptop but exactly the same code was easily managing 10mb/s in the Azure cloud. (Yes, I know that using asymmetric ciphers for large files is not efficient [SO] but this was imposed on us by our client). By using jstack, I could see that the threads on my laptop were spending most of their time in IO not Bouncy Castle.


Thursday, August 20, 2020

Self-documenting tests

Even though it's 2020, self-documenting tests are still niche. There is Clairvoyance, a Scala flavour of YatSpec (which I mention extensively in this article for IBM). Its creator, Rhys Keepence, is an old colleague and told me recently that it "is mostly up to date (although I haven’t yet published for Scala 2.13). The docs on my github page are not super up to date, but the latest version is 1.0.129".

However, introducing yet another new library to the codebase was too much of an ask, so I started using GivenWhenThen in ScalaTest. It was somewhat painful to get it to print just the Given, When, Then outputs to a separate file that can be version controlled for the edification of other data scientists, without all the gubbins that are also spat out in your typical build.

I eventually did it using these resources from the ScalaTest docs. The top and bottom of it is that the GWT outputs are captured in a Reporter that may be bespoke (IntelliJ uses its own to separate the GWTs from the logging). This Reporter can then spew out the events at the end of the test. But if you want them in a file produced by your build, you'll need something like this (in Maven):

    <build>
        <plugins>
            <plugin>
                <groupId>org.scalatest</groupId>
                <artifactId>scalatest-maven-plugin</artifactId>
                <version>2.0.0</version>
                <configuration>
                    <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
                    <junitxml>.</junitxml>
                    <stderr/>
                    <filereports>W ../docs/src/main/acceptance_tests/scenarios.txt</filereports>
                </configuration>
...

and then copy scenarios.txt to where it can be version controlled with something like:

            <plugin>
                <artifactId>maven-resources-plugin</artifactId>
                <version>3.1.0</version>
                <executions>
                    <execution>
                        <id>copy-resources</id>
                        <phase>package</phase>
                        <goals>
                            <goal>copy-resources</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>${basedir}/../docs/src/main/acceptance_tests/</outputDirectory>
                            <resources>
                                <resource>
                                    <directory>target/docs/src/main/acceptance_tests/</directory>
                                    <filtering>true</filtering>
...

The W in filereports means "without colour": colour looks good on a Unix CLI but just adds odd escape characters to a text file, and a plain text file is what the data scientists want.

I'm unaware of a similar BDD framework for ZIO which means I need to mix my ZIO tests with ScalaTest. Unfortunately, I noticed that with Maven, some ZIO tests were failing but this did not stop the build. I documented this on ZIO's github here.


Friday, August 14, 2020

Encryption


A few random notes I've been making about security libraries I've been using this past year or so.

How Random is Random?

SecureRandom is the gold standard. However, "depending on the implementation, the generateSeed and nextBytes methods may block as entropy is being gathered, for example, if they need to read from /dev/random on various Unix-like operating systems." [JavaDocs] This hasn't been a problem for me so far as I create one million 64-bit random numbers in my unit tests and the whole process takes about a second or two.
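
For what it's worth, the test in question does little more than this (a sketch; the assertions are elided):

  import java.security.SecureRandom

  val rng   = new SecureRandom()
  val longs = Array.fill(1000000)(rng.nextLong()) // about a second or two, in my experience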

On Linux, you can see the temperature of the CPU, fan speeds etc by installing the tools mentioned here (AskUbuntu). This is one way to generate randomness.

There's an interesting addition to the Java API called ThreadLocalRandom that is more efficient than java.util.Random but still not appropriate for secure random number generators.


PGP or GPG?

"OpenPGP is the IETF-approved standard that defines encryption technology that uses processes that are interoperable with PGP. pgp is Symantec's proprietary encryption solution. pgp adheres to the OpenPGP standard and provides an interface that allows users to easily encrypt their files." [NetworkWorld]

"gpg is the OpenPGP part of the GNU Privacy Guard (GnuPG). It is a tool to provide digital encryption and signing services using the OpenPGP standard. gpg features complete key management and all the bells and whistles you would expect from a full OpenPGP implementation." [gpg man pages].

You can have the recipient's public key ID embedded in the file, which identifies the recipient. Why is this useful? "As far as I know, the recipient's public key IDs, key Validity dates, name, and email address are embedded in the GPG ASCII Armor file (GnuPG Manual). So using pub key file / Key ID / Name / Email to identify which public key to use should all be equivalent." [StackExchange]

Importing a private key

If you haven't got the key, you can't decrypt a file. But if you have, you don't need to specify it. For instance, if I try to decrypt a file for which I don't have a key, I see:

$ gpg --output file.zip -d file.zip.pgp 
gpg: encrypted with RSA key, ID EAC258F9825D4C9C
gpg: decryption failed: No secret key

However, I can import it:

$ gpg --import ~/Temp/key.txt
gpg: key EAC258F9825D4C9C: public key "XXXX-TEST " imported
gpg: key EAC258F9825D4C9C: secret key imported
gpg: Total number processed: 1
gpg:               imported: 1
gpg:       secret keys read: 1
gpg:   secret keys imported: 1

and now decrypt it:

$ gpg --output file.zip -d file.zip.pgp 
gpg: encrypted with 4096-bit RSA key, ID EAC258F9825D4C9C, created 2020-02-27
      "XXXX-TEST "

Bouncy Castle

Bouncy Castle is the defacto library to allow the JVM to access OpenPGP files. 

One gotcha I found when using Bouncy Castle in an über JAR that was called in a Docker container was:

Caused by: java.util.jar.JarException: file:/home/henryp/main-1.0-SNAPSHOT-jar-with-dependencies.jar has unsigned entries ...

There doesn't seem to be a huge amount you can do about this if you insist on using über JARs as "You can't bundle a cryptographic library. They have to be signed for the JVM to load them, and the signature is destroyed when merged into the shadow jar." [GitHub]

This seems to be something specific to Oracle's JDK because if my Docker config file starts with:

FROM openjdk:11-jdk-slim

I don't have this problem. 


Encrypted ZIPs

I was hoping to stream a zip file that was encrypted, decrypting and unzipping as I went, but was worried about the ZIP format. Note that the "directory is placed at the end of a ZIP file. This identifies what files are in the ZIP and identifies where in the ZIP that file is located. This allows ZIP readers to load the list of files without reading the entire ZIP archive. ZIP archives can also include extra data that is not related to the ZIP archive." [Wikipedia]

So, could I really decrypt and unzip a stream?

Changing to GZIP wouldn't help either because "Both zip and gzip use the same compressing format internally, the main difference is in the metadata: zip has it at the end of the file, gzip at the beginning (and gzip only supports one enclosed file easily)." [StackOverflow]

But decrypting the stream in one thread and unzipping it in another, connected by PipedInputStream and PipedOutputStream, seems to work, even on files of about 1GB.
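
A rough sketch of the plumbing (the decryption step itself is elided; decryptTo is a hypothetical method that writes plaintext bytes to the given output stream):

  import java.io.{PipedInputStream, PipedOutputStream}
  import java.util.zip.ZipInputStream

  val pipeOut = new PipedOutputStream()
  val pipeIn  = new PipedInputStream(pipeOut, 64 * 1024)

  // One thread decrypts into the pipe...
  val writer = new Thread(() => {
    try {
      // decryptTo(encryptedInputStream, pipeOut) // hypothetical Bouncy Castle call
    } finally pipeOut.close()
  })
  writer.start()

  // ...while this thread unzips the plaintext as it arrives.
  val zip   = new ZipInputStream(pipeIn)
  var entry = zip.getNextEntry
  while (entry != null) {
    println(s"${entry.getName}: ${entry.getSize} bytes")
    entry = zip.getNextEntry
  }
  zip.close()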

Encrypted Parquet

Parquet Modular Encryption allows certain columns to be encrypted.

OAuth

“Designed specifically to work with … (HTTP), OAuth essentially allows access tokens to be issued to third-party clients by an authorization server, with the approval of the resource owner” [Wikipedia] "A trust store is used to authenticate peers. A key store is used to authenticate yourself." [StackOverflow]

You can get tokens in Google DataFlow with this:

    val credentials = ComputeEngineCredentials.create()
    val accessToken = credentials.refreshAccessToken()
    logger.info(s"accessToken = $accessToken")             // OAuth token
    logger.info(s"getAccount = ${credentials.getAccount}") // service account name

Although almost ubiquitous, OAuth has its drawbacks:

Ross A. Baker @rossabaker Jun 08 04:27
I think the specification is far too complex for what it accomplishes.
I had a lengthy argument implementing it when I worked at a security company as a replacement for a request signing algorithm.
In request signing, you can't just steal a token the way you can in OAuth2.
And the argument is, "Well, it's sent over TLS, what does it matter?"
And as we were having that argument, those tokens were appearing in clear text in our logs.
Was it a shitty implementation? Absolutely. But all it takes is one mistake like that.
It's neither as convenient as basic auth, nor as secure as something like an HMAC-signed request. I feel like it operates in a middle ground that suits no purpose very well.

Gavin Bisesi @Daenyth Jun 08 14:48
another pain point is that IIUC the oauth spec is very full of "MAY" options and relatively few "MUST" options, so every actual implementation does stuff differently and nothing is compatible with anything else eg you often need specific logic to support X vs Y backends

An alternative to OAuth is request signing. Basically, the server has all private keys, clients only have their own, and messages are encrypted and signed - see Andrew Hoang's blog.


Gotchas

When storing pass phrases etc in files, be careful that your editor does not add a newline. For instance, open a file in vi such:

$ vi /tmp/5Chars.txt

Type the string 12345, save and close it.

$ ls -l /tmp/5Chars.txt
-rw-r--r-- 1 henryp kismet 6 Nov 24 10:23 /tmp/5Chars.txt

What? It's 6 bytes, not 5! One solution is this:

$ echo -n 12345 > /tmp/5Chars.txt 
$ ls -l /tmp/5Chars.txt
-rw-r--r-- 1 henryp kismet 5 Nov 24 10:25 /tmp/5Chars.txt

That's better.