...

Code Block
language: java
title: Sample.java
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig);
KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig);
producer.initTransactions();
volatile boolean isRunning = true;

while (isRunning) {
    ConsumerRecords<String, String> records = consumer.poll(CONSUMER_POLL_TIMEOUT);
    boolean shouldCommit;
    try {
        producer.beginTransaction();

        // Do some processing and build the records we want to produce.
        List<ProducerRecord<String, String>> processed = process(records);

        for (ProducerRecord<String, String> record : processed)
            producer.send(record, (metadata, exception) -> {
                // not required to capture the exception here.
            });

        // consumedOffsets: the offsets of the records consumed above.
        producer.sendOffsetsToTransaction(consumedOffsets, consumer.groupMetadata());

        shouldCommit = true;
    } catch (Exception e) {
        // Catch any exception thrown from the data transmission phase.
        shouldCommit = false;
    }

    try {
        if (shouldCommit) {
            producer.commitTransaction();
        } else {
            resetToLastCommittedPositions(consumer);
            producer.abortTransaction();
        }
    } catch (TimeoutException e) {
        // The timeout exception is not fatal, and the transaction state can be reset.
        // This is a best-effort demo approach to avoid a producer shutdown:
        // if the abort times out again, the timeout exception will be thrown
        // to the application layer. The total retry time would be 2 * max.block.ms.
        resetToLastCommittedPositions(consumer);
        producer.abortTransaction();
    } catch (KafkaException e) {
        // Any other exception in the commit phase is treated as fatal:
        // close the clients and rethrow.
        producer.close();
        consumer.close();
        throw e;
    }
}

producer.close();
consumer.close();

In the above example, we separate the transactional processing into two phases: the data transmission phase and the commit phase. In the data transmission phase, any thrown exception indicates that the ongoing transaction has failed, which gives the commit phase a clear signal about whether to commit or abort the ongoing transaction.
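
The helper resetToLastCommittedPositions(consumer) used in the sample is not spelled out above. A minimal sketch of one possible implementation is shown below: it rewinds every assigned partition to its last committed offset so that records of an aborted transaction are re-consumed on the next poll. The class name and the choice to seek to the beginning when no committed offset exists are assumptions of this sketch, not part of the KIP.

Code Block
language: java
title: ResetHelper.java (sketch)
import java.util.Collections;
import java.util.Map;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ResetHelper {

    // Rewind every assigned partition to its last committed offset, so the
    // records of an aborted transaction are consumed again on the next poll.
    static void resetToLastCommittedPositions(KafkaConsumer<String, String> consumer) {
        Map<TopicPartition, OffsetAndMetadata> committed =
            consumer.committed(consumer.assignment());
        for (TopicPartition tp : consumer.assignment()) {
            OffsetAndMetadata offsetAndMetadata = committed.get(tp);
            if (offsetAndMetadata != null)
                consumer.seek(tp, offsetAndMetadata.offset());
            else
                consumer.seekToBeginning(Collections.singleton(tp));
        }
    }
}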

Unify Wrapped Exception in Commit Phase

As discussed in the motivation section, the KafkaProducer currently has logic that wraps all thrown exceptions as KafkaException. To make the semantics clear and to let advanced users such as Kafka Streams better understand the root cause, we shall no longer wrap fatal exceptions, and instead only wrap non-fatal ones as KafkaException. We have also detected certain cases where KafkaException is double-wrapped internally; these will be addressed so that only a single layer of wrapping remains.
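
As a hypothetical sketch of how the commit phase from the sample above could take advantage of this, fatal exceptions would surface directly while non-fatal failures would still arrive as a single-layer KafkaException. The specific fatal types shown here (ProducerFencedException, FencedInstanceIdException, both from org.apache.kafka.common.errors) are illustrative only, not an exhaustive list from this KIP, and the producer, consumer, and resetToLastCommittedPositions names reuse those of the sample.

Code Block
language: java
title: CommitPhaseSketch.java (hypothetical)
try {
    producer.commitTransaction();
} catch (ProducerFencedException | FencedInstanceIdException e) {
    // Fatal cases are no longer wrapped, so the application can match on them
    // directly, clean up, and rethrow.
    producer.close();
    consumer.close();
    throw e;
} catch (KafkaException e) {
    // Non-fatal failures remain wrapped (one layer only) as KafkaException:
    // abort and retry from the last committed positions.
    resetToLastCommittedPositions(consumer);
    producer.abortTransaction();
}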

Callback Exception Improvement

...