...

On the other thread, there are proposals to make the producer API adopt more modern return types than a mere Future. This is potentially a good opportunity to merge these two efforts.

...

With the additional flagging, producer users would be in a much better position to interpret the callback and take proper action, with less effort spent diagnosing the failures themselves.

We are also introducing a new producer API that returns CompletionStage<RecordMetadata>, as KIP-706 suggested. To maintain compatibility, this new exception type would be thrown back to the user only through the completion stage of the new producer API depicted in KIP-706.

Stream side change

For the EOS Kafka Streams case, we would adopt this simplified exception-throwing logic by catching all exceptions in the data transmission phase to decide whether Streams should commit. Furthermore, these changes leave the door open for us to analyze the non-fatal exceptions thrown as well, by unwrapping the KafkaException's cause and reading the failure type through the callback.

...

We would also let the commitTransaction API throw only CommitFailedException, with the cause wrapped, when hitting non-fatal exceptions, to simplify the exception try-catching.
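
The simplified try-catching this enables might look like the sketch below. It uses local stand-ins rather than the Kafka client: `CommitFailedException` here is a simplified local class, and `TxnProducer` and `commitOrAbort` are hypothetical names. The only assumption taken from the proposal is that non-fatal commit failures arrive as a single exception type with the root cause attached; aborting in response is one plausible reaction, not a prescribed one.

```java
public class CommitHandling {

    // Stand-in for the exception commitTransaction would throw on non-fatal failures.
    static class CommitFailedException extends RuntimeException {
        CommitFailedException(Throwable cause) {
            super(cause);
        }
    }

    // Hypothetical minimal view of the transactional producer calls used here.
    interface TxnProducer {
        void commitTransaction();
        void abortTransaction();
    }

    // Returns true if the commit succeeded, false if it failed non-fatally and was aborted.
    // Any other exception is treated as fatal and propagates to the caller.
    static boolean commitOrAbort(TxnProducer producer) {
        try {
            producer.commitTransaction();
            return true;
        } catch (CommitFailedException e) {
            // The wrapped cause remains available for logging or finer-grained decisions.
            System.err.println("commit failed, aborting: " + e.getCause());
            producer.abortTransaction();
            return false;
        }
    }

    public static void main(String[] args) {
        TxnProducer failing = new TxnProducer() {
            @Override
            public void commitTransaction() {
                throw new CommitFailedException(new IllegalStateException("timeout"));
            }

            @Override
            public void abortTransaction() {
                // nothing to roll back in this stand-in
            }
        };
        System.out.println(commitOrAbort(failing));
    }
}
```

With a single catch clause for the non-fatal case, the caller no longer has to enumerate individual exception types around commitTransaction.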


We would add a new Producer error type called ProduceFailedException, which wraps send/produce exceptions together with their root cause and a failure type.

Code Block
languagejava
titleProduceFailedException.java
import org.apache.kafka.common.errors.ApiException;

/**
 * Exception indicating a produce failure for the given record, with root cause and reasoning embedded.
 */
public class ProduceFailedException extends ApiException {

    private final FailureType type;

    public ProduceFailedException(Throwable cause, FailureType type) {
        super(cause);
        this.type = type;
    }

    public FailureType failureType() {
        return type;
    }

    public enum FailureType {
        MESSAGE_REJECTED,   // the specific record being produced was rejected, such as InvalidTopic or RecordTooLarge
        DELIVERY_FAILED,    // a failure to produce the last record, such as NotEnoughReplicas or Timeout
        TRANSACTION_FAILED  // a transactional processing failure, such as ProducerFenced or InvalidTransactionState
    }
}

We are also adding a new producer#produce function and deprecating the old send APIs (borrowed from KIP-706 template):

Code Block
languagejava
titleProducer.java
public interface Producer<K, V> {

    /**
     * Asynchronously send a record to a topic and complete the stage when the send has been acknowledged.
     *
     * @param record The record to produce
     * @return A completion stage to listen for the record send result. The stage may complete exceptionally
     *         with {@link ProduceFailedException} if the send failed, with embedded cause exception and failure type.
     */
    CompletionStage<RecordMetadata> produce(ProducerRecord<K, V> record);

    @Deprecated
    Future<RecordMetadata> send(ProducerRecord<K, V> record);

    @Deprecated
    Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
}

This simplifies error handling for end users, since an optional callback is no longer required. The failure type in the embedded exception should be helpful: for example, users could choose to ignore transactional errors, since the other txn APIs already take care of them. We expect this error code to be implemented once KIP-706 is accepted, which would provide a more user-friendly send API with CompletableFuture or similar.
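
As a rough illustration of the handling described above, the sketch below reacts to a failed completion stage by inspecting the failure type. It does not depend on the Kafka client: `ProduceFailedException` here is a local stand-in mirroring the proposed class (extending `RuntimeException` rather than `ApiException`), and `ProduceErrorHandling`, `describe`, and the returned decision strings are hypothetical names chosen for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;

public class ProduceErrorHandling {

    // Stand-in for the proposed exception; the real one would extend ApiException.
    static class ProduceFailedException extends RuntimeException {
        enum FailureType { MESSAGE_REJECTED, DELIVERY_FAILED, TRANSACTION_FAILED }

        private final FailureType type;

        ProduceFailedException(Throwable cause, FailureType type) {
            super(cause);
            this.type = type;
        }

        FailureType failureType() {
            return type;
        }
    }

    // Hypothetical helper: maps the outcome of a produce stage to a decision string.
    static CompletionStage<String> describe(CompletionStage<?> stage) {
        return stage.handle((metadata, error) -> {
            if (error == null) {
                return "ok";
            }
            // Dependent stages may see the failure wrapped in a CompletionException.
            Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                    ? error.getCause() : error;
            if (!(cause instanceof ProduceFailedException)) {
                return "unexpected";
            }
            switch (((ProduceFailedException) cause).failureType()) {
                case MESSAGE_REJECTED:
                    return "skip-record";
                case DELIVERY_FAILED:
                    return "retry";
                case TRANSACTION_FAILED:
                    return "ignore"; // left to the transactional APIs
                default:
                    return "unexpected";
            }
        });
    }

    public static void main(String[] args) {
        CompletableFuture<Object> failed = new CompletableFuture<>();
        failed.completeExceptionally(new ProduceFailedException(
                new IllegalStateException("fenced"),
                ProduceFailedException.FailureType.TRANSACTION_FAILED));
        System.out.println(describe(failed).toCompletableFuture().join());
    }
}
```

The decision per failure type is application-specific; the point is only that one `handle` stage can branch on the type instead of matching individual exception classes.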

Documentation change

We shall document the newly marked fatal exceptions in the public Producer API docs accordingly, including 

...

We proposed adding a boolean return value to commitTransaction, so that even if the commit failed internally with a non-fatal exception without throwing, the caller would still get a clear signal from commitTransaction about whether the last commit was successful, as certain EOS users rely on an external data storage component and need to perform a non-rollback commit operation as necessary. This approach was abandoned because it broke compatibility: existing users would not expect a commitTxn call that returns without throwing to indicate anything other than success.

We discussed throwing ProduceFailedException within the send callback; however, there are compatibility concerns. The other approach is to make the failure reason part of the callback function, which is less desirable than introducing a true async method as KIP-706 did.