Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  • InitTransaction for transactional producer identity initialization
  • beginTransaction to start a new transaction 
  • sendOffsetsToTransaction to commit consumer offsets advanced within the current transaction
  • commitTransaction commit the ongoing transaction
  • abortTransaction abort the ongoing transaction

Within a transaction session, the internal state tracks the last fatal/abortable error. When the Producer hits a fatal/abortable exception, it will transit to the error state, and when next time someone uses a transactional API, it will throw the buffered exception. The caveat is that today we wrap many non-fatal exceptions as KafkaException, which does not make a clear boundary on whether the thrown exception is fatal – should fail fast, or just abortable – should catch and abort the ongoing transaction to resume. It affects the stability of upstream users as well such as Streams EOS. This KIP tries to address this gap by classifying exceptions based on the measurement of its fatality, so that end user could just catch the non-fatal ones fairly easy.

Public Interfaces

We should add a new exception type called TransactionStateCorruptedException, which wraps non-fatal retriable producer txn exception.

Code Block
languagejava
titleTransactionStateCorruptedException.java
public class TransactionStateCorruptedException extends ApiException {

    public TransactionStateCorruptedException(String message, Throwable cause) {
        super(msg, cause);
    }
}

In a transactional API user's perspective, they should treat TransactionStateCorruptedException as non-fatal and retry. For all other fatal errors, they will be thrown directly instead of wrapping. If the caller calls producer#abortTransaction again in a fatal state, the fatal exception shall still be thrown which is not the recommended approach.

Code Block
languagejava
titleSample.java
producer.initTransactions();
volatile boolean isRunning = true;

while (isRunning) {
	try {
    	producer.beginTransaction();
		producer.send(new ProducerRecord<>("topic", new byte[1], new byte[1]));
	    producer.commitTransaction();
 	} catch (TransactionStateCorruptedException e) {
     	// For TransactionStateCorruptedException, just abort the transaction and try again.
     	producer.abortTransaction();
 	} catch (ProducerFencedException | AuthorizationException e) {
  		// User could choose to catch any fatal exceptions as they see necessary, here we just close the producer and exit.
        break;
	} 
}

producer.close();

Proposed Changes

Below is a full list of exception types that could be thrown from producer API as of today, and we flagged those that should be thrown as fatal exception by themselves, vs exceptions that should be non-fatal. All the non-fatal exception will be wrapped as TransactionStateCorruptedException when being thrown.

  1. ProducerFencedException (Fatal)
  2. InvalidProducerEpochException (Non-fatal)
  3. KafkaException, which potentially wraps the following exceptions:
    1. IllegalStateException
    2. InvalidPidMappingException
    3. TransactionAbortedException
    4. ClusterAuthorizationException
    5. UnknownProducerIdException
    6. TransactionalIdAuthorizationException
    7. UnsupportedVersionException
    8. AuthenticationException
    9. IllegalStateException (Fatal)
    10. InvalidPidMappingException (Non-fatal)
    11. TransactionAbortedException (Non-fatal)
    12. ClusterAuthorizationException for idempotent send (Fatal)
    13. UnknownProducerIdException for producer state loss (Non-fatal)
    14. TransactionalIdAuthorizationException (Fatal)
    15. UnsupportedVersionException if transaction semantic is not supported (Fatal)
    16. AuthenticationException for transactional request authentication (Fatal)
    17. UnsupportedForMessageFormatException (Fatal)UnsupportedForMessageFormatException
    18. RuntimeException for detecting more than one inflight request
    19. InvalidRecordException
    20. InvalidRequiredAcksException
    21. NotEnoughReplicasAfterAppendException
    22. NotEnoughReplicasException
    23. RecordBatchTooLargeException
    24. InvalidTopicException
    25. CorruptRecordException
    26. UnknownTopicOrPartitionException
    27. NotLeaderOrFollowerException
    28. , should be illegal state (Fatal)
    29. InvalidRecordException (Fatal)
    30. InvalidRequiredAcksException (Fatal)
    31. NotEnoughReplicasAfterAppendException (Non-fatal)
    32. NotEnoughReplicasException (Non-fatal)
    33. RecordBatchTooLargeException (Fatal)
    34. InvalidTopicException (Fatal)
    35. CorruptRecordException (Non-fatal)
    36. UnknownTopicOrPartitionException (Non-fatal)
    37. NotLeaderOrFollowerException (Non-fatal)
    38. GroupAuthorizationException (Fatal)GroupAuthorizationException
    39. KafkaException
      1. indicates retriable idempotent sequencesequence (Non-fatal)
      2. indicates fatal transactional sequencesequence (Fatal)
      3. indicates Producer closedclosed (Non-fatal)
      4. InvalidTxnState (Fatal)
      5. unexpected reason (Fatal)unknown reason
    40. TimeoutException for expired batch (Non-fatal)

Documentation change

We shall put the newly marked fatal exceptions on the public Producer API docs as well, including 

  • beginTransaction 
  • sendOffsetsToTransaction
  • commitTransaction
  • abortTransaction

Stream side change

EOS Streams will attempt to catch TransactionStateCorruptedException as abortable and resume the work by calling Producer#abortTransaction and cleaning up last transaction state on the stream level. For known exception such as ProducerFenced, the handling shall be the same by wrapping and throwing TaskMigratedException. For exception such as TimeoutException which has special handling logic on Streams, although it will be wrapped as TransactionStateCorruptedException, Streams would be able to get its root cause to take proper action as well. 

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Rejected Alternatives

This is a pure client side change which only affects the resiliency of new Producer client and Streams, no compatibility issue.

Rejected Alternatives

We thought about exhausting all the possible exception types on the Streams level for resiliency, but abandoned the approach pretty soon as it would require a joint code change every time the underlying Producer client throws a new exception. The encapsulation should help reduce the amount of work on the caller side for exception handling. If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.