...

Within a transaction session, the internal state tracks the last fatal/abortable error. When the producer hits a fatal or abortable exception, it transitions to the error state, and the next time a transactional API is used it throws the buffered exception. The caveat is that today we wrap many non-fatal exceptions as KafkaException, which does not draw a clear boundary between exceptions that are fatal (the application should fail fast) and those that are merely abortable (the application should catch the exception, abort the ongoing transaction, and resume). This also affects the stability of upstream users such as Streams EOS. This KIP addresses the gap by classifying exceptions according to their severity, so that end users can easily catch just the non-fatal ones.
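To illustrate the gap, the snippet below is a rough sketch (not prescribed code) of what handling looks like with today's API: once a generic KafkaException surfaces, the application cannot reliably tell whether it is safe to abort and retry, or whether the producer has to be closed.

Code Block
languagejava
titleCurrentHandling.java
try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("topic", new byte[1], new byte[1]));
    producer.commitTransaction();
} catch (ProducerFencedException e) {
    // Clearly fatal today: the only safe option is to close the producer.
    producer.close();
} catch (KafkaException e) {
    // Ambiguous today: the wrapped cause may be fatal (the producer must be closed)
    // or merely abortable (abort the transaction and retry). The application has to guess.
    producer.abortTransaction();
}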

Public Interfaces

We will add a new transactional commit API that returns a result, deprecating the old one:

Code Block
languagejava
titleKafkaProducer.java
    /**
     * Commits the ongoing transaction. This method will flush any unsent records before actually committing the transaction.
     *
     * Further, if any of the {@link #send(ProducerRecord)} calls which were part of the transaction hit irrecoverable
     * errors, this method will throw the last received exception immediately and the transaction will not be committed.
     * So all {@link #send(ProducerRecord)} calls in a transaction must succeed in order for this method to succeed.
     *
     * Note that this method will raise {@link TimeoutException} if the transaction cannot be committed before expiration
     * of {@code max.block.ms}. Additionally, it will raise {@link InterruptException} if interrupted.
     * It is safe to retry in either case, but it is not possible to attempt a different operation (such as abortTransaction)
     * since the commit may already be in the progress of completing. If not retrying, the only option is to close the producer.
     *
     * @throws IllegalStateException if no transactional.id has been configured or no transaction has been started
     * @throws ProducerFencedException fatal error indicating another producer with the same transactional.id is active
     * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker
     *         does not support transactions (i.e. if its version is lower than 0.11.0.0)
     * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
     *         transactional.id is not authorized. See the exception for more details
     * @throws org.apache.kafka.common.errors.InvalidProducerEpochException if the producer has attempted to produce with an old epoch
     *         to the partition leader. See the exception for more details
     * @throws KafkaException if the producer has encountered a previous fatal or abortable error, or for any
     *         other unexpected error
     * @throws TimeoutException if the time taken for committing the transaction has surpassed <code>max.block.ms</code>.
     * @throws InterruptException if the thread is interrupted while blocked
     */
    public TransactionResult commitTransaction() throws ProducerFencedException;

We are proposing a new transactional API usage template that makes EOS processing safer:

Code Block
languagejava
titleSample.java
producer.initTransactions();
volatile boolean isRunning = true;

while (isRunning) {
    boolean shouldCommit;
    try {
        producer.beginTransaction();
        producer.send(new ProducerRecord<>("topic", new byte[1], new byte[1]), (metadata, exception) -> {
            // not required to capture the exception here.
        });
        shouldCommit = true;
    } catch (KafkaException e) {
        // A failure during beginTransaction or send means the transaction should not be committed.
        shouldCommit = false;
    }

    try {
        if (shouldCommit) {
            TransactionResult result = producer.commitTransaction();
            if (result.failed()) {
                // Cleanup state, rewind processing offsets
                producer.abortTransaction();
            }
        } else {
            // Cleanup state, rewind processing offsets
            producer.abortTransaction();
        }
    } catch (KafkaException e) {
        // log the error
        throw e;
    }
}

producer.close();
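The exact shape of TransactionResult is not spelled out in this section; based on the result.failed() call in the template above, a minimal sketch of the returned type could look like the following (the method name is an assumption taken from that usage):

Code Block
languagejava
titleTransactionResult.java
// Sketch only: the shape is inferred from the usage template above.
public interface TransactionResult {

    // Returns true when the commit did not go through, meaning the application
    // should clean up its state, rewind processing offsets and abort the transaction.
    boolean failed();
}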


Callback Exception Improvement

As we have seen, producer#send accepts a callback which carries the exception. In an EOS setup the application is not required to handle the exception there, but for non-EOS cases the current mechanism is complicated because it surfaces raw exceptions. To make handling easier and more consistent, we decided to wrap all fatal exceptions (from the producer's perspective) as KafkaException.
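As a rough sketch of a non-EOS send under the proposed behavior (the handling shown is illustrative, not mandated), the callback only needs a single failure path because fatal errors arrive wrapped as KafkaException:

Code Block
languagejava
titleCallbackSample.java
producer.send(new ProducerRecord<>("topic", new byte[1], new byte[1]), (metadata, exception) -> {
    if (exception == null) {
        // The record was acknowledged; metadata carries the partition and offset.
        return;
    }
    // Under this proposal, fatal producer errors surface here wrapped as KafkaException,
    // so one handling path (log, stop sending, close the producer) is sufficient.
});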

From a transactional API user's perspective, TransactionStateCorruptedException should be treated as non-fatal: catch it, abort the ongoing transaction, and retry. All other, fatal errors will be thrown directly instead of being wrapped. If the caller invokes producer#abortTransaction again while in a fatal state, the fatal exception will still be thrown; doing so is not the recommended approach.

Code Block
languagejava
titleSample.java
producer.initTransactions();
volatile boolean isRunning = true;

while (isRunning) {
    try {
        producer.beginTransaction();
        producer.send(new ProducerRecord<>("topic", new byte[1], new byte[1]));
        producer.commitTransaction();
    } catch (TransactionStateCorruptedException e) {
        // For TransactionStateCorruptedException, just abort the transaction and try again.
        producer.abortTransaction();
    } catch (ProducerFencedException | AuthorizationException e) {
        // User could choose to catch any fatal exceptions as they see necessary; here we just close the producer and exit.
        break;
    }
}

producer.close();

Proposed Changes

Below is a full list of exception types that can be thrown from the producer API today, flagged as either fatal (thrown as themselves) or non-fatal. All non-fatal exceptions will be wrapped as TransactionStateCorruptedException when thrown.

  1. ProducerFencedException (Fatal)
  2. InvalidProducerEpochException (Non-fatal)
  3. KafkaException, which potentially wraps the following exceptions:
    1. IllegalStateException (Fatal)
    2. InvalidPidMappingException (Non-fatal)
    3. TransactionAbortedException (Non-fatal)
    4. ClusterAuthorizationException for idempotent send (Fatal)
    5. OutOfOrderSequenceException (Non-fatal)
    6. UnknownProducerIdException for producer state loss (Non-fatal)
    7. TransactionalIdAuthorizationException (Fatal)
    8. UnsupportedVersionException if transaction semantic is not supported (Fatal)
    9. AuthenticationException for transactional request authentication (Fatal)
    10. UnsupportedForMessageFormatException (Fatal)
    11. RuntimeException for detecting more than one inflight request, should be illegal state (Fatal)
    12. InvalidRecordException (Fatal)
    13. InvalidRequiredAcksException (Fatal)
    14. NotEnoughReplicasAfterAppendException (Non-fatal)
    15. NotEnoughReplicasException (Non-fatal)
    16. RecordBatchTooLargeException (Fatal)
    17. InvalidTopicException (Fatal)
    18. CorruptRecordException (Non-fatal)
    19. UnknownTopicOrPartitionException (Non-fatal)
    20. NotLeaderOrFollowerException (Non-fatal)
    21. GroupAuthorizationException (Fatal)
    22. KafkaException
      1. indicates retriable idempotent sequence (Non-fatal)
      2. indicates fatal transactional sequence (Fatal)
      3. indicates Producer closed (Non-fatal)
      4. InvalidTxnState (Fatal)
      5. unexpected reason (Fatal)
    23. TimeoutException for expired batch (Non-fatal)

All non-fatal exceptions will only be thrown by the following producer APIs (a sketch follows the list):

  • beginTransaction 
  • sendOffsetsToTransaction 
  • send
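
As a hedged sketch of how this scoping plays out in a consume-transform-produce loop (the offsets map, consumer variable and retry policy are illustrative, not part of the proposal):

Code Block
languagejava
titleNonFatalScope.java
while (isRunning) {
    try {
        producer.beginTransaction();
        producer.send(new ProducerRecord<>("topic", new byte[1], new byte[1]));
        // offsetsToCommit and consumer are assumed to come from the surrounding application.
        producer.sendOffsetsToTransaction(offsetsToCommit, consumer.groupMetadata());
        producer.commitTransaction();
    } catch (TransactionStateCorruptedException e) {
        // Only beginTransaction, send and sendOffsetsToTransaction throw the wrapped
        // non-fatal exception: rewind processing state, abort and retry the transaction.
        producer.abortTransaction();
    }
    // Fatal exceptions (e.g. ProducerFencedException) propagate out of the loop.
}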


Documentation change

We shall put the newly marked fatal exceptions in the public Producer API docs as well, including

...