...

We propose to alter the InitProducerId API to accept an optional current epoch and an optional current producerId. When these are provided, the transaction coordinator will verify that the provided epoch matches the current epoch and will only allow the epoch bump if it does.

...

Of course the producer may fail to receive the response from the InitProducerId call, so we need to make this API safe for retries. In the worst case, a retry may span coordinator failover, so we need to record in the transaction log whether the bump was the result of a new producer instance or not. We propose to add a new field to the transaction state message for the last epoch that was assigned to a producer instance. We also need to handle retries of a request that triggered the generation of a new producerId due to epoch exhaustion, so we propose to add a new field to the transaction state message for the previous producerId associated with the transaction. When the coordinator receives a new InitProducerId request, we will use the following logic to update the epoch:

  1. No epoch is provided: the current epoch will be bumped and the last epoch will be set to -1.
  2. Epoch and producerId are provided, and either the provided producerId matches the current producerId, or the provided producerId matches the previous producerId and the provided epoch is exhausted:
    1. Provided epoch matches current epoch: the last epoch will be set to the current epoch, and the current epoch will be bumped.
    2. Provided epoch matches last epoch: the current epoch will be returned.
    3. Else: return INVALID_PRODUCER_EPOCH
  3. Otherwise, return INVALID_PRODUCER_EPOCH
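The decision rules above can be sketched in Python as follows. This is a minimal, hypothetical sketch rather than the coordinator's actual code: `TxnState`, `init_producer_id`, the `allocate` callback, the `-1` sentinel for "not provided", treating an epoch as exhausted once it reaches the INT16 maximum (32767), and restarting at epoch 0 under a newly allocated producerId are all assumptions made for illustration.

```python
import itertools
from dataclasses import dataclass
from typing import Callable, Tuple

NO_PRODUCER_ID = -1  # hypothetical sentinel: producerId not provided / no previous producerId
NO_EPOCH = -1        # hypothetical sentinel: epoch not provided / no last epoch
MAX_EPOCH = 32767    # assumption: an epoch at the INT16 maximum is "exhausted"


class InvalidProducerEpoch(Exception):
    """Stands in for the INVALID_PRODUCER_EPOCH error code."""


@dataclass
class TxnState:
    producer_id: int
    current_epoch: int
    last_producer_id: int = NO_PRODUCER_ID  # previous producerId of the transaction
    last_epoch: int = NO_EPOCH              # epoch that the last bump started from


def _bump(state: TxnState, allocate: Callable[[], int]) -> None:
    """Bump the epoch, moving to a freshly allocated producerId if it is exhausted."""
    if state.current_epoch >= MAX_EPOCH:
        state.last_producer_id = state.producer_id
        state.producer_id = allocate()
        state.current_epoch = 0  # assumption: the new producerId starts at epoch 0
    else:
        state.current_epoch += 1


def init_producer_id(state: TxnState, allocate: Callable[[], int],
                     producer_id: int = NO_PRODUCER_ID,
                     epoch: int = NO_EPOCH) -> Tuple[int, int]:
    """Apply the epoch-update rules and return (producerId, epoch)."""
    if epoch == NO_EPOCH:
        # Rule 1: a new producer instance, so there is no last epoch to remember.
        state.last_epoch = NO_EPOCH
        _bump(state, allocate)
        return state.producer_id, state.current_epoch

    matches_current = producer_id == state.producer_id
    matches_previous = (state.last_producer_id != NO_PRODUCER_ID
                        and producer_id == state.last_producer_id
                        and epoch >= MAX_EPOCH)
    if not (matches_current or matches_previous):
        raise InvalidProducerEpoch()  # Rule 3

    if epoch == state.current_epoch:
        # Rule 2.1: record the epoch we are bumping from, then bump.
        state.last_epoch = state.current_epoch
        _bump(state, allocate)
        return state.producer_id, state.current_epoch
    if epoch == state.last_epoch:
        # Rule 2.2: a retry of a bump that already happened; return the current epoch.
        return state.producer_id, state.current_epoch
    raise InvalidProducerEpoch()  # Rule 2.3


if __name__ == "__main__":
    alloc = itertools.count(2000).__next__  # hypothetical producerId allocator
    state = TxnState(producer_id=1000, current_epoch=5)
    print(init_producer_id(state, alloc, 1000, 5))  # first attempt bumps: (1000, 6)
    print(init_producer_id(state, alloc, 1000, 5))  # retry returns the same: (1000, 6)
```

Because the coordinator keeps the epoch it bumped from (and, after exhaustion, the previous producerId), a retried request is recognized as a duplicate and receives the same answer instead of triggering a second bump.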

Another case we want to handle is the InvalidProducerIdMapping error, which occurs after the producerId has expired. Following expiration, either another producerId has been installed in its place (if another producer instance has become active) or the mapping is empty. We can safely retry InitProducerId using the logic in this KIP to detect which case applies:

...

InitProducerIdRequest => TransactionalId TransactionTimeoutMs ProducerId Epoch
 TransactionalId => NULLABLE_STRING
 TransactionTimeoutMs => INT32
 ProducerId => INT64           // NEW
 Epoch => INT16                // NEW

InitProducerIdResponse => Error ProducerId Epoch
 Error => INT16
 ProducerId => INT64
 Epoch => INT16
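As an illustration of the request and response layouts above, the following sketch encodes the request body and decodes the response body exactly as the schema is written here. It assumes the classic big-endian, fixed-width Kafka encoding (NULLABLE_STRING as an INT16 length followed by UTF-8 bytes, with length -1 for null), omits the request header, and uses -1 as a hypothetical "not provided" value for ProducerId and Epoch; the function names are illustrative only, and newer protocol versions may use a different (compact) encoding.

```python
import struct
from typing import Optional, Tuple

NO_PRODUCER_ID = -1  # hypothetical "not provided" value for ProducerId
NO_EPOCH = -1        # hypothetical "not provided" value for Epoch


def encode_nullable_string(value: Optional[str]) -> bytes:
    """INT16 length prefix followed by UTF-8 bytes; a length of -1 encodes null."""
    if value is None:
        return struct.pack(">h", -1)
    data = value.encode("utf-8")
    return struct.pack(">h", len(data)) + data


def encode_init_producer_id_request(transactional_id: Optional[str],
                                    timeout_ms: int,
                                    producer_id: int = NO_PRODUCER_ID,
                                    epoch: int = NO_EPOCH) -> bytes:
    """Body in schema order: TransactionalId, TransactionTimeoutMs (INT32),
    ProducerId (INT64), Epoch (INT16)."""
    return encode_nullable_string(transactional_id) + struct.pack(
        ">iqh", timeout_ms, producer_id, epoch)


def decode_init_producer_id_response(body: bytes) -> Tuple[int, int, int]:
    """Return (Error, ProducerId, Epoch) from an INT16/INT64/INT16 body."""
    error, producer_id, epoch = struct.unpack(">hqh", body)
    return error, producer_id, epoch
```

A producer that wants to bump its own epoch would send its current producerId and epoch; a brand-new producer instance would leave both at the "not provided" value.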

...

Value => Version ProducerId LastProducerId CurrentEpoch LastEpoch TxnTimeoutDuration TxnStatus [TxnPartitions] TxnLastUpdateTime TxnStartTime
  Version => 1 (INT16)
  ProducerId => INT64
  LastProducerId => INT64  // NEW
  CurrentEpoch => INT16
  LastEpoch => INT16  // NEW
  TxnTimeoutDuration => INT32
  TxnStatus => INT8
  TxnPartitions => [Topic [Partition]]
     Topic => STRING
     Partition => INT32
  TxnLastUpdateTime => INT64
  TxnStartTime => INT64

As described above, the last epoch is initialized based on the epoch provided in the InitProducerId call; for a new producer instance, its value will be -1. The last producerId is the previous producerId associated with the transaction; for a new producer instance, its value will also be -1.
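To make the new fields concrete, here is a hypothetical encoder for the version 1 transaction state value. It assumes the classic big-endian encoding (INT16-prefixed strings, INT32 array counts) and writes the fields in the order they are listed (Version, ProducerId, LastProducerId, CurrentEpoch, LastEpoch, TxnTimeoutDuration, TxnStatus, TxnPartitions, TxnLastUpdateTime, TxnStartTime). `TxnLogValueV1` and `encode_txn_log_value` are illustrative names, and the numeric TxnStatus codes are not specified here. A new producer instance gets -1 for both LastProducerId and LastEpoch.

```python
import struct
from dataclasses import dataclass, field
from typing import Dict, List

NO_VALUE = -1  # LastProducerId / LastEpoch for a new producer instance


@dataclass
class TxnLogValueV1:
    producer_id: int
    current_epoch: int
    txn_timeout_ms: int
    txn_status: int       # INT8 status code; concrete values are not defined here
    last_update_ms: int
    start_ms: int
    last_producer_id: int = NO_VALUE
    last_epoch: int = NO_VALUE
    partitions: Dict[str, List[int]] = field(default_factory=dict)


def _encode_string(value: str) -> bytes:
    """INT16 length prefix followed by UTF-8 bytes."""
    data = value.encode("utf-8")
    return struct.pack(">h", len(data)) + data


def encode_txn_log_value(v: TxnLogValueV1) -> bytes:
    # Fixed-width header: Version, ProducerId, LastProducerId, CurrentEpoch,
    # LastEpoch, TxnTimeoutDuration, TxnStatus.
    out = struct.pack(">hqqhhib", 1, v.producer_id, v.last_producer_id,
                      v.current_epoch, v.last_epoch, v.txn_timeout_ms, v.txn_status)
    # [Topic [Partition]]: INT32 counts for both nested arrays.
    out += struct.pack(">i", len(v.partitions))
    for topic, partitions in v.partitions.items():
        out += _encode_string(topic) + struct.pack(">i", len(partitions))
        out += b"".join(struct.pack(">i", p) for p in partitions)
    # TxnLastUpdateTime, TxnStartTime.
    out += struct.pack(">qq", v.last_update_ms, v.start_ms)
    return out
```

Persisting LastProducerId and LastEpoch alongside the current values is what lets a coordinator that takes over after failover still recognize a retried InitProducerId request.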

...