...

As we have seen, there is a callback mechanism in Producer#send that carries the exception. In an EOS setup, handling the exception in the callback is not required, but for non-EOS cases the current mechanism is complicated because it passes raw exceptions. Although the callback's Javadoc lists all the fatal and non-fatal exceptions, users still need to maintain an exhaustive list of exception types to check and act on. Application code written this way is fragile, because the underlying producer logic keeps changing and introducing new exception types, and we have seen how difficult it is to classify propagated exceptions at the application level, for example in Kafka Streams.


To make the handling easier and more consistent, we decide to wrap all non-fatal exceptions (from the producer's perspective) as KafkaException, while still passing fatal exceptions in raw form through the send callback. We also suggest adding a new callback API, `onCompletion(metadata, sendFailure)`, where sendFailure is a struct containing both the thrown exception and an enum field indicating the failure type. Initially there would be three failure types:

  1. message rejected: the error is associated with the specific record being produced, such as InvalidTopic or RecordTooLarge
  2. delivery failed: the last record could not be delivered, such as NotEnoughReplicas or Timeout
  3. transaction failed: the exception relates specifically to transactional semantics, such as ProducerFenced or InvalidTransactionState
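A minimal sketch of how raw exceptions could be mapped onto the three failure types above. The exception classes are local stand-ins named after the examples in the list (hypothetical, defined only so the sketch compiles without kafka-clients), and `FailureClassifier` is a hypothetical helper; in the proposal the producer itself would attach the type. Treating unknown exceptions as DELIVERY_FAILED is an assumption of this sketch.

```java
// Hypothetical stand-ins for the Kafka exceptions named in the list above.
class InvalidTopicException extends RuntimeException {}
class RecordTooLargeException extends RuntimeException {}
class NotEnoughReplicasException extends RuntimeException {}
class TimeoutException extends RuntimeException {}
class ProducerFencedException extends RuntimeException {}
class InvalidTransactionStateException extends RuntimeException {}

enum FailureType { MESSAGE_REJECTED, DELIVERY_FAILED, TRANSACTION_FAILED }

// Hypothetical helper: classifies a raw producer exception into one failure type.
final class FailureClassifier {
    private FailureClassifier() {}

    static FailureType classify(Exception e) {
        if (e instanceof InvalidTopicException || e instanceof RecordTooLargeException) {
            // The problem lies with this specific record.
            return FailureType.MESSAGE_REJECTED;
        }
        if (e instanceof ProducerFencedException || e instanceof InvalidTransactionStateException) {
            // The transactional state is broken, independent of this record.
            return FailureType.TRANSACTION_FAILED;
        }
        if (e instanceof NotEnoughReplicasException || e instanceof TimeoutException) {
            return FailureType.DELIVERY_FAILED;
        }
        // Assumed default: anything unrecognized is reported as a delivery failure.
        return FailureType.DELIVERY_FAILED;
    }
}
```

With the type attached by the producer, applications branch on three enum values instead of maintaining their own exception list.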

With this additional flag, EOS users would be in a much better position to interpret the callback and take proper action, with less effort spent diagnosing failures themselves.

Streams side change

For the EOS Kafka Streams case, we would adopt this simplified exception-throwing logic by catching all exceptions in the data transmission phase and using them to decide whether Streams should commit. Furthermore, these changes leave the door open for us to analyze the non-fatal exceptions as well, by unwrapping the KafkaException's cause and reading the failure type through the callback.

For known exceptions such as ProducerFenced, the handling is simplified: once we adopt the new processing model in the send phase, we no longer need to wrap them as TaskMigratedException in the send callback, since they should not crash the stream thread even when thrown in raw form.
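The Streams-side flow could look roughly like the following sketch, assuming the send callback only records the first failure and the commit step decides what to do. `SendErrorTracker`, `CommitDecision`, and mapping TRANSACTION_FAILED to a task-migration outcome are hypothetical; `KafkaException`, `SendFailure`, and `FailureType` are local stand-ins for the producer-side types, not the kafka-clients classes.

```java
import java.util.concurrent.atomic.AtomicReference;

// Local stand-ins for the producer-side types (hypothetical).
enum FailureType { MESSAGE_REJECTED, DELIVERY_FAILED, TRANSACTION_FAILED }

final class SendFailure {
    final FailureType failureType;
    final Exception cause;

    SendFailure(FailureType failureType, Exception cause) {
        this.failureType = failureType;
        this.cause = cause;
    }
}

class KafkaException extends RuntimeException {
    KafkaException(Throwable cause) {
        super(cause);
    }
}

// Hypothetical outcomes of the commit step.
enum CommitDecision { COMMIT, ABORT, TASK_MIGRATED }

// Hypothetical tracker: callbacks record the first failure instead of throwing,
// so no exception escapes the send phase and crashes the stream thread.
final class SendErrorTracker {
    private final AtomicReference<SendFailure> firstFailure = new AtomicReference<>();

    // Invoked from the producer callback; a null argument means the send succeeded.
    void onSendCompleted(SendFailure failure) {
        if (failure != null) {
            firstFailure.compareAndSet(null, failure);
        }
    }

    // Decides at commit time, based only on the recorded failure type.
    CommitDecision decideCommit() {
        SendFailure failure = firstFailure.get();
        if (failure == null) {
            return CommitDecision.COMMIT;
        }
        if (failure.failureType == FailureType.TRANSACTION_FAILED) {
            // e.g. ProducerFenced: handled as task migration rather than crashing the thread.
            return CommitDecision.TASK_MIGRATED;
        }
        return CommitDecision.ABORT;
    }

    // Non-fatal errors arrive wrapped in KafkaException; unwrap one level to inspect the cause.
    static Throwable unwrap(Exception e) {
        if (e instanceof KafkaException && e.getCause() != null) {
            return e.getCause();
        }
        return e;
    }
}
```

Keeping only the first failure is a simplification; a fuller version might retain all of them for logging.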

...

We would also let the commitTransaction API throw only CommitFailedException, with the original exception as its cause, when hitting non-fatal exceptions, to simplify exception try-catching.
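Under this contract a caller needs only one catch clause for non-fatal commit errors. The sketch assumes that contract; `TransactionalProducer` is a hypothetical two-method view of the producer, `CommitFailedException` is a local stand-in, and aborting after a non-fatal commit failure is an assumed policy rather than part of the proposal.

```java
// Local stand-in: under the proposal it wraps the non-fatal cause.
class CommitFailedException extends RuntimeException {
    CommitFailedException(Throwable cause) {
        super(cause);
    }
}

// Hypothetical minimal view of the transactional producer.
interface TransactionalProducer {
    void commitTransaction();
    void abortTransaction();
}

enum CommitOutcome { COMMITTED, ABORTED }

final class CommitHelper {
    private CommitHelper() {}

    // One catch clause covers every non-fatal error; any other exception is
    // treated as fatal and propagates so the caller can close the producer.
    static CommitOutcome commitOrAbort(TransactionalProducer producer) {
        try {
            producer.commitTransaction();
            return CommitOutcome.COMMITTED;
        } catch (CommitFailedException e) {
            // Non-fatal: e.getCause() holds the original exception for logging.
            producer.abortTransaction();
            return CommitOutcome.ABORTED;
        }
    }
}
```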

We are also adding a new overload of the callback method and deprecating the old one:

Callback.java

```java
package org.apache.kafka.clients.producer;

public interface Callback {

    /**
     * @deprecated Implement {@link #onCompletion(RecordMetadata, SendFailure)} instead.
     */
    @Deprecated
    void onCompletion(RecordMetadata metadata, Exception exception);

    class SendFailure {
        public final FailureType failureType;
        public final Exception cause;

        public SendFailure(FailureType failureType, Exception cause) {
            this.failureType = failureType;
            this.cause = cause;
        }
    }

    enum FailureType {
        MESSAGE_REJECTED,   // the specific record being produced was rejected, such as InvalidTopic or RecordTooLarge
        DELIVERY_FAILED,    // the last record could not be delivered, such as NotEnoughReplicas or Timeout
        TRANSACTION_FAILED  // a transactional processing failure, such as ProducerFenced or InvalidTransactionState
    }

    /**
     * A callback method the user can implement to provide asynchronous handling of request completion. This method will
     * be called when the record sent to the server has been acknowledged. When sendFailure is not null in the callback,
     * metadata will contain the special -1 value for all fields except for topicPartition, which will be valid.
     *
     * @param metadata The metadata for the record that was sent (i.e. the partition and offset). An empty metadata
     *                 with -1 value for all fields except for topicPartition will be returned if an error occurred.
     * @param sendFailure The struct containing the failure type and exception cause, or null if the send succeeded.
     */
    default void onCompletion(RecordMetadata metadata, SendFailure sendFailure) {
        // Delegate to the deprecated method so existing implementations keep working;
        // guard against a null sendFailure on success.
        this.onCompletion(metadata, sendFailure == null ? null : sendFailure.cause);
    }
}
```

The new callback simplifies error handling for end users. For example, they could choose to ignore transactional errors, since the other transactional APIs already take care of them.
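For instance, an application using the proposed overload could skip transactional failures and collect the rest. To compile on its own, the sketch re-declares minimal stand-ins for the proposed `Callback`, `SendFailure`, `FailureType`, and `RecordMetadata` (top-level here rather than nested, and not the kafka-clients classes); `IgnoreTxnErrorsCallback` is a hypothetical user class.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal stand-ins mirroring the proposed API (hypothetical).
class RecordMetadata {}

enum FailureType { MESSAGE_REJECTED, DELIVERY_FAILED, TRANSACTION_FAILED }

final class SendFailure {
    final FailureType failureType;
    final Exception cause;

    SendFailure(FailureType failureType, Exception cause) {
        this.failureType = failureType;
        this.cause = cause;
    }
}

interface Callback {
    @Deprecated
    void onCompletion(RecordMetadata metadata, Exception exception);

    // New overload; by default it forwards to the deprecated method.
    default void onCompletion(RecordMetadata metadata, SendFailure sendFailure) {
        onCompletion(metadata, sendFailure == null ? null : sendFailure.cause);
    }
}

// Hypothetical user callback: transactional errors are left to
// commitTransaction()/abortTransaction(); other failures are collected.
final class IgnoreTxnErrorsCallback implements Callback {
    final List<Exception> reported = new ArrayList<>();

    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        // Legacy path; not reached while the new overload below is overridden.
    }

    @Override
    public void onCompletion(RecordMetadata metadata, SendFailure sendFailure) {
        if (sendFailure == null || sendFailure.failureType == FailureType.TRANSACTION_FAILED) {
            return; // success, or handled by the transactional APIs
        }
        reported.add(sendFailure.cause);
    }
}
```

Because `onCompletion` is overloaded, a call that passes a literal `null` as the second argument is ambiguous in Java and needs a cast such as `(SendFailure) null`.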

Documentation change

We shall list the newly marked fatal exceptions in the public Producer API docs accordingly, including

...