Tuesday, August 12, 2025

BERT

I've been applying HuggingFace's ModernBERT to sequences of medical codes in the hope that we can identify people who may have type 2 diabetes before even their doctors do. The results have been pretty good and met expectations (the same as an American study; this is UK data).

But what surprised me is that a very simple approach performed as well as the more complicated one recommended by our American-based colleagues. Their recommendation was to use transfer learning on their model. This simply didn't work, as their model used the ICD-10 medical encoding system whereas the British more commonly use the richer SNOMED. The theory was that with fine-tuning the model would learn. It didn't.

As an aside, mapping one encoding to another is a non-trivial task. Avoid it if you can.

Instead, we pretrained BERT ourselves exclusively on our data using masked language modelling. This is where random tokens are masked out of a sequence and the model is asked to guess them. Having built the model, we then fine-tuned it on the medical code sequences for those with and without T2D to see whether BERT could classify them.

Note that in this case, there is no pretrained BERT. We build it entirely from our own data. A sequence of medical codes is a very simple 'language', so you don't need massive compute resources to train a model. A single Tesla V100 with 16GB of VRAM was sufficient.
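For the record, a minimal sketch of that pretraining step looks something like the following. The file names, model dimensions and hyperparameters here are illustrative placeholders, not the values we actually used; it assumes one whitespace-separated sequence of codes per line in a text file.

from datasets import load_dataset
from transformers import (BertConfig, BertForMaskedLM, BertTokenizerFast,
                          DataCollatorForLanguageModeling, Trainer, TrainingArguments)

# Hypothetical input: one whitespace-separated sequence of medical codes per line
tokenizer = BertTokenizerFast(vocab_file="vocab.txt")
dataset = load_dataset("text", data_files={"train": "code_sequences.txt"})["train"]

def tokenize(batch):
    return tokenizer(batch["text"], truncation=True, max_length=128)

tokenized = dataset.map(tokenize, batched=True, remove_columns=["text"])

# A small BERT built entirely from our own data - no pretrained checkpoint
config = BertConfig(vocab_size=tokenizer.vocab_size, num_hidden_layers=6,
                    num_attention_heads=6, hidden_size=384, intermediate_size=1536)
model = BertForMaskedLM(config)

# Randomly mask 15% of tokens and ask the model to guess them
collator = DataCollatorForLanguageModeling(tokenizer, mlm=True, mlm_probability=0.15)

trainer = Trainer(
    model=model,
    args=TrainingArguments(output_dir="bert-mlm", per_device_train_batch_size=32,
                           num_train_epochs=3),
    train_dataset=tokenized,
    data_collator=collator,
)
trainer.train()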

Although this achieved good results, it took literally days. The surprising thing was that when we (perhaps naively) treated the sequence of codes as a sentence and used a plain BertForSequenceClassification object to train the model in just one step (no pretraining then fine-tuning), we got the same results.
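That one-step approach is essentially the stock sequence-classification recipe. A minimal sketch, again with made-up codes, labels and model dimensions:

from datasets import Dataset
from transformers import (BertConfig, BertForSequenceClassification,
                          BertTokenizerFast, Trainer, TrainingArguments)

tokenizer = BertTokenizerFast(vocab_file="vocab.txt")

# Toy stand-in data: each row is a patient's code sequence plus a 0/1 diagnosis label
raw = Dataset.from_dict({
    "text":  ["C10F0 XaEnc XE0Uc", "XaJQp XE0of C10E5"],
    "label": [0, 1],
})
ds = raw.map(lambda b: tokenizer(b["text"], truncation=True, padding="max_length",
                                 max_length=128), batched=True)

config = BertConfig(vocab_size=tokenizer.vocab_size, num_hidden_layers=6,
                    num_attention_heads=6, hidden_size=384, intermediate_size=1536,
                    num_labels=2)
model = BertForSequenceClassification(config)

trainer = Trainer(
    model=model,
    args=TrainingArguments(output_dir="bert-cls", per_device_train_batch_size=32,
                           num_train_epochs=3),
    train_dataset=ds,
)
trainer.train()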

Anyway, here are some notes I made along the way:

The neural net

Each of the heads defined by num_attention_heads focuses on different aspects of the text. More heads can represent the data better; too many can cause overfitting. Its default is 12.

num_hidden_layers impacts accuracy and the model's ability to generalize. Its default is 12. Recommended values depend on the use case: fewer than 6 suits resource-constrained environments; 6-12 for fine-tuning; 12-24 for large pretraining runs.

ChatGPT suggests for BERT classification, final loss should be 0.2-0.4. "BERT can overfit fast, especially with small datasets."

per_device_train_batch_size is purely a runtime optimization.

At inference time, call model.eval() and put all access to the model inside a with torch.no_grad() block.
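For example, assuming a trained classifier and tokenizer as above (the code string is made up):

import torch

model.eval()                      # switch off dropout etc.
with torch.no_grad():             # don't track gradients at inference time
    inputs = tokenizer("XaJQp XE0of C10E5", return_tensors="pt", truncation=True)
    logits = model(**inputs).logits
    prediction = logits.argmax(dim=-1).item()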

Note that during evaluation BERT is not told the labels; they are only used to calculate the validation loss.

Pretraining

It's important to note that when pretraining BERT for classification, it's not told what the answers are. Instead, it's fed token sequences where some tokens are masked and it tries to guess what the missing tokens might be. This is the accuracy metric it spits out every so often.

Finetuning

When BERT is training, you'll see training and validation loss. Of the two, validation loss is the more important. Training loss tells you how well the model deals with data it has already seen. The model is not trained on the evaluation set, so validation loss indicates how well it will fare with data it has not been trained on. Consequently, a falling training loss and a constant validation loss indicates that the model is overfitting. This can be addressed with dropout and/or weight_decay.

The conventional way of training BERT is to use BertForMaskedLM and then BertForSequenceClassification for fine-tuning. The first takes sentences from the training data, randomly blanks out some tokens and guesses what they might be. However, we found that for small data sets that are not natural language (sequences of medical codes), we could get good performance by just using BertForSequenceClassification, which does all the back propagation in a fraction of the time.

When calling BertForSequenceClassification.from_pretrained you might see:

Some weights of BertForSequenceClassification were not initialized from the model checkpoint at ... and are newly initialized: ['bert.pooler.dense.bias', 'bert.pooler.dense.weight', 'classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.

Apparently, this is OK if you've been given a model that you then wish to use for classification. The classifier.* weights are the final layer of the neural net: they are added on top of a model that has already been trained (possibly with masking but not classification) to turn it into a classifier.

The special CLS token represents a summary of the string that's being encoded. The bert.pooler.* weights are a single, final, dense layer applied to this token that's used in classification (it is not used at all in masking).

The Tokenizer(s)

Typically, you'll see tokenizers from both the Tokenizers and Transformers libraries. They're both from HuggingFace, so it's not surprising that you can tokenize and save with one and then load the data with a tokenizer from the other.

Note that the tokenizers do not save the words as vectors; this embedding is done in the neural net. Instead, they offer different approaches - for instance, balancing a vocabulary that can cover words it has not seen yet against a vocabulary so big that it makes training inefficient.

In the Transformers library, you'll see some tokenizer names suffixed with Fast. These really are much faster to run as they're Rust implementations.

Almost as an aside, we tried to build our own tokenizer vocabulary by hand as we wanted to stick as closely to the methodology used by our American colleagues as we could. This is perfectly possible (just point tokenizers.models.WordPiece.from_file at your vocab.txt file), but we found that when we prepended all the codes with their classification system in both vocab.txt and the actual data we were tokenizing, accuracy was an unrealistically high 0.999 from the get-go. Why this was the case remains a mystery. Needless to say, we backed out that change. [Addendum: it's still a mystery why, but using just the 30k most common whole words as tokens rather than the >100k previously resulted in good accuracy].
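For reference, pointing the Tokenizers library at a hand-built vocabulary looks roughly like this; the file name, special token and example codes are placeholders:

from tokenizers import Tokenizer
from tokenizers.models import WordPiece
from tokenizers.pre_tokenizers import WhitespaceSplit

# Point WordPiece at a hand-built vocabulary file
tokenizer = Tokenizer(WordPiece.from_file("vocab.txt", unk_token="[UNK]"))
tokenizer.pre_tokenizer = WhitespaceSplit()

encoding = tokenizer.encode("C10F0 XaEnc XE0Uc")
print(encoding.tokens, encoding.ids)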

PyTorch

Backing the Transformers implementation of BERT is PyTorch. Browsing through the code you'll see references to unsqueeze. This is an interesting fella that is best described by this StackOverflow answer. Basically, it can take a 2-d matrix and turn it into a 3-d tensor, with the argument dictating whether the elements end up in the x-, y- or z-plane. From a programming point of view, you'll see that the matrix's elements become nested one layer deeper in more square brackets; the exact configuration of those brackets depends upon which plane the elements have been unsqueezed into.
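A quick illustration:

import torch

m = torch.tensor([[1, 2], [3, 4]])   # shape (2, 2)
print(m.unsqueeze(0).shape)          # torch.Size([1, 2, 2])
print(m.unsqueeze(1).shape)          # torch.Size([2, 1, 2])
print(m.unsqueeze(2).shape)          # torch.Size([2, 2, 1])
print(m.unsqueeze(2))                # same elements, nested one bracket deeper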

This diagram from the SO answer is great:



Sunday, August 3, 2025

Iceberg Distributions

There are natural but opposing forces at work when writing data with Iceberg/Spark. Writing many files is more efficient at processing time but leads to greater admin costs - in the extreme, they can cause OOMEs when handling the table's metadata.

The user can fiddle with a few knobs to mitigate this. One is write.distribution-mode. Here are some tests I created to see how the configuration changes affect the number of files when writing the same data:
 
write.distribution-mode | Number of files | Notes
"hash" | p | df.writeTo(tableName).append()
"hash", sorted DataFrame | p | ...TBLPROPERTIES ('sort-order' = 'partitionField ASC NULLS FIRST'...
"hash", sorted table | p | df.sort("partitionField").writeTo(tableName).append()
"hash", sorted table but only one value for partitionField | 1 | because p=1; assumes the size of the data to write is < write.spark.advisory-partition-size-bytes. Otherwise multiple files are written (Spark 3.5).
"none" | d * p | df.writeTo(tableName).append()
"none", sorted DataFrame | p | df.sort("partitionField").writeTo(tableName).append()
"none", sorted table | d * p | ...TBLPROPERTIES ('sort-order' = 'partitionField ASC NULLS FIRST'...
"none", sorted table but only one value for partitionField | d | because p=1

p = number of (logical) partitions in the data
d = number of (physical) partitions in the data frame

Note that this is for Spark 3.5. For a distribution mode of hash, and with the size of data exceeding advisory-partition-size-bytes, multiple threads write multiple files.

But for Spark 3.3, if we use a distribution mode of hash and the data exceeds the size of write.spark.advisory-partition-size-bytes, then only one thread writes.
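The tests behind the table above were shaped roughly like this PySpark sketch; the catalog, table and column names are placeholders and a SparkSession already configured for Iceberg is assumed:

from pyspark.sql.functions import col

spark.sql("""
  CREATE TABLE demo.db.events (id BIGINT, partitionField STRING)
  USING iceberg
  PARTITIONED BY (partitionField)
  TBLPROPERTIES ('write.distribution-mode' = 'hash')
""")

# 100k rows spread over 10 logical partitions
df = spark.range(100000).withColumn("partitionField", (col("id") % 10).cast("string"))
df.sort("partitionField").writeTo("demo.db.events").append()

# Count the data files actually written via the Iceberg metadata table
spark.sql("SELECT count(*) FROM demo.db.events.files").show()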

Fan out made no difference in my tests that measured the number of files, but it should be used despite what the documentation says. Russell Spitzer on Discord says:
"Fanout writer is better in all cases.  We were silly.  The memory requirements were tiny IMHO.  Without fanout, you need to presort within the task but that ends up being way more expensive (and memory intesive) IMHO.  In the latest versions @Anton Okolnychyi removed the local sort requirements if fanout is enabled, so I would recommend fanout always be enabled and especially if you are using distribution mode is none." 

Sunday, June 15, 2025

Lessons from a migration

Don't use low/no-code. It really is a false economy. It appeals to managers during sales pitches because it looks so simple, but your team will spend all their time and all your money debugging in a most un-ergonomic manner. And as for testing....

Azure Data Factory is a no-code solution that's great for simple DAGs. However, it is not suitable for anything more enterprise-y. For instance, if you're copying large amounts of data from one database to another and there is a failure, there is no scope for cleaning up the data. The first you'll know of it (because nobody ever reads the logs) is when the users complain that the numbers coming out of their SQL queries are wrong.

ADF doesn't even allow nested ForEach loops! It stores pipelines as JSON so watch it get confused when the pipeline itself contains JSON!

Don't give people raw SQL access. They can make profound changes and you'll have no logs and little ability to correct it.

Everything needs an automated audit log. There are going to be a large number of questions like: "why did it do this?" It's never simply a matter of success or failure. There is huge nuance - eg, type conversion may mean that what was in the source system is not exactly the same as in the destination system. Is this a pass or a fail?

Processes need orchestrating. One process reads a dataset while another deletes it or writes to it, causing nondeterministic errors. You get similar issues when you issue a cancel.

Communication - docs are always changing. Be aware that most knowledge is tribal, not recorded.

Scheduling: everything was based on time, so it was possible for two daily jobs to run at the same time if the first took more than 24 hours. Data migrating from one DB to another within a cloud zone and subscription ran at a rate of ~10MB/s. This meant some tables took hours. And it didn't scale. As the project progressed, more tables were to be migrated, but this caused some jobs to take days. So, the weekly transfer was still going on when people came to the office on Monday morning. Consequently, their queries were returning inconsistent results - the silent killer.

The metadata must be constrained. If it lives in a database, it must have referential integrity. This can be overlooked because it's not business data. But if you want to eliminate mysterious errors, it's essential to get this right.

Like water finding its natural level, people will gravitate to the path of least resistance. As a general rule, teams do not follow best practices, naming conventions or industry standards. This is not strictly true, but it's so common that you must assume every team's data and code is a Rube Goldberg machine.

Hold regular meetings with major stakeholders. These meetings can be brief (about 15 minutes is OK once the cadence is established) but they do need to be frequent (at least twice a week, preferably daily).

A Quick look at Quarkus


Microservice Framework

Quarkus depends a lot on SmallRye "a set of implementations of the MicroProfile specifications. We said that Quarkus is also a MicroProfile implementation, so this begs for a bit of explanation. Each SmallRye project implements one of the MicroProfile specifications" [1].

The MicroProfile specification is a set of standards for building microservice containers - for instance, handling Kubernetes health checks etc.

Native code

GraalVM has two main features: 
  • it allows polyglot development
  • it allows Ahead-of-Time (AOT) compilation.
This latter feature is what Quarkus is interested in, although "Mandrel is a downstream (forked) distribution of the Oracle GraalVM CE with the main goal of providing a way to build a native executable specifically designed to support Quarkus... Mandrel focuses mainly on the native-image build tool. It doesn’t provide a full GraalVM toolset." [1]

Generate the binary with:

mvn package -Pnative

This defers to GRAALVM_HOME/bin/native-image and gcc.

If we delegate to Graal, why use Quarkus at all? Well, Graal struggles with reflection. Quarkus provides shims that mean popular frameworks don't use reflection and can therefore be turned to native code.

Note that even though it produces an executable binary, the artifact still needs a garbage collector. In the output, you'll see:

Garbage collector: Serial GC (max heap size: 80% of RAM)

You don't need to use Quarkus to build native executables, though. For instance, if you try to convert the Java port of Llama.cpp, you can build it with just make native - although note that I had to use version 24 of GraalVM as earlier versions didn't like the use of AVX (vector extensions).

Thread management

Much like Scala's Cats and ZIO, Quarkus facilitates execution engines. For instance it has a notion of blocking threads. "The @Blocking annotation on both methods tells Quarkus that the method executes a blocking operation (persist or delete) on a database, so it needs to execute on worker thread, which allows blocking." [1]

[1] Quarkus in Action (Manning)

Making ML projects more robust

The software industry has made great strides to make the engineering process more robust. Despite being an adjacent discipline, data science is still in the dark ages.

You can unit test any code, so there is no excuse for not writing tests. However, a lot of data science tools are not ergonomic in this regard. Notebooks are great for EDA, less so for robust software practices.

So, although they are not a replacement for automated tests, here are some tips if you're stuck in the world of notebooks.

Observability

If your platform does not support it out of the box, add it manually. For instance, in one project, there was one table that another 5 joined to at some point downstream in the pipeline. That first table was run several times with different parameters. This led to the potential problem that the 6 tables in total might be out of sync with each other. A solution was to add a run date to the first table, which the other 5 preserve when they are generated. The Python code that then uses this data asserts that the date is consistent across all tables.
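A sketch of that final assertion, assuming the tables are read via Spark (the table and column names are made up):

# Hypothetical check that all downstream tables carry the same run date
tables = ["base_table", "downstream_1", "downstream_2",
          "downstream_3", "downstream_4", "downstream_5"]

run_dates = {
    t: {r.run_date for r in spark.table(t).select("run_date").distinct().collect()}
    for t in tables
}

# Each table must have exactly one run date and they must all agree
assert all(len(dates) == 1 for dates in run_dates.values()), f"Mixed run dates: {run_dates}"
assert len(set.union(*run_dates.values())) == 1, f"Tables out of sync: {run_dates}"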

Plot graphs of important features

In a bioinformatics project, the age of a patient was calculated in Python as the time between the patient's date of birth and the start of the observation period. This was reasonable. But several weeks later, the client asked for all of a patient's medical details to be used in the study. Consequently, the observation period - calculated in SQL - effectively became the patient's first contact with the medical services.

For most people, this happens in the first year of life. For them, their age became zero. This wasn't universally true: immigrants would have their first medical intervention later in life. So somebody browsing the data might think it odd there were so many infants in their sample but assume it was just a statistical fluctuation, as there were clearly 30- and 40-year-olds in there too.

However, when you plotted the age after the SQL code change, the graph of the population's age looked like this:

A bug in calculating age

Without an end-to-end suite of tests, nobody thought of updating the age calculation logic.

Sampling 

The moment you add constraints to sampled data, you must beware of some statistical oddities. 

We wanted to assign a cutoff date to the negative cohort, randomly sampled from the dates of the positive cohort. Sounds straightforward, right? Well, then you have to add a constraint that the cutoff date only makes sense if it comes after a patient's date of birth. But this very reasonable constraint skews the data. The reason is that somebody born in 2000 can be given any cutoff date between 2000 and 2025, giving them an age at cutoff of 0 to 25 years old. After sampling, we might give them a cutoff date of 2003 and consequently an age of 3.

A bug in sampling

However, somebody born in 2022 can have a maximum age of 3 after sampling. Consequently, we have a situation where the 25-year-old born in 2000 can contribute to the 3-year-old bucket after sampling, but the 3-year-old born in 2022 cannot contribute to the 25-year-old bucket. Hence, you get far more 3-year-olds than are really in the data.
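A toy simulation (with made-up birth years) shows the skew:

import random
from collections import Counter

random.seed(42)
ages = []
for _ in range(100_000):
    birth_year = random.randint(2000, 2024)          # uniform birth years
    cutoff_year = random.randint(birth_year, 2025)   # cutoff constrained to come after birth
    ages.append(cutoff_year - birth_year)

print(Counter(ages).most_common(5))   # low ages dominate; ages of 20+ are rare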

Saturday, May 31, 2025

Iceberg and Kafka Connect

I've been looking at Iceberg ticket 11818 "Iceberg Kafka Connector experiences a constant hanging lag for low-volume topics".

Quick background: Kafka Connect, part of the Kafka project, is a framework for "getting data from an application into Kafka or getting data out of Kafka into an external application." [Kafka Streams in Action, Manning]

Kafka Connect

I'm trying to get my head around Iceberg code that uses Kafka Connect.

SinkTask is the class that must be implemented and put is the method that receives a Collection of SinkRecords.

This SO answer describing the relationship between connectors and tasks was informative:
"A task is a thread that performs the actual sourcing or sinking of data.  
The number of tasks per connector is determined by the implementation of the connector...  
For sink connectors, the number of tasks should be equal to the number of partitions of the topic.  The task distribution among workers is determined by task rebalance which is a very similar process to Kafka consumer group rebalance."

Iceberg and Kafka Connect

Iceberg's Kafka Connect implementation can be configured to create the table if it does not exist. In the Integration*Test classes, you can see that Kafka Connect is configured such that iceberg.tables.route-field (TABLES_ROUTE_FIELD_PROP) is payload, which is a field in our test's TestEvent objects - messages that are serialized to JSON and sent to Kafka.

The first message will create a RecordWriter. If the IcebergWriterFactory is so configured, it will first create the table to which the RecordWriter will write.

Inspecting Kafka

When running kafka-consumer-groups.sh you can see CURRENT-OFFSET (the offset of the next record the consumer is to read) and LOG-END-OFFSET (the offset of the next record a producer is to write). The LAG is the difference between the two.

It seems that if receivedPartitionCount >= expectedPartitionCount does not hold (see CommitState.isCommitReady), Coordinator.receive will not commit and will not set CommitState.currentCommitId to null. This means a StartCommit is never sent.

Incidentally, I asked on the Iceberg Slack channel what the rationale was behind this but received no replies after 4 days.

This results in the Kafka Connect logging:

[2025-05-27 09:24:22,527] INFO Commit d9f18c4a-5253-4c55-8cce-b3a077bbf3c9 initiated (org.apache.iceberg.connect.channel.Coordinator)

so the commit is initiated but:

[2025-05-27 09:24:23,950] INFO Commit d9f18c4a-5253-4c55-8cce-b3a077bbf3c9 not ready, received responses for 1 of 2 partitions, waiting for more (org.apache.iceberg.connect.channel.CommitState)

Evidently, receivedPartitionCount is 1 and expectedPartitionCount is 2 (see below).

The problem

The line here in CommitState.isCommitReady:

if (receivedPartitionCount >= expectedPartitionCount) {

will lead to isCommitReady returning false if this inequality does not hold. Before we look at the values, let's first see how the leader is chosen.

Follow my leader

The algorithm to elect the leader is this:
"there should only be one task assigned partition 0 of the first topic, so elect that one the leader" [Commiter in Iceberg code]. 
It does this by taking "the list of partitions that are now assigned to the [Sink] task " [Kafka SinkTask] when it is opened. It then compares this to the members of the consumer group corresponding its ID we have. We call Kafka [docs] directly to get this.

expectedPartitionCount

The "list of the members of the consumer group" [Kafka ConsumerGroupDescription.members] becomes CommitterImpl.membersWhenWorkerIsCoordinator iff the lowest partition ID in for the members happens to be one of the list of partitions we've been given to coordinate via SinkTask.open (see the leadership election logic, above).

That is, if we're the leader, all those partitions are ours.

The Iceberg Coordinator is instantiated with this list and its totalPartitionCount is calculated as the sum of all the partitions for its members. 

Great, this is exactly the same as expectedPartitionCount in the code snippet above.

receivedPartitionCount

We take the batch of DataComplete objects the CommitState has been buffering. A DataComplete is a "control event payload for events sent by a worker that indicates it has finished sending all data for a commit request."

We sum the assignments for each Payload.

These assignments are from "the current set of assigned TopicPartitions for this task" [Kafka SinkTaskContext docs], that is, the worker that created the DataComplete objects. 

Note that it will "include all assigned topic partitions even if no messages were read" [Iceberg Worker code].

This sum of all these partitions for the messages with our currentCommitId is what we're after.

The problem here is that we have a topic with 2 partitions and 2 tasks, so each task has a single partition. Therefore, receivedPartitionCount = 1.

The Solution

If tasks.max for this IcebergSinkConnector is 2, then the partitions will be shared between tasks and the inequality does not hold for a single message. But if it's set to 1, the single task will deal with all partitions.

This leads to this log line:

[2025-05-27 11:41:21,712] INFO Commit 8dfe1850-d8b6-4054-9d0c-52a782a1d9e4 ready, received responses for all 2 partitions (org.apache.iceberg.connect.channel.CommitState)

Evidently, receivedPartitionCount = 2 and expectedPartitionCount <= 2. 
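For what it's worth, tasks.max is just another entry in the connector config you POST to Kafka Connect's REST API. A sketch, with the connector name, topic, destination table and URL all made up and the catalog configuration omitted:

import json
import requests

config = {
    "name": "iceberg-sink",                 # made-up connector name
    "config": {
        "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
        "topics": "events",                  # made-up topic
        "tasks.max": "1",                    # one task handles every partition of the topic
        "iceberg.tables": "db.events",       # made-up destination table
        # catalog configuration omitted - it depends entirely on your setup
    },
}
requests.post("http://localhost:8083/connectors",
              headers={"Content-Type": "application/json"},
              data=json.dumps(config))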

Saturday, May 17, 2025

DevOps lessons from Polaris and Iceberg

The Apache Iceberg and Polaris code bases make pretty good reading. There's some nice DevOps work here. Here are a few tidbits.

Iceberg's Kafka Connectors

As part of Iceberg's integration tests, Docker compose is used to fire up a Kafka Connect container. Interestingly, this container mounts the directory holding Iceberg's artifacts so it instantly has the latest implementation of Iceberg's Kafka Connect's SinkConnector and SinkTask. The test suite then starts the connector with a REST call (that's the nature of Kafka Connect) that contains all of the connector's config.

This docker-compose.yml also starts a MinIO container so Kafka Connect thinks it's writing to an AWS S3 bucket - all of this on one self-contained laptop, no config required. (It uses TestContainers to do the heavy lifting of talking to the Docker daemon.)

TestContainers has a very cool trick for cleaning up the containers when the tests are finished. It starts a sidecar called Ryuk that listens on a port connected to the JVM and kills the containers when that connection closes. You can see it while the tests are running with:

$ docker ps
CONTAINER ID   IMAGE                                 COMMAND                  CREATED         STATUS                            PORTS                                                           NAMES
...
f7c69b891330   testcontainers/ryuk:0.11.0            "/bin/ryuk"              5 seconds ago   Up 5 seconds                      0.0.0.0:32772->8080/tcp, :::32772->8080/tcp                     testcontainers-ryuk-3c88ed17-ec3d-4ce9-8830-dbbc1ca86294

Kind - K8s lite

You can run Polaris in a Kind cluster. Kind is Kubernetes in Docker. Although it is not CNCF compliant itself, it runs a CNCF compliant version of Kubernetes, just inside Docker containers. So, although it is not appropriate for a multi-node environment, Kind is great for development on a single machine.

Kind starts two Docker containers (kind-control-plane and kind-registry) as well as updating your ~/.kube/config file. If you ./run.sh in the Polaris codebase, you will see it start Kubernetes pods like etcd, coredns and kube-proxy as well as a polaris container.

Metadata file

Iceberg creates a file called iceberg-build.properties when it's built. Having your project do this can be enormously useful when you've ssh-ed into a box and are wondering exactly what version is running there, because it's a test environment and nobody has kept track of what is deployed where (ie, the real world).

Iceberg is built with Gradle so uses the com.gorylenko.gradle-git-properties plugin but there appears to be an equivalent for Maven (git-commit-id-plugin).

Quarkus

Polaris has become heavily dependent on Quarkus. The Docker container just runs quarkus-run.jar. This Jar's main class is io.quarkus.bootstrap.runner.QuarkusEntryPoint. This loads /deployments/quarkus/quarkus-application.dat, a binary file that loads all the Polaris guff. Apparently, it's a binary file to minimise start up times.

The Docker image's entrypoint is /opt/jboss/container/java/run/run-java.sh. This script comes FROM the Dockerfile's Quarkus-friendly base image and contains sensible JVM defaults.

Wednesday, May 14, 2025

Interpreting XGBoost models

Playing with XGBoost at the moment. Here are some basic notes:

XGBoost's Scikit-learn-style Classifier

This takes a lot of parameters but the highlights are:

Also called eta, the learning_rate is similar to that of neural nets in that a smaller value means slower but potentially better performance during training.

The minimum loss reduction required before splitting a leaf is called gamma. A higher value means a more conservative model.

You can define the loss function with eval_metric. A value of logloss for instance will penalize confidently wrong predictions.

The fraction of the training data used per tree is the subsample argument and the scale_pos_weight weights the positive classes (useful for imbalanced data).
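Pulling those parameters together, a sketch with illustrative values (and a synthetic dataset standing in for real data):

from sklearn.datasets import make_classification
from xgboost import XGBClassifier

# Synthetic, imbalanced stand-in data
X_train, y_train = make_classification(n_samples=1000, weights=[0.9, 0.1], random_state=0)

model = XGBClassifier(
    learning_rate=0.05,      # a.k.a. eta: smaller = slower but potentially better training
    gamma=1.0,               # minimum loss reduction required to split a leaf
    eval_metric="logloss",   # penalizes confidently wrong predictions
    subsample=0.8,           # fraction of training rows used per tree
    scale_pos_weight=9,      # up-weight the rare positive class
    n_estimators=200,
)
model.fit(X_train, y_train)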

Random Stratified Data

Talking of imbalanced data: if there are two classes, A and B, and the number of data points for A is small, a random sample may by chance have a disproportionate number of As. To address this, we separate the A and B data points and take the exact number from each such that the overall ratio matches the real world.

For instance, if the ratio of A to B is 1:100 and we want a sample of 1000 points, using Random Stratified Data will give us precisely 10 As. Whereas a random sample could (feasibly) give us none.
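With scikit-learn this is just the stratify argument to train_test_split; a sketch with synthetic data:

from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split

# ~1% positives, mirroring a 1:100 A:B ratio
X, y = make_classification(n_samples=100_000, weights=[0.99, 0.01], random_state=0)

# stratify preserves that ratio exactly in the 1,000-point sample
X_sample, _, y_sample, _ = train_test_split(X, y, train_size=1000, stratify=y, random_state=0)
print(int(y_sample.sum()))   # ~10 positives, never zero by bad luck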

Notes on the Confusion Matrix

High precision and low recall indicates a model that really wants to be sure it doesn't have too many false positives.

F1 scores are good for imbalanced data or when false positives and false negatives are of equal cost. It's domain specific, but a value of about 0.6 is a minimum for being acceptable.

Specificity is how well a binary classifier avoids false positives (the true negative rate). It's between 0 and 1 and higher is better. Sensitivity is the equivalent for false negatives (the true positive rate).

The Bayes factor is the ratio of the probability of getting this data given a hypothesis is true versus the null hypothesis, that is P(D|H1)/P(D|H0).

When less (data) is more

In survival analysis, you might want to employ a technique called temporal censoring analysis. This applies to time-to-event data, the subject of survival analysis. The idea is that you censor data to minimise a fundamental problem: a person who is yet to be diagnosed with a disease (for example) is otherwise classed the same as somebody who will never have it.

A similar approach is used to tackle Simpson's Paradox, where the model behaves differently in aggregate than when the data is segmented. Here, we segment the data and evaluate the model on those cohorts only. These techniques are called cohort-based validation or segmented model evaluation, depending on how you slice and dice the data.

Calibration

If you were to plot the probabilities of your model (from 0 to 1) against the actual fraction of class A vs class B at each probability, you'd have a calibration curve. There are a few variants on the methodology (eg, Platt scaling, which treats this as a logistic regression) but the simpler one is the isotonic method. This sorts your probabilities in ascending order and fits a monotonically (isotonically) increasing step function to the cumulative total of class As.

A line that hugs the diagonal indicates a well calibrated model.
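scikit-learn's calibration_curve will compute the points for you. A sketch using a toy logistic regression purely to have some probabilities to plot:

import matplotlib.pyplot as plt
from sklearn.calibration import calibration_curve
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split

# Toy model purely to produce some predicted probabilities
X, y = make_classification(n_samples=5000, random_state=0)
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0)
y_prob = LogisticRegression(max_iter=1000).fit(X_train, y_train).predict_proba(X_test)[:, 1]

frac_positive, mean_predicted = calibration_curve(y_test, y_prob, n_bins=10)

plt.plot(mean_predicted, frac_positive, marker="o", label="model")
plt.plot([0, 1], [0, 1], linestyle="--", label="perfectly calibrated")
plt.xlabel("Mean predicted probability")
plt.ylabel("Observed fraction of positives")
plt.legend()
plt.show()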

Wednesday, April 16, 2025

Kafka in Kubernetes

If you ask Strimzi to set up a Kafka cluster out of the box, you'll see this when connecting:

$ kafka-topics.sh --bootstrap-server 10.152.183.163:9092 --list
...
[2025-04-04 16:35:59,464] WARN [AdminClient clientId=adminclient-1] Error connecting to node my-cluster-dual-role-0.my-cluster-kafka-brokers.kafka.svc:9092 (id: 0 rack: null) (org.apache.kafka.clients.NetworkClient)
java.net.UnknownHostException: my-cluster-dual-role-0.my-cluster-kafka-brokers.kafka.svc
...

Where that IP address comes from:

$ kubectl get service -A
NAMESPACE     NAME                                  TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)                                        AGE...
kafka         my-cluster-kafka-bootstrap            ClusterIP   10.152.183.163   <none>        9091/TCP,9092/TCP,9093/TCP                     21d

It appears that Strimzi does not expose external Kafka ports by default. So, add an external port with:

kubectl get kafka my-cluster -n kafka -o yaml > /tmp/kafka.yaml

then edit /tmp/kafka.yaml adding:

    - name: external
      port: 32092
      tls: false
      type: nodeport

in the spec/kafka/listeners block and apply it with:

kubectl apply -n kafka -f /tmp/kafka.yaml

Now I can see:

$ kubectl get svc -n kafka
NAME                                  TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)                                        AGE   SELECTOR
...
my-cluster-kafka-bootstrap            ClusterIP   10.152.183.163   <none>        9091/TCP,9092/TCP,9093/TCP                     21d
my-cluster-kafka-external-bootstrap   NodePort    10.152.183.27    <none>        32092:32039/TCP                                11d

It appears that Strimzi has created a new service for us - hurrah! 

However, making a call to Kafka still fails. And this is because of the very architecture of Kubernetes. I am indeed communicating with a Kafka broker within Kubernetes, but then it forwards me to another domain name, my-cluster-dual-role-0.my-cluster-kafka-brokers.kafka.svc. The host knows nothing about this Kubernetes domain name. Incidentally, the same happens with Kafka in a pure Docker configuration.

Kubernetes pods resolve their domain names using the internal DNS.

$ kubectl exec -it my-cluster-dual-role-1 -n kafka -- cat /etc/resolv.conf
Defaulted container "kafka" out of: kafka, kafka-init (init)
search kafka.svc.cluster.local svc.cluster.local cluster.local home
nameserver 10.152.183.10
options ndots:5

This nameserver is kube-dns (I'm using Microk8s):

$ kubectl get svc -n kube-system
NAME       TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)                  AGE
kube-dns   ClusterIP   10.152.183.10   <none>        53/UDP,53/TCP,9153/TCP   21d

and we can query it from the host:

$ nslookup my-cluster-dual-role-external-0.kafka.svc.cluster.local 10.152.183.10
Server: 10.152.183.10
Address: 10.152.183.10#53

Name: my-cluster-dual-role-external-0.kafka.svc.cluster.local
Address: 10.152.183.17

Now, to get the host to use the Kubernetes DNS for K8s domain names, I had to:

$ sudo apt update
$ sudo apt install dnsmasq
$ sudo vi /etc/dnsmasq.d/k8s.conf

This was a new file and needed:

# Don't clash with systemd-resolved which listens on loopback address 127.0.0.53:
listen-address=127.0.0.1
bind-interfaces
# Rewrite .svc to .svc.cluster.local
address=/.svc/10.152.183.10
server=/svc.cluster.local/10.152.183.10

That listen-address line was because sudo ss -ulpn | grep :53 showed both dnsmasq and systemd-resolved were fighting over the same port.

I also had to add:

[Resolve]
DNS=127.0.0.1
FallbackDNS=8.8.8.8
Domains=~svc.cluster.local

to /etc/systemd/resolved.conf to tell it to defer to dnsmasq first for domains ending with svc.cluster.local. Finally, restart everything:

$ sudo systemctl restart systemd-resolved
$ sudo ln -sf /run/systemd/resolve/resolv.conf /etc/resolv.conf
$ sudo systemctl restart dnsmasq

Now let's use that external port we configured at the top of the post:

$ kubectl get svc -n kafka
NAME                                  TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)                                        AGE
...
my-cluster-kafka-external-bootstrap   NodePort    10.152.183.27    <none>        32092:32039/TCP          
$ ./kafka-topics.sh --bootstrap-server 10.152.183.27:32092 --create --topic my-new-topic  --partitions 3  --replication-factor 2
Created topic my-new-topic.
$ ./kafka-topics.sh --bootstrap-server 10.152.183.27:32092 --list
my-new-topic

Banzai!

Tuesday, April 15, 2025

What is a (local) LLM?

The short answer: just a bunch of matrices and config files.

The long answer: if you download an LLM from, say, HuggingFace, you'll see a directory structure a little like this:

$ tree ~/.cache/huggingface/hub/models--mlabonne--TwinLlama-3.1-8B/
├── refs
│   └── main
└── snapshots
    └── 5b8c30c51e0641d36a2f9007c88425c93db0cb07
        ├── config.json
        ├── generation_config.json
        ├── model-00001-of-00004.safetensors
        ├── model-00002-of-00004.safetensors
        ├── model-00003-of-00004.safetensors
        ├── model-00004-of-00004.safetensors
        ├── model.safetensors.index.json
        ├── special_tokens_map.json
        ├── tokenizer_config.json
        └── tokenizer.json

[slightly modified for clarity].

You can see the actual matrices with something like:

from transformers import AutoModel, AutoTokenizer
import torch

model = AutoModel.from_pretrained("mlabonne/TwinLlama-3.1-8B")
embedding_layer = model.get_input_embeddings()  # This contains token ID → vector mappings
embedding_weights = embedding_layer.weight  # Shape: (vocab_size, embedding_dim)
print(embedding_weights.size())

Output:

torch.Size([128256, 4096])

But if you want to get even more raw, you can just read the files directly:

from safetensors.torch import load_file

state_dict = load_file("model-00001-of-00004.safetensors")
embeddings = state_dict['model.embed_tokens.weight']
print(embeddings.size())

Output:

torch.Size([128256, 4096])

That is the size of the vocabulary and we can demonstrate this with:

$ grep -rl 128256 ~/.cache/huggingface/hub/models--mlabonne--TwinLlama-3.1-8B/
...
config.json:  "vocab_size": 128256 

You can see what the actual tokens (note: not words) are in tokenizer.json.

File formats

Safetensors is a HuggingFace format for storing tensors. The "safe" comes from avoiding the security implications of PyTorch's .pt and .bin formats, which allow code to be pickled into the files.

GGUF (Giant GGML Unified Format) is a format used by Llama (and compatible) models. The format stores everything (both weights and metadata) in one file, is good for CPU processing and allows quantization.

You can convert from Safe Tensors to GGUF by running something like this from the llama.cpp repo:

python  convert_hf_to_gguf.py --outtype f16  --outfile /home/henryp/Temp/models--mlabonne--TwinLlama-3.1-8B-DPO.gguf /home/henryp/.cache/huggingface/hub/models--mlabonne--TwinLlama-3.1-8B-DPO/snapshots/4a76d14414118c00fbbaed96cf1dd313a10f1409/

That directory being where the model's config.json lives.

Tensor files store their weights in various formats. FP16 and BF16 are both 16-bit encodings with the difference being which bits they allocate to the mantissa and exponent. BF16 has a larger range (same as FP32) but lower precision. FP16 is the traditional format for GPUs with more modern GPUs now also supporting BF16. 

You can even just use integers like INT8 and Q4_0, the latter being a 4-bit integer with the _0 referring to which technique was used to quantize it (eg, scaling). These different strategies trade off accuracy, speed, complexity etc.

The model

There's a nice Java implementation of Llama.cpp here. It's very nicely written and even allows you to compile the Java code to a native binary (use the latest GraalVM). This class represents the weights that are loaded from disk. In the case of a Llama model, the GGUF file is read from disk, parsed and just becomes an object of this class.

Monday, April 14, 2025

Interpreting GLMs

How do you check the quality of your Generalized Linear Models?

You cannot use ROC and AUC curves for linear regression models by their very nature (they don't give yes/no answers). So one way is to use the Concordance Index (a.k.a. C-Index) instead. It works by taking pairs of observations where one has experienced the outcome and the other hasn't and asking how the model did. With the results, you want a value greater than 0.5, but how much greater is (as always) context-dependent. In the social sciences, 0.75 may be great but pants in the hard sciences.

VIF indicates how much collinearity between factors inflates the variance of each coefficient. It's quite domain specific, but a value of less than 10 is considered moderate for prediction and less than 5 for inference.

Because of the mathematics behind them, the R-squared used in Ordinary Least Squares does not apply to a general GLM. Instead, a pseudo R-squared is used, which tends to have a lower value. It is not recommended to use this metric for goodness of fit in GLMs.

Poisson and binomial models commonly have high deviance. Lower deviance indicates a better fit. The ratio of deviance to the degrees of freedom (the number of observations minus the number of parameters) is better for assessing goodness of fit. Values moderately above 1.0 indicate overdispersion, where the variance in the data is greater than what the model assumes. This matters less for negative binomial models. Underdispersion indicates the data has clusters.

The alpha on a negative binomial should be about 1.0. This is the default in StatsModels. More than that indicates overdispersion.

Another metric is the ratio of the Pearson chi-squared statistic to the residual degrees of freedom. This should typically be between roughly 0.5-0.8 and 1.2-2.0, depending on your tolerance. Again, high values indicate overdispersion, low values underdispersion.
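With statsmodels, both ratios fall straight out of the fitted results object. A sketch on made-up data:

import numpy as np
import pandas as pd
import statsmodels.api as sm
import statsmodels.formula.api as smf

# Made-up data purely to illustrate the diagnostics
rng = np.random.default_rng(0)
df = pd.DataFrame({
    "age": rng.integers(18, 90, 1000),
    "sex": rng.choice(["M", "F"], 1000),
})
df["counts"] = rng.poisson(lam=np.exp(0.01 * df["age"]), size=1000)

results = smf.glm("counts ~ age + sex", data=df, family=sm.families.Poisson()).fit()

print(results.deviance / results.df_resid)       # deviance / residual degrees of freedom
print(results.pearson_chi2 / results.df_resid)   # values well above 1 suggest overdispersion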

Chi-squared can tell you there is a relationship between categorical variables but it doesn't tell you its strength. For that, use Cramér's V. This gives you a value between 0.0 and 1.0. Interpretation is domain specific, but values around 0.3 and above are where you start to look suspiciously at relationships.

Wednesday, April 2, 2025

Storage on Kubernetes, Part 2

Having opted for installing MicroK8s and its storage and network addons, I then tried to add another node. Adding another node is still proving problematic but storage was much easier than raw Kubernetes.

To make Helm recognise MicroK8s rather than kubectl, run this:

alias kubectl='microk8s.kubectl'
export KUBECONFIG=/var/snap/microk8s/current/credentials/client.config

And install Strimzi as I outline in a previous post.

When I tried to see the logs of the strimzi-cluster-operator, it errored with "tls: failed to verify certificate: x509: certificate is valid for" ... I've still not worked this out, but this GitHub ticket helped me. Apparently, I need to call kubectl --insecure-skip-tls-verify-backend=true logs... for reasons I don't entirely understand yet. [Addendum: looks like I needed to run sudo microk8s refresh-certs --cert ca.crt as the certificates I used to connect to the containers were apparently out of date].

Anyway, when I deployed my slightly modified version of kafka-with-dual-role-nodes.yaml, changing the storage type from jbod [Just a Bunch of Disks] to persistent-claim, the Strimzi operator was puking "The size is mandatory for a persistent-claim storage". Looking at the code, it seems Strimzi is expecting a child config node of size.

Now, I just need some YAML to create a MicroK8s PVC (which I stole from SO) and a 3-node Kafka cluster appears to be up and running, writing its data to the host machine's /var/snap/microk8s/common/default-storage path.

Gotcha!

I took my laptop to a coffee shop and worked there and everything was broken again! Kubelite was puking with:

$ journalctl -f -u snap.microk8s.daemon-kubelite -f --no-hostname --no-pager

...
Apr 02 11:15:15 microk8s.daemon-kubelite[309439]: W0402 11:15:15.083568  309439 logging.go:55] [core] [Channel #5 SubChannel #6]grpc: addrConn.createTransport failed to connect to {Addr: "unix:///var/snap/microk8s/7964/var/kubernetes/backend/kine.sock:12379", ServerName: "kine.sock:12379", }. Err: connection error: desc = "transport: Error while dialing: dial unix /var/snap/microk8s/7964/var/kubernetes/backend/kine.sock:12379: connect: connection refused"

kine acts as an in-process or standalone service that translates Kubernetes API interactions into operations on the dqlite database. It essentially makes dqlite behave like etcd from the Kubernetes API server's perspective.

Debugging gives:

$ microk8s inspect
Inspecting system
Inspecting Certificates
Inspecting services
  Service snap.microk8s.daemon-cluster-agent is running
  Service snap.microk8s.daemon-containerd is running
  Service snap.microk8s.daemon-kubelite is running
  Service snap.microk8s.daemon-k8s-dqlite is running
  Service snap.microk8s.daemon-apiserver-kicker is running
...

but this is a lie! Kubelite (and daemon-k8s-dqlite) say they're running but this is not the whole story. They're constantly restarting because there is a problem.

$ journalctl -f -u snap.microk8s.daemon-k8s-dqlite.service -f --no-hostname --no-pager
...
Apr 02 10:57:50 microk8s.daemon-k8s-dqlite[274391]: time="2025-04-02T10:57:50+01:00" level=fatal msg="Failed to create server" error="failed to create dqlite app: listen to 192.168.1.253:19001: listen tcp 192.168.1.253:19001: bind: cannot assign requested address"
...

That's not my current IP address! That's my address back in the office! I've rebooted my laptop since being connected to that network so I don't know where it's getting it from.

I could change the files in /var/snap/microk8s/current/ but a return to the office made everything work again.

Logging tips

See what a service is doing with, for example:

journalctl -u snap.microk8s.daemon-cluster-agent

and check a status with for example:

systemctl status snap.microk8s.daemon-kubelite


Friday, March 28, 2025

Iceberg Tidbits

Some miscellaneous Iceberg notes.

Cleaning up

There is a logic to this. Russell Spitzer, of the Apache team says:
run rewrite first
expire second
you don't need to run remove_orphans unless something broke [Discord]
Here is a quick BDD I wrote that illustrates what's going on. Basically: 

  1. rewrite_data_files puts all the data in as few files as possible. 
  2. expire_snapshots then deletes any files that are surplus.
Fewer files means less IO and a more efficient query.

Russell explains the reason we don't use remove_orphans  
To clarify, Remove Orphan Files is only for cleaning up files from failed writes or compactions that failed in a way that the process could not clean up.
Inside of Remove orphan files are 2 expensive remote steps
  • Listing everything in the directories owned by the table
  • Join of that list with metadata file paths
Then the list of files to delete is brought back to the driver where deletes are preformed [Discord]
This BDD demonstrates removing orphans through the Java API. I wanted to use CALL system.remove_orphan_files but couldn't. Instead, I got the error:

java.lang.IllegalArgumentException: Cannot remove orphan files with an interval less than 24 hours. Executing this procedure with a short interval may corrupt the table if other operations are happening at the same time. If you are absolutely confident that no concurrent operations will be affected by removing orphan files with such a short interval, you can use the Action API to remove orphan files with an arbitrary interval.
at org.apache.iceberg.spark.procedures.RemoveOrphanFilesProcedure.validateInterval(RemoveOrphanFilesProcedure.java:209)

which is a bit much for a simple BDD.

I forced Iceberg to break by having two threads try to write to the same table at the same time.

Pushdown

This is more Spark than just Iceberg but it's an interesting use case.

The IN clause is not pushed down, at least in Spark 3.5.1 - see some nice analysis here. TL;DR: instead of using IN, the most efficient query just converts it into a set of ANDs and ORs. Note that pushdown is (according to Russell Spitzer) pushing the actual function to the data, not mapping it to some other function that is semantically equivalent (as we see in that link).

This filter was not pushed down
Again, I've got a BDD to demonstrate this. Note that if the filters element is empty in a Physical Plan's BatchScan, your function has not been pushed down.

Monday, March 24, 2025

Hands on LLM training

Here are a collection of tips on tuning LLMs. A lot of these comments are taken from the Unsloth discord server.

What is Unsloth

Unsloth is a Python library that allows you to fine tune models without using silly amounts of compute. It does this by using smaller numeric precision.
"Almost everyone here is doing load_in_4bit. You load the model as a (dynamically) quantized 4bit version, then create an (uninitialized) bf16 QLoRA train said QLoRA merge the QLoRA onto the quantized model, generating an fp16 or bf16 model (up-quantized). [Optionally,] quantize it back down to your desired level" [Discord]
"LORAs are always fp16" [Discord]

AWQ is Activation-aware Weight Quantization "preserves a small fraction of the weights that are important for LLM performance to compress a model to 4-bits" HuggingFace.

Models

What size model can I fit into my GPU's VRAM?
A "safe bet would be to divide by 1.5 - 2x your total available memory (ram + vram) to get the B parameters vram / 1.5 or 2 to get the active params, so for example, I have 8gb vram i can load a 8b 4bit quantized model directly onto my gpu because of its relatively small approx 4-5gb vram footprint." [Discord]
This sentiment seems popular at the moment:

Mistral Small 2501

Hyperparameters

You want a rank "that is large enough to imitate the writing style and copy the knowledge from our instruction samples. You can increase this value to 64 or 128 if your results are underwhelming."

"The importance of the reference model is directly controlled via a beta parameter between 0 and 1. The reference model is ignored when beta is equal to 0, which means that the trained model can be very different from the SFT one. In practice, a value of 0.1 is the most popular one, but this can be tweaked" 
LLM Engineering Handbook

In the event that you're not "able to achieve consistently accurate answers to my questions... the model's responses are sometimes correct and sometimes incorrect, even for questions directly from the fine-tuning dataset", the advice is:
"You may want to up the rank and the alpha
Rank = how many weights it effects. Alpha is how strong they are effected. PEFT only effects the last few layers" [Discord]
(PEFT is Parameter Efficient Fine Tuning. LORA is just one of these methodologies.)
"Alpha should at least equal to the rank number, and rank should be bigger for smaller models/more complex datasets; it usually is between 4 and 64." [Discord]
Batch size takes up a lot of VRAM, so if you are having OOMs, choose smaller batches. This will mean training takes longer. To counter this, you can increase the gradient accumulation steps. This in effect batches the batches and writes deltas back to the weights less often.

Data

Data makes or breaks an LLM. An old hand who goes by the name MrDragonFox on Discord says:
"Most problems are really in the data it self has to be prepped. 80% of the work is there" [Discord]
"You will need good data. ML 101: garbage in, garbage out. That is a science it it self." [Discord]
"Over 80% of the moat is in the data - once you have that - you can start with experimenting" [Discord]
It's a view echoed in LLM Engineer's Handbook: "In most use cases, creating an instruction dataset is the most difficult part of the fine-tuning process."

Cleaning the data is essential:
"Avoid abbreviations if you can. As a general rule, you shouldn't expect [a] model to understand and be able to do a task a human with the same context would be unable to accomplish" - etrotta, Discord
But quantity is not quality. In general, "if I have 30k samples (user-assistant), will finetuning a base or instruct model result in a strictly better model?"
"No. Fine tuning can worsen the model's performance if your data is bad or poorly formatted. Even if the data is good, it could cause the model to 'forget' things outside of the dataset you used to fine tune it (although that is a bit less likely with LoRA). Fine tuning will make the outputs closer to whatever you trained it on. For it to make a better model, "whatever you trained it on" must be better than what it's currently responding and follow the same structure you will use later." [Discord]
Overfitting and underfitting

"What causes loss to make a staircase pattern when hitting a new epoch?"
"It's very normal, it's overfitting" [Discord]

"Underfitting is probably seen more often as a common phenomenon where a low rank model fails to generalize due to a lack of learnable params
"Generally speaking you are looking for a smooth descent of loss and a gradual convergence of around 0.5. Making data more diverse and novel compared to what the model has seen already is good to combat overfitting and reducing LR/epochs can help" [Discord]
Increasing rank and alpha should be included here.

Apparently this does not apply to RSLoRA (rank-stabilized LoRA), which addresses the training instability and performance degradation issues that can arise in low-rank adaptation methods when fine-tuning on complex datasets.
"I think for RSLoRA, alpha is sqrt(hidden_size), not sqrt(r) as claimed in the blog post. You can have whatever LoRA size (rank) you want for GRPO, with or without RS. But note that for GRPO it's usually a good idea to have max_grad_norm=0.1 (or some other low value) because GRPO models tend to go insane if you overcook them. It's always either a=r or a=sqrt(hidden_size) if use_rslora=True"
If a fine-tuned model doesn't zero-shot a relatively simple question it was trained on, you "might need more examples or deeper rank" [Discord]

Wednesday, March 19, 2025

Storage on Kubernetes Part 1

... is not easy. And it appears I'm not the only one to think so.
"Storage is hard. Fast, reliable storage is really hard" - Hubt on Discord
These are some notes I made while trying to set up a Kubernetes home lab on spare hardware.
 
K8s approach to Storage

Once more, K8s delegates to containers to manage storage.

"Applications that require persistent storage would fail to start because the cluster doesn’t yet have a storage driver. Like network drivers, several storage drivers are available for Kubernetes. The Container Storage Interface (CSI) provides the standard that storage drivers need to meet to work with Kubernetes. We’ll use Longhorn, a storage driver from Rancher; it’s easy to install and doesn’t require any underlying hard­ ware like extra block devices or access to cloud­based storage." [1] 

A prerequisite for Longhorn is that I need to run this on my boxes:

sudo apt install -y apt-transport-https open-iscsi nfs-common
sudo systemctl enable --now iscsid

Again, I need to allow all connections between my nodes, so time to fiddle with the firewall.

After installing Longhorn, running this reported that my replica count could not be satisfied:

kubectl -n longhorn-system logs -l app=longhorn-manager

with error "No available disk candidates to create a new replica of size 10737418240". Patching did not seem to help:

kubectl patch setting default-replica-count -n longhorn-system --type='merge' -p '{"value": "1"}'

Neither did:

kubectl edit storageclass longhorn

to edit the numberOfReplicas

(Note that Longhorn comes with a GUI that you can see if you port forward with:

kubectl port-forward -n longhorn-system svc/longhorn-frontend 8080:80

but this didn't help either).

So, instead, I downloaded the YAML, edited the numberOfReplicas by hand and deployed to the cluster.

Unfortunately, when I deleted my kafka and longhorn-service namespaces, the command would not terminate. It seemed that the kafka PVCs depended on the PVs that used Longhorn. 

Cyclic dependencies
I managed to finally kill the longhorn namespace that was constantly Terminating by manually deleting the PVs with kubectl edit and 

kubectl get namespace longhorn-system -o json > ns.json

deleting the finalizers in ns.json by hand and running:

kubectl replace --raw "/api/v1/namespaces/longhorn-system/finalize" -f ns.json

For the PVCs, I had to do the same things but since they depended on Longhorn webhooks, I needed to delete them first with:

kubectl get mutatingwebhookconfigurations
kubectl get validatingwebhookconfigurations
kubectl delete mutatingwebhookconfiguration <webhook-name>
kubectl delete validatingwebhookconfiguration <webhook-name>

Finally, 

kubectl get all -n <namespace>
kubectl get pvc -n <namespace>

indicated that everything had been cleaned up.

Phew! But now I'm back where I started and this was a lot of work (albeit a great way to understand Kubernetes). 

I then deployed Longhorn again only to have it complain "failed to get backup target default: backuptarget.longhorn.io". Oh, boy.
"I like microk8s for having everything out of the box mostly running and turnable on with some little effort. Except metallb 😛" Darkwind on Discord
MicroK8s is a lightweight Kubernetes implementation that is great for CI/CD and (apparently) just works out-of-the-box. I might just install that...

[1] Book of Kubernetes