...

Proposed Changes

Our proposal has three parts: 1) safe epoch incrementing, 2) unknown producer fencing, and 3) simplified client error handling.

Safe Epoch Incrementing: When the producer receives an UNKNOWN_PRODUCER_ID error, we propose to bump the epoch in addition to resetting the sequence number. For the idempotent producer, the epoch can be bumped locally because its producer id is unique. What is missing today is a safe way for the transactional producer to do the same. The basic problem is that the producer may already have been fenced by another instance, in which case it must not be allowed to continue.
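To illustrate why the idempotent case is simple, here is a minimal Java sketch of the client-side state involved. The names `IdempotentProducerState` and `onUnknownProducerId` are hypothetical and do not correspond to the real producer internals. The sketch assumes one sequence counter per partition (keyed by a plain string) and only throws when the epoch would overflow, since that case is not covered here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: local epoch bump for an idempotent producer on UNKNOWN_PRODUCER_ID.
final class IdempotentProducerState {
    private final long producerId; // unique to this producer, so no coordinator round trip is needed
    private short epoch;
    private final Map<String, Integer> nextSequence = new HashMap<>();

    IdempotentProducerState(long producerId, short epoch) {
        this.producerId = producerId;
        this.epoch = epoch;
    }

    /** Returns the sequence number for the next batch to the given partition and advances it. */
    int nextSequence(String topicPartition) {
        int seq = nextSequence.getOrDefault(topicPartition, 0);
        nextSequence.put(topicPartition, seq + 1);
        return seq;
    }

    /** On UNKNOWN_PRODUCER_ID: bump the epoch locally and restart every partition at sequence 0. */
    void onUnknownProducerId() {
        if (epoch == Short.MAX_VALUE) {
            // Assumption: an exhausted epoch would need a new producer id; out of scope for this sketch.
            throw new IllegalStateException("epoch exhausted; a new producer id is required");
        }
        epoch++;
        nextSequence.clear();
    }

    long producerId() { return producerId; }

    short epoch() { return epoch; }
}
```

The transactional producer cannot take this shortcut, because a local bump would let a fenced instance keep writing; it has to go through the coordinator instead.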

...

Of course, the producer may fail to receive the response to the InitProducerId call, so this API must be safe to retry. In the worst case, a retry may span a coordinator failover, so we need to record in the transaction log whether the bump was the result of a new producer instance or not. We propose to add a new field to the transaction state message for the "last epoch," which is the epoch the current producer instance held before its most recent bump (or -1 for a newly initialized instance). When the coordinator receives an InitProducerId request, it will use the following logic to update the epoch:

  1. No epoch is provided: the current epoch will be bumped and the last epoch will be set to -1.
  2. Epoch is provided:
    1. Epoch matches the current epoch: the last epoch will be set to the current epoch, and then the current epoch will be bumped.
    2. Epoch matches the last epoch: the current epoch will be returned (this is a retry of a bump whose response was lost).
    3. Else (including an epoch greater than the current epoch): return INVALID_PRODUCER_EPOCH.
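A minimal Java sketch of the coordinator-side epoch update, assuming the "last epoch" interpretation of the new field (the epoch held before the most recent bump, or -1 for a new instance). `TransactionCoordinatorSketch` and `Result` are hypothetical names. The sketch ignores producer id validation, epoch exhaustion, and the write of the updated state to the transaction log, which the real coordinator would have to complete before responding.

```java
// Hypothetical sketch of the retry-safe InitProducerId epoch logic (Java 16+ for records).
final class TransactionCoordinatorSketch {
    /** Hypothetical response: an epoch on success, or an error code name. */
    record Result(short epoch, String error) {
        static Result ok(short epoch) { return new Result(epoch, null); }
        static Result failure(String code) { return new Result((short) -1, code); }
    }

    static final short NO_EPOCH = -1;

    private short currentEpoch;
    private short lastEpoch; // persisted alongside the transaction state in the real design

    TransactionCoordinatorSketch(short currentEpoch, short lastEpoch) {
        this.currentEpoch = currentEpoch;
        this.lastEpoch = lastEpoch;
    }

    synchronized Result initProducerId(short providedEpoch) {
        if (providedEpoch == NO_EPOCH) {
            // New producer instance: bump to fence any active instance, and forget the last epoch.
            currentEpoch++;
            lastEpoch = NO_EPOCH;
            return Result.ok(currentEpoch);
        }
        if (providedEpoch == currentEpoch) {
            // Bump requested by the active instance: remember the epoch it is leaving.
            lastEpoch = currentEpoch;
            currentEpoch++;
            return Result.ok(currentEpoch);
        }
        if (providedEpoch == lastEpoch) {
            // Retry of a bump whose response was lost: return the already-bumped epoch.
            return Result.ok(currentEpoch);
        }
        // Older epoch (fenced instance) or an epoch newer than the coordinator knows about.
        return Result.failure("INVALID_PRODUCER_EPOCH");
    }
}
```

For example, starting from current epoch 5 with no last epoch, a request with epoch 5 returns 6, and a retry with epoch 5 also returns 6. After a new instance calls with -1 (current becomes 7), the old instance's epoch 6 is rejected.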

Unknown Producer Fencing: We propose to introduce a new DescribeTransactionState API which allows a broker to verify with the transaction coordinator whether a producer id has been fenced. This is used only when the broker sees a write with a sequence number 0 from an unknown producer. 

...

Note that a transaction may be completed while the DescribeTransactionState response is still in flight, so after receiving the response the broker must verify that the producer state is still unknown.
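The broker-side check can be sketched as follows. This is a hypothetical Java model, not broker code: a map stands in for the broker's producer state, and the `fenced` flag stands in for whatever the DescribeTransactionState response actually carries (its schema is not shown here). It shows the two points made above: only a sequence-0 write from an unknown producer triggers the new check, and the producer state is re-checked when the response arrives.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of unknown producer fencing on the broker.
final class UnknownProducerCheck {
    /** What the broker does with an incoming write. */
    enum Decision { VALIDATE_NORMALLY, VERIFY_WITH_COORDINATOR, EXISTING_UNKNOWN_PRODUCER_HANDLING }

    /** What the broker does once the DescribeTransactionState response arrives. */
    enum Outcome { APPEND, REJECT_FENCED, REVALIDATE }

    // producerId -> epoch, a stand-in for the broker's real producer state.
    private final Map<Long, Short> producerState = new HashMap<>();

    Decision onWrite(long producerId, int firstSequence) {
        if (producerState.containsKey(producerId)) {
            return Decision.VALIDATE_NORMALLY; // usual epoch/sequence validation, omitted here
        }
        // Only a write starting at sequence 0 from an unknown producer triggers the new check.
        return firstSequence == 0
                ? Decision.VERIFY_WITH_COORDINATOR
                : Decision.EXISTING_UNKNOWN_PRODUCER_HANDLING;
    }

    /** Models producer state appearing by other means, e.g. a transaction completing. */
    void recordProducerState(long producerId, short epoch) {
        producerState.put(producerId, epoch);
    }

    Outcome onDescribeResponse(long producerId, short epoch, boolean fenced) {
        // The state may have changed while the request was in flight: re-check it first.
        if (producerState.containsKey(producerId)) {
            return Outcome.REVALIDATE;
        }
        if (fenced) {
            return Outcome.REJECT_FENCED;
        }
        // Coordinator confirmed the producer is not fenced: record its state and append.
        producerState.put(producerId, epoch);
        return Outcome.APPEND;
    }
}
```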

Simplified Error Handling: Much of the complexity in the error handling of the idempotent/transactional producer is a result of the UNKNOWN_PRODUCER_ID case. Since we are proposing to cache producer state for as long as the transactional id expiration time, even after removal from the log, this should become a rare error, so we propose to simplify how it is handled. The current handling attempts to reason about the log start offset and whether or not the batch had been previously retried. If it concludes that it is safe, it attempts to adjust the sequence number of the failed request (and of any in-flight requests that followed it). Not only is this behavior complex to implement, but continuing with subsequent batches introduces the potential for reordering. Currently there is no way to prevent

We propose the following simplifications:

  1. Assignment of the epoch/sequence number to a record batch is permanent and happens at the time of the record send. We will remove the logic to adjust sequence numbers after failing a batch.
  2. When we encounter a fatal error for a batch, we will fail all subsequent batches for that partition. 
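The two client-side rules above can be sketched for a single partition as follows. `PartitionBatchQueue`, `Batch`, and the method names are hypothetical; networking, retries, and batches still in flight across an epoch bump are left out. `bumpEpochAndReset` models what the text describes for an aborted transaction, or for an idempotent user who chooses to continue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

// Hypothetical per-partition sketch of the simplified producer error handling (Java 16+).
final class PartitionBatchQueue {
    /** Epoch and base sequence are fixed when the batch is sent and never rewritten. */
    record Batch(short epoch, int baseSequence, int recordCount) {}

    private final Deque<Batch> inFlight = new ArrayDeque<>();
    private short epoch;
    private int nextSequence = 0;

    PartitionBatchQueue(short epoch) {
        this.epoch = epoch;
    }

    /** Rule 1: assign epoch and sequence permanently at send time. */
    Batch send(int recordCount) {
        Batch batch = new Batch(epoch, nextSequence, recordCount);
        nextSequence += recordCount;
        inFlight.addLast(batch);
        return batch;
    }

    /** Rule 2: a fatal error fails the batch and every batch sent after it; nothing is renumbered. */
    List<Batch> fail(Batch failed) {
        List<Batch> failedBatches = new ArrayList<>();
        boolean reached = false;
        for (Iterator<Batch> it = inFlight.iterator(); it.hasNext(); ) {
            Batch batch = it.next();
            if (batch == failed) {
                reached = true;
            }
            if (reached) {
                failedBatches.add(batch);
                it.remove();
            }
        }
        return failedBatches;
    }

    /** After an abort (transactional) or a decision to continue (idempotent). */
    void bumpEpochAndReset() {
        epoch++;
        nextSequence = 0;
    }

    int inFlightCount() {
        return inFlight.size();
    }
}
```

Because sequence numbers are never rewritten, a failure can only propagate forward, which is what gives users the simple "everything after a failed record also fails" guarantee.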

This will be simpler to implement and easier for users to reason about. Records will be guaranteed to be delivered in order. If any record fails delivery, then all subsequently sent records fail as well. For the transactional producer, the user can proceed by aborting the current transaction. Internally, the producer will bump the epoch and reset sequence numbers for the next transaction. For the idempotent producer, the user can choose to fail or they can continue (with the possibility of duplication or reordering). If the user continues, the epoch will be bumped locally and the sequence number will be reset.

Public Interfaces

We will bump the version of the InitProducerId API. The new schemas are provided below:

...

Value => Version LastEpoch ProducerId ProducerEpoch TxnTimeoutDuration TxnStatus [TxnPartitions] TxnLastUpdateTime TxnStartTime
  Version => 1 (INT16)
  LastEpoch => INT16  // NEW
  ProducerId => INT64
  ProducerEpoch => INT16
  TxnTimeoutDuration => INT32
  TxnStatus => INT8
  TxnPartitions => [Topic [Partition]]
     Topic => STRING
     Partition => INT32
  TxnLastUpdateTime => INT64
  TxnStartTime => INT64

As described above, the last epoch is updated based on the epoch provided in the InitProducerId call. The first call from a producer instance uses an epoch of -1 and a producer id of -1 and will fence any active instance with the same transactional id. When a new producer instance is initialized in this way, the last epoch is set to -1.

Additionally, this proposal introduces a new API to query transaction state. This will be used to check whether a 

...