You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 3 Next »

This KIP is aimed at improving the error-handling semantics in Kafka Streams when Kafka Steams fails to produce a message to the downstream sink by providing an interface that can provide custom massaging of the error (e.g. report to a custom metrics system) and indicate to Streams whether or not it should re-throw the Exception, thus causing the application to fall over.

Status

Current state: Under Discussion

Discussion thread: Click here

JIRA: KAFKA-6086

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

At MailChimp, we've run into occasional situations where a message that comes into streams just under the size limit on the inbound size (say for the sake of illustration, 950KB with a 1MB max.request.size on the Producer) and we change it to a different serialization format for producing to the destination topic. In these cases, it's possible that the serialization format we change to comes in as larger than the inbound message. (For example, if we were going to JSON we might get something much larger than if the inbound format was a binary format of some kind.)

These cases are rare, but when they occur they cause our entire application to fall over and someone gets woken up in the middle of the night to figure out how to deal with it. Further, solutions that address this issue by hacking around it (increasing the max.request.size or trying to manually commit to the offsets topic to skip the large messages) each have their own problems. It would be preferable for us to be able to optionally provide code to ignore an ApiException returned from the producer. Such an interface would permit us to provide code that will log an error and instruct Streams to not re-throw the error.

Public Interfaces

We are proposing the addition of:

  • A public interface named ProductionExceptionProcessor with a single method, processException, that accepts an Exception and returns a boolean, such that true indicates that the error should be re-thrown by Streams and false indicates it should not.
  • A new configuration parameter for Streams named production.exception.processor that accepts the fully qualified class name of the ProductionExceptionProcessor to use.

Proposed Changes

This implementation will modify the KafkaStreams constructor to create a ProductionExceptionProcessor from the aforementioned config value, defaulting to a default implementation that always re-throws the error. We'll pipe this processor through the StreamThread/StreamTask into RecordCollectorImpl.

We'll make a change to the onCompletion handler in RecordCollectorImpl to execute the processException method in its ProductionExceptionProcessor and only record the exception in sendException if processException returns true.

Compatibility, Deprecation, and Migration Plan

The default behavior here will be consistent with existing behavior. Changing that behavior will be opt-in by providing the new config setting and an implementation of the interface. Constructors of RecordCollectorImpl, StreamThread, and StreamTask will need to change, but as those aren't (to my knowledge) part of the public interface that should be fine. We could even provide overloaded constructors with the old signatures if we're concerned about binary compatibility of this change.

Rejected Alternatives

We also considered:

  • A very targeted config setting that would ignore record too large errors, but feel that this solution is better because it could also be used to do granular reporting to other services on any kind of exception that could come from the completion handler.

 

  • No labels