
...

Code Block
languagejava
titleSample.java
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig);
KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig);
producer.initTransactions();
boolean isRunning = true; // in real code this would typically be a volatile field

while (isRunning) {
    ConsumerRecords<String, String> records = consumer.poll(CONSUMER_POLL_TIMEOUT);
    final boolean shouldCommit;
    try {
        producer.beginTransaction();

        // Do some processing and build the records we want to produce.
        List<ProducerRecord<String, String>> processed = process(records);

        for (ProducerRecord<String, String> record : processed) {
            producer.send(record, (metadata, exception) -> {
                // Not required to capture the exception here.
            });
        }
        // consumedOffsets is the offset map built from the consumed records.
        producer.sendOffsetsToTransaction(consumedOffsets, consumer.groupMetadata());

        shouldCommit = true;
    } catch (Exception e) {
        // Catch any exception thrown from the data transmission phase.
        shouldCommit = false;
    }

    try {
        final boolean commitSucceed;
        if (shouldCommit) {
            commitSucceed = producer.commitTransaction(); // new API with a returned flag.
        } else {
            commitSucceed = false;
        }

        if (commitSucceed) {
            // Commit to external storage if necessary.
        } else {
            resetToLastCommittedPositions(consumer);
            producer.abortTransaction();
        }
    } catch (CommitFailedException e) {
        // Non-fatal: the transaction state can be reset when the original
        // commit/abort times out. With an abortable error, the user can reset
        // the application state and resume with a new transaction; the root
        // cause is wrapped in the thrown exception. This is a best-effort
        // approach to avoid a producer shutdown: if the abort times out again,
        // the timeout exception is thrown to the application layer, so the
        // total retry time would be 2 * max.block.ms.
        resetToLastCommittedPositions(consumer);
        producer.abortTransaction();
    } catch (KafkaException e) {
        // Fatal: perform a controlled shutdown.
        producer.close();
        consumer.close();
        throw e;
    }
}

In the above example, we separate the transactional processing into two phases: the data transmission phase and the commit phase. In the data transmission phase, any thrown exception indicates failure of the ongoing transaction, which gives us a clear signal for the next stage on whether to commit or abort.

In the commit phase, we decide whether to commit or abort the transaction based on the result of the previous stage. In the new Producer API, commitTransaction() will act like abortTransaction(), in that it no longer throws non-fatal exceptions in their raw forms. Instead, it will wrap non-fatal exceptions as `CommitFailedException`, so that they are much easier to catch and handle on the caller side. This means any exception other than `CommitFailedException` caught during the commit phase is definitely fatal, so the user's error handling experience can be simplified to a controlled shutdown. We also added a returned boolean to commitTransaction, so that even if the commit fails internally with a non-fatal exception without throwing, the caller still gets a clear signal from commitTransaction on whether the last commit was successful; certain EOS users rely on an external data storage component and need to perform a non-rollback commit operation when necessary. When the returned flag is false, the user can assume the last transaction failed and reset the producer/consumer progress states as necessary.
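The commit-phase contract described above can be sketched in isolation. The sketch below uses illustrative stand-in classes (StubProducer and its mode field are assumptions for the demo, not Kafka APIs) to model the proposed behavior: commitTransaction() returns a success flag, wraps non-fatal failures as CommitFailedException, and lets any other exception escape as fatal.

```java
// Illustrative stand-ins; these are NOT Kafka classes.
class CommitFailedException extends RuntimeException {
    CommitFailedException(String msg, Throwable cause) { super(msg, cause); }
}

class StubProducer {
    // 0 = commit succeeds, 1 = non-fatal failure signalled via the return flag,
    // 2 = non-fatal failure signalled via CommitFailedException.
    final int mode;
    boolean aborted = false;
    StubProducer(int mode) { this.mode = mode; }

    boolean commitTransaction() {
        if (mode == 2)
            throw new CommitFailedException("commit failed", new RuntimeException("root cause"));
        return mode == 0;
    }
    void abortTransaction() { aborted = true; }
}

class CommitPhaseSketch {
    /** Returns true if the transaction ended up committed. */
    static boolean finishTransaction(StubProducer producer, boolean shouldCommit) {
        try {
            // Short-circuit: skip the commit entirely if data transmission failed.
            boolean commitSucceed = shouldCommit && producer.commitTransaction();
            if (!commitSucceed) {
                // In real code: reset consumer positions, then abort.
                producer.abortTransaction();
            }
            return commitSucceed;
        } catch (CommitFailedException e) {
            // Non-fatal: abort and let the caller resume with a new transaction.
            producer.abortTransaction();
            return false;
        }
        // Any other exception is fatal and propagates for a controlled shutdown.
    }
}
```

Note that all three non-success paths (flag false, CommitFailedException, data-phase failure) converge on the same abort-and-reset handling, which is the simplification the proposal is after.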

The only debatable case is a timeout exception within commit/abort transaction. It could be treated as either fatal or non-fatal: strictly speaking, the producer has already retried for max.block.ms, so a timeout here may suggest a fatal state from a basic user's perspective. We include the timeout handling in the template only as an example; blindly calling abortTransaction upon timeout could also result in an illegal state when the previous commit has already written `prepare_commit` on the broker side. The caller level could apply more sophisticated handling, such as an application-level retry if necessary, but we make no recommendation here. It is highly recommended to increase the request timeout instead of relying on unreliable retries.

Unify Wrapped KafkaException

...

  • The commitTransaction() API will no longer throw non-fatal exceptions
  • All the non-fatal exceptions thrown from data transmission APIs will be wrapped as KafkaException, which will be documented clearly. This includes:
    • beginTransaction 
    • sendOffsetsToTransaction 
    • send
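The benefit of this unified wrapping is that one catch clause covers all three data transmission APIs. A minimal stand-alone sketch, using illustrative stubs (the classes and the failure condition here are assumptions for the demo, not Kafka internals):

```java
import java.util.List;

// Illustrative stand-in; NOT the Kafka class.
class KafkaException extends RuntimeException {
    KafkaException(String msg, Throwable cause) { super(msg, cause); }
}

class DataPhaseSketch {
    // Each transactional API wraps its non-fatal internal errors as KafkaException.
    static void beginTransaction() { }
    static void send(String record) {
        if (record.isEmpty())
            throw new KafkaException("send failed", new IllegalArgumentException("empty record"));
    }
    static void sendOffsetsToTransaction() { }

    /** Returns whether the ongoing transaction should be committed. */
    static boolean transmit(List<String> records) {
        try {
            beginTransaction();
            for (String r : records) send(r);
            sendOffsetsToTransaction();
            return true;   // every API call succeeded: safe to commit
        } catch (KafkaException e) {
            // One catch uniformly covers beginTransaction, send, and
            // sendOffsetsToTransaction; the root cause is in e.getCause().
            return false;  // signal the commit phase to abort
        }
    }
}
```

Because the wrapper type is uniform, the data transmission phase reduces to a single boolean outcome, matching the shouldCommit flag in the template above.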

We would also deprecate the old commitTransaction API in favor of a new one that returns a flag indicating whether the last commit was successful:

...

languagejava
titleKafkaProducer.java

...

Let the commitTransaction API throw only CommitFailedException, with the root cause wrapped, when hitting non-fatal exceptions, to simplify the exception try-catching.

Documentation change

We shall put the newly marked fatal exceptions on the public Producer API docs correspondingly, including 

...

We also proposed to add a non-fatal exception wrapper type called TransactionStateCorruptedException to help users distinguish thrown exception types. This solution has compatibility issues and does not necessarily make the developer's and user's lives easier.

We proposed to add a returned boolean to commitTransaction, so that even if the commit failed internally with a non-fatal exception without throwing, the caller would still get a clear signal from commitTransaction on whether the last commit was successful, as certain EOS users rely on an external data storage component and need to perform a non-rollback commit operation when necessary. This approach was abandoned because it broke compatibility: existing users assume that a commitTransaction call which returns without throwing a non-fatal exception indicates success.