...

Code Block
KafkaTransactionsExample.java
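import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Template only: consumerConfig, producerConfig, CONSUMER_POLL_TIMEOUT,
// processRecords() and getUncommittedOffsets() are application-defined.
public class KafkaTransactionsExample {

  public static void main(String[] args) {
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig);

    KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig);
    // Register the transactional.id and fence older producer instances.
    producer.initTransactions();

    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(CONSUMER_POLL_TIMEOUT);
      if (!records.isEmpty()) {
        producer.beginTransaction();

        List<ProducerRecord<String, String>> outputRecords = processRecords(records);
        for (ProducerRecord<String, String> outputRecord : outputRecords) {
          producer.send(outputRecord);
        }

        // Commit the consumed offsets as part of the same transaction.
        producer.sendOffsetsToTransaction(getUncommittedOffsets(), consumer.groupMetadata());

        producer.commitTransaction();
      }
    }
  }
}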

As one can see, the first thing a producer does on startup is register its identity through the initTransactions API. The transaction coordinator uses this step to fence producers using the same transactional.id and to ensure that previous transactions are completed.

In the above template, we call consumer.poll() to get data. The very first time it does so, the consumer needs to know the input topic offset, which it obtains through a FetchOffset call to the group coordinator. With transactional processing, some offsets may be "pending", i.e. they are part of an ongoing transaction. Upon receiving a FetchOffset request, the broker will export the offset position up to the "latest stable offset" (LSO), which is the largest offset that has already been committed, when the consumer isolation.level is `read_committed`. Since we rely on a unique transactional.id to revoke stale transactions, we believe any pending transaction will be aborted when the producer calls initTransactions again. In normal use cases such as Kafka Streams, we will also explicitly close the producer to send out an EndTransaction request, making sure we start from a clean state.
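The fencing behavior described above can be illustrated with a small toy model. This is only a sketch, not Kafka's coordinator code: the class `FencingSketch`, the nested `ToyCoordinator`, and its methods are hypothetical names, and it assumes fencing is tracked with a per-transactional.id epoch that is bumped on every initTransactions call.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of transactional.id fencing. All names here are hypothetical. */
class FencingSketch {

    /** Hypothetical stand-in for the transaction coordinator. */
    static final class ToyCoordinator {
        private final Map<String, Integer> latestEpoch = new HashMap<>();
        private final Map<String, Boolean> openTransaction = new HashMap<>();

        /** Models initTransactions: abort any pending transaction, then bump the epoch. */
        int initTransactions(String transactionalId) {
            openTransaction.put(transactionalId, false);
            return latestEpoch.merge(transactionalId, 1, Integer::sum);
        }

        /** Models starting a transaction; a request with a stale epoch is fenced. */
        boolean begin(String transactionalId, int epoch) {
            Integer latest = latestEpoch.get(transactionalId);
            if (latest == null || latest != epoch) {
                return false; // unknown or fenced producer
            }
            openTransaction.put(transactionalId, true);
            return true;
        }

        boolean hasOpenTransaction(String transactionalId) {
            return openTransaction.getOrDefault(transactionalId, false);
        }
    }

    public static void main(String[] args) {
        ToyCoordinator coordinator = new ToyCoordinator();

        // The old instance registers and leaves a transaction pending.
        int oldEpoch = coordinator.initTransactions("app-1");
        coordinator.begin("app-1", oldEpoch);

        // A restarted instance registers with the same transactional.id.
        int newEpoch = coordinator.initTransactions("app-1");

        System.out.println("pending after re-init: " + coordinator.hasOpenTransaction("app-1"));
        System.out.println("old instance accepted: " + coordinator.begin("app-1", oldEpoch));
        System.out.println("new instance accepted: " + coordinator.begin("app-1", newEpoch));
    }
}
```

In this model the second registration both clears the pending transaction and invalidates the older epoch, which mirrors the two guarantees the text attributes to initTransactions.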

...

We shall set the default of `transaction.timeout.ms` to 10000 ms (10 seconds).
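For illustration, an application could pin the same value explicitly instead of relying on the default. The sketch below builds plain `java.util.Properties` using the producer configuration keys `transactional.id`, `enable.idempotence`, and `transaction.timeout.ms`; the class and method names are hypothetical, and connection settings such as `bootstrap.servers` are intentionally left out.

```java
import java.util.Properties;

/** Hypothetical helper that assembles transactional producer settings. */
class ProducerConfigSketch {

    static Properties transactionalProducerConfig(String transactionalId) {
        Properties props = new Properties();
        props.setProperty("transactional.id", transactionalId);
        props.setProperty("enable.idempotence", "true");
        // 10 seconds, matching the default proposed in the text.
        props.setProperty("transaction.timeout.ms", "10000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = transactionalProducerConfig("app-1");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```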

Offset Fetch Request 

...

We need to bump OffsetFetchRequest version to include isolation level:

Code Block
OffsetFetchRequest => Partitions GroupId IsolationLevel
  Partitions          => List<TopicPartition>
  GroupId             => String
  IsolationLevel      => int8 // NEW

When isolation.level is set to `read_committed`, the consumer is required to back off until all pending offsets are cleared.
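The back-off behavior can be sketched with a toy model. Everything here is an assumption made for illustration: `ToyGroupCoordinator`, `fetchOffset`, and `fetchWithBackoff` are hypothetical names, an empty result stands in for whatever signal the broker would use to say "offsets are still pending", and the back-off is a caller-supplied hook rather than a real sleep.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

/** Toy model of offset fetch with an isolation level. All names are hypothetical. */
class OffsetFetchBackoffSketch {

    enum IsolationLevel { READ_UNCOMMITTED, READ_COMMITTED }

    /** Hypothetical stand-in for the group coordinator's offset store. */
    static final class ToyGroupCoordinator {
        private final Map<String, Long> committed = new HashMap<>();
        private final Map<String, Long> pending = new HashMap<>();

        void commit(String partition, long offset) { committed.put(partition, offset); }

        /** An offset written by a transaction that has not completed yet. */
        void addPending(String partition, long offset) { pending.put(partition, offset); }

        /** Completing the transaction promotes (commit) or drops (abort) the pending offset. */
        void endTransaction(String partition, boolean commit) {
            Long offset = pending.remove(partition);
            if (commit && offset != null) {
                committed.put(partition, offset);
            }
        }

        /** Empty means "pending offsets exist, retry later"; -1 means nothing committed. */
        OptionalLong fetchOffset(String partition, IsolationLevel level) {
            if (level == IsolationLevel.READ_COMMITTED && pending.containsKey(partition)) {
                return OptionalLong.empty();
            }
            return OptionalLong.of(committed.getOrDefault(partition, -1L));
        }
    }

    /** Retries the fetch, invoking the back-off hook between attempts. */
    static long fetchWithBackoff(ToyGroupCoordinator coordinator, String partition,
                                 int maxAttempts, Runnable backoff) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            OptionalLong offset = coordinator.fetchOffset(partition, IsolationLevel.READ_COMMITTED);
            if (offset.isPresent()) {
                return offset.getAsLong();
            }
            backoff.run();
        }
        throw new IllegalStateException("offsets still pending after " + maxAttempts + " attempts");
    }

    public static void main(String[] args) {
        ToyGroupCoordinator coordinator = new ToyGroupCoordinator();
        coordinator.commit("input-0", 5L);
        coordinator.addPending("input-0", 9L);

        // The hook simulates time passing: the ongoing transaction commits during the back-off.
        long offset = fetchWithBackoff(coordinator, "input-0", 3,
                () -> coordinator.endTransaction("input-0", true));
        System.out.println("fetched offset: " + offset);
    }
}
```

Passing the back-off as a hook keeps the sketch deterministic; a real client would wait for a configured interval instead.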

Fence Zombie

A zombie process may invoke InitProducerId after falling out of the consumer group. In order to distinguish zombie requests, we need to leverage the group coordinator to fence out-of-generation clients.
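As a rough sketch of the idea, a request could carry the member id and generation the client last saw, and the group coordinator could reject anything that does not match the current generation. The class and method names below are hypothetical and the model is deliberately simplified; it is not the actual request handling.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model of generation-based zombie fencing. All names are hypothetical. */
class GenerationFencingSketch {

    /** Hypothetical stand-in for the group coordinator's membership state. */
    static final class ToyGroupCoordinator {
        private int generation = 0;
        private final Set<String> members = new HashSet<>();

        /** A rebalance replaces the membership and starts a new generation. */
        int rebalance(Set<String> newMembers) {
            members.clear();
            members.addAll(newMembers);
            generation++;
            return generation;
        }

        /** Accept a request only from a current member that knows the current generation. */
        boolean accept(String memberId, int memberGeneration) {
            return members.contains(memberId) && memberGeneration == generation;
        }
    }

    public static void main(String[] args) {
        ToyGroupCoordinator coordinator = new ToyGroupCoordinator();
        int firstGeneration = coordinator.rebalance(Set.of("member-a", "member-b"));

        // member-a stalls and falls out of the group; the group rebalances without it.
        int secondGeneration = coordinator.rebalance(Set.of("member-b"));

        System.out.println("zombie member-a accepted: "
                + coordinator.accept("member-a", firstGeneration));
        System.out.println("live member-b accepted: "
                + coordinator.accept("member-b", secondGeneration));
    }
}
```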

...