Gotcha!
What surprised me about the Azure/Databricks/Spark architecture was the lack of cache coherence. Data is pulled from Azure's Gen2 storage and stored closer to the Spark executors when cache() is called. However, if another cluster updates that data, don't expect the first cluster to see it.
In my case, I was using Databricks' Delta format to overwrite data as outlined in the Delta Lake docs. But first, a business requirement demanded that if we're overwriting anything, an override flag must be set. However, because of the lack of cache coherence, this check indicated that there was nothing to overwrite even though another cluster had already written that partition. As a result, my code overwrote what was there irrespective of the flag! This significantly changes the semantics of my Big Data application.
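For concreteness, the check-then-overwrite logic has roughly the following shape. This is a minimal sketch rather than my actual code: the function name overwritePartition, the allowOverride flag and the predicate string are hypothetical, and it assumes a Delta table already exists at path.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical guard: only replace the rows matching `predicate` if the
// caller has explicitly set the override flag.
def overwritePartition(spark: SparkSession,
                       df: DataFrame,
                       path: String,
                       predicate: String, // a filter on the partition column(s)
                       allowOverride: Boolean): Unit = {
  // Does the table already hold any rows for this partition?
  val alreadyWritten =
    spark.read.format("delta").load(path).where(predicate).head(1).nonEmpty

  if (alreadyWritten && !allowOverride)
    throw new IllegalStateException(
      s"Data matching '$predicate' already exists and the override flag is not set")

  df.write
    .format("delta")
    .mode("overwrite")
    .option("replaceWhere", predicate) // replace only rows matching the predicate
    .save(path)
}
```

If the read behind alreadyWritten is answered from a stale view of the table, the guard passes and the write goes ahead regardless of the flag, which is the behaviour described above.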
Furthermore, any writes after cache() has been called are not reflected in the original Gen2 storage in Azure...
"The Delta cache automatically detects when data files are created or deleted and updates its content accordingly. You can write, modify, and delete table data with no need to explicitly invalidate cached data." [Databricks Delta Cache docs] This doesn't appear to be the case, at least if it's referring to the original file(s).
"Multiple writers across multiple clusters can simultaneously modify a table partition and see a consistent snapshot view of the table and there will be a serial order for these writes" [Databricks Delta docs] Again, this might be true, but apparently not after cache() has been called.
The only interesting piece of architecture is that the Azure Gen2 storage was mounted using:
dbutils.fs.mount(
  source = s"wasbs://${dst}@${account}.blob.core.windows.net/",
  mountPoint = x,
  // the storage account key is read from a Databricks secret scope
  extraConfigs = Map(s"fs.azure.account.key.$account.blob.core.windows.net" -> dbutils.secrets.get(scope = "discope", key = key))
)
Investigation continues, but a friend at a completely different company that uses Databricks confirmed that I'm not the only one this is happening to. He too tried to replace a partition with .option("replaceWhere", "...") and, although it appeared to work "locally" and did change the underlying data store, the problem shows up on subsequent reads: the Delta log has also been cached and is now stale. You need to run spark.catalog.clearCache and then reads are OK.
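As a rough illustration of that workaround, the cache can be cleared immediately before any read whose result you need to trust. This is a sketch only: the helper name readFresh is hypothetical, and it assumes a Delta table at path.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical helper: drop everything cached in this Spark session, then
// re-read the Delta table so the read is not served from cached state.
def readFresh(spark: SparkSession, path: String): DataFrame = {
  spark.catalog.clearCache() // removes ALL cached tables, not just this one
  spark.read.format("delta").load(path)
}
```

Note that clearCache is a blunt instrument: it uncaches everything in the session, so anything else you had deliberately cached will have to be recomputed.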