
Status

Current state: Under Discussion

Discussion thread: here 

JIRA: here 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

When processing billions of records a day, it is likely that some of them are corrupt (e.g., because of a random bit flip) or that the processing logic is incorrect and cannot handle every record type. We call such records “poison pills” in this KIP. In these cases, users may want options for how to handle poison pills. Currently, either the entire Streams pipeline is shut down when such a record is encountered, or the user has to do a lot of extra work with operations such as flatMap and branch to work around the exceptions.

Proposed Changes

We propose to give users more control over how poison pills are handled, as well as good default options they can simply pick from. In a nutshell, this proposal calls for skipping over all bad records and calling an exception handler for each of them. The handler may in turn decide to just log the error, or to fail the pipeline (after one error, or after a given percentage of records have failed). The Rejected Alternatives section describes more complex handling options that could be added at a later stage. Note that all processing nodes share a single handler.

The basic interface added will be:

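import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
 * An interface that allows user code to inspect a record that has failed processing.
 */
public interface RecordExceptionHandler {
    /**
     * Inspect a record and its context.
     *
     * @param record        the raw (still serialized) record that failed
     * @param context       metadata about the record, such as its topic, partition and offset
     * @param exceptionType whether the failure happened during deserialization or processing
     */
    void handle(ConsumerRecord<byte[], byte[]> record, RecordContext context, ExceptionType exceptionType);
}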

Users will be able to set an exception handler through the following new config option:

StreamsConfig.DEFAULT_RECORD_EXCEPTION_HANDLER

 

Several default implementations of the interface will be provided:

  • An implementation that just logs the error to the log file:
public class LogRecordExceptionHandler implements RecordExceptionHandler {}
  • An implementation that logs the error and stops processing altogether:
public class LogAndFailExceptionHandler implements RecordExceptionHandler {}
  • An implementation that logs the error and stops processing once the percentage of failed records exceeds a threshold, which can be set through a "configure" method:
public class LogAndThresholdExceptionHandler implements RecordExceptionHandler {}
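To make the behavior of the three proposed defaults concrete, here is a minimal, self-contained Java sketch. It is not the Kafka Streams implementation: ConsumerRecord and RecordContext are replaced by a hypothetical FailedRecord stand-in, stopping the pipeline is modelled by throwing a hypothetical StreamsStoppedException, and the threshold handler's configuration key, its 1% default and its onRecord() hook are all assumptions made for illustration. Because handle() is only invoked for failed records, a percentage-based handler needs some way to learn the total record count; onRecord() stands in for whatever mechanism the pipeline would actually provide.

```java
import java.util.Map;
import java.util.logging.Logger;

/** Hypothetical sketch of the three proposed default handlers (not Kafka Streams code). */
final class DefaultHandlersSketch {
    private static final Logger LOG = Logger.getLogger(DefaultHandlersSketch.class.getName());

    enum ExceptionType { DeserializationException, ProcessingException }

    /** Hypothetical stand-in for ConsumerRecord + RecordContext: just topic, partition and offset. */
    static final class FailedRecord {
        final String topic;
        final int partition;
        final long offset;

        FailedRecord(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }

        @Override public String toString() { return topic + "-" + partition + "@" + offset; }
    }

    /** Hypothetical signal that a handler wants the whole pipeline shut down. */
    static final class StreamsStoppedException extends RuntimeException {
        StreamsStoppedException(String message) { super(message); }
    }

    interface RecordExceptionHandler {
        void handle(FailedRecord record, ExceptionType exceptionType);
    }

    /** Logs the error and returns, so the pipeline skips the record. */
    static class LogRecordExceptionHandler implements RecordExceptionHandler {
        @Override public void handle(FailedRecord record, ExceptionType type) {
            LOG.warning("Skipping record " + record + " after " + type);
        }
    }

    /** Logs the error and stops processing altogether by throwing. */
    static class LogAndFailExceptionHandler implements RecordExceptionHandler {
        @Override public void handle(FailedRecord record, ExceptionType type) {
            LOG.severe("Stopping on record " + record + " after " + type);
            throw new StreamsStoppedException(type + " on " + record);
        }
    }

    /** Logs the error and stops once the failed percentage exceeds a configured threshold. */
    static class LogAndThresholdExceptionHandler implements RecordExceptionHandler {
        static final String THRESHOLD_PERCENT_CONFIG = "record.exception.threshold.percent"; // hypothetical key
        private double thresholdPercent = 1.0; // hypothetical default
        private long recordsSeen;
        private long recordsFailed;

        public void configure(Map<String, ?> configs) {
            Object value = configs.get(THRESHOLD_PERCENT_CONFIG);
            if (value != null) {
                thresholdPercent = Double.parseDouble(value.toString());
            }
        }

        /** Hypothetical hook the pipeline would call once per record, whether it fails or not. */
        public void onRecord() { recordsSeen++; }

        @Override public void handle(FailedRecord record, ExceptionType type) {
            recordsFailed++;
            // Every failed record has certainly been seen, even if onRecord() was never called.
            double failedPercent = 100.0 * recordsFailed / Math.max(recordsSeen, recordsFailed);
            LOG.warning(String.format("Skipping record %s after %s (%.2f%% failed)", record, type, failedPercent));
            if (failedPercent > thresholdPercent) {
                throw new StreamsStoppedException(String.format(
                        "%.2f%% of records failed, threshold is %.2f%%", failedPercent, thresholdPercent));
            }
        }
    }
}
```

With a 10% threshold and 20 records seen, the first two failures (5% and 10%) are logged and skipped, and the third (15%) stops processing. Without a minimum sample size, a failure among the very first records trips the threshold immediately, so a real handler may want such a guard.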
 

Note that in all cases, the Streams pipeline skips over the record after the handler is called (unless the handler stops the pipeline). Contrast this with the other possible options described in Rejected Alternatives below.
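The skip-after-handler contract can be illustrated with a small, self-contained loop. This is a hypothetical sketch rather than Kafka Streams code: records are plain strings, "deserialization" is Integer.parseInt, "processing" is 100 / value (so "0" fails during processing), and the handler signals "stop the pipeline" by throwing. A handler that returns normally causes the record to be skipped, with nothing forwarded for it.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of skip-after-handler semantics (not Kafka Streams code). */
final class SkipSemanticsSketch {
    enum ExceptionType { DeserializationException, ProcessingException }

    /** Simplified handler: return normally to skip the record, throw to stop the pipeline. */
    interface RecordExceptionHandler {
        void handle(String rawRecord, ExceptionType exceptionType, Exception cause);
    }

    static final class Pipeline {
        private final RecordExceptionHandler handler; // one handler shared by every step

        Pipeline(RecordExceptionHandler handler) { this.handler = handler; }

        /** Runs each record through deserialize -> process and returns what was forwarded. */
        List<Integer> run(List<String> rawRecords) {
            List<Integer> forwarded = new ArrayList<>();
            for (String raw : rawRecords) {
                int value;
                try {
                    value = Integer.parseInt(raw); // "deserialization" step
                } catch (NumberFormatException e) {
                    handler.handle(raw, ExceptionType.DeserializationException, e);
                    continue; // handler returned: skip the corrupt record
                }
                try {
                    forwarded.add(100 / value); // "processing" step; throws ArithmeticException for 0
                } catch (ArithmeticException e) {
                    handler.handle(raw, ExceptionType.ProcessingException, e);
                    // handler returned: nothing is forwarded for this record
                }
            }
            return forwarded;
        }
    }
}
```

For example, running the records "4", "oops", "0" and "25" through a handler that merely records each failure forwards 25 and 4, while "oops" is reported as a DeserializationException and "0" as a ProcessingException.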

We propose to add two exception types:

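public enum ExceptionType {
    /* exceptions due to incorrect deserialization, indicating a corrupt record */
    DeserializationException(1),
    /* exceptions due to failures in processing */
    ProcessingException(2);

    private final int id; // numeric identifier of the exception type

    ExceptionType(final int id) { this.id = id; }

    public int id() { return id; }
}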

Compatibility, Deprecation, and Migration Plan

  • Today, poison pills are eventually caught by the uncaught exception handler and the Streams pipeline shuts down. To preserve this behavior for backward compatibility, LogAndFailExceptionHandler can be used as the default handler.

Rejected Alternatives

  • Allow retries. For each record exception, allow the processing logic to retry the record. In this case RecordExceptionHandler.handle() would return an enum with RETRY as one option instead of void. This option leads to quite a bit of complexity, since each processing node would need to implement custom retry logic. Furthermore, we do not have enough evidence about which failure types retrying would actually resolve. We might revisit this once there is more evidence that it is useful.

  • Allow record substitution. If a record is corrupt, why not have the exception handler inject a good record to replace it? There are two options here. The record could be injected before the processing logic, so that it is reprocessed: 1) process -> exception -> inject record -> reprocess the injected record -> forward. Alternatively, the record could be injected after the processing logic and forwarded directly: 2) process -> exception -> inject record -> forward it to the next processing node.

    Again, this is theoretically viable, but it is not clear that it is useful. We can consider adding such logic at a later stage.

  • Allow per-node handlers. The proposal above uses a single exception handler. We could allow each node to have its own handler (e.g., a map has one handler and a reduce has a different one). It is not clear that the complexity is worth it; we can consider this at a later stage if there are sufficient use cases.
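For comparison, the rejected retry alternative could look roughly like the hypothetical sketch below, in which handle() returns a decision (SKIP, FAIL or RETRY) instead of void. None of these names are part of the proposal. Even in this reduced form, the processing step must be safe to re-run on the same record, and the handler must bound the number of attempts to avoid looping forever on a record that always fails, which hints at the complexity mentioned in the retry bullet above.

```java
import java.util.function.Function;

/** Hypothetical sketch of the rejected RETRY alternative; these names are not proposed API. */
final class RetryAlternativeSketch {
    enum HandlerResponse { SKIP, FAIL, RETRY }

    /** In this alternative, handle() returns a decision instead of void. */
    interface RetryingExceptionHandler {
        HandlerResponse handle(String rawRecord, Exception cause, int attempt);
    }

    /**
     * Processes one record, asking the handler what to do after each failure.
     * Returns the result, or null if the handler chose to skip the record.
     */
    static <R> R processWithRetries(String raw, Function<String, R> processor,
                                    RetryingExceptionHandler handler) {
        for (int attempt = 1; ; attempt++) {
            try {
                return processor.apply(raw);
            } catch (RuntimeException e) {
                switch (handler.handle(raw, e, attempt)) {
                    case SKIP:
                        return null;
                    case FAIL:
                        throw new IllegalStateException("pipeline stopped on record " + raw, e);
                    case RETRY:
                        break; // leave the switch and reprocess the same record
                }
            }
        }
    }
}
```

With a handler that answers RETRY for the first four attempts and FAIL afterwards, a record whose processing fails twice and then succeeds is processed on the third attempt.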
