Versions Compared

...

Code Block
interface Producer {
  /**
   * Should pass in the entire consumer state for the new API.
   */
  void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, ConsumerGroupMetadata consumerGroupMetadata) throws ProducerFencedException, IllegalGenerationException; // NEW
}

Shrink transactional.timeout

We shall set the `transaction.timeout.ms` default to 10000 ms (10 seconds) on Kafka Streams.
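
As a rough illustration of what a 10-second default amounts to, the sketch below builds producer properties in which `transaction.timeout.ms` is 10000 unless the user supplies a different value. The class and method names (`StreamsProducerDefaults`, `withDefaults`) are hypothetical and this is not the Kafka Streams implementation; only the config key is an actual producer setting, and the "user value wins" behavior is an assumption about how a default would be applied.

```java
import java.util.Properties;

// Hypothetical helper, not part of Kafka Streams: applies a 10-second
// transaction timeout default that user-supplied config can still override.
final class StreamsProducerDefaults {
    static final String TRANSACTION_TIMEOUT_MS = "transaction.timeout.ms";
    static final String DEFAULT_TIMEOUT_MS = "10000";

    static Properties withDefaults(Properties userConfig) {
        Properties merged = new Properties();
        // Start from the proposed default...
        merged.setProperty(TRANSACTION_TIMEOUT_MS, DEFAULT_TIMEOUT_MS);
        // ...then let any explicit user setting replace it (assumed behavior).
        merged.putAll(userConfig);
        return merged;
    }
}
```

Calling `StreamsProducerDefaults.withDefaults(new Properties())` would yield a config with the 10000 ms timeout, while a user config that already sets `transaction.timeout.ms` keeps its own value.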

...

Code Block
  Set<String> topics = buildSubscription(); 	
  KafkaConsumer consumer = new KafkaConsumer<>(consumerConfig);
  KafkaProducer producer = new KafkaProducer();
 
  // Latest group metadata; updated by the rebalance listener below.
  ConsumerGroupMetadata groupMetadata = null;

  consumer.subscribe(topics, new ConsumerRebalanceListener() {
    void onGroupRejoined(ConsumerGroupMetadata updatedMetadata) {
      groupMetadata = updatedMetadata;
    }
  });

  producer.initTransactions(consumer);	
  while (true) {
    // Read some records from the consumer and collect the offsets to commit
    ConsumerRecords consumed = consumer.poll(Duration.ofMillis(5000)); // This will be the fencing point if there are pending offsets for the first time.
    Map<TopicPartition, OffsetAndMetadata> consumedOffsets = offsets(consumed);

    // Do some processing and build the records we want to produce
    List<ProducerRecord> processed = process(consumed);

    // Write the records and commit offsets under a single transaction
    producer.beginTransaction();
    for (ProducerRecord record : processed)
      producer.send(record);

    if (groupMetadata == null) {
      throw new IllegalStateException("Consumer poll should block until the consumer has successfully joined the group and received updated metadata");
    }
    producer.sendOffsetsToTransaction(consumedOffsets, groupMetadata);

    producer.commitTransaction();
  }

...

  1. User must be utilizing both consumer and producer as a complete EOS application,
  2. User needs to store transactional offsets inside Kafka group coordinator, not in any other external system for the sake of fencing,
  3. Need to call the new producer.initTransactions(consumer); which passes in a consumer struct for state access during initialization,
  4. Producer needs to call sendOffsetsToTransaction(offsets, groupMetadata) to be able to fence properly,
  5. User needs to keep a reference to the current group metadata so that it can be passed to the producer.
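
One possible way to keep the group metadata reference from item 5 is sketched below. It is self-contained and does not use the Kafka client: `GroupMetadataStub` is an illustrative stand-in for the proposed `ConsumerGroupMetadata`, and `GroupMetadataHolder` with its `update` and `require` methods are hypothetical names. The idea is only that the rebalance callback stores the latest metadata and the processing loop reads it before sending offsets, failing fast if the consumer has not joined the group yet.

```java
import java.util.concurrent.atomic.AtomicReference;

// Illustrative stand-in for the proposed ConsumerGroupMetadata; the fields
// are assumptions chosen for this example, not the proposed class.
final class GroupMetadataStub {
    final String groupId;
    final int generationId;

    GroupMetadataStub(String groupId, int generationId) {
        this.groupId = groupId;
        this.generationId = generationId;
    }
}

// Hypothetical holder shared between the rebalance callback and the
// processing loop.
final class GroupMetadataHolder {
    private final AtomicReference<GroupMetadataStub> latest = new AtomicReference<>();

    // Invoked from the rebalance callback whenever the consumer rejoins the group.
    void update(GroupMetadataStub updated) {
        latest.set(updated);
    }

    // Invoked just before sending offsets to the transaction; fails fast if
    // no metadata has been received yet.
    GroupMetadataStub require() {
        GroupMetadataStub current = latest.get();
        if (current == null) {
            throw new IllegalStateException("group metadata not available yet");
        }
        return current;
    }
}
```

An `AtomicReference` is used here because a plain local variable cannot be reassigned from inside an anonymous listener class in Java, and because it makes the update visible if the callback and the processing loop ever run on different threads.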

Compatibility, Deprecation, and Migration Plan

...