...

Code Block
languagejava
titleProducer.java
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId); // DEPRECATED
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets); // NEW


The key to this proposal is allowing a single transaction coordinator to see all members working in the same application group. It can then maintain the invariant that only one producer is allowed to make progress at any time for a particular input partition. We could effectively deprecate the `transactional.id` config because we no longer use it for revoking ongoing transactions; instead we would stick to the consumer group id. To enable this, we need two protocol changes. First, we need to update the FindCoordinator API to support lookup of the transaction coordinator using the consumer group id. Second, we need to extend the InitProducerId API to support consumer group aware initialization.
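
For reference, a minimal sketch of the lookup, assuming the existing FindCoordinatorRequest schema (the schema itself would not need new fields; the client would pass the consumer group id as the key while still requesting the transaction coordinator type):

Code Block
FindCoordinatorRequest => Key KeyType
  Key => String    // consumer group id, instead of the transactional.id
  KeyType => Int8  // 1 = transaction coordinator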

...

Code Block
InitProducerIdRequest => TransactionalId TransactionTimeoutMs ConsumerGroupId ConsumerGroupGeneration AssignedPartitions
  TransactionalId => NullableString
  TransactionTimeoutMs => Int32
  ConsumerGroupId => NullableString          // NEW
  ConsumerGroupGeneration => Int32           // NEW
  AssignedPartitions => [Topic [Partition]]  // NEW
    Topic => String
    Partition => Int32

InitProducerIdResponse => ThrottleTimeMs ErrorCode ProducerId ProducerEpoch
  ThrottleTimeMs => Int32
  ErrorCode => Int16
  ProducerId => Int64
  ProducerEpoch => Int16

The new InitProducerId API accepts either a user-configured transactional id or a consumer group id and generation id. When a consumer group id is provided, the transaction coordinator will honor it and allocate a new producer id each time initTransactions is called. It will also check whether there are any ongoing transactions that include the assigned partitions; if there are, these transactions will be aborted and the corresponding producers will be fenced by bumping their epochs. Once the transactions are complete, the call will return.
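
To make the fencing step concrete, below is an illustrative coordinator-side sketch; TxnCoordinatorSketch, TxnMetadata, and abort are hypothetical names standing in for the broker's actual state, not real broker code:

Code Block
languagejava
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustrative only: sketches the check the coordinator performs when
// InitProducerId carries a consumer group id. Partitions are modeled as
// plain strings to keep the sketch self-contained.
class TxnCoordinatorSketch {
    static class TxnMetadata {
        final Set<String> partitions = new HashSet<>();
        int producerEpoch;
    }

    private final Map<Long, TxnMetadata> ongoing = new HashMap<>();

    // Returns only after every ongoing transaction that overlaps the caller's
    // assigned partitions has been aborted and its producer fenced.
    void handleInitProducerId(Set<String> assignedPartitions) {
        for (TxnMetadata txn : ongoing.values()) {
            if (!Collections.disjoint(txn.partitions, assignedPartitions)) {
                txn.producerEpoch++;   // bump the epoch to fence the old producer
                abort(txn);            // roll back its pending writes
            }
        }
    }

    private void abort(TxnMetadata txn) {
        // write abort markers and complete the transaction (elided)
    }
}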

Fencing zombies

A zombie process may invoke InitProducerId after falling out of the consumer group. In order to distinguish zombie requests, we include the consumer group generation. Once the coordinator observes a generation bump for a group, it will refuse to handle requests from the previous generation. The only thing other group members can do is call InitProducerId themselves. This would in fact be the common case, since transactions will usually be completed before a consumer joins a rebalance.
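
A minimal sketch of that generation check (GenerationFence and its state are hypothetical stand-ins for whatever the coordinator actually keeps):

Code Block
languagejava
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch: the coordinator remembers the highest generation seen
// for each group and refuses InitProducerId requests from older generations.
class GenerationFence {
    private final Map<String, Integer> latestGeneration = new HashMap<>();

    synchronized boolean accept(String groupId, int requestGeneration) {
        int latest = latestGeneration.getOrDefault(groupId, -1);
        if (requestGeneration < latest)
            return false;                  // zombie from a previous generation
        latestGeneration.put(groupId, requestGeneration);
        return true;                       // all older generations are now fenced
    }
}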

...

To avoid concurrent processing during an upgrade, we also want to introduce an exception that lets the consumer back off:

Code Block
languagejava
titleErrors.java
PENDING_TRANSACTION(86, "There are pending transactions that need to be cleared before proceeding.", PendingTransactionException::new),

We will discuss this in more detail in the Compatibility section.
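
As an illustration of how a client might react to this error, here is a hypothetical back-off loop; the retry policy and the RETRY_BACKOFF_MS constant are assumptions, not part of the proposal:

Code Block
languagejava
// Hypothetical client-side handling; `producer` and `assignment` are assumed
// to be defined by the surrounding application code.
boolean initialized = false;
while (!initialized) {
    try {
        producer.initTransactions(assignment);
        initialized = true;
    } catch (PendingTransactionException e) {
        // Pending transactions from the previous generation are still being
        // cleared; back off and retry.
        try {
            Thread.sleep(RETRY_BACKOFF_MS);
        } catch (InterruptedException ignored) {
        }
    }
}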

Example

Below we provide an example of a simple read-process-write loop with consumer group-aware EOS processing.

Code Block
linenumberstrue
  String consumerGroupId = "group";
  Set<String> topics = buildSubscription();  
  KafkaConsumer consumer = new KafkaConsumer(buildConsumerConfig(consumerGroupId));
  KafkaProducer producer = new KafkaProducer(buildProducerConfig());

  consumer.subscribe(topics, new ConsumerRebalanceListener() {
      public void onPartitionsAssigned(Collection<TopicPartition> partitions, int generationId) {
        // On assignment, call initTransactions() in order to ensure any
        // transactions involving committed offsets from the assigned partitions
        // have been completed
        producer.initTransactions(new TxnProducerIdentity(partitions, consumerGroupId, generationId));
      }
  });

  while (true) {
    // Read some records from the consumer and collect the offsets to commit
    ConsumerRecords consumed = consumer.poll(Duration.ofMillis(5000));
    Map<TopicPartition, OffsetAndMetadata> consumedOffsets = offsets(consumed);

    // Do some processing and build the records we want to produce
    List<ProducerRecord> processed = process(consumed);

    // Write the records and commit offsets under a single transaction
    producer.beginTransaction();
    for (ProducerRecord record : processed)
      producer.send(record);
    producer.sendOffsetsToTransaction(consumedOffsets); // group id no longer needed with the new API
    producer.commitTransaction();
  }

...