...

The main addition of this KIP is a new variant of the current sendOffsetsToTransaction API, which gives the producer access to consumer group state such as the member ID and generation.id.

Code Block
interface Producer {
  /**
   * Should pass in the entire consumer state for new API.
   */
  void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, ConsumerGroupMetadata consumerGroupMetadata) throws ProducerFencedException, IllegalGenerationException; // NEW
}

...

We shall set the `transaction.timeout.ms` default to 10000 ms (10 seconds) on Kafka Streams. For non-Streams users, we highly recommend doing the same if you want to use the new semantics.
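
For non-Streams users, the recommendation above amounts to one producer setting. The following is a minimal sketch, assuming the application keeps its producer configuration in a `Properties` object. The class name `TxnTimeoutConfig` and the method `withRecommendedTimeout` are hypothetical; only the `transaction.timeout.ms` key and the 10000 ms value come from the text.

```java
import java.util.Properties;

// Hypothetical helper, not part of the Kafka client API.
class TxnTimeoutConfig {
    // Value recommended in this KIP: 10 seconds.
    static final int RECOMMENDED_TXN_TIMEOUT_MS = 10_000;

    // Returns a copy of the given producer properties with the
    // transaction timeout set to the recommended value.
    static Properties withRecommendedTimeout(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        props.setProperty("transaction.timeout.ms",
                Integer.toString(RECOMMENDED_TXN_TIMEOUT_MS));
        return props;
    }
}
```

The returned properties can then be passed to the producer constructor in place of the original configuration; the input object is left untouched.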

Offset Fetch Request 

We will add a new error code that tells the consumer to wait until pending transactions are cleared. To be able to return the corresponding exceptions to both old and new clients, we shall also bump the OffsetFetch protocol version.

...

In the meantime, this offset fetch back-off should be applied only to EOS use cases, not to general offset fetch use cases such as admin client access. We shall also define a flag within the offset fetch request so that the back-off logic is triggered only when the request is involved in EOS, i.e. on isolation level read_committed.
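
The intended interaction can be sketched as follows. This is only an illustration under stated assumptions: the `FetchResult` enum, its `PENDING_TRANSACTION` value, the `requireStable` flag name, and the retry parameters are hypothetical stand-ins, not names defined in this section.

```java
import java.util.function.Supplier;

// Hypothetical model of the back-off behavior; not Kafka client code.
class OffsetFetchBackoff {
    // Hypothetical outcome of a single offset fetch attempt.
    enum FetchResult { OK, PENDING_TRANSACTION }

    // Broker-side decision: only requests carrying the flag (EOS use cases)
    // are told to wait; other callers such as admin clients get offsets at once.
    static FetchResult respond(boolean hasPendingTxn, boolean requireStable) {
        return (hasPendingTxn && requireStable)
                ? FetchResult.PENDING_TRANSACTION
                : FetchResult.OK;
    }

    // Client-side loop: retry with a fixed back-off until the fetch succeeds.
    // Returns the number of attempts that were needed.
    static int fetchWithBackoff(Supplier<FetchResult> fetch, int maxAttempts, long backoffMs) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (fetch.get() == FetchResult.OK) {
                return attempt;
            }
            try {
                Thread.sleep(backoffMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while backing off", e);
            }
        }
        throw new IllegalStateException(
                "offsets still pending after " + maxAttempts + " attempts");
    }
}
```

Keeping the decision on the broker side means a request without the flag never observes the new error, which matches the goal of leaving general offset fetch callers unaffected.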

...

To give the transactional producer access to consumer state, the consumer will expose a new API that returns some of its internal state as an opaque struct. This is already done by KIP-429; we just showcase the high-level structure here for convenience.

Code Block
ConsumerGroupMetadata {
  public String groupId();
 
  public int generationId();
 
  public String memberId();
 
  public Optional<String> groupInstanceId();
}

...

And here is a recommended usage example of the new transactional API:

Code Block
  Set<String> topics = buildSubscription();
  KafkaConsumer consumer = new KafkaConsumer<>(consumerConfig);
  // The producer config must set transactional.id to enable transactions
  KafkaProducer producer = new KafkaProducer<>(producerConfig);

  consumer.subscribe(topics);
  producer.initTransactions();
  while (true) {
    // Read some records from the consumer and collect the offsets to commit
    ConsumerRecords consumed = consumer.poll(Duration.ofMillis(5000)); // This will be the fencing point if there are pending offsets for the first time.
    Map<TopicPartition, OffsetAndMetadata> consumedOffsets = offsets(consumed);

    // Do some processing and build the records we want to produce
    List<ProducerRecord> processed = process(consumed);

    // Write the records and commit offsets under a single transaction
    producer.beginTransaction();
    for (ProducerRecord record : processed)
      producer.send(record);

    // Pass the entire consumer group metadata
    producer.sendOffsetsToTransaction(consumedOffsets, consumer.groupMetadata());

    producer.commitTransaction();
  }

...

Following the above principle, Kafka Streams uses version probing to solve the online upgrade problem. The step-by-step guide is:

  1. Brokers must be upgraded to 2.4 first. This means the `inter.broker.protocol.version` (IBP) has to be set to the latest. Any produce request with a higher version will automatically be fenced because it is not supported.
  2. Upgrade the Streams application binary and set the UPGRADE_FROM_CONFIG config to 2.3 or lower. Do the first rolling bounce, and make sure the group is stable with every instance on the 2.4 binary.
  3. Remove/unset UPGRADE_FROM_CONFIG, in order to point the application to the actual Kafka client version 2.4. Do a second rolling bounce; the application now officially starts using the new thread producer for EOS.
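
On the application side, the two rolling bounces above differ in a single Streams setting. Here is a minimal sketch, assuming the application builds its Streams configuration from a `Properties` object. The class `StreamsUpgradeConfig` and its methods are hypothetical; `upgrade.from` is the string key behind `UPGRADE_FROM_CONFIG`.

```java
import java.util.Properties;

// Hypothetical helper illustrating the two-bounce upgrade; not Kafka API.
class StreamsUpgradeConfig {
    // String key behind StreamsConfig.UPGRADE_FROM_CONFIG.
    static final String UPGRADE_FROM = "upgrade.from";

    // First rolling bounce: new binary, but still announcing the old version.
    static Properties firstBounce(Properties base, String oldVersion) {
        Properties props = new Properties();
        props.putAll(base);
        props.setProperty(UPGRADE_FROM, oldVersion);
        return props;
    }

    // Second rolling bounce: unset the config so the application runs
    // as its actual client version.
    static Properties secondBounce(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        props.remove(UPGRADE_FROM);
        return props;
    }
}
```

For example, `firstBounce(appConfig, "2.3")` would be deployed for the first bounce and `secondBounce(...)` of that result for the second, where `appConfig` stands for whatever configuration the application already uses.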

...