...

The problem is that while the topic, partition and offset are reported in the exception message, they are not directly available in the exception itself, leaving the user unable to seek past the record, since they do not know which partition is the cause (unless they parse the message string).
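
As an illustration, here is a minimal sketch of what a consumer has to do today (the `consumer` variable and the String deserializers are assumptions for the example, not part of this KIP):

Code Block
languagejava
try {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    // process records ...
} catch (SerializationException e) {
    // The offending topic, partition and offset are only embedded in the message
    // string; there are no accessors, so the application cannot reliably call
    // consumer.seek(...) past the faulty record without parsing that string.
    System.err.println("Cannot deserialize record: " + e.getMessage());
}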

Public Interfaces

Two new exception classes - `RecordDeserializationException` and `InoperativeRecordException` - under `org/apache/kafka/common/errors`, which get raised instead of `SerializationException` and `InvalidRecordException` respectively.

Proposed Changes

...


Code Block
languagejava
package org.apache.kafka.common.errors;

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;

/**
 * Any exception when encountering a corrupted/invalid record in the consumer
 */
public class InoperativeRecordException extends KafkaException {

    private static final long serialVersionUID = 1L;
    private TopicPartition partition;
    private long offset;

    public InoperativeRecordException(TopicPartition partition, long offset, String message) {
        super(message);
        this.partition = partition;
        this.offset = offset;
    }

    public TopicPartition partition() {
        return partition;
    }

    public long offset() {
        return offset;
    }
}

Code Block
languagejava
package org.apache.kafka.common.errors;

import org.apache.kafka.common.TopicPartition;

/**
 * Any exception during deserialization in the consumer
 */
public class RecordDeserializationException extends SerializationException {

    private static final long serialVersionUID = 1L;
    private TopicPartition partition;
    private long offset;

    public RecordDeserializationException(TopicPartition partition, long offset, String message) {
        super(message);
        this.partition = partition;
        this.offset = offset;
    }

    public TopicPartition partition() {
        return partition;
    }

    public long offset() {
        return offset;
    }

    /* avoid the expensive and useless stack trace for serialization exceptions */
    @Override
    public Throwable fillInStackTrace() {
        return this;
    }
}

Code Block
languagejava
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kafka.common.errors;

import org.apache.kafka.common.TopicPartition;

/**
 * Represents an exception which was caused by a faulty record in the log.
 * It holds information pointing to the specific record.
 * The user is expected to seek past the offset for the given partition ({@code {@link UnconsumableRecordException#offset()} + 1}).
 */
public interface UnconsumableRecordException {

    /**
     * @return the partition of the faulty record
     */
    TopicPartition partition();

    /**
     * @return the offset of the faulty record
     */
    long offset();
}



Proposed Changes

Introduce two new internal exception classes (under `org/apache/kafka/common/internals/errors`) - `RecordDeserializationException`, which extends `SerializationException`, and `InoperativeRecordException`, which extends `KafkaException`. These new exceptions will hold two additional parameters - `TopicPartition partition` and `long offset` - allowing the user to catch them and access the metadata about the record which caused the exception. They will implement the new interface - `UnconsumableRecordException`.

These exceptions will be raised instead of `SerializationException` and `KafkaException` for invalid deserialization and invalid record handling logic in the `Fetcher.java` class, essentially being propagated to the Consumer and allowing the user to handle them. The user will only need to check whether the raised exception implements the `UnconsumableRecordException` interface and, if so, cast it and access the partition/offset, enabling them to seek past the faulty record, as shown in the sketch below.
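
As a hedged sketch of the resulting handling code (the surrounding poll loop and the `consumer` variable are assumptions for the example, not part of this KIP):

Code Block
languagejava
while (true) {
    try {
        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
        // process records ...
    } catch (KafkaException e) {
        if (e instanceof UnconsumableRecordException) {
            UnconsumableRecordException faulty = (UnconsumableRecordException) e;
            // skip the record that could not be deserialized/parsed and keep consuming
            consumer.seek(faulty.partition(), faulty.offset() + 1);
        } else {
            throw e;
        }
    }
}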

Compatibility, Deprecation, and Migration Plan

...

  • Add `TopicPartition partition` and `long offset` attributes to the existing `SerializationException` and `KafkaException`
    • They would still be backwards-compatible, but could easily result in a `NullPointerException`, since not every use case (especially for `KafkaException`) has an appropriate offset.
    • Users would need to check for null before using the new attributes, which is very error-prone.
  • Expose the `RecordDeserializationException` and `InoperativeRecordException` instead of the interface
    • Kafka's exception hierarchy is currently not ideal, and it is best not to expose too many public exceptions, as this limits what we can change in the internals.
  • Create a new `DeserializationException` which does not extend `SerializationException`.
    • This would be backwards-incompatible.

Follow-up KIP Idea

It would be useful to seek past such failed messages automatically. It is worth investigating the idea of adding a configurable default callback or config option to enable users to do that without having to manually implement error handling code.
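
Purely to illustrate the shape such a hook could take (this interface, its name and its signature are hypothetical and not proposed by this KIP):

Code Block
languagejava
// Hypothetical callback - name and signature are illustrative only
public interface UnconsumableRecordHandler {
    /**
     * Invoked when the consumer hits a record it cannot deserialize or parse,
     * e.g. to log it, forward it to a dead-letter topic, or simply skip it.
     */
    void handle(TopicPartition partition, long offset, Exception cause);
}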