Status

Current state: Under Discussion

Discussion thread: here

JIRA:

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The transaction coordinator uses the producer epoch to guarantee a single-writer scope for each transactional ID. When a new producer with the same transactional ID starts up and contacts the coordinator, the coordinator bumps the epoch. When the older producer then attempts to make progress, it is fenced with a fatal `ProducerFenced` exception because it still uses the old epoch.
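The fencing rule can be illustrated with a toy, in-memory model of the coordinator's epoch bookkeeping. `EpochFencingSketch`, `registerProducer`, and `checkEpoch` are hypothetical names for this illustration, not Kafka classes or methods; the real coordinator also tracks producer IDs and persists its state in the transaction log, which this sketch omits.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy model of coordinator-side epoch fencing (not Kafka's TransactionCoordinator).
class EpochFencingSketch {
    enum Result { OK, PRODUCER_FENCED }

    // Current epoch per transactional ID.
    private final Map<String, Short> epochs = new HashMap<>();

    // A producer starts up with this transactional ID: start at epoch 0 or bump the
    // existing epoch, and return the epoch the new producer must use from now on.
    short registerProducer(String transactionalId) {
        Short current = epochs.get(transactionalId);
        short next = (current == null) ? 0 : (short) (current + 1);
        epochs.put(transactionalId, next);
        return next;
    }

    // Only the holder of the current epoch may proceed; any other epoch is fenced.
    Result checkEpoch(String transactionalId, short requestEpoch) {
        Short current = epochs.get(transactionalId);
        if (current == null) {
            throw new IllegalArgumentException("unknown transactional id: " + transactionalId);
        }
        return requestEpoch == current ? Result.OK : Result.PRODUCER_FENCED;
    }
}
```

In this model, a second `registerProducer("txn-1")` call moves the epoch from 0 to 1, after which requests carrying epoch 0 get `PRODUCER_FENCED`. Note that the model cannot tell why the epoch moved, which is exactly the ambiguity described next.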

However, the producer epoch can also be bumped when a transaction times out on the coordinator side. This can be caused by a short period of client inactivity, for example due to a network partition or a long GC pause. When the producer comes back online and attempts to proceed, it receives the same `ProducerFenced` exception even though no conflicting producer exists. The application then has to shut down the current producer and build a new one, which requires cumbersome extra try-catch logic.

We can improve this with the new APIs from KIP-360. When the coordinator times out a transaction, it can remember that fact and allow the existing producer to claim the bumped epoch and continue.

Proposed Changes

Retry Workflow

1. When a transaction times out, the coordinator sets lastProducerEpoch to the current epoch and then performs the normal epoch bump.

2. Any transactional requests from the old epoch result in a new TRANSACTION_TIMED_OUT error code, which is propagated to the application. This mechanism applies to all producer ↔ transaction coordinator APIs:

  • AddPartitionsToTransaction
  • AddOffsetsToTransaction
  • EndTransaction

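3. The producer recovers by sending InitProducerId with its current (now stale) epoch. Because that epoch matches lastProducerEpoch, the coordinator returns the already-bumped epoch, and the producer continues with it.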
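The three steps can be sketched as a small state machine for a single transactional ID. This is a hypothetical, in-memory illustration: `TimeoutRecoverySketch`, `TxnError`, and the method names are invented here, persistence and the actual abort are omitted, and the handling of InitProducerId requests without a matching lastProducerEpoch (bump on an empty or current epoch, in the spirit of KIP-360) is an assumption of the sketch.

```java
// Hypothetical model of the proposed coordinator logic for one transactional ID (not Kafka code).
class TimeoutRecoverySketch {
    enum TxnError { NONE, TRANSACTION_TIMED_OUT, PRODUCER_FENCED }

    static final short NO_EPOCH = -1;

    private short currentEpoch = 0;
    // Epoch held by the producer whose transaction timed out, or NO_EPOCH if none.
    private short lastProducerEpoch = NO_EPOCH;

    // Step 1: remember the producer's epoch, then perform the normal bump.
    void onTransactionTimeout() {
        lastProducerEpoch = currentEpoch;
        currentEpoch++;
    }

    // Step 2: shared epoch check for AddPartitionsToTransaction,
    // AddOffsetsToTransaction and EndTransaction.
    TxnError validateTxnRequest(short requestEpoch) {
        if (requestEpoch == currentEpoch) {
            return TxnError.NONE;
        }
        if (lastProducerEpoch != NO_EPOCH && requestEpoch == lastProducerEpoch) {
            return TxnError.TRANSACTION_TIMED_OUT; // retriable: re-initialize with this epoch
        }
        return TxnError.PRODUCER_FENCED;
    }

    // Step 3: InitProducerId. The timed-out epoch claims the already-bumped epoch.
    // An empty or current epoch bumps again and forgets the timed-out epoch, so the
    // old producer is fenced for good. Returns NO_EPOCH if the caller is fenced.
    short initProducerId(short requestEpoch) {
        if (lastProducerEpoch != NO_EPOCH && requestEpoch == lastProducerEpoch) {
            return currentEpoch;
        }
        if (requestEpoch == NO_EPOCH || requestEpoch == currentEpoch) {
            lastProducerEpoch = NO_EPOCH;
            currentEpoch++;
            return currentEpoch;
        }
        return NO_EPOCH;
    }
}
```

Keeping lastProducerEpoch after a successful claim means a retried InitProducerId (for example after a lost response) gets the same answer again; only a genuinely new producer clears it.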

One remaining issue is how to handle `ProducerFenced` errors returned by Produce requests. Partition leaders generally cannot tell whether a bumped epoch resulted from a timed-out transaction or from a fenced producer. Therefore, new producers treat `ProducerFenced` as abortable when it comes from a Produce response. The producer then tries to abort the transaction to find out whether the error was caused by a timeout: because EndTransaction is also covered by the new timeout retry logic, the coordinator replies with TRANSACTION_TIMED_OUT if the transaction timed out, and with a fencing error if a newer producer has actually taken over.
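The client-side decision can be sketched as two small functions: one for the Produce response and one for the coordinator's answer to the probing abort. `ProduceFencedHandlingSketch`, `ErrorCode`, and `Action` are hypothetical names for illustration; the real producer's internal state handling is considerably more involved.

```java
// Hypothetical client-side decision logic (not Kafka's TransactionManager).
class ProduceFencedHandlingSketch {
    enum ErrorCode { NONE, PRODUCER_FENCED, TRANSACTION_TIMED_OUT }
    enum Action { CONTINUE, ABORT_TO_PROBE, REINITIALIZE, FATAL }

    // Reaction to an error in a Produce response. The partition leader cannot tell a
    // timeout from a real fence, so PRODUCER_FENCED is treated as abortable, not fatal.
    static Action onProduceError(ErrorCode error) {
        switch (error) {
            case NONE:
                return Action.CONTINUE;
            case PRODUCER_FENCED:
                return Action.ABORT_TO_PROBE;
            default:
                return Action.FATAL; // not expected from Produce in this model
        }
    }

    // Reaction to the coordinator's response to the probing EndTransaction(abort),
    // which disambiguates the earlier PRODUCER_FENCED.
    static Action onAbortResponse(ErrorCode error) {
        switch (error) {
            case NONE:
                return Action.CONTINUE;
            case TRANSACTION_TIMED_OUT:
                return Action.REINITIALIZE; // re-run InitProducerId with the stale epoch
            default:
                return Action.FATAL; // a newer producer really took over
        }
    }
}
```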

Resilience Improvement For New and Old Clients

When the broker runs the latest version, the handling changes as follows:

  1. If the producer client is older than 2.5, nothing changes, since it does not support the InitProducerId-with-epoch API introduced by KIP-360.
  2. If the producer is 2.5 or newer but older than the latest version, the broker replies UNKNOWN_PRODUCER_ID to the APIs listed above, which triggers the InitProducerId retry described in the retry workflow. Clients already retry on this error code since KIP-360.
  3. If the producer is on the latest version, the broker follows the retry workflow above.
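The three cases amount to choosing an error code for a request that carries the timed-out epoch. In the following sketch, `StaleEpochErrorSketch` and the `ClientGeneration` classification are hypothetical; how the broker actually classifies a client is left to the request versions discussed under Public Interfaces, and `PRODUCER_FENCED` here simply stands for today's fencing error.

```java
// Hypothetical broker-side choice of error for a request carrying a timed-out epoch.
class StaleEpochErrorSketch {
    // Assumed classification of the client; not a Kafka type.
    enum ClientGeneration { PRE_2_5, KIP_360_CAPABLE, LATEST }
    enum ErrorCode { PRODUCER_FENCED, UNKNOWN_PRODUCER_ID, TRANSACTION_TIMED_OUT }

    static ErrorCode errorForTimedOutEpoch(ClientGeneration client) {
        switch (client) {
            case PRE_2_5:
                // Unchanged: these clients cannot re-initialize with an epoch.
                return ErrorCode.PRODUCER_FENCED;
            case KIP_360_CAPABLE:
                // Already retried by KIP-360 clients through InitProducerId.
                return ErrorCode.UNKNOWN_PRODUCER_ID;
            default:
                // Latest clients understand the new retriable error.
                return ErrorCode.TRANSACTION_TIMED_OUT;
        }
    }
}
```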

Public Interfaces

We will add a retriable error code that allows the producer to distinguish fatal fencing from a soft retry after a server-side timeout:

TRANSACTION_TIMED_OUT(90, "The last ongoing transaction timed out on the coordinator, should retry initialization with current epoch", TransactionTimedOutException::new);

To recognize clients that can handle this new error, we bump the version of the following transaction-related APIs by one:

  1. AddPartitionsToTransaction to v2 
  2. AddOffsetsToTransaction to v2
  3. EndTransaction to v2
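With these bumps, the broker can decide from the request version alone whether the sender understands TRANSACTION_TIMED_OUT. The sketch below encodes the minimum versions listed above; `TimedOutErrorVersionSketch` and `Api` are hypothetical names, not Kafka's `ApiKeys`.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical version check based on the bumps listed above (v2 for each API).
class TimedOutErrorVersionSketch {
    enum Api { ADD_PARTITIONS_TO_TXN, ADD_OFFSETS_TO_TXN, END_TXN }

    // Minimum request version that understands TRANSACTION_TIMED_OUT.
    private static final Map<Api, Short> MIN_VERSION = new EnumMap<>(Api.class);
    static {
        MIN_VERSION.put(Api.ADD_PARTITIONS_TO_TXN, (short) 2);
        MIN_VERSION.put(Api.ADD_OFFSETS_TO_TXN, (short) 2);
        MIN_VERSION.put(Api.END_TXN, (short) 2);
    }

    static boolean supportsTimedOutError(Api api, short requestVersion) {
        return requestVersion >= MIN_VERSION.get(api);
    }
}
```

Keeping the threshold per API, rather than one global number, leaves room for the three APIs to evolve independently.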

In the public Producer API, we will also document the new exception so that users can catch it and retry with initTransactions():

KafkaProducer.java
/**
 * @throws TransactionTimedOutException if the producer has encountered a previously aborted transaction on the coordinator side.
 *         The application should retry initializing the producer by calling {@link #initTransactions()} in this case.
 */
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId);

/**
 * @throws TransactionTimedOutException if the producer has encountered a previously aborted transaction on the coordinator side.
 *         The application should retry initializing the producer by calling {@link #initTransactions()} in this case.
 */
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, ConsumerGroupMetadata groupMetadata);

/**
 * @throws TransactionTimedOutException if the producer has encountered a previously aborted transaction on the coordinator side.
 *         The application should retry initializing the producer by calling {@link #initTransactions()} in this case.
 */
public void commitTransaction();

/**
 * @throws TransactionTimedOutException if the producer has encountered a previously aborted transaction on the coordinator side.
 *         The application should retry initializing the producer by calling {@link #initTransactions()} in this case.
 */
public void abortTransaction();
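On the application side, the documented contract suggests a retry loop like the following. To stay self-contained, the sketch uses a hypothetical `TxnProducer` interface standing in for the relevant `KafkaProducer` methods and a local stand-in for the proposed `TransactionTimedOutException`; it assumes `initTransactions()` has already been called once before the first attempt.

```java
// Local stand-in for the proposed exception (the real class would ship with the client).
class TransactionTimedOutException extends RuntimeException {
    TransactionTimedOutException(String message) { super(message); }
}

// Hypothetical minimal view of the transactional methods of KafkaProducer used below.
interface TxnProducer {
    void initTransactions();
    void beginTransaction();
    void commitTransaction();
}

class TimedOutRetrySketch {
    // Runs one transaction, re-initializing and retrying when the coordinator reports
    // that the previous attempt timed out. Any other exception propagates unchanged.
    // Returns the number of attempts that were needed.
    static int runWithRetry(TxnProducer producer, Runnable work, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                producer.beginTransaction();
                work.run(); // e.g. send records and offsets
                producer.commitTransaction();
                return attempt;
            } catch (TransactionTimedOutException e) {
                // Soft error: the coordinator already aborted the transaction,
                // so claim the bumped epoch and redo the work.
                producer.initTransactions();
            }
        }
        throw new IllegalStateException("transaction did not complete after " + maxAttempts + " attempts");
    }
}
```

Compared with today's handling of `ProducerFenced`, the producer instance is kept and only re-initialized; a genuine fence still surfaces as a different exception and stops the loop.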

Compatibility, Deprecation, and Migration Plan

This change applies only to new producer clients and new brokers, so it has no impact on existing user behavior. To benefit from this feature, users only need to upgrade clients to 2.5 or newer and brokers to the latest version, in any order.

Rejected Alternatives

For INVALID_PRODUCER_EPOCH, we could also keep the current behavior and let users abort transactions manually when they catch the exception. This seems unnecessary and makes the feature harder to teach to users, so we propose letting the producer handle this scenario internally.

