...
We propose to alter the InitProducerId API to accept an optional current epoch and an optional current producerId. When they are provided, the transaction coordinator will verify that the provided epoch matches the current epoch and will only allow the epoch bump if it does.
...
Of course the producer may fail to receive the response from the InitProducerId call, so we need to make this API safe for retries. In the worst case, a retry may span coordinator failover, so we need to record in the transaction log whether the bump was the result of a new producer instance or not. We propose to add a new field to the transaction state message for the last epoch that was assigned to a producer instance. We also need to handle retries of a request that triggered the generation of a new producerId due to epoch exhaustion, so we propose to add a new field to the transaction state message for the previous producerId associated with the transaction. When the coordinator receives a new InitProducerId request, we will use the following logic to update the epoch:
- No epoch is provided: the current epoch will be bumped and the last epoch will be set to -1.
- Epoch and producerId are provided, and either the provided producerId matches the current producerId, or the provided producerId matches the previous producerId and the provided epoch is exhausted:
  - Provided epoch matches current epoch: the last epoch will be set to the current epoch, and the current epoch will be bumped.
  - Provided epoch matches last epoch: the current epoch will be returned.
  - Else: return INVALID_PRODUCER_EPOCH.
- Otherwise, return INVALID_PRODUCER_EPOCH.
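The rules above can be sketched in Java as follows. This is a minimal, hypothetical illustration rather than the coordinator's actual code: the names `EpochBumpSketch`, `TxnState`, `Result`, `bump` and `handle` are invented, -1 is assumed to mean "not provided" or "unset", an epoch is assumed to be exhausted at `Short.MAX_VALUE`, and a newly generated producerId is assumed to start at epoch 0.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch of the InitProducerId epoch rules; not Kafka's implementation.
public final class EpochBumpSketch {
    static final short NO_EPOCH = -1; // assumed sentinel for "not provided" / "unset"

    // Subset of the transaction state message relevant to these rules.
    record TxnState(long producerId, long lastProducerId, short currentEpoch, short lastEpoch) {}

    // Either the state to persist and return to the client, or an error code.
    record Result(TxnState state, String error) {
        static Result ok(TxnState s) { return new Result(s, null); }
        static Result fail(String e) { return new Result(null, e); }
    }

    // Assumption: an epoch is exhausted when it can no longer be incremented as an INT16.
    static boolean isExhausted(short epoch) { return epoch == Short.MAX_VALUE; }

    // Bump the epoch; if it is exhausted, generate a new producerId (assumed to start at
    // epoch 0) and remember the old producerId as the previous producerId.
    static TxnState bump(TxnState s, short newLastEpoch, LongSupplier nextProducerId) {
        if (isExhausted(s.currentEpoch())) {
            return new TxnState(nextProducerId.getAsLong(), s.producerId(), (short) 0, newLastEpoch);
        }
        return new TxnState(s.producerId(), s.lastProducerId(),
                (short) (s.currentEpoch() + 1), newLastEpoch);
    }

    static Result handle(TxnState s, long providedProducerId, short providedEpoch,
                         LongSupplier nextProducerId) {
        if (providedEpoch == NO_EPOCH) {
            // No epoch provided (new producer instance): bump and set the last epoch to -1.
            return Result.ok(bump(s, NO_EPOCH, nextProducerId));
        }
        boolean matchesCurrentId = providedProducerId == s.producerId();
        boolean matchesPreviousId =
                providedProducerId == s.lastProducerId() && isExhausted(providedEpoch);
        if (matchesCurrentId || matchesPreviousId) {
            if (providedEpoch == s.currentEpoch()) {
                // Last epoch becomes the current epoch, then the current epoch is bumped.
                return Result.ok(bump(s, s.currentEpoch(), nextProducerId));
            }
            if (providedEpoch == s.lastEpoch()) {
                // Retry of a bump that already happened: return the current state unchanged.
                return Result.ok(s);
            }
        }
        return Result.fail("INVALID_PRODUCER_EPOCH");
    }
}
```

Tracing the retry case: a bump from epoch 5 to 6 records 5 as the last epoch, so a retried request that still carries epoch 5 matches the last epoch and receives the current state instead of triggering a second bump. The same mechanism covers epoch exhaustion, because the previous producerId and last epoch together identify a retry that arrives after a new producerId was generated.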
Another case we want to handle is InvalidProducerIdMapping. This error occurs following expiration of the producerId. It's possible that another producerId has been installed in its place following expiration (if another producer instance has become active), or the mapping is empty. We can safely retry the InitProducerId with the logic in this KIP in order to detect which case it is:
...
```
InitProducerIdRequest => TransactionalId TransactionTimeoutMs ProducerId Epoch
  TransactionalId => NULLABLE_STRING
  TransactionTimeoutMs => INT32
  ProducerId => INT64 // NEW
  Epoch => INT16 // NEW

InitProducerIdResponse => Error ProducerId Epoch
  Error => INT16
  ProducerId => INT64
  Epoch => INT16
```
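To make the wire layout of the revised request concrete, the following hypothetical Java sketch encodes only the request body, field by field, using the classic (non-flexible) Kafka primitive encoding: a NULLABLE_STRING is an INT16 length followed by UTF-8 bytes, with length -1 for null, and integers are big-endian. The class and method names are illustrative, using -1 for an absent ProducerId or Epoch is an assumption, and the request header and framing are omitted.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: encodes the InitProducerIdRequest body in schema order.
public final class InitProducerIdRequestSketch {
    static final long NO_PRODUCER_ID = -1L; // assumed sentinel for "not provided"
    static final short NO_EPOCH = -1;       // assumed sentinel for "not provided"

    static ByteBuffer encode(String transactionalId, int transactionTimeoutMs,
                             long producerId, short epoch) {
        byte[] id = transactionalId == null ? null : transactionalId.getBytes(StandardCharsets.UTF_8);
        if (id != null && id.length > Short.MAX_VALUE) {
            throw new IllegalArgumentException("transactionalId too long for an INT16 length");
        }
        int idLen = id == null ? 0 : id.length;
        // ByteBuffer is big-endian by default, matching Kafka's integer encoding.
        ByteBuffer buf = ByteBuffer.allocate(2 + idLen + 4 + 8 + 2);
        if (id == null) {
            buf.putShort((short) -1);     // NULLABLE_STRING: length -1 encodes null
        } else {
            buf.putShort((short) id.length);
            buf.put(id);
        }
        buf.putInt(transactionTimeoutMs); // TransactionTimeoutMs => INT32
        buf.putLong(producerId);          // ProducerId => INT64 (new field)
        buf.putShort(epoch);              // Epoch => INT16 (new field)
        buf.flip();
        return buf;
    }
}
```

For example, `encode("txn-1", 60000, 42L, (short) 3)` produces 21 bytes (2 + 5 + 4 + 8 + 2), while a fresh producer that knows neither value would pass `NO_PRODUCER_ID` and `NO_EPOCH` under the sentinel assumption above.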
...
```
Value => Version ProducerId LastProducerId CurrentEpoch LastEpoch TxnTimeoutDuration TxnStatus [TxnPartitions] TxnLastUpdateTime TxnStartTime
  Version => 1 (INT16)
  ProducerId => INT64
  LastProducerId => INT64 // NEW
  CurrentEpoch => INT16
  LastEpoch => INT16 // NEW
  TxnTimeoutDuration => INT32
  TxnStatus => INT8
  TxnPartitions => [Topic [Partition]]
    Topic => STRING
    Partition => INT32
  TxnLastUpdateTime => INT64
  TxnStartTime => INT64
```
As described above, the last epoch is initialized based on the epoch provided in the InitProducerId call; for a new producer instance, the value will be -1. The last producer ID is the previous producerId associated with the transaction, which is recorded when epoch exhaustion forces the generation of a new producerId; for a new producer instance, this value will also be -1.
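As a rough illustration of where the two new fields sit in the transaction log value, the hypothetical Java sketch below serializes a value in the field order of the schema, using classic Kafka encodings (STRING as INT16 length plus UTF-8 bytes, ARRAY as an INT32 element count followed by the elements). The class, record and method names are invented, and this is not the coordinator's actual serializer; a new producer instance would simply write -1 for both `lastProducerId` and `lastEpoch`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: serializes the transaction log value in schema field order.
public final class TxnLogValueSketch {
    static final short VERSION = 1; // as listed in the schema above

    record TxnLogValue(long producerId, long lastProducerId, short currentEpoch, short lastEpoch,
                       int txnTimeoutDuration, byte txnStatus,
                       Map<String, List<Integer>> txnPartitions,
                       long txnLastUpdateTime, long txnStartTime) {}

    static ByteBuffer encode(TxnLogValue v) {
        // Fixed-size fields: Version, ProducerId, LastProducerId, CurrentEpoch, LastEpoch,
        // TxnTimeoutDuration, TxnStatus, array count, TxnLastUpdateTime, TxnStartTime.
        int size = 2 + 8 + 8 + 2 + 2 + 4 + 1 + 4 + 8 + 8;
        for (Map.Entry<String, List<Integer>> e : v.txnPartitions().entrySet()) {
            size += 2 + e.getKey().getBytes(StandardCharsets.UTF_8).length + 4 + 4 * e.getValue().size();
        }
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putShort(VERSION);
        buf.putLong(v.producerId());
        buf.putLong(v.lastProducerId());      // NEW: -1 for a new producer instance
        buf.putShort(v.currentEpoch());
        buf.putShort(v.lastEpoch());          // NEW: -1 for a new producer instance
        buf.putInt(v.txnTimeoutDuration());
        buf.put(v.txnStatus());
        buf.putInt(v.txnPartitions().size()); // ARRAY: INT32 element count
        for (Map.Entry<String, List<Integer>> e : v.txnPartitions().entrySet()) {
            byte[] topic = e.getKey().getBytes(StandardCharsets.UTF_8);
            buf.putShort((short) topic.length); // STRING: INT16 length + UTF-8 bytes
            buf.put(topic);
            buf.putInt(e.getValue().size());
            for (int p : e.getValue()) {
                buf.putInt(p);
            }
        }
        buf.putLong(v.txnLastUpdateTime());
        buf.putLong(v.txnStartTime());
        buf.flip();
        return buf;
    }
}
```

Because both new fields are fixed-width, they add 10 bytes (INT64 plus INT16) to every transaction state record regardless of how many partitions the transaction touches.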
...