Friday, September 27, 2024

Running Iceberg Catalogs in a test suite


When trying to put some BDDs together here (GitHub) for Iceberg and Spark integration, I hit some snags. I had been using the local catalog; when I switched to the Hive catalog, I got this error: "Iceberg does not work with Spark's default hive metastore".

The reason is that Derby, which is bundled with the Hive metastore in Spark, "doesn't support the interfaces (or concurrent access) required for Iceberg to work" according to Russell Spitzer. He suggests using the Hadoop catalog instead. 

I didn't fancy setting up a more robust Hive metastore backed by a more "professional" database like Postgres, so I followed Russell's advice to use the Hadoop catalog... and immediately hit this problem that's discussed further in Slack. Basically, CALLing some procedures via Spark SQL throws an IllegalArgumentException stating "Cannot use non-v1 table". This refers to a Spark v1 table and, it seems, has nothing to do with Iceberg's format-v1 tables.
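For context, the Spark side of a Hadoop catalog is just session configuration plus the CALL syntax. A minimal sketch (the catalog name, warehouse path and choice of procedure are my own placeholders, not necessarily the exact procedure that failed, and it assumes the Iceberg Spark runtime jar is already on the classpath):

    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             # register an Iceberg catalog called "hadoop_cat" backed by a plain directory
             .config("spark.sql.catalog.hadoop_cat", "org.apache.iceberg.spark.SparkCatalog")
             .config("spark.sql.catalog.hadoop_cat.type", "hadoop")
             .config("spark.sql.catalog.hadoop_cat.warehouse", "/tmp/iceberg_warehouse")
             # the extensions are what make CALL ... available in Spark SQL
             .config("spark.sql.extensions",
                     "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
             .getOrCreate())

    spark.sql("CREATE TABLE IF NOT EXISTS hadoop_cat.db.events (id BIGINT) USING iceberg")

    # procedures are invoked like this; it was CALLs of this general shape that hit the error
    spark.sql("CALL hadoop_cat.system.rewrite_data_files(table => 'db.events')")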

OK, so this got me thinking: if neither the Hive nor the Hadoop catalog works easily, let's try a REST catalog. Apache Polaris seemed a good candidate. At first, I was hoping to run it in-process but, because of transitive dependencies, it wasn't possible to have everything running easily in the same JVM.
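Wherever Polaris ends up running, the Spark side of a REST catalog is again just configuration, roughly like this (the URI, credential and warehouse name are placeholders for whatever your Polaris instance exposes, and some setups need extra auth options on top):

    spark = (SparkSession.builder
             .config("spark.sql.catalog.polaris", "org.apache.iceberg.spark.SparkCatalog")
             .config("spark.sql.catalog.polaris.type", "rest")
             .config("spark.sql.catalog.polaris.uri", "http://localhost:8181/api/catalog")
             .config("spark.sql.catalog.polaris.credential", "<client_id>:<client_secret>")
             .config("spark.sql.catalog.polaris.warehouse", "<catalog_name_in_polaris>")
             .config("spark.sql.extensions",
                     "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
             .getOrCreate())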

Hmm, so we need it in a separate process. OK, we can Docker-ise that. After adjusting the fixtures, I noticed that running a previously passing test a second time led to a failure. Looking at the stack traces, it appears that sure, we DROP TABLE IF EXISTS ..., but the underlying files are left behind. Russell Spitzer helped me out (again) by pointing out that a PURGE keyword needs to be added to the SQL.
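In test-teardown terms that's a one-word change, something like:

    # PURGE asks Spark to delete the data files as well as drop the catalog entry.
    # The catalog and table names here are illustrative.
    spark.sql("DROP TABLE IF EXISTS polaris.db.events PURGE")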

Me: Does the catalog also need to delete metadata files even if the query engine deletes the data files?

Russell Spitzer: This isn't actually defined anywhere. So at the moment we are kind of in a wild west, the Spark implementation deletes absolutely everything. I believe for Trino it just sends the request to the catalog and it's up to the catalog to decide. So for Spark "no", and it explicitly sends a drop table request to the rest catalog without a purge flag.
Russell says Polaris will delete files with this flag [caveat], although there is debate about making this service asynchronous ("I think polaris could own this service too but I know some folks (like netflix) have delete services already") and about "the option to hook into an external delete service".

So, there is some latitude in how one implements a catalog, it seems. This would explain why some of my tests are failing - some assumptions I made are not guaranteed. Still working on making them pass.

Thursday, September 26, 2024

Azure Automation

It's surprising how many silly little jobs can be automated away for big productivity wins. I wrote some Python code to run on a laptop that plays with the Azure cloud. The main takeaway points were:

The best way to get access is by using azure.identity.InteractiveBrowserCredential in the azure-identity package. This opens the browser and prompts the user to log in. The credential can then be used to instantiate an azure.storage.filedatalake.DataLakeServiceClient in azure-storage-file-datalake. With this we can upload files.
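A sketch of that flow (the account URL, container and file paths are placeholders):

    # Browser-based auth plus a file upload to ADLS Gen2.
    from azure.identity import InteractiveBrowserCredential
    from azure.storage.filedatalake import DataLakeServiceClient

    credential = InteractiveBrowserCredential()   # pops open a browser to log in
    service = DataLakeServiceClient(
        account_url="https://<storage_account>.dfs.core.windows.net",
        credential=credential,
    )
    fs = service.get_file_system_client("my-container")
    file_client = fs.get_file_client("landing/input.csv")
    with open("input.csv", "rb") as data:
        file_client.upload_data(data, overwrite=True)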

Then, to kick off ADF jobs, we can use the same credential and pass it to azure.mgmt.datafactory.DataFactoryManagementClient in azure-mgmt-datafactory. As a nice little aside, you can use Python's standard-library webbrowser module to open your browser at the ADF job by crafting a URL with the job ID, so you can monitor its progress.
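Something along these lines, reusing the same credential; the subscription, resource group, factory and pipeline names are placeholders, and the exact shape of the monitoring URL is my best guess rather than a documented contract:

    # Kick off an ADF pipeline run and open the monitoring page for it.
    import webbrowser
    from azure.mgmt.datafactory import DataFactoryManagementClient

    adf = DataFactoryManagementClient(credential, "<subscription_id>")
    run = adf.pipelines.create_run("<resource_group>", "<factory_name>", "<pipeline_name>",
                                   parameters={})
    # The URL format below is an assumption gleaned from the portal, not a documented API.
    url = (f"https://adf.azure.com/monitoring/pipelineruns/{run.run_id}"
           f"?factory=/subscriptions/<subscription_id>/resourceGroups/<resource_group>"
           f"/providers/Microsoft.DataFactory/factories/<factory_name>")
    webbrowser.open(url)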

This all seemed pretty straightforward, but accessing an Azure SQL instance was somewhat harder. For the authentication, I referred to this SO answer. Here I used the InteractiveBrowserCredential as before, but then I needed:

    import struct

    import pyodbc

    token = credential.get_token("https://database.windows.net/.default")
    driver = "ODBC Driver 18 for SQL Server"   # or whichever SQL Server ODBC driver is installed
    connection_string = f"""
        Driver={{{driver}}};
        Server={server};
        Database={database};
        Encrypt=yes;TrustServerCertificate=no;Connection Timeout=30;
    """
    # The ODBC driver expects the AAD access token UTF-16-LE encoded and length-prefixed
    token_bytes = token.token.encode("UTF-16-LE")
    token_struct = struct.pack(f"<I{len(token_bytes)}s", len(token_bytes), token_bytes)
    SQL_COPT_SS_ACCESS_TOKEN = 1256  # connection attribute for passing an access token
    conn = pyodbc.connect(connection_string, attrs_before={SQL_COPT_SS_ACCESS_TOKEN: token_struct})

which is a bit more involved but allowed me to connect.


Sunday, September 22, 2024

GPU Notes

Minecraft is written in Java and uses JOCL to leverage the GPU [Oracle blog, 2020].
The organization of threads in CUDA terms (sketched in code below):
  1. Thread: a single unit of execution --- each thread has its own memory called registers.
  2. Block: a group of threads --- all threads in a block have access to a shared memory [CUDA Terminology].
  3. Grid: a group of blocks --- all threads in a grid have access to [mutable] global memory and [immutable, global] constant memory.
 [Penny Xu's blog]
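To make those terms concrete, here's a tiny kernel using Numba's CUDA support (Numba is just my choice for illustrating this in Python; the array and launch sizes are arbitrary):

    # A tiny Numba CUDA kernel showing the thread/block/grid indexing.
    import numpy as np
    from numba import cuda

    @cuda.jit
    def double_it(xs):
        # local variables like i live in per-thread registers
        i = cuda.blockIdx.x * cuda.blockDim.x + cuda.threadIdx.x   # position within the grid
        if i < xs.size:                    # guard against the ragged last block
            xs[i] = xs[i] * 2.0            # xs lives in global memory

    xs = cuda.to_device(np.arange(1_000_000, dtype=np.float32))
    threads_per_block = 256                # one block of co-operating threads
    blocks_per_grid = (xs.size + threads_per_block - 1) // threads_per_block
    double_it[blocks_per_grid, threads_per_block](xs)
    result = xs.copy_to_host()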
Matrix multiplication is best done using tiles. The reason for this is that when you ask for a single value from global memory, the hardware actually fetches a whole contiguous chunk of them. You could throw the extras away. Or, you could employ a clever optimization (described by Horace He here) where we process the surplus and save it for later.
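A sketch of the classic tiled scheme, again in Numba and assuming for brevity that the matrix sides are exact multiples of the tile size: each block stages one tile of A and one tile of B in shared memory, every thread in the block reuses those staged values, then the block slides along to the next pair of tiles.

    # Tiled matrix multiply: stage tiles in block-shared memory and reuse them.
    from numba import cuda, float32

    TILE = 16

    @cuda.jit
    def matmul_tiled(A, B, C):
        sA = cuda.shared.array(shape=(TILE, TILE), dtype=float32)
        sB = cuda.shared.array(shape=(TILE, TILE), dtype=float32)

        x, y = cuda.grid(2)                   # this thread's row and column in C
        tx, ty = cuda.threadIdx.x, cuda.threadIdx.y

        acc = 0.0
        for t in range(cuda.gridDim.x):       # slide across the tiles of A and B
            sA[tx, ty] = A[x, t * TILE + ty]  # each thread loads one element of each tile
            sB[tx, ty] = B[t * TILE + tx, y]
            cuda.syncthreads()                # wait until the whole tile is staged
            for k in range(TILE):
                acc += sA[tx, k] * sB[k, ty]  # everyone reuses the staged values
            cuda.syncthreads()                # don't overwrite the tile until all are done

        C[x, y] = acc

    # launch over a 2D grid, e.g. matmul_tiled[(n // TILE, n // TILE), (TILE, TILE)](dA, dB, dC)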

TornadoVM

Oracle's Gary Frost calls TornadoVM "state of the art" [YouTube] when it comes to bringing GPU processing to the JVM.

"There are two ways to express parallelism with TornadoVM" [Juan Fumero] Annotations and "via an explicit parallel kernel API" using KernelContext

Interestingly, by using the Tornado --printKernel --printBytecodes --fullDebug flags, you can get Tornado to print out all sorts of goodies like the raw backend code it has created.

OpenCL Terminology

We can use a poker game to convey the idea of what is going on in OpenCL. Imagine a dealer and players who don’t interact with each other but who make requests to the dealer for additional cards.  

"In this analogy, the dealer represents an Open CL host, each player represents a device, the card table represents a context, and each card represents a kernel. Each player’s hand represents a command queue." [1]

"[Nested] Loops like this are common but inefficient. The inefficiency arises because each iteration requires a separate comparison and addition. Comparisons are time-consuming on the best of processors, but they’re especially slow on dedicated number-crunchers like graphic processor units ( GPU s). GPU s excel at performing the same operations over and over again, but they’re not good at making decisions. If a GPU has to check a condition and branch, it may take hundreds of cycles before it can get back to crunching numbers at full speed.

"One fascinating aspect of Open CL is that you don’t have to configure these loops in your kernel. Instead, your kernel only executes code that would lie inside the innermost loop. We call this individual kernel execution a work-item." [1]

"A work-group is a combination of work-items that access the same processing resources... Work-items in a work-group can access the same block of high-speed memory called local memory. Work-items in a work-group can be synchronized using fences and barriers." [1]

"One of the primary advantages of using Open CL is that you can execute applications using thousands and thousands of threads, called work-items." [1]

"Each block of local memory is specific to the work-items in a work-group." [1]

[1] OpenCL in Action

Thursday, September 5, 2024

Architecting Azure

Nineteen hours into a job migrating data from Synapse to an Azure SQL Server, we see: 

Failure happened on 'Source' side. ErrorCode=SqlOperationFailed,'Type=Microsoft.DataTransfer.Common.Shared.HybridDeliveryException,Message=A database operation failed with the following error: 'A transport-level error has occurred when receiving results from the server. (provider: TCP Provider, error: 0 - The specified network name is no longer available.)',Source=,''Type=System.Data.SqlClient.SqlException,Message=A transport-level error has occurred when receiving results from the server. (provider: TCP Provider, error: 0 - The specified network name is no longer available.),Source=.Net SqlClient Data Provider,SqlErrorNumber=64,Class=20,ErrorCode=-2146232060,State=0,Errors=[{Class=20,Number=64,State=0,Message=A transport-level error has occurred when receiving results from the server. (provider: TCP Provider, error: 0 - The specified network name is no longer available.),},],''Type=System.ComponentModel.Win32Exception,Message=The specified network name is no longer available,Source=,'

Yikes. This is after 226GB and 165 million rows have been written at an average throughput of 3.3MB/s. Three copy activities stopped within three seconds of each other but nothing untoward was found in the AzureDiagnostics and AzureActivity logs. At first I thought the network was suspiciously quiet at the time the copy came to an end but, with the Azure logs and this Pandas code here, I found that brief pauses were not that unusual:

Bursty network logs
Other engineers said they see this intermittently. "Welcome to the world of cloud computing where Transient Faults are bound to happen" [SO]. A cloud solution architect at Microsoft writes that throttling may be "done via blocking the connections or denying the new connections to SQL Azure database engine". Or it could be the network. "In Azure, most of the components are running on the internet, and that internet connection can produce transient faults intermittently." [Azure for Architects]
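The usual mitigation is to treat every remote call as retryable. A bare-bones sketch of my own (the attempt count and delays are illustrative, not recommendations):

    # Retry with exponential backoff and jitter for transient faults.
    import random
    import time

    def with_retries(operation, attempts=5, base_delay=2.0):
        for attempt in range(attempts):
            try:
                return operation()
            except Exception:                   # in real code, catch only transient error types
                if attempt == attempts - 1:
                    raise                       # give up after the last attempt
                sleep_for = base_delay * (2 ** attempt) + random.uniform(0, 1)
                time.sleep(sleep_for)           # back off, with a little jitter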

Never assume that a network is reliable, whether it be the cloud or not. I worked in an investment bank where a developer would make a connection to a system and, if it connected, assume it was the live rather than the failover system (this was a blue/green deployment). At first blush, this was not unreasonable, but networks can be tricksy. As it happened, the network must have hiccuped and he was connected to the standby system, pumping live data into it.