...

Sample.java (Java):
// isRunning must be a field, since local variables cannot be volatile:
//     private volatile boolean isRunning = true;
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig);
producer.initTransactions();

while (isRunning) {
    ConsumerRecords<String, String> records = consumer.poll(CONSUMER_POLL_TIMEOUT);
    // Not final: the flag is assigned in both the try and the catch branch.
    boolean shouldCommit;
    try {
        producer.beginTransaction();

        // Do some processing and build the records we want to produce.
        List<ProducerRecord> processed = process(records);

        for (ProducerRecord record : processed) {
            producer.send(record, (metadata, exception) -> {
                // not required to capture the exception here.
            });
        }
        // consumedOffsets holds the offsets of `records` to commit (construction omitted here).
        producer.sendOffsetsToTransaction(consumedOffsets, consumer.groupMetadata());

        shouldCommit = true;
    } catch (Exception e) {
        // Catch any exception thrown from the data transmission phase.
        shouldCommit = false;
    }

    try {
        if (shouldCommit) {
            // New API: non-fatal failures surface as CommitFailedException.
            producer.commitTransaction();
        } else {
            resetToLastCommittedPositions(consumer);
            producer.abortTransaction();
        }
    } catch (CommitFailedException e) {
        // Transaction commit failed with an abortable error; the user can reset
        // the application state and resume with a new transaction. The root
        // cause is wrapped in the thrown exception.
        resetToLastCommittedPositions(consumer);
        producer.abortTransaction();
    } catch (KafkaException e) {
        // Any other exception in the commit phase is fatal: shut down and rethrow.
        producer.close();
        consumer.close();
        throw e;
    }
}

...

In the commit phase, we decide whether to commit or abort the transaction based on the result of the previous stage. In the new Producer API, commitTransaction() will no longer throw non-fatal exceptions in their raw form. Instead, it wraps every non-fatal exception in a `CommitFailedException`, with the root cause attached, which makes it much easier for the caller to catch and handle. Consequently, any exception other than `CommitFailedException` caught during the commit phase is fatal, so the user's error handling reduces to a controlled shutdown.
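The decision rule above can be sketched without any Kafka dependency. The snippet below is only an illustration under stated assumptions: `AbortableCommitException`, `Transaction`, `Outcome`, and `finish` are hypothetical stand-ins invented for this sketch (the first plays the role of `CommitFailedException`, the second the producer's commit and abort calls); none of them is part of the Kafka client API.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-alone sketch of the commit-phase decision; no Kafka classes are used. */
final class CommitPhaseSketch {

    /** Hypothetical stand-in for the non-fatal wrapper exception. */
    static class AbortableCommitException extends RuntimeException {
        AbortableCommitException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Hypothetical stand-in for the two producer calls used in the commit phase. */
    interface Transaction {
        void commit();
        void abort();
    }

    enum Outcome { COMMITTED, ABORTED }

    /**
     * Commits or aborts depending on the result of the transmission phase.
     * Any exception other than AbortableCommitException propagates to the
     * caller, which should treat it as fatal and shut down.
     */
    static Outcome finish(Transaction txn, boolean shouldCommit, Runnable resetState) {
        try {
            if (shouldCommit) {
                txn.commit();
                return Outcome.COMMITTED;
            }
        } catch (AbortableCommitException e) {
            // Non-fatal: fall through to the reset-and-abort path below.
        }
        resetState.run();
        txn.abort();
        return Outcome.ABORTED;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        Transaction failing = new Transaction() {
            @Override public void commit() {
                throw new AbortableCommitException(
                        "commit failed", new IllegalStateException("simulated root cause"));
            }
            @Override public void abort() { log.add("abort"); }
        };
        Outcome outcome = finish(failing, true, () -> log.add("reset"));
        System.out.println(outcome + " " + log); // prints "ABORTED [reset, abort]"
    }
}
```

Because only the wrapper type is caught, the caller needs a single additional handler for everything else, which is the simplification the paragraph describes.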

...