Wednesday, September 17, 2025

Configuring Polaris for Azure


Azure Config

To dispense tokens, you need to register what Azure calls an app. You do this with:

az ad app create \
  --display-name "MyMultiTenantApp" \
  --sign-in-audience "AzureADMultipleOrgs"

The --sign-in-audience value is an enum that determines whether we're allowing single- or multi-tenant access (plus some Microsoft-specific account types).

Create the service principal with:

az ad sp create --id <appId>

where <appId> was spat out by the create command.

Finally, you need to assign roles to the app:

az role assignment create \
  --assignee <appId> \
  --role "Storage Blob Data Contributor" \
  --scope /subscriptions/YOUR_SUBSCRIPTION/resourceGroups/YOUR_RESOURCE_GROUP

One last thing: you might need the credentials for this app so you can pass them to Polaris. You can do this with:

az ad app credential reset --id <appId>

and it will tell you the password. This is AZURE_CLIENT_SECRET (see below).

Polaris

First, Polaris needs these environment variables set to access Azure:

AZURE_TENANT_ID
AZURE_CLIENT_ID
AZURE_CLIENT_SECRET

Note that these are the credentials Polaris uses when it connects to Azure, not those of any end user. That is, they're your app registration's credentials, not your personal ones.
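Before launching Polaris, it's worth failing fast if any of these are missing. A minimal sketch (the function name is my own):

```python
import os

# The three credentials Polaris needs to talk to Azure.
REQUIRED = ["AZURE_TENANT_ID", "AZURE_CLIENT_ID", "AZURE_CLIENT_SECRET"]

def check_azure_env():
    """Fail fast if any of the Azure credentials are unset or empty."""
    missing = [name for name in REQUIRED if not os.environ.get(name)]
    if missing:
        raise EnvironmentError("Missing Azure credentials: " + ", ".join(missing))
```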

Configuring the Catalog

There are values peculiar to Azure that you need to add to the storageConfigInfo block that is common to all cloud providers. These are:
  • tenantId. You can find this by running az account show --query tenantId.
  • multiTenantAppName. This is the Application (client) ID that was generated when the app was created. You can see it in Microsoft Entra ID -> App Registrations -> All Applications in the Azure portal or using the CLI: az ad app list, find your app with the name you created above and use its appId.
  • consentUrl. I'm not entirely sure what this is but it can be generated with:

APPID=$(az ad app list --display-name "MyMultiTenantApp" --query "[0].appId" -o tsv) && \
  echo "https://login.microsoftonline.com/common/oauth2/v2.0/authorize?client_id=$APPID&response_type=code&redirect_uri=http://localhost:3000/redirect&response_mode=query&scope=https://graph.microsoft.com/.default&state=12345"
Find out which URL scheme to use:

$ az storage account show --name odinconsultants --resource-group my_resource --query "{kind:kind, isHnsEnabled:isHnsEnabled}" -o table
Kind       IsHnsEnabled
---------  --------------
StorageV2  True

HNS stands for hierarchical namespace.
For StorageV2 with HNS equal to True, use abfss in the allowedLocations part of the JSON sent to /api/management/v1/catalogs.
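Putting those fields together, the request body looks something like this. The storageConfigInfo values come from the bullets above; the surrounding structure (name, type, properties) and the placeholder names are my assumption about the Polaris management API, so check them against your Polaris version:

```python
import json

# A sketch of the payload for POST /api/management/v1/catalogs.
catalog = {
    "name": "my_azure_catalog",                      # hypothetical name
    "type": "INTERNAL",
    "properties": {
        "default-base-location": "abfss://myblob@odinconsultants.dfs.core.windows.net/warehouse"
    },
    "storageConfigInfo": {
        "storageType": "AZURE",
        "tenantId": "YOUR_TENANT_ID",                # az account show --query tenantId
        "multiTenantAppName": "YOUR_APP_CLIENT_ID",  # appId from az ad app list
        "consentUrl": "https://login.microsoftonline.com/...",
        # StorageV2 + HNS enabled => abfss URLs
        "allowedLocations": ["abfss://myblob@odinconsultants.dfs.core.windows.net/"],
    },
}

print(json.dumps(catalog, indent=2))
```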

Testing the credentials

Check the validity of the SAS token with:

az storage blob list \
  --account-name odinconsultants \
  --container-name myblob \
  --sas-token $SAS_TOKEN \
  --output table

We get SAS_TOKEN by putting a breakpoint in Iceberg's ADLSOutputStream constructor.

Iceberg bug?

Iceberg asserts that the key for the expiry time in the map of Azure properties sent by Polaris is the string adls.sas-token-expires-at-ms.PROJECT_NAME. But Polaris is emitting the string expiration-time. I've raised a bug in the Iceberg project here (14069).

I also raised a concurrency bug here (14070) but closed it when I realised it had been fixed in the main branch even if it wasn't in the latest (1.9.2) release.

Anyway, the upshot is that my workaround is to mask the Iceberg code by putting this (almost identical) class first in my classpath.

Thursday, September 11, 2025

Configuring Polaris for GCP

After successfully configuring Polaris to vend AWS credentials, I've moved on to the Google Cloud Platform.

As far as the Spark client and Polaris are concerned, there's not much difference from AWS. The only major change is that upon creating the Catalog, you need to put a gcsServiceAccount in your JSON. This refers to the service account you need to create.

You create it with something like:

$ gcloud iam service-accounts create my-vendable-sa --display-name "Vendable Service Account"

and then grant the user for whom it will deputize the right to mint its tokens:

$ gcloud iam service-accounts add-iam-policy-binding my-vendable-sa@philltest.iam.gserviceaccount.com   --member="user:phillhenry@gmail.com"   --role="roles/iam.serviceAccountTokenCreator"

where philltest is the project and phillhenry is the account for which the token will proxy.

Then log in with application-default credentials, impersonating the service account:

$ gcloud auth application-default login --impersonate-service-account=my-vendable-sa@philltest.iam.gserviceaccount.com

Note it will warn you to run something like gcloud auth application-default set-quota-project philltest or whatever your project is called.

This will open your browser. After you sign in, it will write a credential to a JSON file (it will tell you where). You must point the environment variable GOOGLE_APPLICATION_CREDENTIALS at this file before you run Polaris.

Note that it's the application-default switch that writes the credential as JSON so it can be used by your applications.

Not so fast...

Running my Spark code just barfed when it tried to write to GCS with:

my-vendable-sa@philltest.iam.gserviceaccount.com does not have serviceusage.services.use access

Note that if the error message mentions your user (in my case PhillHenry@gmail.com) not the service account, you've pointed GOOGLE_APPLICATION_CREDENTIALS at the wrong application_default_credentials.json file.
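A quick way to check which kind of credential file you're pointing at is to look at its "type" field. This is a sketch of my own; I believe a plain user login writes "authorized_user" while impersonation writes a different type, but verify against your own file:

```python
import json
import os

def adc_credential_type(path=None):
    """Return the "type" field of an application-default-credentials
    JSON file, e.g. "authorized_user" means your personal login rather
    than an impersonated service account."""
    path = path or os.environ["GOOGLE_APPLICATION_CREDENTIALS"]
    with open(path) as f:
        return json.load(f).get("type", "unknown")
```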

Basically, I could write to the bucket using the command line but not using the token. I captured the token by putting a breakpoint here in Iceberg's parsing of the REST response from Polaris. Using the token (BAD_ACCESS_TOKEN), I ran:

$ curl -H "Authorization: Bearer ${BAD_ACCESS_TOKEN}" https://storage.googleapis.com/storage/v1/b/odinconsultants_phtest
{
  "error": {
    "code": 401,
    "message": "Invalid Credentials",
    "errors": [
      {
        "message": "Invalid Credentials",
        "domain": "global",
        "reason": "authError",
        "locationType": "header",
        "location": "Authorization"
      }
    ]
  }
}

The mistake I was making was that the command line was associated with me (PhillHenry) not the service account - that's why I could upload via the CLI. Check your CLOUDSDK_CONFIG environment variable to see which credential files you're using.

It seems that the service account must have roles granting it access to the project:

$ gcloud projects get-iam-policy philltest --flatten="bindings[].members"  --format="table(bindings.role)"  --filter="my-vendable-sa@philltest.iam.gserviceaccount.com"
ROLE
roles/serviceusage.serviceUsageAdmin
roles/storage.admin
roles/storage.objectAdmin
roles/storage.objectCreator

This appears to fix it because the service account must be able to see the project before it can see the project's buckets [SO].

Now, Spark is happily writing to GCS.

Configuring Polaris for AWS

Polaris can vend credentials from the top three cloud providers. That is, your Spark instance does not need to be granted access to the cloud provider as long as it can connect to the relevant Polaris catalog.

There are 3 steps in configuring vended credentials for Spark:
  1. Configure your cloud account such that it's happy handing out access tokens
  2. Configure Polaris, both the credentials to access Polaris and the Catalog that is essentially a proxy to the cloud provider
  3. Configure Spark's SparkConf.
Here's what you must do for AWS:

Permissioning Polaris to use AWS tokens

The key Action you need to vend tokens is sts:AssumeRole. If you were doing this in the AWS GUI, you'd go to IAM/Roles, select the Role that can read your S3 bucket, click on Trust relationships and grant the user (or better, a group) the right to access it with some JSON like:

        {
            "Sid": "Statement2",
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::985050164616:user/henryp"
            },
            "Action": "sts:AssumeRole"
        }

This is only half the job. You then need a reciprocal relationship for, in my case, user/henryp. Here, I go to IAM/Users, find my user and create an inlined entry in Permissions policies. This just needs the Statement:

        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": "sts:AssumeRole",
            "Resource": "arn:aws:iam::985050164616:role/myrole"
        }

where myrole is the role that can read the bucket.
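Stitched together, the two policy documents look something like this (a sketch: the account ID and names are copied from the fragments above, while the Version boilerplate and the surrounding Statement arrays are standard IAM policy structure I've added):

```python
import json

# The role's trust policy: who may call sts:AssumeRole on it.
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "Statement2",
            "Effect": "Allow",
            "Principal": {"AWS": "arn:aws:iam::985050164616:user/henryp"},
            "Action": "sts:AssumeRole",
        }
    ],
}

# The user's reciprocal inline policy: which roles it may assume.
user_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": "sts:AssumeRole",
            "Resource": "arn:aws:iam::985050164616:role/myrole",
        }
    ],
}

print(json.dumps(trust_policy, indent=4))
```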

Ideally, this would all be automated in scripts rather than point-and-click but I'm at the exploratory stage at the moment.

What's going on in Polaris

Calling:

spark.sql(s"CREATE NAMESPACE IF NOT EXISTS $catalog.$namespace")

triggers these steps:

Spark's CatalogManager.load initializes an Iceberg SparkCatalog that fetches a token via OAuth2Util.fetchToken via its HTTPClient.

Polaris's TokenRequestValidator will validateForClientCredentialsFlow and insist that the clientId and clientSecret are neither null nor empty.

These values are taken from SparkConf's spark.sql.catalog.$catalog.credential setting, which Iceberg's OAuth2Util.parseCredential splits into ID and secret on the colon. The validator also ensures the scope and grantType are something Polaris recognises.

The upshot is that despite what some guides say, you don't want credential to be BEARER.
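The splitting can be sketched like this (my own reimplementation of the idea, not Iceberg's actual code):

```python
def parse_credential(credential):
    """Split a "clientId:clientSecret" pair on the first colon, in the
    spirit of Iceberg's OAuth2Util.parseCredential (a sketch, not the
    real implementation). With no colon, the whole string is treated
    as the secret."""
    if ":" in credential:
        client_id, secret = credential.split(":", 1)
        return client_id, secret
    return None, credential
```

This is why setting credential to BEARER breaks: there's no colon, so there is no client ID for the client-credentials flow.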

Configuring Spark

Calling:

spark.createDataFrame(data).writeTo(tableName).create()

does trigger credential vending as you can see by putting a breakpoint in AwsCredentialStorageIntegration.getSubscopedCreds. This invokes AWS's StsAuthSchemeInterceptor.trySelectAuthScheme and if you have configured AWS correctly, you'll be able to get some credentials from a cloud call.

This all comes from a call to IcebergRestCatalogApi.createTable.

Note that it's DefaultAwsClientFactory's s3FileIOProperties where the vended credentials live, after being populated in the call to s3() in the Iceberg codebase.

After a lot of head scratching, this resource said my SparkConf needed:

.set(s"spark.sql.catalog.$catalog.header.X-Iceberg-Access-Delegation", "vended-credentials")

This is defined here. The other options are remote signing (where it seems Polaris will sign a credential) and "unknown". But it's important for this to be set, as only then will the table's credentials lookup path be used.
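Putting the pieces together, the catalog-related SparkConf settings look something like this. A sketch: the catalog name, URI, warehouse and credential pair are placeholders of mine, but the property keys are the standard Iceberg REST catalog settings:

```python
catalog = "polaris"  # hypothetical catalog name

# Iceberg REST catalog settings for Polaris with credential vending.
# Note the credential is "clientId:clientSecret" -- not "BEARER".
spark_settings = {
    f"spark.sql.catalog.{catalog}": "org.apache.iceberg.spark.SparkCatalog",
    f"spark.sql.catalog.{catalog}.type": "rest",
    f"spark.sql.catalog.{catalog}.uri": "http://localhost:8181/api/catalog",
    f"spark.sql.catalog.{catalog}.warehouse": "my_catalog",
    f"spark.sql.catalog.{catalog}.credential": "CLIENT_ID:CLIENT_SECRET",
    f"spark.sql.catalog.{catalog}.header.X-Iceberg-Access-Delegation": "vended-credentials",
}
```

These would be applied with SparkConf.set (or spark-submit --conf) before creating the session.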

Thursday, September 4, 2025

Three things about Docker

Most of the time, Docker just works. But sometimes you need to be a bit clever. In my case, I want Polaris to have the permission to write to the host filesystem, not just to see it. This proved hard. These are some lessons I learned.

What's in an image?

You can break down what is in a Docker image with something like:

docker save ph1ll1phenry/polaris_for_bdd:latest -o polaris_for_bdd.tar
mkdir polaris_fs
tar -xf polaris_for_bdd.tar -C polaris_fs

Then you can start untarring the blobs (where each blob is a Docker layer). In my case, I was trying to find where the bash binary was:

for BLOB in $(ls blobs/sha256/); do { echo $BLOB; tar -vxf blobs/sha256/$BLOB | grep bash; } done

How was an image built?

You can reconstitute the steps made to create a Docker image with something like:

docker history --no-trunc apache/polaris

Restoring OS properties

The apache/polaris Docker image had a lot of extraneous Linux binaries removed, presumably to make it smaller and more secure. However, I needed them back as I need to grant the container certain permissions on the host. 

First off, the su command had been removed. You can cannibalise binaries from other images in your Dockerfile like this:

FROM redhat/ubi9 AS donor
FROM apache/polaris AS final
...
COPY --from=donor /usr/bin/su /usr/bin/su

However, copying a binary over is, most of the time, a bit naive. Running su gave:

su: Critical error - immediate abort

Taking the parent Docker image before it was pruned, I could run:

[root@6230fb595115 ~]# ldd /usr/bin/su
linux-vdso.so.1 (0x00007fff87ae6000)
libpam.so.0 => /lib64/libpam.so.0 (0x0000733267a15000)
libpam_misc.so.0 => /lib64/libpam_misc.so.0 (0x0000733267a0f000)
libc.so.6 => /lib64/libc.so.6 (0x0000733267807000)
libaudit.so.1 => /lib64/libaudit.so.1 (0x00007332677d3000)
libeconf.so.0 => /lib64/libeconf.so.0 (0x00007332677c8000)
libm.so.6 => /lib64/libm.so.6 (0x00007332676ed000)
/lib64/ld-linux-x86-64.so.2 (0x0000733267a39000)
libcap-ng.so.0 => /lib64/libcap-ng.so.0 (0x00007332676e2000)

So, my Dockerfile had to COPY these files over too.
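Rather than transcribing the ldd output by hand, a little helper can turn it into COPY lines. This is a sketch of my own, parsing the output format shown above:

```python
import re

def ldd_to_copy_lines(ldd_output, donor="donor"):
    """Turn ldd output into Dockerfile COPY lines, one per resolved
    shared object path. Skips the virtual linux-vdso entry, which has
    no path on disk."""
    lines = []
    for line in ldd_output.splitlines():
        # Matches both "libfoo.so.0 => /lib64/libfoo.so.0 (0x...)" and
        # the bare "/lib64/ld-linux-x86-64.so.2 (0x...)" form.
        m = re.search(r"(/\S+\.so[^ ]*)", line)
        if m:
            path = m.group(1)
            lines.append(f"COPY --from={donor} {path} {path}")
    return lines
```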

These libpam* shared objects refer to Linux's Pluggable Authentication Modules which is a centralized framework for permissioning arbitrary modules - eg MFA.

After a lot of faffing, I just COPYd the entire /etc/ folder from the donor to the final images. This is fine for integration tests but probably best avoided for prod :)

Saturday, August 30, 2025

(It's a) Kind and Strimzi

I have a Kafka cluster running on my laptop in Kind K8s using Strimzi. It just works [certain terms and conditions apply]!

For installing Strimzi etc, refer to a previous blog post of mine.

Then, I deployed kafka-with-dual-role-nodes.yaml with a few minor changes. 

First I changed the cluster name to kafka. Then, I wanted to use ephemeral disks as I didn't care about data loss in a PoC running on my PC:

-        type: persistent-claim
-        size: 100Gi
-        deleteClaim: false
+        type: ephemeral

But the main thing I had to do was create an external nodeport:

+      - name: external
+        port: 9094
+        type: nodeport   # 👈 important
+        tls: false

This meant I could see the service exposing the port to the host:

$ kubectl get svc -n kafka --output=wide
NAME                             TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)                                        AGE     SELECTOR
kafka-dual-role-0                NodePort    10.96.101.127   <none>        9094:31904/TCP                                 4m43s   statefulset.kubernetes.io/pod-name=kafka-dual-role-0,strimzi.io/cluster=kafka,strimzi.io/kind=Kafka,strimzi.io/name=kafka-kafka,strimzi.io/pool-name=dual-role
kafka-dual-role-1                NodePort    10.96.128.155   <none>        9094:30402/TCP                                 4m43s   statefulset.kubernetes.io/pod-name=kafka-dual-role-1,strimzi.io/cluster=kafka,strimzi.io/kind=Kafka,strimzi.io/name=kafka-kafka,strimzi.io/pool-name=dual-role
kafka-dual-role-2                NodePort    10.96.169.99    <none>        9094:31118/TCP                                 4m43s   statefulset.kubernetes.io/pod-name=kafka-dual-role-2,strimzi.io/cluster=kafka,strimzi.io/kind=Kafka,strimzi.io/name=kafka-kafka,strimzi.io/pool-name=dual-role

It's essentially port forwarding for me.

Note that one does not connect to the CLUSTER-IP. You need to see where these kafka-dual-role-* pods live:

$ kubectl get pods -n kafka --output=wide
NAME                                        READY   STATUS    RESTARTS      AGE   IP            NODE                 NOMINATED NODE   READINESS GATES
kafka-dual-role-0                           1/1     Running   0             22h   10.244.0.38   kind-control-plane   <none>           <none>
kafka-dual-role-1                           1/1     Running   0             22h   10.244.0.39   kind-control-plane   <none>           <none>
kafka-dual-role-2                           1/1     Running   0             22h   10.244.0.40   kind-control-plane   <none>           <none>

Ah, kind-control-plane. Which IP does that have?

$ kubectl get nodes --output=wide
NAME                 STATUS   ROLES           AGE    VERSION   INTERNAL-IP   EXTERNAL-IP   OS-IMAGE                         KERNEL-VERSION     CONTAINER-RUNTIME
kind-control-plane   Ready    control-plane   139d   v1.32.3   172.18.0.2    <none>        Debian GNU/Linux 12 (bookworm)   6.8.0-60-generic   containerd://2.0.3

$ ./kafka-topics.sh --bootstrap-server=172.18.0.2:31118 --list

$

(Which is expected as we haven't created any topics yet.)
"NodePort is a Kubernetes Service type designed to make Pods reachable from a port available on the host machine, the worker node.  The first thing to understand is that NodePort Services allow us to access a Pod running on a Kubernetes node, on a port of the node itself. After you expose Pods using the NodePort type Service, you’ll be able to reach the Pods by getting the IP address of the node and the port of the NodePort Service, such as <node_ip_address>:<node port>.  The port can be declared in your YAML declaration or can be randomly assigned by Kubernetes.  Most of the time, the NodePort Service is used as an entry point to your Kubernetes cluster." [The Kubernetes Bible]
So, the port from the svc and the IP address from the nodes.
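That stitching of node IP and nodePort can be sketched as (my own helper, parsing the kubectl output formats shown above):

```python
def bootstrap_address(node_internal_ip, svc_ports):
    """Combine a node's INTERNAL-IP with the nodePort half of a
    "9094:31118/TCP" style PORT(S) entry from kubectl get svc."""
    _, node_port = svc_ports.split("/")[0].split(":")
    return f"{node_internal_ip}:{node_port}"

# e.g. kafka-dual-role-2 exposed on kind-control-plane:
# bootstrap_address("172.18.0.2", "9094:31118/TCP") -> "172.18.0.2:31118"
```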

Aside: one nice thing about Kind is that I can take my laptop to a coffee shop and join a new network and things carry on running despite my IP address changing. I don't think that is currently possible on the reference K8s.

One bad thing about Strimzi is that it barfed when I upgraded the reference K8s implementation to 1.33. The cause was:

Caused by: com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "emulationMajor" (class io.fabric8.kubernetes.client.VersionInfo), not marked as ignorable (9 known properties: "goVersion", "gitTreeState", "platform", "minor", "gitVersion", "gitCommit", "buildDate", "compiler", "major")
 at [Source: REDACTED (`StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION` disabled); line: 4, column: 22] (through reference chain: io.fabric8.kubernetes.client.VersionInfo["emulationMajor"])
at com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:61) ~[com.fasterxml.jackson.core.jackson-databind-2.16.2.jar:2.16.2]
...
at io.fabric8.kubernetes.client.utils.KubernetesSerialization.unmarshal(KubernetesSerialization.java:257) ~[io.fabric8.kubernetes-client-api-6.13.4.jar:?]

This ultimately stops the Strimzi operator. Looks like the Fabric8 library needs updating. The Strimzi version I'm using is:

$ helm pull strimzi/strimzi-kafka-operator --untar
$ helm template test-release ./strimzi-kafka-operator/ | grep "image:"
          image: quay.io/strimzi/operator:0.45.0


Monday, August 25, 2025

Cloud Architecture

This is still something I am mulling over but here are my notes.

Problem statement

We want different organisations to share large quantities of confidential data to be processed.

The prerequisites are:
  1. must be secure
  2. must be entirely FOSS based
  3. must be cross-cloud
  4. allows a bring-your-own policy
The choice of Apache Iceberg for the data seems to be straightforward. But the question of infra provisioning is not an easy one, with a whole debate going on in Discord. Some love Terraform for being (somewhat) typesafe; others think controllers are the way to go.

Infra provisioning

As ever, the answer to what route to take is "it depends" but here are some of the esoteric terms defined.

Crossplane is a CNCF-compliant, Golang "backend that enables you to build a control plane that can orchestrate applications and infrastructure no matter where they run". So, you could use Crossplane to provision infra not just in its K8s cluster but in the cloud. ACK (AWS Controllers for K8s) is an AWS specific equivalent of Crossplane that watches its CRDs and provisions accordingly.

In the other corner is the reigning champion, Terraform, and its FOSS fork, OpenTofu (both written in Go). Terraform has a form of type system but it's not enforced until the plan stage and is "loose": rather than being strict, it allows type coercion.

You can use CDKTF (which has common language bindings to create Terraform config files) but there is some doubt about its future.

Another tool to address the issues with raw Terraform (lack of DRY principles, circular dependencies, orchestration etc) is Terragrunt, a thin wrapper around Terraform/OpenTofu written in Go. It allows the output from one stage to be the input to another [Discord]. Hashicorp, the creators of Terraform, have recognised these problems and have released Stacks.

A central way to orchestrate and deploy your Terraform config is the (mostly) Java Terrakube. It also adds an RBAC layer. Because everything runs remotely, it can be scheduled to detect drift, say, in the middle of the night.

Similarly, Atlantis is a Go server that has RBAC, executes the commands, requires approval for pull requests before applying them and generally implements a GitOps workflow.

For self-serve, there is the FOSS, Typescript Kubero. This allows developers to manage their own K8s deployments. Coolify, meanwhile, is a PHP tool that makes managing your own servers very cloud-like. It just needs an SSH connection.

Tuesday, August 12, 2025

BERT

I've been applying HuggingFace's ModernBert to a sequence of medical codes in the hope we can identify people who may have type 2 diabetes before even their doctors. The results have been pretty good and met expectations (the same as an American study; this is UK data). 

But what I'm surprised about is that a very simple approach performed as well as the more complicated one recommended by our American-based colleagues. Their recommendation was to use transfer learning on their model. This simply didn't work as their model used a medical encoding system called ICD10 when the British more commonly use the richer SNOMED. The theory was with fine tuning, the model would learn. It didn't.

As an aside, mapping one encoding to another is a non-trivial task. Avoid it if you can.

Instead, we pretrained BERT ourselves exclusively on our data using a Masked Language Model. This is where random words are omitted from a sequence and the model is asked to guess them. Having built the model, we then finetune it with the medical code sequences for those with and without T2D, seeing if BERT can classify them.

Note that in this case, there is no pretrained BERT. We build it entirely from just our data. A sequence of medical codes is a very simple 'language' so you don't need massive compute resources to train a model. A single Tesla V100 with 16GB of VRAM was sufficient.

Although this achieved good results, it took literally days. The surprising thing was that when we (perhaps naively) treated the sequence of codes as a sentence and used a plain BertForSequenceClassification object to train the model in just one step (no pretraining and finetuning), we got the same results.

Anyway, here are some notes I made along the way:

The neural net

Each of the heads defined by num_attention_heads focusses on different aspects of the text. More can better represent the data. Too many can cause overfitting. Its default is 12.

num_hidden_layers impacts accuracy and the model's ability to generalize. Its default is 12. Recommended values depend on use case. Less than 6 is good for resource-constrained environments; 6-12 for fine-tuning; 12-24 for large pretraining.

ChatGPT suggests for BERT classification, final loss should be 0.2-0.4. "BERT can overfit fast, especially with small datasets."

per_device_train_batch_size is purely a runtime optimization.

Call model.eval() and put all access to the model in a with torch.no_grad() block.

Note that BERT does not evaluate model improvement by being told the labels. It just uses the labels to calculate the validation loss.

Pretraining

It's important to note that when pretraining BERT for classification, it's not told what the answers are. Instead, it's fed token sequences where some are masked and it tries to guess what the missing tokens can be. This is the accuracy metric it spits out every so often.
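The masking itself is easy to sketch. This is my own illustration of the idea, not HuggingFace's DataCollatorForLanguageModeling (which additionally leaves some chosen tokens unchanged or replaces them with random ones):

```python
import random

def mask_tokens(token_ids, mask_id, prob=0.15, rng=None):
    """Replace roughly `prob` of the tokens with mask_id. Returns the
    masked sequence and the labels: the original id where masked, else
    -100 (the conventional "ignore this position" label)."""
    rng = rng or random.Random(0)
    masked, labels = [], []
    for tid in token_ids:
        if rng.random() < prob:
            masked.append(mask_id)
            labels.append(tid)   # the model must guess this token
        else:
            masked.append(tid)
            labels.append(-100)  # ignored by the loss
    return masked, labels
```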

Finetuning

When BERT is training, you'll see Training and Validation loss. Of the two, validation is the most important. Training loss is telling you how well the model deals with data it has already seen. The model is not trained on the evaluation set so Validation indicates how well it will fare with data it has not been trained on.  Consequently, a falling training loss and a constant validation loss indicates that the model is overfitting. This can be addressed with dropout and/or weight_decay.

The conventional way of training BERT is to use BertForMaskedLM then use BertForSequenceClassification for fine tuning. The first takes sentences from the training data, randomly blanks out some words and guesses what they might be. However, we found that for small data sets that are not natural language (sequences of medical codes), we could get good performance by just using BertForSequenceClassification which does all the back propagation in a fraction of the time.

When calling BertForSequenceClassification.from_pretrained you might see:

Some weights of BertForSequenceClassification were not initialized from the model checkpoint at ... and are newly initialized: ['bert.pooler.dense.bias', 'bert.pooler.dense.weight', 'classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.

Apparently, this is OK if you've been given a model that you wish to then use for classification. The classifier.* is the last layer of the neural net: we take a model that's been trained (possibly by masking but not classification) and add a classification layer to it.

The special CLS token represents a summary of the string that's being encoded. The bert.pooler.* is a single, final, dense layer applied to this token that's used in classification (it is not used at all in masking).

The Tokenizer(s)

Typically, you'll see tokenizers from both the Tokenizers and Transformers libraries. They're both from HuggingFace so it's not surprising that you can tokenize with one, save it, and then load the data with a tokenizer from the other.

Note that the tokenizers do not save the words as vectors. This embedding is done in the neural net. Instead, they offer different approaches - for instance, balancing a vocabulary that can cover words it's not seen yet against having a vocab that's too big and makes the training inefficient.

In the Transformers library, you'll see some tokenizer names appended with Fast. These really are much faster to run as they're Rust implementations.

Almost as an aside, we tried to build our own tokenizer vocabulary by hand as we wanted to stick as closely to the methodology used by our American colleagues as we could. This is perfectly possible (just point tokenizers.models.WordPiece.from_file at your vocab.txt file) but we found that when we prepended all the codes with their classification system in both vocab.txt and the actual data we were tokenizing, accuracy was an unrealistically high 0.999 from the get-go. Why this was the case remains a mystery. Needless to say, we backed out that change. [Addendum: it's still a mystery why, but using just the 30k most common whole words as tokens rather than the >100k previously resulted in good accuracy.]

PyTorch

Backing the Transformers' implementation of BERT is PyTorch. Browsing through the code you'll see references to unsqueeze. This is an interesting fella that can best be described by this StackOverflow answer. Basically, it can take a 2-d matrix and turn it into a 3-d tensor, with the argument dictating whether the elements are in the x-, y- or z-plane. From a programming point of view, you'll see that the matrix's elements become nested one layer deeper in more square brackets; the exact configuration of those brackets depends upon which plane the elements have been unsqueezed into.
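To make that concrete without firing up PyTorch, here is unsqueeze on a 2-d matrix mimicked with plain nested lists (my own illustration of the semantics, not torch code):

```python
def unsqueeze2d(matrix, dim):
    """Mimic torch.unsqueeze on a 2-d matrix (a list of rows): insert a
    new axis of size 1 at position dim, giving a 3-d result."""
    if dim == 0:
        return [matrix]                                # 1 x rows x cols
    if dim == 1:
        return [[row] for row in matrix]               # rows x 1 x cols
    if dim == 2:
        return [[[x] for x in row] for row in matrix]  # rows x cols x 1
    raise ValueError("dim must be 0, 1 or 2 for a 2-d input")

m = [[1, 2], [3, 4]]
assert unsqueeze2d(m, 0) == [[[1, 2], [3, 4]]]         # shape (1, 2, 2)
assert unsqueeze2d(m, 1) == [[[1, 2]], [[3, 4]]]       # shape (2, 1, 2)
assert unsqueeze2d(m, 2) == [[[1], [2]], [[3], [4]]]   # shape (2, 2, 1)
```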

This diagram from the SO answer is great:



Sunday, August 3, 2025

Iceberg Distributions

There are natural but opposing forces at work in writing data with Iceberg/Spark. Writing many files is more efficient at processing time but leads to greater admin costs - in the extreme, they can cause OOMEs when handling the table's metadata.

The user can fiddle with a few knobs to mitigate this. One is write.distribution-mode. Here are some tests I created to see how the configuration changes affect the number of files when writing the same data:
 
write.distribution-mode | Number of files | Notes
"hash" | p | df.writeTo(tableName).append()
"hash", sorted DataFrame | p | ...TBLPROPERTIES ('sort-order' = 'partitionField ASC NULLS FIRST'...
"hash", sorted table | p | df.sort("partitionField").writeTo(tableName).append()
"hash", sorted table but only one value for partitionField | 1 | because p=1; assumes the size of the data to write is < write.spark.advisory-partition-size-bytes, otherwise multiple files are written (Spark 3.5)
"none" | d * p | df.writeTo(tableName).append()
"none", sorted DataFrame | p | df.sort("partitionField").writeTo(tableName).append()
"none", sorted table | d * p | ...TBLPROPERTIES ('sort-order' = 'partitionField ASC NULLS FIRST'...
"none", sorted table but only one value for partitionField | d | because p=1

p = number of (logical) partitions in the data
d = number of (physical) partitions in the data frame

Note that this is for Spark 3.5. For a distribution mode of hash, and with the size of data exceeding advisory-partition-size-bytes, multiple threads write multiple files.

But for Spark 3.3, if we use a distribution mode of hash and the data exceeds the size of write.spark.advisory-partition-size-bytes, then only one thread writes.

Fan-out made no difference in my tests that measured the number of files but, contrary to the documentation, it should be used. Russell Spitzer on Discord says:
"Fanout writer is better in all cases.  We were silly.  The memory requirements were tiny IMHO.  Without fanout, you need to presort within the task but that ends up being way more expensive (and memory intesive) IMHO.  In the latest versions @Anton Okolnychyi removed the local sort requirements if fanout is enabled, so I would recommend fanout always be enabled and especially if you are using distribution mode is none." 

Sunday, June 15, 2025

Lessons from a migration

Don't use low/no-code. It really is a false economy. It appeals to managers during sales pitches because it looks so simple. But your team will spend all their time and all your money debugging in a most un-ergonomic manner. And as for testing....

Azure Data Factory is a no-code solution that's great for simple DAGs. However, it is not suitable for anything more enterprise-y. For instance, if you're copying large amounts of data from one database to another and there is a failure, there is no scope for cleaning up the data. The first you'll know of it (because nobody ever reads the logs) is when the users are complaining that the numbers coming out of their SQL queries are wrong.

ADF doesn't even allow nested ForEach loops! It stores pipelines as JSON so watch it get confused when the pipeline itself contains JSON!

Don't give people raw SQL access. They can make profound changes and you'll have no logs and little ability to correct it.

Everything needs to have an automated audit log. There's going to be large number of questions like: "why did it do this?" It's never simply a matter of success or fail. There is huge nuance - eg, type conversion may mean what was in the source system is not exactly the same as in the destination system. Is this a pass or a fail?

Processes need orchestrating. One process reads a dataset while another deletes it/writes to it causing nondeterministic errors. You get similar issues when you issue a cancel.

Communication - docs are always changing. Be aware that most knowledge is tribal, not recorded.

Scheduling: everything was based on time. So, it's possible two daily jobs were running at the same time if the first took more than 24 hours. Data migrating from one DB to another within a cloud zone and subscription ran at a rate of ~10MB/s. This meant some tables took hours. And it didn't scale. As the project progressed, more tables were to be migrated. But this caused some jobs to take days. So, the weekly transfer was still going on when people came to the office on Monday morning. Consequently, their queries were returning inconsistent results - the silent killer.

The metadata must be constrained. If it lives in a database, it must have referential integrity. This can be overlooked because it's not business data. But if you want to eliminate mysterious errors, it's essential to get this right.

Like water finding its natural level, people will naturally gravitate to the path of least resistance. As a general rule, teams do not follow best practices, naming conventions or industry standards. This is not strictly true of every team, but it's so common that you must assume that any given team's data and code is a Rube Goldberg machine.

Hold regular meetings with major stakeholders. These meetings can be brief (about 15 minutes is OK once the cadence is established) but they do need to be frequent (at least twice a week, preferably daily).

A Quick look at Quarkus


Microservice Framework

Quarkus depends a lot on SmallRye, "a set of implementations of the MicroProfile specifications. We said that Quarkus is also a MicroProfile implementation, so this begs for a bit of explanation. Each SmallRye project implements one of the MicroProfile specifications" [1].

The MicroProfile specification is a set of standards for building microservice containers - for instance, handling Kubernetes health checks etc.

Native code

GraalVM has two main features: 
  • it allows polyglot development
  • it allows Ahead-of-Time (AOT) compilation.
This latter feature is what Quarkus is interested in, although "Mandrel is a downstream (forked) distribution of the Oracle GraalVM CE with the main goal of providing a way to build a native executable specifically designed to support Quarkus... Mandrel focuses mainly on the native-image build tool. It doesn’t provide a full GraalVM toolset." [1]

Generate the binary with:

mvn package -Pnative

This defers to GRAALVM_HOME/bin/native-image and gcc.

If we delegate to Graal, why use Quarkus at all? Well, Graal struggles with reflection. Quarkus provides shims so that popular frameworks don't use reflection and can therefore be compiled to native code.

Note that although it produces an executable binary, the artifact still needs a garbage collector. In the output, you'll see:

Garbage collector: Serial GC (max heap size: 80% of RAM)

You don't need Quarkus to build native executables, though. For instance, if you want to convert the Java port of Llama.cpp, you can build it with just make native - although note that I had to use version 24 of GraalVM as earlier versions didn't like the use of AVX (vector extensions).

Thread management

Much like Scala's Cats and ZIO, Quarkus manages execution for you. For instance, it has a notion of blocking threads. "The @Blocking annotation on both methods tells Quarkus that the method executes a blocking operation (persist or delete) on a database, so it needs to execute on worker thread, which allows blocking." [1]

[1] Quarkus in Action (Manning)

Making ML projects more robust

The software industry has made great strides to make the engineering process more robust. Despite being an adjacent discipline, data science is still in the dark ages.

You can unit test any code, so there is no excuse for not writing tests. However, a lot of data science tools are not ergonomic in this regard. Notebooks are great for EDA, less so for robust software practices.

So, although they are not a replacement for automated tests, here are some tips if you're stuck in the world of notebooks.

Observability

If your platform does not support it out of the box, add it manually. For instance, in one project there was one table that five others joined to at some point downstream in the pipeline. That first table was run several times with different parameters. This led to the potential problem that the six tables in total might be out of sync with each other. A solution was to add a run date to the first table, which the other five preserved when they were generated. The Python code that then uses this data asserts that the date is consistent across all tables.
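A minimal sketch of that cross-table consistency check. The table names and the run_date field here are hypothetical; in the real pipeline these would be rows read from the warehouse:

```python
def check_run_dates(tables):
    """Assert every row of every table carries the same run date and return it."""
    dates = {row["run_date"] for rows in tables.values() for row in rows}
    if len(dates) != 1:
        raise ValueError(f"Tables are out of sync; found run dates: {sorted(dates)}")
    return dates.pop()

# Hypothetical tables, each preserving the base table's run date
tables = {
    "base":    [{"run_date": "2024-01-01"}],
    "joined1": [{"run_date": "2024-01-01"}],
    "joined2": [{"run_date": "2024-01-01"}],
}
print(check_run_dates(tables))  # "2024-01-01"
```

Failing loudly here is the point: a mismatch raises before any stale join silently pollutes downstream results.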

Plot graphs of important features

In a bioinformatics project, the age of a patient was calculated in Python as the time between a patient's date of birth and the start of the observation period. This was reasonable. But several weeks later, the client asked for all of a patient's medical details to be used in the study. Consequently, the observation period - calculated in SQL - effectively began at a patient's first contact with the medical services.

For most people, this happens in the first year of life, so their age became zero. This wasn't universally true: immigrants would have their first medical intervention later in life. So, somebody browsing the data might think it odd that there were so many infants in the sample but assume it was just a statistical fluctuation, as there were clearly 30 and 40 year olds in there too.

However, when you plotted the age after the SQL code change, the graph of the population's age looked like this:

A bug in calculating age

Without an end-to-end suite of tests, nobody thought of updating the age calculation logic.
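Even without a plotting library, a crude text histogram makes this kind of spike jump out. The ages below are made up to mimic the bug; in practice you'd use a proper matplotlib histogram:

```python
from collections import Counter

# Hypothetical ages exhibiting the bug: a huge spike at zero plus a
# plausible-looking adult population.
ages = [0] * 40 + [31, 35, 42, 28, 39] * 4

# Bucket into decades and print a bar per bucket
buckets = Counter(age // 10 * 10 for age in ages)
for decade in sorted(buckets):
    print(f"{decade:3d}-{decade + 9:<3d} {'#' * buckets[decade]}")
```

The 0-9 bar dwarfing everything else is exactly the shape that, plotted, revealed the broken age calculation.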

Sampling 

The moment you add constraints to sampled data, you must beware of some statistical oddities. 

We wanted to assign a cutoff date to the negative cohort, randomly sampled from the dates in the positive cohort. Sounds straightforward, right? Well, then you have to add the constraint that a cutoff date only makes sense if it comes after a patient's date of birth. But this very reasonable constraint skews the data. The reason is that somebody born in 2000 can be given any cutoff date between 2000 and 2025, giving them an age at cutoff of 0 to 25 years old. After sampling, we might give them a cutoff date of 2003 and consequently an age of 3.

A bug in sampling

However, somebody born in 2022 can have a maximum age of 3 after sampling. Consequently, we have a situation where the 25-year-old born in 2000 can contribute to the 3-year-old bucket after sampling, but the 3-year-old born in 2022 cannot contribute to the 25-year-old bucket. Hence, you get far more 3-year-olds than are really in the data.
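A quick Monte Carlo run makes the skew visible. The birth-year range, cutoff range and cohort size here are made up for illustration; only the constraint (cutoff after birth) is taken from the scenario above:

```python
import random

random.seed(0)

# Simulate the constrained sampling: birth year uniform in 2000-2022,
# cutoff year uniform between birth year and 2025.
ages = []
for _ in range(100_000):
    born = random.randint(2000, 2022)
    cutoff = random.randint(born, 2025)  # the "cutoff >= birth" constraint
    ages.append(cutoff - born)

young = sum(a <= 3 for a in ages)   # everyone can land in the 0-3 buckets
old = sum(a >= 22 for a in ages)    # only those born by 2003 can reach 22+
print(young, old)  # young vastly outnumbers old
```

Every birth year can contribute to the youngest buckets but only the earliest birth years can contribute to the oldest, so the young buckets are massively over-represented.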

Saturday, May 31, 2025

Iceberg and Kafka Connect

I've been looking at Iceberg ticket 11818 "Iceberg Kafka Connector experiences a constant hanging lag for low-volume topics".

Quick background: Kafka Connect, part of the Kafka project, is a framework for "getting data from an application into Kafka or getting data out of Kafka into an external application." [Kafka Streams in Action, Manning]

Kafka Connect

I'm trying to get my head around Iceberg code that uses Kafka Connect.

SinkTask is the class that must be implemented and put is the method that receives a Collection of SinkRecords.

This SO answer describing the relationship between connectors and tasks was informative:
"A task is a thread that performs the actual sourcing or sinking of data.  
The number of tasks per connector is determined by the implementation of the connector...  
For sink connectors, the number of tasks should be equal to the number of partitions of the topic.  The task distribution among workers is determined by task rebalance which is a very similar process to Kafka consumer group rebalance."

Iceberg and Kafka Connect

Iceberg's Kafka Connect implementation can be configured to create the table if it does not exist. In the Integration*Test classes, you can see that Kafka Connect is configured such that iceberg.tables.route-field (TABLES_ROUTE_FIELD_PROP) is payload, which is a field in our test's TestEvent objects, messages that are serialized to JSON and sent to Kafka.

The first message will create a RecordWriter. If the IcebergWriterFactory is so configured, it will first create the table to which the RecordWriter will write.

Inspecting Kafka

When running kafka-consumer-groups.sh you can see CURRENT-OFFSET (the offset of the next record the consumer is to read) and LOG-END-OFFSET (the offset of the next record a producer is to write). The LAG is the difference between the two.
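The lag figure is simple arithmetic per partition. A toy illustration with made-up offsets:

```python
# LAG = LOG-END-OFFSET - CURRENT-OFFSET, per partition (offsets are made up)
offsets = {
    0: {"current": 42, "log_end": 42},  # fully caught up
    1: {"current": 10, "log_end": 15},  # 5 records behind
}
lag = {p: o["log_end"] - o["current"] for p, o in offsets.items()}
print(lag)  # {0: 0, 1: 5}
```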

Seems that if receivedPartitionCount >= expectedPartitionCount is not true (see CommitState.isCommitReady), Coordinator.receive will not commit and set CommitState.currentCommitId to null. This means a StartCommit is never sent. 

Incidentally, I asked on the Iceberg Slack channel what the rationale was behind this but received no replies after 4 days.

This results in the Kafka Connect logging:

[2025-05-27 09:24:22,527] INFO Commit d9f18c4a-5253-4c55-8cce-b3a077bbf3c9 initiated (org.apache.iceberg.connect.channel.Coordinator)

so the commit is initiated but:

[2025-05-27 09:24:23,950] INFO Commit d9f18c4a-5253-4c55-8cce-b3a077bbf3c9 not ready, received responses for 1 of 2 partitions, waiting for more (org.apache.iceberg.connect.channel.CommitState)

Evidently, receivedPartitionCount is 1 and expectedPartitionCount is 2 (see below).

The problem

The line here in CommitState.isCommitReady:

if (receivedPartitionCount >= expectedPartitionCount) {

will lead to isCommitReady returning false if this inequality does not hold. Before we look at the values, let's first see how the leader is chosen.

Follow my leader

The algorithm to elect the leader is this:
"there should only be one task assigned partition 0 of the first topic, so elect that one the leader" [Committer in Iceberg code].
It does this by taking "the list of partitions that are now assigned to the [Sink] task" [Kafka SinkTask] when it is opened. It then compares this to the members of the consumer group corresponding to our connector's ID, which it gets by calling Kafka [docs] directly.

expectedPartitionCount

The "list of the members of the consumer group" [Kafka ConsumerGroupDescription.members] becomes CommitterImpl.membersWhenWorkerIsCoordinator iff the lowest partition ID among those members happens to be one of the partitions we've been given to coordinate via SinkTask.open (see the leadership election logic, above).

That is, if we're the leader, all those partitions are ours.

The Iceberg Coordinator is instantiated with this list and its totalPartitionCount is calculated as the sum of all the partitions for its members. 

Great, this is exactly the same as expectedPartitionCount in the code snippet above.

receivedPartitionCount

We take the batch of DataComplete objects the CommitState has been buffering. A DataComplete is a "control event payload for events sent by a worker that indicates it has finished sending all data for a commit request."

We sum the assignments for each Payload.

These assignments are from "the current set of assigned TopicPartitions for this task" [Kafka SinkTaskContext docs], that is, the worker that created the DataComplete objects. 

Note that it will "include all assigned topic partitions even if no messages were read" [Iceberg Worker code].

This sum of all these partitions for the messages with our currentCommitId is what we're after.

The problem here is we have a topic with 2 partitions and 2 tasks so each task has a single partition. Therefore, receivedPartitionCount = 1.

The Solution

If tasks.max for this IcebergSinkConnector is 2, the partitions will be shared between tasks and the inequality does not hold for a single message. But if it's set to 1, the single task will deal with all partitions.

This leads to this log line:

[2025-05-27 11:41:21,712] INFO Commit 8dfe1850-d8b6-4054-9d0c-52a782a1d9e4 ready, received responses for all 2 partitions (org.apache.iceberg.connect.channel.CommitState)

Evidently, receivedPartitionCount = 2 and expectedPartitionCount <= 2. 
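For reference, a sketch of the connector config embodying that fix. The connector name, topic, and table are hypothetical, and the property list is abbreviated (check the Iceberg sink documentation for the full set); the point is tasks.max:

```python
import json

# Hypothetical Iceberg sink config: with tasks.max=1 a single task owns both
# partitions, so receivedPartitionCount can reach expectedPartitionCount.
config = {
    "name": "iceberg-sink",  # illustrative name
    "config": {
        "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
        "topics": "my-topic",      # the 2-partition topic in the scenario above
        "tasks.max": "1",          # one task handles all partitions
        "iceberg.tables": "db.my_table",
    },
}
# This JSON would be POSTed to Kafka Connect's REST API (typically port 8083)
print(json.dumps(config, indent=2))
```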

Saturday, May 17, 2025

DevOps lessons from Polaris and Iceberg

The Apache Iceberg and Polaris code bases make pretty good reading. There's some nice DevOps work here. Here are a few tidbits.

Iceberg's Kafka Connectors

As part of Iceberg's integration tests, Docker compose is used to fire up a Kafka Connect container. Interestingly, this container mounts the directory holding Iceberg's artifacts so it instantly has the latest implementation of Iceberg's Kafka Connect's SinkConnector and SinkTask. The test suite then starts the connector with a REST call (that's the nature of Kafka Connect) that contains all of the connector's config.

This docker-compose.yml also starts a MinIO container so Kafka Connect thinks it's writing to an AWS S3 bucket - all of this on one self-contained laptop, no config required. (It uses TestContainers to do the heavy lifting of talking to the Docker daemon.)

TestContainers has a very cool trick for cleaning up the containers when the tests are finished. It starts a sidecar called Ryuk that listens on a port connected to the JVM and kills the containers when that connection closes. You can see it while the tests are running with:

$ docker ps
CONTAINER ID   IMAGE                                 COMMAND                  CREATED         STATUS                            PORTS                                                           NAMES
...
f7c69b891330   testcontainers/ryuk:0.11.0            "/bin/ryuk"              5 seconds ago   Up 5 seconds                      0.0.0.0:32772->8080/tcp, :::32772->8080/tcp                     testcontainers-ryuk-3c88ed17-ec3d-4ce9-8830-dbbc1ca86294

Kind - K8s lite

You can run Polaris in a Kind cluster. Kind is Kubernetes in Docker. Although it is not CNCF compliant itself, it runs a CNCF compliant version of Kubernetes, just inside Docker containers. So, although it is not appropriate for a multi-node environment, Kind is great for development on a single machine.

Kind starts two Docker containers (kind-control-plane and kind-registry) as well as updating your ~/.kube/config file. If you ./run.sh in the Polaris codebase, you will see it start Kubernetes pods like etcd, coredns and kube-proxy as well as a polaris container.

Metadata file

Iceberg creates a file called iceberg-build.properties when it's built. Having your project do this can be enormously useful when you've ssh-ed into a box and are wondering exactly what version is running there, because it's a test environment and nobody has kept track of what is deployed where (ie, the real world).

Iceberg is built with Gradle so uses the com.gorylenko.gradle-git-properties plugin but there appears to be an equivalent for Maven (git-commit-id-plugin).

Quarkus

Polaris has become heavily dependent on Quarkus. The Docker container just runs quarkus-run.jar. This Jar's main class is io.quarkus.bootstrap.runner.QuarkusEntryPoint. This loads /deployments/quarkus/quarkus-application.dat, a binary file that loads all the Polaris guff. Apparently, it's a binary file to minimise start up times.

The Docker image's entrypoint is /opt/jboss/container/java/run/run-java.sh. This script comes FROM the Dockerfile's Quarkus-friendly base image and contains sensible JVM defaults.

Wednesday, May 14, 2025

Interpreting XGBoost models

Playing with XGBoost at the moment. Here are some basic notes:

XGBoost's Scikit-learn Classifier

This takes a lot of parameters but the highlights are:

Also called eta, the learning_rate is similar to that of neural nets in that a smaller value means slower training but potentially better performance.

The minimum loss reduction required before splitting a leaf is called gamma. A higher value means a more conservative model.

You can define the loss function with eval_metric. A value of logloss for instance will penalize confidently wrong predictions.

The fraction of the training data used per tree is the subsample argument and the scale_pos_weight weights the positive classes (useful for imbalanced data).
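Collecting the hyperparameters discussed above in one place. This is a sketch: the values are illustrative rather than recommendations, and the dict would be splatted into the classifier's constructor:

```python
# Illustrative hyperparameters for XGBoost's scikit-learn API.
# Values here are examples, not tuned recommendations.
params = {
    "learning_rate": 0.05,     # aka eta: smaller = slower but potentially better
    "gamma": 1.0,              # min loss reduction required to split a leaf
    "eval_metric": "logloss",  # penalises confidently wrong predictions
    "subsample": 0.8,          # fraction of training data used per tree
    "scale_pos_weight": 10.0,  # up-weight positives for imbalanced data
}
# model = xgboost.XGBClassifier(**params)  # requires the xgboost package
print(params)
```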

Random Stratified Data

Talking of imbalanced data, if there are two classes, A and B, and the number of data points for A is small, a random sample may randomly have a disproportionate number of As. To address this, we separate the A and B data points and take the exact number from each such that the ratio overall fits the real world.

For instance, if the ratio of A to B is 1:100 and we want a sample of 1000 points, using Random Stratified Data will give us precisely 10 As. Whereas a random sample could (feasibly) give us none.
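A minimal stratified sampler along those lines, using only the standard library (the 1:100 population is made up to match the example):

```python
import random

random.seed(0)

def stratified_sample(population, n):
    """Sample exactly n items, preserving each class's population proportion."""
    by_class = {}
    for item in population:
        by_class.setdefault(item, []).append(item)
    sample = []
    for members in by_class.values():
        k = round(n * len(members) / len(population))  # exact per-class count
        sample += random.sample(members, k)
    return sample

population = ["A"] * 100 + ["B"] * 10_000   # ratio 1:100
sample = stratified_sample(population, 1000)
print(sample.count("A"))  # exactly 10, where a plain random sample might give 0
```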

Notes on the Confusion Matrix

High precision and low recall indicates a model that really wants to be sure it doesn't have too many false positives.

F1 scores are good for checking imbalanced data or when false positives and false negatives are of similar cost. It's domain specific, but a value of about 0.6 is a minimum for being acceptable.

Specificity is how well a binary classifier identifies actual negatives, ie, how well it avoids false positives. It's between 0 and 1 and higher is better. Sensitivity (another name for recall) is the equivalent for actual positives, ie, avoiding false negatives.
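A worked example of these metrics from a hypothetical confusion matrix:

```python
# Hypothetical confusion matrix counts
tp, fp, fn, tn = 80, 10, 20, 90

precision = tp / (tp + fp)              # of predicted positives, how many were right
recall = tp / (tp + fn)                 # sensitivity / true positive rate
specificity = tn / (tn + fp)            # true negative rate
f1 = 2 * precision * recall / (precision + recall)

print(round(precision, 3), round(recall, 3), round(specificity, 3), round(f1, 3))
# 0.889 0.8 0.9 0.842
```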

The Bayes factor is the ratio of the probability of getting this data given a hypothesis is true versus the null hypothesis, that is P(D|H1)/P(D|H0).

When less (data) is more

In survival analysis, you might want to employ a technique called temporal censoring analysis. This applies to time-to-event data, the subject of survival analysis. The idea is that you censor data to minimise a fundamental problem: a person who has yet to be diagnosed with a disease (for example) is classed the same as somebody who will never have it.

A similar approach is used to tackle Simpson's Paradox where the model behaves differently in aggregate than when the data is segmented. Here, we segment the data and evaluate the model on those cohorts only. These techniques are called cohort-based validation or segmented model evaluation depending on how you slice and dice the data.
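A sketch of cohort-based validation: score the model per segment as well as in aggregate, so Simpson's-paradox-style divergence shows up. The cohorts and (actual, predicted) pairs are invented for illustration:

```python
def accuracy(pairs):
    """Fraction of (actual, predicted) pairs that agree."""
    return sum(actual == pred for actual, pred in pairs) / len(pairs)

# Hypothetical (actual, predicted) label pairs, keyed by cohort
cohorts = {
    "under_40": [(1, 1), (0, 0), (1, 0), (0, 0)],
    "over_40":  [(1, 1), (1, 1), (0, 1), (0, 0)],
}

all_pairs = [p for pairs in cohorts.values() for p in pairs]
scores = {"aggregate": accuracy(all_pairs),
          **{name: accuracy(pairs) for name, pairs in cohorts.items()}}
print(scores)
```

If a per-cohort score diverges sharply from the aggregate, the model behaves differently on that segment and the aggregate number is hiding it.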

Calibration

If you were to plot the probabilities your model outputs (from 0 to 1) against the actual fraction of class A vs class B at each probability, you'd have a calibration curve. There are a few variants on the methodology (eg, Platt scaling, which treats this as a logistic regression) but the simpler one is the isotonic method. This sorts your probabilities in ascending order and fits a monotonically increasing step function to the cumulative total of class As.

A line that hugs the diagonal indicates a well calibrated model.
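A library-free sketch of computing such a curve: sort by predicted probability, bin, and compare each bin's mean prediction with its observed positive rate (the toy probabilities and labels here are made up; scikit-learn's calibration_curve does this properly):

```python
def calibration_curve(probs, labels, bins=5):
    """Return (mean predicted probability, observed positive fraction) per bin."""
    pairs = sorted(zip(probs, labels))       # ascending by predicted probability
    size = len(pairs) // bins
    curve = []
    for i in range(bins):
        chunk = pairs[i * size:(i + 1) * size]
        mean_p = sum(p for p, _ in chunk) / len(chunk)
        frac_pos = sum(y for _, y in chunk) / len(chunk)
        curve.append((round(mean_p, 2), round(frac_pos, 2)))
    return curve

# Toy predictions and labels
probs  = [0.1, 0.1, 0.3, 0.3, 0.5, 0.5, 0.7, 0.7, 0.9, 0.9]
labels = [0,   0,   0,   1,   0,   1,   1,   1,   1,   1]
print(calibration_curve(probs, labels))
```

Plotting these pairs against the diagonal gives the calibration curve; points on the diagonal mean the predicted probability matches the observed positive rate.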

Wednesday, April 16, 2025

Kafka in Kubernetes

If you ask Strimzi to set up a Kafka cluster out of the box, you'll see this when connecting:

$ kafka-topics.sh --bootstrap-server 10.152.183.163:9092 --list
...
[2025-04-04 16:35:59,464] WARN [AdminClient clientId=adminclient-1] Error connecting to node my-cluster-dual-role-0.my-cluster-kafka-brokers.kafka.svc:9092 (id: 0 rack: null) (org.apache.kafka.clients.NetworkClient)
java.net.UnknownHostException: my-cluster-dual-role-0.my-cluster-kafka-brokers.kafka.svc
...

Where that IP address comes from:

$ kubectl get service -A
NAMESPACE     NAME                                  TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)                                        AGE...
kafka         my-cluster-kafka-bootstrap            ClusterIP   10.152.183.163   <none>        9091/TCP,9092/TCP,9093/TCP                     21d

It appears that Strimzi does not expose external Kafka ports by default. So, add an external port with:

kubectl get kafka my-cluster -n kafka -o yaml > /tmp/kafka.yaml

then edit /tmp/kafka.yaml adding:

    - name: external
      port: 32092
      tls: false
      type: nodeport

in the spec/kafka/listeners block and apply it with:

kubectl apply -n kafka -f /tmp/kafka.yaml

Now I can see:

$ kubectl get svc -n kafka
NAME                                  TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)                                        AGE   SELECTOR
...
my-cluster-kafka-bootstrap            ClusterIP   10.152.183.163   <none>        9091/TCP,9092/TCP,9093/TCP                     21d
my-cluster-kafka-external-bootstrap   NodePort    10.152.183.27    <none>        32092:32039/TCP                                11d

It appears that Strimzi has created a new service for us - hurrah! 

However, making a call to Kafka still fails. And this is because of the very architecture of Kubernetes. I am indeed communicating with a Kafka broker within Kubernetes but then it's forwarding me to another domain name, my-cluster-dual-role-0.my-cluster-kafka-brokers.kafka.svc. The host knows nothing about this Kubernetes domain name. Incidentally, the same happens in Kafka for a pure Docker configuration.

Kubernetes pods resolve their domain names using the internal DNS.

$ kubectl exec -it my-cluster-dual-role-1 -n kafka -- cat /etc/resolv.conf
Defaulted container "kafka" out of: kafka, kafka-init (init)
search kafka.svc.cluster.local svc.cluster.local cluster.local home
nameserver 10.152.183.10
options ndots:5

This nameserver is kube-dns (I'm using Microk8s):

$ kubectl get svc -n kube-system
NAME       TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)                  AGE
kube-dns   ClusterIP   10.152.183.10   <none>        53/UDP,53/TCP,9153/TCP   21d

and we can query it from the host:

$ nslookup my-cluster-dual-role-external-0.kafka.svc.cluster.local 10.152.183.10
Server: 10.152.183.10
Address: 10.152.183.10#53

Name: my-cluster-dual-role-external-0.kafka.svc.cluster.local
Address: 10.152.183.17

Now, to get the host to use the Kubernetes DNS for K8s domain names, I had to:

$ sudo apt update
$ sudo apt install dnsmasq
$ sudo vi /etc/dnsmasq.d/k8s.conf

This was a new file and needed:

# Don't clash with systemd-resolved which listens on loopback address 127.0.0.53:
listen-address=127.0.0.1
bind-interfaces
# Rewrite .svc to .svc.cluster.local
address=/.svc/10.152.183.10
server=/svc.cluster.local/10.152.183.10

That listen-address line was because sudo ss -ulpn | grep :53 showed both dnsmasq and systemd-resolved were fighting over the same port.

I also had to add:

[Resolve]
DNS=127.0.0.1
FallbackDNS=8.8.8.8
Domains=~svc.cluster.local

to /etc/systemd/resolved.conf to tell it to defer to dnsmasq first for domains ending with svc.cluster.local. Finally, restart everything:

$ sudo systemctl restart systemd-resolved
$ sudo ln -sf /run/systemd/resolve/resolv.conf /etc/resolv.conf
$ sudo systemctl restart dnsmasq

Now let's use that external port we configured at the top of the post:

$ kubectl get svc -n kafka
NAME                                  TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)                                        AGE
...
my-cluster-kafka-external-bootstrap   NodePort    10.152.183.27    <none>        32092:32039/TCP          
$ ./kafka-topics.sh --bootstrap-server 10.152.183.27:32092 --create --topic my-new-topic  --partitions 3  --replication-factor 2
Created topic my-new-topic.
$ ./kafka-topics.sh --bootstrap-server 10.152.183.27:32092 --list
my-new-topic

Banzai!