Sunday, September 4, 2022

Kafka/FS2 troubleshooting

I had FS2 and Kafka working quite nicely together in example code ... until I started using explicit transactions. Then my code was pausing and finally failing with:

org.apache.kafka.common.errors.TimeoutException: Timeout expired after 60000ms while awaiting EndTxn(false)

Cranking up the logging on the client side to TRACE revealed a tight loop that output something like this (simplified):

2022-09-02 16:50:41,898 | TRACE | o.a.k.c.p.i.TransactionManager - [Producer clientId=CLIENT_ID, transactionalId=TX_ID] Request TxnOffsetCommitRequestData(...) dequeued for sending
2022-09-02 16:50:41,998 | DEBUG | o.a.k.c.producer.internals.Sender - [Producer clientId=CLIENT_ID, transactionalId=TX_ID] Sending transactional request TxnOffsetCommitRequestData(...) to node 127.0.0.1:9092 (id: 1001 rack: null) with correlation ID 527
2022-09-02 16:50:41,998 | DEBUG | o.apache.kafka.clients.NetworkClient - [Producer clientId=CLIENT_ID, transactionalId=TX_ID] Sending TXN_OFFSET_COMMIT request with header RequestHeader(apiKey=TXN_OFFSET_COMMIT, apiVersion=3, clientId=CLIENT_ID, correlationId=527) and timeout 30000 to node 1001: TxnOffsetCommitRequestData(...)
2022-09-02 16:50:41,999 | DEBUG | o.apache.kafka.clients.NetworkClient - [Producer clientId=CLIENT_ID, transactionalId=TX_ID] Received TXN_OFFSET_COMMIT response from node 1001 for request with header RequestHeader(...)
2022-09-02 16:50:41,999 | TRACE | o.a.k.c.p.i.TransactionManager - [Producer clientId=CLIENT_ID, transactionalId=TX_ID] Received transactional response TxnOffsetCommitResponseData(...) for request TxnOffsetCommitRequestData(...)
2022-09-02 16:50:41,999 | DEBUG | o.a.k.c.p.i.TransactionManager - [Producer clientId=CLIENT_ID, transactionalId=TX_ID] Received TxnOffsetCommit response for consumer group MY_GROUP: {test_topic-1=UNKNOWN_TOPIC_OR_PARTITION}


The topic, at least, certainly did exist:

$ kafka-topics.sh  --bootstrap-server localhost:9092  --topic test_topic --describe
Topic: test_topic PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: test_topic Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001

The 'solution' was to use more partitions than replicas. "I made the replication factor less than the number of partitions and it worked for me. It sounds odd to me but yes, it started working after it." [SO]

But why? Putting breakpoints in the Kafka code on all references to UNKNOWN_TOPIC_OR_PARTITION and running my client code again led me to KafkaApis.handleTxnOffsetCommitRequest (which seems reasonable since the FS2-Kafka client is trying to handle the offsets manually). There I could see that the partition in my org.apache.kafka.common.TopicPartition was 1 when the Kafka server was expecting 0. Oops. I had guessed this number to make the client compile and forgot to go back and fix it.

So, the topic did exist but the partition did not. Creating more partitions just means that there is something there to commit, which addresses the symptom rather than the cause.
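To make the off-by-one concrete, here is a minimal sketch in plain Java (no Kafka dependency). It relies only on the fact that Kafka partition ids are zero-based, so a topic with PartitionCount: 1 has partition 0 and nothing else. The class and method names (PartitionCheck, partitionExists) are hypothetical and purely illustrative; in a real client the partition count would presumably come from topic metadata rather than a hard-coded value.

```java
public class PartitionCheck {

    // Kafka partition ids are zero-based: a topic with N partitions
    // has ids 0 .. N-1. This helper is hypothetical, not a Kafka API.
    static boolean partitionExists(int partitionCount, int partition) {
        return partition >= 0 && partition < partitionCount;
    }

    public static void main(String[] args) {
        // "PartitionCount: 1" as reported by kafka-topics.sh --describe
        int partitionCount = 1;

        // The partition id I had guessed in the client: does not exist.
        System.out.println(partitionExists(partitionCount, 1)); // false

        // The only partition the broker actually knows about.
        System.out.println(partitionExists(partitionCount, 0)); // true
    }
}
```

A guard like this before building the TopicPartition would have turned a 60-second timeout into an immediate, readable failure.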

The takeaway is that committing offsets in a transaction by hand requires knowledge of the structure of the topic, in particular which partitions actually exist.
