Status
Current state: Accepted
Discussion thread: here
Vote thread: here
JIRA: KAFKA-5682
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
```java
package org.apache.kafka.common.errors;

import org.apache.kafka.common.TopicPartition;

/**
 * Any exception encountered on an invalid record (deserialization error, corrupt record, etc.)
 */
public class RecordDeserializationException extends SerializationException {

    private static final long serialVersionUID = 1L;
    private TopicPartition partition;
    private long offset;

    public RecordDeserializationException(TopicPartition partition, long offset, String message) {
        this(partition, offset, message, null);
    }

    public RecordDeserializationException(TopicPartition partition, long offset, String message, Throwable cause) {
        super(message, cause);
        this.partition = partition;
        this.offset = offset;
    }

    public TopicPartition partition() {
        return partition;
    }

    public long offset() {
        return offset;
    }

    /* avoid the expensive and useless stack trace for deserialization exceptions */
    @Override
    public Throwable fillInStackTrace() {
        return this;
    }
}
```
...
Introduce one new public exception, `RecordDeserializationException`, which extends `SerializationException` (and therefore indirectly extends `KafkaException`). The new exception holds two additional fields, `TopicPartition partition` and `long offset`, allowing the user to catch it and access the metadata of the record which caused the exception.
This exception will be raised instead of `SerializationException` and `KafkaException` in the invalid-deserialization and invalid-record handling logic of the `Fetcher.java` class. It is propagated to the Consumer API, where the user can handle it and, via the partition/offset accessors, seek past the faulty record.
Example
Users are expected to seek one offset past the faulty one.
```java
while (true) {
    try {
        consumer.poll(Duration.ofMillis(50));
    } catch (RecordDeserializationException e) {
        consumer.seek(e.partition(), e.offset() + 1);
    }
}
```
Compatibility, Deprecation, and Migration Plan
...
- Add `TopicPartition partition` and `long offset` attributes to the existing `SerializationException` and `KafkaException`
- They would still be backwards-compatible, but could easily result in a `NullPointerException`, since not every use case (especially for `KafkaException`) has an appropriate offset.
- Users would need to check for null before using the new attributes, which is very error-prone.
- Create two new exception classes `RecordDeserializationException` and `InoperativeRecordException`
- Kafka's exception hierarchy is currently not ideal, and it is best not to expose too many public exceptions, as this limits what we can change in the internals.
- The current proposal is easier to implement from a user's perspective
- Create new `DeserializationException` which does not extend `SerializationException`.
- This would be backwards-incompatible.
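To make the null-checking concern from the first rejected alternative concrete, here is a minimal, hedged sketch. The `TopicPartition` and `SerializationException` classes below are simplified stand-ins for illustration, not the real Kafka implementations; the nullable `partition`/`offset` fields mirror what bolting the metadata onto the existing exception would look like.

```java
// Simplified stand-in for org.apache.kafka.common.TopicPartition (assumption, not the real class)
class TopicPartition {
    final String topic;
    final int partition;
    TopicPartition(String topic, int partition) { this.topic = topic; this.partition = partition; }
    @Override public String toString() { return topic + "-" + partition; }
}

// Stand-in for the rejected design: record metadata added to the existing exception,
// but populated only on code paths that actually have a record in hand.
class SerializationException extends RuntimeException {
    private final TopicPartition partition; // null when the throw site has no record context
    private final Long offset;              // likewise nullable

    SerializationException(String message) { this(message, null, null); }
    SerializationException(String message, TopicPartition partition, Long offset) {
        super(message);
        this.partition = partition;
        this.offset = offset;
    }
    TopicPartition partition() { return partition; }
    Long offset() { return offset; }
}

public class NullableMetadataDemo {
    public static void main(String[] args) {
        // A consumer-side failure has record context...
        SerializationException withMeta =
            new SerializationException("bad record", new TopicPartition("payments", 0), 41L);
        // ...but a serializer-side failure carries none.
        SerializationException withoutMeta = new SerializationException("failed to serialize key");

        for (SerializationException e : new SerializationException[] { withMeta, withoutMeta }) {
            if (e.partition() != null) { // mandatory null check under the rejected design
                System.out.println("seek past " + e.partition() + "@" + (e.offset() + 1));
            } else {
                System.out.println("no record metadata available");
            }
        }
        // prints:
        // seek past payments-0@42
        // no record metadata available
    }
}
```

Because only some throw sites would have record context, every catch block would have to branch on null before seeking, which is exactly the error-prone pattern this alternative was rejected for.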
...
Future Work
It would be useful to seek past such failed messages automatically. It is worth investigating a configurable default callback or a config option that lets users do this without having to implement the error-handling code manually.
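That idea can be sketched as a thin wrapper that catches the deserialization failure and seeks one past the bad record. Everything here is a hypothetical illustration: `TopicPartition` and `RecordDeserializationException` are simplified stand-ins for the Kafka classes, and `pollSkippingBadRecords` is an assumed helper name, not a proposed API.

```java
import java.util.function.BiConsumer;

// Simplified stand-in for org.apache.kafka.common.TopicPartition (assumption, not the real class)
class TopicPartition {
    final String topic;
    final int partition;
    TopicPartition(String topic, int partition) { this.topic = topic; this.partition = partition; }
}

// Simplified stand-in for the proposed RecordDeserializationException
class RecordDeserializationException extends RuntimeException {
    private final TopicPartition partition;
    private final long offset;
    RecordDeserializationException(TopicPartition partition, long offset, String message) {
        super(message);
        this.partition = partition;
        this.offset = offset;
    }
    TopicPartition partition() { return partition; }
    long offset() { return offset; }
}

public class SkipOnErrorDemo {
    // Hypothetical helper: runs one poll attempt and, on a deserialization
    // failure, seeks one past the faulty record instead of propagating.
    static void pollSkippingBadRecords(Runnable pollOnce,
                                       BiConsumer<TopicPartition, Long> seek) {
        try {
            pollOnce.run();
        } catch (RecordDeserializationException e) {
            seek.accept(e.partition(), e.offset() + 1); // skip the faulty record
        }
    }

    public static void main(String[] args) {
        TopicPartition tp = new TopicPartition("payments", 0);
        long[] seekedTo = {-1}; // records where the wrapper seeked to
        pollSkippingBadRecords(
            () -> { throw new RecordDeserializationException(tp, 41, "corrupt record"); },
            (p, off) -> seekedTo[0] = off);
        System.out.println(seekedTo[0]); // prints 42 (one past the bad offset)
    }
}
```

A real version of this would more likely live inside the consumer (or be enabled via a config option) rather than in user code, and would also need to log or report each skipped record rather than drop it silently.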
...