
Status

Current state: Under Discussion

Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]

JIRA: KAFKA-5682

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Whenever the consumer encounters an exception caused by record deserialization or record invalidity (e.g. a corrupt record), a `SerializationException` or `KafkaException` is raised to the user, with a message similar to this: "Error deserializing key/value for partition test-0 at offset 10. If needed, please seek past the record to continue consumption."

The problem is that although the topic, partition and offset appear in the message, they are not exposed as fields on the exception. Short of parsing the message string, the user cannot tell which partition caused the failure, and therefore cannot seek past the offending record.

Public Interfaces

Two new exception classes in the `org.apache.kafka.common.errors` package, `RecordDeserializationException` and `InoperativeRecordException`, which are raised instead of `SerializationException` and `KafkaException` respectively.

Proposed Changes

Introduce two new exception classes: `RecordDeserializationException`, which extends `SerializationException`, and `InoperativeRecordException`, which extends `KafkaException`. Each new exception carries two additional fields, `TopicPartition partition` and `long offset`, so the user can catch it and read the metadata of the record that caused the failure.


package org.apache.kafka.common.errors;

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;

/**
 *  Any exception raised when the consumer encounters a corrupted/invalid record
 */
public class InoperativeRecordException extends KafkaException {

    private static final long serialVersionUID = 1L;
    private final TopicPartition partition;
    private final long offset;

    public InoperativeRecordException(TopicPartition partition, long offset, String message) {
        super(message);
        this.partition = partition;
        this.offset = offset;
    }

    public TopicPartition partition() {
        return partition;
    }

    public long offset() {
        return offset;
    }
}

package org.apache.kafka.common.errors;

import org.apache.kafka.common.TopicPartition;

/**
 *  Any exception during deserialization in the consumer
 */
public class RecordDeserializationException extends SerializationException {

    private static final long serialVersionUID = 1L;
    private final TopicPartition partition;
    private final long offset;

    public RecordDeserializationException(TopicPartition partition, long offset, String message) {
        super(message);
        this.partition = partition;
        this.offset = offset;
    }

    public TopicPartition partition() {
        return partition;
    }

    public long offset() {
        return offset;
    }

    /* avoid the expensive and useless stack trace for serialization exceptions */
    @Override
    public Throwable fillInStackTrace() {
        return this;
    }
}


These exceptions will be raised by the `Fetcher` class (`Fetcher.java`) in place of `SerializationException` and `KafkaException` when a record fails to deserialize or is found to be invalid. They propagate out of the consumer's `poll()` call, where the user can catch them and act on the partition and offset they carry.
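The following is a minimal sketch of how a user could skip an undeserializable record once the partition and offset are available on the exception. To keep it self-contained, `TopicPartition`, `RecordDeserializationException` and `FakeConsumer` are simplified, hypothetical stand-ins rather than the real kafka-clients classes. With the real client, the equivalent calls would be `KafkaConsumer.poll(...)` and `KafkaConsumer.seek(TopicPartition, long)`. As the existing error message implies, the sketch assumes that the consumer's position stays on the failing record until the user seeks past it.

```java
import java.util.ArrayList;
import java.util.List;

/*
 * Skip-the-bad-record sketch. ASSUMPTION: every nested class below is a simplified,
 * hypothetical stand-in for the real Kafka consumer types.
 */
class SeekPastBadRecordSketch {

    // Simplified stand-in for org.apache.kafka.common.TopicPartition.
    static final class TopicPartition {
        final String topic;
        final int partition;
        TopicPartition(String topic, int partition) { this.topic = topic; this.partition = partition; }
        @Override public String toString() { return topic + "-" + partition; }
    }

    // Stand-in for the proposed exception (extends RuntimeException here, not SerializationException).
    static class RecordDeserializationException extends RuntimeException {
        private final TopicPartition partition;
        private final long offset;
        RecordDeserializationException(TopicPartition partition, long offset, String message) {
            super(message);
            this.partition = partition;
            this.offset = offset;
        }
        TopicPartition partition() { return partition; }
        long offset() { return offset; }
    }

    // Hypothetical single-partition consumer whose raw values must parse as integers.
    static final class FakeConsumer {
        private final TopicPartition tp = new TopicPartition("test", 0);
        private final List<String> log;
        private long position = 0;

        FakeConsumer(List<String> log) { this.log = log; }

        // Returns the next value, or null at the end of the log.
        // The position is NOT advanced when deserialization fails.
        Integer poll() {
            if (position >= log.size()) return null;
            String raw = log.get((int) position);
            try {
                int value = Integer.parseInt(raw);
                position++;
                return value;
            } catch (NumberFormatException e) {
                throw new RecordDeserializationException(tp, position,
                    "Error deserializing value for partition " + tp + " at offset " + position);
            }
        }

        // Single partition only, so the partition argument is ignored.
        void seek(TopicPartition partition, long offset) { position = offset; }
    }

    // Consumes the whole log, skipping undeserializable records; returns the good values.
    static List<Integer> consumeSkippingBadRecords(FakeConsumer consumer, List<Long> skippedOffsets) {
        List<Integer> values = new ArrayList<>();
        while (true) {
            try {
                Integer v = consumer.poll();
                if (v == null) return values;
                values.add(v);
            } catch (RecordDeserializationException e) {
                // Partition and offset come straight from the exception: no message parsing.
                skippedOffsets.add(e.offset());
                consumer.seek(e.partition(), e.offset() + 1);
            }
        }
    }

    public static void main(String[] args) {
        List<Long> skipped = new ArrayList<>();
        List<Integer> values = consumeSkippingBadRecords(
            new FakeConsumer(List.of("1", "2", "oops", "4")), skipped);
        System.out.println(values + " skipped=" + skipped); // [1, 2, 4] skipped=[2]
    }
}
```

Seeking to `offset() + 1` skips exactly one bad record. A real application might log the record or send it to a dead-letter destination before moving on.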

Compatibility, Deprecation, and Migration Plan

This is a fully backwards-compatible change. Since the new exceptions extend the original exception classes, existing logic which handles the original exceptions will continue handling the new ones.
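The following minimal sketch illustrates the compatibility argument. A handler written before this change, which only catches `SerializationException` and `KafkaException`, still handles both new subclasses. The exception classes are stand-ins that mirror the real hierarchy (`KafkaException` extends `RuntimeException`, and `SerializationException` extends `KafkaException`), with `TopicPartition` simplified to a `String`. `legacyHandle` is a hypothetical helper.

```java
/*
 * Compatibility sketch. ASSUMPTION: these nested classes are minimal stand-ins mirroring the
 * real Kafka exception hierarchy; the partition is simplified to a String.
 */
class CompatibilitySketch {
    static class KafkaException extends RuntimeException {
        KafkaException(String message) { super(message); }
    }

    static class SerializationException extends KafkaException {
        SerializationException(String message) { super(message); }
    }

    // Proposed subclass: existing catch blocks for SerializationException still match it.
    static class RecordDeserializationException extends SerializationException {
        final String partition;
        final long offset;
        RecordDeserializationException(String partition, long offset, String message) {
            super(message);
            this.partition = partition;
            this.offset = offset;
        }
    }

    // Proposed subclass: existing catch blocks for KafkaException still match it.
    static class InoperativeRecordException extends KafkaException {
        final String partition;
        final long offset;
        InoperativeRecordException(String partition, long offset, String message) {
            super(message);
            this.partition = partition;
            this.offset = offset;
        }
    }

    // Hypothetical pre-existing handler that knows nothing about the new classes.
    static String legacyHandle(Runnable poll) {
        try {
            poll.run();
            return "ok";
        } catch (SerializationException e) {
            return "serialization: " + e.getMessage();
        } catch (KafkaException e) {
            return "kafka: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(legacyHandle(() -> {
            throw new RecordDeserializationException("test-0", 10L, "bad value");
        })); // serialization: bad value
        System.out.println(legacyHandle(() -> {
            throw new InoperativeRecordException("test-0", 11L, "corrupt record");
        })); // kafka: corrupt record
    }
}
```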

Rejected Alternatives

  • Add `TopicPartition partition` and `long offset` attributes to the existing `SerializationException` and `KafkaException`
    • This would remain backwards-compatible, but could easily lead to `NullPointerException`s, since not every place that throws these exceptions (especially `KafkaException`) has a partition and offset available.
    • Users would need to check for null before using the new attributes.
  • Create new `DeserializationException` which does not extend `SerializationException`.
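    • This would be backwards-incompatible, since existing code that catches `SerializationException` would no longer catch deserialization failures.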
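The following minimal sketch shows the drawback of the first rejected alternative. A general-purpose exception carries an optional partition, so every handler has to remember a null check. `GeneralKafkaException`, `topicUnchecked` and `topicChecked` are hypothetical names used only for illustration, and the `-1` offset sentinel is likewise an assumption.

```java
/*
 * Rejected-alternative sketch. ASSUMPTION: GeneralKafkaException is a hypothetical stand-in
 * for an existing exception class extended with optional record metadata.
 */
class RejectedAlternativeSketch {
    static class GeneralKafkaException extends RuntimeException {
        final String partition; // null when the failure is not tied to a single record
        final long offset;      // -1 when unknown (hypothetical sentinel)

        GeneralKafkaException(String message) { this(message, null, -1L); }

        GeneralKafkaException(String message, String partition, long offset) {
            super(message);
            this.partition = partition;
            this.offset = offset;
        }
    }

    // Forgets the null check: throws NullPointerException for failures without a record.
    static String topicUnchecked(GeneralKafkaException e) {
        return e.partition.substring(0, e.partition.lastIndexOf('-'));
    }

    // What every handler would have to write instead.
    static String topicChecked(GeneralKafkaException e) {
        if (e.partition == null) {
            return "unknown";
        }
        return e.partition.substring(0, e.partition.lastIndexOf('-'));
    }

    public static void main(String[] args) {
        GeneralKafkaException timeout = new GeneralKafkaException("request timed out");
        System.out.println(topicChecked(timeout)); // unknown
        try {
            topicUnchecked(timeout);
        } catch (NullPointerException npe) {
            System.out.println("NullPointerException without the check");
        }
    }
}
```

The proposed subclasses avoid this problem, because the partition and offset exist only on exceptions that are always tied to a specific record.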