Saturday, December 21, 2024

Debugging Polaris in Docker

Iceberg and Polaris sitting in a tree...

I have a proof of concept on GitHub that demonstrates how to use Apache Iceberg. Since I need Apache Polaris as Iceberg's metastore, I have it running in a container. 

If I create a catalog of FILE type, if the file path for storage is X, does this refer to the filesystem of Polaris or Spark?
Michael Collado
it's going to be both, so you shouldn't really use it in that scenario. The Polaris server is going to read/write metadata.json files in its own container's file system and the spark notebook will read/write data files in its own container's filesystem, so... [Discord]
In my PoC, I use a shared filesystem mount to which both the Polaris container and the host's Spark instance write.

However, tests were failing with very little logging to go on. When running Docker as a non-root user, the error in the Polaris logs looks like:

{"timestamp":1734433498899,"level":"INFO","thread":"dw-51 - POST /api/catalog/v1/manual_spark/namespaces/my_namespace/tables/IcebergCRUDSpec","logger":"org.apache.polaris.service.exception.IcebergExceptionMapper","message":"Handling runtimeException Failed to get file system for path: file:/tmp/polaris/my_namespace/IcebergCRUDSpec/metadata/00000-0daa8a08-5b5d-459a-bdd0-0663534f2007.metadata.json","mdc":{"spanId":"6ea71bffea6af726","traceId":"8b485bf56e7e27aac2e47ede876e02bd","realm":"default-realm","request_id":null},"params":{}}

When running containerised Polaris as root, the tests passed but I couldn't clean up the files on the shared filesystem mount afterwards as I was not running the test suite as root on the host.

Digging Deeper

That string ("Failed to get file system for path") led me to org.apache.iceberg.hadoop.Util.getFs. Unfortunately, the nested exception is swallowed when it is wrapped in the error reported above, so the root cause is lost.

So, we start the container with these flags:

 -eJAVA_OPTS=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:8788 -p8788:8788

since polaris-dropwizard-service in part expects JAVA_OPTS to be set.

Great, now we can put a breakpoint in Util.getFs and call printStackTrace on the nested exception. It shows:

Caused by: javax.security.auth.login.LoginException: java.lang.NullPointerException: invalid null input: name
        at jdk.security.auth/com.sun.security.auth.UnixPrincipal.<init>(UnixPrincipal.java:71)
        at jdk.security.auth/com.sun.security.auth.module.UnixLoginModule.login(UnixLoginModule.java:134)
        at java.base/javax.security.auth.login.LoginContext.invoke(LoginContext.java:754)
        at java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:678)
        at java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:676)
        at java.base/java.security.AccessController.doPrivileged(AccessController.java:714)
        at java.base/javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:676)
        at java.base/javax.security.auth.login.LoginContext.login(LoginContext.java:587)
        at org.apache.hadoop.security.UserGroupInformation$HadoopLoginContext.login(UserGroupInformation.java:2148)

A quick look at the JDK code shows that UnixSystem.getUsername appears to be returning null, and that appears to be because there is no user with my ID in the container - d'oh.

A Solution

One solution is a bespoke Docker entrypoint that creates the user (if it doesn't already exist) from a user ID passed in by Fabric8's docker-maven-plugin and then runs Polaris as that user. If it's the same user that runs the integration tests, both Polaris and Spark can write to the same directory and tables can be dropped without permission issues.

Monday, December 9, 2024

Pipeline testing

A lot of developers wonder how to test pipelines - see this Discord thread. The best way we've found is to create fixed, known data such that, when our transform acts on it, we can make reasonable assertions about what comes out.

Synthetic data

We are operating in the healthcare domain. We have a data set of events at hospitals and we want to turn them into a running total of the patients occupying each hospital on any given day. Our test data should be:
precisely x medical events over D days for y different unique patients, distributed evenly over hospitals {a,b,c} where each patient is admitted on day i mod D and discharged i mod w days later, where i is the unique patient id in range [0,y]
Finally, the discharge date is null for every z-th patient because we know we have bad data (urgh).
If we turn this natural language into a (Python) code signature, it looks like this: 

def random_inpatient_data(
    num_rows: int,
    num_patients: int,
    providers: list,
    discharge_null_every=97,
    attendance_date_fn=lambda i: random_timestamp(i, datetime.strptime("2021-11-01", "%Y-%m-%d")),
    start_time_fn=random_time_given,
) -> DataFrame:
    """
    Fake but realistic data for tests.
    :param num_rows: Total number of rows for this data set
    :param num_patients: The number of unique patients in this data set
    :param providers: possible providers from which a synthetic site will be created
    :param discharge_null_every: step size for null discharge dates
    :param attendance_date_fn: A function taking a seed and returning an attendance date
    :param start_time_fn: A function taking a date and a seed and generating a timestamp
    :return: Synthetic Inpatient data.
    """
...

There are some things to note. First, there are sensible defaults so each test creates the data with only the peculiarities salient to its needs. Secondly, the data is random-ish but identical for any given set of parameters.

It all sounds a little complicated but the whole thing is some two dozen lines of code that are called repeatedly across the test suite.
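For example, a test that only cares about the bad-data behaviour might call it like this (the argument values are illustrative, not taken from the real suite):

events = random_inpatient_data(
    num_rows=1000,             # x medical events
    num_patients=100,          # y unique patients
    providers=["a", "b", "c"],
    discharge_null_every=10,   # every 10th patient has a null discharge date
)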

The assertions

There are certain invariants we can assert on. Some are straightforward: hospital occupancy must always be greater than or equal to zero, and the number of admissions on a day must always be less than or equal to the running total for that day.

Some are a little more involved. For instance, the sum of daily occupancy deltas over the time frame is zero (ie, everybody who is admitted is ultimately discharged). Obviously the algorithm must be robust and not count zombie patients who appear never to be discharged - remember the "discharge date is null for every z-th patient" rule above?

Another invariant is that we should have a reading for every day in a contiguous block. Remember that the input data is a series of events. If 10 patients are admitted on Monday and 5 are discharged on Friday and nothing happens from Tuesday to Thursday, do we still have readings for those dates even though nothing happened? (We should.)
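As a sketch, a test over these invariants looks something like the following. The occupancy_per_day transform and the column names are hypothetical stand-ins for the real pipeline code:

def test_occupancy_invariants(spark):
    events = random_inpatient_data(num_rows=1000, num_patients=100, providers=["a", "b", "c"])
    occupancy = occupancy_per_day(events)   # hypothetical transform under test
    rows = occupancy.collect()

    # occupancy is never negative
    assert all(r["occupancy"] >= 0 for r in rows)

    # everybody admitted is ultimately discharged, so the deltas sum to zero
    assert sum(r["delta"] for r in rows) == 0

    # one reading per day in a contiguous block, even for days on which nothing happened
    days = sorted(r["date"] for r in rows)
    assert len(days) == (days[-1] - days[0]).days + 1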

Crafting the test data also raised some interesting corner cases that we needed to take back to the business analysts. For example, if a patient is discharged the same day they're admitted, do they show up in that day's occupancy numbers or not? If the discharge date is null, what do we do with that patient?

Conclusion

The use of synthetic data is a powerful tool when building automated regression tests. Tools like Deequ can test data quality but require curated data sets. That is much harder than it sounds, especially when schemas and semantics change.
 
Creating code that tests the functionality of your pipeline allows you to refactor your code with confidence. Once you try it, you'll never go back.

Diagnosing Kafka

The Problem

I have an integration test that creates a cluster of 4 Kafka brokers using the KRaft protocol (ie, no Zookeeper). It then kills one and, upon sending more messages, expects a consumer to receive them. Approximately 1 in 4 times it fails as no messages are received. This 1-in-4 figure seems suspicious...

Killing a node sometimes left the cluster in a bad state. The other brokers kept regularly barfing UnknownHostExceptions as they tried to talk to the missing node. Looking at the topics shed some light on the problem.

$KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic test_topic
Topic: test_topic TopicId: MaHwLf2jQ62VNUxtiFUGvw PartitionCount: 2 ReplicationFactor: 1 Configs: 
Topic: test_topic Partition: 0 Leader: 4 Replicas: 4 Isr: 4
Topic: test_topic Partition: 1 Leader: none Replicas: 1 Isr: 1

The Solution

Having modified the test so that it now creates the NewTopic with a replication factor of 2, the same command now gives:

Topic: test_topic TopicId: w37ZKZnqR-2AdmT76oiWsw PartitionCount: 2 ReplicationFactor: 2 Configs: 
Topic: test_topic Partition: 0 Leader: 4 Replicas: 4,1 Isr: 4,1
Topic: test_topic Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2

The test now passes every time (so far). We appear to have fixed the problem.
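For reference, the fix amounts to creating the topic with a higher replication factor. A minimal sketch using the kafka-python admin client (the test itself uses the Java NewTopic API, so this is purely illustrative):

from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
admin.create_topics([
    # replication_factor=2 means each partition survives the loss of one broker
    NewTopic(name="test_topic", num_partitions=2, replication_factor=2)
])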

The Reason

Kafka replicates data over nodes (the Replicas above). These replicas may or may not be in-sync replicas (Isr). An ISR has acknowledged the latest writes to the leader within a specified time and only ISRs are considered for clean leader election.
"Producers write data to and consumers read data from topic partition leaders.  Synchronous data replication from leaders to followers ensures that messages are copied to more than one broker. Kafka producers can set the acks configuration parameter to control when a write is considered successful." [Disaster Recovery for Multi- Datacenter Apache Kafka Deployments, Confluent]
Conclusion

But what if this were a production topic, not an integration test - how would you fix it? Well, if the data is not replicated, then when the broker(s) hosting it die, you've lost the data. You can configure the topic to use unclean.leader.election.enable with a Kafka CLI tool. But it's a trade-off: "If we allow out-of-sync replicas to become leaders, we will have data loss and data inconsistencies." [Conduktor]

Wednesday, November 27, 2024

Cancel culture

Cancellation is a thorny issue in software - especially when we use containers as they're constantly being killed. 

First, let's look at how some Scala frameworks handle it. This Discord chat is about where the boundaries lie in Cats. Basically, if code running a chain of flatMaps is cancelled, the next flatMap may be executed (you can see that with a println) but the IO created inside it is not.

We're going to SIGTERM this code in CatsCleanup:

import cats.effect.{IO, IOApp, Resource}

object CatsCleanup extends IOApp.Simple {

  val resource = Resource.make(IO.println("acquire"))(_ => IO.println("release"))

  def run: IO[Unit] = for {
    _ <- resource.use(_ => IO.readLine)
  } yield ()
}
with 

kill -SIGTERM $(jps | grep CatsCleanup | awk '{print $1}')

In Cats Effect 3.5.4, the Resource is released and the output looks like:

acquire
release

Process finished with exit code 143 (interrupted by signal 15:SIGTERM)

(The exit code when killing a process is 128 plus the signal number; SIGTERM is 15, hence 143. You can see the last exit code of a process on a Unix-like system with echo $?.)

The equivalent code in ZIO (2.0.21):

  val release = ZIO.logInfo("release")

  val resource = ZIO.acquireRelease(ZIO.logInfo("acquire"))(_ => release)

  override def run: ZIO[Any & ZIOAppArgs & Scope, Any, Any] = for {
    _ <- resource
    _ <- zio.Console.readLine("press a key")
  } yield ()

does nothing on SIGTERM - "release" is never logged.

Why it's important

With many workloads moving to, say, Kubernetes, cancellation comes with the territory. 
"What happens when a pod starts up, and what happens when a pod shuts down? 
"When a pod starts in a rolling deployment without the readiness probe configured ... the pod starts receiving traffic even though the pod is not ready. The absence of a readiness probe makes the application unstable. ... 

The problem is"it takes more time to update the iptables rules than for the containers to be terminated by the Kubelet... The Kubelet immediately sends a SIGTERM signal to the container, and the endpoints controller sends a request back to the API server for the pod endpoints to be removed from all service objects... Due to the difference in task completion time, Services still route traffic to the endpoints of the terminating pods

[The solution involves] "adding a preStop hook to the deployment configuration. Before the container shuts down completely, we will configure the container to wait for 20 seconds. It is a synchronous action, which means the container will only shut down when this wait time is complete". 

This gives your application time to clean itself up.


Monday, November 11, 2024

Databricks in Azure

The Databricks plugin for VSCode is actually quite nice. Once installed, you can log in via your IDE:


and Sign in to Databricks workspace using a profile in your ~/.databrickscfg file. For my Windows VM, this is just a host string pointing at my cluster and an auth_type of databricks-cli. Signing in opens a browser and I just authorise my session.

Now, the nice thing is that the code I develop locally in VSCode can be run remotely on the cluster by choosing Run File as Workflow:


This uploads all my code into a Databricks cluster and I can see the job running by clicking the link:
which opens my browser to the job's console.

Interestingly, trying to read, via a notebook, a Spark DataFrame that had been persisted as Parquet by this job led to an obscure error. Apparently, I have to run:

spark.conf.set("spark.databricks.io.cache.enabled", "false")

Databricks as part of the Pipeline

Azure Data Factory is a little like Airflow but trying to do something clever with it (eg, use a CLI) is difficult. We found that having it invoke a Databricks notebook allowed us to do anything more sophisticated.

Note that you'll have to create a Databricks Linked Service to allow your ADF pipeline access to the notebook. These Microsoft docs proved helpful.

If you want the notebook to read from a parameter ADF passes to it, use something like this:

def read_param(key: str) -> str:
    dbutils.widgets.text(key, "")
    parameter = dbutils.widgets.get(key)
    return parameter

where key corresponds to one of the Base Parameters in the Notebook ADF activity. Note that there is a 10k character limit on these arguments [SO].
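So if the ADF activity defines a Base Parameter called, say, input_path (a hypothetical name), the notebook picks it up with:

input_path = read_param("input_path")   # must match the Base Parameter name in the ADF activity
df = spark.read.parquet(input_path)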

If you want the notebook to return something back to the ADF flow, have it terminate with:

dbutils.notebook.exit(YOUR_VALUE_HERE)

[See this SO.]

Connecting Databricks to a Database

If you want to connect a Databricks notebook to a cloud-based SQL Server instance, you can run:

%pip install adal
%pip install openpyxl

servername = "YOUR_MACHINE_NAME"
jdbcHostname = f"{servername}.database.windows.net"
jdbcDatabase = "YOUR_DB" 
driver = "com.microsoft.sqlserver.jdbc.spark"
url = f"jdbc:sqlserver://{jdbcHostname};databaseName={jdbcDatabase}"

as a one off, then:

import adal 
authority = f"https://login.microsoftonline.com/common"
context = adal.AuthenticationContext(authority)
jdbcPort = 1433
jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2}".format(jdbcHostname, jdbcPort, jdbcDatabase)

client_id = "YOUR_CLIENT_ID" # Client ID for Azure Cloud Shell (you do not have to replace this)
resource_app_id_url = "https://database.windows.net/" 

code = context.acquire_user_code(resource_app_id_url, client_id)
token = context.acquire_token_with_device_code(resource_app_id_url, code, client_id)
access_token = token["accessToken"]

Now you can use Spark to read the DB with the help of this token:

from pyspark.sql import SQLContext

jdbc_db_practice_mapping = (
        spark.read
        .format("com.microsoft.sqlserver.jdbc.spark")
        .option("url", url)
        .option("dbtable", "RightCare_ref.practice_CCG_quarter_lookup")
        .option("accessToken", access_token)
        .option("encrypt", "true")
        .option("hostNameInCertificate", "*.database.windows.net")
        .load()
)

Et voila. You're reading from SQL Server.

Saturday, October 26, 2024

NVIDIA Rapids

You can make Spark go faster by offloading some of the work to the GPU. There is an NVIDIA library (spark-rapids) to do this. 

A quick introduction

There are quite a few references to UCX in Spark Rapids. Assuming you have the hardware, this allows remote direct memory access (RDMA), basically sharing data in memory that circumvents the kernel.

"UVM or universal memory can allow main host memory to act essentially as swap for device(GPU) memory. This allows the GPU to process more data than fits in memory, but  can result in slower processing. This is an experimental feature." (from RapidsConf)

The spillStorageSize is the "Amount of off-heap host memory to use for buffering spilled GPU data before spilling  to local disk. Use -1 to set the amount to the combined size of pinned and pageable memory pools."

Old Ubuntu

Unfortunately, when running a test on Ubuntu 18 (I know, I know) I saw:

Caused by: java.lang.UnsatisfiedLinkError: /home/henryp/Code/Scala/SparkEcosystem/spark-rapids/integration_tests/target/tmp/cudf3561040550923512030.so: /lib/x86_64-linux-gnu/libc.so.6: version `GLIBC_2.28' not found (required by /home/henryp/Code/Scala/SparkEcosystem/spark-rapids/integration_tests/target/tmp/cudf3561040550923512030.so)
...
        at ai.rapids.cudf.NativeDepsLoader.loadDep(NativeDepsLoader.java:246)

After installing the latest Ubuntu on VMware, it appears that you cannot access the GPU from a VMware Workstation guest.

You can, however, use a Docker image - just check that you have a Docker daemon that can handle NVIDIA installed by running:

henryp@adele: docker info | grep Runtimes 
Runtimes: io.containerd.runc.v2 nvidia runc

or 

henryp@adele:~$ grep -A2 -i nvidia /etc/docker/daemon.json
        "nvidia": {
            "path": "/usr/bin/nvidia-container-runtime",
            "runtimeArgs": []
        }

and download an image from here.

Now, I just need to run something like:

docker run --rm --gpus all  --runtime=nvidia  -it   -v /home/henryp/Code/Scala/SparkEcosystem/spark-rapids/:/home/henryp/Code/Scala/SparkEcosystem/spark-rapids/ -v /home/henryp/Tools:/home/henryp/Tools -v /home/henryp/.m2:/.m2  -v /usr/local/bin/Java:/usr/local/bin/Java  --user root  nvidia/cuda:12.6.1-devel-ubi8 /bin/bash

and once I'm in, run:

yum -y install git
yum -y install diffutils
yum -y install rsync
export MAVEN_HOME=/home/henryp/Tools/Build/Maven/Latest
export PATH=$MAVEN_HOME/bin:$PATH
export JAVA_HOME=/usr/local/bin/Java/Latest17
export PATH=$JAVA_HOME/bin:$PATH

Constantly installing these tools just to build Rapids proved a bit tedious, so I extended the NVIDIA Docker image with this Dockerfile:

FROM nvidia/cuda:12.6.1-devel-ubi8

RUN yum -y install git
RUN yum -y install diffutils
RUN yum -y install rsync

Also, if I wanted to use mvnDebug [SO], I had to set the network of the container to host using --network host [SO]. Then, it's just a matter of running:

mvnDebug scalatest:test -DforkMode=never

and attaching a debugger from the host machine.

Unfortunately, sometimes when I close and re-open my laptop, the tests start failing with:

...
WindowedBlockIteratorSuite:
terminate called after throwing an instance of 'cudf::jni::jni_exception'
  what():  CUDA ERROR: code 999
/home/henryp/Tools/Build/Maven/Latest/bin/mvnDebug: line 36:   216 Aborted                 (core dumped) env MAVEN_OPTS="$MAVEN_OPTS" MAVEN_DEBUG_OPTS="$MAVEN_DEBUG_OPTS" "`dirname "$0"`/mvn" "$@"

Apparently, I must reboot my machine :(

Some miscellaneous code notes

Finally, I get to look at the code in action. Rapids adds RapidsExecutorPlugin (which extends the Spark ExecutorPlugin interface) and this immediately initializes the GPU and memory (see GpuDeviceManager.initializeGpuAndMemory). Note that in setGpuDeviceAndAcquire we see a comment that says:
"cudaFree(0) to actually allocate the set device - no process exclusive required since we are relying on Spark to schedule it properly and not give it to multiple executors"
This is why the tests (HashAggregatesSuite) are hard-coded to use just one CPU core.

Rapids then has a parallel set of classes that look a lot like the Spark classes representing linear algebra structures. For example, there is a ColumnVector abstraction in both Spark and Rapids. The interesting Rapids one is GpuColumnVector - which implements the Spark interface - and it can be instantiated by a GpuShuffleExchangeExecBase. Amongst other things, objects of these classes contain the address of their off-heap data and a reference counter.

Still playing.


Saturday, October 5, 2024

Optimising GPU code

I complained to Juan Fumero that a benchmark indicated that the GPU was not giving much of a performance improvement. JMH reported the GPU being a moderate 20% faster than the CPU:

tornado -jar tornado-benchmarks/target/jmhbenchmarks.jar uk.ac.manchester.tornado.benchmarks.sgemv.JMHSgemV
...
Benchmark              Mode  Cnt         Score         Error  Units
JMHSgemV.sgemVJava     avgt    5  72366270.751 ± 5916807.539  ns/op
JMHSgemV.sgemVTornado  avgt    5  57583087.103 ± 2523449.341  ns/op

(SGEMM is single-precision general matrix multiplication; the V in GEMV indicates that we're multiplying a matrix by a vector.)

Juan replied that I should try TornadoVM's  --enableProfiler console switch and see where the time was being spent. Sure enough, COPY_IN_TIME was ~28ms, about the same as TOTAL_KERNEL_TIME.

Note that the total kernel time is the time it takes the GPU to perform the computation and the total kernel dispatch time is the time it takes to schedule the kernel (ie, the function being executed). In this case, dispatch time is ~6us - three orders of magnitude smaller than the execution time.

Juan also said that "Matrix Vector is not as compute intensive as other applications", so instead I tried the matrix/matrix multiplication. Here, the GPU shines:

Benchmark              Mode  Cnt           Score         Error  Units
JMHSgemm.sgemmJava     avgt    5  1773297262.188 ± 4115731.439  ns/op
JMHSgemm.sgemmTornado  avgt    5     8478409.506 ±  246919.368  ns/op

That makes the GPU 200 times faster than the CPU. Now COPY_IN_TIME is about 1ms and TOTAL_KERNEL_TIME is about 5.5ms.

Now we're talking. But continuing this optimization rampage, it's worth noting that "It has become tribal knowledge that the particular shapes chosen for matmuls has a surprisingly large effect on their performance." [Horace He] TL;DR: his article explains how fitting the small memory tiles onto a large matrix can hugely change performance - basically, in a row-major MxN matrix, N must be a factor of the GPU's cache line for best results.

Changes in Java's memory

In the old days, we'd use sun.misc.Unsafe.allocateMemory to use off-heap memory. This call goes straight to the OS and asks for memory via os::realloc. But using Unsafe is bad practice. Not only is it specific to a particular flavour of JVM, it allows access to raw memory. The latter is "fine" if that memory is off-heap but, if you are using it to access a Java object, the garbage collector can change the object's memory location without warning.

There are several modern alternatives. Since Java 9, java.lang.invoke.VarHandle has been the recommended one. It provides the same level of low-level access as Unsafe but with better safety and control over memory visibility. That is, its memory-access modes offer finer-grained control - eg, volatile access without enforcing strict instruction ordering.

It's interesting to note that the high-performance interoperability framework Apache Arrow does not use VarHandle. It still uses Unsafe because VarHandle does bounds checking etc. that is slower than raw access.

Since Java 20, we've had Project Panama's Foreign Function & Memory API (JEP-424) spec (it appears Apache Arrow doesn't use it because it's too new). If we run this code:

MemorySegment memorySegment = Arena.global().allocate(1024 * 1024 * 128, 8);
System.out.println(memorySegment.address());

then look for that address, while the process is still running, in /proc/PID/maps (where PID is the ID of the Java process), we can see that Linux now manages a new area of memory. For instance, when I ran it, the output was 0x7fbaccdbe010 and I can see in the maps pseudo-file:

7fbaccdbe000-7fbad4dbf000 rw-p 00000000 00:00 0 

This represents the 128 megs of space plus 4096 bytes (presumably a page for meta data).

Note that an Arena in this context is a large chunk of memory that is managed in user space rather than the app code constantly calling the kernel requesting memory piecemeal. This is an optimization.

Now, since Java and C/C++ are both IEEE 754 compliant, and they can now pass native memory to each other, you can transparently pass floating-point numbers between the code bases and call C/C++ code from the JVM - no more need for JNI! (Interestingly, note that Python is often IEEE 754 compliant but it is not guaranteed to be.)

It's interesting to note that the GPU enabled Tornado VM uses the java.lang.foreign package to move data to and from the GPU.

Friday, September 27, 2024

Running Iceberg Catalogs in a test suite


When trying to put together some BDDs here (GitHub) for Iceberg and Spark integration, I hit some snags. I was using the local catalog. If I used the Hive catalog, I got this error ("Iceberg does not work with Spark's default hive metastore").

The reason is that Derby, which is bundled with the Hive metastore in Spark, "doesn't support the interfaces (or concurrent access) required for Iceberg to work" according to Russell Spitzer. He suggests using the Hadoop catalog instead. 

I didn't fancy setting up a more robust Hive metastore that uses a more "professional" database like Postgres, so I followed Russell's advice to use the Hadoop catalog... and immediately hit this problem, which is discussed further in Slack. Basically, CALLing some procedures via Spark SQL throws an IllegalArgumentException stating "Cannot use non-v1 table". This refers to a Spark v1 table and has nothing to do with Iceberg v1 tables, it seems.

OK, so this got me thinking: if there's no easy way to use the Hive and Hadoop catalogs, let's try a REST catalog. Apache Polaris seemed a good candidate. At first, I was hoping to have it run in-process but, because of transitive dependencies, it wasn't possible to have everything running easily in the same JVM.

Hmm, so we need it in a separate process. OK, we can Docker-ise that. After adjusting the fixtures, I noticed that running an erstwhile-passing test a second time led to a failure. Looking at the stack traces, it appears that, sure, we DROP TABLE IF EXISTS ..., but the underlying files are left behind. Russell Spitzer helped me out (again) by pointing out that a PURGE keyword needs to be added to the SQL.
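In other words, the clean-up becomes something like this (sketched in PySpark; the catalog, namespace and table names are illustrative):

# With PURGE, Spark deletes the underlying data and metadata files as well as dropping the catalog entry
spark.sql("DROP TABLE IF EXISTS polaris.my_namespace.my_table PURGE")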

Does the catalog also need to delete metadata files even if the query engine deletes the data files?
Russell Spitzer
This isn't actually defined anywhere. So at the moment we are kind of in a wild west, the Spark implementation deletes absolutely everything. I believe for Trino it just sends the request to the catalog and it's up to the catalog to decide. So for Spark "no", and it explicitly sends a drop table request to the rest catalog without a purge flag.
Russell says Polaris will delete files with this flag [caveat], although there is debate about making this service asynchronous ("I think polaris could own this service too but I know some folks (like netflix) have delete services already") and about "the option to hook into an external delete service".

So, there is some latitude in how one implements a catalog, it seems. This would explain why some of my tests are failing - some assumptions I made are not guaranteed. Still working on making them pass.

Thursday, September 26, 2024

Azure Automation

It's surprising how many silly little jobs can be automated away for big productivity wins. I wrote some Python code, run on a laptop, that plays with the Azure cloud. The main takeaway points were:

The best way to get access is to use azure.identity.InteractiveBrowserCredential in the azure-identity package. This opens the browser and prompts the user to log in. The credential can then be used to instantiate an azure.storage.filedatalake.DataLakeServiceClient from azure-storage-file-datalake. With this we can upload files.

Then, to kick off ADF jobs, we can pass the same credential to azure.mgmt.datafactory.DataFactoryManagementClient in azure-mgmt-datafactory. As a nice little aside, you can use Python's webbrowser module to open your browser at the ADF job by crafting a URL containing the run ID, so you can monitor its progress.
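Pieced together, the happy path looks roughly like this (the account, container, subscription, resource group, factory and pipeline names are all placeholders):

import webbrowser
from azure.identity import InteractiveBrowserCredential
from azure.mgmt.datafactory import DataFactoryManagementClient
from azure.storage.filedatalake import DataLakeServiceClient

credential = InteractiveBrowserCredential()   # opens the browser for the user to log in

# upload a file to ADLS Gen2
lake = DataLakeServiceClient("https://myaccount.dfs.core.windows.net", credential=credential)
file_client = lake.get_file_system_client("my-container").get_file_client("landing/data.csv")
with open("data.csv", "rb") as f:
    file_client.upload_data(f, overwrite=True)

# kick off an ADF pipeline; the returned run_id can be embedded in an adf.azure.com
# monitoring URL (copy the exact format from the portal) and opened with webbrowser
adf = DataFactoryManagementClient(credential, "MY_SUBSCRIPTION_ID")
run = adf.pipelines.create_run("my-resource-group", "my-data-factory", "my-pipeline")
webbrowser.open(f"https://adf.azure.com/monitoring/pipelineruns/{run.run_id}")   # simplified URL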

This all seemed pretty straightforward but accessing an Azure SQL instance was somewhat harder. To use the credential, I referred to this SO answer. Here I used the InteractiveBrowserCredential as before but then I needed:

    # Requires pyodbc and the standard library's struct module; credential is the
    # InteractiveBrowserCredential from above. Driver, server and database are defined
    # elsewhere, eg Driver = "{ODBC Driver 18 for SQL Server}".
    token = credential.get_token("https://database.windows.net/.default")
    connection_string = f"""
        Driver={Driver};
        Server={server};
        Database={database};
        Encrypt=yes;TrustServerCertificate=no;Connection Timeout=30;
    """
    token_bytes = token.token.encode("UTF-16-LE")
    token_struct = struct.pack(f'<I{len(token_bytes)}s', len(token_bytes), token_bytes)
    conn = pyodbc.connect(connection_string, attrs_before={1256: token_struct})  # 1256 is SQL_COPT_SS_ACCESS_TOKEN

which is a bit more involved but allowed me to connect.


Sunday, September 22, 2024

GPU Notes

Minecraft is written in Java and uses JOCL to leverage the GPU [Oracle blog, 2020].
The organization of threads in CUDA terms:
  1. Thread: single unit of execution --- each thread has its own memory called registers
  2. Block: group of threads --- all threads in a block have access to a shared memory [Cuda Terminology].
  3. Grid: group of blocks --- all threads in a grid have access to [mutable] global memory and [immutable, global] constant memory.
 [Penny Xu's blog]
Matrix multiplication is best done using tiles (a toy sketch follows). The reason for this is that when you ask for a value, you actually get an array of them. You could throw them away. Or, you could employ a clever optimization (described by Horace He here) where we process the surplus and save it for later.
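The tiling idea is easier to see in code than in prose. A toy CPU-side sketch in NumPy, purely to illustrate the access pattern (a real GPU kernel would keep each tile in shared/local memory):

import numpy as np

def tiled_matmul(a: np.ndarray, b: np.ndarray, tile: int = 32) -> np.ndarray:
    """Multiply a (m x k) by b (k x n), one small tile at a time."""
    m, k = a.shape
    _, n = b.shape
    c = np.zeros((m, n), dtype=a.dtype)
    for i in range(0, m, tile):
        for j in range(0, n, tile):
            for p in range(0, k, tile):
                # each small block is reused many times before we move on, which is
                # what keeps it resident in fast memory on a GPU
                c[i:i+tile, j:j+tile] += a[i:i+tile, p:p+tile] @ b[p:p+tile, j:j+tile]
    return c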

TornadoVM

Oracle's Gary Frost calls TornadoVM "state of the art" [YouTube] when it comes to bringing GPU processing to the JVM.

"There are two ways to express parallelism with TornadoVM" [Juan Fumero] Annotations and "via an explicit parallel kernel API" using KernelContext

Interestingly, by using the Tornado --printKernel --printBytecodes --fullDebug flags, you can get Tornado to print out all sorts of goodies, like the raw backend code it has created.

OpenCL Terminology

We can use a poker game to convey the idea of what is going on in OpenCL. Imagine a dealer and players who don’t interact with each other but who make requests to the dealer for additional cards.  

"In this analogy, the dealer represents an Open CL host, each player represents a device, the card table represents a context, and each card represents a kernel. Each player’s hand represents a command queue." [1]

"[Nested] Loops like this are common but inefficient. The inefficiency arises because each iteration requires a separate comparison and addition. Comparisons are time-consuming on the best of processors, but they’re especially slow on dedicated number-crunchers like graphic processor units ( GPU s). GPU s excel at performing the same operations over and over again, but they’re not good at making decisions. If a GPU has to check a condition and branch, it may take hundreds of cycles before it can get back to crunching numbers at full speed.

"One fascinating aspect of Open CL is that you don’t have to configure these loops in your kernel. Instead, your kernel only executes code that would lie inside the innermost loop. We call this individual kernel execution a work-item." [1]

"A work-group is a combination of work-items that access the same processing resources... Work-items in a work-group can access the same block of high-speed memory called local memory. Work-items in a work-group can be synchronized using fences and barriers." [1]

"One of the primary advantages of using Open CL is that you can execute applications using thousands and thousands of threads, called work-items." [1]

"Each block of local memory is specific to the work-items in a work-group." [1]

[1] OpenCL in Action

Thursday, September 5, 2024

Architecting Azure

Nineteen hours into a job migrating data from Synapse to an Azure SQL Server, we see: 

Failure happened on 'Source' side. ErrorCode=SqlOperationFailed,'Type=Microsoft.DataTransfer.Common.Shared.HybridDeliveryException,Message=A database operation failed with the following error: 'A transport-level error has occurred when receiving results from the server. (provider: TCP Provider, error: 0 - The specified network name is no longer available.)',Source=,''Type=System.Data.SqlClient.SqlException,Message=A transport-level error has occurred when receiving results from the server. (provider: TCP Provider, error: 0 - The specified network name is no longer available.),Source=.Net SqlClient Data Provider,SqlErrorNumber=64,Class=20,ErrorCode=-2146232060,State=0,Errors=[{Class=20,Number=64,State=0,Message=A transport-level error has occurred when receiving results from the server. (provider: TCP Provider, error: 0 - The specified network name is no longer available.),},],''Type=System.ComponentModel.Win32Exception,Message=The specified network name is no longer available,Source=,'

Yikes. This is after 226GB and 165 million rows had been written at an average throughput of 3.3MB/s. Three copy activities stopped within three seconds of each other but nothing untoward was found in the AzureDiagnostics and AzureActivity logs. At first I thought the network was suspiciously quiet at the time the copy came to an end but, using the Azure logs and this Pandas code here, I found that brief pauses were not that unusual:

Bursty network logs
Other engineers said they see this intermittently. "Welcome to the world of cloud computing where Transient Faults are bound to happen" [SO]. A cloud solution architect at Microsoft writes that throttling may be "done via blocking the connections or denying the new connections to SQL Azure database engine". Or it could be the network. "In Azure, most of the components are running on the internet, and that internet connection can produce transient faults intermittently." [Azure for Architects]

Never assume that a network is reliable, whether it's in the cloud or not. I worked at an investment bank where a developer would make a connection to a system and, if it connected, assume it was the live one rather than the failover (this was a blue/green deployment). At first blush, this was not unreasonable but networks can be tricksy. As it happened, the network must have hiccupped and he was connected to the standby system, pumping live data into it.

Sunday, July 21, 2024

Hive for an Iceberg test suite

I'm currently having trouble changing my catalog implementation from hadoop to hive for my test suite. The tests work fine as they are but they're backed by a local filesystem, which is not atomic. For the purposes of testing this seemed OK, but the fact that they were not really realistic was nagging me.

Spark can use numerous catalogs. You just need to set them up in SparkConf by giving each catalog its own namespace below spark.sql.catalog. Your call to writeTo will use the default namespace unless another is defined.
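For the record, wiring up Iceberg catalogs looks something like this. It's shown in PySpark for brevity (the test suite itself is Scala) and the catalog names, metastore URI and warehouse path are placeholders:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    # a Hive-backed Iceberg catalog called "my_hive"
    .config("spark.sql.catalog.my_hive", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.my_hive.type", "hive")
    .config("spark.sql.catalog.my_hive.uri", "thrift://localhost:9083")
    # a Hadoop (filesystem) Iceberg catalog called "my_hadoop"
    .config("spark.sql.catalog.my_hadoop", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.my_hadoop.type", "hadoop")
    .config("spark.sql.catalog.my_hadoop.warehouse", "/tmp/warehouse")
    .getOrCreate())

# writeTo with an explicit catalog prefix rather than the default
spark.range(10).writeTo("my_hadoop.db.numbers").createOrReplace()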

If you use a Hive catalog, concurrent creation of a table will blow up with:

org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException: [TABLE_OR_VIEW_ALREADY_EXISTS] Cannot create table or view `database`.`concurrentwritespec` because it already exists.
Choose a different name, drop or replace the existing object, or add the IF NOT EXISTS clause to tolerate pre-existing objects.
at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$createTable$1(HiveExternalCatalog.scala:255)

While backed by a local file system, the test correctly encounters an error but this time it's:

org.apache.spark.util.Utils - Aborting task
org.apache.iceberg.exceptions.CommitFailedException: Version 1 already exists: /tmp/SparkForTesting5181526160591714739/ConcurrentWriteSpec/metadata/v1.metadata.json

The behaviour of both is entirely expected but the actual Exceptions differ.

Meanwhile, a test that merely appended data (see this test) blew up when using hive rather than hadoop, failing with:

Cannot write into v1 table: `spark_catalog`.`database`.`spark_file_test_writeto`.

The reason for this currently eludes me but it seems I am not the only one.

Spark Datasource API V1 and V2

Amongst the problems with V1 was that "Partitioning and Sorting is not propagated from the data sources, and thus, not used in the Spark optimizer" [Qorela]. V2 gives much greater freedom to developers of external stores, like various cloud solutions. For example, Russell Spitzer says:
"Now V2 Predicate function pushdown is allowed (or at least should be coming soon) but for that you must use the datasource functions and not the spark wones" [Slack]

"Best practice for partitions is partition by low cardinality and sort for high cardanality. This may reduce the level of partitions you need" [Slack]
Another key difference between the two is atomicity. "Spark ships with two default Hadoop commit algorithms — version 1, which moves staged task output files to their final locations at the end of the job, and version 2, which moves files as individual job tasks complete...  the v2 Hadoop commit protocol is almost five times faster than v1. This is why in the latest Hadoop release, v2 is the default commit protocol... while v2 is faster, it also leaves behind partial results on job failures, breaking transactionality requirements. In practice, this means that with chained ETL jobs, a job failure — even if retried successfully — could duplicate some of the input data for downstream jobs." [Databricks]
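The Hadoop commit algorithm in that quote can be pinned explicitly. A minimal sketch (the property applies to file-based outputs such as Parquet):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
    # v1: slower job commit, but no partial results are left behind on failure
    .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "1")
    .getOrCreate())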

Remediation attempts

Changing the table made no difference to the error:

      spark.sql(
        s"""
          |ALTER TABLE $fqn
          |SET TBLPROPERTIES ('format-version'='2')
      """.stripMargin)

So, I tried creating the table with:

... USING ICEBERG TBLPROPERTIES ('format-version'='2')

only for it to then complain upon table creation:

ERROR | o.a.h.h.metastore.RetryingHMSHandler - MetaException(message:Unable to update transaction database java.sql.SQLSyntaxErrorException: Table/View 'NEXT_LOCK_ID' does not exist.

coming from Hive when it calls Derby. "The Derby catalog doesn't support the interfaces (or concurrent access) required for iceberg to work" [GitHub issue]

So, for the time being at least, it seems that my tests will need to be somewhat unrealistic when it comes to atomicity.

Sunday, June 16, 2024

First look at the Unity Catalog

Databricks' Unity Catalog has now been open-sourced [Git]. But there's not a huge amount of code there - I counted a mere 133 Java files that were neither tests nor examples.

This is not too surprising since it's little more than a REST API over a (H2) database that stores metadata. It is just a catalog after all.

What is a little more surprising is that "MANAGED table creation is not supported yet." [GitHub] Managed tables are those that Unity, well, manages. That is, the whole life cycle of the table is under its purview. Contrast this with EXTERNAL tables that live elsewhere, who knows where - it's not important. 

Remember that Unity is a catalog, not a data store. Confusing the map with the territory [Wikipedia] is common when coming across metastores. This is perhaps due to the Apache Hive project, which is somewhat moribund these days, whereas Hive's metastore is very much alive. But think of a metastore like web URLs, where the location of the machine hosting the website is irrelevant; only its domain name is what we care about.

With Databricks' recent acquisition of Tabular.io (the de facto commercial force behind Apache Iceberg), it will be interesting to see how Iceberg integrates with it, if at all. Iceberg does not (as demonstrated here) store its table information in a metastore (unlike Delta); all its information is contained in the metadata files. Whether Databricks will encourage Iceberg to be tied to the metastore remains to be seen.

Friday, May 17, 2024

More SQL Server Tuning


Resources

You can see who is hogging resources with:

SELECT req.session_id, req.status, req.start_time, req.cpu_time AS 'cpu_time_ms',
req.logical_reads,req.dop,s.login_name,s.host_name,s.program_name,
object_name(st.objectid,st.dbid) 'ObjectName',
REPLACE (REPLACE (SUBSTRING (st.text,(req.statement_start_offset/2) + 1,
 ((CASE req.statement_end_offset WHEN -1 THEN DATALENGTH(st.text)
   ELSE req.statement_end_offset END - req.statement_start_offset)/2) + 1),
   CHAR(10), ' '), CHAR(13), ' ') AS statement_text,
qp.query_plan,qsx.query_plan as query_plan_with_in_flight_statistics
FROM sys.dm_exec_requests as req
JOIN sys.dm_exec_sessions as s on req.session_id=s.session_id
CROSS APPLY sys.dm_exec_sql_text(req.sql_handle) as st
OUTER APPLY sys.dm_exec_query_plan(req.plan_handle) as qp
OUTER APPLY sys.dm_exec_query_statistics_xml(req.session_id) as qsx
ORDER BY req.cpu_time desc;

(This was generated by Azure). The output looks something like:


Note the dop column. This indicates the degree of parallelism. One core seemed a bit measly so I forced the query to use more by adding this at the end of my SELECT statement:

OPTION (USE HINT ('ENABLE_PARALLEL_PLAN_PREFERENCE'));

[See the comment below Paul White's comment in this SO post].

Consequently, my query was 4x faster (at the possible expense of bothering other users in this multi-tenanted machine).

Indexes

When creating an index, you might see a trailing "WITH (...) ON [PRIMARY]". Here, "you can specify options for data storage, such as a partition scheme, a specific filegroup, or the default filegroup. If the ON option isn't specified, the index uses the partition or filegroup settings of the existing table." [Microsoft]

The option here is to have the index on a separate disk to your data. With two disks you can speed things up as neither disk blocks the other's access, although the efficacy of this is questioned [SO].

Padding and Fill Factor pertain to how the leaf data is stored and are only relevant if there are lots of random changes [SO].

Casting

Autoboxing can have a negative effect on Java apps but I didn't think casting would have such a large effect in SQL. When we removed unnecessary CASTing from a query running on a 12-core SQL Server, execution time dropped from 15 minutes to 3.

It's true that the query was horribly inefficient. One of the CASTs turned a date into a VARCHAR so that a regex could check whether the date was in September... Another was due to some data being imported incorrectly, its PK being a float when it should have been a VARCHAR. This led to a CAST when it was JOINed to another table.

Using the SQL at the top of this post revealed the reason for the improvement: SQL Server decided to parallelize the query itself, so there was no need for ENABLE_PARALLEL_PLAN_PREFERENCE.

Friday, April 19, 2024

Tuning SQL Server

Note that diagnosing a query plan in MS SQL Server is not hugely different from examining Spark query plans. Some operations are conceptually the same.

Similarly, putting indexed columns inside a function removes any benefit they may bring, just as it does for Spark's predicate pushdowns. In both, queries must be sargable [Brent Ozar's blog]. SARGs are search arguments and "a sargable predicate is one of form (or which can be put into the form) column comparison-operator value" [SO].

Query Plans

"You shouldn't rely too much on cost percentages in execution plans" [SO] says SQL Server luminary, Paul White. 

Heap Scan

Before we look at the scan aspect, what is a heap? "A heap is a table without a clustered index... One or more nonclustered indexes can be created on tables stored as a heap."

What's the advantage of a heap? "Because data is inserted without enforcing a strict order, the insert operation is usually faster than the equivalent insert into a clustered index." 

There are many disadvantages to a heap, notably, "Do not use a heap when the data is frequently grouped together." [Microsoft]

Nested Loop Joins

"The database usually performs this matching step in a nested-loops join by indexed lookups on the join key, repeated for each row from the driving table" [1] The DB filters as it goes.

Hash Joins

"Based on table and index statistics, the cost-based optmizer estimates which of these two independent tables will retyurn fewer rows after discarding filtered rows. It chooses to has the complete results from that singe-table query...

"It then executes the larger query ... returning the driving rowset. As eadh rows exits this step, the database executes thte same hash function in its join key and uses the hash-function result to go directly to the corresponding hash bucket for the other rowset. When it reaches the right hash bucket, the database searches the tiny list of rows in that bucket to find matches."

The catch with this approach is you "hope those hash buckets fit entirely in memory, but if necessary, the database temporarily dedicates scracth disk space to hold the buckets... A large prehashed rowset could require unexpected disk scratch space, performing poorly and possibly even running out of space."

"It is the most memory-intensive of any of the joins" [SO].

Sort-merge Joins

Spark does exactly this. This is where it "reads the two tables independently but, instead of matching rows by hashing, it presorts rowsets on the join key and merges the sorted lists... Once the two lists are sorted, the matching process is fast but presorting lists is expensive and nonrobust." [1] For this reason, hash joins are preferred. They don't have this downside but have all the same advantages.

In the event of a nonrobust query, SQL Server may throw error 701, "There is insufficient memory to run this query" [Microsoft docs].

Indices

You can see all the indexes by executing: 

select * from sys.indexes

But remember that "cost-based optimizers often do a terrible job if they do not have statistics on all the tables and indexes involved in the query. It is therefore imperative to maintain statistics on tables and indexes reliably; this includes regenerating statistics anytime table volumes change much or anytime tables or indexes are rebuilt." [1]

Clustered vs. Non Clustered Indexes

Clustered indexes actually change where rows are stored on disk. As a result, "there can be only one clustered index per table" [Microsoft].  Non-clustered indexes by contrast are just pointers to the row data.

Adding a clustered index can be an intense operation, measured in minutes or even hours. For instance, adding a clustered index to a table of 765k rows and about 20 columns of dates and varchars (13 columns totalling a size of about 1,800) took about 15 minutes on a 12-core Azure SQL Server. But this one index reduced the TotalSubtreeCost from c. 131k to 71k.

Bit-map indexes
"Each stored value of a bit-mapped index points to what amount to a list of yes/no bits that map to the whole list of table rows. These bit strings are ways to AND and OR together with other bit stroings of other bit-mapped indexes... The big catch is that such bit strings are expensive to maintain in sync with frequently changing table contents... Bit-mapped indexes work best for tables that are mostly read-only... The best scenario for success is precisely the data-warehouse scenario for which bit-mapped indexes were designed." [1]

Columnstore

SQL Server seems to be stealing ideas from other big data tools as it now allows columnar storage. "Columnstore indexes are the standard for storing and querying large data warehousing fact tables." [Microsoft] Adding one to a table of mine made the cost drop by two orders of magnitude... but the query still took over an hour before I killed it. I guess you should never judge a query by its cost [Brent Ozar].

[Aside: I eventually made the query work in a reasonable if not stellar duration by dropping a clustered index and having only the columnstore index, not the two together as I previously had.]

[Finally, some nice people on Discord gave some links to their favourite authors for all things SQL. They are Kimberly Tripp, Itzik Ben-Gan, Niko Neugebauer, and Paul White.]

[1] SQL Tuning, Dan Tow

Saturday, April 6, 2024

When adding more CPUs does not help distressed CPUs

This is an interesting problem on Discourse where the symptoms belie the cause. Here, a very beefy Spark cluster is taking a long time to process an (admittedly) large amount of data. However, it's the CPUs that are getting hammered.

Insanely high CPU usage

The temptation at this point is to add more CPU resources but this won't help much.

When Spark jobs that are not computationally intensive use large amounts of CPU, there's an obvious suspect. Let's check the time spent in Garbage Collection:


Insanely large GC Times

Shuffle per worker seems modest but look at those GC Times. In a five-hour job, nearly two hours are spent just garbage collecting.

And this is something of a surprise to people new to Spark. Sure, it delivers on its promise to process more data than can fit in memory but, if you want it to be performant, you need to give it as much memory as possible.
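In practice, that means bumping the executor memory settings rather than the core count. The values here are purely illustrative:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .config("spark.executor.memory", "16g")           # heap per executor
    .config("spark.executor.memoryOverhead", "4g")    # off-heap overhead per executor
    .getOrCreate())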

Friday, April 5, 2024

Network Adventures in Azure Databricks

My Azure Databricks cluster could not see one of my Blob containers although it could see others in the same subscription. The error in Databricks looked something like this: 

ExecutionError: An error occurred while calling o380.ls.
: Status code: -1 error code: null error message: java.net.SocketTimeoutException: connect timed outjava.net.SocketTimeoutException: connect timed out
        at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.executeHttpOperation(AbfsRestOperation.java:423)
        at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.completeExecute(AbfsRestOperation.java:274)
        at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.lambda$execute$0(AbfsRestOperation.java:214)
        at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation(IOStatisticsBinding.java:464)
...

My first suspicion was that because they were in different resource groups, this could explain things.

Resource groups
"Resource groups are units of deployment in ARM [Azure Resource Manager]. 
"They are containers grouping multiple resource instances in a security and management boundary. 
"A resource group is uniquely named in a subscription. 
"Resources can be provisioned on different Azure regions and yet belong to the same resource group. 
"Resource groups provide additional services to all the resources within them. Resource groups provide metadata services, such as tagging, which enables the categorization of resources; the policy-based management of resources; RBAC; the protection of resources from accidental deletion or updates; and more... 
"They have a security boundary, and users that don't have access to a resource group cannot access resources contained within it.  Every resource instance needs to be part of a resource group; otherwise, it cannot be deployed." [Azure for Architects]
That last paragraph is interesting because I can access the container I want via the Azure portal. So, a friendly sysadmin suggested this was barking up the wrong tree and instead looked at:

Virtual Networks
"A VNet is required to host a virtual machine. It provides a secure communication mechanism between Azure resources so that they can connect to each other. 
"The VNets provide internal IP addresses to the resources, facilitate access and connectivity to other resources (including virtual machines on the same virtual network), route requests, and provide connectivity to other networks. 
"A virtual network is contained within a resource group and is hosted within a region, for example, West Europe. It cannot span multiple regions but can span all datacenters within a region, which means we can span virtual networks across multiple Availability Zones in a region. For connectivity across regions, virtual networks can be connected using VNet-to-VNet connectivity." [Azure for Architects]
Nothing obvious here. Both Databricks and the container were on the same network. However, they weren't on the same subnet.

Network Security Groups
"Subnets provide isolation within a virtual network. They can also provide a security boundary. Network security groups (NSGs) can be associated with subnets, thereby restricting or allowing specific access to IP addresses and ports. Application components with separate security and accessibility requirements should be placed within separate subnets." [Azure for Architects]
And this proved to be the problem. Databricks and the container are on the same virtual network but not the same subnet and there was an NSG blocking communication between these subnets.

Note that changes can take a few minutes to propagate, sometimes faster but sometimes slower. My sysadmin says he has seen it take up to an hour.

AWS Real Estate

Just some notes I've made playing around with AWS real estate.

ECS
Amazon's offering that scales Docker containers. Whereas EC2 is simply a remote VM, ECS is a "logical grouping of EC2 machines" [SO]

Fargate
Is a serverless version of EC2 [SO].
 
Kinesis
A proprietary Amazon Kafka replacement. While Kafka writes data locally, Kinesis uses a quorum of shards.

MSK
Amazon also offers a hosted Kafka solution called MSK (Managed Streaming for Kafka). 

Lambda
Runs containers (like Docker) that exist for up to 15 minutes and whose storage is ephemeral.

Glue
A little like Hive. It has crawlers, which are batch jobs that compile metadata, thus doing some of the job of Hive's metastore. In fact, you can delegate the metastore that Spark uses to Glue as its backing store - see the sketch at the end of this list.

EMR
EMR is AWS's MapReduce tool on which we can run Spark. "You can configure Hive to use the AWS Glue Data Catalog as its metastore." [docs] If you want to run Spark locally but still take advantage of Glue, follow these instructions.

Athena
Athena is AWS's hosted Trino offering. You can make data in S3 buckets available to Athena by using Glue crawlers.

Step Functions
AWS's orchestration of different services within Amazon's cloud.

CodePipeline
...is AWS's CI/CD offering.

Databases
DynamoDB is a key/value store and Aurora is a distributed relational DB.
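As for pointing Spark at Glue (mentioned under Glue and EMR above), a minimal sketch. It assumes the AWS Glue Data Catalog client is on the classpath, as it is on EMR, where this property is normally set for you in hive-site.xml when you enable the Glue Data Catalog:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .enableHiveSupport()
    # delegate Hive metastore calls to the AWS Glue Data Catalog
    .config("spark.hadoop.hive.metastore.client.factory.class",
            "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory")
    .getOrCreate())

spark.sql("SHOW DATABASES").show()   # databases now come from Glue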