...

As we see, the exposed metadata contains member.id, group.instance.id and generation.id, which are essentially the identifiers we use in the normal offset commit protocol. To keep track of the latest metadata, we will add a top-level API to the consumer:

Code Block
language: java
title: Consumer.java
ConsumerGroupMetadata getMetadata();

This lets EOS users fetch refreshed group metadata as needed.
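To make the idea of "refreshed group metadata" concrete, here is a minimal sketch that does not depend on Kafka. The class names GroupMetadataSnapshot and GroupMembership, the groupId field, and the onJoinComplete hook are all assumptions made for illustration; they are not part of the proposal. The sketch only shows one plausible shape: an immutable snapshot carrying the three identifiers above, replaced on every successful rejoin, so each call to getMetadata() sees the latest generation.

```java
import java.util.Optional;

// Hypothetical stand-in for ConsumerGroupMetadata; field names are assumptions.
final class GroupMetadataSnapshot {
    final String groupId;
    final int generationId;
    final String memberId;
    final Optional<String> groupInstanceId;

    GroupMetadataSnapshot(String groupId, int generationId, String memberId,
                          Optional<String> groupInstanceId) {
        this.groupId = groupId;
        this.generationId = generationId;
        this.memberId = memberId;
        this.groupInstanceId = groupInstanceId;
    }
}

// Hypothetical consumer-side holder that swaps in a new snapshot after each rejoin.
final class GroupMembership {
    private volatile GroupMetadataSnapshot current;

    GroupMembership(String groupId, Optional<String> groupInstanceId) {
        // Before the first join there is no valid generation or member id.
        this.current = new GroupMetadataSnapshot(groupId, -1, "", groupInstanceId);
    }

    // Assumed hook: invoked once a (re)join has completed successfully.
    void onJoinComplete(int generationId, String memberId) {
        GroupMetadataSnapshot prev = current;
        current = new GroupMetadataSnapshot(prev.groupId, generationId, memberId,
                                            prev.groupInstanceId);
    }

    // Mirrors the proposed top-level accessor: always returns the latest snapshot.
    GroupMetadataSnapshot getMetadata() {
        return current;
    }
}

class GroupMetadataSketch {
    public static void main(String[] args) {
        GroupMembership membership = new GroupMembership("eos-app", Optional.empty());
        membership.onJoinComplete(1, "member-a");
        GroupMetadataSnapshot first = membership.getMetadata();
        membership.onJoinComplete(2, "member-a");
        GroupMetadataSnapshot second = membership.getMetadata();
        // Older snapshots stay unchanged; callers must re-fetch to see the new generation.
        System.out.println(first.generationId + " -> " + second.generationId);
    }
}
```

Because snapshots are immutable, a caller that holds on to an old one keeps seeing the old generation, which is why fetching the metadata right before each offset commit is the safer pattern.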

...

And here is a recommended new transactional API usage example:

Code Block
  Set<String> topics = buildSubscription();
  KafkaConsumer consumer = new KafkaConsumer<>(consumerConfig);
  // producerConfig is assumed to be defined alongside consumerConfig
  KafkaProducer producer = new KafkaProducer<>(producerConfig);

  consumer.subscribe(topics);

  producer.initTransactions();
  while (true) {
    // Read some records from the consumer and collect the offsets to commit.
    // On the first poll, this is the fencing point if there are pending offsets.
    ConsumerRecords consumed = consumer.poll(Duration.ofMillis(5000));
    Map<TopicPartition, OffsetAndMetadata> consumedOffsets = offsets(consumed);

    // Do some processing and build the records we want to produce
    List<ProducerRecord> processed = process(consumed);

    // Write the records and commit offsets under a single transaction
    producer.beginTransaction();
    for (ProducerRecord record : processed)
      producer.send(record);

    // Pass the consumer's latest group metadata along with the offsets
    producer.sendOffsetsToTransaction(consumedOffsets, consumer.getMetadata());

    producer.commitTransaction();
  }

...

  1. The user must use both the consumer and the producer together as a complete EOS application.
  2. For fencing to work, the user must store transactional offsets in the Kafka group coordinator, not in any external system.
  3. The user must call the new producer.initTransactions(consumer), which passes in a consumer struct for state access during initialization.
  4. The producer must call sendOffsetsToTransaction(offsets, groupMetadata) to be able to fence properly.
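The requirements above hinge on the coordinator being able to tell a current group member from a stale one. The following toy model illustrates that check under strong simplifying assumptions: a single-member group, an in-memory offset map, and a hypothetical ToyGroupCoordinator class whose methods are invented for this sketch. It is not Kafka's coordinator logic; it only shows why the offset commit has to carry member.id and generation.id.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: not Kafka's coordinator code. All names here are hypothetical.
final class ToyGroupCoordinator {
    private int generationId = 0;
    private String activeMemberId = null; // single-member group, for simplicity
    private final Map<String, Long> committedOffsets = new HashMap<>();

    // A (re)join bumps the generation and records which member owns it.
    int join(String memberId) {
        generationId++;
        activeMemberId = memberId;
        return generationId;
    }

    // Accept the commit only if the caller's metadata matches the current generation.
    boolean commitTxnOffset(String memberId, int callerGeneration,
                            String partition, long offset) {
        if (callerGeneration != generationId || !memberId.equals(activeMemberId)) {
            return false; // stale metadata: the caller is fenced
        }
        committedOffsets.put(partition, offset);
        return true;
    }

    // Returns the committed offset, or null if nothing was committed.
    Long committed(String partition) {
        return committedOffsets.get(partition);
    }
}

class FencingSketch {
    public static void main(String[] args) {
        ToyGroupCoordinator coordinator = new ToyGroupCoordinator();
        int oldGen = coordinator.join("member-a");
        int newGen = coordinator.join("member-b"); // rebalance: member-a is now stale

        // The stale member's commit is rejected; the current member's is accepted.
        System.out.println(coordinator.commitTxnOffset("member-a", oldGen, "topic-0", 10L));
        System.out.println(coordinator.commitTxnOffset("member-b", newGen, "topic-0", 10L));
    }
}
```

In this model, offsets kept outside the coordinator would bypass the generation check entirely, which is the intuition behind requirement 2.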

Compatibility, Deprecation, and Migration Plan

...