I've been looking at Iceberg ticket 11818 "Iceberg Kafka Connector experiences a constant hanging lag for low-volume topics".
Quick background: Kafka Connect, part of the Kafka project, is a framework for "getting data from an application into Kafka or getting data out of Kafka into an external application." [Kafka Streams in Action, Manning]
Kafka Connect
SinkTask is the class that must be implemented, and put is the method that receives a Collection of SinkRecords.
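As a minimal sketch of that surface area (this is not Iceberg's implementation, just what any sink task looks like; the class name and version string are mine):

import java.util.Collection;
import java.util.Map;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public class LoggingSinkTask extends SinkTask {
  @Override
  public void start(Map<String, String> props) {
    // receives the connector configuration for this task
  }

  @Override
  public void put(Collection<SinkRecord> records) {
    // Kafka Connect delivers each polled batch of records here
    records.forEach(r ->
        System.out.printf("%s-%d@%d%n", r.topic(), r.kafkaPartition(), r.kafkaOffset()));
  }

  @Override
  public void stop() {}

  @Override
  public String version() {
    return "0.0.1";
  }
}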
This SO answer describing the relationship between connectors and tasks was informative:
"A task is a thread that performs the actual sourcing or sinking of data.
The number of tasks per connector is determined by the implementation of the connector...
For sink connectors, the number of tasks should be equal to the number of partitions of the topic. The task distribution among workers is determined by task rebalance which is a very similar process to Kafka consumer group rebalance."
Iceberg and Kafka Connect
Iceberg's Kafka Connect implementation can be configured to create the table if it does not exist. In the Integration*Test classes, you can see that Kafka Connect is configured such that iceberg.tables.route-field (TABLES_ROUTE_FIELD_PROP) is payload, a field in our test's TestEvent objects, which are serialized to JSON and sent to Kafka.
The first message will create a RecordWriter. If the IcebergWriterFactory is so configured, it will first create the table to which the RecordWriter will write.
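As a hedged sketch of that configuration: iceberg.tables.route-field comes from the text above, while the other property names and values are assumptions from memory and should be checked against IcebergSinkConfig:

import java.util.Map;

class ConnectorConfigSketch {
  static final Map<String, String> PROPS = Map.of(
      "connector.class", "org.apache.iceberg.connect.IcebergSinkConnector", // assumed class name
      "topics", "test-topic",                        // hypothetical topic
      "tasks.max", "2",
      "iceberg.tables.route-field", "payload",       // TABLES_ROUTE_FIELD_PROP
      "iceberg.tables.dynamic-enabled", "true",      // assumed: route records by field value
      "iceberg.tables.auto-create-enabled", "true"); // assumed: create table on first write
}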
Inspecting Kafka
When running kafka-consumer-groups.sh --describe, you can see CURRENT-OFFSET (the offset of the next record the consumer is to read) and LOG-END-OFFSET (the offset of the next record a producer is to write). The LAG column is the difference between the two.
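The same numbers can be read programmatically. A minimal sketch using Kafka's AdminClient, assuming the sink's consumer group is named connect-iceberg-sink:

import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class LagCheck {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    try (AdminClient admin = AdminClient.create(props)) {
      // CURRENT-OFFSET per partition, as committed by the group
      Map<TopicPartition, OffsetAndMetadata> current =
          admin.listConsumerGroupOffsets("connect-iceberg-sink")
              .partitionsToOffsetAndMetadata().get();
      // LOG-END-OFFSET per partition
      Map<TopicPartition, OffsetSpec> latest = current.keySet().stream()
          .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
      admin.listOffsets(latest).all().get().forEach((tp, end) ->
          System.out.printf("%s lag=%d%n", tp, end.offset() - current.get(tp).offset()));
    }
  }
}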
It seems that if receivedPartitionCount >= expectedPartitionCount does not hold (see CommitState.isCommitReady), Coordinator.receive will not commit, and CommitState.currentCommitId is never reset to null. Since a new commit is only started when currentCommitId is null, no further StartCommit is ever sent.
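A minimal sketch of that control flow as I read it (this is my paraphrase, not the actual Iceberg source):

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

class CommitStateSketch {
  UUID currentCommitId;                        // non-null while a commit is in flight
  final List<Integer> reportedPartitionCounts = new ArrayList<>();

  boolean isCommitReady(int expectedPartitionCount) {
    int receivedPartitionCount =
        reportedPartitionCounts.stream().mapToInt(Integer::intValue).sum();
    // the check at the heart of the ticket
    return receivedPartitionCount >= expectedPartitionCount;
  }

  void maybeStartNewCommit() {
    if (currentCommitId != null) {
      // a previous commit never became ready, so it is never cleared
      // and no new StartCommit is ever sent: the lag hangs
      return;
    }
    currentCommitId = UUID.randomUUID();
    // broadcast StartCommit to the workers here
  }
}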
Incidentally, I asked on the Iceberg Slack channel what the rationale was behind this design but received no replies after 4 days.
This results in Kafka Connect logging:
[2025-05-27 09:24:22,527] INFO Commit d9f18c4a-5253-4c55-8cce-b3a077bbf3c9 initiated (org.apache.iceberg.connect.channel.Coordinator)
so the commit is initiated but:
[2025-05-27 09:24:23,950] INFO Commit d9f18c4a-5253-4c55-8cce-b3a077bbf3c9 not ready, received responses for 1 of 2 partitions, waiting for more (org.apache.iceberg.connect.channel.CommitState)
Evidently, receivedPartitionCount is 1 and expectedPartitionCount is 2 (see below).
The problem
The line here in CommitState.isCommitReady:
if (receivedPartitionCount >= expectedPartitionCount) {
will lead to isCommitReady returning false if this inequality does not hold. Before we look at the values, let's first see how the leader is chosen.
Follow my leader
The algorithm to elect the leader is this:
"there should only be one task assigned partition 0 of the first topic, so elect that one the leader" [Commiter in Iceberg code].
It does this by taking "the list of partitions that are now assigned to the [Sink] task" [Kafka SinkTask] when it is opened. It then compares this to the members of the consumer group corresponding to its group ID, which it fetches by calling Kafka [docs] directly.
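A hedged sketch of that election rule, assuming the "first topic, lowest partition" reading above (names are mine):

import java.util.Collection;
import java.util.Comparator;
import org.apache.kafka.common.TopicPartition;

class LeaderElectionSketch {
  // assigned:  partitions handed to this task via SinkTask.open
  // groupWide: every partition owned by the consumer group, fetched from Kafka
  static boolean isLeader(Collection<TopicPartition> assigned,
                          Collection<TopicPartition> groupWide) {
    TopicPartition first = groupWide.stream()
        .min(Comparator.comparing(TopicPartition::topic)
            .thenComparing(TopicPartition::partition))
        .orElseThrow();
    // exactly one task owns that partition, so exactly one task elects itself
    return assigned.contains(first);
  }
}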
expectedPartitionCount
The "list of the members of the consumer group" [Kafka ConsumerGroupDescription.members] becomes CommitterImpl.membersWhenWorkerIsCoordinator iff the lowest partition ID in for the members happens to be one of the list of partitions we've been given to coordinate via SinkTask.open (see the leadership election logic, above).
That is, if we're the leader, all those partitions are ours.
The Iceberg Coordinator is instantiated with this list and its totalPartitionCount is calculated as the sum of all the partitions for its members.
Great, this is exactly the same as expectedPartitionCount in the code snippet above.
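As a sketch of that sum, using Kafka's Admin API types (the class name is mine):

import java.util.Collection;
import org.apache.kafka.clients.admin.MemberDescription;

class ExpectedPartitionCountSketch {
  static int totalPartitionCount(Collection<MemberDescription> members) {
    return members.stream()
        .mapToInt(m -> m.assignment().topicPartitions().size())
        .sum(); // two members with one partition each -> expectedPartitionCount = 2
  }
}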
receivedPartitionCount
We take the batch of DataComplete objects the CommitState has been buffering. A DataComplete is a "control event payload for events sent by a worker that indicates it has finished sending all data for a commit request."
We sum the number of assigned partitions in each payload.
These assignments are "the current set of assigned TopicPartitions for this task" [Kafka SinkTaskContext docs], that is, the assignment of the worker that created the DataComplete object.
Note that it will "include all assigned topic partitions even if no messages were read" [Iceberg Worker code].
The sum of these partition counts, taken over the DataComplete events carrying our currentCommitId, is the receivedPartitionCount we're after.
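A sketch of that sum, with DataCompleteSketch standing in for Iceberg's DataComplete payload (the record and class names are mine):

import java.util.List;
import java.util.UUID;
import org.apache.kafka.common.TopicPartition;

record DataCompleteSketch(UUID commitId, List<TopicPartition> assignments) {}

class ReceivedPartitionCountSketch {
  static int receivedPartitionCount(List<DataCompleteSketch> buffered, UUID currentCommitId) {
    return buffered.stream()
        .filter(dc -> dc.commitId().equals(currentCommitId))
        .mapToInt(dc -> dc.assignments().size())
        .sum(); // one worker reporting one partition -> 1, not the expected 2
  }
}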
The problem here is that we have a topic with 2 partitions and 2 tasks, so each task is assigned a single partition. Therefore, receivedPartitionCount = 1.
The solution
If tasks.max for this IcebergSinkConnector is 2, the partitions are shared between the tasks and the inequality does not hold for a single message. But if it is set to 1, the single SinkTask deals with all partitions.
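As a sketch, reusing the assumed property map from the configuration sketch earlier, the workaround is a one-line change:

import java.util.HashMap;
import java.util.Map;

class WorkaroundSketch {
  static Map<String, String> singleTask(Map<String, String> connectorProps) {
    Map<String, String> fixed = new HashMap<>(connectorProps);
    fixed.put("tasks.max", "1"); // one task now owns both partitions, so its
    return fixed;                // DataComplete alone satisfies the check
  }
}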
Rerunning with tasks.max = 1 leads to this log line:
[2025-05-27 11:41:21,712] INFO Commit 8dfe1850-d8b6-4054-9d0c-52a782a1d9e4 ready, received responses for all 2 partitions (org.apache.iceberg.connect.channel.CommitState)
Evidently, receivedPartitionCount = 2 and expectedPartitionCount = 2, so the inequality holds and the commit proceeds.