Tuesday, September 29, 2020

A waste of two days

This post is just for reference when people ask me what can possibly go wrong with a simple process. In this case, the solution was trivial but the problem was enormous.

Basically, I had some code for which decryption was central. I had tests, run on every check-in, that used Bouncy Castle to decrypt data encrypted by GnuPG. This was also tested in lower environments, but the upper environments were closed to me. Only DevOps could go there. And, lo and behold, decryption failed there with exactly the same artifact that had been tested everywhere else.

The error was "checksum mismatch at 0 of 20". A quick Google turned up this StackOverflow post that indicated there was a problem with the pass phrase (a means of protecting the private key), so I took a look at the file in which the pass phrase lived.

Playing with hexdump [SO], I ran something like:

hexdump -e'"%07.8_ax  " 8/1 "%03d "  " |\n"' passphrase.txt
00000000  049 050 051 -62 -93 052 053 054 |
00000008  055 010                         |

Well, firstly there appears to be a line return (010) in there, but this is a red herring: if you use the command to get this into Key Vault as outlined in a previous post (that is, by cat-ing it), the newline is removed. This was serendipitous, as the gpg man pages say:

       --passphrase-file file

              Read the passphrase from file file. Only the first line will be read from file file. 

If we'd used the proper az command line (using the -f switch to indicate a file rather than cat-ing it), Key Vault would have contained the newline.

So, after wasting some time on that, I next looked at why my JVM was saying the pass phrase was 21 characters when I could only see 20. When I checked each byte of the pass phrase, it became clear that the character "£" (that is, the pound sterling sign) was taking up two bytes.
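A throwaway Scala snippet (with a made-up pass phrase rather than the real one) shows the discrepancy the JVM was complaining about:

import java.nio.charset.StandardCharsets

val passphrase = "0123456789012345678£"                           // made up: 20 characters, one of them a £
println(passphrase.length)                                        // 20 characters
println(passphrase.getBytes(StandardCharsets.UTF_8).length)       // 21 bytes: £ encodes as 0xC2 0xA3
println(passphrase.getBytes(StandardCharsets.ISO_8859_1).length)  // 20 bytes: £ is the single byte 0xA3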

Fair enough, I thought. It's not ASCII but I can encrypt and decrypt fine with this pass phrase using gpg. So, why was my code failing?

And this is the kick in the pants. "Unless otherwise specified, the character set for text is the UTF-8 [RFC3629] encoding of Unicode [ISO10646]." (OpenPGP format RFC) But there is a UTF-8 encoding for the pound sterling symbol, so what gives? Well, different implementations are at liberty to map characters to UTF-8 as they see fit, it seems:

"Please note that OpenPGP defines text to be in UTF-8.  An implementation will get best results by translating into and out of UTF-8.  However, there are many instances where this is easier said than done.  Also, there are communities of users who have no need for UTF-8 because they are all happy with a character set like ISO Latin-5 or a Japanese character set.  In such instances, an implementation MAY override the UTF-8 default by using this header key.  An implementation MAY implement this key and any translations it cares to; an implementation MAY ignore it and assume all text is UTF-8." [ibid]

So there is some ambiguity in how to handle these characters, and I don't appear to be the only one to have fallen into this trap, as the Bouncy Castle mailing lists suggest. And it's not just Bouncy Castle: OpenKeychain also appears to have this problem.

Ambiguous specifications are the root of all evil.


Saturday, September 26, 2020

Gaussian Processes - part 1

Gaussian Processes are a massive topic and these are just notes I made while learning about them.


The gist

It's all Bayesian.


Information Theory, Inference, and Learning Algorithms (MacKay)

If you squint, this is just Bayes' formula, but where we're finding the probability of a function, y(x), given the targets, t, and the data, X.
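For reference, I believe the equation in the figure is (give or take MacKay's subscripts) just:

P(y(x) \mid t, X) = \frac{P(t \mid y(x), X) \, P(y(x))}{P(t \mid X)}

That is, the posterior over functions is the likelihood of the targets under a candidate function, times a prior over functions, normalised by the evidence.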

We assume we can model this problem by assigning Gaussians to all states.

Now, a multivariate Gaussian (sometimes called MVN for multivariate normal) looks like:

 MVN (ITILA, MacKay)
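Reconstructing the equation from ITILA (which writes the mean as x̄ and uses A for the inverse of the covariance matrix), it is something like:

P(\mathbf{x} \mid \bar{\mathbf{x}}, \mathbf{A}) = \frac{1}{Z} \exp\!\left( -\tfrac{1}{2} (\mathbf{x} - \bar{\mathbf{x}})^{\top} \mathbf{A} \, (\mathbf{x} - \bar{\mathbf{x}}) \right), \qquad Z = \sqrt{\det\!\left( 2\pi \mathbf{A}^{-1} \right)}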

ITILA says "where x̄ is the mean of the distribution", and that's the case when our expectation is the mean. But if we applied this to a linear regression then x̄ wouldn't be our mean but our linear equation (by definition of what our expectation is). Furthermore, why choose linear regression? Let's use a different function altogether! "This is known as basis function expansion" [MLPP, Murphy, p20].

The matrix, A, in the MVN equation above is just the inverse of the covariance matrix, called the precision or concentration matrix. Just as a reminder, covariance is defined as:

cov[X, Y] = E[(X - E[X])(Y - E[Y])] = E[XY] - E[X] E[Y]

and so the covariance matrix is defined as:

cov[x] = E[(x - E[x])(x - E[x])ᵀ]


Example  

This example is from examples for Figaro. Here, 

y(x) = x² + N(0, 1)

where N is the normal distribution and x is an integer in the range 1 to 10. We want to find out: given, say, x = 7.5, what would y most likely be?

To do this, we introduce a covariance function:

exp(-γ (x₁ - x₂)²)

where γ is just a constant and x₁, x₂ are pairs of actual input values. The choice of function just makes the maths easier, but suffice it to say it penalizes large differences.

Now, given our covariance function, we can apply it to all pair combinations to make the covariance matrix (represented as a heat map here):
Matrix from the covariance function

then we invert it (after we add some noise to the diagonal to ensure it's not ill-conditioned):

Inverted matrix of the covariance function

then multiply it by the vector of observed y(x) values (ie, all those corresponding to x in the range 1 to 10):
Alpha

This completes the training process.
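As a sanity check, here is a minimal sketch of those training steps using Breeze for the linear algebra. This is my own reconstruction, not the Figaro example's code, and the variable names (and the 1e-6 jitter) are mine:

import breeze.linalg.{DenseMatrix, DenseVector, inv}

val gamma = 1.0                                   // the γ in the covariance function
def k(x1: Double, x2: Double): Double =
  math.exp(-gamma * (x1 - x2) * (x1 - x2))

val xs = (1 to 10).map(_.toDouble)                // the observed inputs
val ys = DenseVector(xs.map(x => x * x + scala.util.Random.nextGaussian()).toArray)  // y(x) = x² + N(0, 1)

// the covariance matrix over all pairs of observed inputs, built from the covariance function
val K = DenseMatrix.tabulate(xs.length, xs.length)((i, j) => k(xs(i), xs(j)))

// a little noise on the diagonal keeps the inversion well-conditioned
val Kinv = inv(K + DenseMatrix.eye[Double](xs.length) * 1e-6)

// "alpha": the inverted covariance applied to the observations
val alpha = Kinv * ys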

Now, all the Figaro example does is plug these tensors into pre-packaged formulas for the expected value (μ*) and covariance matrix (Σ*) of the derived Gaussian. These can be found in Kevin Murphy's excellent "Machine Learning: a Probabilistic Perspective", equations 15.8 and 15.9: 

μ* = μ(X*) + K*ᵀ K⁻¹ (f - μ(X))
Σ* = K** - K*ᵀ K⁻¹ K*

where

X is the observed data
f is the mapping from the known inputs (X) to outputs
X* is the data for which we wish to predict f
K is the observed data that has gone through the kernel function/covariance matrix and is a function of (X, X)
K* is a mix of the data X and X* that has gone through the kernel function/covariance matrix and is a function of (X, X*)
K** is the data X* that has gone through the kernel function/covariance matrix and is a function of (X*, X*)
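Plugging the pieces from the Breeze sketch above into those two equations for a single new input (and taking the mean function μ(·) to be zero, so the μ(X) and μ(X*) terms drop out) looks roughly like this; again, this is my sketch rather than what Figaro does internally:

val xStar = 7.5

val kStar     = DenseVector(xs.map(x => k(x, xStar)).toArray)   // K*,  a function of (X, X*)
val kStarStar = k(xStar, xStar)                                 // K**, a function of (X*, X*)

val muStar    = kStar dot alpha                                 // μ* = K*ᵀ K⁻¹ f
val sigmaStar = kStarStar - (kStar dot (Kinv * kStar))          // Σ* = K** - K*ᵀ K⁻¹ K*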

The derivation of these equations is beyond me, as is why the Figaro demo uses 1000 iterations when I get similar results with 1 or 10 000.

More next time. In the meantime, it appears that Murphy's book is going to be available online in a year's time, and the code is already being translated to Python and can be found on GitHub here.


Further reading 

Gaussian Processes for Dummies [Kat Bailey's Blog]
The Model Complexity Myth [Jake VanderPlas's Blog]

Tuesday, September 22, 2020

Spark Gotchas


Some miscellaneous notes on Spark that I've been making over the last few weeks.


Triggers and OutputModes when Aggregating

I was experimenting with aggregating a query that reads from Kafka and writes Parquet to HDFS.

The interplay between watermarking, Trigger and OutputMode looks a bit like this:
  • Queries that aggregate with OutputMode.Append but no watermark will always throw "Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;" irrespective of the Trigger.
  • Queries that aggregate with OutputMode.Complete fail with "Data source parquet does not support Complete output mode;". Even text files will barf with "Data source text does not support Complete output mode;".
  • Queries that aggregate with OutputMode.Append and a watermark behave like this:

      - Continuous: either "Unknown type of trigger: ContinuousTrigger(5000)" [1] or "Continuous processing does not support EventTimeWatermark operations.;;"
      - ProcessingTime: no errors, but the last batch is not pulled from Kafka.
      - No trigger ("When no trigger is specified, Structured Streaming starts the processing of a new batch as soon as the previous one is finished." - Stream Processing with Apache Spark): no errors, but the last batch is not pulled from Kafka.
      - Once: no errors, but nothing is pulled from Kafka. Running the query twice with awaitTermination pulls some elements.
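For concreteness, the queries I was testing looked something like this (a sketch, not the real job: broker, topic, paths and window sizes are placeholders, and spark is the usual SparkSession):

import org.apache.spark.sql.functions.{col, window}
import org.apache.spark.sql.streaming.{OutputMode, Trigger}

val counts = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my_topic")
  .load()
  .withWatermark("timestamp", "10 minutes")          // "timestamp" comes with the Kafka source schema
  .groupBy(window(col("timestamp"), "5 minutes"))
  .count()

counts.writeStream
  .format("parquet")
  .option("path", "/tmp/counts")
  .option("checkpointLocation", "/tmp/checkpoints/counts")
  .outputMode(OutputMode.Append())
  .trigger(Trigger.ProcessingTime("5 seconds"))      // swap in Trigger.Once() or Trigger.Continuous("5 seconds") to reproduce the rows above
  .start()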

The reason Spark leaves a little something on Kafka is (apparently) for efficiency. There appears to be a pull request for making Spark consume the last bit from Kafka, but in its associated Jira somebody mentions my StackOverflow question that talks about Spark's behaviour being unchanged.

So perplexed was I that I asked the Spark mailing lists. The answer from Spark committer, Jungtaek Lim, was illuminating:

"Unfortunately there's no mechanism on SSS to move forward only watermark without actual input, so if you want to test some behavior on OutputMode.Append you would need to add a dummy record to move watermark forward."

This doesn't seem to be because we're writing Parquet. Text files will also behave this way.


Defragging Question

Spark will create a new file per executor thread per ProcessingTime trigger (as this test code demonstrates). This can lead to a lot of files that could ultimately choke your Name Node if you're persisting to HDFS. This may necessitate a roll-your-own fix [SO].

If you're using Databricks, you can do this "bin-packing" by running the OPTIMIZE command. Unfortunately, I could not see the code to do this inside the Delta library that Databricks have open-sourced. (I discuss other ways of dealing with this problem without Delta in this blog post.)

Since our use case at work only needs the output of the stream processing once a day, we opted for Trigger.Once. Because we're not aggregating, all of the data is pulled and processed. And since this is streaming in a very loose sense of the word (it's more batch processing that gets its data in one short, sharp stream), we avoided having large numbers of small files.
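The shape of the daily job is roughly this (a sketch with placeholder paths; df is the non-aggregated streaming DataFrame read from Kafka as above):

import org.apache.spark.sql.streaming.Trigger

val query = df.writeStream
  .format("parquet")
  .option("path", "/data/daily_output")
  .option("checkpointLocation", "/data/checkpoints/daily_output")
  .trigger(Trigger.Once())
  .start()

query.awaitTermination()   // returns once the single micro-batch has been processed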


Exceptions in processing DataFrames

Spark can optionally take a "best efforts" approach to exceptions rather than failing the whole process:

import spark.implicits._   // for the Dataset encoder used by map; not needed in the spark-shell
val ALGO_VERSION = "2"     // which file output committer algorithm to use: "1" or "2"

spark.range(10000).repartition(7).map { i =>
  if (i == 9999) { Thread.sleep(5000); throw new RuntimeException("oops!") }
  else i
}.write.option("mapreduce.fileoutputcommitter.algorithm.version", ALGO_VERSION).mode("append").parquet(s"/tmp/test_algo_$ALGO_VERSION")

When ALGO_VERSION=1, nothing is written to HDFS. On the other hand, when ALGO_VERSION=2, the partition in which the exception occurred is not written but all the other ones are.

see https://databricks.com/blog/2017/05/31/transactional-writes-cloud-storage.html


DataFrame types

You can see the schema as JSON with:

spark.read.parquet("THE_FILE").schema.json

The type "timestamp" would map the data to a java.sql.Timestamp (the equivalent of a SQL TIMESTAMP type). This Java type is a subclass of java.util.Date.

The type "date" would map to a java.sql.Date (the equivalent of a SQL DATE type). This is also a subclass of java.util.Date.

See this StackOverflow page  for descriptions of the SQL types.

Trying to read from a Parquet file that uses "date" and coercing it to a "timestamp" will result in:

Caused by: java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainIntegerDictionary
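For example, something like this (my reconstruction, with a hypothetical column name) will trigger it:

import org.apache.spark.sql.types.{StructField, StructType, TimestampType}

// the Parquet file holds a "date" column but we force a "timestamp" schema on read
val forced = StructType(Seq(StructField("created", TimestampType)))
spark.read.schema(forced).parquet("THE_FILE").show()   // throws the UnsupportedOperationException above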


Idempotency

DataStreamWriter.foreachBatch is experimental and "is supported only in the micro-batch execution modes (that is, when the trigger is not continuous)". It provides a batchId that "can be used to deduplicate and transactionally write the output (that is, the provided Dataset) to external systems. The output Dataset is guaranteed to be exactly the same for the same batchId (assuming all operations are deterministic in the query)" [Spark docs].
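A minimal sketch of how that batchId might be used for idempotent writes (the path layout is my own convention, not something from the docs): each batch overwrites its own directory, so re-running a batch does not duplicate data.

import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.streaming.Trigger

def writeBatch(batch: Dataset[Row], batchId: Long): Unit =
  batch.write.mode("overwrite").parquet(s"/data/output/batch_id=$batchId")

df.writeStream
  .foreachBatch(writeBatch _)
  .trigger(Trigger.Once())
  .start()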


[1] "You can set the optional Continuous Trigger in queries that satisfy the following conditions: Read from supported sources like Kafka and write to supported sinks like Kafka, memory, console." SO quoting DataBricks.