Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

 

Status

Current stateUnder DiscussionAdopted (1.0.0)

Discussion thread: here 

JIRA: here 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

When processing billions of records a day, it is likely that some of these records may be corrupt or that the serialization or processing logic may be incorrect and buggy or it cannot handle all record types. We call such records “poison pills” in this KIP. In this cases, users might want some options with how to handle poison pills. Currently either the entire streams pipeline is closed upon such a record, or the user has to do lots of extra work with operations like flatMap, branch etc to circumvent the exceptions.

Proposed Changes

We propose to give users more control on how to handle poison pills as well as good default options they can just pick from. In a nutshell, this proposal calls for skipping over all bad records and calling an exception handler for each of them. The handler may in turn decide to just log the error, or fail the pipeline (after 1 error or a % of errors have been observed). The Rejected Alternatives contains more complex handling options that can be done at a latter stage. Note that all processing nodes are linked to just one handler.

...

Code Block
languagejava
/**
 * An interface that allows user code to inspect a record that has failed processingdeserialization
 */
public interface DeserializationExceptionHandler extends RecordExceptionHandlerConfigurable {
 /**
  * Inspect a record and its context the exception received
  * @param context processor context
  * @param record record that failed deserialization
  * @param exception the actual exception
  */ 
 voidDeserializationHandlerResponse handle(ProcessorContext context, ConsumerRecord<byte[], byte[]> record, RecordContext context, ExceptionType exceptionType Exception exception);
}
 
public enum DeserializationHandlerResponse {
	/* continue with processing */	
	CONTINUE(1), 
	/* fail the processing and stop */
	FAIL(2);
}
 

Users will be able to set an exception handler through the following new config option :which points to a class name.

public static final String DEFAULT_DESERIALIZATIONStreamsConfig.DEFAULT_RECORD_EXCEPTION_HANDLER_CLASS_CONFIG = "default.deserialization.exception.handler"

 

Several default Two implementations of the interface will be provided:

...

. The default will be the LogAndFailExceptionHandler as seen below:

...

languagejava
Code Block
languagejava

...

public class LogRecordExceptionHandler implements RecordExceptionHandler {...}

An implementation that logs the error and stops the processing altogether:

// logs the error and returns CONTINUE
public class 

...

LogAndContinueExceptionHandler implements 

...

RecordExceptionHandler {...}

...

An implementation that logs the error and stops processing if a % of records have failed where the % can be configured through a "configure" method:

...

languagejava

 
// logs the error and returns FAIL
public class 

...

LogAndFailExceptionHandler implements RecordExceptionHandler {...}

...

 

Note that in all cases, the streams pipeline skips over the record after the handler is called (unless the handler stops the pipeline).  Contrast this with other possible options as described in Rejected Alternatives below.

We propose to add two exception types:

Code Block
languagejava
public enum ExceptionType {
// Then in StreamsConfig.java:
 
.define(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
    /* exceptions due to incorrectType.CLASS,
 deserialization indicating corrupt record */
    DeserializationException(1LogAndFailExceptionHandler.class.getName(),
 
    /* exceptions due toImportance.MEDIUM,
 failures in processing */
    ProcessingException(2);
}DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC)

In addition, we also added a metric that keeps track of the number of records skipped per source node.

Compatibility, Deprecation, and Migration Plan

  • The way poison pills are handled today is that the uncaught exception handler eventually catches them and the streams pipeline will shut down. We can choose to maintain that default method for backwards compatibility using the LogAndFailExceptionHandler HandlerResponse.FAIL by default.

Rejected Alternatives

  • Allow retries. Basically for each record exception, allow the processing logic to retry it. In this case RecordExceptionHandler.handle() would return not void, but an enum with RETRY as an option. This options leads to quite a bit of complexity since each processing node needs to implement a custom retry logic. Furthermore we don’t have enough evidence to know failure types for which retrying would solve the issue. We might want to do this once we have more evidence that it’s useful.

  • Allow record substitution. If a record is corrupt, why not have the exception handler inject a good record to substitute the corrupt one? There are two options here. The record could be injected before the processing logic: 1) process->exception->inject record->reprocess new injected record->forward.... Alternatively the record could be injected after the processing logic: 2) process->exception->inject record and forward it to next processing node.

    Again this is theoretically viable but if it’s not clear if it’s useful. We can consider adding such logic at a later stage.

  • Allow per-node handlers. Note that the proposal above uses one exception handler. We could allow for each node to have it’s own handler (e.g., map has a handler, a reduce has a different handler). It is not clear if the complexity is worth it. We can consider this in a later stage if there are sufficient use cases.
  • Allow dead letter queue. We could send each record to a special topic. Note that this will still be possible with this KIP, but we won't provide such functionality in the library for now and just stick with the basic building blocks.