Saturday, August 30, 2025

(It's a) Kind and Strimzi

I have a Kafka cluster running on my laptop in Kind K8s using Strimzi. It just works [certain terms and conditions apply]!

For installing Strimzi etc, refer to a previous blog post of mine.

Then, I deployed kafka-with-dual-role-nodes.yaml with a few minor changes. 

First, I changed the cluster name to kafka. Then I switched to ephemeral disks, as I didn't care about data loss in a PoC running on my PC:

-        type: persistent-claim
-        size: 100Gi
-        deleteClaim: false
+        type: ephemeral

But the main thing I had to do was add an external listener of type nodeport:

+      - name: external
+        port: 9094
+        type: nodeport   # 👈 important
+        tls: false

This meant I could see the service exposing the port to the host:

$ kubectl get svc -n kafka --output=wide
NAME                             TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)                                        AGE     SELECTOR
kafka-dual-role-0                NodePort    10.96.101.127   <none>        9094:31904/TCP                                 4m43s   statefulset.kubernetes.io/pod-name=kafka-dual-role-0,strimzi.io/cluster=kafka,strimzi.io/kind=Kafka,strimzi.io/name=kafka-kafka,strimzi.io/pool-name=dual-role
kafka-dual-role-1                NodePort    10.96.128.155   <none>        9094:30402/TCP                                 4m43s   statefulset.kubernetes.io/pod-name=kafka-dual-role-1,strimzi.io/cluster=kafka,strimzi.io/kind=Kafka,strimzi.io/name=kafka-kafka,strimzi.io/pool-name=dual-role
kafka-dual-role-2                NodePort    10.96.169.99    <none>        9094:31118/TCP                                 4m43s   statefulset.kubernetes.io/pod-name=kafka-dual-role-2,strimzi.io/cluster=kafka,strimzi.io/kind=Kafka,strimzi.io/name=kafka-kafka,strimzi.io/pool-name=dual-role

It's essentially port forwarding for me.

Note that one does not connect to the CLUSTER-IP. You need to see where these kafka-dual-role-* pods live:

$ kubectl get pods -n kafka --output=wide
NAME                                        READY   STATUS    RESTARTS      AGE   IP            NODE                 NOMINATED NODE   READINESS GATES
kafka-dual-role-0                           1/1     Running   0             22h   10.244.0.38   kind-control-plane   <none>           <none>
kafka-dual-role-1                           1/1     Running   0             22h   10.244.0.39   kind-control-plane   <none>           <none>
kafka-dual-role-2                           1/1     Running   0             22h   10.244.0.40   kind-control-plane   <none>           <none>

Ah, kind-control-plane. Which IP does that have?

$ kubectl get nodes --output=wide
NAME                 STATUS   ROLES           AGE    VERSION   INTERNAL-IP   EXTERNAL-IP   OS-IMAGE                         KERNEL-VERSION     CONTAINER-RUNTIME
kind-control-plane   Ready    control-plane   139d   v1.32.3   172.18.0.2    <none>        Debian GNU/Linux 12 (bookworm)   6.8.0-60-generic   containerd://2.0.3

$ ./kafka-topics.sh --bootstrap-server=172.18.0.2:31118 --list

$

(Which is expected as we haven't created any topics yet.)
"NodePort is a Kubernetes Service type designed to make Pods reachable from a port available on the host machine, the worker node.  The first thing to understand is that NodePort Services allow us to access a Pod running on a Kubernetes node, on a port of the node itself. After you expose Pods using the NodePort type Service, you’ll be able to reach the Pods by getting the IP address of the node and the port of the NodePort Service, such as <node_ip_address>:<node port>.  The port can be declared in your YAML declaration or can be randomly assigned by Kubernetes.  Most of the time, the NodePort Service is used as an entry point to your Kubernetes cluster." [The Kubernetes Bible]
So, the port from the svc and the IP address from the nodes.
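
To sanity-check the connection from code rather than the CLI, something like this works too (a minimal sketch, assuming the kafka-python package and the node IP and NodePort taken from the outputs above):

from kafka import KafkaAdminClient

# <node INTERNAL-IP>:<NodePort>, from `kubectl get nodes` and `kubectl get svc`
admin = KafkaAdminClient(bootstrap_servers="172.18.0.2:31118")
print(admin.list_topics())   # empty until we create a topic
admin.close()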

Aside: one nice thing about Kind is that I can take my laptop to a coffee shop and join a new network and things carry on running despite my IP address changing. I don't think that is currently possible on the reference K8s.

One bad thing about Strimzi is that it barfed when I upgraded the reference K8s implementation to 1.33. The cause was:

Caused by: com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "emulationMajor" (class io.fabric8.kubernetes.client.VersionInfo), not marked as ignorable (9 known properties: "goVersion", "gitTreeState", "platform", "minor", "gitVersion", "gitCommit", "buildDate", "compiler", "major"])
 at [Source: REDACTED (`StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION` disabled); line: 4, column: 22] (through reference chain: io.fabric8.kubernetes.client.VersionInfo["emulationMajor"])
at com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:61) ~[com.fasterxml.jackson.core.jackson-databind-2.16.2.jar:2.16.2]
...
at io.fabric8.kubernetes.client.utils.KubernetesSerialization.unmarshal(KubernetesSerialization.java:257) ~[io.fabric8.kubernetes-client-api-6.13.4.jar:?]


This ultimately stops the Strimzi operator. Looks like the Fabric8 library needs updating. The Strimzi version I'm using is:

$ helm pull strimzi/strimzi-kafka-operator --untar
$ helm template test-release ./strimzi-kafka-operator/ | grep "image:"
          image: quay.io/strimzi/operator:0.45.0


Monday, August 25, 2025

Cloud Architecture

This is still something I am mulling over but here are my notes.

Problem statement

We want different organisations to share large quantities of confidential data to be processed.

The prerequisites are:
  1. must be secure
  2. must be entirely FOSS based
  3. must be cross-cloud
  4. allows a bring-your-own policy
The choice of Apache Iceberg for the data seems straightforward. But the question of infra provisioning is not an easy one, with a whole debate going on in Discord. Some love Terraform for being (somewhat) typesafe; others think controllers are the way to go.

Infra provisioning

As ever, the answer to what route to take is "it depends" but here are some of the esoteric terms defined.

Crossplane is a CNCF project: a Golang "backend that enables you to build a control plane that can orchestrate applications and infrastructure no matter where they run". So, you could use Crossplane to provision infra not just in its K8s cluster but in the cloud. ACK (AWS Controllers for K8s) is an AWS-specific equivalent of Crossplane that watches its CRDs and provisions accordingly.

In the other corner is the reigning champion, Terraform, and its FOSS fork, OpenTofu (both written in Go). Terraform has a form of type system, but it's not enforced until the plan stage and is "loose": rather than being strict, it allows type coercion.

You can use CDKTF (which has common language bindings to create Terraform config files) but there is some doubt about its future.

Another tool to address the issues with raw Terraform (lack of DRY principles, circular dependencies, orchestration etc) is Terragrunt, a thin wrapper around Terraform/OpenTofu written in Go. It allows the output from one stage to be the input to another [Discord]. HashiCorp, the creators of Terraform, have recognised these problems and have released Stacks.

A central way to orchestrate and deploy your Terraform config is the (mostly) Java Terrakube. It also adds an RBAC layer. Because everything runs remotely, it can be scheduled to detect drift, say, in the middle of the night.

Similarly, Atlantis is a Go server that has RBAC, executes the commands, requires approval for pull requests before applying them and generally implements a GitOps workflow.

For self-service, there is the FOSS, TypeScript Kubero. This allows developers to manage their own K8s deployments. Coolify, meanwhile, is a PHP tool that makes managing your own servers very cloud-like. It just needs an SSH connection.

Tuesday, August 12, 2025

BERT

I've been applying HuggingFace's ModernBERT to sequences of medical codes in the hope that we can identify people who may have type 2 diabetes before even their doctors do. The results have been pretty good and met expectations (the same as an American study's, although this is UK data).

But what surprised me is that a very simple approach performed as well as the more complicated one recommended by our American-based colleagues. Their recommendation was to use transfer learning on their model. This simply didn't work, as their model used a medical encoding system called ICD10 while the British more commonly use the richer SNOMED. The theory was that, with fine-tuning, the model would learn. It didn't.

As an aside, mapping one encoding to another is a non-trivial task. Avoid it if you can.

Instead, we pretrained BERT ourselves exclusively on our data using a Masked Language Model. This is where random words are omitted from a sequence and the model is asked to guess them. Having built the model, we then fine-tuned it with the medical code sequences for those with and without T2D, seeing whether BERT could classify them.

Note that in this case, there is no pretrained BERT. We build it entirely from just our data. A sequence of medical codes is a very simple 'language', so you don't need massive compute resources to train a model. A single Tesla V100 with 16GB of VRAM was sufficient.

Although this achieved good results, it took literally days. The surprising thing was that when we (perhaps naively) treated the sequence of codes as a sentence and used a plain BertForSequenceClassification object to train the model in just one step (no separate pretraining and fine-tuning), we got the same results.

Anyway, here are some notes I made along the way:

The neural net

Each of the heads defined by num_attention_heads focusses on different aspects of the text. More heads can better represent the data; too many can cause overfitting. The default is 12.

num_hidden_layers impacts accuracy and the model's ability to generalize. Its default is 12. Recommended values depend on use case. Less than 6 is good for resource-constrained environments; 6-12 for fine-tuning; 12-24 for large pretraining.
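
For reference, these knobs are just constructor arguments on the config object. A minimal sketch using HuggingFace's BertConfig (the values are illustrative, not what we actually used):

from transformers import BertConfig, BertForMaskedLM

config = BertConfig(
    vocab_size=30_000,       # size of the medical-code vocabulary (illustrative)
    hidden_size=256,         # must be divisible by num_attention_heads
    num_attention_heads=8,   # default is 12
    num_hidden_layers=6,     # default is 12; fewer suits a resource-constrained setup
)
model = BertForMaskedLM(config)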

ChatGPT suggests for BERT classification, final loss should be 0.2-0.4. "BERT can overfit fast, especially with small datasets."

per_device_train_batch_size is purely a runtime optimization.

At inference time, call model.eval() and put all access to the model in a with torch.no_grad() block.
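
In other words, something like this (a sketch; the checkpoint path is hypothetical):

import torch
from transformers import BertForSequenceClassification, BertTokenizerFast

model = BertForSequenceClassification.from_pretrained("./my-finetuned-bert")
tokenizer = BertTokenizerFast.from_pretrained("./my-finetuned-bert")

model.eval()                       # switches off dropout
with torch.no_grad():              # no gradient bookkeeping at inference time
    batch = tokenizer(["code1 code2 code3"], return_tensors="pt")
    logits = model(**batch).logits
    print(logits.argmax(dim=-1))   # predicted class per sequence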

Note that BERT does not use the labels to improve itself during evaluation; it just uses them to calculate the validation loss.

Pretraining

It's important to note that when pretraining BERT for classification, it's not told what the answers are. Instead, it's fed token sequences where some tokens are masked, and it tries to guess what the missing tokens might be. This guessing is the accuracy metric it spits out every so often.
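
The masking itself is handled by a data collator. A minimal sketch of this pretraining step, assuming a dataset of already-tokenized code sequences called tokenized_dataset:

from transformers import (BertConfig, BertForMaskedLM, BertTokenizerFast,
                          DataCollatorForLanguageModeling, Trainer, TrainingArguments)

tokenizer = BertTokenizerFast(vocab_file="vocab.txt")     # our own code vocabulary
model = BertForMaskedLM(BertConfig(vocab_size=tokenizer.vocab_size))

# Randomly masks 15% of the tokens; the model's job is to guess them back.
collator = DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm=True, mlm_probability=0.15)

trainer = Trainer(
    model=model,
    args=TrainingArguments(output_dir="pretrained-bert", num_train_epochs=3),
    data_collator=collator,
    train_dataset=tokenized_dataset,   # assumed to exist
)
trainer.train()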

Finetuning

When BERT is training, you'll see Training and Validation loss. Of the two, validation is the more important. Training loss tells you how well the model deals with data it has already seen. The model is not trained on the evaluation set, so validation loss indicates how well it will fare with data it has not been trained on. Consequently, a falling training loss and a constant validation loss indicate that the model is overfitting. This can be addressed with dropout and/or weight_decay.
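
Those two knobs live in different places: dropout on the model config, weight decay on the training arguments. A sketch (the values are illustrative):

from transformers import BertConfig, BertForSequenceClassification, TrainingArguments

config = BertConfig(
    num_labels=2,
    hidden_dropout_prob=0.2,            # default is 0.1
    attention_probs_dropout_prob=0.2,   # default is 0.1
)
model = BertForSequenceClassification(config)

args = TrainingArguments(
    output_dir="finetuned-bert",
    weight_decay=0.01,                  # default is 0.0
)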

The conventional way of training BERT is to use BertForMaskedLM then use BertForSequenceClassification for fine tuning. The first takes sentences from the training data, randomly blanks out some words and guesses what they might be. However, we found that for small data sets that are not natural language (sequences of medical codes), we could get good performance by just using BertForSequenceClassification which does all the back propagation in a fraction of the time.
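
A sketch of that one-step approach, assuming tokenized train/eval datasets with a labels column:

from transformers import (BertConfig, BertForSequenceClassification,
                          Trainer, TrainingArguments)

# No MLM pretraining: a randomly initialised BERT trained straight on the labelled sequences.
model = BertForSequenceClassification(BertConfig(vocab_size=30_000, num_labels=2))  # illustrative sizes

trainer = Trainer(
    model=model,
    args=TrainingArguments(output_dir="one-step-bert", num_train_epochs=3),
    train_dataset=train_dataset,   # assumed to exist
    eval_dataset=eval_dataset,     # assumed to exist
)
trainer.train()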

When calling BertForSequenceClassification.from_pretrained you might see:

Some weights of BertForSequenceClassification were not initialized from the model checkpoint at ... and are newly initialized: ['bert.pooler.dense.bias', 'bert.pooler.dense.weight', 'classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.

Apparently, this is OK if you've been given a model that you wish to then use for classification. The classifier.* weights are the last layer of the neural net: we take a model that has already been trained (possibly by masking but not classification) and add a classification layer to it.

The special CLS token represents a summary of the string that's being encoded. The bert.pooler.* is a single, final, dense layer applied to this token that's used in classification (it is not used at all in masking).
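
To make that concrete, the pooler is roughly a dense layer plus tanh applied to the [CLS] position. A sketch using a stock BertModel purely to illustrate (the token IDs are arbitrary):

import torch
from transformers import BertModel

model = BertModel.from_pretrained("bert-base-uncased")
with torch.no_grad():
    hidden = model(input_ids=torch.tensor([[101, 2023, 102]])).last_hidden_state
    cls = hidden[:, 0]                             # the [CLS] token's vector
    pooled = torch.tanh(model.pooler.dense(cls))   # what bert.pooler.* computes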

The Tokenizer(s)

Typically, you'll see tokenizers from both the Tokenizers and Transformers libraries. They're both from HuggingFace, so it's not surprising that you can tokenize with one, save it, and then load the data with a tokenizer from the other.

Note that the tokenizers do not save the words as vectors; that embedding is done in the neural net. Instead, they offer different approaches - for instance, balancing a vocabulary that can cover words it has not seen yet against a vocabulary that is so big it makes training inefficient.

In the Transformers library, you'll see some tokenizer names appended with Fast. These really are much faster to run as they're Rust implementations.

Almost as an aside, we tried to build our own tokenizer vocabulary by hand as we wanted to stick as closely to the methodology used by our American colleagues as we could. This is perfectly possible (just point tokenizers.models.WordPiece.from_file at your vocab.txt file), but we found that when we prepended all the codes with their classification system in both vocab.txt and the actual data we were tokenizing, accuracy was an unrealistically high 0.999 from the get-go. Why this was the case remains a mystery. Needless to say, we backed out that change. [Addendum: it's still a mystery why, but using just the 30k most common whole words as tokens rather than the >100k previously resulted in good accuracy].
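
For the record, pointing WordPiece at a hand-written vocab file looks roughly like this (a sketch; the code strings are hypothetical):

from tokenizers import Tokenizer
from tokenizers.models import WordPiece
from tokenizers.pre_tokenizers import WhitespaceSplit

tokenizer = Tokenizer(WordPiece.from_file("vocab.txt", unk_token="[UNK]"))
tokenizer.pre_tokenizer = WhitespaceSplit()   # the codes are already space-separated

encoding = tokenizer.encode("12345678 87654321")   # hypothetical codes
print(encoding.tokens, encoding.ids)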

PyTorch

Backing the Transformers implementation of BERT is PyTorch. Browsing through the code you'll see references to unsqueeze. This is an interesting fella that is best described by this StackOverflow answer. Basically, it can take a 2-d matrix and turn it into a 3-d tensor, with the argument dictating whether the elements lie in the x-, y- or z-plane. From a programming point of view, you'll see that the matrix's elements become nested one layer deeper in more square brackets; the exact configuration of those brackets depends upon which plane the elements have been unsqueezed into.
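
For example, the argument says where the new axis of size 1 goes:

import torch

m = torch.zeros(2, 3)            # a 2-d matrix
print(m.unsqueeze(0).shape)      # torch.Size([1, 2, 3])
print(m.unsqueeze(1).shape)      # torch.Size([2, 1, 3])
print(m.unsqueeze(2).shape)      # torch.Size([2, 3, 1])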

The diagram in that StackOverflow answer, illustrating the three planes, is worth a look.

Sunday, August 3, 2025

Iceberg Distributions

There are natural but opposing forces at work in writing data with Iceberg/Spark. Writing many files is more efficient at processing time but leads to greater admin costs - in the extreme, too many files can cause OOMEs when handling the table's metadata.

The user can fiddle with a few knobs to mitigate this. One is write.distribution-mode. Here are some tests I created to see how the configuration changes affect the number of files when writing the same data:
 
write.distribution-mode                                      | Number of files | Notes
"hash"                                                       | p               | df.writeTo(tableName).append()
"hash", sorted DataFrame                                     | p               | ...TBLPROPERTIES ('sort-order' = 'partitionField ASC NULLS FIRST'...
"hash", sorted table                                         | p               | df.sort("partitionField").writeTo(tableName).append()
"hash", sorted table but only one value for partitionField   | 1               | because p=1; assumes the size of the data to write is < write.spark.advisory-partition-size-bytes, otherwise multiple files are written (Spark 3.5)
"none"                                                       | d * p           | df.writeTo(tableName).append()
"none", sorted DataFrame                                     | p               | df.sort("partitionField").writeTo(tableName).append()
"none", sorted table                                         | d * p           | ...TBLPROPERTIES ('sort-order' = 'partitionField ASC NULLS FIRST'...
"none", sorted table but only one value for partitionField   | d               | because p=1

p = number of (logical) partitions in the data
d = number of (physical) partitions in the data frame
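
For reference, this is roughly how such a test looks (a sketch assuming a SparkSession wired to an Iceberg catalog called local; the table and column names are illustrative):

spark.sql("""
    CREATE TABLE local.db.events (id BIGINT, partitionField STRING)
    USING iceberg
    PARTITIONED BY (partitionField)
    TBLPROPERTIES ('write.distribution-mode' = 'hash')
""")

df.sort("partitionField").writeTo("local.db.events").append()   # df assumed to exist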

Note that the table above is for Spark 3.5. For a distribution mode of hash, and with the size of the data exceeding advisory-partition-size-bytes, multiple threads write multiple files.

But for Spark 3.3, if we use a distribution mode of hash and the data exceeds the size of write.spark.advisory-partition-size-bytes, then only one thread writes.

Fanout made no difference in my tests that measured the number of files, but it should be used despite what the documentation says. Russell Spitzer on Discord says:
"Fanout writer is better in all cases.  We were silly.  The memory requirements were tiny IMHO.  Without fanout, you need to presort within the task but that ends up being way more expensive (and memory intesive) IMHO.  In the latest versions @Anton Okolnychyi removed the local sort requirements if fanout is enabled, so I would recommend fanout always be enabled and especially if you are using distribution mode is none."