...

Code Block (Java): Sample.java
producer.initTransactions();
// Declared as a field of the enclosing class; volatile is not legal on a local variable.
volatile boolean isRunning = true;

while (isRunning) {
    boolean shouldCommit;
    try {
        producer.beginTransaction();
        producer.send(new ProducerRecord<>("topic", new byte[1], new byte[1]), (metadata, exception) -> {
            // not required to capture the exception here.
        });
        shouldCommit = true;
    } catch (KafkaException e) {
        shouldCommit = false;
    }

    try {
        if (shouldCommit) {
            TransactionResult result = producer.commitTransaction();
            if (result.failed()) {
                // Cleanup state, rewind processing offsets
                producer.abortTransaction();
            }
        } else {
            // Cleanup state, rewind processing offsets
            producer.abortTransaction();
        }
    } catch (KafkaException e) {
        // log the error
        throw e;
    }
}

producer.close();

We should add a new exception type called TransactionStateCorruptedException, which wraps non-fatal, retriable producer exceptions.

...

Code Block (Java): TransactionStateCorruptedException.java

...


As we have seen, producer#send takes a callback that carries the exception, if any. In an EOS setup, handling the exception there is not required. In non-EOS cases, however, the current exception mechanism is complicated because it exposes raw exceptions. To make handling easier and more consistent, we decided to wrap all fatal exceptions (from the producer's perspective) as KafkaException.

From the perspective of a transactional API user, TransactionStateCorruptedException should be treated as non-fatal and the operation retried. All other errors are fatal and are thrown directly instead of being wrapped. If the caller invokes producer#abortTransaction again while the producer is in a fatal state, the fatal exception will still be thrown, so calling it in that state is not recommended.
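The handling rule above can be sketched as a small retry helper. This is only an illustration under stated assumptions, not the proposed implementation: the nested KafkaException and TransactionStateCorruptedException classes are stand-ins so the example compiles without the Kafka client, the choice to make TransactionStateCorruptedException a subclass of KafkaException is an assumption, and runWithRetry, TransactionalStep, and maxAttempts are hypothetical names introduced here.

```java
public class TransactionRetrySketch {

    /** Stand-in for org.apache.kafka.common.KafkaException (assumption: kept minimal). */
    public static class KafkaException extends RuntimeException {
        public KafkaException(String message) {
            super(message);
        }

        public KafkaException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Stand-in for the proposed exception; hierarchy and constructor are assumptions. */
    public static class TransactionStateCorruptedException extends KafkaException {
        public TransactionStateCorruptedException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Hypothetical unit of work: begin a transaction, send records, commit. */
    public interface TransactionalStep {
        void run();
    }

    /**
     * Runs the step, aborting and retrying when the non-fatal exception is thrown.
     * Any other exception is treated as fatal: it propagates and abort is not called.
     * Returns the number of attempts used.
     */
    public static int runWithRetry(TransactionalStep step, Runnable abort, int maxAttempts) {
        for (int attempt = 1; ; attempt++) {
            try {
                step.run();
                return attempt;
            } catch (TransactionStateCorruptedException e) {
                // Non-fatal: clean up state and abort, then retry if attempts remain.
                abort.run();
                if (attempt >= maxAttempts) {
                    throw e;
                }
            }
        }
    }
}
```

In a real application, step would wrap beginTransaction, send, and commitTransaction, and abort would rewind processing offsets before calling producer#abortTransaction. Bounding the retries is a design choice of this sketch, not something the proposal specifies.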

...