...

Within a transaction session, the internal state tracks the last fatal or abortable error. When the Producer hits a fatal or abortable exception, it transitions to an error state, and the next time a transactional API is called, it throws the buffered exception. The caveat is that today we wrap many non-fatal exceptions as KafkaException, which blurs the boundary between exceptions that are fatal (the application should fail fast) and those that are merely abortable (the application should catch the exception and abort the ongoing transaction to resume). This also affects the stability of upstream users such as Kafka Streams EOS. This KIP addresses this gap by revamping the Producer APIs to make them more robust and to make exception handling coherent and consistent.

For reference, below is the full list of exception types that the producer API can throw as of today, flagging those that should be thrown by themselves as fatal exceptions versus those that should be non-fatal.

  1. ProducerFencedException (Fatal)
  2. InvalidProducerEpochException (Non-fatal)
  3. KafkaException, which potentially wraps the following exceptions:
    1. IllegalStateException (Fatal)
    2. InvalidPidMappingException (Non-fatal)
    3. TransactionAbortedException (Non-fatal)
    4. ClusterAuthorizationException for idempotent send (Fatal)
    5. OutOfOrderSequenceException (Non-fatal)
    6. UnknownProducerIdException for producer state loss (Non-fatal)
    7. TransactionalIdAuthorizationException (Fatal)
    8. UnsupportedVersionException if transactional semantics are not supported (Fatal)
    9. AuthenticationException for transactional request authentication (Fatal)
    10. UnsupportedForMessageFormatException (Fatal)
    11. RuntimeException for detecting more than one inflight request, should be illegal state (Fatal)
    12. InvalidRecordException (Fatal)
    13. InvalidRequiredAcksException (Fatal)
    14. NotEnoughReplicasAfterAppendException (Non-fatal)
    15. NotEnoughReplicasException (Non-fatal)
    16. RecordBatchTooLargeException (Fatal)
    17. InvalidTopicException (Fatal)
    18. CorruptRecordException (Non-fatal)
    19. UnknownTopicOrPartitionException (Non-fatal)
    20. NotLeaderOrFollowerException (Non-fatal)
    21. GroupAuthorizationException (Fatal)
    22. KafkaException
      1. indicates retriable idempotent sequence (Non-fatal)
      2. indicates fatal transactional sequence (Fatal)
      3. indicates Producer closed (Non-fatal)
      4. InvalidTxnState (Fatal)
      5. unexpected reason (Fatal)
    23. TimeoutException for expired batch (Non-fatal)
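As a rough illustration only, the classification above can be captured as a lookup table keyed by exception simple name. ExceptionClassifier is a hypothetical helper, not part of the Kafka client. It drops the context qualifiers from the list (for example "for expired batch" or "for idempotent send"), and it conservatively treats any name it does not know as fatal, which also lumps together the bare KafkaException and RuntimeException cases whose classification depends on the reason.

```java
import java.util.Map;

// Hypothetical helper: the fatal/non-fatal flags from the list above, keyed by
// exception simple name. Context qualifiers from the list are not modeled.
final class ExceptionClassifier {
    private static final Map<String, Boolean> FATAL = Map.ofEntries(
            Map.entry("ProducerFencedException", true),
            Map.entry("InvalidProducerEpochException", false),
            Map.entry("IllegalStateException", true),
            Map.entry("InvalidPidMappingException", false),
            Map.entry("TransactionAbortedException", false),
            Map.entry("ClusterAuthorizationException", true),
            Map.entry("OutOfOrderSequenceException", false),
            Map.entry("UnknownProducerIdException", false),
            Map.entry("TransactionalIdAuthorizationException", true),
            Map.entry("UnsupportedVersionException", true),
            Map.entry("AuthenticationException", true),
            Map.entry("UnsupportedForMessageFormatException", true),
            Map.entry("InvalidRecordException", true),
            Map.entry("InvalidRequiredAcksException", true),
            Map.entry("NotEnoughReplicasAfterAppendException", false),
            Map.entry("NotEnoughReplicasException", false),
            Map.entry("RecordBatchTooLargeException", true),
            Map.entry("InvalidTopicException", true),
            Map.entry("CorruptRecordException", false),
            Map.entry("UnknownTopicOrPartitionException", false),
            Map.entry("NotLeaderOrFollowerException", false),
            Map.entry("GroupAuthorizationException", true),
            Map.entry("TimeoutException", false));

    // Unknown names fall back to fatal, i.e. fail fast when in doubt.
    static boolean isFatal(String exceptionSimpleName) {
        return FATAL.getOrDefault(exceptionSimpleName, true);
    }
}
```

A caller would pass e.getClass().getSimpleName(); the table is only as accurate as the list it mirrors.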

Proposed Changes

We are proposing a new transactional API usage template that makes EOS processing safer when handling a mix of fatal and non-fatal exceptions:

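Sample.java

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;

// producer, consumerConfig, CONSUMER_POLL_TIMEOUT, process() and
// resetToLastCommittedPositions() are defined by the application.
private volatile boolean isRunning = true;

void runTransactionalLoop() {
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig);
    producer.initTransactions();

    while (isRunning) {
        ConsumerRecords<String, String> records = consumer.poll(CONSUMER_POLL_TIMEOUT);
        boolean shouldCommit;
        try {
            producer.beginTransaction();

            // Do some processing and build the records we want to produce.
            List<ProducerRecord<String, String>> processed = process(records);

            for (ProducerRecord<String, String> record : processed) {
                producer.send(record, (metadata, exception) -> {
                    // Not required to capture the exception here.
                });
            }

            // Next offset to consume for each partition in this batch.
            Map<TopicPartition, OffsetAndMetadata> consumedOffsets = new HashMap<>();
            for (ConsumerRecord<String, String> record : records) {
                consumedOffsets.put(new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1));
            }
            producer.sendOffsetsToTransaction(consumedOffsets, consumer.groupMetadata());
            shouldCommit = true;
        } catch (Exception e) {
            // Catch any exception thrown from the data transmission phase.
            shouldCommit = false;
        }

        try {
            if (shouldCommit) {
                producer.commitTransaction();
            } else {
                resetToLastCommittedPositions(consumer);
                producer.abortTransaction();
            }
        } catch (TimeoutException e) {
            // The transaction state could be reset when the original commit/abort times out.
            // This is a best-effort demo approach to avoid a producer shutdown;
            // if abort times out again, the timeout exception will be thrown
            // to the application layer. The total retry time would be 2 * max.block.ms.
            resetToLastCommittedPositions(consumer);
            producer.abortTransaction();
        } catch (KafkaException e) {
            // Any other exception from commit/abort is fatal: shut down.
            producer.close();
            consumer.close();
            throw e;
        }
    }
}
```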
}

...

In the commit phase, we decide whether to commit or abort the transaction based on the result of the previous stage. In the new Producer API, commitTransaction() will behave like abortTransaction() in that it no longer throws non-fatal exceptions. This means any exception caught during the commit phase is definitely fatal, which simplifies error handling for users. If the commit fails internally with a non-fatal exception, that exception is not thrown; instead, the next beginTransaction() fails with a non-fatal exception, which sets the `shouldCommit` flag to false, so abortTransaction() is called in that next iteration.
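To make this contract concrete, the following toy model shows a commit whose non-fatal failure is buffered instead of thrown, surfaces on the next beginTransaction(), and is cleared by abortTransaction(). All classes here are hypothetical and only mimic the described behavior; they are not KafkaProducer. The failNextCommit field is an assumed test hook that simulates an internal commit failure.

```java
// Toy model of the proposed contract; these are hypothetical classes, not KafkaProducer.
class NonFatalTxnException extends RuntimeException {
    NonFatalTxnException(String message) { super(message); }
}

class ToyTransactionalProducer {
    private RuntimeException bufferedError; // last abortable error, if any
    boolean failNextCommit;                 // test hook: simulate an internal commit failure
    int commits;
    int aborts;

    void beginTransaction() {
        // A buffered non-fatal error surfaces here, on the next transactional call.
        if (bufferedError != null) {
            throw bufferedError;
        }
    }

    void commitTransaction() {
        if (failNextCommit) {
            // Non-fatal failure: buffer it instead of throwing from commit.
            failNextCommit = false;
            bufferedError = new NonFatalTxnException("commit failed");
            return;
        }
        commits++;
    }

    void abortTransaction() {
        bufferedError = null; // aborting clears the abortable error state
        aborts++;
    }
}

class CommitLoop {
    // One iteration of the usage template: transmission phase, then commit or abort.
    static void runOnce(ToyTransactionalProducer producer) {
        boolean shouldCommit;
        try {
            producer.beginTransaction();
            // ... send records and offsets here ...
            shouldCommit = true;
        } catch (Exception e) {
            shouldCommit = false;
        }
        if (shouldCommit) {
            producer.commitTransaction();
        } else {
            producer.abortTransaction();
        }
    }
}
```

With failNextCommit set, three calls to runOnce produce: a silently failed commit, then an abort triggered by the failing beginTransaction(), then a successful commit.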

The only debatable case is a timeout within commitTransaction() or abortTransaction(). It could be treated as either fatal or non-fatal: strictly speaking, the producer has already retried for max.block.ms, so a timeout here may indicate a fatal state. We include the timeout handling in the template only as an example; callers could implement more sophisticated handling, such as an application-level retry, if necessary.
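An application-level retry of the kind mentioned above could be a bounded loop, sketched here under some assumptions: StandInTimeoutException stands in for org.apache.kafka.common.errors.TimeoutException so the block compiles on its own, and maxAttempts is a hypothetical application setting. With a real producer, each attempt may block for up to max.block.ms, so the worst-case wait grows roughly linearly with maxAttempts.

```java
// Stand-in for org.apache.kafka.common.errors.TimeoutException, so this compiles alone.
class StandInTimeoutException extends RuntimeException {
    StandInTimeoutException(String message) { super(message); }
}

class TxnRetry {
    // Runs action (e.g. a lambda calling producer.abortTransaction()) and retries
    // it on timeout, up to maxAttempts attempts in total (at least one attempt).
    // Returns the number of attempts used; rethrows the last timeout when exhausted.
    static int runWithRetry(Runnable action, int maxAttempts) {
        for (int attempt = 1; ; attempt++) {
            try {
                action.run();
                return attempt;
            } catch (StandInTimeoutException e) {
                if (attempt >= maxAttempts) {
                    throw e;
                }
            }
        }
    }
}
```

Only the timeout is retried; any other exception propagates immediately, which keeps fatal errors on the fail-fast path.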

Unify Wrapped KafkaException

...

As discussed in the motivation section, KafkaProducer currently wraps thrown exceptions as KafkaException. To make the semantics clear, and to help advanced users such as Kafka Streams understand the root cause, we shall no longer wrap any fatal exceptions and shall wrap only non-fatal ones as KafkaException. We have also found cases where KafkaException is double-wrapped internally; these will be fixed so that at most one layer of wrapping is applied.
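The single-layer rule can be sketched as follows. WrapperException stands in for KafkaException, and wrapOnce(), surface() and rootCause() are hypothetical helpers, not producer internals: fatal errors pass through unwrapped, non-fatal errors gain exactly one wrapper, and a caller such as Streams walks the cause chain to find the root cause.

```java
// Stand-in for org.apache.kafka.common.KafkaException so the sketch compiles alone.
class WrapperException extends RuntimeException {
    WrapperException(Throwable cause) { super(cause); }
}

class Wrapping {
    // Wrap a non-fatal error at most once: an existing wrapper is reused as-is.
    static WrapperException wrapOnce(Throwable nonFatal) {
        if (nonFatal instanceof WrapperException) {
            return (WrapperException) nonFatal;
        }
        return new WrapperException(nonFatal);
    }

    // Fatal errors are surfaced unwrapped; non-fatal ones get a single wrapper.
    static RuntimeException surface(RuntimeException e, boolean fatal) {
        return fatal ? e : wrapOnce(e);
    }

    // Follow getCause() to the innermost exception (getCause() returns null at the end).
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }
}
```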

...

As we have seen, Producer#send also has a callback mechanism that receives the exception. In an EOS setup, handling the exception in the callback is not required, but for non-EOS cases the current mechanism is complicated because it passes raw exceptions. To make handling easier and more consistent, we decided to wrap all fatal exceptions (from the producer's perspective) as KafkaException.

Streams side change

For the Kafka Streams EOS case, we would adopt this simplified exception handling by catching all exceptions in the data transmission phase to decide whether Streams should commit. When EOS is enabled, we would not track exceptions in the send callback.

Streams will catch TransactionStateCorruptedException as abortable and resume work by calling Producer#abortTransaction and cleaning up the last transaction state at the Streams level. For known exceptions such as ProducerFencedException, the handling stays the same: wrap and throw TaskMigratedException. For exceptions such as TimeoutException, which have special handling logic in Streams, Streams can still retrieve the root cause and take proper action, even though the exception will be wrapped as TransactionStateCorruptedException. In addition, KIP-572 already covers the application-level retry, so it should be fine to let such exceptions propagate to the upper level.
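A Streams-like caller could map these rules onto a small decision function. The sketch below is hypothetical and is not Kafka Streams code: the exception types are minimal local stand-ins (not the real Kafka or Streams classes), and the Action values are assumed labels for the Streams-level follow-up after abortTransaction().

```java
// Minimal stand-ins for the exception types named above (not the real classes).
class ProducerFencedException extends RuntimeException {
    ProducerFencedException(String message) { super(message); }
}
class TimeoutException extends RuntimeException {
    TimeoutException(String message) { super(message); }
}
class TransactionStateCorruptedException extends RuntimeException {
    TransactionStateCorruptedException(Throwable cause) { super(cause); }
}
class TaskMigratedException extends RuntimeException {
    TaskMigratedException(Throwable cause) { super(cause); }
}

class StreamsTxnErrorHandler {
    // Hypothetical follow-up actions; both imply aborting the transaction first.
    enum Action { ABORT_AND_RESUME, ABORT_THEN_HANDLE_TIMEOUT }

    // Decide how to react to an exception from the data transmission phase.
    static Action classify(RuntimeException e) {
        if (e instanceof ProducerFencedException) {
            // Known fatal case: handled as today, by wrapping in TaskMigratedException.
            throw new TaskMigratedException(e);
        }
        if (e instanceof TransactionStateCorruptedException) {
            // Abortable: inspect the root cause for cases with special handling.
            return e.getCause() instanceof TimeoutException
                    ? Action.ABORT_THEN_HANDLE_TIMEOUT
                    : Action.ABORT_AND_RESUME;
        }
        // Anything else is treated as fatal and propagated.
        throw e;
    }
}
```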

Public Interfaces

As mentioned in the proposed changes section, we would make the following public API changes:

  • The commitTransaction() API will no longer throw non-fatal exceptions.
  • All non-fatal exceptions thrown from the data transmission APIs will be wrapped as KafkaException, which will be clearly documented.

Non-fatal exceptions will only be thrown by the following APIs:

  • beginTransaction
  • sendOffsetsToTransaction
  • send

Documentation change

We shall document the newly marked fatal exceptions in the public Producer API docs correspondingly, including:

  • beginTransaction 
  • sendOffsetsToTransaction
  • commitTransaction
  • abortTransaction
  • send

Compatibility, Deprecation, and Migration Plan

This is a pure client-side change that only affects the resiliency of the new Producer client and Streams. For custom EOS use cases, users need to revisit their exception handling around commitTransaction(): since it no longer throws non-fatal exceptions, returning without an exception does not indicate that the commit succeeded. Users may also need to take TransactionStateCorruptedException into account in their exception-catching logic, which should be a minor code change that does not break compatibility. In any case, the base type of all thrown exceptions is still KafkaException, so the effect should be minimal.

Rejected Alternatives

We considered enumerating all possible exception types at the Streams level for resiliency, but quickly abandoned the approach because it would require a coordinated code change every time the underlying Producer client throws a new exception. Encapsulation should reduce the amount of exception-handling work on the caller side.