THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
Table of Contents |
---|
Status
Current state: Under DiscussionAdopted (1.0.0)
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
Code Block | ||
---|---|---|
| ||
/** * An interface that allows user code to inspect a record that has failed processingdeserialization */ public interface RecordExceptionHandlerDeserializationExceptionHandler extends Configurable { /** * Inspect a record and the exception received * @param context processor context * @param record record that failed deserialization * @param exception the actual exception */ HandlerResponseDeserializationHandlerResponse handle(ProcessorContext context, ConsumerRecord<byte[], byte[]> record, Exception exception); } public enum HandlerResponseDeserializationHandlerResponse { /* continue with processing */ CONTINUE(1), /* fail the processing and stop */ FAIL(2); } |
...
public static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG = "default.deserialization.exception.handler"
One default implementation Two implementations of the interface will be provided. The default will be the LogAndFailExceptionHandler as seen below:
Code Block | ||
---|---|---|
| ||
// logs the error and returns CONTINUE public class DefaultExceptionHandlerLogAndContinueExceptionHandler implements RecordExceptionHandler {...} // logs the error and returns FAIL public class LogAndFailExceptionHandler implements RecordExceptionHandler {...} // Then in StreamsConfig.java: .define(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, Type.CLASS, DefaultExceptionHandlerLogAndFailExceptionHandler.class.getName(), Importance.MEDIUM, DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC) |
In addition, we also added a metric that keeps track of the number of records skipped per source node.
Compatibility, Deprecation, and Migration Plan
- The way poison pills are handled today is that the uncaught exception handler eventually catches them and the streams pipeline will shut down. We can choose to maintain that default method for backwards compatibility using the LogAndFailExceptionHandler HandlerResponse.FAIL by default.
Rejected Alternatives
...