Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

This architecture does not scale well as the number of input partitions increases. Every producer come with separate memory buffers, a separate thread, separate network connections. This limits the performance of the producer since we cannot effectively use the output of multiple tasks to improve batching. It also causes unneeded load on brokers since there are more concurrent transactions and more redundant metadata management.

We recommend you to take a look at the detailed design doc to better understand the internal changes we are performing, the KIP only presents high level ideas.

Proposed Changes

We argue that the The root of the problem is that transaction coordinators have no knowledge of consumer group semantics. They simply do not understand that partitions can be moved between processes. Let's take a look at a sample exactly-once use case, which is quoted from KIP-98

Code Block
languagejava
titleKafkaTransactionsExample.java
linenumberstrue
public class KafkaTransactionsExample {
  
  public static void main(String args[]) {
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig);
 
    KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig);
    producer.initTransactions();
     
    while(true) {
      ConsumerRecords<String, String> records = consumer.poll(CONSUMER_POLL_TIMEOUT);
      if (!records.isEmpty()) {
        producer.beginTransaction();
        	
        List<ProducerRecord<String, String>> outputRecords = processRecords(records);
        for (ProducerRecord<String, String> outputRecord : outputRecords) {
          producer.send(outputRecord);
        }
    
        sendOffsetsResult = producer.sendOffsetsToTransaction(getUncommittedOffsets());
        
        producer.endTransaction();
      }
    }
  }
}

Currently transaction Transaction coordinator uses the initTransactions API currently in order to fence producers using the same transactional Id and to ensure that previous transactions have been completed. We propose to switch this guarantee on group coordinator instead. 

...

The main addition of this KIP is a new variant of the current initTransactions API which gives us access to the consumer group states, such as member .id state and generation.id.

Code Block
interface Producer {
  /**
   * This API shall be called for consumer group aware transactional producers.
   */
  void initTransactions(Consumer<byte[], byte[]> consumer); // NEW

  /**
   * No longer need to pass in the consumer group id in a case where we already get access to the consumer state.
   */
  void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets) throws ProducerFencedException, IllegalGenerationException; // NEW
}

...

A zombie process may invoke InitProducerId after falling out of the consumer group. In order to distinguish zombie requests, we need to leverage group coordinator to fence out of generation client.

A new generation.id field shall be added to the `TxnOffsetCommitRequest` request:

...

The reason for doing two rolling bounces is because the old transactional producer doesn’t have access to consumer generation, so group coordinator doesn’t have effective way to fence old zombies. By doing first rolling bounce, the task producer will also opt in accessing the consumer state and send TxnOffsetCommitRequest with generation. With this foundational change, it is much safer to execute step 3.

Rejected Alternatives

...

  • We could use admin client to fetch the inter.broker.protocol on start to choose which type of producer they want to use. This approach however is harder than we expected, because brokers maybe on the different versions and if we need user to handle the tricky behavior during upgrade, it would actually be unfavorable. So a hard-coded config is a better option we have at hand.
  • We have considered to leverage transaction coordinator to remember the assignment info for each transactional producer, however this means we are copying the state data into 2 separate locations and could go out of sync easily. We choose to still use group coordinator to do the generation and partition fencing in this case.
  • We once decided to use a transactional.group.id config to replace the transactional.id, and consolidate all the transaction states for an application under one transactional coordinator. This use case is no longer needed once we rely on group coordinator to do the fencing, and we could re-implement it any time in the future with new upgrade procedure safely.