Status

Current state: Adopted (1.0.0)

Discussion thread: here 

JIRA: here 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Code Block
languagejava
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.streams.processor.ProcessorContext;

/**
 * An interface that allows user code to inspect a record that has failed deserialization
 */
public interface DeserializationExceptionHandler extends Configurable {
    /**
     * Inspect a record and the exception received
     * @param context processor context
     * @param record record that failed deserialization
     * @param exception the actual exception
     */
    DeserializationHandlerResponse handle(ProcessorContext context, ConsumerRecord<byte[], byte[]> record, Exception exception);
}

public enum DeserializationHandlerResponse {
    /* continue with processing */
    CONTINUE(1),
    /* fail the processing and stop */
    FAIL(2);

    /* numeric id of the response */
    public final int id;

    DeserializationHandlerResponse(final int id) {
        this.id = id;
    }
}
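
To illustrate how user code might plug into this interface, here is a minimal sketch of a custom handler that tolerates a bounded number of bad records before failing. It is not part of the proposal: `RawRecord`, `ThresholdExceptionHandler`, and the `max.deserialization.failures` key are hypothetical, and the small stand-in types replace the Kafka classes (`ProcessorContext`, `ConsumerRecord`, `Configurable`) only so that the example compiles on its own.

```java
import java.util.Map;

// Stand-in for the proposed response enum.
enum DeserializationHandlerResponse { CONTINUE, FAIL }

// Hypothetical stand-in for ConsumerRecord<byte[], byte[]>.
final class RawRecord {
    final String topic;
    final byte[] key;
    final byte[] value;

    RawRecord(String topic, byte[] key, byte[] value) {
        this.topic = topic;
        this.key = key;
        this.value = value;
    }
}

// Stand-in for the proposed interface; the real one also receives a ProcessorContext.
interface DeserializationExceptionHandler {
    void configure(Map<String, ?> configs);
    DeserializationHandlerResponse handle(RawRecord record, Exception exception);
}

// Hypothetical handler: skip bad records until a configured limit is exceeded, then fail.
class ThresholdExceptionHandler implements DeserializationExceptionHandler {
    static final String MAX_FAILURES_CONFIG = "max.deserialization.failures"; // assumed key

    private long maxFailures = 0;
    private long failures = 0;

    @Override
    public void configure(Map<String, ?> configs) {
        Object limit = configs.get(MAX_FAILURES_CONFIG);
        if (limit != null) {
            maxFailures = Long.parseLong(limit.toString());
        }
    }

    @Override
    public DeserializationHandlerResponse handle(RawRecord record, Exception exception) {
        failures++;
        System.err.println("Failed to deserialize record from topic " + record.topic + ": " + exception);
        return failures <= maxFailures
                ? DeserializationHandlerResponse.CONTINUE
                : DeserializationHandlerResponse.FAIL;
    }
}
```

Keeping the limit in `configure()` means the same class can be reused with different tolerances per application, at the cost of holding mutable state inside the handler.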

...

public static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG = "default.deserialization.exception.handler";
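
As a usage sketch, an application could select a handler through this key when building its configuration. The snippet uses plain `java.util.Properties` so that it runs without Kafka on the classpath; `HandlerConfigSketch` is a hypothetical helper, and the fully qualified handler class name is an assumption about where the class lives.

```java
import java.util.Properties;

class HandlerConfigSketch {
    // Key proposed above for DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG.
    static final String HANDLER_KEY = "default.deserialization.exception.handler";

    // Builds application properties that opt in to a specific handler class.
    static Properties withHandler(String handlerClassName) {
        Properties props = new Properties();
        props.put(HANDLER_KEY, handlerClassName);
        return props;
    }

    public static void main(String[] args) {
        // Assumed package name; adjust to wherever the handler class actually resides.
        Properties props = withHandler("org.apache.kafka.streams.errors.LogAndContinueExceptionHandler");
        System.out.println(props.getProperty(HANDLER_KEY));
    }
}
```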

 


Two implementations of the interface will be provided. The default is LogAndFailExceptionHandler, as shown below:

Code Block
languagejava
// logs the error and returns CONTINUE
public class LogAndContinueExceptionHandler implements DeserializationExceptionHandler {...}

// logs the error and returns FAIL
public class LogAndFailExceptionHandler implements DeserializationExceptionHandler {...}

// Then in StreamsConfig.java:

.define(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
        Type.CLASS,
        LogAndFailExceptionHandler.class.getName(),
        Importance.MEDIUM,
        DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC)
 

In addition, we added a metric that tracks the number of records skipped per source node.
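
The interplay between the handler response and the skipped-records metric can be sketched as below. This is an illustration under assumptions, not the Streams implementation: `SkipLoopSketch`, its nested types, and the plain map used as the per-source-node counter are all hypothetical, and UTF-8 integer parsing stands in for a real deserializer.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SkipLoopSketch {
    enum Response { CONTINUE, FAIL }

    // Hypothetical, simplified handler shape: raw bytes plus the exception.
    interface Handler {
        Response handle(byte[] rawValue, Exception exception);
    }

    // Skipped-record counts keyed by source node name (stand-in for a real metric).
    final Map<String, Long> skippedBySourceNode = new HashMap<>();

    // Deserializes each value as an integer; a bad record is either skipped and counted
    // or aborts the run, depending on what the handler returns.
    List<Integer> process(String sourceNode, List<byte[]> rawValues, Handler handler) {
        List<Integer> out = new ArrayList<>();
        for (byte[] raw : rawValues) {
            try {
                out.add(Integer.parseInt(new String(raw, StandardCharsets.UTF_8)));
            } catch (NumberFormatException e) {
                if (handler.handle(raw, e) == Response.FAIL) {
                    throw new IllegalStateException("Deserialization failed at " + sourceNode, e);
                }
                skippedBySourceNode.merge(sourceNode, 1L, Long::sum);
            }
        }
        return out;
    }
}
```

With a handler that always returns `CONTINUE`, the inputs `1`, `x`, `3` yield `[1, 3]` and a skipped count of 1 for the source node; with a handler that returns `FAIL`, processing stops at `x`.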

Compatibility, Deprecation, and Migration Plan

...