...

Based on these observations, a separate error code will be created as PRODUCER_FENCED, so that INVALID_PRODUCER_EPOCH is no longer fatal and can trigger the KIP-360 logic. New producers will treat INVALID_PRODUCER_EPOCH as abortable when it comes from a Produce response. As the next step, the producer would try to abort the transaction to detect whether the epoch bump was due to a timeout or something else, since the EndTransaction call shall also be protected by the new transaction timeout retry logic.
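
As a rough illustration of this split, the sketch below shows the intended classification. The class and method names are hypothetical, invented for clarity, and are not the actual producer internals.

Code Block
language: java
title: EpochErrorClassification.java (hypothetical sketch)
// Hypothetical sketch of the error split described above; names are illustrative only.
final class EpochErrorClassification {

    // INVALID_PRODUCER_EPOCH on a Produce response is no longer fatal: the producer
    // marks the transaction abortable so the KIP-360 recovery path can kick in. The
    // follow-up abort (EndTransaction) is protected by the new transaction timeout
    // retry logic, which reveals whether the epoch bump was caused by a timeout.
    static boolean isAbortableEpochError(String produceResponseErrorCode) {
        return "INVALID_PRODUCER_EPOCH".equals(produceResponseErrorCode);
    }

    // Protocols that talk to the transaction coordinator use the new PRODUCER_FENCED
    // code instead, which stays fatal to the producer.
    static boolean isFencedByCoordinator(String coordinatorErrorCode) {
        return "PRODUCER_FENCED".equals(coordinatorErrorCode);
    }
}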

...

In essence, INVALID_PRODUCER_EPOCH should only be thrown from Produce requests and is retriable, while all protocols that interact with the transaction coordinator use PRODUCER_FENCED.

Old Brokers Error Propagation

When the client is on the latest version but the broker is old, the client shall still see INVALID_PRODUCER_EPOCH from both produce responses and transactional responses. For produce responses, we shall abort the current transaction and retry with the transaction coordinator, as described above. For transactional responses, this error should still be treated as a fatal exception, and we don't need to check the broker version explicitly, since the new transaction coordinator doesn't reply with this error code to new clients anymore. The new client would translate it to PRODUCER_FENCED when transiting to the fatal error state.
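
A minimal sketch of that translation, assuming a hypothetical helper rather than the actual client code:

Code Block
language: java
title: OldBrokerErrorTranslation.java (hypothetical sketch)
// Hypothetical sketch of the old-broker handling described above; not actual client code.
final class OldBrokerErrorTranslation {

    // A new transaction coordinator never replies INVALID_PRODUCER_EPOCH to a new
    // client, so receiving it on a transactional response implies an old broker. The
    // client therefore treats it exactly like PRODUCER_FENCED and transitions to the
    // fatal error state, with no explicit broker-version check required.
    static String normalizeCoordinatorError(String errorCode) {
        return "INVALID_PRODUCER_EPOCH".equals(errorCode) ? "PRODUCER_FENCED" : errorCode;
    }
}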

Interaction With Older Clients

...

  1. AddPartitionsToTransaction to v2 
  2. AddOffsetsToTransaction to v2
  3. EndTransaction to v2
  4. InitProducerId to v4

On the public Producer API, we shall also document the new exceptions so that users can catch them, abort the ongoing transaction, and restart a new transaction for both TransactionTimedOutException and InvalidProducerEpochException (a usage sketch follows the code block below). The only exception is abortTransaction itself, which is supposed to be safe to call inside the catch block for abortable exceptions.

Code Block
language: java
title: KafkaProducer.java
/**
 * @throws TransactionTimedOutException if the producer has encountered a previously aborted transaction on the coordinator side.
 *         Application should catch it and retry starting another transaction in this case.
 * @throws org.apache.kafka.common.errors.InvalidProducerEpochException if the producer has attempted to produce with an old epoch
 *         to the partition leader. See the exception for more details
 */
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId);

/**
 * @throws TransactionTimedOutException if the producer has encountered a previously aborted transaction on the coordinator side.
 *         Application should catch it and retry starting another transaction in this case.
 * @throws org.apache.kafka.common.errors.InvalidProducerEpochException if the producer has attempted to produce with an old epoch
 *         to the partition leader. See the exception for more details
 */
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, ConsumerGroupMetadata groupMetadata);

/**
 * @throws TransactionTimedOutException if the producer has encountered a previously aborted transaction on the coordinator side.
 *         Application should catch it and retry starting another transaction in this case.
 * @throws org.apache.kafka.common.errors.InvalidProducerEpochException if the producer has attempted to produce with an old epoch
 *         to the partition leader. See the exception for more details
 */
public void commitTransaction();

/**
 * (In callback:)
 * @throws InvalidProducerEpochException if the producer has attempted to produce with an old epoch to the partition leader.
 *         Application should catch it and retry starting another transaction in this case.
 */
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
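
To make the intended usage concrete, below is a minimal sketch of the catch-abort-retry pattern described by the javadocs above. The topic name, records, and single-retry policy are illustrative; the proposed TransactionTimedOutException would be handled the same way as InvalidProducerEpochException.

Code Block
language: java
title: TransactionalRetryExample.java (usage sketch)
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidProducerEpochException;

// Minimal usage sketch of the catch/abort/retry pattern described above.
// Topic name, records, and the single-retry policy are illustrative only.
final class TransactionalRetryExample {

    static void produce(KafkaProducer<String, String> producer,
                        Map<TopicPartition, OffsetAndMetadata> offsets,
                        ConsumerGroupMetadata groupMetadata) {
        producer.initTransactions();
        try {
            runTransaction(producer, offsets, groupMetadata);
        } catch (InvalidProducerEpochException e) {
            // Abortable per the javadocs above: abortTransaction is still safe to call
            // inside the catch block, after which a brand-new transaction can be started.
            // The proposed TransactionTimedOutException would be caught the same way.
            producer.abortTransaction();
            runTransaction(producer, offsets, groupMetadata);
        } catch (KafkaException e) {
            // Any other error is treated as fatal in this sketch.
            producer.close();
            throw e;
        }
    }

    private static void runTransaction(KafkaProducer<String, String> producer,
                                       Map<TopicPartition, OffsetAndMetadata> offsets,
                                       ConsumerGroupMetadata groupMetadata) {
        producer.beginTransaction();
        producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
        producer.sendOffsetsToTransaction(offsets, groupMetadata);
        producer.commitTransaction();
    }
}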

...