
...

Within a transaction session, the producer's internal state tracks the last fatal or abortable error. When the Producer hits a fatal or abortable exception, it transitions to an error state, and the next call to a transactional API throws the buffered exception. The caveat is that today many non-fatal exceptions are wrapped as KafkaException, which draws no clear boundary between a fatal exception (the application should fail fast) and an abortable one (the application should catch it and abort the ongoing transaction to resume). This also affects the stability of upstream users such as Streams EOS. This KIP tries to close the gap by classifying exceptions according to their fatality, so that end users can easily catch the non-fatal ones.
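The buffering behavior described above can be sketched as a small state holder. This is only an illustrative model, not the actual Kafka producer internals: the class `TxnStateSketch`, its `State` enum, and every method name are hypothetical, and the rule that a fatal error is never cleared is an assumption drawn from the fail-fast wording above.

```java
// Simplified, hypothetical model of the error tracking described above.
// It does not mirror the real Kafka producer internals.
final class TxnStateSketch {
    enum State { READY, ABORTABLE_ERROR, FATAL_ERROR }

    private State state = State.READY;
    private RuntimeException lastError; // buffered until the next API call

    // Called when an error is detected, e.g. by a background sender.
    void recordError(RuntimeException error, boolean fatal) {
        if (state == State.FATAL_ERROR) {
            return; // assumption: a fatal error is never overwritten
        }
        state = fatal ? State.FATAL_ERROR : State.ABORTABLE_ERROR;
        lastError = error;
    }

    // Every transactional API call would run this check first.
    void maybeFailWithError() {
        if (state != State.READY) {
            throw lastError;
        }
    }

    // Aborting clears an abortable error; a fatal error stays in place.
    void abort() {
        if (state == State.FATAL_ERROR) {
            throw lastError;
        }
        state = State.READY;
        lastError = null;
    }

    State state() {
        return state;
    }
}
```

In this model the caller can tell the two cases apart by inspecting the state, which is exactly what a single wrapping exception type does not allow today.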

Public Interfaces

We propose a new transactional API usage template that is better suited to performing EOS processing safely:

Code Block (language: java, title: Sample.java)
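producer.initTransactions();
// isRunning is assumed to be a volatile boolean field of the enclosing class,
// flipped to false by another thread to stop the loop.

while (isRunning) {
    boolean shouldCommit;
    try {
        producer.beginTransaction();
        producer.send(new ProducerRecord<>("topic", new byte[1], new byte[1]));
        shouldCommit = true;
    } catch (KafkaException e) {
        // Non-fatal error while producing: abort the transaction below.
        shouldCommit = false;
    }

    try {
        if (shouldCommit) {
            // commitTransaction() returning a TransactionResult is part of this proposal.
            TransactionResult result = producer.commitTransaction();
            if (result.failed()) {
                producer.abortTransaction();
            }
        } else {
            producer.abortTransaction();
        }
    } catch (KafkaException e) {
        // A failure while committing or aborting is treated as fatal:
        // log the error and rethrow.
        throw e;
    }
}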

producer.close();

We should add a new exception type called TransactionStateCorruptedException, which wraps non-fatal, retriable producer exceptions.
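As a rough illustration of the wrapping idea, the sketch below shows one possible shape for such a type. Only the name `TransactionStateCorruptedException` comes from this proposal; the constructor signature, the `AbortableErrorDemo` helper, and the message text are hypothetical, and the class extends `RuntimeException` here purely to keep the sketch free of Kafka dependencies.

```java
import java.util.concurrent.TimeoutException;

// Hypothetical shape: the proposal names the type but this section does not define it.
// It extends RuntimeException only so the sketch compiles without Kafka on the classpath.
class TransactionStateCorruptedException extends RuntimeException {
    TransactionStateCorruptedException(String message, Throwable cause) {
        super(message, cause);
    }
}

final class AbortableErrorDemo {
    // Wraps a non-fatal cause so callers can catch one type and abort the transaction.
    static TransactionStateCorruptedException wrapAbortable(Exception cause) {
        return new TransactionStateCorruptedException(
            "Transaction must be aborted: " + cause.getMessage(), cause);
    }

    // Example call site: the original cause stays reachable through getCause().
    static Throwable exampleCause() {
        return wrapAbortable(new TimeoutException("request timed out")).getCause();
    }
}
```

Keeping the original exception as the cause lets callers abort on the wrapper type while still logging the underlying failure.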

...