Status

Current state: Under Discussion

Discussion thread: TBD

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The Kafka producer has supported transactional semantics since 0.11, through the following APIs:

  • initTransactions initializes the transactional producer identity
  • beginTransaction starts a new transaction
  • sendOffsetsToTransaction commits the consumer offsets advanced within the current transaction
  • commitTransaction commits the ongoing transaction
  • abortTransaction aborts the ongoing transaction

Within a transaction session, the producer's internal state tracks the last fatal or abortable error. When the producer hits such an exception it transitions to an error state, and the next call to a transactional API throws the buffered exception. The caveat is that today we wrap many non-fatal exceptions as KafkaException, which blurs the boundary between a fatal exception (the caller should fail fast) and an abortable one (the caller should catch it, abort the ongoing transaction, and resume). This also hurts the stability of upstream users such as Streams EOS. This KIP addresses the gap by revamping the Producer APIs to make them more robust and to make exception handling coherent and consistent.

For reference, below is the full list of exception types that the producer API can throw today. We flag which ones should be thrown as fatal exceptions in their own right and which should be non-fatal.

  1. ProducerFencedException (Fatal)
  2. InvalidProducerEpochException (Non-fatal)
  3. KafkaException, which potentially wraps the following exceptions:
    1. IllegalStateException (Fatal)
    2. InvalidPidMappingException (Non-fatal)
    3. TransactionAbortedException (Non-fatal)
    4. ClusterAuthorizationException for idempotent send (Fatal)
    5. OutOfOrderSequenceException (Non-fatal)
    6. UnknownProducerIdException for producer state loss (Non-fatal)
    7. TransactionalIdAuthorizationException (Fatal)
    8. UnsupportedVersionException if transaction semantic is not supported (Fatal)
    9. AuthenticationException for transactional request authentication (Fatal)
    10. UnsupportedForMessageFormatException (Fatal)
    11. RuntimeException for detecting more than one inflight request, should be illegal state (Fatal)
    12. InvalidRecordException (Fatal)
    13. InvalidRequiredAcksException (Fatal)
    14. NotEnoughReplicasAfterAppendException (Non-fatal)
    15. NotEnoughReplicasException (Non-fatal)
    16. RecordBatchTooLargeException (Fatal)
    17. InvalidTopicException (Fatal)
    18. CorruptRecordException (Non-fatal)
    19. UnknownTopicOrPartitionException (Non-fatal)
    20. NotLeaderOrFollowerException (Non-fatal)
    21. GroupAuthorizationException (Fatal)
    22. KafkaException
      1. indicates retriable idempotent sequence (Non-fatal)
      2. indicates fatal transactional sequence (Fatal)
      3. indicates Producer closed (Fatal)
      4. InvalidTxnState (Fatal)
      5. unexpected reason (Fatal)
    23. TimeoutException for expired batch (Non-fatal)

Proposed Changes

We propose a new transactional API usage template that makes EOS processing safer when handling a mix of fatal and non-fatal exceptions:

Sample.java
// process(), consumedOffsets and resetToLastCommittedPositions() are application-defined.
// isRunning is declared as a field so that another thread can stop the loop:
// private volatile boolean isRunning = true;
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig);
producer.initTransactions();

while (isRunning) {
    ConsumerRecords<String, String> records = consumer.poll(CONSUMER_POLL_TIMEOUT);
    // Not final: it is assigned in both the try block and the catch block.
    boolean shouldCommit;
    try {
        producer.beginTransaction();

        // Do some processing and build the records we want to produce.
        List<ProducerRecord<String, String>> processed = process(records);

        for (ProducerRecord<String, String> record : processed) {
            producer.send(record, (metadata, exception) -> {
                // Not required to capture the exception here.
            });
        }
        producer.sendOffsetsToTransaction(consumedOffsets, consumer.groupMetadata());

        shouldCommit = true;
    } catch (Exception e) {
        // Catch any exception thrown from the data transmission phase.
        shouldCommit = false;
    }

    try {
        if (shouldCommit) {
            // New behavior: non-fatal errors surface as CommitFailedException.
            producer.commitTransaction();
        } else {
            resetToLastCommittedPositions(consumer);
            producer.abortTransaction();
        }
    } catch (CommitFailedException e) {
        // Transaction commit failed with an abortable error. The user can reset
        // the application state and resume with a new transaction. The root
        // cause is wrapped in the thrown exception.
        resetToLastCommittedPositions(consumer);
        producer.abortTransaction();
    } catch (KafkaException e) {
        // Anything else is fatal: shut down in a controlled way.
        producer.close();
        consumer.close();
        throw e;
    }
}

In the above example, we separate transactional processing into two phases: the data transmission phase and the commit phase. In the data transmission phase, any thrown exception indicates that the ongoing transaction has failed, which gives the next phase a clear signal on whether to commit or abort.

In the commit phase, we decide whether to commit or abort the transaction based on the result of the previous phase. In the new Producer API, commitTransaction() will no longer throw non-fatal exceptions in raw form. Instead, it wraps non-fatal exceptions as `CommitFailedException`, which is much easier for the caller to catch and handle. Any exception other than `CommitFailedException` caught during the commit phase is therefore fatal, so the user's error handling reduces to a controlled shutdown.
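The wrapping rule for the commit phase can be sketched as follows. This is a minimal illustration, not the actual KafkaProducer implementation: the exception classes are local stand-ins named after their Kafka counterparts so that the example compiles without the kafka-clients dependency, `commitOrWrap` and `isNonFatal` are hypothetical helpers, and the non-fatal set covers only two entries from the list in the Motivation section.

```java
final class CommitWrappingSketch {
    // Local stand-ins for the Kafka exception types (assumed hierarchy shape).
    static class KafkaException extends RuntimeException {
        KafkaException(String message) { super(message); }
        KafkaException(String message, Throwable cause) { super(message, cause); }
    }
    static class CommitFailedException extends KafkaException {
        CommitFailedException(String message, Throwable cause) { super(message, cause); }
    }
    static class ProducerFencedException extends KafkaException {        // fatal
        ProducerFencedException(String message) { super(message); }
    }
    static class InvalidProducerEpochException extends KafkaException {  // non-fatal
        InvalidProducerEpochException(String message) { super(message); }
    }
    static class OutOfOrderSequenceException extends KafkaException {    // non-fatal
        OutOfOrderSequenceException(String message) { super(message); }
    }

    // Hypothetical classification; a real implementation would cover the full list.
    static boolean isNonFatal(RuntimeException e) {
        return e instanceof InvalidProducerEpochException
                || e instanceof OutOfOrderSequenceException;
    }

    // Runs the commit attempt, wraps non-fatal failures, and rethrows fatal ones untouched.
    static void commitOrWrap(Runnable commitAttempt) {
        try {
            commitAttempt.run();
        } catch (RuntimeException e) {
            if (isNonFatal(e)) {
                throw new CommitFailedException("Commit failed with an abortable error", e);
            }
            throw e;
        }
    }

    public static void main(String[] args) {
        try {
            commitOrWrap(() -> { throw new InvalidProducerEpochException("stale epoch"); });
        } catch (CommitFailedException e) {
            // The caller only needs this one catch clause for the abortable case.
            System.out.println("abortable, cause: " + e.getCause().getClass().getSimpleName());
        }
    }
}
```

With this shape, the caller's commit-phase handling needs only two catch clauses: `CommitFailedException` for abort-and-resume, and everything else for shutdown.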

The only debatable case is a timeout exception within commitTransaction or abortTransaction. It could be treated as either fatal or non-fatal: strictly speaking, the producer has already retried for max.block.ms, so from a basic user's perspective a timeout here may suggest a fatal state. Blindly calling abortTransaction upon timeout could also result in an illegal state if the previous commit has already written `prepare_commit` on the broker side. Callers can usually apply more sophisticated handling, such as an application-level retry, but we make no recommendation here. We do strongly recommend increasing the request timeout instead of relying on unreliable retries.
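As a rough illustration of the timeout recommendation, the sketch below raises the two producer settings involved, `request.timeout.ms` and `max.block.ms`, on a copy of an existing configuration. The helper name `withLongerTimeouts` is hypothetical and the values are illustrative only, not recommendations from this KIP.

```java
import java.util.Properties;

final class TimeoutConfigSketch {
    // Returns a copy of the given producer config with longer timeouts.
    // The concrete values are assumptions chosen for illustration.
    static Properties withLongerTimeouts(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        // How long the client waits for a broker response to a single request.
        props.setProperty("request.timeout.ms", "60000");
        // Upper bound on how long blocking producer calls may wait.
        props.setProperty("max.block.ms", "120000");
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("transactional.id", "demo-txn");
        System.out.println(withLongerTimeouts(base));
    }
}
```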

Unify Wrapped KafkaException

As discussed in the Motivation section, KafkaProducer has logic that wraps all thrown exceptions as KafkaException. To make the semantics clear, and to help advanced users such as Kafka Streams understand the root cause, we will no longer wrap fatal exceptions and will wrap only non-fatal ones as KafkaException. We also found certain cases where KafkaException is double-wrapped internally; these will be fixed so that there is only ever one layer of wrapping.
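One way to guarantee a single layer of wrapping is to check for an existing wrapper before creating a new one. The sketch below is an assumption about how such a guard could look, not the producer's internal code: `wrapOnce` is a hypothetical helper, the exception classes are local stand-ins for the Kafka types, and "already wrapped" is assumed to mean a plain KafkaException that carries a cause.

```java
final class SingleLayerWrapSketch {
    // Local stand-ins for the Kafka exception types.
    static class KafkaException extends RuntimeException {
        KafkaException(String message) { super(message); }
        KafkaException(String message, Throwable cause) { super(message, cause); }
    }
    static class OutOfOrderSequenceException extends KafkaException {  // non-fatal
        OutOfOrderSequenceException(String message) { super(message); }
    }

    // Wraps a non-fatal exception exactly once.
    static KafkaException wrapOnce(RuntimeException nonFatal) {
        // A plain KafkaException with a cause is treated as an existing wrapper
        // (assumption) and is reused instead of being nested again.
        if (nonFatal.getClass() == KafkaException.class && nonFatal.getCause() != null) {
            return (KafkaException) nonFatal;
        }
        return new KafkaException("Non-fatal producer error", nonFatal);
    }

    public static void main(String[] args) {
        KafkaException once = wrapOnce(new OutOfOrderSequenceException("sequence gap"));
        KafkaException twice = wrapOnce(once);
        // Both references point at the same single-layer wrapper.
        System.out.println(once == twice);
    }
}
```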

Callback Exception Improvement

As we have seen, producer#send has a callback mechanism that carries the exception type. In an EOS setup the callback is not required to handle the exception, but for non-EOS cases the current mechanism is complicated because it passes raw exceptions. To make handling easier and more consistent, we will wrap all exceptions that are non-fatal from the producer's perspective as KafkaException, while continuing to pass fatal exceptions in raw form through the send callback.
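For a non-EOS caller, the callback logic could then reduce to a single check, sketched below. The rule used to recognize a wrapped non-fatal error (a plain KafkaException with a non-null cause) is an assumption for illustration, since the exact detection rule is not fixed here. The exception classes are local stand-ins for the Kafka types, and `classify` and `Outcome` are hypothetical names.

```java
final class SendCallbackSketch {
    // Local stand-ins for the Kafka exception types.
    static class KafkaException extends RuntimeException {
        KafkaException(String message) { super(message); }
        KafkaException(String message, Throwable cause) { super(message, cause); }
    }
    static class ProducerFencedException extends KafkaException {     // fatal, passed raw
        ProducerFencedException(String message) { super(message); }
    }
    static class NotEnoughReplicasException extends KafkaException {  // non-fatal cause
        NotEnoughReplicasException(String message) { super(message); }
    }

    enum Outcome { SUCCESS, NON_FATAL, FATAL }

    // Classifies the exception argument a send callback would receive.
    static Outcome classify(Exception exception) {
        if (exception == null) {
            return Outcome.SUCCESS;
        }
        // Assumed convention: non-fatal errors arrive as a plain KafkaException with a cause.
        boolean wrappedNonFatal = exception.getClass() == KafkaException.class
                && exception.getCause() != null;
        return wrappedNonFatal ? Outcome.NON_FATAL : Outcome.FATAL;
    }

    public static void main(String[] args) {
        Exception wrapped = new KafkaException("send failed",
                new NotEnoughReplicasException("insufficient in-sync replicas"));
        System.out.println(classify(wrapped));
        System.out.println(classify(new ProducerFencedException("fenced")));
    }
}
```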

Stream side change

For the EOS Kafka Streams case, we would adopt this simplified exception-handling logic by catching all exceptions in the data transmission phase to decide whether Streams should commit. Furthermore, these changes leave the door open for us to analyze the thrown non-fatal exceptions by unwrapping the KafkaException's cause.

For known exceptions such as ProducerFencedException, the handling is simplified: we no longer need to wrap them as TaskMigratedException in the send callback, because once we adopt the new processing model in the send phase they will not crash the stream thread when thrown in raw form.

For exceptions such as TimeoutException, KIP-572 already covers application-level retry, so it should be fine to let them propagate to the upper level.

Public Interfaces

As mentioned in the Proposed Changes section, we propose the following public API changes:

  • The commitTransaction() API will no longer throw non-fatal exceptions in raw form
  • All non-fatal exceptions thrown from the data transmission APIs will be wrapped as KafkaException, which will be documented clearly. This includes:
    • beginTransaction
    • sendOffsetsToTransaction
    • send

We would also let the commitTransaction API throw only CommitFailedException, with the cause wrapped, when it hits a non-fatal exception, to simplify the caller's try-catch logic.

Documentation change

We will list the newly marked fatal exceptions in the public Producer API docs for the corresponding methods, including:

  • beginTransaction 
  • sendOffsetsToTransaction
  • commitTransaction
  • abortTransaction
  • send

Compatibility, Deprecation, and Migration Plan

This is a pure client-side change that only affects the resiliency of the new Producer client and Streams. For customized EOS use cases, users need to update the exception handling around commitTransaction(), since it no longer throws non-fatal exceptions in raw form. However, the base type of all thrown exceptions is still KafkaException, so the impact should be minimal.

Rejected Alternatives

We considered enumerating all possible exception types at the Streams level for resiliency, but abandoned the approach quickly because it would require a joint code change every time the underlying Producer client throws a new exception. The encapsulation should reduce the amount of exception-handling work on the caller side.

We also proposed adding a non-fatal exception wrapper type called TransactionStateCorruptedException to help users distinguish the thrown exception types. This solution has compatibility issues and does not necessarily make life easier for developers and users.

We proposed adding a boolean return value to commitTransaction, so that even if the commit failed internally with a non-fatal exception without throwing, the caller would still get a clear signal of whether the last commit succeeded. Certain EOS users rely on an external data storage component and need to perform non-rollback commit operations as necessary. This approach was abandoned because it breaks compatibility: existing users assume that a commitTransaction call that returns without throwing has succeeded.
