...

Code Block
languagejava
titleSample.java
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.TimeoutException;

// Assumed to be defined by the application: producer (a transactional
// KafkaProducer<String, String>), consumerConfig, CONSUMER_POLL_TIMEOUT,
// process(...), consumedOffsets and resetToLastCommittedPositions(...).
// isRunning is expected to be a volatile field so another thread can stop the loop.
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig);
producer.initTransactions();

while (isRunning) {
    ConsumerRecords<String, String> records = consumer.poll(CONSUMER_POLL_TIMEOUT);
    boolean shouldCommit;
    try {
        producer.beginTransaction();

        // Do some processing and build the records we want to produce.
        List<ProducerRecord<String, String>> processed = process(records);

        for (ProducerRecord<String, String> record : processed)
            producer.send(record, (metadata, exception) -> {
                // Not required to capture the exception here.
            });
        // consumedOffsets: the next offsets to commit for the polled records.
        producer.sendOffsetsToTransaction(consumedOffsets, consumer.groupMetadata());

        shouldCommit = true;
    } catch (Exception e) {
        // Catch any exception thrown from the data transmission phase.
        shouldCommit = false;
    }

    try {
        // Proposed API: commitTransaction() returns whether the commit succeeded.
        final boolean commitSucceed;
        if (shouldCommit) {
            commitSucceed = producer.commitTransaction();
        } else {
            commitSucceed = false;
        }

        if (commitSucceed) {
            // Commit to external storage if necessary.
        } else {
            resetToLastCommittedPositions(consumer);
            producer.abortTransaction();
        }
    } catch (TimeoutException e) {
        // The transaction state could be reset when the original commit/abort times out.
        // This is a best-effort demo approach to avoid a producer shutdown. If the abort
        // times out again, the TimeoutException is thrown to the application layer, so
        // the total retry time would be 2 * max.block.ms.
        resetToLastCommittedPositions(consumer);
        producer.abortTransaction();
    } catch (KafkaException e) {
        // Any other exception in the commit phase is fatal: shut down in a controlled way.
        producer.close();
        consumer.close();
        throw e;
    }
}

...

In the commit phase, we decide whether to commit or abort the transaction based on the result of the previous stage. In the new Producer API, commitTransaction() will act like abortTransaction() in that it no longer throws non-fatal exceptions. This means any exception caught during the commit phase is definitely fatal, so the user's error handling can be simplified to a controlled shutdown. In addition, commitTransaction() now returns a boolean: even if the commit failed internally with a non-fatal exception that was not thrown, the caller still gets a clear signal of whether the last commit was successful. This matters because certain EOS users rely on an external data storage component and need to perform a non-rollback commit operation there as necessary. When the returned flag is false, the user can assume the last transaction failed and reset the producer/consumer progress state as necessary.
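To make the commit-phase contract concrete, here is a minimal, self-contained sketch of the decision logic. It does not use the real Kafka client: `TxnProducer`, `ExternalStore`, `Outcome` and the `resetPositions` callback are hypothetical names invented for illustration, and `TxnProducer.commitTransaction()` only models the proposed boolean-returning API described above.

```java
import java.util.ArrayList;
import java.util.List;

public class CommitPhaseSketch {

    /** Hypothetical stand-in for the proposed producer API (not the real KafkaProducer). */
    interface TxnProducer {
        boolean commitTransaction(); // true if the transaction was committed
        void abortTransaction();
    }

    /** Hypothetical external store that needs a non-rollback commit after Kafka commits. */
    interface ExternalStore {
        void commit();
    }

    enum Outcome { COMMITTED, ABORTED }

    /**
     * Decides between commit and abort based on the data transmission phase result.
     * resetPositions plays the role of resetToLastCommittedPositions(consumer).
     */
    static Outcome runCommitPhase(TxnProducer producer, ExternalStore store,
                                  boolean shouldCommit, Runnable resetPositions) {
        // commitTransaction() is only called when the data phase succeeded.
        boolean commitSucceeded = shouldCommit && producer.commitTransaction();
        if (commitSucceeded) {
            // Only touch the external store once Kafka has confirmed the commit.
            store.commit();
            return Outcome.COMMITTED;
        }
        // Either the data phase failed or the commit failed without throwing:
        // rewind the consumer and abort the transaction.
        resetPositions.run();
        producer.abortTransaction();
        return Outcome.ABORTED;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        TxnProducer failingCommit = new TxnProducer() {
            public boolean commitTransaction() { log.add("commit"); return false; }
            public void abortTransaction() { log.add("abort"); }
        };
        Outcome outcome = runCommitPhase(failingCommit, () -> log.add("external"),
                                         true, () -> log.add("reset"));
        System.out.println(outcome + " " + log); // ABORTED [commit, reset, abort]
    }
}
```

Checking the flag before touching the external store is the point of the design: a non-rollback side effect is only performed once the Kafka transaction is known to be committed.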

The only debatable case is a timeout exception within commitTransaction/abortTransaction. It could be treated as either fatal or non-fatal: strictly speaking, the producer has already retried for max.block.ms, so from a basic user's perspective a timeout here may suggest a fatal state. We include the timeout handling in the template only as an example; the caller can usually apply more sophisticated handling, such as an application-level retry, if necessary.
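The best-effort timeout handling can be sketched on its own as follows. This is an illustration under stated assumptions, not the producer's implementation: `TxnTimeoutException` is a hypothetical stand-in for `org.apache.kafka.common.errors.TimeoutException`, and `TxnProducer` is a hypothetical facade modeling the proposed API. The only fact taken from Kafka itself is that `max.block.ms` defaults to 60000 ms.

```java
public class TimeoutFallbackSketch {

    /** Hypothetical stand-in for org.apache.kafka.common.errors.TimeoutException. */
    static class TxnTimeoutException extends RuntimeException {
        TxnTimeoutException(String message) { super(message); }
    }

    /** Hypothetical producer facade; each call may block for up to max.block.ms. */
    interface TxnProducer {
        boolean commitTransaction();
        void abortTransaction();
    }

    /**
     * Finishes the transaction. If the commit/abort times out, the consumer is rewound
     * and the transaction is aborted once more. A second timeout is not caught and
     * propagates to the caller, so at most one extra abort attempt is made.
     */
    static boolean finishTransaction(TxnProducer producer, boolean shouldCommit,
                                     Runnable resetPositions) {
        try {
            if (shouldCommit && producer.commitTransaction()) {
                return true;
            }
            resetPositions.run();
            producer.abortTransaction();
            return false;
        } catch (TxnTimeoutException e) {
            resetPositions.run();
            producer.abortTransaction(); // may throw TxnTimeoutException again
            return false;
        }
    }

    public static void main(String[] args) {
        long maxBlockMs = 60_000L; // Kafka producer default for max.block.ms
        // Original timed-out call plus one retried abort, each blocking for max.block.ms.
        System.out.println("timeout + retried abort: " + 2 * maxBlockMs + " ms");

        TxnProducer commitTimesOut = new TxnProducer() {
            public boolean commitTransaction() { throw new TxnTimeoutException("commit timed out"); }
            public void abortTransaction() { System.out.println("aborted after timeout"); }
        };
        boolean committed = finishTransaction(commitTimesOut, true,
                () -> System.out.println("reset consumer positions"));
        System.out.println("committed = " + committed);
    }
}
```

With the default setting, the original call plus the retried abort can block for 2 * 60000 = 120000 ms before the second timeout reaches the application.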

...

  • The commitTransaction() API will no longer throw non-fatal exceptions
  • All non-fatal exceptions thrown from the data transmission APIs will be wrapped as KafkaException, which will be clearly documented. These APIs include:
    • beginTransaction 
    • sendOffsetsToTransaction 
    • send
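The effect of wrapping on the caller can be sketched with a small model. All names here are hypothetical: `NonFatalError` stands for an internal error the producer classifies as non-fatal, `KafkaLikeException` stands in for `org.apache.kafka.common.KafkaException`, and `wrapNonFatal` only models the proposed contract for beginTransaction, send and sendOffsetsToTransaction; it is not real producer code.

```java
import java.util.List;

public class DataPhaseSketch {

    /** Hypothetical internal error that the producer classifies as non-fatal. */
    static class NonFatalError extends RuntimeException {
        NonFatalError(String message) { super(message); }
    }

    /** Hypothetical stand-in for org.apache.kafka.common.KafkaException. */
    static class KafkaLikeException extends RuntimeException {
        KafkaLikeException(String message, Throwable cause) { super(message, cause); }
    }

    /** Models the proposed contract: a non-fatal error leaves the API wrapped in one type. */
    static void wrapNonFatal(String api, Runnable call) {
        try {
            call.run();
        } catch (NonFatalError e) {
            throw new KafkaLikeException(api + " failed", e);
        }
    }

    /**
     * Runs the data transmission steps in order and returns the shouldCommit flag.
     * Because every non-fatal error arrives as the same wrapped type, one catch
     * clause is enough to decide that the transaction must be aborted.
     */
    static boolean runDataPhase(List<Runnable> steps) {
        try {
            for (Runnable step : steps) {
                step.run();
            }
            return true;
        } catch (KafkaLikeException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        List<Runnable> steps = List.of(
                () -> wrapNonFatal("beginTransaction", () -> { }),
                () -> wrapNonFatal("send", () -> { throw new NonFatalError("simulated"); }),
                () -> wrapNonFatal("sendOffsetsToTransaction", () -> { }));
        System.out.println("shouldCommit = " + runDataPhase(steps)); // shouldCommit = false
    }
}
```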


We would also deprecate the old commitTransaction API in favor of a new one that returns a flag indicating whether the last commit was successful:

Code Block
languagejava
titleKafkaProducer.java
    /**
     * Commits the ongoing transaction. This method will flush any unsent records before actually committing the transaction.
     *
     * Further, if any of the {@link #send(ProducerRecord)} calls which were part of the transaction hit irrecoverable
     * errors, this method will throw the last received exception immediately and the transaction will not be committed.
     * So all {@link #send(ProducerRecord)} calls in a transaction must succeed in order for this method to succeed.
     *
     * Note that this method will raise {@link TimeoutException} if the transaction cannot be committed before expiration
     * of {@code max.block.ms}. Additionally, it will raise {@link InterruptException} if interrupted.
     * It is safe to retry in either case, but it is not possible to attempt a different operation (such as abortTransaction)
     * since the commit may already be in the progress of completing. If not retrying, the only option is to close the producer.
     *
     * @return a boolean flag indicating whether the last commit was successful
     *
     * @throws IllegalStateException if no transactional.id has been configured or no transaction has been started
     * @throws ProducerFencedException fatal error indicating another producer with the same transactional.id is active
     * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker
     *         does not support transactions (i.e. if its version is lower than 0.11.0.0)
     * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
     *         transactional.id is not authorized. See the exception for more details
     * @throws org.apache.kafka.common.errors.InvalidProducerEpochException if the producer has attempted to produce with an old epoch
     *         to the partition leader. See the exception for more details
     * @throws KafkaException if the producer has encountered a previous fatal or abortable error, or for any
     *         other unexpected error
     * @throws TimeoutException if the time taken for committing the transaction has surpassed <code>max.block.ms</code>.
     * @throws InterruptException if the thread is interrupted while blocked
     */
	public boolean commitTransaction() throws ProducerFencedException


Documentation change

We shall document the newly marked fatal exceptions in the public Producer API docs accordingly, including:

...