Wednesday, October 25, 2023

Java and the GPU

Java has always been built to abstract away the hardware on which it runs, but its approach to GPUs has been somewhat late to the game [Gary Frost, YouTube].

There are projects out there that promise to give Java access to the GPU. I looked at Aparapi but it appears to be moribund. So, I gravitated to TornadoVM, which Frost describes as "state of the art".

The trouble is that TornadoVM runs everything in a Docker image that has all the shared objects built in. This is fine for a quick demo - this is the result of running on my Quadro T2000:

docker-tornadovm$ ./run_nvidia_openjdk.sh tornado -cp example/target/example-1.0-SNAPSHOT.jar example.MatrixMultiplication
Computing MxM of 512x512
CPU Execution: 1.17 GFlops, Total time = 230 ms
GPU Execution: 268.44 GFlops, Total Time = 1 ms
Speedup: 230x

This demonstrates how the GPU runs a nested for-loop doing matrix multiplication much faster than the same code on the CPU. But it runs it all in a Docker container and I need to package a JAR every time I make a change. How do I run it outside the container?
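For reference, the kernel TornadoVM parallelises is just an ordinary nested loop, plus a TaskGraph describing what to move to and from the device. The following is a minimal sketch rather than the exact example class; the TaskGraph calls follow the 0.15.x API and the details may differ in other versions:

import uk.ac.manchester.tornado.api.ImmutableTaskGraph;
import uk.ac.manchester.tornado.api.TaskGraph;
import uk.ac.manchester.tornado.api.TornadoExecutionPlan;
import uk.ac.manchester.tornado.api.annotations.Parallel;
import uk.ac.manchester.tornado.api.enums.DataTransferMode;

public class MxM {

    // An ordinary nested loop; @Parallel marks the loops TornadoVM may map to GPU threads.
    private static void multiply(float[] a, float[] b, float[] c, int n) {
        for (@Parallel int i = 0; i < n; i++) {
            for (@Parallel int j = 0; j < n; j++) {
                float sum = 0f;
                for (int k = 0; k < n; k++) {
                    sum += a[i * n + k] * b[k * n + j];
                }
                c[i * n + j] = sum;
            }
        }
    }

    public static void main(String[] args) {
        final int n = 512;
        float[] a = new float[n * n];
        float[] b = new float[n * n];
        float[] c = new float[n * n];
        java.util.Arrays.fill(a, 1f);
        java.util.Arrays.fill(b, 2f);

        TaskGraph graph = new TaskGraph("s0")
                .transferToDevice(DataTransferMode.FIRST_EXECUTION, a, b)
                .task("t0", MxM::multiply, a, b, c, n)
                .transferToHost(DataTransferMode.EVERY_EXECUTION, c);

        ImmutableTaskGraph snapshot = graph.snapshot();
        new TornadoExecutionPlan(snapshot).execute();   // runs on the GPU if a backend is available
    }
}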

To work this out, I opened a shell in the Docker image and saw that the TornadoVM build it uses was built from Git branch d3062accc. So, the first thing was to check out that branch of TornadoVM and build it.

I built with:

mvn clean install -Pgraal-jdk-11-plus

using the graalvm-ee-java11-22.3.4 JDK.

Note that you'll need Graal as the TornadoVM code has dependencies on it. I built my own Graal JDK by following the instructions here but using a different branch, as I couldn't find the download for the graal.version defined in the TornadoVM pom.xml. Note that you'll also need mx and a bootstrapping JDK with the right JVM Compiler Interface (JVMCI) version, in my case labsjdk-ce-21.0.1-jvmci-23.1-b19.
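For the record, the Graal build itself is roughly the following (a sketch assuming the standard mx workflow; the paths, and whether you build the compiler suite or the full vm suite, depend on which instructions you follow):

export JAVA_HOME=/path/to/labsjdk-ce-21.0.1-jvmci-23.1-b19
git clone https://github.com/graalvm/mx.git
export PATH=$PWD/mx:$PATH
git clone https://github.com/oracle/graal.git
cd graal/compiler    # or graal/vm for a full GraalVM build
mx build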

So far, so good. I ran the tornado script, which is just a wrapper around a call to the java executable (don't forget to set your JAVA_HOME environment variable to point at the Graal JDK), but it complained it could not see a tornado.backend file.

Again, a sneaky look at the Docker container indicated that we have to tell it which driver to use. So, I created the file containing tornado.backends=opencl-backend, but then tornado complained it didn't have the OpenCL drivers. Oops.

You have to build the drivers you want separately, it seems. But if you try to build the TornadoVM drivers without the native OpenCL dev library, you'll see:

TornadoVM/tornado-drivers/opencl-jni$ mvn clean install # yes, Maven invokes cmake via the cmake-maven-plugin
....
/usr/bin/ld: cannot find -lOpenCL
...


The Docker image saves you from having to install the OpenCL libraries on your machine. To get it working on bare metal, I played it safe and got an old Ubuntu box and installed them there. You'll need to install them with:

sudo apt install ocl-icd-opencl-dev

and then run Maven in the opencl* subdirectories. This time, the Maven build completed successfully.

However, running tornado in the subsequent dist folder still pukes, this time with something like:

Caused by: uk.ac.manchester.tornado.api.exceptions.TornadoRuntimeException: OpenCL JNI Library not found
at tornado.drivers.opencl@0.15.1/uk.ac.manchester.tornado.drivers.opencl.OpenCL.<clinit>(OpenCL.java:68)
... 11 more

Not what I was expecting. I found I needed to:

cp ./tornado-drivers/opencl-jni/target/linux-amd64-release/cmake/libtornado-opencl.so $TORNADO_SDK/lib

Where TORNADO_SDK is pointing at the relevant dist folder.
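In my case that was an export along these lines (the exact directory name includes the version and commit hash, so treat the path as illustrative):

export TORNADO_SDK=/path/to/TornadoVM/dist/tornado-sdk/tornado-sdk-0.15.1-<hash>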

Now, finally, you can run on the bare metal:

$ tornado -cp target/classes/ example.MatrixMultiplication
Computing MxM of 512x512
CPU Execution: 1.21 GFlops, Total time = 222 ms
GPU Execution: 17.90 GFlops, Total Time = 15 ms
Speedup: 14x

(Results from an old NVIDIA GeForce GTX 650)

Note, you'll also need to run it with the Graal JVM. Set both the PATH and JAVA_HOME environment variables to point to it.
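That is, something like this (path illustrative):

export JAVA_HOME=/path/to/graalvm-ee-java11-22.3.4
export PATH=$JAVA_HOME/bin:$PATH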

Where now?

This is a nice introduction to running Java on the GPU but it's just the start. There are many caveats. Example: what if your Java code throws an Exception? GPUs have no equivalent of exceptions so what happens then? More to come.

Thursday, October 12, 2023

Dependency hell

In these days of ChatGPT, it's easy to forget that most of the time, a developer isn't actually cutting code at all, but debugging it. This is my own personal hell in getting Spark and Kafka in Docker containers talking to a driver on the host.

Firstly, I was seeing No TypeTag available when my code was trying to use the Spark Encoders. This SO answer helped. Basically, my code is Scala 3 and "Encoders.product[classa] is a Scala 2 thing. This method accepts an implicit TypeTag. There are no TypeTags in Scala 3". Yikes. This is probably one reason Spark's upgrade path to Scala 3 is proving difficult. The solution I used was to create an SBT subproject that was entirely Scala 2 and call Spark from there.

The next problem was seeing my Spark jobs fail with:

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6) (172.30.0.7 executor 0): java.lang.ClassCastException: cannot assign instance of scala.collection.generic.DefaultSerializationProxy to field org.apache.spark.sql.execution.datasources.v2.DataSourceRDDPartition.inputPartitions of type scala.collection.immutable.Seq in instance of org.apache.spark.sql.execution.datasources.v2.DataSourceRDDPartition

This is a contender for the error message with the greatest misdirection. You think it's a serialization problem but it isn't directly so.

Although other Spark users have reported it, Ryan Blue mentions that it isn't really a Spark issue but a Scala issue.

Anyway, I tried all sorts of things, like changing my JDK (note that the sun.* packages have been removed in later JDKs, so you need to follow the advice in this SO answer). I tried creating an uber JAR but was thwarted by duplicated dependencies [SO] and Invalid signature file digest errors because some JARs were signed [SO], which forced me to strip the signatures out [SO], only to fall foul of Kafka's DataSourceRegister file being stripped out too [SO].

The first step in the right direction came from here and another SO question, where it's recommended that the SparkSession be built by adding .config("spark.jars", PATHS), where PATHS is a comma-delimited string of the full paths of all the JARs you want to use. Surprisingly, this turned out to include Spark JARs themselves, in my case including spark-sql-kafka-0-10_2.13, which oddly does not come as part of the Spark installation. By adding them as spark.jars, they are uploaded into the work subdirectory of a Spark node.
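A minimal sketch of what that looks like, shown here with Spark's Java API (the Scala builder is identical in shape; the paths and versions are illustrative):

import org.apache.spark.sql.SparkSession;

public class DriverApp {
    public static void main(String[] args) {
        // Comma-delimited list of every JAR the executors need, including Spark's own Kafka connector.
        String paths = "/home/me/jars/spark-sql-kafka-0-10_2.13-3.4.1.jar,"
                     + "/home/me/jars/kafka-clients-3.4.0.jar,"
                     + "/home/me/jars/my-app.jar";

        SparkSession spark = SparkSession.builder()
                .appName("kafka-streaming")
                .master("spark://localhost:7077")
                .config("spark.jars", paths)   // these get uploaded to each worker's 'work' subdirectory
                .getOrCreate();
    }
}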

After this, there were just some minor domain-name mapping issues to clear up on both the host and the container before the whole stack worked without any further errors being puked.

Monday, October 9, 2023

My "What data science can learn from software engineering" presentation

Dr Chris Monit and I gave this presentation at the London MLOps meetup last week. TL;DR: maximize your chances of a successful delivery in data science by adopting the best practices that the software industry has established.

"Think of MLOps as the process of automating machine learning using DevOps methodologies" - Practical MLOps (O'Reilly)

Monday, October 2, 2023

Packaging Python

Python build tools are unifying behind a common interface of pyproject.toml.
This and this are great guides. The gist of the former is that you create a TOML file that conforms to a specification and then any compliant build tool can build your package from it. The latter covers the whole Python packaging ecosystem.

The salient commands for building and deploying with your TOML file are:

python3 -m build
python3 -m twine upload --repository pypi dist/*


Note, you want to clean your dist directory first.

The Snag

The idea of using any Python build tool is not quite there yet. Poetry only implements a subset of the specification. Also, the specification is something of a leaky abstraction. On Discord, Prof. Nick Radcliffe explains that the promise of using "any" backend led him to naively use setuptools.

Nick Radcliffe — 08/21/2023 2:37 PM

Also, in case anyone is interested (related to packaging, above) I'm currently in the process of packaging a fairly large Python codebase using new-style packaging (pyproject.toml rather than setup.py). It wasn't quite my first use of it, but this project is much more complex. Initially, I chose setuptools as the build backend, since (a) it didn't seem like it should matter much and (b) I didn't think I needed anything special. That was a big mistake for me: it turns out the setuptools back-end ignores almost everything except Python code in building your package. Whereas my package (which has over 10k files) also have about 1,000 non-python files (everything from .txt and .json to shape files, CSV files, and HTML and markdown and all sorts). Some of these are needed for testing (which for some reason some people think don't need to be distributed...as if people shouldn't care about whether the installed software works in situ, rather than just on the developer's machine in the CI system), but others are needed just in the ordinary course of using the software.  setuptools has a way to let you include extra stuff, but it's very manual and would be very error-prone for me. Anyway, the TL;DR is that I switched to Flit as the backend and everything "just worked". Not saying Flit will work better for you; but it sure as hell worked better for me!

Also, the reason I chose flit was that the third bullet in "Why use Flit?" is "Data files within a package directory are automatically included. Missing data files has been a common packaging mistake with other tools."

It also says: "The version number is taken from your package’s version attribute, so that always matches the version that tools like pip see." Which also seems extremely sane (and probably means I don't need to do the automatic updating of my pyproject.toml to achieve that).
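For what it's worth, a minimal Flit-backed pyproject.toml looks something like this (the names are illustrative and the Flit documentation is the authority on the exact fields):

[build-system]
requires = ["flit_core >=3.4,<4"]
build-backend = "flit_core.buildapi"

[project]
name = "mypackage"
authors = [{name = "Jane Doe", email = "jane@example.com"}]
dynamic = ["version", "description"]   # taken from the package's __version__ and docstring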

Success has many parents...

... but it appears that PyPI packages have only one. Although the authors tag can take a list, additional entries are ignored. The reason is that it's best practice to use a mailing list (see here).

And so my package to facilitate the creation of synthetic data now lives in PyPI much like my Java code is deployed to mvnrepository.

Monday, September 25, 2023

Spark, Kafka and Docker

I want to run a Spark Structured Streaming application that consumes from a Kafka cluster all within Docker. I've finally got it working [messy code here in my GitHub], but it was not without its own pain.

The biggest problem is getting all the components talking to each other. First, you need a bridge network. "In terms of Docker, a bridge network uses a software bridge which allows containers connected to the same bridge network to communicate, while providing isolation from containers which are not connected to that bridge network." [docs]. Think of it as giving your containers their own namespace.
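If you were wiring this up by hand with the Docker CLI rather than programmatically, the idea is simply (names illustrative):

docker network create spark-net
docker run --network spark-net --name spark-master ...
docker run --network spark-net --name kafka ...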

Secondly, the Spark worker needs to connect to Kafka, the Spark master and the Spark driver. The first two are just a matter of mapping the Spark master and Kafka containers in the worker. What's harder is getting the worker to talk to the driver, which may be running on the computer that hosts Docker.

One sign you've got it wrong is if you see "Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources" [SO] in your driver logs. This message is a little misleading as it may have nothing to do with resources and everything to do with connectivity.

Resources seem to be fine

To solve it, you need the driver to define spark.driver.host and spark.driver.port. For the host, we need it to be the magic address of 172.17.0.1. This is the default "IP address of the gateway between the Docker host and the bridge network" [docs]. The port is arbitrary.

[Aside: it's also worth ensuring that all the components are running the exact same version of Spark. I saw a rare error, ERROR Inbox: Ignoring error java.lang.AssertionError: assertion failed: CPUs per task should be > 0, and the only thing Google produced was this Bitnami ticket. Ensuring all versions were the same made it go away.]

What's more, the worker needs these in its config. You can pass it the host and port with something like SPARK_WORKER_OPTS="-Dspark.driver.host=172.17.0.1 -Dspark.driver.port=SPARK_DRIVER_PORT" in its start up script.

But there is one last gotcha. If you still can't get things to work, you might want to log in to your worker container and run netstat. If you see the connection to the driver in a state of SYN_SENT, the firewall on the host is probably blocking the connection from the container.

Annoyingly, you probably won't see any error messages being puked from the driver. It will just hang somewhere near org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:929). I only started seeing error messages when I aligned all versions of Spark (see above), and it read: java.io.IOException: Connecting to /172.17.0.1:36909 timed out (120000 ms)

Looking in that Docker container showed:

bash-5.0# netstat -nap
Active Internet connections (servers and established)
Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name   
...
tcp        0      1 192.168.192.7:53040     172.17.0.1:36909        SYN_SENT    1109/java

and on the host machine where my process is 29006:

(base) henryp@adele:~$ netstat -nap | grep 36909
tcp6       0      0 172.17.0.1:36909        :::*                    LISTEN      29006/java  

Aha, that looks like the problem. It turns out that I have to open the firewall for the block manager too and set a static port for it on the Driver with spark.driver.blockManager.port.
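Putting the driver-side settings together, the session ends up being built with something like this (the ports are arbitrary but must be fixed, match what the worker is told, and be open in the firewall; the values here are illustrative):

import org.apache.spark.sql.SparkSession;

public class DriverConfig {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("kafka-streaming")
                .master("spark://localhost:7077")
                .config("spark.driver.host", "172.17.0.1")           // the Docker bridge gateway
                .config("spark.driver.port", "36909")                // arbitrary but fixed; open in the firewall
                .config("spark.driver.blockManager.port", "36910")   // likewise for the block manager
                .getOrCreate();
    }
}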

Finally, you should be able to have a Spark master and worker plus Kafka instances all running within Docker along with the driver running on the host using your favourite IDE.

Wednesday, August 30, 2023

Can we apply ML to logging?

Kibana is a TypeScript/JavaScript product for creating visuals of logs. OpenSearch's Dashboards is the Apache-licensed fork of it. Kibana is great when you know what you are looking for. But what if you don't?

Example

I have a small Kafka cluster of three nodes using the Raft protocol. I send messages then check a consumer has read all the messages. This integration test passes every time. There are no ERRORs. However, every so often, this test takes over 2 minutes when it should take about 20 seconds.

The number of lines on a good run is 4.5k and on the bad run about 20k. Twenty thousand lines is a lot to go through when you don't know what you're looking for.

I slightly adapted the code here to turn my logs into TF-IDF vectors and used Locality Sensitive Hashing to map them to a lower-dimensional space. Now, we can visualise what's going on.

The good run looks like this:




Note that there are two dominant lines that map to:

[2023-07-04 14:13:10,089] INFO [TransactionCoordinator id=2] Node 1 disconnected. (org.apache.kafka.clients.NetworkClient)
[2023-07-04 14:13:10,089] WARN [TransactionCoordinator id=2] Connection to node 1 (localhost/127.0.0.1:9091) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

repeated over and over for about 10 seconds.

The bad run looks like this:



Here, the dominant lines in the diagram come from:

[2023-07-04 14:16:21,755] WARN [TransactionCoordinator id=2] Connection to node 1 (localhost/127.0.0.1:9091) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2023-07-04 14:16:21,805] INFO [TransactionCoordinator id=2] Node 1 disconnected. (org.apache.kafka.clients.NetworkClient)
[2023-07-04 14:16:21,805] WARN [TransactionCoordinator id=2] Connection to node 3 (localhost/127.0.0.1:9093) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2023-07-04 14:16:21,805] INFO [TransactionCoordinator id=2] Node 3 disconnected. (org.apache.kafka.clients.NetworkClient)

again being repeated but this time, it lasts for about 2 minutes.

[The code in Ethen Lui's GitHub is really quite clever. Rather than using MinHashing, he's projecting the feature vectors against some randomly generated vectors and making a bit map from it. This can be turned into a single integer which represents the feature's bucket. Note that the number of vectors does not really change the dimensionality of the space but it does change how consistent different runs are - more vectors leads to greater repeatability]
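In other words, the bucketing boils down to something like the following (a simplified sketch of the idea, not Ethen Lui's actual code):

import java.util.Random;

public class RandomProjectionLsh {

    private final double[][] planes;   // one random hyperplane per bit

    public RandomProjectionLsh(int numBits, int numFeatures, long seed) {
        Random rng = new Random(seed);
        planes = new double[numBits][numFeatures];
        for (int b = 0; b < numBits; b++) {
            for (int f = 0; f < numFeatures; f++) {
                planes[b][f] = rng.nextGaussian();
            }
        }
    }

    // Each TF-IDF vector collapses to a single integer: the sign of its dot product
    // with each random plane contributes one bit of the bucket id.
    public int bucket(double[] tfidf) {
        int bits = 0;
        for (int b = 0; b < planes.length; b++) {
            double dot = 0.0;
            for (int f = 0; f < tfidf.length; f++) {
                dot += planes[b][f] * tfidf[f];
            }
            if (dot >= 0) {
                bits |= 1 << b;
            }
        }
        return bits;   // similar vectors tend to share a bucket
    }
}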

Still at something of a loss, I checked out Bitnami's Kafka instances (here), changed the logging in bitnami/kafka/3.5/debian-11/rootfs/opt/bitnami/scripts/kafka/postunpack.sh by adding the line:

replace_in_file "${KAFKA_CONF_DIR}/log4j.properties" "INFO" "DEBUG"

and built the Docker image again. Now it gives me DEBUG statements.

Fix

The problem of non-determinism is still foxing me but the solution became clear with all these mentions of localhost. We need the client to communicate with the cluster on localhost because the client is unaware that the Kafka instances are hosted in Docker. However, each broker does need to know it's talking to another Docker container as the ports of its peers are not available within its own sandbox. 

The solution was to use slightly different values for the listeners than for the advertised listeners (KAFKA_CFG_LISTENERS vs KAFKA_CFG_ADVERTISED_LISTENERS. Note that Bitnami expects environment variables prefixed with KAFKA_CFG_ and with periods replaced by underscores, which it converts into a Kafka-friendly server.properties file).

The listeners were of the form OUTSIDE://:9111 while the advertised listeners were of the form OUTSIDE://localhost:9111. The label OUTSIDE is apparently arbitrary. It's just used as a reference, say in listener.security.protocol.map (in Kafka-speak; munge with the necessary Bitnami mappings to make it appear in server.properties), where you'll see something like OUTSIDE:PLAINTEXT.
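Concretely, each broker ends up with a pair of settings along these lines (the ports, hostnames and protocol map here are illustrative rather than the exact values from my containers):

KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9094,OUTSIDE://:9111
KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka1:9092,OUTSIDE://localhost:9111
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT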

Conclusion

Although I've fixed the Kafka issue I was facing, applying ML to the logs was only a partial help. I still need to understand the Kafka Raft code better before it can truly be of use.

Thursday, July 6, 2023

Kafka Raft in Docker

These days, you don't need Zookeeper to run a Kafka cluster. Instead, when correctly configured, Kafka uses the Raft algorithm (where "the nodes trust the elected leader"[Wikipedia]) to coordinate itself.

I started to follow Gunnar Morling's blog but it seems his Kafka container images have not been updated, so I used Bitnami's. However, configuring them to run a Raft cluster proved difficult.

I want to programmatically create the cluster rather than use docker-compose as I want greater control over it. So, I wrote this code that talks to Docker via its API using a Java library.

Firstly, the Kafka instances couldn't see each other. 

Diagnosing the containers proved difficult as I could not install my favourite Linux tools. When I tried, I was told directory /var/lib/apt/lists/partial is missing. This seems to be deliberate as the Dockerfile explicitly deletes it, to keep images slim. So, I took out that line and added:

RUN apt-get update && apt-get upgrade -y && \
    apt-get clean && apt-get update && \
    apt-get install -y net-tools && \
    apt-get install -y iputils-ping && \
    apt-get install -y  procps && \
    apt-get install -y lsof

then rebuilt the containers. [Aside: use ps -ax to see all the processes in these containers. I was stumped for a while not seeing the Java process that I knew was running].

Using these Linux tools, I could see the containers could not even ping each other. Oops: I needed to create a Docker network [SO] and add it to the containers. After that, their logs showed that the Kafka containers were at least starting and talking to each other.
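Assuming the docker-java client (one such library; the code linked above may use a different one), creating that network and attaching a container looks roughly like this, with the ids and names being illustrative:

import com.github.dockerjava.api.DockerClient;
import com.github.dockerjava.core.DockerClientBuilder;

public class NetworkSetup {
    public static void main(String[] args) {
        DockerClient docker = DockerClientBuilder.getInstance().build();

        // A user-defined bridge network for the Kafka containers.
        String networkId = docker.createNetworkCmd()
                .withName("kafka-net")
                .withDriver("bridge")
                .exec()
                .getId();

        // Attach an already-created container to it.
        docker.connectToNetworkCmd()
                .withContainerId("kafka1-container-id")
                .withNetworkId(networkId)
                .exec();
    }
}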

However, the client running on the host machine was puking lots of messages like "Cancelled in-flight API_VERSIONS request with correlation id 1 due to node -1 being disconnected". First, I checked [SO] that the Kafka client library and container were both version 3.  But the consensus on the internet appears to be that this error is due to a connection failure. 

Using netstat on the host showed that the host port was indeed open. But this seemed to be because Docker had opened the port to map it to its container while the container itself was not LISTENing on that port. It appears you can tell Kafka on which port to listen with an environment variable that looks like:

KAFKA_CFG_LISTENERS=PLAINTEXT://:$hostPort,CONTROLLER://:$controllerPort

where hostPort is what you want Docker to map and controllerPort corresponds to what is in the KAFKA_CFG_CONTROLLER_QUORUM_VOTERS environment variable.
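That variable lists the node id, host and controller port of every voter, along these lines (names and ports illustrative):

KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9094,2@kafka2:9094,3@kafka3:9094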

The next problem was that when my client connected, it could not see the machine called kafka2. What's happening here is that, having connected to the bootstrap server, the client is asked to contact another machine, in this case something called kafka2.

Now, the JVM running on the host knows nothing about a network that is internal to Docker. To solve this, you could have Docker use the host network (which means that everything running on the machine can see each other - fine for testing but a security nightmare). You could subvert the JVM's DNS mappings (rather than faffing around with a DNS proxy) using BurningWave or Java 18's InetAddressResolverProvider. But perhaps the simplest way is configuring Kafka itself to advertise itself as localhost [Confluent] using the KAFKA_CFG_ADVERTISED_LISTENERS environment variable.

And that was it: a Kafka cluster running on my laptop, reading and writing messages using the Raft algorithm. There are still a few loose ends, such as why on some runs a node drops out of the cluster non-deterministically even though the functionality was correct as far as the client was concerned. I'll solve that another day.