Saturday, May 17, 2025

DevOps lessons from Polaris and Iceberg

The Apache Iceberg and Polaris code bases make pretty good reading. There's some nice DevOps work here. Here are a few tidbits.

Iceberg's Kafka Connectors

As part of Iceberg's integration tests, Docker Compose is used to fire up a Kafka Connect container. Interestingly, this container mounts the directory holding Iceberg's artifacts so it instantly has the latest implementation of Iceberg's Kafka Connect SinkConnector and SinkTask. The test suite then starts the connector with a REST call (that's the nature of Kafka Connect) that contains all of the connector's config.
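
For illustration, registering a connector over Kafka Connect's REST API looks something like this minimal Python sketch (the connector class and config keys here are assumptions for illustration, not the exact ones the Iceberg test suite uses):

import requests

# Kafka Connect's REST API (assumed here to listen on localhost:8083)
connect_url = "http://localhost:8083/connectors"

# Hypothetical connector definition; the real test suite builds its own
# config, including catalog and topic settings
connector = {
    "name": "iceberg-sink-test",
    "config": {
        "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
        "topics": "events",
        "iceberg.tables": "db.events",
    },
}

response = requests.post(connect_url, json=connector)
response.raise_for_status()  # Kafka Connect returns 201 Created on success
print(response.json())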

This docker-compose.yml also starts a MinIO container so Kafka Connect thinks it's writing to an AWS S3 bucket - all of this on one self-contained laptop, no config required. (It uses Testcontainers to do the heavy lifting of talking to the Docker daemon.)

Testcontainers has a very cool trick for cleaning up the containers when the tests are finished. It starts a sidecar called Ryuk that holds open a connection from the JVM and kills the containers when that connection closes. You can see it while the tests are running with:

$ docker ps
CONTAINER ID   IMAGE                                 COMMAND                  CREATED         STATUS                            PORTS                                                           NAMES
...
f7c69b891330   testcontainers/ryuk:0.11.0            "/bin/ryuk"              5 seconds ago   Up 5 seconds                      0.0.0.0:32772->8080/tcp, :::32772->8080/tcp                     testcontainers-ryuk-3c88ed17-ec3d-4ce9-8830-dbbc1ca86294

Kind - K8s lite

You can run Polaris in a Kind cluster. Kind is Kubernetes in Docker. Although it is not a CNCF-conformant distribution itself, it runs a CNCF-conformant version of Kubernetes, just inside Docker containers. So, although it is not appropriate for a multi-machine environment, Kind is great for development on a single machine.

Kind starts two Docker containers (kind-control-plane and kind-registry) as well as updating your ~/.kube/config file. If you run ./run.sh in the Polaris codebase, you will see it start Kubernetes pods like etcd, coredns, and kube-proxy as well as a polaris container.

Metadata file

Iceberg creates a file called iceberg-build.properties when it's built. Having your project do this can be enormously useful when you've ssh-ed into a box and are wondering exactly what version is running there, because it's a test environment and nobody has kept track of what is deployed where (ie, the real world).

Iceberg is built with Gradle so it uses the com.gorylenko.gradle-git-properties plugin, but there appears to be an equivalent for Maven (git-commit-id-plugin).

Quarkus

Polaris has become heavily dependent on Quarkus. The Docker container just runs quarkus-run.jar. This JAR's main class is io.quarkus.bootstrap.runner.QuarkusEntryPoint. This loads /deployments/quarkus/quarkus-application.dat, a binary file that loads all the Polaris guff. Apparently, it's a binary file to minimise start-up times.

The Docker image's entrypoint is /opt/jboss/container/java/run/run-java.sh. This script comes FROM the Dockerfile's Quarkus-friendly base image and contains sensible JVM defaults.

Wednesday, May 14, 2025

Interpreting XGBoost models

Playing with XGBoost at the moment. Here are some basic notes:

XGBoost's Scikit-learn-style Classifier

This takes a lot of parameters but the highlights are:

Also called eta, the learning_rate is similar to the learning rate in neural nets in that a smaller value means slower training but potentially better performance.

The minimum loss reduction required to split a leaf is called gamma. A higher value means a more conservative model.

You can define the evaluation metric with eval_metric. A value of logloss, for instance, will penalize confidently wrong predictions.

The fraction of the training data used per tree is the subsample argument, and scale_pos_weight weights the positive class (useful for imbalanced data).
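
Putting those parameters together, a minimal sketch (synthetic data and illustrative values, not recommendations):

from sklearn.datasets import make_classification
from xgboost import XGBClassifier

# Imbalanced synthetic data: roughly 9 negatives for every positive
X, y = make_classification(n_samples=10_000, weights=[0.9], random_state=42)

model = XGBClassifier(
    learning_rate=0.05,     # eta: smaller = slower but potentially better
    gamma=1.0,              # minimum loss reduction to split; higher = more conservative
    eval_metric="logloss",  # penalizes confidently wrong predictions
    subsample=0.8,          # fraction of training data used per tree
    scale_pos_weight=9.0,   # up-weight the rare positive class
)
model.fit(X, y)
print(model.predict_proba(X[:5]))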

Random Stratified Data

Talking of imbalanced data: if there are two classes, A and B, and the number of data points for A is small, a random sample may by chance contain a disproportionate number of As. To address this, we separate the A and B data points and take exactly the number from each such that the overall ratio matches the real world.

For instance, if the ratio of A to B is 1:100 and we want a sample of 1000 points, using Random Stratified Data will give us precisely 10 As. Whereas a random sample could (feasibly) give us none.
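
Scikit-learn's train_test_split will do this for you via its stratify argument. A minimal sketch:

import numpy as np
from sklearn.model_selection import train_test_split

# A 1:100 ratio of As (1) to Bs (0)
y = np.array([1] * 100 + [0] * 10_000)
X = np.arange(len(y)).reshape(-1, 1)

# stratify=y preserves the class ratio in the sample;
# a plain random sample might not
_, X_sample, _, y_sample = train_test_split(
    X, y, test_size=1000, stratify=y, random_state=42
)
print(y_sample.sum())  # ~10 As in a sample of 1000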

Notes on the Confusion Matrix

High precision and low recall indicates a model that really wants to be sure it doesn't have too many false positives.

F1 scores are good for imbalanced data or when false positives and false negatives carry the same cost. It's domain specific, but a value of about 0.6 is a minimum for being acceptable.

Specificity is how well a binary classifier avoids false positives (it is the true negative rate). It's between 0 and 1 and higher is better. Sensitivity (a.k.a. recall) is the equivalent for false negatives.
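
Both fall straight out of the confusion matrix. A minimal sketch:

from sklearn.metrics import confusion_matrix

y_true = [0, 0, 0, 0, 1, 1, 1, 0, 1, 0]
y_pred = [0, 0, 1, 0, 1, 1, 0, 0, 1, 0]

# scikit-learn orders the binary confusion matrix as [[tn, fp], [fn, tp]]
tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()

sensitivity = tp / (tp + fn)  # true positive rate (recall)
specificity = tn / (tn + fp)  # true negative rate
print(f"sensitivity={sensitivity:.2f}, specificity={specificity:.2f}")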

The Bayes factor is the ratio of the probability of getting this data given a hypothesis is true versus the null hypothesis, that is P(D|H1)/P(D|H0).

When less (data) is more

In survival analysis, you might want to employ a technique called temporal censoring analysis. This applies to time-to-event data, the subject of survival analysis. The idea is that you censor data to minimise a fundamental problem: a person who has yet to be diagnosed with a disease (for example) would otherwise be classed the same as somebody who will never have it.

A similar approach is used to tackle Simpson's Paradox, where the model behaves differently in aggregate than when the data is segmented. Here, we segment the data and evaluate the model on those cohorts only. These techniques are called cohort-based validation or segmented model evaluation, depending on how you slice and dice the data.
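
A minimal sketch of cohort-based validation with pandas (the cohort column and the metric are illustrative):

import pandas as pd
from sklearn.metrics import accuracy_score

df = pd.DataFrame({
    "cohort":    ["young", "young", "old", "old", "old", "young"],
    "actual":    [1, 0, 1, 1, 0, 1],
    "predicted": [1, 0, 0, 1, 0, 1],
})

# Evaluate the model per segment rather than in aggregate
for cohort, group in df.groupby("cohort"):
    score = accuracy_score(group["actual"], group["predicted"])
    print(f"{cohort}: accuracy={score:.2f}")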

Calibration

If you were to plot the probabilities your model outputs (from 0 to 1) against the actual fraction of class A vs class B at each probability, you'd have a calibration curve. There are a few variants on the methodology (eg, Platt Scaling, which treats this as a logistic regression) but a simpler one is the isotonic method. This sorts your probabilities in ascending order and fits a monotonically (isotonically) increasing step function to the cumulative total of class As.

A line that hugs the diagonal indicates a well-calibrated model.
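
Scikit-learn will compute the curve's points for you. A minimal sketch with made-up probabilities:

import numpy as np
from sklearn.calibration import calibration_curve

y_true = np.array([0, 0, 0, 1, 0, 1, 1, 0, 1, 1])
y_prob = np.array([0.1, 0.2, 0.3, 0.4, 0.45, 0.5, 0.6, 0.7, 0.8, 0.9])

# For each probability bin: the actual fraction of positives versus
# the mean predicted probability; well calibrated means they match
frac_pos, mean_pred = calibration_curve(y_true, y_prob, n_bins=5)
print(frac_pos)
print(mean_pred)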

Wednesday, April 16, 2025

Kafka in Kubernetes

If you ask Strimzi to set up a Kafka cluster out of the box, you'll see this when connecting:

$ kafka-topics.sh --bootstrap-server 10.152.183.163:9092 --list
...
[2025-04-04 16:35:59,464] WARN [AdminClient clientId=adminclient-1] Error connecting to node my-cluster-dual-role-0.my-cluster-kafka-brokers.kafka.svc:9092 (id: 0 rack: null) (org.apache.kafka.clients.NetworkClient)
java.net.UnknownHostException: my-cluster-dual-role-0.my-cluster-kafka-brokers.kafka.svc
...

Where that IP address comes from:

$ kubectl get service -A
NAMESPACE     NAME                                  TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)                                        AGE...
kafka         my-cluster-kafka-bootstrap            ClusterIP   10.152.183.163   <none>        9091/TCP,9092/TCP,9093/TCP                     21d

It appears that Strimzi does not expose external Kafka ports by default. So, add an external port with:

kubectl get kafka my-cluster -n kafka -o yaml > /tmp/kafka.yaml

then edit /tmp/kafka.yaml adding:

    - name: external
      port: 32092
      tls: false
      type: nodeport

in the spec/kafka/listeners block and apply it with:

kubectl apply -n kafka -f /tmp/kafka.yaml

Now I can see:

$ kubectl get svc -n kafka
NAME                                  TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)                                        AGE   SELECTOR
...
my-cluster-kafka-bootstrap            ClusterIP   10.152.183.163   <none>        9091/TCP,9092/TCP,9093/TCP                     21d
my-cluster-kafka-external-bootstrap   NodePort    10.152.183.27    <none>        32092:32039/TCP                                11d

It appears that Strimzi has created a new service for us - hurrah! 

However, making a call to Kafka still fails, and this is because of the very architecture of Kafka. I am indeed communicating with a Kafka broker within Kubernetes, but it then forwards me to another domain name, my-cluster-dual-role-0.my-cluster-kafka-brokers.kafka.svc (the address the broker advertises). The host knows nothing about this Kubernetes domain name. Incidentally, the same happens with Kafka in a pure Docker configuration.

Kubernetes pods resolve their domain names using the internal DNS.

$ kubectl exec -it my-cluster-dual-role-1 -n kafka -- cat /etc/resolv.conf
Defaulted container "kafka" out of: kafka, kafka-init (init)
search kafka.svc.cluster.local svc.cluster.local cluster.local home
nameserver 10.152.183.10
options ndots:5

This nameserver is kube-dns (I'm using Microk8s):

$ kubectl get svc -n kube-system
NAME       TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)                  AGE
kube-dns   ClusterIP   10.152.183.10   <none>        53/UDP,53/TCP,9153/TCP   21d

and we can query it from the host:

$ nslookup my-cluster-dual-role-external-0.kafka.svc.cluster.local 10.152.183.10
Server: 10.152.183.10
Address: 10.152.183.10#53

Name: my-cluster-dual-role-external-0.kafka.svc.cluster.local
Address: 10.152.183.17

Now, to get the host to use the Kubernetes DNS for K8s domain names, I had to:

$ sudo apt update
$ sudo apt install dnsmasq
$ sudo vi /etc/dnsmasq.d/k8s.conf

This was a new file and needed:

# Don't clash with systemd-resolved which listens on loopback address 127.0.0.53:
listen-address=127.0.0.1
bind-interfaces
# Rewrite .svc to .svc.cluster.local
address=/.svc/10.152.183.10
server=/svc.cluster.local/10.152.183.10

That listen-address line was needed because sudo ss -ulpn | grep :53 showed both dnsmasq and systemd-resolved fighting over the same port.

I also had to add:

[Resolve]
DNS=127.0.0.1
FallbackDNS=8.8.8.8
Domains=~svc.cluster.local

to /etc/systemd/resolved.conf to tell it to defer to dnsmasq first for domains ending with svc.cluster.local. Finally, restart everything:

$ sudo systemctl restart systemd-resolved
$ sudo ln -sf /run/systemd/resolve/resolv.conf /etc/resolv.conf
$ sudo systemctl restart dnsmasq

Now let's use that external port we configured at the top of the post:

$ kubectl get svc -n kafka
NAME                                  TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)                                        AGE
...
my-cluster-kafka-external-bootstrap   NodePort    10.152.183.27    <none>        32092:32039/TCP          
$ ./kafka-topics.sh --bootstrap-server 10.152.183.27:32092 --create --topic my-new-topic  --partitions 3  --replication-factor 2
Created topic my-new-topic.
$ ./kafka-topics.sh --bootstrap-server 10.152.183.27:32092 --list
my-new-topic

Banzai!

Tuesday, April 15, 2025

What is a (local) LLM?

The short answer: just a bunch of matrices and config files.

The long answer: if you download an LLM from, say, HuggingFace, you'll see a directory structure a little like this:

$ tree ~/.cache/huggingface/hub/models--mlabonne--TwinLlama-3.1-8B/
├── refs
│   └── main
└── snapshots
    └── 5b8c30c51e0641d36a2f9007c88425c93db0cb07
        ├── config.json
        ├── generation_config.json
        ├── model-00001-of-00004.safetensors
        ├── model-00002-of-00004.safetensors
        ├── model-00003-of-00004.safetensors
        ├── model-00004-of-00004.safetensors
        ├── model.safetensors.index.json
        ├── special_tokens_map.json
        ├── tokenizer_config.json
        └── tokenizer.json

[slightly modified for clarity].

You can see the actual matrices with something like:

from transformers import AutoModel

model = AutoModel.from_pretrained("mlabonne/TwinLlama-3.1-8B")
embedding_layer = model.get_input_embeddings()  # This contains token ID → vector mappings
embedding_weights = embedding_layer.weight  # Shape: (vocab_size, embedding_dim)
print(embedding_weights.size())

Output:

torch.Size([128256, 4096])

But if you want to get even more raw, you can just read the files directly:

from safetensors.torch import load_file

state_dict = load_file("model-00001-of-00004.safetensors")
embeddings = state_dict['model.embed_tokens.weight']
print(embeddings.size())

Output:

torch.Size([128256, 4096])

That is the size of the vocabulary and we can demonstrate this with:

$ grep -r 128256 ~/.cache/huggingface/hub/models--mlabonne--TwinLlama-3.1-8B/
...
config.json:  "vocab_size": 128256 

You can see what the actual tokens (note: not words) are in tokenizer.json.

File formats

Safetensors is a HuggingFace format for storing tensors. The "safe" comes from avoiding the security implications of PyTorch's .pt and .bin formats, which allow code to be pickled into the files.

GGUF is the format used by llama.cpp (and compatible) models. The format stores everything (both weights and metadata) in one file, is good for CPU processing, and allows quantization.

You can convert from Safe Tensors to GGUF by running something like this from the llama.cpp repo:

python  convert_hf_to_gguf.py --outtype f16  --outfile /home/henryp/Temp/models--mlabonne--TwinLlama-3.1-8B-DPO.gguf /home/henryp/.cache/huggingface/hub/models--mlabonne--TwinLlama-3.1-8B-DPO/snapshots/4a76d14414118c00fbbaed96cf1dd313a10f1409/

That directory being where the model's config.json lives.

Tensor files store their weights in various formats. FP16 and BF16 are both 16-bit encodings, the difference being how they allocate bits between the mantissa and the exponent. BF16 has a larger range (the same as FP32) but lower precision. FP16 is the traditional format for GPUs, with more modern GPUs now also supporting BF16.

You can even just use integers like INT8 and Q4_0, the latter being a 4-bit integer with the _0 referring to which technique was used to quantize it (eg, scaling). These different strategies trade off accuracy, speed, complexity, etc.
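
You can see the range/precision trade-off directly in PyTorch:

import torch

for dtype in (torch.float16, torch.bfloat16, torch.float32):
    info = torch.finfo(dtype)
    # BF16 shares FP32's range (~3.4e38) but has a much larger eps
    # (lower precision) than FP16, whose max is only 65504
    print(f"{dtype}: max={info.max:.3e}, eps={info.eps:.3e}")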

The model

There's a nice Java implementation of Llama.cpp here. It's very nicely written and even allows you to compile the Java code to a native binary (use the latest GraalVM). This class represents the weights that are loaded from disk. In the case of a Llama model, the GGUF file is read from disk, parsed, and simply becomes an object of this class.

Monday, April 14, 2025

Interpreting GLMs

How do you check the quality of your Generalized Linear Models?

You cannot use ROC and AUC curves for linear regression models by their very nature (they don't give yes/no answers). So one way is to use the Concordance Index (a.k.a. C-Index) instead. It works by taking pairs of observations where one experienced the outcome and the other didn't, and asking how often the model ranked the pair correctly. You want a value greater than 0.5, but how much greater is (as always) context-dependent. In the social sciences, 0.75 may be great but pants in the hard sciences.
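
A minimal sketch of that pairwise calculation in NumPy (for real work, libraries such as lifelines have implementations that also handle censoring):

import numpy as np

def c_index(outcome, risk):
    """Fraction of (event, non-event) pairs the model ranks correctly."""
    events = risk[outcome == 1]
    non_events = risk[outcome == 0]
    # Compare every event observation against every non-event one;
    # ties count half
    wins = (events[:, None] > non_events[None, :]).sum()
    ties = (events[:, None] == non_events[None, :]).sum()
    return (wins + 0.5 * ties) / (len(events) * len(non_events))

outcome = np.array([1, 0, 1, 0, 1, 0])
risk = np.array([0.9, 0.3, 0.6, 0.4, 0.2, 0.1])
print(c_index(outcome, risk))  # ~0.78: better than the 0.5 of chance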

VIF (the Variance Inflation Factor) indicates how much the collinearity of factors inflates the variance of each other's coefficients. It's quite domain specific, but a value of less than 10 is considered moderate for prediction and less than 5 for inference.
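
StatsModels will calculate it for you. A minimal sketch with two nearly collinear predictors:

import numpy as np
import statsmodels.api as sm
from statsmodels.stats.outliers_influence import variance_inflation_factor

rng = np.random.default_rng(42)
x1 = rng.normal(size=100)
x2 = 0.9 * x1 + rng.normal(scale=0.1, size=100)  # nearly collinear with x1
X = sm.add_constant(np.column_stack([x1, x2]))

# One VIF per predictor; the collinear pair scores far above 10
for i, name in zip([1, 2], ["x1", "x2"]):
    print(name, variance_inflation_factor(X, i))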

Because of the mathematics behind them, the R-squared used in Ordinary Least Squares does not apply to a general GLM. Instead, a pseudo R-squared is used, which tends to have a lower value. It is not recommended as a goodness-of-fit metric for GLMs.

Poisson and binomial models commonly have high deviance. Lower deviance indicates a better fit, but the ratio of deviance to the degrees of freedom (the number of observations minus the number of parameters) is better for assessing goodness of fit. Values moderately above 1.0 indicate overdispersion, where the variance in the data is greater than what the model assumes. This matters less for the negative binomial. Underdispersion indicates the data has clusters.

The alpha on a negative binomial should be about 1.0. This is the default in StatsModels. More than that indicates overdispersion.

Another metric is the ratio of the Pearson chi-squared statistic to the residual degrees of freedom. Depending on your tolerance, this should typically lie somewhere between 0.5-0.8 and 1.2-2.0. Again, high values indicate overdispersion; low values, underdispersion.
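
Both dispersion ratios fall out of a fitted StatsModels GLM. A minimal sketch with a Poisson model on synthetic data:

import numpy as np
import statsmodels.api as sm

rng = np.random.default_rng(42)
X = sm.add_constant(rng.normal(size=(200, 2)))
y = rng.poisson(lam=np.exp(X @ np.array([0.5, 0.3, -0.2])))

results = sm.GLM(y, X, family=sm.families.Poisson()).fit()

# Both should sit near 1.0; well above suggests overdispersion,
# well below suggests underdispersion
print("deviance / df:", results.deviance / results.df_resid)
print("Pearson chi2 / df:", results.pearson_chi2 / results.df_resid)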

Chi-squared can tell you there is a relationship between categorical variables but it doesn't tell you its strength. For that, use Cramer's V. This gives you a value between 0.0 and 1.0. Interpretation is domain specific, but the range for starting to look suspiciously at relationships begins at around 0.3.
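
A minimal sketch, deriving Cramer's V from the chi-squared statistic:

import numpy as np
from scipy.stats import chi2_contingency

# A contingency table of two categorical variables
table = np.array([[30, 10],
                  [15, 45]])

chi2, p, dof, expected = chi2_contingency(table)
n = table.sum()
r, c = table.shape
cramers_v = np.sqrt(chi2 / (n * (min(r, c) - 1)))
print(f"chi2={chi2:.2f}, p={p:.4f}, Cramer's V={cramers_v:.2f}")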

Wednesday, April 2, 2025

Storage on Kubernetes, Part 2

Having opted for installing MicroK8s and its storage and network addons, I then tried to add another node. Adding another node is still proving problematic, but storage was much easier than with raw Kubernetes.

To point Helm (and kubectl) at MicroK8s, run this:

alias kubectl='microk8s.kubectl'
export KUBECONFIG=/var/snap/microk8s/current/credentials/client.config

And install Strimzi as I outline in a previous post.

When I tried to see the logs of the strimzi-cluster-operator, it errored with "tls: failed to verify certificate: x509: certificate is valid for" ... I still haven't worked this out, but this GitHub ticket helped me. Apparently, I need to call kubectl --insecure-skip-tls-verify-backend=true logs... for reasons I don't entirely understand yet. [Addendum: it looks like I needed to run sudo microk8s refresh-certs --cert ca.crt, as the certificates used to connect to the containers were apparently out of date.]

Anyway, when I deployed my slightly modified version of kafka-with-dual-role-nodes.yaml, changing the storage type from jbod [Just a Bunch of Disks] to persistent-claim, the Strimzi operator was puking "The size is mandatory for a persistent-claim storage". Looking at the code, it seems Strimzi is expecting a child config node called size.

Now, I just needed some YAML to create a MicroK8s PVC (which I stole from SO), and the 3-node Kafka cluster appears to be up and running, writing its data to the host machine's /var/snap/microk8s/common/default-storage path.

Gotcha!

I took my laptop to a coffee shop and worked there and everything was broken again! Kubelite was puking with:

$ journalctl -f -u snap.microk8s.daemon-kubelite -f --no-hostname --no-pager

...
Apr 02 11:15:15 microk8s.daemon-kubelite[309439]: W0402 11:15:15.083568  309439 logging.go:55] [core] [Channel #5 SubChannel #6]grpc: addrConn.createTransport failed to connect to {Addr: "unix:///var/snap/microk8s/7964/var/kubernetes/backend/kine.sock:12379", ServerName: "kine.sock:12379", }. Err: connection error: desc = "transport: Error while dialing: dial unix /var/snap/microk8s/7964/var/kubernetes/backend/kine.sock:12379: connect: connection refused"

kine acts as an in-process or standalone service that translates Kubernetes API interactions into operations on the dqlite database. It essentially makes dqlite behave like etcd from the Kubernetes API server's perspective.

Debugging gives:

$ microk8s inspect
Inspecting system
Inspecting Certificates
Inspecting services
  Service snap.microk8s.daemon-cluster-agent is running
  Service snap.microk8s.daemon-containerd is running
  Service snap.microk8s.daemon-kubelite is running
  Service snap.microk8s.daemon-k8s-dqlite is running
  Service snap.microk8s.daemon-apiserver-kicker is running
...

but this is a lie! Kubelite (and daemon-k8s-dqlite) say they're running, but that is not the whole story: they're constantly restarting because there is a problem.

$ journalctl -f -u snap.microk8s.daemon-k8s-dqlite.service -f --no-hostname --no-pager
...
Apr 02 10:57:50 microk8s.daemon-k8s-dqlite[274391]: time="2025-04-02T10:57:50+01:00" level=fatal msg="Failed to create server" error="failed to create dqlite app: listen to 192.168.1.253:19001: listen tcp 192.168.1.253:19001: bind: cannot assign requested address"
...

That's not my current IP address! That's my address back in the office! I've rebooted my laptop since being connected to that network so I don't know where it's getting it from.

I could change the files in /var/snap/microk8s/current/ but a return to the office made everything work again.

Logging tips

See what a service is doing with, for example:

journalctl -u snap.microk8s.daemon-cluster-agent

and check a status with, for example:

systemctl status snap.microk8s.daemon-kubelite


Friday, March 28, 2025

Iceberg Tidbits

Some miscellaneous Iceberg notes.

Cleaning up

There is a logic to this. Russell Spitzer, of the Apache team, says:

  1. run rewrite first
  2. expire second
  3. you don't need to run remove_orphans unless something broke [Discord]

Here is a quick BDD I wrote that illustrates what's going on. Basically: 

  1. rewrite_data_files puts all the data in as few files as possible. 
  2. expire_snapshots then deletes any files that are surplus.
Fewer files means less IO and a more efficient query.

Russell explains the reason we don't normally need remove_orphans:

To clarify, Remove Orphan Files is only for cleaning up files from failed writes or compactions that failed in a way that the process could not clean up.
Inside of Remove orphan files are 2 expensive remote steps
  • Listing everything in the directories owned by the table
  • Join of that list with metadata file paths
Then the list of files to delete is brought back to the driver where deletes are performed [Discord]
This BDD demonstrates removing orphans through the Java API. I wanted to use CALL system.remove_orphan_files but couldn't. Instead, I got the error:

java.lang.IllegalArgumentException: Cannot remove orphan files with an interval less than 24 hours. Executing this procedure with a short interval may corrupt the table if other operations are happening at the same time. If you are absolutely confident that no concurrent operations will be affected by removing orphan files with such a short interval, you can use the Action API to remove orphan files with an arbitrary interval.
at org.apache.iceberg.spark.procedures.RemoveOrphanFilesProcedure.validateInterval(RemoveOrphanFilesProcedure.java:209)

which is a bit much for a simple BDD.

I forced Iceberg to break by having two threads try to write to the same table at the same time.

Pushdown

This is more Spark than just Iceberg but it's an interesting use case.

The IN clause is not pushed down, at least in Spark 3.5.1 - see some nice analysis here. TL;DR: instead of using IN, the most efficient query converts it into a set of ANDs and ORs. Note that pushdown is (according to Russell Spitzer) pushing the actual function to the data, not mapping it to some other function that is semantically equivalent (as we see in that link).

Again, I've got a BDD to demonstrate a filter that was not pushed down. Note that if the filters element is empty in a Physical Plan's BatchScan, your function has not been pushed down.