Wednesday, December 9, 2020

DR in an Azure Cloud Ecosystem

In our topology, we have a Databricks Spark Structured Streaming job reading from an HDInsight Kafka cluster that is VNet-injected into our subscription. So, disaster recovery has to take into account two things:

  1. the Kafka cluster
  2. the blob storage where the data is landed by SSS.

Kafka

This Microsoft document outlines the usual ways of securing your data in Kafka (high replication factors for disk writes; a high number of in-sync replicas for in-memory writes; a high acknowledgement setting, etc). In addition, HDInsight clusters are backed by managed disks that provide "three replicas of your data", each within their own availability zone that's "equipped with independent power, cooling, and networking" [Azure docs].
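To make that concrete, here is a minimal sketch of those settings using the plain Kafka clients API; the broker address, topic name and partition count are placeholders of my own, not our actual configuration:

    import java.util.{Collections, Properties}
    import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}
    import org.apache.kafka.clients.producer.ProducerConfig

    val adminProps = new Properties()
    adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "wn0-kafka:9092") // placeholder broker
    val admin = AdminClient.create(adminProps)

    // Three replicas on disk, and at least two of them must be in sync before a write is acknowledged.
    val topic = new NewTopic("events", 12, 3.toShort)             // placeholder name and partition count
      .configs(Collections.singletonMap("min.insync.replicas", "2"))
    admin.createTopics(Collections.singletonList(topic)).all().get()

    // The producer, in turn, demands acknowledgement from all in-sync replicas.
    val producerProps = new Properties()
    producerProps.put(ProducerConfig.ACKS_CONFIG, "all")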

So, within a region, things look peachy. Now, how do we get these Kafka messages replicating across regions? The HDInsight documentation suggests using Apache MirrorMaker, but note one critical thing it says:

"Mirroring should not be considered as a means to achieve fault-tolerance. The offset to items within a topic are different between the primary and secondary clusters, so clients cannot use the two interchangeably."

This is worrying. Indeed, there is a KIP to make MirrorMaker 2 fix this problem and others like it (differences in partitions within topics of the same name, messages entering infinite loops, etc). Confluent is pushing its Replicator, which (it claims) is a more complete solution (there's a Docker trial for it here). And there is Brooklin, but Azure says this would be self-managed.
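For what it's worth, MirrorMaker 2 (the KIP-382 incarnation) ships a client-side helper that translates a consumer group's offsets from one cluster's numbering to the other's using the checkpoint topics it maintains. A sketch of how that might look (the cluster aliases, group id and address are my own inventions, and I haven't run this against our HDInsight clusters):

    import java.time.Duration
    import org.apache.kafka.connect.mirror.RemoteClusterUtils

    // Connection details for the secondary cluster, where MirrorMaker 2 writes its checkpoints.
    val secondaryProps = new java.util.HashMap[String, AnyRef]()
    secondaryProps.put("bootstrap.servers", "secondary-kafka:9092")   // placeholder address

    // Map the group's committed offsets on the "primary" cluster onto the secondary's offsets,
    // so a consumer failing over does not have to start from scratch or guess.
    val translated = RemoteClusterUtils.translateOffsets(
      secondaryProps, "primary", "my-consumer-group", Duration.ofMinutes(1))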

Spark and Blobs

At the moment, all the data goes into one region. The business is aware that in the event of, say, a terrorist attack on the data centres, data will be lost. But even if the infrastructure guys have the data replicated from one region to another, note this caveat in the Azure documentation (emphasis mine):

"Because data is written asynchronously from the primary region to the secondary region, there is always a delay before a write to the primary region is copied to the secondary region. If the primary region becomes unavailable, the most recent writes may not yet have been copied to the secondary region."

But let's say that nothing so dramatic happens. Let's assume our data is there once we get our region back on its feet. In the meantime, what has been landed by SSS in the backup region is incompatible with what came before. This is because Spark stores its Kafka offsets in a checkpoint folder in Gen2. It's just as well Spark is not writing to the directory that the erstwhile live region was using. If we had been writing to a directory common to both regions, some finagling would have been needed as we pointed the Spark job at another directory, affecting the RTO if not the RPO.
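For context, those offsets live in the checkpoint directory the streaming job is given, so failing over means giving the job in the secondary region its own checkpoint (and output) location. A stripped-down sketch of such a job, with placeholder broker, topic and abfss paths rather than our real ones:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().getOrCreate()

    val kafkaStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "wn0-kafka:9092")   // placeholder HDInsight broker
      .option("subscribe", "events")                         // placeholder topic
      .load()

    // The checkpointLocation is where SSS records which Kafka offsets it has already processed.
    // After failover it must point at a fresh directory: the old checkpoint refers to offsets
    // that mean something different (or nothing at all) on the mirrored cluster.
    kafkaStream.writeStream
      .format("delta")
      .option("checkpointLocation", "abfss://checkpoints@secondarysa.dfs.core.windows.net/events")
      .start("abfss://landing@secondarysa.dfs.core.windows.net/events")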

Aside: A disaster of a different kind

In the old days of on-prem Hadoop clusters, you might see a problem where too many files were created. The consequence would be the NameNode going down. Sometimes deciphering the Azure documentation is hard, but this link says the "Maximum number of blob containers, blobs, file shares, tables, queues, entities, or messages per storage account" has "No limit".

Hopefully (caveat: I have not tested this in the wild) this problem has gone away.

Tuesday, December 8, 2020

Cache vs Delta Cache

Continuing the investigation into the lack of cache coherency in Databricks from my previous post, I've raised the issue with the people at Azure.

In the document a representative from Microsoft pointed me to, there are two caches at play: the Delta Cache and the Apache Spark cache. "The Delta cache accelerates data reads by creating copies of remote files in nodes’ local storage" and is enabled by default on certain clusters. In effect, you get it for free when you use Databricks.

The Apache Spark cache is the familiar one that is invoked by calling Dataset.cache().
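To make the distinction concrete, here is roughly how each of the two is engaged (the config key is the one documented for Azure Databricks; the path is made up):

    // Delta Cache: transparent caching of the remote files on the workers' local SSDs,
    // switched on by configuration rather than by an API call.
    spark.conf.set("spark.databricks.io.cache.enabled", "true")

    // Apache Spark cache: an explicit, per-Dataset call.
    val df = spark.read.format("delta").load("/delta/events")   // made-up path
    df.cache()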

My first complaint is that these two caches behave inconsistently. In the Delta Cache, updates are immediately visible. But if we use the Apache Spark cache in conjunction with Databricks' Delta Lake (not to be confused with the orthogonal Delta Cache), the data is frozen in time.

Now, in the Spark world, Datasets are strictly immutable. But Delta Lake "provides ACID transactions ... on top of your existing data lake and is fully compatible with Apache Spark APIs." Well, once I've invoked .cache() and then update the Delta Lake in the standard way:

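    // replaceWhere overwrites only the rows matching CONDITION_STRING; the rest of the table is untouched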
    df.write
      .format("delta")
      .mode("overwrite")
      .option("replaceWhere", CONDITION_STRING)
      .save(HDFS_DIRECTORY)

(working example is here) then I cannot see my own updates!

What is perhaps even more bizarre can be seen when I try to read the data afresh. A second, totally independent call (but using the same Spark session) to read the data like this:

session.read.format("delta").load(dir)

cannot see the data either! And just to show you that there is nothing up my sleeve, running a completely separate process (code here) to read the data shows that the change has indeed been persisted to HDFS. This appears to be because Spark caches the data plus the Delta Lake metadata too, and cares not for Databricks' new semantics.
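For anyone who wants to see this for themselves, the shape of the problem is roughly the following (the path and the newData Dataset are stand-ins; the full working example is in the links above):

    val path = "/delta/events"                                   // stand-in path

    val df = spark.read.format("delta").load(path)
    df.cache()
    df.count()                                                   // materialise the cache

    // Elsewhere in the same session, overwrite a slice of the table via Delta Lake's replaceWhere...
    newData.write.format("delta").mode("overwrite")
      .option("replaceWhere", "date = '2020-12-01'")             // illustrative condition
      .save(path)

    df.count()                                                   // still the old answer: the cached plan wins
    spark.read.format("delta").load(path).count()                // a fresh read in this session is stale too

    df.unpersist()                                               // only now does a re-read reflect the update
    spark.read.format("delta").load(path).count()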

This brings me to my next complaint: the leaky abstraction. Databricks is trying to retrofit ACID transactions onto an API that was built with immutability at its core. The claim that it's "fully compatible with Apache Spark APIs" seems not entirely true.

I have a meeting scheduled with the MS representative, who is currently of the opinion that we should simply never call .cache(). On Azure Databricks, this does not seem too bad as the Delta Cache seems pretty fast. It sucks for anybody using Delta Lake on just HDFS.

Summary

If you call .cache() on a Spark Dataset while using the Databricks Delta Lake format, you will not be able to:

  • see updates from any other Spark session
  • see your own updates to this data
  • see the true data even if you re-read it from the underlying data store
unless you unpersist the Dataset.

[Addendum: I spoke to Databricks engineer Sandeep Katta on 17 December 2020 and he agreed the documentation is misleading. He says he'll generate a docs PR and submit it to the relevant team to make it clear that one should not use .cache() when using Delta Lake.]