...

In order to pass the group generationId to `initTransactions`, we need to expose it from the consumer. We propose to add a new function call on the consumer to expose the generation info:

Code Block
language: java
title: Consumer.java
Generation generation();
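
As a rough illustration of what the exposed generation info might look like, the sketch below models a `Generation` value with a public `generationId` field, matching how the example later in this section reads `consumer.generation().generationId`. Everything beyond that one field is an assumption made for illustration: the `-1` sentinel for "not in a group", the `StubConsumer` class, and its `completeRebalance` method are hypothetical and are not part of the proposal.

```java
/** Hypothetical shape of the value returned by the proposed generation() call. */
final class Generation {
  /** Group generation the consumer last joined with; -1 is assumed to mean "not in a group". */
  final int generationId;

  Generation(int generationId) {
    this.generationId = generationId;
  }
}

/** In-memory stand-in for a consumer (hypothetical; not the real KafkaConsumer). */
final class StubConsumer {
  private Generation current = new Generation(-1);

  /** Simulates a rebalance completing with a new generation id. */
  void completeRebalance(int newGenerationId) {
    current = new Generation(newGenerationId);
  }

  /** Mirrors the proposed accessor: returns the latest known generation. */
  Generation generation() {
    return current;
  }
}
```

Returning an immutable value object, rather than a bare `int`, would leave room to carry further group metadata later without changing the accessor's signature.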

With this proposal, the transactional id is no longer needed for proper fencing, but the coordinator still needs a way to identify producers individually as they are executing new transactions. There are two options: continue using the transactional id, or use the producer id which is generated by the coordinator in InitProducerId. Either way, the main challenge is authorization. We currently use the transactional id to authorize transactional operations. In this KIP, we could instead utilize the consumer group id for authorization. The producer must still provide a transactional id if it is working in standalone mode, though.

We also need to change the on-disk format for transaction state in order to persist both the consumer group id and the assigned partitions. We propose to use a separate record type in order to store the group assignment. Transaction state records will not change.
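
To make the idea of a separate record type more concrete, here is a minimal sketch of a record value that holds the consumer group id and the assigned partitions, with a byte-level round trip. The class name `GroupAssignmentRecord`, its fields, and the binary layout (length-prefixed UTF-8 strings followed by a 4-byte partition index) are all assumptions for illustration; the proposal does not specify a schema here.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical record value for a group assignment; the layout is an assumption. */
final class GroupAssignmentRecord {
  final String groupId;
  final List<String> topics;      // topic name of each assigned partition
  final List<Integer> partitions; // partition index, parallel to topics

  GroupAssignmentRecord(String groupId, List<String> topics, List<Integer> partitions) {
    this.groupId = groupId;
    this.topics = topics;
    this.partitions = partitions;
  }

  /** Encodes as: groupId, entry count, then (topic, partition) pairs. */
  byte[] serialize() {
    byte[] group = groupId.getBytes(StandardCharsets.UTF_8);
    int size = 2 + group.length + 4;
    List<byte[]> topicBytes = new ArrayList<>();
    for (String topic : topics) {
      byte[] bytes = topic.getBytes(StandardCharsets.UTF_8);
      topicBytes.add(bytes);
      size += 2 + bytes.length + 4;
    }
    ByteBuffer buf = ByteBuffer.allocate(size);
    buf.putShort((short) group.length).put(group);
    buf.putInt(topics.size());
    for (int i = 0; i < topicBytes.size(); i++) {
      byte[] bytes = topicBytes.get(i);
      buf.putShort((short) bytes.length).put(bytes).putInt(partitions.get(i));
    }
    return buf.array();
  }

  /** Decodes the layout written by serialize(). */
  static GroupAssignmentRecord deserialize(byte[] data) {
    ByteBuffer buf = ByteBuffer.wrap(data);
    String groupId = readString(buf);
    int count = buf.getInt();
    List<String> topics = new ArrayList<>();
    List<Integer> partitions = new ArrayList<>();
    for (int i = 0; i < count; i++) {
      topics.add(readString(buf));
      partitions.add(buf.getInt());
    }
    return new GroupAssignmentRecord(groupId, topics, partitions);
  }

  private static String readString(ByteBuffer buf) {
    byte[] bytes = new byte[buf.getShort()];
    buf.get(bytes);
    return new String(bytes, StandardCharsets.UTF_8);
  }
}
```

Keeping the assignment in its own record type, as proposed, means a reader of existing transaction state records would not need to understand this layout at all.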

...

Code Block
  String consumerGroupId = "group";
  Set<String> topics = buildSubscription();
  KafkaConsumer consumer = new KafkaConsumer(buildConsumerConfig(consumerGroupId));
  KafkaProducer producer = new KafkaProducer(buildProducerConfig(consumerGroupId)); // passing in consumer group id
  consumer.subscribe(topics);
  producer.initTransactions(new GroupAssignment());

  while (true) {
    // Read some records from the consumer and collect the offsets to commit
    ConsumerRecords consumed = consumer.poll(Duration.ofMillis(5000));
    Map<TopicPartition, OffsetAndMetadata> consumedOffsets = offsets(consumed);

    // Do some processing and build the records we want to produce
    List<ProducerRecord> processed = process(consumed);

    // Write the records and commit offsets under a single transaction
    producer.beginTransaction();
    for (ProducerRecord record : processed)
      producer.send(record);
    try {
      producer.sendOffsetsToTransaction(consumedOffsets, consumer.generation().generationId);
    } catch (IllegalGenerationException e) {
      throw e; // fail the zombie member if generation doesn't match
    }

    producer.commitTransaction();
  }

The main points are the following:

  1. Consumer group id becomes a config value on producer.
  2. Generation.id will be used for group coordinator fencing.
  3. We no longer need to close the producer after a rebalance. We can call initTransactions multiple times.
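
The generation-based fencing in the example can be pictured with a small in-memory model of the coordinator-side check. This is only a sketch under stated assumptions: `StubGroupCoordinator`, its methods, and the use of plain string keys for partitions are hypothetical, and `IllegalGenerationException` is declared locally here, named after the exception caught in the example.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical exception type, named after the one caught in the example. */
final class IllegalGenerationException extends RuntimeException {
  IllegalGenerationException(String message) {
    super(message);
  }
}

/** In-memory stand-in for the group coordinator's fencing check (hypothetical). */
final class StubGroupCoordinator {
  private int currentGeneration = 0;
  private final Map<String, Long> committed = new HashMap<>();

  /** Simulates a completed rebalance: every earlier generation becomes stale. */
  int rebalance() {
    return ++currentGeneration;
  }

  /** Accepts the commit only if the caller's generation matches the current one. */
  void commitOffsets(int generationId, Map<String, Long> offsets) {
    if (generationId != currentGeneration) {
      throw new IllegalGenerationException(
          "generation " + generationId + " is stale; current is " + currentGeneration);
    }
    committed.putAll(offsets);
  }

  /** Returns the last committed offset for the partition, or null if none. */
  Long committedOffset(String partition) {
    return committed.get(partition);
  }
}
```

In this model, a member that still holds the generation from before a rebalance has its commit rejected and the previously committed offset stays untouched, which is the zombie-fencing behaviour the example relies on.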

Compatibility, Deprecation, and Migration Plan

...