Table of Contents |
---|
Status
Current state: Under DiscussionAdopted (1.0.0)
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
Code Block | ||
---|---|---|
| ||
/** * An interface that allows user code to inspect a record that has failed processingdeserialization */ public interface RecordExceptionHandlerDeserializationExceptionHandler extends Configurable { /** * Inspect a record and the exception received * @param context processor context * @param record record that failed deserialization * @param exception the actual exception */ HandlerResponseDeserializationHandlerResponse handle(ProcessorContext context, ConsumerRecord<byte[], byte[]> record, Exception exception); } public enum HandlerResponseDeserializationHandlerResponse { /* continue with processing */ CONTINUE(1), /* fail the processing and stop */ FAIL(2); } |
...
public static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG = "default.deserialization.exception.handler"
One configuration option will be introduced (to be passed on RecordExceptionHandler.configure()) with type "HandlerResponse". This effectively tells the exception handler whether to continue or fail. In the future we might consider adding other options as well.
public static final String DEFAULT_DESERIALIZATION_EXCEPTION_RESPONSE = "default.deserialization.exception.response"
One default implementation of the interface will be provided:
Two implementations of the interface will be provided. The default will be the LogAndFailExceptionHandler as seen below:
Code Block | ||
---|---|---|
| ||
// logs the error and returns CONTINUE
public class LogAndContinueExceptionHandler implements RecordExceptionHandler {...}
// logs the error and returns FAIL
public class LogAndFailExceptionHandler | ||
Code Block | ||
| ||
public class DefaultExceptionHandler implements RecordExceptionHandler {...} // Then in StreamsConfig.java: .define(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, Type.CLASS, DefaultExceptionHandlerLogAndFailExceptionHandler.class.getName(), Importance.MEDIUM, DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC) .define(DEFAULT_DESERIALIZATION_EXCEPTION_RESPONSE_CONFIG, Type.STRING, HandlerResponse.FAIL.toString(), Importance.MEDIUM, DEFAULT_DESERIALIZATION_EXCEPTION_RESPONSE_DOC) |
In addition, we also added a metric that keeps track of the number of records skipped per source node.
Compatibility, Deprecation, and Migration Plan
...