Status
Current state: Accepted
Discussion thread: here
Vote thread: here
JIRA: KAFKA-5682
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
Public Interfaces
The new public exception (the standard ASF license header is omitted here):

```java
package org.apache.kafka.common.errors;

import org.apache.kafka.common.TopicPartition;

/**
 * Any exception encountered on an invalid record (deserialization error, corrupt record, etc...)
 */
public class RecordDeserializationException extends SerializationException {

    private static final long serialVersionUID = 1L;

    private TopicPartition partition;
    private long offset;

    public RecordDeserializationException(TopicPartition partition, long offset, String message) {
        this(partition, offset, message, null);
    }

    public RecordDeserializationException(TopicPartition partition, long offset, String message, Throwable cause) {
        super(message, cause);
        this.partition = partition;
        this.offset = offset;
    }

    public TopicPartition partition() {
        return partition;
    }

    public long offset() {
        return offset;
    }

    /* avoid the expensive and useless stack trace for deserialization exceptions */
    @Override
    public Throwable fillInStackTrace() {
        return this;
    }
}
```
Proposed Changes
Introduce one new public exception - `RecordDeserializationException` - which extends `SerializationException` (and indirectly extends `KafkaException`). This new exception will hold two additional parameters - `TopicPartition partition` and `long offset` - allowing the user to catch it and access the metadata about the record which caused the exception.
This exception will be raised instead of `SerializationException` for invalid record deserialization in the `Fetcher.java` class, essentially being propagated to the Consumer API and allowing the user to handle it. By accessing the partition/offset of the faulty record, the user can seek past it.
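The wrapping behaviour described above can be sketched in miniature. In the block below, `TopicPartition`, `SerializationException` and `RecordDeserializationException` are simplified stand-ins for the real Kafka classes, and `parseRecord`/`deserialize` are hypothetical stand-ins for the Fetcher's internal parsing step - this is a sketch of the mechanism, not the actual implementation:

```java
// Simplified stand-ins for the Kafka classes, for illustration only.
public class FetcherSketch {

    static class TopicPartition {
        final String topic; final int partition;
        TopicPartition(String topic, int partition) { this.topic = topic; this.partition = partition; }
    }

    static class SerializationException extends RuntimeException {
        SerializationException(String message, Throwable cause) { super(message, cause); }
    }

    static class RecordDeserializationException extends SerializationException {
        private final TopicPartition partition;
        private final long offset;
        RecordDeserializationException(TopicPartition partition, long offset, String message, Throwable cause) {
            super(message, cause);
            this.partition = partition;
            this.offset = offset;
        }
        TopicPartition partition() { return partition; }
        long offset() { return offset; }
    }

    // Hypothetical deserializer: fails on a "corrupt" (null) payload.
    static String deserialize(byte[] payload) {
        if (payload == null) throw new RuntimeException("corrupt payload");
        return new String(payload);
    }

    // Hypothetical stand-in for the Fetcher's parsing step: wraps the raw
    // error, attaching the record's coordinates so the caller can seek past it.
    static String parseRecord(TopicPartition tp, long offset, byte[] payload) {
        try {
            return deserialize(payload);
        } catch (RuntimeException e) {
            throw new RecordDeserializationException(tp, offset,
                "Error deserializing record at offset " + offset, e);
        }
    }

    public static void main(String[] args) {
        TopicPartition tp = new TopicPartition("test", 0);
        try {
            parseRecord(tp, 42L, null);
        } catch (RecordDeserializationException e) {
            System.out.println(e.partition().topic + "-" + e.partition().partition + "@" + e.offset());
        }
    }
}
```

The key design point is that the raw deserializer error is wrapped, not swallowed: the original cause stays attached while the partition and offset become queryable.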
Example
Users are expected to seek one offset past the faulty one.
```java
while (true) {
    try {
        consumer.poll(Duration.ofMillis(50));
    } catch (RecordDeserializationException e) {
        consumer.seek(e.partition(), e.offset() + 1);
    }
}
```
Compatibility, Deprecation, and Migration Plan
...
- Add `TopicPartition partition` and `long offset` attributes to the existing `SerializationException` and `KafkaException`
- They will still be backwards-compatible, but might easily result in `NullPointerException`, since not every use case (especially for `KafkaException`) has the appropriate offset.
- Users will need to check for null before using the new attributes, which is very error-prone
- Create two new public exception classes - `RecordDeserializationException` and `InoperativeRecordException`
- Kafka's exception hierarchy is currently not ideal, and it's best not to expose too many public exceptions, as this limits what we can change in the internals.
- The current proposal is easier to implement from a user's perspective
- Create a new `DeserializationException` which does not extend `SerializationException`.
- Will be backwards-incompatible
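The hazard behind the first rejected alternative can be illustrated with a small sketch. `SerializationException` below is a simplified stand-in carrying the (rejected) nullable metadata fields, and `describe` is a hypothetical handler that forgets the null check:

```java
public class NullableMetadataHazard {

    static class TopicPartition {
        final String topic; final int partition;
        TopicPartition(String topic, int partition) { this.topic = topic; this.partition = partition; }
        @Override public String toString() { return topic + "-" + partition; }
    }

    // Stand-in for the rejected design: metadata bolted onto the existing
    // exception, null for every pre-existing call site.
    static class SerializationException extends RuntimeException {
        private final TopicPartition partition;
        SerializationException(String message) { this(message, null); }
        SerializationException(String message, TopicPartition partition) {
            super(message);
            this.partition = partition;
        }
        TopicPartition partition() { return partition; }
    }

    // A handler that skips the null check: fine for the new call sites,
    // NullPointerException for every legacy one.
    static String describe(SerializationException e) {
        return "failed record in " + e.partition().toString();
    }

    public static void main(String[] args) {
        try {
            describe(new SerializationException("thrown by legacy code"));
        } catch (NullPointerException npe) {
            System.out.println("handler blew up on a legacy exception");
        }
    }
}
```

A dedicated exception type avoids this entirely: if you caught a `RecordDeserializationException`, the partition and offset are guaranteed to be present.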
...
Future Work
It would be useful to seek past such failed messages automatically. It is worth investigating the idea of adding a configurable default callback or config option that lets users do this without having to implement the error-handling code manually.
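One possible shape for this idea, sketched as a user-side helper with simplified stand-ins for `KafkaConsumer`, `TopicPartition` and `RecordDeserializationException` (a real solution would presumably live inside the consumer as a config option or callback):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class AutoSkipPoller {

    static class TopicPartition {
        final String topic; final int partition;
        TopicPartition(String topic, int partition) { this.topic = topic; this.partition = partition; }
    }

    static class RecordDeserializationException extends RuntimeException {
        private final TopicPartition partition;
        private final long offset;
        RecordDeserializationException(TopicPartition partition, long offset, String message) {
            super(message);
            this.partition = partition;
            this.offset = offset;
        }
        TopicPartition partition() { return partition; }
        long offset() { return offset; }
    }

    // Minimal stand-in for the consumer: just the two calls the helper needs.
    interface Consumer {
        List<String> poll();
        void seek(TopicPartition tp, long offset);
    }

    // Polls until a batch succeeds, seeking one past every record that fails
    // deserialization; gives up (empty batch) after maxAttempts polls.
    static List<String> pollSkippingBadRecords(Consumer consumer, int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return consumer.poll();
            } catch (RecordDeserializationException e) {
                consumer.seek(e.partition(), e.offset() + 1);
            }
        }
        return Collections.emptyList();
    }

    public static void main(String[] args) {
        TopicPartition tp = new TopicPartition("test", 0);
        List<Long> seeks = new ArrayList<>();
        // Scripted consumer: the first poll hits a corrupt record at offset 5,
        // the retry (after seeking to 6) succeeds.
        Consumer scripted = new Consumer() {
            private boolean failed = false;
            public List<String> poll() {
                if (!failed) {
                    failed = true;
                    throw new RecordDeserializationException(tp, 5L, "corrupt record");
                }
                return Arrays.asList("a", "b");
            }
            public void seek(TopicPartition p, long offset) { seeks.add(offset); }
        };
        List<String> batch = pollSkippingBadRecords(scripted, 10);
        System.out.println(batch + " after seeking to " + seeks);
    }
}
```

A built-in variant would also need a policy decision this sketch ignores: whether skipped records should be logged, counted in a metric, or handed to a dead-letter callback.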
...