Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
language: java
title: Sample.java
// Sample consume-transform-produce loop using a transactional producer.
// Each iteration polls a batch, produces the processed records and the consumed
// offsets inside one transaction, then either commits or aborts and rewinds the
// consumer to its last committed positions.
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig);
producer.initTransactions();
// NOTE(review): `volatile` is only legal on fields, so this flag is presumably
// meant to be a field of the enclosing class that another thread clears to
// stop the loop -- confirm when embedding the sample.
volatile boolean isRunning = true;

while (isRunning) {
    ConsumerRecords<String, String> records = consumer.poll(CONSUMER_POLL_TIMEOUT);

    // Deliberately not `final`: it is assigned in both the try and the catch
    // block, which the compiler rejects for a final local.
    boolean shouldCommit;
    try {
        producer.beginTransaction();

        // Do some processing and build the records we want to produce.
        List<ProducerRecord> processed = process(records);

        for (ProducerRecord record : processed) {
            producer.send(record, (metadata, exception) -> {
                // not required to capture the exception here.
            });
        }
        // NOTE(review): consumedOffsets is not declared in this snippet; it is
        // presumably the offsets of the batch polled above -- confirm.
        producer.sendOffsetsToTransaction(consumedOffsets, consumer.groupMetadata());

        shouldCommit = true;
    } catch (Exception e) {
        // Catch any exception thrown from the data transmission phase.
        // The failure is handled below by aborting the transaction.
        shouldCommit = false;
    }

    try {
        // commitTransaction() is the new API with a returned flag; it is only
        // invoked when the data transmission phase succeeded.
        final boolean commitSucceed = shouldCommit && producer.commitTransaction();

        if (commitSucceed) {
            // Commit to external storage if necessary
        } else {
            resetToLastCommittedPositions(consumer);
            producer.abortTransaction();
        }
    } catch (TimeoutException e) {
        // the transaction state could be reset when the original commit/abort times out.
        // This is a best-effort demo approach to avoid a producer shutdown,
        // if abort times out again, the timeout exception will be thrown
        // to the application layer. The total retry time would be 2 * max.block.ms
        resetToLastCommittedPositions(consumer);
        producer.abortTransaction();
    } catch (KafkaException e) {
        // Any other Kafka failure during commit/abort: release both clients
        // and surface the error to the caller.
        producer.close();
        consumer.close();
        throw e;
    }
}

...