
Status

Current state: Under Discussion

Discussion thread: TBD

JIRA:

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The Kafka Producer has supported transactional semantics since 0.11, including the following APIs:

  • initTransactions for transactional producer identity initialization
  • beginTransaction to start a new transaction
  • sendOffsetsToTransaction to commit consumer offsets advanced within the current transaction
  • commitTransaction to commit the ongoing transaction
  • abortTransaction to abort the ongoing transaction

Within a transaction session, the internal state tracks the last fatal/abortable error. When the Producer hits a fatal or abortable exception, it transitions to the error state, and the next time a transactional API is used it throws the buffered exception. The caveat is that today we wrap many non-fatal exceptions as KafkaException, which does not draw a clear boundary between exceptions that are fatal (the application should fail fast) and those that are merely abortable (the application should catch them, abort the ongoing transaction, and resume). This ambiguity also affects the stability of upstream users such as Streams EOS. This KIP tries to address the gap by revamping the Producer APIs to make them more robust and to make exception handling coherent and consistent.

For reference, below is the full list of exception types that can be thrown from the producer API today, flagged by whether they should be thrown raw as fatal exceptions or treated as non-fatal (a short illustration of today's ambiguity follows the list).

  1. ProducerFencedException (Fatal)
  2. InvalidProducerEpochException (Non-fatal)
  3. KafkaException, which potentially wraps the following exceptions:
    1. IllegalStateException (Fatal)
    2. InvalidPidMappingException (Non-fatal)
    3. TransactionAbortedException (Non-fatal)
    4. ClusterAuthorizationException for idempotent send (Fatal)
    5. OutOfOrderSequenceException (Non-fatal)
    6. UnknownProducerIdException for producer state loss (Non-fatal)
    7. TransactionalIdAuthorizationException (Fatal)
    8. UnsupportedVersionException if transaction semantic is not supported (Fatal)
    9. AuthenticationException for transactional request authentication (Fatal)
    10. UnsupportedForMessageFormatException (Fatal)
    11. RuntimeException for detecting more than one inflight request, should be illegal state (Fatal)
    12. InvalidRecordException (Fatal)
    13. InvalidRequiredAcksException (Fatal)
    14. NotEnoughReplicasAfterAppendException (Non-fatal)
    15. NotEnoughReplicasException (Non-fatal)
    16. RecordBatchTooLargeException (Fatal)
    17. InvalidTopicException (Fatal)
    18. CorruptRecordException (Non-fatal)
    19. UnknownTopicOrPartitionException (Non-fatal)
    20. NotLeaderOrFollowerException (Non-fatal)
    21. GroupAuthorizationException (Fatal)
    22. KafkaException
      1. indicates retriable idempotent sequence (Non-fatal)
      2. indicates fatal transactional sequence (Fatal)
      3. indicates Producer closed (Fatal)
      4. InvalidTxnState (Fatal)
      5. unexpected reason (Fatal)
    23. TimeoutException for expired batch (Non-fatal)
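To make the gap concrete, below is a minimal, hypothetical handler written against today's API: abortable causes (e.g. OutOfOrderSequenceException) and fatal ones (e.g. ClusterAuthorizationException) both reach the caller wrapped in the same KafkaException, so the catch block cannot reliably choose between aborting and shutting down.

TodayAmbiguity.java
try {
    producer.send(record, null);
    producer.commitTransaction();
} catch (ProducerFencedException e) {
    // Clearly fatal today, since it is thrown raw.
    producer.close();
} catch (KafkaException e) {
    // The cause may be non-fatal (e.g. OutOfOrderSequenceException) or fatal
    // (e.g. ClusterAuthorizationException); without maintaining an exhaustive
    // list of types, the application cannot tell whether aborting is safe.
    producer.abortTransaction();
}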

Proposed Changes

We are proposing a new transactional API usage template which makes EOS processing safer when handling a mix of fatal and non-fatal exceptions:

Sample.java
KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig);
producer.initTransactions();
boolean isRunning = true; // set to false by a shutdown hook in a real application

try {
    while (isRunning) {
        ConsumerRecords<String, String> records = consumer.poll(CONSUMER_POLL_TIMEOUT);
        boolean shouldCommit; // assigned in both the try and catch blocks below
        try {
            producer.beginTransaction();

            // Do some processing and build the records we want to produce.
            List<ProducerRecord<String, String>> processed = process(records);

            for (ProducerRecord<String, String> record : processed)
                producer.send(record, (metadata, exception) -> {
                    // Not required to capture the exception here; any send
                    // failure will surface again in the commit phase below.
                });

            // consumedOffsets: offsets of the records consumed above (helper omitted)
            producer.sendOffsetsToTransaction(consumedOffsets, consumer.groupMetadata());

            shouldCommit = true;
        } catch (Exception e) {
            // Catch any exception thrown from the data transmission phase.
            shouldCommit = false;
        }

        try {
            if (shouldCommit) {
                producer.commitTransaction();
            } else {
                resetToLastCommittedPositions(consumer);
                producer.abortTransaction();
            }
        } catch (CommitFailedException e) {
            // The transaction commit failed with an abortable error; the user can
            // reset the application state and resume with a new transaction. The
            // root cause is wrapped inside the thrown exception.
            resetToLastCommittedPositions(consumer);
            producer.abortTransaction();
        }
    }
} catch (KafkaException e) { // Recommended: close producer and consumer on fatal exceptions
    producer.close();
    consumer.close();
    throw e;
}

In the above example, we separate transactional processing into two phases: the data transmission phase and the commit phase. In the data transmission phase, any thrown exception indicates that the ongoing transaction has failed, giving us a clear signal for the next phase on whether to commit or abort.

In the commit phase, we decide whether to commit or abort the transaction based on the result of the previous phase. With the new Producer API, commitTransaction() will no longer throw non-fatal exceptions in their raw form; instead, it will wrap all non-fatal exceptions as `CommitFailedException`. This means any exception other than `CommitFailedException` caught during the commit phase is definitely fatal, so the user's error handling can be simplified to a controlled shutdown.

The only debatable case is a timeout exception within commit/abort transaction. It could be treated as either fatal or non-fatal: strictly speaking, the producer has already retried for up to max.block.ms, so from a basic user's perspective a timeout here may suggest a fatal state. Blindly calling abortTransaction upon timeout could also result in an illegal state, if the preceding commit has already written `prepare_commit` on the broker side. The caller could apply more sophisticated handling, such as an application-level retry, but we make no recommendation here. Instead, it is highly recommended to increase the request timeout rather than rely on unreliable retries.
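For example, a minimal configuration sketch along those lines (a fragment in the style of the sample above; the values are illustrative only, not recommendations) gives the commit phase more time instead of retrying at the application level:

TimeoutConfig.java
Properties producerConfig = new Properties();
// Upper bound on how long transactional APIs such as commitTransaction() may block.
producerConfig.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 120_000);
// Per-request timeout for the underlying broker RPCs.
producerConfig.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60_000);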

We also put another try-catch block outside the whole processing loop as a chance to catch all fatal exceptions and close the producer and consumer modules. This is the recommended way to handle fatal exceptions when the application still wants to proceed without leaking resources, but it is optional.

Unify Wrapped KafkaException

As discussed in the motivation section, the KafkaProducer currently has logic that wraps all thrown exceptions as KafkaException. To make the semantics clear, and to let advanced users such as Kafka Streams better understand the root cause, we shall no longer wrap fatal exceptions, but instead only wrap non-fatal ones as KafkaException. We have also found cases where a KafkaException is double-wrapped internally; these will be addressed so that only one layer of wrapping remains.
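Under that contract, the wrapping itself classifies the error. A minimal caller sketch (hypothetical handling code, reusing the producer, consumer and consumedOffsets names from the template above) could look like:

UnwrapSketch.java
try {
    producer.beginTransaction();
    producer.send(record, null);
    producer.sendOffsetsToTransaction(consumedOffsets, consumer.groupMetadata());
    producer.commitTransaction();
} catch (ProducerFencedException | TransactionalIdAuthorizationException fatal) {
    // Fatal exceptions now arrive raw: fail fast with a controlled shutdown.
    producer.close();
    throw fatal;
} catch (KafkaException nonFatal) {
    // Only non-fatal causes are wrapped, so a single getCause() reveals the
    // root cause (e.g. InvalidProducerEpochException) and aborting is safe.
    producer.abortTransaction();
}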

Callback Exception Improvement

As we have seen, producer#send has a callback mechanism that carries the exception. In an EOS setup it is not required to handle the exception there, but for non-EOS cases the current mechanism is complicated because it throws raw exceptions. Although the callback function's comments list all the fatal and non-fatal exceptions, from the user's perspective they still need to maintain an exhaustive list of exception types to check against and act upon. Such application code is fragile in the face of ever-changing producer internals that introduce new exception types, and we have seen how difficult it is to classify the propagated exceptions at the application level, for example in Streams.

To make the handling easier and consistent, we suggest adding a new callback API, `onCompletion(metadata, sendFailure)`, where sendFailure is a struct containing both the thrown exception and an enum field indicating the failure type. Initially there would be 3 types of failures:

  1. message rejected: the error is associated with the specific record being produced, such as InvalidTopic or RecordTooLarge
  2. delivery failed: indicates a failure to deliver the record, such as NotEnoughReplicas or Timeout
  3. transaction error: an exception relating specifically to transactional semantics, such as ProducerFenced or InvalidTransactionState

With this additional flagging, EOS users would be in a much better position to interpret the callback and take proper action, with less effort spent diagnosing failures themselves.
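As a usage sketch of the proposed API (an anonymous class rather than a lambda, since the deprecated two-argument method remains the interface's only abstract method; see the interface definition in the Public Interfaces section), an EOS application might branch on the failure type like this:

CallbackUsage.java
producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, SendFailure sendFailure) {
        if (sendFailure == null)
            return; // the record was acknowledged successfully
        switch (sendFailure.failureType) {
            case MESSAGE_REJECTED:
                // Problem with this specific record (e.g. RecordTooLarge):
                // skip it or route it to a dead-letter topic.
                break;
            case DELIVERY_FAILED:
                // Delivery failure (e.g. NotEnoughReplicas): possibly
                // retriable at the application level.
                break;
            case TRANSACTION_FAILED:
                // Surfaced again by the transactional APIs, so it can be
                // safely ignored in this callback.
                break;
        }
    }

    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        // Deprecated variant; unused once the new method is overridden.
    }
});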

Streams side change

For the EOS Kafka Streams case, we would adopt this simplified exception handling by catching all exceptions in the data transmission phase to decide whether Streams should commit. Furthermore, these changes leave the door open for us to analyze the non-fatal exceptions as well, by unwrapping the KafkaException's cause and reading the failure type through the callback.

For known exceptions such as ProducerFenced, the handling is simplified: once we adopt the new processing model in the send phase, we no longer need to wrap them as TaskMigratedException in the send callback, since they will not crash the stream thread when thrown in raw form.

For exceptions such as TimeoutException, KIP-572 already covers application-level retries, so it should be fine to let them propagate to the upper level.

Public Interfaces

As mentioned in the proposed changes section, we would be doing the following public API changes:

  • The commitTransaction() API will throw CommitFailedException to wrap non-fatal exceptions
  • All non-fatal exceptions thrown from data transmission APIs will be wrapped as KafkaException, which will be documented clearly. This includes:
    • beginTransaction 
    • sendOffsetsToTransaction 
    • send

Letting commitTransaction throw only CommitFailedException (with the root cause wrapped) on non-fatal errors simplifies the exception try-catching on the caller side.

We are also adding a new type of callback method and deprecating the old one:

Callback.java
public interface Callback {

    @Deprecated
    void onCompletion(RecordMetadata metadata, Exception exception);

    class SendFailure {
        FailureType failureType;
        Exception cause;
    }

    enum FailureType {
        MESSAGE_REJECTED,  // the specific record being produced was rejected, such as InvalidTopic or RecordTooLarge
        DELIVERY_FAILED,   // a failure to deliver the record, such as NotEnoughReplicas or Timeout
        TRANSACTION_FAILED // a transactional processing failure, such as ProducerFenced or InvalidTransactionState
    }

    /**
     * A callback method the user can implement to provide asynchronous handling of request completion. This method will
     * be called when the record sent to the server has been acknowledged. When sendFailure is not null in the callback,
     * metadata will contain the special -1 value for all fields except for topicPartition, which will be valid.
     *
     * @param metadata    The metadata for the record that was sent (i.e. the partition and offset). An empty metadata
     *                    with -1 value for all fields except for topicPartition will be returned if an error occurred.
     * @param sendFailure The struct containing the failure type and exception cause, or null if the send succeeded.
     */
    default void onCompletion(RecordMetadata metadata, SendFailure sendFailure) {
        // Guard against the success case so legacy implementations receive a null exception.
        this.onCompletion(metadata, sendFailure == null ? null : sendFailure.cause);
    }
}

This simplifies callback error handling for end users. For example, they could choose to ignore transaction errors, since the other transactional APIs already take care of them.
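This design also preserves backward compatibility, as the interface above shows: an existing callback still targets the deprecated abstract method, and the new default method delegates to it with the unwrapped cause. A small sketch:

LegacyCallback.java
// An existing lambda callback keeps compiling and working unchanged.
Callback legacy = (metadata, exception) -> {
    if (exception != null)
        System.err.println("send failed: " + exception);
};
producer.send(record, legacy);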

Documentation change

We shall document the newly marked fatal exceptions on the corresponding public Producer API docs, including:

  • beginTransaction 
  • sendOffsetsToTransaction
  • commitTransaction
  • abortTransaction
  • send

Compatibility, Deprecation, and Migration Plan

This is a pure client-side change which only affects the resiliency of the new Producer client and Streams. For customized EOS use cases, users need to adjust their exception handling around commitTransaction(), since it no longer throws non-fatal exceptions in their raw form; note that returning without throwing does not by itself indicate a successful commit. However, all thrown exceptions still have KafkaException as their base type, so the effect should be minimal.

Rejected Alternatives

We considered exhaustively handling all possible exception types at the Streams level for resiliency, but abandoned that approach quickly, as it would require a coordinated code change every time the underlying Producer client introduces a new exception type. The encapsulation proposed here should reduce the amount of exception handling work on the caller side.

We also proposed adding a non-fatal exception wrapper type called TransactionStateCorruptedException to help users distinguish thrown exception types. This solution has compatibility issues and does not necessarily make developers' and users' lives easier.

We also proposed adding a boolean return value to commitTransaction, so that even if the commit failed internally with a non-fatal exception without throwing, the caller would still get a clear signal about whether the last commit succeeded; certain EOS users rely on external data storage components and need to perform non-rollback commit operations as necessary. This approach was abandoned because it would break compatibility: existing users assume that a commitTransaction call which returns without throwing indicates a successful commit.
