...

Introduce two new exception classes: `RecordDeserializationException`, which extends `SerializationException`, and `InoperativeRecordException`, which extends `KafkaException`. Each class holds two additional fields, `TopicPartition partition` and `long offset`, so that a user who catches the exception can access metadata about the record that caused it.


Code Block
package org.apache.kafka.common.errors;

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;

/**
 *  Any exception caused by invalid record handling in the consumer.
 *  Carries the partition and offset of the record that caused it.
 */
public class InoperativeRecordException extends KafkaException {

    private static final long serialVersionUID = 1L;
    private final TopicPartition partition;
    private final long offset;

    public InoperativeRecordException(TopicPartition partition, long offset, String message, Throwable cause) {
        super(message, cause);
        this.partition = partition;
        this.offset = offset;
    }

    public TopicPartition getPartition() {
        return partition;
    }

    public long getOffset() {
        return offset;
    }
}


Code Block
package org.apache.kafka.common.errors;

import org.apache.kafka.common.TopicPartition;

/**
 *  Any exception during deserialization in the consumer.
 *  Carries the partition and offset of the record that failed to deserialize.
 */
public class RecordDeserializationException extends SerializationException {

    private static final long serialVersionUID = 1L;
    private final TopicPartition partition;
    private final long offset;

    public RecordDeserializationException(TopicPartition partition, long offset, String message, Throwable cause) {
        super(message, cause);
        this.partition = partition;
        this.offset = offset;
    }

    public TopicPartition getPartition() {
        return partition;
    }

    public long getOffset() {
        return offset;
    }

    /* avoid the expensive and useless stack trace for serialization exceptions */
    @Override
    public Throwable fillInStackTrace() {
        return this;
    }
}
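
The `fillInStackTrace()` override above is a general Java technique rather than anything Kafka-specific. The following minimal sketch illustrates its effect; `NoStackTraceSketch` and `LightweightException` are hypothetical names used only for this illustration and are not part of the proposal.

```java
public class NoStackTraceSketch {

    // Hypothetical exception that skips stack trace capture,
    // using the same override as RecordDeserializationException.
    static class LightweightException extends RuntimeException {
        private static final long serialVersionUID = 1L;

        LightweightException(String message) {
            super(message);
        }

        @Override
        public Throwable fillInStackTrace() {
            // Returning without calling super leaves the stack trace empty.
            return this;
        }
    }

    public static void main(String[] args) {
        int lightFrames = new LightweightException("bad record").getStackTrace().length;
        int normalFrames = new RuntimeException("bad record").getStackTrace().length;
        System.out.println("lightweight frames: " + lightFrames);
        System.out.println("regular frames > 0: " + (normalFrames > 0));
    }
}
```

The trade-off is that such an exception carries no call-site information, so the message and the partition and offset fields have to be sufficient for diagnosis.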


These exceptions will be raised in the `Fetcher` class in place of `SerializationException` (for deserialization failures) and `KafkaException` (for invalid record handling). They propagate to the Consumer, allowing the user to catch and handle them.
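
As an illustration of how a user might handle such an exception, here is a minimal sketch that skips past an undeserializable record by seeking to the next offset. To stay self-contained it does not depend on the Kafka client library: `TopicPartition`, `RecordDeserializationException` and `FakeConsumer` below are simplified, hypothetical stand-ins, and the skip-and-continue policy is just one assumed way of using the exposed metadata.

```java
import java.util.ArrayList;
import java.util.List;

public class SkipBadRecordSketch {

    // Simplified stand-in for org.apache.kafka.common.TopicPartition.
    static final class TopicPartition {
        final String topic;
        final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return topic + "-" + partition;
        }
    }

    // Stand-in for the proposed exception, reduced to the accessors used here.
    static class RecordDeserializationException extends RuntimeException {
        private static final long serialVersionUID = 1L;
        private final TopicPartition partition;
        private final long offset;

        RecordDeserializationException(TopicPartition partition, long offset, String message, Throwable cause) {
            super(message, cause);
            this.partition = partition;
            this.offset = offset;
        }

        TopicPartition getPartition() { return partition; }
        long getOffset() { return offset; }
    }

    // Hypothetical single-partition consumer: a null entry models a record
    // that cannot be deserialized.
    static final class FakeConsumer {
        private final TopicPartition tp;
        private final String[] log;
        private long position = 0;

        FakeConsumer(TopicPartition tp, String[] log) {
            this.tp = tp;
            this.log = log;
        }

        // Returns the next value, or null once the end of the log is reached.
        String poll() {
            if (position >= log.length) return null;
            String raw = log[(int) position];
            if (raw == null) {
                throw new RecordDeserializationException(tp, position, "cannot deserialize record", null);
            }
            position++;
            return raw;
        }

        void seek(TopicPartition partition, long offset) {
            position = offset;
        }
    }

    // Consumes all readable values, recording and skipping each bad record.
    static List<String> consumeSkippingBadRecords(FakeConsumer consumer, List<String> skipped) {
        List<String> values = new ArrayList<>();
        while (true) {
            try {
                String value = consumer.poll();
                if (value == null) return values;
                values.add(value);
            } catch (RecordDeserializationException e) {
                skipped.add(e.getPartition() + "@" + e.getOffset());
                // Move just past the offending record and keep consuming.
                consumer.seek(e.getPartition(), e.getOffset() + 1);
            }
        }
    }

    public static void main(String[] args) {
        FakeConsumer consumer = new FakeConsumer(new TopicPartition("events", 0), new String[] {"a", null, "b"});
        List<String> skipped = new ArrayList<>();
        System.out.println(consumeSkippingBadRecords(consumer, skipped));
        System.out.println(skipped);
    }
}
```

Without the partition and offset on the exception, the catch block would have no reliable way to know which position to seek past.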

...