Status
Current state: Under Discussion
Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]
JIRA: KAFKA-5682
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Whenever consumers encounter an exception caused by record deserialization or record invalidity (e.g corrupt record), a `SerializationException` or `KafkaException` is raised to the user, with a string message similar to this: "Error deserializing key/value for partition test-0 at offset 10. If needed, please seek past the record to continue consumption."
The problem is that while the topic, partition and offset are reported in the message, they are not directly available in the exception - leaving the user unable to seek past the record, since he does not know which partition is the cause. (unless he parses the string message)
Public Interfaces
Two new exception classes - `RecordDeserializationException` and `InoperativeRecordException` under `org/apache/kafka/common/errors` which get raised instead of `RecordSerializationException` and `InvalidRecordException` respectively.
Proposed Changes
Introduce two new exception classes - `RecordDeserializationException` which extends `SerializationException` and `InoperativeRecordException` which extends `KafkaException`. These new exceptions will hold two additional parameters - `TopicPartition partition` and `long offset`, allowing the user to catch them and access the metadata about the record which caused the exception.
package org.apache.kafka.common.errors; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; /** * Any exception when encountering an corrupted/invalid record in the consumer */ public class InoperativeRecordException extends KafkaException { private static final long serialVersionUID = 1L; private TopicPartition partition; private long offset; public InoperativeRecordException(TopicPartition partition, long offset, String message) { super(message); this.partition = partition; this.offset = offset; } public TopicPartition partition() { return partition; } public long offset() { return offset; } }
package org.apache.kafka.common.errors; import org.apache.kafka.common.TopicPartition; /** * Any exception during deserialization in the consumer */ public class RecordDeserializationException extends SerializationException { private static final long serialVersionUID = 1L; private TopicPartition partition; private long offset; public RecordDeserializationException(TopicPartition partition, long offset, String message) { super(message); this.partition = partition; this.offset = offset; } public TopicPartition partition() { return partition; } public long offset() { return offset; } /* avoid the expensive and useless stack trace for serialization exceptions */ @Override public Throwable fillInStackTrace() { return this; } }
These exceptions will be raised instead of `SerializationException` and `KafkaException` for invalid deserialization and invalid record handling logic, in the `Fetcher.java` class, essentially being propagated to the Consumer and allowing the user to handle them.
Compatibility, Deprecation, and Migration Plan
This is a fully backwards-compatible change. Since the new exceptions extend the original exception classes, existing logic which handles the original exceptions will continue handling the new ones.
Rejected Alternatives
- Add `TopicPartition partition` and `long offset` attributes to the existing `SerializationException` and `KafkaException`
- They will still be backwards-compatible, but might easily result in `NullPointerException`, since not every use case (especially for `KafkaException`) has the appropriate offset.
- Users will need to check for null before using the new attributes
- Create new `DeserializationException` which does not extend `SerializationException`.
- Will be backwards-incompatible