Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

The new InitProducerId API accepts either a user-configured transactional Id or a consumer group Id and the set of assigned partitions. When a consumer group is provided, the transaction coordinator will check whether there are any ongoing transactions that include the assigned partitions. If there are, these transactions will be aborted and the corresponding producers will be fenced by bumping their epochs. Once transactions are complete, the call will return.

Fencing

A zombie process may invoke InitProducerId after falling out of the consumer group. In order to distinguish zombie requests, we include the consumer group generation. Once the coordinator observes a generation bump for a group, it will refuse to handle requests from the previous generation. The only thing other group members can do is call InitProducerId themselves. This in fact would be the common case since transactions will usually be completed before a consumer joins a rebalance.

...

When set to true and exactly-once is turned on, Kafka Streams application will choose to use single producer per thread.

Compatibility fencing

To fence an old producer accessing the same topic partition, we will introduce a new exception type:

...

Code Block
languagejava
titleErrors.java
CONSUMEPENDING_BACK_OFFTRANSACTION(85, "ThisCould producernot attemptedconsume to commit offset to afrom this topic partition whichdue isto ownedpending bytransactions another producer in this generationgoing on.", ConcurrentProducerCommitExceptionPendingTransactionException::new),

Will discuss in more details in Compatibility section.

...