...

Within a transaction session, the producer's internal state tracks the last fatal or abortable error. When the Producer hits a fatal or abortable exception, it transitions to the error state, and the next call to a transactional API throws the buffered exception. The caveat is that today we wrap many non-fatal exceptions as KafkaException, which draws no clear boundary between a fatal exception (the caller should fail fast) and an abortable one (the caller should catch it and abort the ongoing transaction to resume). This also affects the stability of upstream users such as Streams EOS. This KIP tries to address the gap by classifying exceptions by their fatality, so that end users can catch the non-fatal ones fairly easily.

Public Interfaces

We will add a new transactional commit API that returns a result, and deprecate the old one:

...

KafkaProducer.java (Java)

...

revamping Producer APIs to make them more robust and to make exception handling coherent.

Proposed Changes

We propose a new transactional API usage template that makes EOS processing safer:

...

As we have seen, producer#send has a callback mechanism that carries the exception type. In an EOS setup, the caller is not required to handle the exception there, but for non-EOS cases the current mechanism is complicated because it surfaces raw exceptions. To make handling easier and consistent, we decide to wrap all fatal exceptions (from the producer's perspective) as KafkaException.

Stream side change

EOS Streams will catch TransactionStateCorruptedException as abortable and resume work by calling Producer#abortTransaction and cleaning up the last transaction's state at the stream level. For known exceptions such as ProducerFenced, the handling stays the same: wrap the exception and throw TaskMigratedException. For exceptions such as TimeoutException, which have special handling logic in Streams, the exception will be wrapped as TransactionStateCorruptedException, but Streams can still retrieve its root cause and take the proper action.
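The Streams-side handling described above can be sketched roughly as follows. This is only an illustration under stated assumptions: every class here is a hypothetical stand-in defined locally so the example compiles without the Kafka libraries, and the `Action` enum and `handle` method are invented for the sketch rather than taken from Streams.

```java
// Hypothetical stand-ins; none of these are the real Kafka or Streams classes.
class StreamsRootCauseSketch {
    static class KafkaException extends RuntimeException {
        KafkaException(String message, Throwable cause) { super(message, cause); }
    }
    static class TransactionStateCorruptedException extends KafkaException {
        TransactionStateCorruptedException(String message, Throwable cause) { super(message, cause); }
    }
    static class TimeoutException extends KafkaException {
        TimeoutException(String message) { super(message, null); }
    }
    static class ProducerFencedException extends KafkaException {
        ProducerFencedException(String message) { super(message, null); }
    }
    static class TaskMigratedException extends RuntimeException {
        TaskMigratedException(String message, Throwable cause) { super(message, cause); }
    }

    // Assumed names for the two abortable paths described in the text.
    enum Action { ABORT_AND_RESET, TIMEOUT_HANDLING }

    static Action handle(RuntimeException e) {
        if (e instanceof ProducerFencedException) {
            // Known exception: wrap and rethrow as TaskMigratedException.
            throw new TaskMigratedException("producer fenced", e);
        }
        if (e instanceof TransactionStateCorruptedException) {
            // Abortable wrapper: inspect the root cause for special cases.
            if (e.getCause() instanceof TimeoutException) {
                return Action.TIMEOUT_HANDLING;
            }
            // Otherwise abort the transaction and clean up stream-level state.
            return Action.ABORT_AND_RESET;
        }
        // Anything else is not handled by this sketch.
        throw e;
    }

    public static void main(String[] args) {
        RuntimeException wrapped =
            new TransactionStateCorruptedException("txn failed", new TimeoutException("request timed out"));
        System.out.println(handle(wrapped));
    }
}
```

The point of the sketch is that wrapping does not lose information: the caller branches first on the wrapper type and then on `getCause()`.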

Public Interfaces

As mentioned in the proposed changes section, we will make the following public API changes:

  • The commitTransaction() API will no longer throw non-fatal exceptions
  • All non-fatal exceptions thrown from data transmission APIs will be wrapped as KafkaException, which will be documented clearly.
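As a rough illustration of what this classification could mean for a caller, the sketch below treats a fatal exception thrown as its own type as fail-fast and any other KafkaException as abortable. The classes are hypothetical local stand-ins so the example runs without the Kafka client, and `TxnWork`, `Outcome` and `runOnce` are names invented for the sketch. ProducerFencedException is used as the example fatal type; in the real client it is a subclass of KafkaException, so the more specific catch clause has to come first.

```java
// Hypothetical stand-ins; not the real Kafka client classes.
class TxnErrorHandlingSketch {
    static class KafkaException extends RuntimeException {
        KafkaException(String message, Throwable cause) { super(message, cause); }
    }
    // Modeled as a subclass of KafkaException, mirroring the real hierarchy.
    static class ProducerFencedException extends KafkaException {
        ProducerFencedException(String message) { super(message, null); }
    }

    // Assumed callback covering the data transmission calls of one transaction.
    interface TxnWork { void run(); }

    enum Outcome { COMMITTED, ABORTED, FATAL }

    static Outcome runOnce(TxnWork work, Runnable abort) {
        try {
            work.run();
            return Outcome.COMMITTED;
        } catch (ProducerFencedException fatal) {
            // Fatal: thrown as its own type, so the caller fails fast.
            return Outcome.FATAL;
        } catch (KafkaException abortable) {
            // Non-fatal: wrapped as KafkaException, so abort and let the caller retry.
            abort.run();
            return Outcome.ABORTED;
        }
    }

    public static void main(String[] args) {
        Outcome outcome = runOnce(
            () -> { throw new KafkaException("wrapped", new IllegalStateException("transient")); },
            () -> System.out.println("abortTransaction() would be called here"));
        System.out.println(outcome);
    }
}
```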

Below is a full list of exception types that can be thrown from the producer API as of today. We flag those that should be thrown as fatal exceptions by themselves, versus those that should be non-fatal.

...

  • beginTransaction 
  • sendOffsetsToTransaction
  • commitTransaction
  • abortTransaction

Stream side change

...


Compatibility, Deprecation, and Migration Plan

...