Monday, September 25, 2023

Spark, Kafka and Docker

I want to run a Spark Structured Streaming application that consumes from a Kafka cluster, all within Docker. I've finally got it working [messy code here in my GitHub], but it was not without some pain.

The biggest problem is getting all the components talking to each other. First, you need a bridge network. "In terms of Docker, a bridge network uses a software bridge which allows containers connected to the same bridge network to communicate, while providing isolation from containers which are not connected to that bridge network." [docs]. Think of it as giving your containers their own namespace.

Second, the Spark worker needs to connect to Kafka, the Spark master and the Spark driver. The first two are just a matter of mapping the Spark master and Kafka containers in the worker. What's harder is getting the worker to talk to a driver that may be running on the computer that hosts Docker.

One sign you've got it wrong is if you see "Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources" [SO] in your driver logs. This message is a little ambiguous as it may have nothing to do with resources and everything to do with connectivity.

[screenshot omitted: resources seem to be fine]

To solve it, you need the driver to define spark.driver.host and spark.driver.port. For the host, we need it to be the magic address of 172.17.0.1. This is the default "IP address of the gateway between the Docker host and the bridge network" [docs]. The port is arbitrary, but it must be fixed so the worker can be told about it.
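In the driver (which, in my case, runs on the host from the IDE), that might look something like this. This is a minimal Scala sketch: the master URL and port number are illustrative.

import org.apache.spark.sql.SparkSession

// Minimal sketch of the driver-side config; the master URL and port numbers are illustrative.
val spark = SparkSession.builder()
  .appName("kafkaStreaming")
  .master("spark://localhost:7077")           // the Spark master container, its port mapped to the host
  .config("spark.driver.host", "172.17.0.1")  // the Docker gateway, reachable from inside the containers
  .config("spark.driver.port", "36909")       // arbitrary, but fixed so the worker can be told about it
  .getOrCreate()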

[Aside: it's also worth ensuring that all the components are running the exact same version of Spark. I saw a rare error ERROR Inbox: Ignoring error java.lang.AssertionError: assertion failed: CPUs per task should be > 0 and the only thing Google produced was this Bitnami ticket. Ensuring all versions were the same made it go away.]

What's more, the worker needs these values in its config. You can pass it the host and port with something like SPARK_WORKER_OPTS="-Dspark.driver.host=172.17.0.1 -Dspark.driver.port=SPARK_DRIVER_PORT" in its startup script.

But there is one last gotcha. If you still can't get things to work, you might want to log in to your worker container and run netstat. If you see the connection to the driver in a state of SYN_SENT, your firewall on the host is probably blocking the connection from the container.

Annoyingly, you probably won't see any error messages being puked from the Driver. It will just hang somewhere near org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:929). I only started seeing error messages when I aligned all versions of Spark (see above) and it read: java.io.IOException: Connecting to /172.17.0.1:36909 timed out (120000 ms)

Looking in that Docker container showed:

bash-5.0# netstat -nap
Active Internet connections (servers and established)
Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name   
...
tcp        0      1 192.168.192.7:53040     172.17.0.1:36909        SYN_SENT    1109/java

and on the host machine, where my driver process has PID 29006:

(base) henryp@adele:~$ netstat -nap | grep 36909
tcp6       0      0 172.17.0.1:36909        :::*                    LISTEN      29006/java  

Aha, that looks like the problem. It turns out that I have to open the firewall for the block manager too and set a static port for it on the Driver with spark.driver.blockManager.port.
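Back in the driver-side sketch above, that is one more setting (the port number again being illustrative):

  .config("spark.driver.blockManager.port", "45029")  // fixed, so the host firewall can be opened for it too

The host firewall then needs to allow both spark.driver.port and this block manager port.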

Finally, you should be able to have a Spark master and worker plus Kafka instances all running within Docker along with the driver running on the host using your favourite IDE.

Wednesday, August 30, 2023

Can we apply ML to logging?

Kibana is a TypeScript/JavaScript product for visualising logs. OpenSearch's Dashboards is the Apache-licensed fork of it. Kibana is great when you know what you are looking for. But what if you don't?

Example

I have a small Kafka cluster of three nodes using the Raft protocol. I send messages then check a consumer has read all the messages. This integration test passes every time. There are no ERRORs. However, every so often, this test takes over 2 minutes when it should take about 20 seconds.

The log for a good run is about 4.5k lines; for a bad run, about 20k. Twenty thousand lines is a lot to go through when you don't know what you're looking for.

I slightly adapted the code here to turn my logs into TF-IDF vectors and used Locality Sensitive Hashing to map them to a lower dimensional space. Now, we can visualise what's going on. 
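Ethen Lui's code is Python; for what it's worth, the same vectorise-then-hash idea can be sketched with Spark ML. This is not his code: the column names, feature size and LSH parameters are illustrative, and a SparkSession called spark is assumed.

import org.apache.spark.ml.feature.{BucketedRandomProjectionLSH, HashingTF, IDF, Tokenizer}

// Turn each log line into a TF-IDF vector, then hash similar vectors into the same buckets.
val logs   = spark.read.textFile("kafka-run.log").toDF("line")
val words  = new Tokenizer().setInputCol("line").setOutputCol("words").transform(logs)
val tf     = new HashingTF().setInputCol("words").setOutputCol("tf").setNumFeatures(4096).transform(words)
val tfidf  = new IDF().setInputCol("tf").setOutputCol("features").fit(tf).transform(tf)
val hashed = new BucketedRandomProjectionLSH()
  .setInputCol("features").setOutputCol("hashes")
  .setNumHashTables(3).setBucketLength(2.0)
  .fit(tfidf).transform(tfidf)  // similar log lines end up with similar hashes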

The good run looks like this:

[visualisation of the hashed log lines for the good run omitted]

Note that there are two dominant lines that map to:

[2023-07-04 14:13:10,089] INFO [TransactionCoordinator id=2] Node 1 disconnected. (org.apache.kafka.clients.NetworkClient)
[2023-07-04 14:13:10,089] WARN [TransactionCoordinator id=2] Connection to node 1 (localhost/127.0.0.1:9091) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

repeated over and over for about 10 seconds.

The bad run looks like this:

[visualisation of the hashed log lines for the bad run omitted]

Here, the dominant lines in the diagram are from:

[2023-07-04 14:16:21,755] WARN [TransactionCoordinator id=2] Connection to node 1 (localhost/127.0.0.1:9091) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2023-07-04 14:16:21,805] INFO [TransactionCoordinator id=2] Node 1 disconnected. (org.apache.kafka.clients.NetworkClient)
[2023-07-04 14:16:21,805] WARN [TransactionCoordinator id=2] Connection to node 3 (localhost/127.0.0.1:9093) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2023-07-04 14:16:21,805] INFO [TransactionCoordinator id=2] Node 3 disconnected. (org.apache.kafka.clients.NetworkClient)

again repeated over and over but, this time, it lasts for about 2 minutes.

[The code in Ethen Lui's GitHub is really quite clever. Rather than using MinHashing, he's projecting the feature vectors against some randomly generated vectors and making a bit map from it. This can be turned into a single integer which represents the feature's bucket. Note that the number of vectors does not really change the dimensionality of the space but it does change how consistent different runs are - more vectors lead to greater repeatability.]
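A toy Scala sketch of that trick (again, not his code; the random vectors are assumed to be generated elsewhere):

// Each random vector contributes one bit: 1 if the feature vector lies on its positive side, 0 otherwise.
// Read as an integer, the bit pattern is the feature's bucket.
def bucket(features: Array[Double], randomVectors: Seq[Array[Double]]): Int =
  randomVectors.zipWithIndex.foldLeft(0) { case (acc, (rv, i)) =>
    val dot = rv.zip(features).map { case (a, b) => a * b }.sum
    if (dot > 0) acc | (1 << i) else acc
  }

More random vectors means more bits in the signature, hence the greater repeatability mentioned above.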

Still at something of a loss, I checked out Bitnami's Kafka instances (here), changed the logging in bitnami/kafka/3.5/debian-11/rootfs/opt/bitnami/scripts/kafka/postunpack.sh by adding the line:

replace_in_file "${KAFKA_CONF_DIR}/log4j.properties" "INFO" "DEBUG"

and built the Docker image again. Now it gives me DEBUG statements.

Fix

The problem of non-determinism is still foxing me but the solution became clear with all these mentions of localhost. We need the client to communicate with the cluster on localhost because the client is unaware that the Kafka instances are hosted in Docker. However, each broker does need to know it's talking to another Docker container as the ports of its peers are not available within its own sandbox. 

The solution was to use slightly different values for the listeners than for the advertised listeners (KAFKA_CFG_LISTENERS vs KAFKA_CFG_ADVERTISED_LISTENERS. Note that Bitnami expects environment variables prefixed with KAFKA_CFG_ and with periods replaced by underscores before it converts them into a Kafka-friendly server.properties file).

The listeners were of the form OUTSIDE://:9111 while the advertised listeners were of the form OUTSIDE://localhost:9111. The label OUTSIDE is apparently arbitrary. It's just used as a reference, say in listener.security.protocol.map (in Kafka-speak; munge with the necessary Bitnami mappings to make it appear in server.properties) where you'll see something like OUTSIDE:PLAINTEXT.
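Putting that together, the environment for one broker ends up looking something like this (a hedged example; the listener name, host port and controller port are all illustrative):

KAFKA_CFG_LISTENERS=OUTSIDE://:9111,CONTROLLER://:9094
KAFKA_CFG_ADVERTISED_LISTENERS=OUTSIDE://localhost:9111
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=OUTSIDE:PLAINTEXT,CONTROLLER:PLAINTEXT

The listeners bind to all interfaces inside the container while the advertised listener tells clients to come back on localhost.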

Conclusion

Although I've fixed the Kafka issue I was facing, applying ML to the logs was only of partial help. I still need to understand the Kafka Raft code better before it can truly be of use.

Thursday, July 6, 2023

Kafka Raft in Docker

These days, you don't need Zookeeper to run a Kafka cluster. Instead, when correctly configured, Kafka uses the Raft algorithm (where "the nodes trust the elected leader"[Wikipedia]) to coordinate itself.

I started to follow Gunnar Morling's blog but it seems his Kafka container images have not been updated, so I used Bitnami's. However, configuring them to run a Raft cluster proved difficult.

I want to programmatically create the cluster rather than use docker-compose as I want greater control over it. So, I wrote this code that talks to Docker via its API using a Java library.
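The gist of that code, heavily simplified (a hedged sketch assuming the docker-java library; the image tag, names, ports and environment are illustrative):

import com.github.dockerjava.api.model.{HostConfig, PortBinding}
import com.github.dockerjava.core.DockerClientBuilder

val docker = DockerClientBuilder.getInstance().build()

// One broker; in reality this is done in a loop for kafka1, kafka2, kafka3.
val container = docker.createContainerCmd("bitnami/kafka:3.5")
  .withName("kafka1")
  .withEnv("KAFKA_CFG_NODE_ID=1", "KAFKA_CFG_PROCESS_ROLES=broker,controller")
  .withHostConfig(HostConfig.newHostConfig().withPortBindings(PortBinding.parse("9091:9091")))
  .exec()
docker.startContainerCmd(container.getId).exec()

Note that no Docker network is attached yet, which, as we'll see, is a problem.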

Firstly, the Kafka instances couldn't see each other. 

Diagnosing the containers proved difficult as I could not install my favourite Linux tools. When I tried, I was told directory /var/lib/apt/lists/partial is missing. This seems to be deliberate as the Dockerfile explicitly deletes it, to keep images slim. So, I took out that line and added:

RUN apt-get update && apt-get upgrade -y && \
    apt-get clean && apt-get update && \
    apt-get install -y net-tools && \
    apt-get install -y iputils-ping && \
    apt-get install -y  procps && \
    apt-get install -y lsof

then rebuilt the containers. [Aside: use ps -ax to see all the processes in these containers. I was stumped for a while not seeing the Java process that I knew was running].

Using these Linux tools, I could see the containers could not even ping each other. Oops, I needed to create a Docker network [SO] and attach the containers to it (see the sketch below). Now, their logs show that the Kafka containers are at least starting and talking to each other.
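With the same docker-java approach (hedged again; the network name is illustrative), that's roughly:

docker.createNetworkCmd().withName("kafka_net").withDriver("bridge").exec()

with each broker's HostConfig then gaining .withNetworkMode("kafka_net") so the containers can resolve each other by name.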

However, the client running on the host machine was puking lots of messages like "Cancelled in-flight API_VERSIONS request with correlation id 1 due to node -1 being disconnected". First, I checked [SO] that the Kafka client library and container were both version 3.  But the consensus on the internet appears to be that this error is due to a connection failure. 

Using netstat on the host showed that the host port was indeed open. But this was just Docker opening the port to map it to its container; the container itself was not LISTENing on that port. It appears you can tell Kafka on which port to listen with an environment variable that looks like:

KAFKA_CFG_LISTENERS=PLAINTEXT://:$hostPort,CONTROLLER://:$controllerPort

where hostPort is what you want Docker to map and controllerPort corresponds to what is in the KAFKA_CFG_CONTROLLER_QUORUM_VOTERS environment variable.
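For a three-node cluster, that voters variable looks something like this (node IDs, container names and the controller port are illustrative):

KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9094,2@kafka2:9094,3@kafka3:9094

where each entry has the form nodeId@host:controllerPort.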

The next problem was that when my client connected, it could not see the machine called kafka2. What's happening here is that, having connected to the bootstrap server, the client is asked to contact another machine, in this case something called kafka2.

Now, the JVM running on the host knows nothing about a network that is internal to Docker. To solve this, you could have Docker use the host network (which means that everything running on the machine can see each other - fine for testing but a security nightmare). You could subvert the JVM's DNS mappings (rather than faffing around with a DNS proxy) using BurningWave or Java 18's InetAddressResolverProvider. But perhaps the simplest way is to configure Kafka to advertise itself as localhost [Confluent] using the KAFKA_CFG_ADVERTISED_LISTENERS environment variable.

And that was it: a Kafka cluster running on my laptop that was reading and writing messages using the Raft algorithm. There are still a few loose ends: on some runs, a node non-deterministically drops out of the cluster, even though the functionality was correct as far as the client was concerned. I'll solve that another day.

Sunday, June 25, 2023

Diagnosing K8s Installations

Well, this was much harder than I thought. I tried to update Minikube and my local Kubernetes installation.

First, I installed everything I thought I needed:

sudo apt-get install -y kubelet kubeadm kubectl kubernetes-cni

and then downloaded and installed Minikube per the instructions. Unfortunately, when I did, I'd see it complain that possibly "the kubelet is not running".

Running:

systemctl status kubelet

showed that kubelet was exiting with error code 1.

Running:

journalctl -fu kubelet

showed it puking stack traces but this line is the salient one:

... failed to run Kubelet: running with swap on is not supported, please disable swap! or set --fail-swap-on flag to false...

But where do I put it? As it happened, I had to add --fail-swap-on=false to the ExecStart line in /etc/systemd/system/kubelet.service.d/10-kubeadm.conf (I found this by grepping for ExecStart in /etc/).
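The relevant part of that drop-in file ends up looking something like this (an illustrative excerpt only; the exact contents vary with the kubeadm version):

ExecStart=
ExecStart=/usr/bin/kubelet $KUBELET_KUBECONFIG_ARGS $KUBELET_CONFIG_ARGS $KUBELET_KUBEADM_ARGS $KUBELET_EXTRA_ARGS --fail-swap-on=false

You then run: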

sudo systemctl daemon-reload

to have changes recognised. Then, it's a matter of configuring Kubernetes system wide:

sudo kubeadm init --ignore-preflight-errors=Swap

(I needed to ignore the fact that my system has Swap space as I'm only working on proof of concepts, not running a K8s cluster in production. Using or not using swap space is nuanced - see here for more information. Basically, what is best depends on the situation. TL;DR; it can speed things up but also delay the inevitable death of a pathological system). 

You can run kubeadm reset if you foul things up.

Also, I had to rm -rf $HOME/.kube and $HOME/.minikube since kubectl config view showed me a profile that was literally years out of date. The .kube config can be regenerated with:

mkdir -p $HOME/.kube
sudo cp -i /etc/kubernetes/admin.conf $HOME/.kube/config
sudo chown $(id -u):$(id -g) $HOME/.kube/config


(lines that kubeadm kindly told me to run). Now, running kubectl get all gives me a sensible output.

The story doesn't quite end there. After a few hibernations of my laptop, minikube was failing to start again with (apparently) certificate errors. This issue helped me: I executed minikube delete && minikube start and all was well with the World once more.

I've got a fever, and the only prescription is MOR COW (bell)

Apache Iceberg deals with updates in two different ways:

  • Merge on read
  • Copy on write

What are these strategies? To illustrate, I've written some BDD code that writes and updates 20 rows of data using both MOR and COW. (There's an interesting article at Dremio about this topic but some of the code is Dremio-specific.)

Simply put, copy-on-write replaces the entire parquet file if a single row is updated.

In merge-on-read, a file containing only the updated data is written, along with a file saying which rows were affected. It is the reader that needs to reconcile the data.

The two strategies are for different audiences. COW makes writes slow and reads fast. MOR makes reads slow and writes fast.
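The behaviour is driven by table properties, so a hedged Spark SQL sketch of an experiment might look like this (catalog, table and column names are illustrative):

spark.sql("""CREATE TABLE iceberg.test.events_cow (id INT, value STRING) USING iceberg
             TBLPROPERTIES ('write.update.mode' = 'copy-on-write')""")
spark.sql("""CREATE TABLE iceberg.test.events_mor (id INT, value STRING) USING iceberg
             TBLPROPERTIES ('write.update.mode' = 'merge-on-read',
                            'write.delete.mode' = 'merge-on-read')""")

spark.sql("UPDATE iceberg.test.events_cow SET value = 'changed' WHERE id = 1")  // rewrites the whole data file
spark.sql("UPDATE iceberg.test.events_mor SET value = 'changed' WHERE id = 1")  // writes a delete file plus a small data file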

Focussing on the more complex of the two strategies, MOR, we see that it creates four new files compared to COW's two. This maps to two different parquet files (as each parquet file in both strategies has a .crc file that holds its metadata). One file contains the updated data, the other a textual reference to the original file containing the original data. No files are deleted and the original parquet file remains the same. No data is redundant.

Iceberg has means other than position-in-file of storing the delete data: it can also use equality checks. However, this appears not to be supported by Spark at the moment.

(BTW, the title of this post refers to this SNL sketch).

Monday, June 19, 2023

Data Contracts

What are they? "Data Contracts are first and foremost a cultural change toward data-centric collaboration" [Chad Sanderson's blog and here].

Examples of why they're necessary

A company wants to find the average age of consumers of a particular product. They find the age is 42. They are somewhat surprised by this as it's older than they expected, so they check their workings - add all the ages and divide by the number of customers who bought it. After confirming the maths, the value is indeed 42 and they report it to their boss. Unfortunately, the mean age was artificially inflated: a subset of customers had an age of '999' because the system that captured that data used it as a placeholder for 'unknown'.
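The effect is easy to reproduce (a toy example with made-up numbers):

val ages = Seq(35, 38, 41, 999, 999)      // 999 is the upstream placeholder for 'unknown'
val mean = ages.sum.toDouble / ages.size  // 422.4, nowhere near the true average of 38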

The next example actually happened. We were measuring average length of stay (LoS) in hospitals. When sampling the data, everything looked fine. But out of the millions of patients, a very small number (~30) had a discharge date of 1/1/1900. Clearly, the system that captured that data used this value as a token for 'unknown'. This erroneously reduced the overall LoS. The data bug was only caught when, drilling down into individual hospitals, some average LoS figures came out negative. Until then, we were merrily reporting the wrong national figure.

Is it a purely cultural problem?

The trouble with cultural solutions is that they depend on unreliable units called "humans". For instance, a friend of mine was the victim of an upstream, breaking change originating in the Zurich office. When he contacted the team, they were unaware of him, of his London team, and of the fact that they were consumers of this data.

I asked on the Apache dev mailing lists if we could implement a more robust, technical solution for Spark. Watch this space for developments.

Possible solutions

Andrew Jones (who is writing a book on data contracts) uses JSON Schema to validate his data. "JSON Schema is a powerful tool for validating the structure of JSON data" [JSON Schema docs]. 

Elliot West of Dremio (see the mailing list traffic) also favours JSON Schema. However, because JSON has only a few data types (strings, arrays, numbers etc), it's not rich enough to enforce constraints like "date X must be before date Y".
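For instance, a schema can pin down the types and formats of an admission record, but it has no standard way of saying that the discharge date must not precede the admission date (a hedged sketch; the field names are illustrative):

{
  "type": "object",
  "properties": {
    "admitted":   { "type": "string", "format": "date" },
    "discharged": { "type": "string", "format": "date" }
  },
  "required": ["admitted", "discharged"]
}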

Implementations

This is a new area of development but Databricks' Delta Live Tables (DLT) claims it can “prevent bad data from flowing into tables through validation and integrity checks and avoid data quality errors with predefined error policies (fail, drop, alert or quarantine data).”
Unfortunately, it seems to be Python-only: “Can I use Scala or Java libraries in a Delta Live Tables pipeline? No, Delta Live Tables supports only SQL and Python. You cannot use JVM libraries in a pipeline. Installing JVM libraries will cause unpredictable behavior, and may break with future Delta Live Tables releases.” [docs]

Friday, June 9, 2023

Modern DevOps Tools

Some tools that I've had too little time to investigate thoroughly.

TestContainers
The free and open source TestContainers offers huge convenience to developers. For instance, you can fire up a very lightweight Postgres container in just a second or two. This ZIO SQL test (AgregatingSpec) ran in just 3.85s on my laptop. In that time, it started a Docker container, populated the Postgres database in it with test data, ran some Scala code against it and then tore down the container. The container can last as long as the JVM, so all your tests can share it; when TestContainers detects the JVM is exiting, it kills the container.

MinIO
If you need to run S3 API compatible storage locally, you can try MinIO. It's open source, written in Go, and lets you run a local Docker container emulating Amazon storage.

DuckDB
This open source, C++ application allows you to run SQL against Parquet files without having to fire up a whole platform. You can even run DBeaver against it.

Crossplane
Crossplane is an open source Go project that "connects your Kubernetes cluster to external, non-Kubernetes resources, and allows platform teams to build custom Kubernetes APIs to consume those resources." [docs]

Scala Native
You can now convert Scala code to stand alone executable binaries using Scala Native [baeldung]. It currently only works with single threaded applications. The output can be converted to WebAssembly...

WebAssembly
Wikipedia describes WebAssembly as "a portable binary-code format and a corresponding text format for executable programs ... for facilitating interactions between such programs and their host environment." It is an "open standard and aims to support any language on any operating system".

Tapir
Is a type-safe Scala library that documents HTTP endpoints.

GraphQL
GraphQL is a type system, query language, etc accessible through a single endpoint that only returns what is asked of it and no surplus information. It's a spec and there are implementations in a number of languages. The graph bit comes in insofar as a "query is a path in the graph, going from the root type to its subtypes until we reach scalar types with no subfields." [Bogdan Nedelcu]

LLVM
LLVM is an open source tool chain written in C++. The 'VM' in LLVM originally stood for Virtual Machine but this is no longer the case. Instead of being a virtual machine, it turns any major language into a common intermediate representation that can then be turned into machine code.

GraalVM
GraalVM is an open source JDK and JRE written in Java itself and has its roots in project Maxine. But it's more than that. It offers compilation to native code as well as supporting polyglot code via its Truffle framework, a language-agnostic AST.

Quarkus
Based on GraalVM (above), Quarkus is an open source Java Framework tailored for Kubernetes. Since the JVM code is natively compiled, startup and memory sizes are small.

Spring Boot
Is an "opinionated" Java framework that favours convention-over-configuration and runs Spring apps with the minimum of fuss.

Python/Java Interop
Python and Java dominate the data engineering landscape. These languages can interoperate via Py4J, which uses sockets to allow Python to invoke Java code, and Jython, which runs Python code wholly inside the JVM. Py4J is used extensively in Spark to allow PySpark devs to talk to Spark JVMs.
Jython, unfortunately, does not support Python 3.

Project Nessie
Nessie is an open source, JVM project that promises to do to big data what Git did to code: versioning, branching etc. It apparently sits nicely on top of Iceberg and Databricks.

The lakeFS project is an open source, Go project that offers similar functionality.

Cloud native CI/CD
Tekton is written in Go.
Argo is a Go-based, Kubernetes-native tool. For instance, it handles rolling deployments, building on K8s' RollingUpdate strategy, which does not natively control traffic flow during an update.
CircleCI seems to be mostly closed source.

Pipelines
Interestingly, CI/CD and data pipelines both use directed acyclic graphs but with very different intent. User Han on Discord eloquently spelled out the architectural distinction:

Specifically the reason is in batch data [Pipeline] processing, we tend to scale things out horizontally by a whole lot, sometimes using GPUs. This is not a common feature supported by CI/CD workflow tools. In summary: 

Jenkins, CodePipeline, Github Actions, TeamCity, Argo, etc ==> used to build DAGs for CI/CD, tends to have shorter run time, less compute requirement, and fairly linear in dependencies.

Airflow, Dagster, Prefect, Flyte, etc ==> used to build data and/or machine learning pipelines. It tend to have longer run time, larger horizontal scaling needs, and sometimes complex dependencies.  Data pipelines also sometimes have certain needs, e.g., backfilling, resume, rerun, parameterization, etc that's not common in CI/CD pipelines

Interestingly, Luigi looks like it's dead as even its creator says on Twitter.

Istio
Istio is an open source GoLang project that transparently provides "a uniform way to integrate microservices, manage traffic flow across microservices, enforce policies and aggregate telemetry data."

LocalStack
... is a (mostly) open source emulator of some major AWS services (you pay for the more niche APIs).

K3s
Is a lightweight edition of Kubernetes that can run on edge devices and in development environments.