...
This implementation will modify the KafkaStreams constructor to create a ProductionExceptionHandler from the aforementioned config value, falling back to a default implementation that always re-throws the error (the DefaultProductionExceptionHandler mentioned above). We'll pipe this handler through the StreamThread/StreamTask into RecordCollectorImpl.
We'll change the onCompletion handler in RecordCollectorImpl so that it executes the handle method of its ProductionExceptionHandler and only records the exception in sendException if handle returns FAIL. We'll ensure that RecordCollectorImpl always logs when it processes a response from a ProductionExceptionHandler, logging the current error message at ERROR level when instructed to FAIL and a comparable message at DEBUG level when instructed to CONTINUE. We'll add the following error handling logic to the onCompletion handler and to the try statement that wraps it:
- If the Exception that is thrown is a ProducerFencedException, behave as we do today and do not invoke the ProductionExceptionHandler, as these exceptions are self-healing.
- If the Exception that is thrown will affect all records and should cause Streams to always fail, then do not invoke the ProductionExceptionHandler, because its result would have to be ignored. We should log that we're ignoring these exceptions at DEBUG level.
  - The exceptions that meet this classification are: TODO
- If the Exception that is thrown meets neither of the above conditions, determine whether sendException is already set. If so, do not invoke the ProductionExceptionHandler, because this means that we've already invoked it and decided to FAIL. Invoking it again would just produce a result that is ignored.
- If none of the conditions above is met, invoke the handle method in the ProductionExceptionHandler and check the result.
  - If the result is CONTINUE, log a note at DEBUG that we received that result and are not failing Streams as a result. This ensures that it's not possible for a client developer to ship code that totally swallows errors without presenting any kind of activity in the log.
  - If the result is FAIL, log a message at ERROR that we received that result and set sendException so Streams will fail.
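The decision order described above can be sketched roughly as follows. This is an illustration only, not the proposed patch: every type here (SketchResponse, SketchProductionExceptionHandler, SketchFencedException, SketchFatalException, SketchRecordCollector) is a hypothetical, simplified stand-in for the classes named in the text, the log is captured in a list rather than written through a real logger, and the handling of "always fatal" exceptions (recording them directly in sendException) is an assumption, since the list of such exceptions is still TODO.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for the types named in the text; NOT the real Kafka Streams classes.
enum SketchResponse { CONTINUE, FAIL }

interface SketchProductionExceptionHandler {
    SketchResponse handle(Exception exception);
}

// Stand-in for ProducerFencedException.
class SketchFencedException extends RuntimeException {
    SketchFencedException(String message) { super(message); }
}

// Stand-in for the "always fatal" exceptions, whose real list is still TODO.
class SketchFatalException extends RuntimeException {
    SketchFatalException(String message) { super(message); }
}

class SketchRecordCollector {
    private final SketchProductionExceptionHandler handler;
    final List<String> log = new ArrayList<>(); // captured log lines, for illustration
    Exception sendException;                    // non-null once Streams should fail

    SketchRecordCollector(SketchProductionExceptionHandler handler) {
        this.handler = handler;
    }

    // Mirrors the decision order of the list above for a single failed send.
    void onSendError(Exception exception) {
        if (exception instanceof SketchFencedException) {
            // Placeholder for "behave as we do today"; the handler is not consulted.
            log.add("fenced: existing handling, handler skipped");
            return;
        }
        if (exception instanceof SketchFatalException) {
            // Assumption: a fatal error is recorded directly so that Streams fails.
            log.add("DEBUG handler skipped for fatal error: " + exception.getMessage());
            if (sendException == null) {
                sendException = exception;
            }
            return;
        }
        if (sendException != null) {
            // A previous call already decided to FAIL; a new result would be ignored.
            return;
        }
        if (handler.handle(exception) == SketchResponse.CONTINUE) {
            log.add("DEBUG handler returned CONTINUE: " + exception.getMessage());
        } else {
            log.add("ERROR handler returned FAIL: " + exception.getMessage());
            sendException = exception;
        }
    }
}
```

With a handler that always returns FAIL, the first ordinary error sets sendException and later errors never reach the handler; with one that always returns CONTINUE, sendException stays unset but each error still leaves a DEBUG line in the log.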
These changes will facilitate a number of error handling scenarios. For example, one could choose to write a handler implementation that always fails, but does some additional logging in the process:
...