...

This KIP aims to improve the error-handling semantics in Kafka Streams when Kafka Streams fails to produce a message to the downstream sink. It provides an interface that allows custom handling of the error (e.g. reporting it to a custom metrics system) and indicates to Streams whether or not it should re-throw the Exception, thus causing the application to fall over.

Status

Current state: Adopted (1.1.0)

Discussion thread: Click here

...

We are proposing the addition of:

  • A public enumeration, ProductionExceptionHandlerResponse, with two possible values: CONTINUE and FAIL
  • A public interface named ProductionExceptionHandler with a single method, handle, that has the following signature:
    • ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record, Exception exception)
  • One default implementation of ProductionExceptionHandler
    • The DefaultProductionExceptionHandler, the default implementation that maintains the current behavior of always failing when production exceptions occur.
  • A new configuration parameter for Streams named default.production.exception.handler that accepts the fully qualified class name of the ProductionExceptionHandler to use.
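The proposed types could look roughly like the following sketch. The enum, interface, and default handler names come from the list above. The method bodies are assumptions, and ProducerRecord here is a minimal local stand-in for org.apache.kafka.clients.producer.ProducerRecord so that the sketch compiles without the Kafka client library. The types are public in the proposal; they are package-private here only so that everything fits in one file.

```java
// Minimal local stand-in for org.apache.kafka.clients.producer.ProducerRecord,
// included only so this sketch compiles without the Kafka client library.
final class ProducerRecord<K, V> {
    private final String topic;
    private final K key;
    private final V value;

    ProducerRecord(String topic, K key, V value) {
        this.topic = topic;
        this.key = key;
        this.value = value;
    }

    String topic() { return topic; }
    K key() { return key; }
    V value() { return value; }
}

// The two responses a handler can give, as proposed above.
enum ProductionExceptionHandlerResponse {
    CONTINUE,
    FAIL
}

// The proposed single-method handler interface.
interface ProductionExceptionHandler {
    ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record, Exception exception);
}

// Default implementation: keeps the current behavior by always failing.
class DefaultProductionExceptionHandler implements ProductionExceptionHandler {
    @Override
    public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record, Exception exception) {
        return ProductionExceptionHandlerResponse.FAIL;
    }
}
```

Because the interface has a single method, a handler can also be written as a lambda in tests, for example `(record, exception) -> ProductionExceptionHandlerResponse.CONTINUE`.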

Proposed Changes

This implementation will modify the KafkaStreams constructor to create a ProductionExceptionHandler from the aforementioned config value, defaulting to an implementation that always re-throws the error (the DefaultProductionExceptionHandler mentioned above). We'll pipe this handler through the StreamThread/StreamTask into RecordCollectorImpl.
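As a rough illustration of creating the handler from the config value, the sketch below resolves a class name from a Properties object and falls back to the default handler. HandlerLoader and fromConfig are hypothetical names, the reflection-based lookup is an assumption about one possible approach rather than how Streams itself resolves configured classes, and ProducerRecord is again a local stand-in.

```java
import java.util.Properties;

// Local stand-ins so the sketch compiles without Kafka on the classpath.
final class ProducerRecord<K, V> { }
enum ProductionExceptionHandlerResponse { CONTINUE, FAIL }

interface ProductionExceptionHandler {
    ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record, Exception exception);
}

class DefaultProductionExceptionHandler implements ProductionExceptionHandler {
    @Override
    public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record, Exception exception) {
        return ProductionExceptionHandlerResponse.FAIL;
    }
}

// Hypothetical helper, not part of the proposal.
final class HandlerLoader {
    // Config key as named in this proposal.
    static final String HANDLER_CONFIG = "default.production.exception.handler";

    // Resolve the configured class name, falling back to the default handler.
    static ProductionExceptionHandler fromConfig(Properties config) {
        String className = config.getProperty(HANDLER_CONFIG, DefaultProductionExceptionHandler.class.getName());
        try {
            return Class.forName(className)
                    .asSubclass(ProductionExceptionHandler.class)
                    .getDeclaredConstructor()
                    .newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new IllegalArgumentException("Cannot instantiate handler class " + className, e);
        }
    }
}
```

With an empty Properties object, `HandlerLoader.fromConfig` returns a DefaultProductionExceptionHandler. A class name that cannot be loaded, or that does not implement the interface, is rejected at startup instead of at the first failed send.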

We'll implement the following error handling logic in the onCompletion handler in RecordCollectorImpl:

  1. If the Exception that is thrown is a ProducerFencedException, behave as we do today and do not invoke the ProductionExceptionHandler as these exceptions are self-healing.
  2. Determine whether the Exception that is thrown is fatal, meaning that it will affect all records and should always cause Streams to fail. If so, do not invoke the ProductionExceptionHandler because its result would have to be ignored. We should log that we're ignoring these exceptions at DEBUG level.
    1. The exceptions that meet this classification are:
      1. AuthenticationException
      2. AuthorizationException
      3. SecurityDisabledException
      4. InvalidTopicException
      5. UnknownServerException
      6. IllegalStateException
      7. OffsetMetadataTooLarge
      8. SerializationException
      9. TimeoutException when it occurs immediately on send due to a full buffer

  3. If the Exception that is thrown meets neither of the above conditions, determine if sendException is already set. If so, do not invoke the ProductionExceptionHandler because this would mean that we've already invoked it and decided to FAIL. Invoking it again would just result in an ignored result.
  4. If none of the conditions above is met, invoke the handle method in the ProductionExceptionHandler and check the result.
    1. If the result is CONTINUE, log a note at DEBUG that we received that result and are not failing Streams as a result. This ensures that it's not possible for a client developer to ship code that totally swallows errors without presenting any kind of activity in the log.
    2. If the result is FAIL, log a message at ERROR that we received that result and set sendException so Streams will fail.
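The four steps above can be sketched as a small decision routine. This is a simplified model, not the actual RecordCollectorImpl code: SendErrorPolicy, Outcome, and the two predicates are hypothetical, the predicates stand in for the ProducerFencedException check and the fatal-exception list, logging is omitted, and ProducerRecord is a local stand-in.

```java
import java.util.function.Predicate;

// Local stand-ins so the sketch compiles without Kafka on the classpath.
final class ProducerRecord<K, V> { }
enum ProductionExceptionHandlerResponse { CONTINUE, FAIL }

interface ProductionExceptionHandler {
    ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record, Exception exception);
}

// Hypothetical reduction of the callback logic described in steps 1 to 4.
final class SendErrorPolicy {
    enum Outcome { FENCED, FATAL, ALREADY_FAILING, CONTINUED, FAILED }

    private final ProductionExceptionHandler handler;
    private final Predicate<Exception> isProducerFenced; // assumed hook for step 1
    private final Predicate<Exception> isFatal;          // assumed hook for step 2
    private volatile Exception sendException;            // non-null once Streams must fail

    SendErrorPolicy(ProductionExceptionHandler handler,
                    Predicate<Exception> isProducerFenced,
                    Predicate<Exception> isFatal) {
        this.handler = handler;
        this.isProducerFenced = isProducerFenced;
        this.isFatal = isFatal;
    }

    Exception sendException() { return sendException; }

    // Called with an exception delivered through the producer callback.
    Outcome onSendError(ProducerRecord<byte[], byte[]> record, Exception exception) {
        if (isProducerFenced.test(exception)) {
            return Outcome.FENCED;              // step 1: existing handling applies, handler not invoked
        }
        if (isFatal.test(exception)) {
            if (sendException == null) {
                sendException = exception;      // step 2: always fail, handler not invoked
            }
            return Outcome.FATAL;
        }
        if (sendException != null) {
            return Outcome.ALREADY_FAILING;     // step 3: a failure was already recorded
        }
        if (handler.handle(record, exception) == ProductionExceptionHandlerResponse.CONTINUE) {
            return Outcome.CONTINUED;           // step 4.1: real code would log at DEBUG
        }
        sendException = exception;              // step 4.2: real code would log at ERROR
        return Outcome.FAILED;
    }
}
```

Returning an Outcome makes each branch observable in a test. The real callback returns nothing and would communicate only through sendException and the log.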

The error handler will only be invoked for exceptions that are returned via the producer callback, and will not be invoked for Exceptions thrown directly from send as all of those exceptions should be seen by Streams immediately.

These changes will facilitate a number of error handling scenarios. For example, one could choose to write an implementation that always fails, but does some additional logging in the process:

Code Block
languagejava
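import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ExtraLoggingProductionExceptionHandler implements ProductionExceptionHandler {
  private static final Logger logger = LoggerFactory.getLogger(ExtraLoggingProductionExceptionHandler.class);
  public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record, Exception exception) {
    // Record keys may be null, so guard before decoding.
    String keyString = record.key() == null ? "null" : new String(record.key(), StandardCharsets.UTF_8);
    logger.error("Got an error! Key: " + keyString, exception);
    return ProductionExceptionHandlerResponse.FAIL;
  }
}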

You could also create a similar implementation that just continues processing and logs a warning:

Code Block
languagejava
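import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ExtraLoggingProductionExceptionHandler implements ProductionExceptionHandler {
  private static final Logger logger = LoggerFactory.getLogger(ExtraLoggingProductionExceptionHandler.class);
  public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record, Exception exception) {
    // Record keys may be null, so guard before decoding.
    String keyString = record.key() == null ? "null" : new String(record.key(), StandardCharsets.UTF_8);
    logger.warn("Got an error! Key: " + keyString, exception);
    return ProductionExceptionHandlerResponse.CONTINUE;
  }
}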
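A handler can also feed a metrics system, which is one of the use cases this KIP mentions. The sketch below is a hypothetical example: CountingProductionExceptionHandler is an invented name, the AtomicLong stands in for a real metrics client, and ProducerRecord is a local stand-in so the example compiles on its own.

```java
import java.util.concurrent.atomic.AtomicLong;

// Local stand-ins so the sketch compiles without Kafka on the classpath.
final class ProducerRecord<K, V> { }
enum ProductionExceptionHandlerResponse { CONTINUE, FAIL }

interface ProductionExceptionHandler {
    ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record, Exception exception);
}

// Hypothetical handler: counts production errors and lets processing continue.
class CountingProductionExceptionHandler implements ProductionExceptionHandler {
    // Stand-in for a real metrics client; safe to update from the producer callback thread.
    private final AtomicLong errorCount = new AtomicLong();

    @Override
    public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record, Exception exception) {
        errorCount.incrementAndGet();
        return ProductionExceptionHandlerResponse.CONTINUE;
    }

    long errorCount() {
        return errorCount.get();
    }
}
```

Since the handler is invoked from the producer callback, any shared state it touches should be thread-safe, which is why the counter here is atomic.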

 

Compatibility, Deprecation, and Migration Plan

...