...
This KIP aims to improve the error-handling semantics in Kafka Streams when Kafka Streams fails to serialize a message to the downstream sink. It provides an interface that allows custom messaging of the error (e.g. reporting it to a custom metrics system) and indicates to Streams whether or not it should re-throw the exception, thus causing the application to fall over.
...
Current state: Under Discussion
Discussion thread: here
JIRA: KAFKA-7499
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
We propose adding a new method, handleSerializationException, to the ProductionExceptionHandler interface, with the following signature:
ProductionExceptionHandlerResponse handleSerializationException(ProducerRecord record, boolean isKey, Exception exception);
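To illustrate how a user might implement the proposed method, the sketch below counts the failure (a stand-in for reporting to a metrics system) and asks Streams to continue. So that it compiles without the Kafka jars, `ProducerRecord`, `ProductionExceptionHandlerResponse`, and the handler interface are reduced to minimal local stand-ins. `MetricsReportingHandler` and its counters are hypothetical names and are not part of this KIP.

```java
import java.util.concurrent.atomic.AtomicLong;

// Minimal local stand-ins for the Kafka types named in the KIP
// (assumptions for illustration, not the real classes).
enum ProductionExceptionHandlerResponse { CONTINUE, FAIL }

final class ProducerRecord {
    final String topic;
    final Object key;
    final Object value;

    ProducerRecord(String topic, Object key, Object value) {
        this.topic = topic;
        this.key = key;
        this.value = value;
    }
}

interface ProductionExceptionHandler {
    // Signature as proposed in this KIP.
    ProductionExceptionHandlerResponse handleSerializationException(
            ProducerRecord record, boolean isKey, Exception exception);
}

// Hypothetical user handler: count the failure, then let Streams skip the record.
final class MetricsReportingHandler implements ProductionExceptionHandler {
    final AtomicLong keyFailures = new AtomicLong();
    final AtomicLong valueFailures = new AtomicLong();

    @Override
    public ProductionExceptionHandlerResponse handleSerializationException(
            ProducerRecord record, boolean isKey, Exception exception) {
        // isKey says which half of the record could not be serialized.
        (isKey ? keyFailures : valueFailures).incrementAndGet();
        return ProductionExceptionHandlerResponse.CONTINUE;
    }
}
```

Returning `FAIL` instead would ask Streams to stop, which matches the behavior in place before this KIP.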
Proposed Changes
This implementation will override the new method, handleSerializationException, in the following two classes:
...
- AlwaysProductionExceptionHandler class: returns CONTINUE as the response.
We'll add the following error-handling logic to send in RecordCollectorImpl. The new method, handleSerializationException, in ProductionExceptionHandler will be invoked in two cases:
- A ClassCastException is thrown while serializing the record key / value. Today, we throw a StreamsException on hitting this exception. Whether to throw the exception to the user will be decided based on the response received from the ProductionExceptionHandler handleSerializationException method.
  - If the result is CONTINUE, log a note at DEBUG that we received that result and are not failing Streams as a result.
  - If the result is FAIL, log a message at ERROR that we received that result and throw StreamsException so Streams will fail.
- Any other unchecked exception is thrown during record key / value serialization.
  - If the result is CONTINUE, log a note at DEBUG that we received that result and are not failing Streams as a result.
  - If the result is FAIL, log a message at ERROR that we received that result and set sendException so Streams will fail.
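The dispatch described above can be sketched roughly as follows. This is a heavily simplified illustration, not the actual RecordCollectorImpl code: `RecordCollectorSketch`, its `send` signature, the `Function`-based serializers, and the local stand-in types are all assumptions made so the example compiles on its own, and logging is reduced to comments.

```java
import java.util.function.Function;

// Minimal local stand-ins for the Kafka types named in the KIP
// (assumptions for illustration, not the real classes).
enum ProductionExceptionHandlerResponse { CONTINUE, FAIL }

final class StreamsException extends RuntimeException {
    StreamsException(String message, Throwable cause) { super(message, cause); }
}

final class ProducerRecord {
    final Object key;
    final Object value;
    ProducerRecord(Object key, Object value) { this.key = key; this.value = value; }
}

interface ProductionExceptionHandler {
    ProductionExceptionHandlerResponse handleSerializationException(
            ProducerRecord record, boolean isKey, Exception exception);
}

// Hypothetical, simplified stand-in for the send path in RecordCollectorImpl.
final class RecordCollectorSketch {
    private final ProductionExceptionHandler handler;
    RuntimeException sendException; // surfaced later so Streams fails
    int sent;

    RecordCollectorSketch(ProductionExceptionHandler handler) { this.handler = handler; }

    // Returns true if both key and value were serialized, false if the record was dropped.
    boolean send(Object key, Object value,
                 Function<Object, byte[]> keySerializer,
                 Function<Object, byte[]> valueSerializer) {
        boolean isKey = true; // tracks which half is being serialized
        try {
            keySerializer.apply(key);
            isKey = false;
            valueSerializer.apply(value);
            sent++;
            return true;
        } catch (ClassCastException e) {
            if (askHandler(key, value, isKey, e) == ProductionExceptionHandlerResponse.FAIL) {
                // Real code would log at ERROR here.
                throw new StreamsException("serializer does not match the record type", e);
            }
            return false; // CONTINUE: real code would log at DEBUG and move on
        } catch (RuntimeException e) {
            if (askHandler(key, value, isKey, e) == ProductionExceptionHandlerResponse.FAIL) {
                // Real code would log at ERROR here.
                sendException = new StreamsException("serialization failed", e);
            }
            return false; // CONTINUE: real code would log at DEBUG and move on
        }
    }

    private ProductionExceptionHandlerResponse askHandler(
            Object key, Object value, boolean isKey, Exception e) {
        return handler.handleSerializationException(new ProducerRecord(key, value), isKey, e);
    }
}
```

The two catch blocks differ only in how FAIL is surfaced: a ClassCastException is rethrown immediately as a StreamsException, while any other unchecked exception is stored in sendException.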
...
The default behavior will be consistent with the existing behavior. The new method, handleSerializationException, will have an implementation that returns FAIL by default.
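One way to read "an implementation that returns FAIL by default" is a Java default method on the interface, so that handlers written before this KIP keep compiling and keep failing on serialization errors. The sketch below assumes that reading. The types are local stand-ins rather than the real Kafka classes, and `LegacyHandler` is a hypothetical pre-existing user handler.

```java
// Minimal local stand-ins for the Kafka types named in the KIP
// (assumptions for illustration, not the real classes).
enum ProductionExceptionHandlerResponse { CONTINUE, FAIL }

final class ProducerRecord<K, V> {
    final String topic;
    final K key;
    final V value;

    ProducerRecord(String topic, K key, V value) {
        this.topic = topic;
        this.key = key;
        this.value = value;
    }
}

interface ProductionExceptionHandler {
    // Existing method: the record is already serialized, hence byte[] key and value.
    ProductionExceptionHandlerResponse handle(
            ProducerRecord<byte[], byte[]> record, Exception exception);

    // New method from this KIP. The record is untyped here because serialization
    // failed, so key and value are not byte[] yet. Assumed to be a default method.
    @SuppressWarnings("rawtypes")
    default ProductionExceptionHandlerResponse handleSerializationException(
            ProducerRecord record, boolean isKey, Exception exception) {
        return ProductionExceptionHandlerResponse.FAIL;
    }
}

// Hypothetical handler written before this KIP: it only knows about handle().
final class LegacyHandler implements ProductionExceptionHandler {
    @Override
    public ProductionExceptionHandlerResponse handle(
            ProducerRecord<byte[], byte[]> record, Exception exception) {
        return ProductionExceptionHandlerResponse.CONTINUE;
    }
}
```

With this shape, `LegacyHandler` inherits FAIL for serialization errors without any code change, which preserves the behavior described above.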
...
We considered reusing the existing handle(ProducerRecord<byte[], byte[]> record, Exception exception) method in ProductionExceptionHandler, but it has the following limitations:
- The key and value types of the ProducerRecord parameter are set to byte[], but when a serialization exception is hit the record key and value types may not be byte[].
- It cannot explicitly tell the user whether the serialization exception occurred while serializing the record key or the value.