...
We are proposing the addition of:
- A public enumeration,
ProductionExceptionHandlerResponse
, with two possible values:CONTINUE
andFAIL
- A public interface named
ProductionExceptionProcessor
withProductionExceptionHandler
with a single method,processException
, that accepts anException
and returns aboolean
, such thattrue
indicates that the error should be re-thrown by Streams andfalse
indicates it should not.handle
, that has the following signature:ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record, Exception exception)
- A new configuration parameter for Streams named
default.production.exception.processor.class
thathandler
that accepts the fully qualified class name of theProductionExceptionProcessor
ProductionExceptionHandler
to use.
Proposed Changes
This implementation will modify the KafkaStreams
constructor to create a ProductionExceptionProcessor
ProductionExceptionHandler
from the aforementioned config value, defaulting to a default implementation that always re-throws the error. We'll pipe this processor through the StreamThread
/StreamTask
into RecordCollectorImpl
.
We'll make a change to the onCompletion handler in RecordCollectorImpl to execute the processException
handle
method in its ProductionExceptionProcessor
ProductionExceptionHandler
and only record the exception in sendException
if processException
returns true
. handle
returns CONTINUE
.
These changes will facilitate a number of error handling scenarios. For example, one could choose to write an interface that always fails, but does some additional logging in the process:
Code Block | ||
---|---|---|
| ||
class ExtraLoggingProductionExceptionHandler extends ProductionExceptionHandler {
ProductionExceptionHandlerResponse handle(ProducerRecord <byte[], byte[]> record, Exception exception) {
val keyString = new String(record.key(), "UTF-8");
logger.error("Got an error! Key: " + keyString, exception);
return ProductionExceptionHandlerResponse.FAIL;
}
} |
You could also create a similar interface that just continues processing and logs a warning:
Code Block | ||
---|---|---|
| ||
class ExtraLoggingProductionExceptionHandler extends ProductionExceptionHandler {
ProductionExceptionHandlerResponse handle(ProducerRecord <byte[], byte[]> record, Exception exception) {
val keyString = new String(record.key(), "UTF-8");
logger.warn("Got an error! Key: " + keyString, exception);
return ProductionExceptionHandlerResponse.CONTINUE;
}
} |
Compatibility, Deprecation, and Migration Plan
...