...

Code Block
language: java
package org.apache.kafka.common.errors;

import org.apache.kafka.common.TopicPartition;

/**
 * Any exception encountered on an invalid record (deserialization error, corrupt record, etc.).
 */
public class RecordDeserializationException extends SerializationException {

    private static final long serialVersionUID = 1L;
    private final TopicPartition partition;
    private final long offset;

    public RecordDeserializationException(TopicPartition partition, long offset, String message) {
        this(partition, offset, message, null);
    }

    public RecordDeserializationException(TopicPartition partition, long offset, String message, Throwable cause) {
        super(message, cause);
        this.partition = partition;
        this.offset = offset;
    }

    public TopicPartition partition() {
        return partition;
    }

    public long offset() {
        return offset;
    }

    /* avoid the expensive and useless stack trace for deserialization exceptions */
    @Override
    public Throwable fillInStackTrace() {
        return this;
    }
}

...

Introduce one new public exception, `RecordDeserializationException`, which extends `SerializationException` (and indirectly extends `KafkaException`). This new exception will hold two additional parameters, `TopicPartition partition` and `long offset`, allowing the user to catch it and access the metadata about the record which caused the exception.

This exception will be raised instead of `SerializationException` and `KafkaException` by the invalid-deserialization and invalid-record handling logic in the `Fetcher.java` class. It is then propagated to the Consumer API, where the user can catch it and, by accessing the partition/offset, seek past the faulty record.

Example

Users are expected to seek one offset past the faulty one.

Code Block
while (true) {
    try {
        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(50));
        // process records...
    } catch (RecordDeserializationException e) {
        // skip the record that could not be deserialized
        consumer.seek(e.partition(), e.offset() + 1);
    }
}


Compatibility, Deprecation, and Migration Plan

...

  • Add `TopicPartition partition` and `long offset` attributes to the existing `SerializationException` and `KafkaException`
    • They would remain backwards-compatible, but could easily result in a `NullPointerException`, since not every use case (especially for `KafkaException`) has an appropriate partition/offset.
    • Users would need to check for null before using the new attributes, which is very error-prone.
  • Create two new exception classes, `RecordDeserializationException` and `InoperativeRecordException`
    • Kafka's exception hierarchy is currently not ideal, and it is best not to expose too many public exceptions, as this limits what we can change in the internals.
    • The current proposal is easier to work with from a user's perspective.
  • Create a new `DeserializationException` which does not extend `SerializationException`.
    • This would be backwards-incompatible.
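To illustrate why the first rejected alternative (nullable attributes on the existing exception) is error-prone, here is a self-contained sketch. `WidenedSerializationException` and the simplified `TopicPartition` below are stand-ins written for this illustration, not the real Kafka types:

```java
// Simplified stand-in for org.apache.kafka.common.TopicPartition.
class TopicPartition {
    private final String topic;
    private final int partition;
    TopicPartition(String topic, int partition) { this.topic = topic; this.partition = partition; }
    @Override public String toString() { return topic + "-" + partition; }
}

// Hypothetical "widened" SerializationException: partition/offset are optional,
// so they are null whenever the throwing code path has no record context.
class WidenedSerializationException extends RuntimeException {
    private final TopicPartition partition; // may be null
    private final Long offset;              // may be null
    WidenedSerializationException(String message, TopicPartition partition, Long offset) {
        super(message);
        this.partition = partition;
        this.offset = offset;
    }
    TopicPartition partition() { return partition; }
    Long offset() { return offset; }
}

public class NullableAttributesPitfall {
    public static void main(String[] args) {
        // Raised from a code path (e.g. key serialization) that has no record metadata:
        WidenedSerializationException e =
                new WidenedSerializationException("key serializer failed", null, null);
        // A naive handler calling e.offset() + 1 would throw NullPointerException here,
        // so every catch block must null-guard first:
        if (e.partition() != null && e.offset() != null) {
            System.out.println("seek to " + e.partition() + "@" + (e.offset() + 1));
        } else {
            System.out.println("no record metadata available"); // this branch runs
        }
    }
}
```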

...


Future Work

It would be useful to seek past such failed messages automatically. It is worth investigating a configurable default callback or config option that lets users do this without manually implementing error-handling code.
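As a sketch of what such automatic skipping could look like: a thin wrapper around `poll` could retry after seeking past each failing record. Everything below is hypothetical and simplified for illustration (`SkippingConsumer`, `AutoSkipPoller`, and the string-keyed stand-in exception are not part of any Kafka API):

```java
import java.util.List;

// Simplified stand-in for the exception proposed in this KIP.
class RecordDeserializationException extends RuntimeException {
    private final String topicPartition; // simplified to a "topic-partition" string
    private final long offset;
    RecordDeserializationException(String tp, long offset, String message) {
        super(message);
        this.topicPartition = tp;
        this.offset = offset;
    }
    String partition() { return topicPartition; }
    long offset() { return offset; }
}

// Minimal consumer surface needed for the sketch.
interface SkippingConsumer {
    List<String> poll() throws RecordDeserializationException;
    void seek(String topicPartition, long offset);
}

public class AutoSkipPoller {
    // Poll, transparently seeking one past any record that fails to deserialize.
    static List<String> pollSkippingBadRecords(SkippingConsumer consumer) {
        while (true) {
            try {
                return consumer.poll();
            } catch (RecordDeserializationException e) {
                consumer.seek(e.partition(), e.offset() + 1); // skip the faulty record
            }
        }
    }

    public static void main(String[] args) {
        // Fake consumer: fails once on the record at offset 5, then succeeds.
        SkippingConsumer fake = new SkippingConsumer() {
            private boolean failed = false;
            public List<String> poll() {
                if (!failed) {
                    failed = true;
                    throw new RecordDeserializationException("t-0", 5, "corrupt record");
                }
                return List.of("record@6");
            }
            public void seek(String tp, long offset) {
                System.out.println("seek " + tp + " to " + offset); // prints "seek t-0 to 6"
            }
        };
        System.out.println(pollSkippingBadRecords(fake)); // prints "[record@6]"
    }
}
```

A real implementation would also need to decide how the skipped record is reported (e.g. through the callback mentioned above), since silently dropping data is rarely acceptable.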

...