Comment: Update KIP with feedback from discussion thread

...

We are proposing the addition of:

  • A public enumeration, ProductionExceptionHandlerResponse, with two possible values: CONTINUE and FAIL
  • A public interface named ProductionExceptionHandler with a single method, handle, that has the following signature:
    • ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record, Exception exception)
  • A new configuration parameter for Streams named default.production.exception.handler that accepts the fully qualified class name of the ProductionExceptionHandler to use.
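As a rough illustration of the new configuration parameter, the sketch below sets it on a plain java.util.Properties object, the usual way Streams settings are supplied. The class HandlerConfigSketch, the helper withHandler, and the handler class name com.example.ExtraLoggingProductionExceptionHandler are hypothetical and exist only for this example.

```java
import java.util.Properties;

// Hypothetical helper for illustration; not part of the proposal.
final class HandlerConfigSketch {
    // Config key as named in this proposal.
    static final String HANDLER_CONFIG = "default.production.exception.handler";

    // Returns a copy of the given properties with the handler class name set.
    static Properties withHandler(Properties base, String handlerClassName) {
        Properties props = new Properties();
        props.putAll(base);
        props.setProperty(HANDLER_CONFIG, handlerClassName);
        return props;
    }

    public static void main(String[] args) {
        // The class name below is a made-up example of a user-supplied handler.
        Properties props = withHandler(new Properties(),
                "com.example.ExtraLoggingProductionExceptionHandler");
        System.out.println(props.getProperty(HANDLER_CONFIG));
    }
}
```

The resulting Properties would then be passed to Streams along with the rest of the application's configuration.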

Proposed Changes

This implementation will modify the KafkaStreams constructor to create a ProductionExceptionHandler from the aforementioned config value, falling back to a default implementation that always fails (so the error is re-thrown) when none is configured. We'll pipe this handler through the StreamThread/StreamTask into RecordCollectorImpl.
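One way to picture the "create a handler from the config value, with an always-fail default" step is the self-contained sketch below. It is not the actual Streams code. Response and Handler are simplified stand-ins for ProductionExceptionHandlerResponse and ProductionExceptionHandler (the stand-in takes raw key and value bytes rather than a ProducerRecord), and HandlerLoadingSketch, AlwaysFail, and load are hypothetical names.

```java
import java.util.Properties;

// Hypothetical sketch; the types here are stand-ins, not the proposed API.
final class HandlerLoadingSketch {
    // Stand-in for ProductionExceptionHandlerResponse.
    enum Response { CONTINUE, FAIL }

    // Stand-in for ProductionExceptionHandler (takes raw bytes, not a ProducerRecord).
    interface Handler {
        Response handle(byte[] key, byte[] value, Exception exception);
    }

    // Default used when no handler is configured: always fail.
    public static final class AlwaysFail implements Handler {
        @Override
        public Response handle(byte[] key, byte[] value, Exception exception) {
            return Response.FAIL;
        }
    }

    // Instantiates the configured handler by class name, or falls back to AlwaysFail.
    static Handler load(Properties props) {
        String className = props.getProperty("default.production.exception.handler");
        if (className == null) {
            return new AlwaysFail();
        }
        try {
            return Class.forName(className)
                    .asSubclass(Handler.class)
                    .getDeclaredConstructor()
                    .newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot create handler " + className, e);
        }
    }
}
```

The handler class needs an accessible no-argument constructor for this kind of reflective creation to work.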

We'll make a change to the onCompletion handler in RecordCollectorImpl to execute the handle method in its ProductionExceptionHandler and only record the exception in sendException if handle returns FAIL. If handle returns CONTINUE, the exception is not recorded and processing goes on.
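The decision made in the callback can be modelled with a small, self-contained sketch. It assumes the exception is recorded only when the handler answers FAIL (so that Streams later fails) and is skipped on CONTINUE. SendCallbackSketch and its nested Response and Handler types are hypothetical stand-ins, not the real RecordCollectorImpl or the proposed interface.

```java
// Hypothetical model of the callback logic; not the real RecordCollectorImpl.
final class SendCallbackSketch {
    // Stand-in for ProductionExceptionHandlerResponse.
    enum Response { CONTINUE, FAIL }

    // Stand-in for ProductionExceptionHandler (takes raw bytes, not a ProducerRecord).
    interface Handler {
        Response handle(byte[] key, byte[] value, Exception exception);
    }

    private final Handler handler;
    // Mirrors the sendException field mentioned in the text.
    private Exception sendException;

    SendCallbackSketch(Handler handler) {
        this.handler = handler;
    }

    // Simplified producer callback: exception is null when the send succeeded.
    void onCompletion(byte[] key, byte[] value, Exception exception) {
        if (exception == null) {
            return; // nothing to handle
        }
        // Record the error only if the handler asks Streams to fail.
        if (handler.handle(key, value, exception) == Response.FAIL) {
            sendException = exception;
        }
    }

    Exception sendException() {
        return sendException;
    }
}
```

With a handler that returns CONTINUE, sendException stays null after a failed send; with one that returns FAIL, it holds the exception passed to the callback.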

These changes will facilitate a number of error handling scenarios. For example, one could choose to write an implementation that always fails, but does some additional logging in the process:

Code Block
languagejava
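class ExtraLoggingProductionExceptionHandler implements ProductionExceptionHandler {
  // Assumes a `logger` field (for example, an SLF4J Logger) is declared on this class.
  @Override
  public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record, Exception exception) {
    // Decode the key for the log message; a record's key may be null.
    String keyString = record.key() == null ? "null" : new String(record.key(), java.nio.charset.StandardCharsets.UTF_8);
    logger.error("Got an error! Key: " + keyString, exception);
    return ProductionExceptionHandlerResponse.FAIL; // stop processing
  }
}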

You could also create a similar implementation that just continues processing and logs a warning:

Code Block
languagejava
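class ExtraLoggingProductionExceptionHandler implements ProductionExceptionHandler {
  // Assumes a `logger` field (for example, an SLF4J Logger) is declared on this class.
  @Override
  public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record, Exception exception) {
    // Decode the key for the log message; a record's key may be null.
    String keyString = record.key() == null ? "null" : new String(record.key(), java.nio.charset.StandardCharsets.UTF_8);
    logger.warn("Got an error! Key: " + keyString, exception);
    return ProductionExceptionHandlerResponse.CONTINUE; // keep processing
  }
}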

 

Compatibility, Deprecation, and Migration Plan

...