Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleSample.java
KafkaConsumer consumer = new KafkaConsumer<>(consumerConfig);
producer.initTransactions();
volatile boolean isRunning = true;

while (isRunning) {
    ConsumerRecords<String, String> records = consumer.poll(CONSUMER_POLL_TIMEOUT);
	final boolean shouldCommit = 
	try {
    	producer.beginTransaction();

		// Do some processing and build the records we want to produce.
  		List<ProducerRecord> processed = process(consumed);

		for (ProducerRecord record : processed)
    		producer.send(record, (metadata, exception) -> {
				// not required to capture the exception here.
			});		
		producer.sendOffsetsToTransaction(consumedOffsets, consumer.groupMetadata());

		return true;
 	} catch (KafkaExceptionException e) {
		// Catch any exception thrown from the data transmission phase.
		return false;
 	}

	try {
        if (shouldCommit) {
            TransactionCommitResult result = producer.commitTransaction();
            if (result.failed()) {
				resetToLastCommittedPositions(consumer);
				producer.abortTransaction();
			}
        } else {
			resetToLastCommittedPositions(consumer);
            producer.abortTransaction();
        }
    } catch (KafkaException e) {
      	// log the error
	  	if (exception.cause() != null || e instanceOf TimeoutException) {
		  	// thrown exception is not fatal and the transaction state could be reset.
			resetToLastCommittedPositions(consumer);
			producer.abortTransaction();  
        } else {
			throw e;	  				
	  	}
    }
}
}

producer.close();

In the above example, we separate the transactional processing into two phases: the data transmission phase, and the commit phase. In data transmission phase, 

...