...

Status

Current state: Vote

Discussion thread: here

...

Public Interfaces


Code Block
languagejava

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kafka.common.errors;

import org.apache.kafka.common.TopicPartition;

/**
 * Represents an exception which was caused by a faulty record in the log.
 * It holds information pointing to the specific record.
 * The user is expected to seek past the offset of the faulty record for the given partition ({@link #offset()} + 1).
 */
public interface UnconsumableRecordException {
    /**
     * @return the partition of the faulty record
     */
    TopicPartition partition();

    /**
     * @return the offset of the faulty record
     */
    long offset();
}


Code Block
languagejava
package org.apache.kafka.common.internals.errors;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.UnconsumableRecordException;

/**
 * Any exception encountered on an invalid record (deserialization error, corrupt record, etc.)
 */
public class RecordDeserializationException extends SerializationException implements UnconsumableRecordException {

    private static final long serialVersionUID = 1L;
    private final TopicPartition partition;
    private final long offset;

    public RecordDeserializationException(TopicPartition partition, long offset, String message) {
        this(partition, offset, message, null);
    }

    public RecordDeserializationException(TopicPartition partition, long offset, String message, Throwable cause) {
        super(message, cause);
        this.partition = partition;
        this.offset = offset;
    }

    public TopicPartition partition() {
        return partition;
    }

    public long offset() {
        return offset;
    }

    /* avoid the expensive and useless stack trace for deserialization exceptions */
    @Override
    public Throwable fillInStackTrace() {
        return this;
    }
}


Introduce two new internal exception classes (under `org/apache/kafka/common/internals/errors`) - `RecordDeserializationException`, which extends `SerializationException`, and `InoperativeRecordException`, which extends `KafkaException`. These new exceptions will hold two additional parameters - `TopicPartition partition` and `long offset`, allowing the user to catch them and access the metadata about the record which caused the exception. They will implement the new interface - `UnconsumableRecordException`.


These exceptions will be raised instead of `SerializationException` and `KafkaException` for invalid deserialization and invalid record handling logic in the `Fetcher.java` class, essentially being propagated to the Consumer and allowing the user to handle them. The user will only need to check whether the raised exception implements the `UnconsumableRecordException` interface and, if so, cast it and access the partition/offset, enabling them to seek past the faulty record.
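As a rough sketch of the intended usage, the handling pattern could look like the following self-contained example. It uses simplified local stand-ins for the proposed interface and exception class (the real types would live under `org.apache.kafka.common`, the real `partition()` returns a `TopicPartition`, and a real handler would call `consumer.seek(...)` rather than printing):

```java
public class SkipFaultyRecordDemo {

    // Simplified stand-in for the proposed UnconsumableRecordException
    // interface (the real partition() returns a TopicPartition).
    interface UnconsumableRecordException {
        String partition();
        long offset();
    }

    // Simplified stand-in for an internal exception implementing it.
    static class RecordDeserializationDemoException extends RuntimeException
            implements UnconsumableRecordException {
        private final String partition;
        private final long offset;

        RecordDeserializationDemoException(String partition, long offset, String message) {
            super(message);
            this.partition = partition;
            this.offset = offset;
        }

        public String partition() { return partition; }
        public long offset() { return offset; }
    }

    // Simulates a poll() that hits an unconsumable record at offset 41.
    static void poll() {
        throw new RecordDeserializationDemoException("my-topic-0", 41L, "corrupt record");
    }

    public static void main(String[] args) {
        try {
            poll();
        } catch (RuntimeException e) {
            if (e instanceof UnconsumableRecordException) {
                UnconsumableRecordException faulty = (UnconsumableRecordException) e;
                // With a real consumer this would be:
                // consumer.seek(faulty.partition(), faulty.offset() + 1);
                System.out.println("seeking " + faulty.partition()
                        + " to offset " + (faulty.offset() + 1));
            } else {
                throw e; // not record-related, rethrow
            }
        }
        // prints: seeking my-topic-0 to offset 42
    }
}
```

The key point is that the user never names the concrete (internal) exception class: the `instanceof` check against the public interface is the entire contract.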

...

  • Add `TopicPartition partition` and `long offset` attributes to the existing `SerializationException` and `KafkaException`
    • They will still be backwards-compatible, but might easily result in `NullPointerException`, since not every use case (especially for `KafkaException`) has the appropriate offset.
    • Users will need to check for null before using the new attributes, which is very error-prone
  • Expose the two new exception classes `RecordDeserializationException` and `InoperativeRecordException` instead of the interface
    • Kafka's exception hierarchy is currently not ideal, and it's best not to expose too many public exceptions, as this limits what we can change in the internals
    • The current proposal is easier to implement from a user's perspective
  • Create a new `DeserializationException` which does not extend `SerializationException`.
    • Will be backwards-incompatible
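To illustrate why the first rejected alternative is error-prone, here is a minimal sketch (all names hypothetical, not the actual Kafka API): if nullable partition/offset fields were added to the existing exception type, every handler would need a null check, and forgetting it means auto-unboxing a null `Long` offset throws `NullPointerException`:

```java
public class NullableMetadataPitfall {

    // Hypothetical shape of the rejected alternative: nullable record
    // metadata bolted onto the existing exception type.
    static class SerializationExceptionAlt extends RuntimeException {
        final String partition; // null when there is no record context
        final Long offset;      // null for the same reason

        SerializationExceptionAlt(String partition, Long offset, String message) {
            super(message);
            this.partition = partition;
            this.offset = offset;
        }
    }

    public static void main(String[] args) {
        // Not every failure has record context (e.g. a misconfigured
        // serializer), so the fields can legitimately be null.
        SerializationExceptionAlt e =
                new SerializationExceptionAlt(null, null, "bad config");

        // Without this check, computing e.offset + 1 would auto-unbox
        // a null Long and throw NullPointerException at runtime.
        if (e.partition != null && e.offset != null) {
            System.out.println("seek " + e.partition + " to " + (e.offset + 1));
        } else {
            System.out.println("no record context; cannot seek");
        }
    }
}
```

The interface-based proposal avoids this entirely: an exception either implements `UnconsumableRecordException`, in which case both fields are guaranteed present, or it does not.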

...