Saturday, May 31, 2025

Iceberg and Kafka Connect

I've been looking at Iceberg ticket 11818 "Iceberg Kafka Connector experiences a constant hanging lag for low-volume topics".

Quick background: Kafka Connect, part of the Kafka project, is a framework for "getting data from an application into Kafka or getting data out of Kafka into an external application." [Kafka Streams in Action, Manning]

Kafka Connect

I'm trying to get my head around Iceberg code that uses Kafka Connect.

SinkTask is the class that must be implemented, and its put method is what receives a Collection of SinkRecords.
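
To make this concrete, here is a minimal sink task against the Kafka Connect API (the class is mine and purely illustrative; it just logs what it receives):

import java.util.Collection;
import java.util.Map;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public class LoggingSinkTask extends SinkTask {
  @Override
  public String version() {
    return "0.0.1";
  }

  @Override
  public void start(Map<String, String> props) {
    // receives this task's slice of the connector config
  }

  @Override
  public void put(Collection<SinkRecord> records) {
    // a real sink would write these records to the external system
    records.forEach(r -> System.out.printf("topic=%s partition=%d offset=%d%n",
        r.topic(), r.kafkaPartition(), r.kafkaOffset()));
  }

  @Override
  public void stop() {}
}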

This SO answer describing the relationship between connectors and tasks was informative: "A task is a thread that performs the actual sourcing or sinking of data. The number of tasks per connector is determined by the implementation of the connector... For sink connectors, the number of tasks should be equal to the number of partitions of the topic. The task distribution among workers is determined by task rebalance which is a very similar process to Kafka consumer group rebalance."

Iceberg and Kafka Connect

Iceberg's Kafka Connect implementation can be configured to create the table if it does not exist. In the Integration*Test classes, you can see that Kafka Connect is configured so that iceberg.tables.route-field (TABLES_ROUTE_FIELD_PROP) is payload, a field in the test's TestEvent objects, which are serialized to JSON and sent to Kafka.
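
The config the tests POST to Kafka Connect's REST API therefore looks roughly like this (iceberg.tables.route-field is from the test code; the other Iceberg property names, the topic and the values here are from memory and illustrative, so they may not match the tests exactly):

{
  "name": "iceberg-sink",
  "config": {
    "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
    "tasks.max": "2",
    "topics": "test-topic",
    "iceberg.tables.route-field": "payload",
    "iceberg.tables.auto-create-enabled": "true"
  }
}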

The first message will create a RecordWriter. If the IcebergWriterFactory is so configured, it will first create the table to which the RecordWriter will write.

Inspecting Kafka

When running kafka-consumer-groups.sh you can see CURRENT-OFFSET (the offset of the next record the consumer will read) and LOG-END-OFFSET (the offset of the next record a producer will write). The LAG is the difference between the two: if CURRENT-OFFSET is 90 and LOG-END-OFFSET is 100, the LAG is 10.
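
The command itself looks like this (the group name here is illustrative; Kafka Connect names a sink connector's consumer group after the connector):

$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group connect-iceberg-sink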

It seems that if receivedPartitionCount >= expectedPartitionCount is not true (see CommitState.isCommitReady), Coordinator.receive will not commit, nor will it reset CommitState.currentCommitId to null. This means a StartCommit is never sent.

Incidentally, I asked on the Iceberg Slack channel what the rationale was behind this but received no replies after 4 days.

This results in Kafka Connect logging:

[2025-05-27 09:24:22,527] INFO Commit d9f18c4a-5253-4c55-8cce-b3a077bbf3c9 initiated (org.apache.iceberg.connect.channel.Coordinator)

so the commit is initiated but:

[2025-05-27 09:24:23,950] INFO Commit d9f18c4a-5253-4c55-8cce-b3a077bbf3c9 not ready, received responses for 1 of 2 partitions, waiting for more (org.apache.iceberg.connect.channel.CommitState)

Evidently, receivedPartitionCount is 1 and expectedPartitionCount is 2 (see below).

The problem

The line here in CommitState.isCommitReady:

if (receivedPartitionCount >= expectedPartitionCount) {

will lead to isCommitReady returning false if this inequality does not hold. Before we look at the values, let's first see how the leader is chosen.

Follow my leader

The algorithm to elect the leader is this: "there should only be one task assigned partition 0 of the first topic, so elect that one the leader" [Committer in Iceberg code]. It does this by taking "the list of partitions that are now assigned to the [Sink] task" [Kafka SinkTask] when it is opened. It then compares this to the members of the consumer group corresponding to the connector's group ID, calling Kafka [docs] directly to get them.
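
A sketch of that election (simplified; the real logic lives in Iceberg's CommitterImpl and consults Kafka's Admin API for the group's members):

import java.util.Collection;
import java.util.Comparator;
import org.apache.kafka.common.TopicPartition;

class LeaderElection {
  // The task holding the lowest partition of the first topic across the
  // whole consumer group (i.e. partition 0 of the first topic) elects
  // itself leader; every other task stays a plain worker.
  static boolean isLeader(Collection<TopicPartition> allGroupPartitions,
                          Collection<TopicPartition> myPartitions) {
    TopicPartition first = allGroupPartitions.stream()
        .min(Comparator.comparing(TopicPartition::topic)
            .thenComparingInt(TopicPartition::partition))
        .orElseThrow();
    return myPartitions.contains(first);
  }
}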

expectedPartitionCount

The "list of the members of the consumer group" [Kafka ConsumerGroupDescription.members] becomes CommitterImpl.membersWhenWorkerIsCoordinator iff the lowest partition ID in for the members happens to be one of the list of partitions we've been given to coordinate via SinkTask.open (see the leadership election logic, above).

That is, if we're the leader, all those partitions are ours.

The Iceberg Coordinator is instantiated with this list and its totalPartitionCount is calculated as the sum of all the partitions for its members. 

Great, this is exactly the same as expectedPartitionCount in the code snippet above.
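
Expressed with Kafka's Admin API types, the calculation is roughly this (a sketch, not the Iceberg code itself):

import java.util.Collection;
import org.apache.kafka.clients.admin.MemberDescription;

class ExpectedPartitions {
  // Sum every partition assigned to every member of the connector's
  // consumer group; for a 2-partition topic this is 2, however the
  // partitions are spread across tasks.
  static int totalPartitionCount(Collection<MemberDescription> members) {
    return members.stream()
        .mapToInt(member -> member.assignment().topicPartitions().size())
        .sum();
  }
}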

receivedPartitionCount

We take the batch of DataComplete objects the CommitState has been buffering. A DataComplete is a "control event payload for events sent by a worker that indicates it has finished sending all data for a commit request."

We sum the assignments for each Payload.

These assignments are from "the current set of assigned TopicPartitions for this task" [Kafka SinkTaskContext docs], that is, the worker that created the DataComplete objects. 

Note that it will "include all assigned topic partitions even if no messages were read" [Iceberg Worker code].

The sum of these partition counts, taken over the DataComplete events that carry our currentCommitId, is what we're after.

The problem here is that we have a topic with 2 partitions and 2 tasks, so each task has a single partition, and any one task's DataComplete accounts for only one partition. Therefore, receivedPartitionCount = 1.
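
Sketched in Java, with a stand-in record for Iceberg's DataComplete payload (the accessor names are indicative, not exact):

import java.util.List;
import java.util.UUID;
import org.apache.kafka.common.TopicPartition;

class ReceivedPartitions {
  // Stand-in for Iceberg's control-event payload: the commit it refers to
  // plus the partitions assigned to the task that sent it.
  record DataComplete(UUID commitId, List<TopicPartition> assignments) {}

  // Sum the partition assignments across the DataComplete events buffered
  // for the current commit. With two tasks on a two-partition topic, the
  // first DataComplete to arrive carries one partition: 1 < 2, not ready.
  static int receivedPartitionCount(UUID currentCommitId, List<DataComplete> buffer) {
    return buffer.stream()
        .filter(event -> event.commitId().equals(currentCommitId))
        .mapToInt(event -> event.assignments().size())
        .sum();
  }
}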

The Solution

If the tasks.max for this IcebergSinkConnector is 2, then the partitions will be shared between two tasks, and for a single message only one task responds, so the inequality does not hold. But if it's set to 1, the single task will deal with all partitions.

This leads to this log line:

[2025-05-27 11:41:21,712] INFO Commit 8dfe1850-d8b6-4054-9d0c-52a782a1d9e4 ready, received responses for all 2 partitions (org.apache.iceberg.connect.channel.CommitState)

Evidently, receivedPartitionCount = 2 and expectedPartitionCount <= 2. 

Saturday, May 17, 2025

DevOps lessons from Polaris and Iceberg

The Apache Iceberg and Polaris code bases make pretty good reading. There's some nice DevOps work here. Here are a few tidbits.

Iceberg's Kafka Connectors

As part of Iceberg's integration tests, Docker Compose is used to fire up a Kafka Connect container. Interestingly, this container mounts the directory holding Iceberg's artifacts so it instantly has the latest implementation of Iceberg's Kafka Connect SinkConnector and SinkTask. The test suite then starts the connector with a REST call (that's the nature of Kafka Connect) that contains all of the connector's config.

This docker-compose.yml also starts a MinIO container so Kafka Connect thinks it's writing to an AWS S3 bucket - all of this on one self-contained laptop, no config required. (It uses Testcontainers to do the heavy lifting of talking to the Docker daemon.)

Testcontainers has a very cool trick for cleaning up the containers when the tests are finished. It starts a sidecar called Ryuk that holds a connection open to the JVM and kills the containers when that connection closes. You can see it while the tests are running with:

$ docker ps
CONTAINER ID   IMAGE                                 COMMAND                  CREATED         STATUS                            PORTS                                                           NAMES
...
f7c69b891330   testcontainers/ryuk:0.11.0            "/bin/ryuk"              5 seconds ago   Up 5 seconds                      0.0.0.0:32772->8080/tcp, :::32772->8080/tcp                     testcontainers-ryuk-3c88ed17-ec3d-4ce9-8830-dbbc1ca86294
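
If you haven't seen Testcontainers code, the programming model is just try-with-resources: start any container and the Ryuk sidecar appears alongside it. A minimal sketch (the Redis image is arbitrary):

import org.testcontainers.containers.GenericContainer;
import org.testcontainers.utility.DockerImageName;

class RyukDemo {
  public static void main(String[] args) {
    // When this JVM's connection to Ryuk closes (e.g. the process exits),
    // Ryuk reaps every container Testcontainers created.
    try (GenericContainer<?> redis =
             new GenericContainer<>(DockerImageName.parse("redis:7-alpine"))
                 .withExposedPorts(6379)) {
      redis.start();
      System.out.println("redis at " + redis.getHost() + ":" + redis.getMappedPort(6379));
    }
  }
}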

Kind - K8s lite

You can run Polaris in a Kind cluster. Kind is Kubernetes in Docker. Although Kind itself is not a CNCF-conformant Kubernetes distribution, it runs a CNCF-conformant version of Kubernetes, just inside Docker containers. So, although it is not appropriate for a multi-node environment, Kind is great for development on a single machine.

Kind starts two Docker containers (kind-control-plane and kind-registry) as well as updating your ~/.kube/config file. If you run ./run.sh in the Polaris codebase, you will see it start Kubernetes pods like etcd, coredns and kube-proxy as well as a polaris container.

Metadata file

Iceberg creates a file called iceberg-build.properties when it's built. Having your project do this can be enormously useful when you've ssh-ed into a box and are wondering exactly what version is running there, because it's a test environment and nobody has kept track of what is deployed where (ie, the real world).

Iceberg is built with Gradle so it uses the com.gorylenko.gradle-git-properties plugin, but there appears to be an equivalent for Maven (git-commit-id-plugin).
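
Reading the file back at runtime is then trivial; a sketch, assuming the properties file has been packaged onto the classpath:

import java.io.InputStream;
import java.util.Properties;

class BuildInfo {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    try (InputStream in = BuildInfo.class.getResourceAsStream("/iceberg-build.properties")) {
      props.load(in); // throws if the file was not packaged
    }
    // prints e.g. the git commit and branch the artifact was built from
    props.forEach((k, v) -> System.out.println(k + "=" + v));
  }
}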

Quarkus

Polaris has become heavily dependent on Quarkus. The Docker container just runs quarkus-run.jar. This Jar's main class is io.quarkus.bootstrap.runner.QuarkusEntryPoint. It loads /deployments/quarkus/quarkus-application.dat, a binary file describing all the Polaris guff. Apparently, it's a binary file to minimise start-up times.

The Docker image's entrypoint is /opt/jboss/container/java/run/run-java.sh. This script comes FROM the Dockerfile's Quarkus-friendly base image and contains sensible JVM defaults.

Wednesday, May 14, 2025

Interpreting XGBoost models

Playing with XGBoost at the moment. Here are some basic notes:

Scikit-learn's XGBoost Classifier

This takes a lot of parameters but the highlights are:

Also called eta, the learning_rate is similar to the one in neural nets in that a smaller value means slower but potentially better performance during training.

The minimum loss reduction required before splitting a leaf is called gamma. A higher value means a more conservative model.

You can define the evaluation metric with eval_metric. A value of logloss, for instance, will penalize confidently wrong predictions.

The fraction of the training data used per tree is the subsample argument, and scale_pos_weight weights the positive class (useful for imbalanced data).

Stratified Random Sampling

Talking of imbalanced data: if there are two classes, A and B, and the number of data points for A is small, a random sample may by chance contain a disproportionate number of As. To address this, we separate the A and B data points and take the exact number from each such that the overall ratio fits the real world.

For instance, if the ratio of A to B is 1:100 and we want a sample of 1000 points, stratified sampling will give us precisely 10 As, whereas a purely random sample could (feasibly) give us none.
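
A sketch of the idea in plain Java (the method and its fractionA parameter are mine; it assumes each class list is large enough):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class StratifiedSample {
  // Take exactly round(total * fractionA) items from class A and the rest
  // from class B, so e.g. fractionA ~= 0.01 and total = 1000 gives 10 As.
  static <T> List<T> sample(List<T> classA, List<T> classB, int total, double fractionA) {
    int nA = (int) Math.round(total * fractionA);
    List<T> a = new ArrayList<>(classA);
    List<T> b = new ArrayList<>(classB);
    Collections.shuffle(a); // random within each stratum
    Collections.shuffle(b);
    List<T> out = new ArrayList<>(a.subList(0, nA));
    out.addAll(b.subList(0, total - nA));
    Collections.shuffle(out);
    return out;
  }
}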

Notes on the Confusion Matrix

High precision and low recall indicates a model that really wants to be sure it doesn't have too many false positives.

F1 scores are good for checking imbalanced data, or when false positives and false negatives are of roughly equal cost. What counts as acceptable is domain specific, but a value of about 0.6 is a minimum.

Specificity is how well a binary classifier identifies true negatives (that is, how well it avoids false positives). It's between 0 and 1 and higher is better. Sensitivity (recall) is the equivalent for the positive class, measuring how well it avoids false negatives.
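
For reference, all of these metrics fall out of the four confusion-matrix counts; a quick sketch:

class ConfusionMatrixMetrics {
  // tp/fp/tn/fn: true/false positives and negatives
  static double precision(int tp, int fp)   { return tp / (double) (tp + fp); }
  static double recall(int tp, int fn)      { return tp / (double) (tp + fn); } // sensitivity
  static double specificity(int tn, int fp) { return tn / (double) (tn + fp); }
  static double f1(int tp, int fp, int fn) {
    double p = precision(tp, fp);
    double r = recall(tp, fn);
    return 2 * p * r / (p + r); // harmonic mean of precision and recall
  }
}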

The Bayes factor is the ratio of the probability of getting this data given a hypothesis is true versus the null hypothesis, that is P(D|H1)/P(D|H0).

When less (data) is more

In survival analysis, you might want to employ a technique called temporal censoring analysis. This applies to time-to-event data, the subject of survival analysis. The idea is that you censor data to minimise a fundamental problem: a person who has yet to be diagnosed with a disease (for example) would otherwise be classed the same as somebody who will never have it.

A similar approach is used to tackle Simpson's Paradox, where the model behaves differently in aggregate than when the data is segmented. Here, we segment the data and evaluate the model on those cohorts only. These techniques are called cohort-based validation or segmented model evaluation, depending on how you slice and dice the data.

Calibration

If you were to plot the probabilities your model outputs (from 0 to 1) against the actual fraction of class A vs class B at each probability, you'd have a calibration curve. There are a few variants on the methodology (eg, Platt scaling, which treats this as a logistic regression) but the simpler one is the isotonic method. This sorts your probabilities in ascending order and fits a monotonically (isotonically) increasing step function to the cumulative fraction of class As.

A line that hugs the diagonal indicates a well calibrated model.
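
A sketch of the pool-adjacent-violators step at the heart of isotonic calibration: order the 0/1 outcomes by ascending model score, then repeatedly merge adjacent buckets whose means violate monotonicity. The resulting step values are the calibrated probabilities.

import java.util.ArrayList;
import java.util.List;

class IsotonicCalibration {
  // outcomes must be ordered by ascending model score; returns a
  // non-decreasing sequence of calibrated probabilities, one per point.
  static double[] fit(int[] outcomes) {
    List<double[]> blocks = new ArrayList<>(); // each block is {sum, count}
    for (int y : outcomes) {
      blocks.add(new double[] {y, 1});
      // merge backwards while the newest block's mean undercuts its predecessor's
      while (blocks.size() > 1) {
        double[] last = blocks.get(blocks.size() - 1);
        double[] prev = blocks.get(blocks.size() - 2);
        if (prev[0] / prev[1] <= last[0] / last[1]) break;
        prev[0] += last[0];
        prev[1] += last[1];
        blocks.remove(blocks.size() - 1);
      }
    }
    double[] fitted = new double[outcomes.length];
    int i = 0;
    for (double[] block : blocks) {
      double mean = block[0] / block[1];
      for (int j = 0; j < (int) block[1]; j++) fitted[i++] = mean;
    }
    return fitted;
  }
}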