...

Code Block
language: java
title: Sample.java
KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig);
producer.initTransactions();
volatile boolean isRunning = true;

try {
    while (isRunning) {
        ConsumerRecords<String, String> records = consumer.poll(CONSUMER_POLL_TIMEOUT);

        final boolean shouldCommit;
        try {
            producer.beginTransaction();

            // Do some processing and build the records we want to produce.
            List<ProducerRecord<String, String>> processed = process(records);

            for (ProducerRecord<String, String> record : processed) {
                producer.send(record, (metadata, exception) -> {
                    // Not required to capture the exception here.
                });
            }

            // consumedOffsets: the offsets of the records consumed in this batch.
            producer.sendOffsetsToTransaction(consumedOffsets, consumer.groupMetadata());

            shouldCommit = true;
        } catch (Exception e) {
            // Catch any exception thrown from the data transmission phase.
            shouldCommit = false;
        }

        try {
            if (shouldCommit) {
                producer.commitTransaction();
            } else {
                resetToLastCommittedPositions(consumer);
                producer.abortTransaction();
            }
        } catch (CommitFailedException e) {
            // Transaction commit failed with an abortable error; the user could
            // reset the application state and resume with a new transaction.
            // The root cause is wrapped in the thrown exception.
            resetToLastCommittedPositions(consumer);
            producer.abortTransaction();
        }
    }
} catch (KafkaException e) {
    // Recommended: close the producer and consumer on fatal exceptions.
    producer.close();
    consumer.close();
    throw e;
}

In the above example, we separate the transactional processing into two phases: the data transmission phase and the commit phase. In the data transmission phase, any exception thrown indicates that the ongoing transaction has failed, giving the next phase a clear signal of whether to commit or abort the ongoing transaction.
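The two-phase control flow can be illustrated without any Kafka dependency. In the sketch below, the class and method names are made up for illustration and the strings stand in for the real client calls; it only shows how the data transmission phase produces a commit/abort signal that the commit phase then acts on.

```java
import java.util.ArrayList;
import java.util.List;

// Kafka-free sketch of the two-phase pattern; the strings stand in for
// the real producer/consumer calls in the sample above.
public class TwoPhaseSketch {

    // Runs one loop iteration and returns the sequence of transaction calls made.
    static List<String> runOnce(boolean transmissionFails) {
        List<String> calls = new ArrayList<>();
        boolean shouldCommit = false;

        // Phase 1: data transmission. Any exception only flips the signal.
        try {
            calls.add("beginTransaction");
            if (transmissionFails)
                throw new RuntimeException("send failed");
            calls.add("sendOffsetsToTransaction");
            shouldCommit = true;
        } catch (Exception e) {
            // Swallowed here; the commit phase decides what to do next.
        }

        // Phase 2: commit or abort based on the signal from phase 1.
        if (shouldCommit) {
            calls.add("commitTransaction");
        } else {
            calls.add("resetToLastCommittedPositions");
            calls.add("abortTransaction");
        }
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(runOnce(false)); // [beginTransaction, sendOffsetsToTransaction, commitTransaction]
        System.out.println(runOnce(true));  // [beginTransaction, resetToLastCommittedPositions, abortTransaction]
    }
}
```

Note that an exception in phase 1 never directly triggers an abort; it only records the decision, which keeps all transaction-state transitions in one place.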

...

The only debatable case is a timeout exception during commit/abort transaction. It could be treated as fatal or not: strictly speaking, the producer has already retried internally for up to max.block.ms, so from a basic user's perspective a timeout here may suggest a fatal state. Blindly calling abortTransaction upon a timeout could also result in an illegal state, since the previous commit may already have written `prepare_commit` on the broker side. A more sophisticated caller could perform an application-level retry if necessary, but we make no recommendation here. It is highly recommended to increase the request timeout instead of relying on unreliable retries.
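If a caller does choose to implement an application-level retry, one possible shape is a bounded retry that never aborts on the timeout path, since `prepare_commit` may already be written. The helper below is a hypothetical, Kafka-free sketch (the `Commit` interface and all names are illustrative), not a recommendation from this proposal:

```java
import java.util.concurrent.TimeoutException;

// Hypothetical sketch of a bounded application-level retry for a commit
// that times out. Crucially, the timeout path never calls abortTransaction.
public class CommitRetrySketch {

    // Illustrative stand-in for the commitTransaction call.
    interface Commit {
        void run() throws TimeoutException;
    }

    // Returns the number of attempts used; throws when retries are exhausted,
    // treating the repeated timeout as fatal.
    static int commitWithRetry(Commit commit, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                commit.run();
                return attempt;
            } catch (TimeoutException e) {
                // Do NOT abort here: the broker may already have written
                // `prepare_commit`, and aborting could be an illegal state.
            }
        }
        throw new IllegalStateException("commit timed out after " + maxAttempts + " attempts");
    }

    public static void main(String[] args) {
        int[] failures = {2}; // simulate the first two attempts timing out
        int attempts = commitWithRetry(() -> {
            if (failures[0]-- > 0)
                throw new TimeoutException();
        }, 5);
        System.out.println(attempts); // 3
    }
}
```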

We also wrap the whole processing loop in an outer try-catch block so that all fatal exceptions are caught and the producer and consumer modules are closed. This is the recommended way to handle fatal exceptions when the application still wants to proceed without leaking resources, but it is optional for users.

Unify Wrapped KafkaException

...