...

Errors.java:
CONCURRENT_PRODUCER_COMMIT(85, "This producer attempted to commit offset to a topic partition which is owned by another producer in this generation.", ConcurrentProducerCommitException::new),

We also need to make sure that we fence old producers rather than new ones, so a new generation ID field will be added to `TxnOffsetCommitRequest`:

TxnOffsetCommitRequest => TransactionalId GroupId ProducerId ProducerEpoch Offsets GenerationId
  TransactionalId     => String
  GroupId             => String
  ProducerId          => int64
  ProducerEpoch       => int16
  Offsets             => Map<TopicPartition, CommittedOffset>
  GenerationId        => int32 // NEW
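
To illustrate how the new `GenerationId` field could be used to fence old producers, here is a minimal sketch of a coordinator-side check. It is not the actual broker implementation: the class `GenerationFencingSketch`, its methods, and the `Result` values are hypothetical names chosen for this example, and the sketch assumes the coordinator tracks one current generation per group and rejects any commit whose generation differs from it.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of generation-based fencing; all names are assumptions. */
public class GenerationFencingSketch {

    /** Outcome of validating a transactional offset commit in this sketch. */
    public enum Result { OK, STALE_GENERATION }

    // Current generation id per consumer group, as tracked by this sketch.
    private final Map<String, Integer> currentGeneration = new HashMap<>();

    /** Records that a rebalance moved the group to the given generation. */
    public void completeRebalance(String groupId, int generationId) {
        currentGeneration.put(groupId, generationId);
    }

    /**
     * Accepts a commit only if it carries the group's current generation id.
     * A producer still holding an older generation is rejected (fenced).
     */
    public Result validateCommit(String groupId, int requestGenerationId) {
        Integer current = currentGeneration.get(groupId);
        if (current == null || current != requestGenerationId) {
            return Result.STALE_GENERATION;
        }
        return Result.OK;
    }

    public static void main(String[] args) {
        GenerationFencingSketch coordinator = new GenerationFencingSketch();
        coordinator.completeRebalance("group-a", 5);
        // Producer from the current generation.
        System.out.println(coordinator.validateCommit("group-a", 5));
        // Producer left over from the previous generation.
        System.out.println(coordinator.validateCommit("group-a", 4));
    }
}
```

Because the check compares against the group's current generation, the producer that is out of date is the one rejected, while a producer that joined in the latest rebalance proceeds normally.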

To avoid concurrent processing during an upgrade, we also want to introduce an exception that tells the consumer to back off:

...