Monday, November 11, 2024

Databricks in Azure

The Databricks plugin for VSCode is actually quite nice. Once installed, you can log in via your IDE:


and Sign in to Databricks workspace using a profile in your ~/.databrickscfg file. For my Windows VM, this is just a host string pointing at my cluster and an auth_type of databricks-cli. Signing in opens a browser and I just authorise my session.
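
For reference, a minimal profile in that file might look something like this (the host URL below is a placeholder for your own workspace):

[DEFAULT]
host = https://adb-1234567890123456.7.azuredatabricks.net
auth_type = databricks-cli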

Now, the nice thing is that the code I develop locally in VSCode can be run remotely on the cluster by choosing Run File as Workflow.


This uploads all my code to a Databricks cluster and I can see the job running by clicking the link, which opens my browser at the job's console.

Interestingly, trying to read (via a notebook) a Spark DataFrame that had been persisted as Parquet by this job led to an obscure error. Apparently, I have to run:

spark.conf.set("spark.databricks.io.cache.enabled", "false")
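
With that set, reading the Parquet back in the notebook is just the usual call (the output path here is made up):

df = spark.read.parquet("dbfs:/tmp/run_file_as_workflow_output/")  # hypothetical path the workflow wrote to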

Databricks as part of the Pipeline

Azure Data Factory is a little like Airflow but trying to do something clever with it (eg, use a CLI) is difficult. We found that having it invoke a Databricks notebook allowed us to do anything more sophisticated.

Note that you'll have to create a Databricks Linked Service to allow your ADF pipeline access to the notebook. These Microsoft docs proved helpful.

If you want the notebook to read from a parameter ADF passes to it, use something like this:

def read_param(key: str) -> str:
    dbutils.widgets.text(key, "")
    parameter = dbutils.widgets.get(key)
    return parameter

where key corresponds to one of the Base Parameters in the Notebook ADF activity. Note that there is a 10k character limit on these arguments [SO].
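
For instance, if the ADF activity defines a Base Parameter called input_path (a name invented for this example), the notebook picks it up with:

input_path = read_param("input_path")  # "input_path" is a made-up Base Parameter name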

If you want the notebook to return something back to the ADF flow, have it terminate with:

dbutils.notebook.exit(YOUR_VALUE_HERE)

[See this SO.]

Connecting Databricks to a Database

If you want to connect a Databricks notebook to a cloud-based SQL Server instance, you can run:

%pip install adal
%pip install openpyxl

servername = "YOUR_MACHINE_NAME"
jdbcHostname = f"{servername}.database.windows.net"
jdbcDatabase = "YOUR_DB" 
driver = "com.microsoft.sqlserver.jdbc.spark"
url = f"jdbc:sqlserver://{jdbcHostname};databaseName={jdbcDatabase}"

as a one-off, then:

import adal

authority = "https://login.microsoftonline.com/common"
context = adal.AuthenticationContext(authority)
jdbcPort = 1433
jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2}".format(jdbcHostname, jdbcPort, jdbcDatabase)

client_id = "YOUR_CLIENT_ID" # Client ID for Azure Cloud Shell (you do not have to replace this)
resource_app_id_url = "https://database.windows.net/"

# device-code flow: code["message"] tells the user where to go to authorise this session
code = context.acquire_user_code(resource_app_id_url, client_id)
token = context.acquire_token_with_device_code(resource_app_id_url, code, client_id)
access_token = token["accessToken"]

Now you can use Spark to read the DB with the help of this token:

jdbc_db_practice_mapping = (
        spark.read
        .format("com.microsoft.sqlserver.jdbc.spark")
        .option("url", url)
        .option("dbtable", "RightCare_ref.practice_CCG_quarter_lookup")
        .option("accessToken", access_token)
        .option("encrypt", "true")
        .option("hostNameInCertificate", "*.database.windows.net")
        .load()
)

Et voila. You're reading from SQL Server.

Saturday, October 26, 2024

NVIDIA Rapids

You can make Spark go faster by offloading some of the work to the GPU. There is an NVIDIA library (spark-rapids) to do this. 

A quick introduction

There are quite a few references to UCX in Spark Rapids. Assuming you have the hardware, this enables remote direct memory access (RDMA), basically sharing data in memory in a way that bypasses the kernel.

"UVM or universal memory can allow main host memory to act essentially as swap for device(GPU) memory. This allows the GPU to process more data than fits in memory, but  can result in slower processing. This is an experimental feature." (from RapidsConf)

The spillStorageSize is the "Amount of off-heap host memory to use for buffering spilled GPU data before spilling to local disk. Use -1 to set the amount to the combined size of pinned and pageable memory pools."
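
To make this concrete, here is a rough sketch of how a PySpark session might be configured to use the plugin (the jar path and sizes are placeholders; the keys are taken from the RAPIDS configuration docs):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.plugins", "com.nvidia.spark.SQLPlugin")       # enable the RAPIDS SQL plugin
    .config("spark.jars", "/path/to/rapids-4-spark.jar")         # placeholder path to the plugin jar
    .config("spark.rapids.sql.enabled", "true")
    .config("spark.rapids.memory.host.spillStorageSize", "1g")   # off-heap host buffer for spilled GPU data
    .getOrCreate()
)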

Old Ubuntu

Unfortunately, when running a test on Ubuntu 18 (I know, I know) I saw:

Caused by: java.lang.UnsatisfiedLinkError: /home/henryp/Code/Scala/SparkEcosystem/spark-rapids/integration_tests/target/tmp/cudf3561040550923512030.so: /lib/x86_64-linux-gnu/libc.so.6: version `GLIBC_2.28' not found (required by /home/henryp/Code/Scala/SparkEcosystem/spark-rapids/integration_tests/target/tmp/cudf3561040550923512030.so)
...
        at ai.rapids.cudf.NativeDepsLoader.loadDep(NativeDepsLoader.java:246)

After installing the latest Ubuntu on VMware, it appears that you cannot access the GPU using VMware Workstation.

You can, however, use a Docker image - just check that your Docker daemon has the NVIDIA runtime installed by running:

henryp@adele: docker info | grep Runtimes 
Runtimes: io.containerd.runc.v2 nvidia runc

or 

henryp@adele:~$ grep -A2 -i nvidia /etc/docker/daemon.json
        "nvidia": {
            "path": "/usr/bin/nvidia-container-runtime",
            "runtimeArgs": []
        }

and download an image from here.

Now, I just need to run something like:

docker run --rm --gpus all  --runtime=nvidia  -it   -v /home/henryp/Code/Scala/SparkEcosystem/spark-rapids/:/home/henryp/Code/Scala/SparkEcosystem/spark-rapids/ -v /home/henryp/Tools:/home/henryp/Tools -v /home/henryp/.m2:/.m2  -v /usr/local/bin/Java:/usr/local/bin/Java  --user root  nvidia/cuda:12.6.1-devel-ubi8 /bin/bash

and once I'm in, run:

yum -y install git
yum -y install diffutils
yum -y install rsync
export MAVEN_HOME=/home/henryp/Tools/Build/Maven/Latest
export PATH=$MAVEN_HOME/bin:$PATH
export JAVA_HOME=/usr/local/bin/Java/Latest17
export PATH=$JAVA_HOME/bin:$PATH

Constantly installing the tools Rapids needs to build proved a bit tedious, so I extended the NVIDIA Docker image with this Dockerfile:

FROM nvidia/cuda:12.6.1-devel-ubi8

RUN yum -y install git
RUN yum -y install diffutils
RUN yum -y install rsync

Also, if I wanted to use mvnDebug [SO], I had to set the network of the container to host using --network host [SO]. Then, it's just a matter of running:

mvnDebug scalatest:test -DforkMode=never

and attaching a debugger from the host machine.

Unfortunately, sometimes when I close and re-open my laptop, the tests start failing with:

...
WindowedBlockIteratorSuite:
terminate called after throwing an instance of 'cudf::jni::jni_exception'
  what():  CUDA ERROR: code 999
/home/henryp/Tools/Build/Maven/Latest/bin/mvnDebug: line 36:   216 Aborted                 (core dumped) env MAVEN_OPTS="$MAVEN_OPTS" MAVEN_DEBUG_OPTS="$MAVEN_DEBUG_OPTS" "`dirname "$0"`/mvn" "$@"

Apparently, I must reboot my machine :(

Some miscellaneous code notes

Finally, I get to look at the code in action. Rapids adds RapidsExecutorPlugin (which extends the Spark ExecutorPlugin interface) that immediately initializes the GPU and memory (see GpuDeviceManager.initializeGpuAndMemory). Note that in setGpuDeviceAndAcquire we see a comment that says: 
"cudaFree(0) to actually allocate the set device - no process exclusive required since we are relying on Spark to schedule it properly and not give it to multiple executors"
This is why the tests (HashAggregatesSuite) are hard-coded to use just one CPU core.

Rapids then has a parallel set of classes that look a lot like Spark's own columnar data classes. For example, there is a ColumnVector abstraction in both Spark and Rapids. The interesting one on the Rapids side is GpuColumnVector - which implements the Spark interface - and it can be instantiated by a GpuShuffleExchangeExecBase. Amongst other things, objects of these classes contain the address of their off-heap data and a reference counter.

Still playing.


Saturday, October 5, 2024

Optimising GPU code

I complained to Juan Fumero that a benchmark indicated that the GPU was not giving much of a performance improvement. JMH reported the GPU being a moderate 20% faster than the CPU:

tornado -jar tornado-benchmarks/target/jmhbenchmarks.jar uk.ac.manchester.tornado.benchmarks.sgemv.JMHSgemV
...
Benchmark              Mode  Cnt         Score         Error  Units
JMHSgemV.sgemVJava     avgt    5  72366270.751 ± 5916807.539  ns/op
JMHSgemV.sgemVTornado  avgt    5  57583087.103 ± 2523449.341  ns/op

(SGEMM is single-precision general matrix-matrix multiplication; SGEMV indicates that we're multiplying a matrix by a vector.)

Juan replied that I should try TornadoVM's --enableProfiler console switch and see where the time was being spent. Sure enough, COPY_IN_TIME was ~28ms, about the same as TOTAL_KERNEL_TIME.

Note that the total kernel time is the time it takes the GPU to perform the computation and the total kernel dispatch time is the time it takes to schedule the kernel (ie, the function being executed). In this case, dispatch time is ~6us - three orders of magnitude smaller than the execution time.

Juan also said that "Matrix Vector is not as compute intensive as other applications", so instead I tried the matrix/matrix multiplication. Here, the GPU shines:

Benchmark              Mode  Cnt           Score         Error  Units
JMHSgemm.sgemmJava     avgt    5  1773297262.188 ± 4115731.439  ns/op
JMHSgemm.sgemmTornado  avgt    5     8478409.506 ±  246919.368  ns/op

That makes the GPU 200 times faster than the CPU. Now COPY_IN_TIME is about 1ms and TOTAL_KERNEL_TIME is about 5.5ms.

Now we're talking. But continuing this optimization rampage, it's worth noting that "It has become tribal knowledge that the particular shapes chosen for matmuls has a surprisingly large effect on their performance." [Horace He] TL;DR: Horace He's article explains how fitting the small memory tiles onto a large matrix can hugely change performance - basically, that in a row-major MxN matrix, N should be a multiple of the GPU's cache line for best results.

Changes in Java's memory

In the old days, we'd use sun.misc.Unsafe.allocateMemory to use off-heap memory. This code goes straight to the OS and asks for memory via os::realloc. But using Unsafe is bad practice. Not only is it specific to a particular flavour of JVM, it allows access to raw memory. The latter is "fine" if that memory is off-heap but if you are using it to access a Java object, the garbage collector can change its memory location without warning.

There are several modern alternatives. Since Java 9, java.lang.invoke.VarHandle has been the recommended alternative. It provides the same level of low-level access as Unsafe but with better safety and control over memory visibility. That is, its memory access patterns apparently offer finer grained control - eg, volatile access without enforcing strict instruction ordering. 

It's interesting to note that the high-performing interoperability framework, Apache Arrow, does not use VarHandle. It still uses Unsafe, as VarHandle does bounds checking etc. that is slower than raw access.

Since Java 19 (where it first appeared as a preview: JEP 424), we've had Project Panama's Foreign Function & Memory API (it appears Apache Arrow doesn't use it because it's too new). If we run this code:

MemorySegment memorySegment = Arena.global().allocate(1024 * 1024 * 128, 8);
System.out.println(memorySegment.address());

then look for the address while it's still running in /proc/PID/maps (where PID is the ID of the Java process), we can see that the Linux OS now manages a new area of memory. For instance, when I ran it, the output was 0x7fbaccdbe010 and I can see in the maps pseudo file:

7fbaccdbe000-7fbad4dbf000 rw-p 00000000 00:00 0 

This represents the 128 megs of space plus 4096 bytes (presumably a page for metadata).

Note that an Arena in this context is a large chunk of memory that is managed in user space rather than the app code constantly calling the kernel requesting memory piecemeal. This is an optimization.

Now, since Java and C/C++ are both IEEE 754 compliant and can now pass native memory to each other, you can transparently pass floating-point numbers between the two code bases and call C/C++ functions from the JVM - no more need for JNI! (Interestingly, Python is often IEEE 754 compliant but it is not guaranteed to be.)

It's interesting to note that the GPU enabled Tornado VM uses the java.lang.foreign package to move data to and from the GPU.

Friday, September 27, 2024

Running Iceberg Catalogs in a test suite


When trying to put some BDDs together here (GitHub) for Iceberg and Spark integration, I hit some snags. I was using the local catalog. If I used the Hive catalog, I was getting this error ("Iceberg does not work with Spark's default hive metastore"). 

The reason is that Derby, which is bundled with the Hive metastore in Spark, "doesn't support the interfaces (or concurrent access) required for Iceberg to work" according to Russell Spitzer. He suggests using the Hadoop catalog instead. 

I didn't fancy setting up a more robust Hive metastore that uses a more "professional" database like Postgres, so I followed Russell's advice to use the Hadoop catalog... and immediately hit this problem that's discussed more in Slack. Basically, CALLing some procedures via Spark SQL throws an IllegalArgumentException stating "Cannot use non-v1 table". This refers to Spark v1 tables and has nothing to do with Iceberg v1 tables, it seems.

OK, so this got me thinking: if there is no easy way to use the Hive and Hadoop catalogs, let's try a REST catalog. Apache Polaris seemed a good candidate. At first, I was hoping to have it run in-process but because of transitive dependencies, it wasn't possible to have everything running easily in the same JVM.

Hmm, so we need it in a separate process. OK, we can Docker-ise that. After adjusting the fixtures, I noticed that running a previously passing test a second time leads to a failure. Looking at the stack traces, it appears that sure, we DROP TABLE IF EXISTS ..., but the underlying files are left behind. Russell Spitzer helped me out (again) by pointing out that a PURGE keyword needs to be added to the SQL.
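
In Spark SQL terms that's just (the catalog, namespace and table names here are made up):

spark.sql("DROP TABLE IF EXISTS polaris.my_namespace.my_table PURGE")  # names are hypothetical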

I asked: does the catalog also need to delete metadata files even if the query engine deletes the data files? Russell Spitzer replied:

"This isn't actually defined anywhere. So at the moment we are kind of in a wild west, the Spark implementation deletes absolutely everything. I believe for Trino it just sends the request to the catalog and it's up to the catalog to decide. So for Spark "no", and it explicitly sends a drop table request to the rest catalog without a purge flag."
Russell says Polaris will delete files with this flag [caveat] although there is debate about making this service asynchronous ("I think polaris could own this service too but I know some folks (like netflix) have delete services already"), and "the option to hook into an external delete service".

So, there is some latitude in how one implements a catalog, it seems. This would explain why some of my tests are failing - some assumptions I made are not guaranteed. Still working on making them pass.

Thursday, September 26, 2024

Azure Automation

It's surprising how many silly little jobs can be automated away for big productivity wins. I wrote some Python code to run on a laptop that plays with Azure cloud. The main takeaway points were:

The best way to get access is by using azure.identity.InteractiveBrowserCredential in the azure-identity package. This opens the browser and prompts the user to log in. The credential can then be used to instantiate an azure.storage.filedatalake.DataLakeServiceClient in azure-storage-file-datalake. With this we can upload files.

Then, to kick off ADF jobs, we can use the same credential and pass it to azure.mgmt.datafactory.DataFactoryManagementClient in azure-mgmt-datafactory. As a nice little aside, you can use the standard library's webbrowser module to open your browser at the ADF job by crafting a URL with the job ID so you can monitor its progress.
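
Stitched together, the whole flow looks roughly like this (the account, container, subscription, resource group, factory, pipeline and parameter names are all placeholders):

import webbrowser

from azure.identity import InteractiveBrowserCredential
from azure.mgmt.datafactory import DataFactoryManagementClient
from azure.storage.filedatalake import DataLakeServiceClient

credential = InteractiveBrowserCredential()  # pops open a browser for the user to log in

# upload a file to ADLS Gen2
adls = DataLakeServiceClient("https://YOUR_ACCOUNT.dfs.core.windows.net", credential=credential)
file_client = adls.get_file_system_client("YOUR_CONTAINER").get_file_client("landing/data.csv")
with open("data.csv", "rb") as f:
    file_client.upload_data(f, overwrite=True)

# kick off an ADF pipeline with the same credential
adf = DataFactoryManagementClient(credential, "YOUR_SUBSCRIPTION_ID")
run = adf.pipelines.create_run(
    "YOUR_RESOURCE_GROUP", "YOUR_FACTORY", "YOUR_PIPELINE",
    parameters={"input_path": "landing/data.csv"},
)

# open the browser at the ADF monitoring page (the full URL also needs your factory's resource ID)
webbrowser.open(f"https://adf.azure.com/monitoring/pipelineruns/{run.run_id}")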

This all seemed pretty straightforward but accessing an Azure SQL instance was somewhat harder. For the credentials, I referred to this SO answer. Here I used the InteractiveBrowserCredential as before but then I needed:

    # Driver, server and database are defined elsewhere, eg "{ODBC Driver 18 for SQL Server}",
    # "YOUR_SERVER.database.windows.net" and "YOUR_DB"
    token = credential.get_token("https://database.windows.net/.default")
    connection_string = f"""
        Driver={Driver};
        Server={server};
        Database={database};
        Encrypt=yes;TrustServerCertificate=no;Connection Timeout=30;
    """
    token_bytes = token.token.encode("UTF-16-LE")
    token_struct = struct.pack(f'<I{len(token_bytes)}s', len(token_bytes), token_bytes)
    # 1256 is SQL_COPT_SS_ACCESS_TOKEN: it hands the AAD token to the ODBC driver
    conn = pyodbc.connect(connection_string, attrs_before={1256: token_struct})

which is a bit more involved but allowed me to connect.


Sunday, September 22, 2024

GPU Notes

Minecraft is written in Java and uses JOCL to leverage the GPU [Oracle blog, 2020].
The organization of threads in CUDA terms (sketched in code after this list):
  1. Thread: single unit of execution --- each thread has its own memory called registers
  2. Block: group of threads --- all threads in a block have access to a shared memory [Cuda Terminology].
  3. Grid: group of blocks --- all threads in a grid have access to [mutable] global memory and [immutable, global] constant memory.
 [Penny Xu's blog]
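
A tiny sketch of this hierarchy using Numba's CUDA bindings (it needs a CUDA-capable GPU and the numba package; the array and launch sizes are arbitrary):

import numpy as np
from numba import cuda

@cuda.jit
def add_one(x):
    i = cuda.grid(1)      # absolute index of this thread across the whole grid
    if i < x.size:        # the last block may overrun the array, so guard it
        x[i] += 1.0

data = cuda.to_device(np.zeros(1000, dtype=np.float32))
threads_per_block = 256   # threads in a block share fast, on-chip shared memory
blocks_per_grid = 4       # all blocks in the grid see global memory
add_one[blocks_per_grid, threads_per_block](data)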
Matrix multiplication is best done using tiles. The reason for this is that when you ask for a value, you actually get a whole array of them. You could throw them away, or you could employ a clever optimization (described by Horace He here) where we process the surplus and save it for later.
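
A minimal CPU-side sketch of the tiling idea (on a GPU each tile would map onto a block/work-group and sit in shared memory; the tile size of 64 is arbitrary):

import numpy as np

def tiled_matmul(A, B, tile=64):
    # multiply A (m x k) by B (k x n) one tile at a time so each tile fits in fast memory
    m, k = A.shape
    _, n = B.shape
    C = np.zeros((m, n), dtype=A.dtype)
    for i in range(0, m, tile):
        for j in range(0, n, tile):
            for p in range(0, k, tile):
                C[i:i+tile, j:j+tile] += A[i:i+tile, p:p+tile] @ B[p:p+tile, j:j+tile]
    return C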

TornadoVM

Oracle's Gary Frost calls [YouTube] TornadoVM "state of the art" when it comes to bringing GPU processing to the JVM.

"There are two ways to express parallelism with TornadoVM" [Juan Fumero] Annotations and "via an explicit parallel kernel API" using KernelContext

Interestingly, by using the Tornado --printKernel --printBytecodes --fullDebug flags, you can get Tornado to print out all sorts of goodies like the raw backend code it has created.

OpenCL Terminology

We can use a poker game to convey the idea of what is going on in OpenCL. Imagine a dealer and players who don’t interact with each other but who make requests to the dealer for additional cards.  

"In this analogy, the dealer represents an Open CL host, each player represents a device, the card table represents a context, and each card represents a kernel. Each player’s hand represents a command queue." [1]

"[Nested] Loops like this are common but inefficient. The inefficiency arises because each iteration requires a separate comparison and addition. Comparisons are time-consuming on the best of processors, but they’re especially slow on dedicated number-crunchers like graphic processor units ( GPU s). GPU s excel at performing the same operations over and over again, but they’re not good at making decisions. If a GPU has to check a condition and branch, it may take hundreds of cycles before it can get back to crunching numbers at full speed.

"One fascinating aspect of Open CL is that you don’t have to configure these loops in your kernel. Instead, your kernel only executes code that would lie inside the innermost loop. We call this individual kernel execution a work-item." [1]

"A work-group is a combination of work-items that access the same processing resources... Work-items in a work-group can access the same block of high-speed memory called local memory. Work-items in a work-group can be synchronized using fences and barriers." [1]

"One of the primary advantages of using Open CL is that you can execute applications using thousands and thousands of threads, called work-items." [1]

"Each block of local memory is specific to the work-items in a work-group." [1]

[1] OpenCL in Action