Status

Current state: Under Discussion

Discussion thread:

JIRA:

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Idempotent and transactional semantics depend on the broker retaining state for each active producer id (e.g. epoch and sequence number). When the broker loses that state, whether through segment deletion or a call to DeleteRecords, any further produce requests from that producer will fail with the UNKNOWN_PRODUCER_ID error.

Currently the producer attempts to handle this error by comparing the last acknowledged offset with the log start offset from the produce response with the error. If the last acknowledged offset is smaller than the log start offset, then the producer assumes that the error is spurious. It resets the sequence number to 0 and retries using the existing epoch.

There are several problems with this approach: 

  1. Resetting the sequence number is safe only if we are guaranteed that the produce request was not written to the log. If we had to retry the request (e.g. due to a disconnect), then we have no way to know whether the first attempt was successful. The only option at the moment is to treat this as a fatal error for the producer.
  2. There is currently no strict guarantee that the log start offset will increase monotonically. In particular, a new leader can be elected without having seen the latest log start offset from the previous leader. The impact for the producer is that state that was once lost may resurface after a leader failover. This can lead to an out-of-sequence error when the producer has already reset its sequence number, which is a fatal error for the producer.
  3. The broker accepts produce requests without prior state if the sequence number is 0. There is no guarantee that the producer hasn't been previously fenced. In the worst case, this can lead to a dangling transaction. For example, suppose a zombie producer has its transaction aborted by the coordinator and then a call to DeleteRecords causes this state to be lost. The fenced zombie may still be able to write with sequence 0, but it cannot complete the transaction since it is fenced and the transaction coordinator does not know about the write.

Resetting the sequence number is fundamentally unsafe because it violates the uniqueness of produced records. Additionally, the lack of validation on the first write of a producer introduces the possibility of non-monotonic updates and, hence, dangling transactions. In this KIP, we propose to address these problems and simplify the client handling.

Proposed Changes

Our proposal has two parts: 1) safe epoch incrementing, and 2) unknown producer fencing.

Safe Epoch Incrementing: When the producer receives an UNKNOWN_PRODUCER_ID error, in addition to resetting the sequence number, we propose to bump the epoch. For the idempotent producer, bumping the epoch can be done locally since its producer id is unique. What is missing at the moment is a safe way for the transactional producer to do the same. The basic problem is that the producer may have already been fenced by another instance, so we do not want to allow it to continue.

We propose to alter the InitProducerId API to accept an optional current epoch. When provided, the transaction coordinator will verify that it matches the current epoch and only allow the epoch bump if it does.

To simplify the handling, the producer will take the following steps upon receiving the UNKNOWN_PRODUCER_ID error:

  1. Enter the ABORTABLE_ERROR state. The only way to recover is for the user to abort the current transaction.
  2. Use the InitProducerId API to request an epoch bump using the current epoch.
  3. If another producer has already bumped the epoch, this will result in a fatal PRODUCER_FENCED error.
  4. If the epoch bump succeeds, the producer will reset sequence numbers back to 0 and continue after the next transaction begins.
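
The four steps above can be sketched as a small state machine. This is only an illustration under stated assumptions: the class, method, and enum names below (EpochBumpSketch, Producer, Coordinator, InitResponse) are hypothetical and are not the Kafka client API, and the coordinator is reduced to a single callback.

```java
// Hypothetical sketch: these types are illustrative and are not the Kafka client API.
final class EpochBumpSketch {
    enum State { READY, ABORTABLE_ERROR, FATAL_ERROR }

    /** Outcome of an InitProducerId call: either fenced, or the bumped epoch. */
    static final class InitResponse {
        final boolean fenced;
        final short epoch;

        private InitResponse(boolean fenced, short epoch) {
            this.fenced = fenced;
            this.epoch = epoch;
        }

        static InitResponse bumped(short newEpoch) {
            return new InitResponse(false, newEpoch);
        }

        static InitResponse producerFenced() {
            return new InitResponse(true, (short) -1);
        }
    }

    /** Stand-in for the transaction coordinator (assumption: one call, one answer). */
    interface Coordinator {
        InitResponse initProducerId(short currentEpoch);
    }

    static final class Producer {
        State state = State.READY;
        short epoch;
        int nextSequence;

        Producer(short epoch, int nextSequence) {
            this.epoch = epoch;
            this.nextSequence = nextSequence;
        }

        void onUnknownProducerId(Coordinator coordinator) {
            // Step 1: only an abort can recover from this state.
            state = State.ABORTABLE_ERROR;
            // Step 2: request an epoch bump, passing the current epoch.
            InitResponse response = coordinator.initProducerId(epoch);
            if (response.fenced) {
                // Step 3: another instance already bumped the epoch.
                state = State.FATAL_ERROR;
                return;
            }
            // Step 4: adopt the new epoch and restart sequence numbers at 0.
            epoch = response.epoch;
            nextSequence = 0;
        }

        /** The user aborts the transaction; the producer may then begin a new one. */
        void abortTransaction() {
            if (state == State.ABORTABLE_ERROR) {
                state = State.READY;
            }
        }
    }
}
```

In this sketch a fenced producer stays in FATAL_ERROR even if the user calls abortTransaction, which mirrors the intent that PRODUCER_FENCED is not recoverable.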

Of course, the producer may fail to receive the response from the InitProducerId call, so we also need to make this API safe for retries. We propose to introduce an instance id which is uniquely assigned by the transaction coordinator on the first call to InitProducerId. This instance id is fixed for the lifetime of the producer instance and is provided in all subsequent calls to InitProducerId. If the transaction coordinator receives an InitProducerId request which has a matching instance id but an older epoch, it will simply return the latest epoch without bumping it again.
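
The retry handling described above might look roughly like the following on the coordinator side. This is a minimal sketch, not the proposed implementation: the names are hypothetical, instance ids come from a simple counter purely for readability, epoch overflow is ignored, and the treatment of a request epoch newer than the coordinator's is an assumption since the proposal does not cover that case.

```java
// Hypothetical sketch of coordinator-side InitProducerId handling for one transactional id.
final class InitProducerIdSketch {
    enum Error { NONE, PRODUCER_FENCED }

    static final class Response {
        final Error error;
        final String instanceId;
        final short epoch;

        Response(Error error, String instanceId, short epoch) {
            this.error = error;
            this.instanceId = instanceId;
            this.epoch = epoch;
        }
    }

    static final class Coordinator {
        private String instanceId;        // null until the first InitProducerId
        private short epoch = -1;
        private int instanceCounter = 0;  // assumption: counter-based ids, for illustration only

        Response initProducerId(String requestInstanceId, short requestEpoch) {
            if (requestInstanceId == null) {
                // New producer instance: assign a fresh instance id and bump the epoch,
                // which fences any previous instance of this transactional id.
                instanceCounter++;
                instanceId = "instance-" + instanceCounter;
                epoch++;
                return new Response(Error.NONE, instanceId, epoch);
            }
            if (!requestInstanceId.equals(instanceId)) {
                // A newer instance has taken over this transactional id.
                return new Response(Error.PRODUCER_FENCED, instanceId, epoch);
            }
            if (requestEpoch == epoch) {
                // Matching instance and epoch: perform the requested bump.
                epoch++;
                return new Response(Error.NONE, instanceId, epoch);
            }
            if (requestEpoch < epoch) {
                // Retry of a bump whose response was lost: return the latest epoch as-is.
                return new Response(Error.NONE, instanceId, epoch);
            }
            // Assumption: an epoch ahead of the coordinator's is rejected.
            return new Response(Error.PRODUCER_FENCED, instanceId, epoch);
        }
    }
}
```

The point of the sketch is that sending the same (instance id, epoch) pair twice yields the same epoch, so a lost response cannot cause a double bump.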

Unknown Producer Fencing: We propose to introduce a new inter-broker API which allows a broker to verify with the transaction coordinator whether a producer id has been fenced. This is used only when the broker sees a write with sequence number 0 from an unknown producer.
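
As a rough illustration of when the new check would fire, the broker's decision on an incoming produce request could be sketched as below. The names are hypothetical, and the sketch does not distinguish idempotent-only producers (which have no transactional id) from transactional ones.

```java
// Hypothetical sketch of the broker's decision for an incoming produce request.
final class UnknownProducerCheckSketch {
    enum Action {
        VALIDATE_AGAINST_CACHED_STATE,  // producer is known: normal epoch/sequence validation
        VERIFY_WITH_COORDINATOR,        // unknown producer, sequence 0: ask the coordinator
        REJECT_UNKNOWN_PRODUCER_ID      // unknown producer, sequence > 0: return UNKNOWN_PRODUCER_ID
    }

    static Action onProduce(boolean hasProducerState, int firstSequence) {
        if (hasProducerState) {
            return Action.VALIDATE_AGAINST_CACHED_STATE;
        }
        if (firstSequence == 0) {
            // Previously accepted without any check; the proposal adds a fencing check here.
            return Action.VERIFY_WITH_COORDINATOR;
        }
        return Action.REJECT_UNKNOWN_PRODUCER_ID;
    }
}
```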

In practice, we expect the need for this API to be rare. As suggested in KAFKA-7190, we will alter the behavior of the broker to retain the cached producer state even after it has been removed from the log. Instead it will be removed only when the transactional id expiration time has passed. Under some circumstances we may have to rebuild the producer state using the log. One example is partition reassignment. A new replica will only see the producers which have state in the log. If one of these replicas becomes a leader, we may see the UNKNOWN_PRODUCER_ID error, which will result in an epoch bump. But the monotonicity of producer writes will never be violated.

Public Interfaces

We will bump the version of the InitProducerId API. The new schemas are provided below:

InitProducerIdRequest => TransactionalId TransactionTimeoutMs InstanceId Epoch
 TransactionalId => NULLABLE_STRING
 TransactionTimeoutMs => INT32
 InstanceId => NULLABLE_STRING // NEW
 Epoch => INT16                // NEW


InitProducerIdResponse => Error InstanceId ProducerId Epoch
 Error => INT16
 InstanceId => STRING  // NEW
 ProducerId => INT64
 Epoch => INT16

As described above, the instance id is uniquely assigned on the first call to InitProducerId. In that first call, the producer sends an instance id of NULL and an epoch of -1. The instance id does not change until a new producer instance with the same transactional id is initialized.

The instance id must be added to the persistent state in the transaction log so that it is not lost upon coordinator failover. We will bump the version of the transaction state message to 1. The new schema is provided below:

Value => Version InstanceId ProducerId ProducerEpoch TxnTimeoutDuration TxnStatus [TxnPartitions] TxnEntryLastUpdateTime TxnStartTime
  Version => 1 (INT16)
  InstanceId => STRING  // NEW
  ProducerId => INT64
  ProducerEpoch => INT16
  TxnTimeoutDuration => INT32
  TxnStatus => INT8
  TxnPartitions => [Topic [Partition]]
     Topic => STRING
     Partition => INT32
  TxnEntryLastUpdateTime => INT64
  TxnStartTime => INT64


Additionally, this proposal introduces a new inter-broker API which is used to verify the epoch associated with a transactional id.

CheckProducerEpochRequest => TransactionalId ProducerId Epoch
  TransactionalId => STRING
  ProducerId => INT64
  Epoch => INT16


CheckProducerEpochResponse => Error ProducerId Epoch
  Error => INT16
  ProducerId => INT64
  Epoch => INT16

The response includes the latest producer id and the latest epoch. If the provided epoch and producer id are correct, the coordinator will return NONE as the error code. Otherwise, the following errors are possible:

  1. COORDINATOR_LOADING
  2. NOT_COORDINATOR
  3. COORDINATOR_NOT_AVAILABLE
  4. INVALID_PRODUCER_ID_MAPPING
  5. INVALID_PRODUCER_EPOCH

Only in the last two cases (INVALID_PRODUCER_ID_MAPPING and INVALID_PRODUCER_EPOCH) will the latest producer id and epoch be provided in the response.
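
One way to picture the coordinator's side of CheckProducerEpoch is the sketch below. It is illustrative only: the class names and the in-memory map are hypothetical, the three coordinator availability errors are left out, and returning -1 for both fields when the transactional id is unknown is an assumption the proposal does not spell out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of how a coordinator might answer CheckProducerEpoch.
final class CheckProducerEpochSketch {
    enum Error { NONE, INVALID_PRODUCER_ID_MAPPING, INVALID_PRODUCER_EPOCH }

    static final class Response {
        final Error error;
        final long producerId;
        final short epoch;

        Response(Error error, long producerId, short epoch) {
            this.error = error;
            this.producerId = producerId;
            this.epoch = epoch;
        }
    }

    static final class Coordinator {
        // transactionalId -> {latest producer id, latest epoch}
        private final Map<String, long[]> latest = new HashMap<>();

        void register(String transactionalId, long producerId, short epoch) {
            latest.put(transactionalId, new long[] {producerId, epoch});
        }

        Response check(String transactionalId, long producerId, short epoch) {
            long[] current = latest.get(transactionalId);
            if (current == null) {
                // Assumption: no mapping exists, so there is nothing "latest" to report.
                return new Response(Error.INVALID_PRODUCER_ID_MAPPING, -1L, (short) -1);
            }
            long latestProducerId = current[0];
            short latestEpoch = (short) current[1];
            if (producerId != latestProducerId) {
                return new Response(Error.INVALID_PRODUCER_ID_MAPPING, latestProducerId, latestEpoch);
            }
            if (epoch != latestEpoch) {
                // The requesting producer is not on the current epoch (e.g. it was fenced).
                return new Response(Error.INVALID_PRODUCER_EPOCH, latestProducerId, latestEpoch);
            }
            return new Response(Error.NONE, latestProducerId, latestEpoch);
        }
    }
}
```

A broker receiving INVALID_PRODUCER_EPOCH in this sketch learns the latest epoch from the response and can reject the zombie's sequence-0 write instead of appending it.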

Compatibility, Deprecation, and Migration Plan

The main problem from a compatibility perspective is dealing with existing producers which reset the sequence number to 0 but continue to use the same epoch. We believe that caching the producer state even after it is no longer retained in the log will make the UNKNOWN_PRODUCER_ID error unlikely in practice. Furthermore, even if the sequence number is reset, the fencing check should still be valid. So we expect existing producers to continue to work even with the additional protection.

The new CheckProducerEpoch API and the new version of the transaction state message will not be used until the inter-broker protocol version supports them. We expect the usual process of two rolling bounces for upgrading the cluster.

Rejected Alternatives

We considered fixing this problem in Kafka Streams by being less aggressive with record deletion for repartition topics. This might make the problem less likely, but it does not fix it, and we would like to have a general solution for all EOS (exactly-once semantics) users.
