
Status

Current state: Under Discussion

Discussion thread: here

JIRA:  KAFKA-5682

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Whenever a consumer encounters an exception caused by record deserialization or record invalidity (e.g. a corrupt record), a `SerializationException` or `KafkaException` is raised to the user with a message similar to: "Error deserializing key/value for partition test-0 at offset 10. If needed, please seek past the record to continue consumption."

The problem is that while the topic, partition, and offset are reported in the message, they are not directly accessible on the exception object. This leaves users unable to seek past the faulty record programmatically, since they do not know which partition caused the failure (short of parsing the string message).

Public Interfaces

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kafka.common.errors;

import org.apache.kafka.common.TopicPartition;

/**
 * Represents an exception which was caused by a faulty record in the log.
 * It holds information pointing to the specific record.
 * The user is expected to seek past the offset of the faulty record for the given partition ({@link UnconsumableRecordException#offset()} + 1).
 */
public interface UnconsumableRecordException {
    /**
     * @return the partition of the faulty record
     */
    TopicPartition partition();

    /**
     * @return the offset of the faulty record
     */
    long offset();
}



Proposed Changes

Introduce two new internal exception classes (under `org/apache/kafka/common/internals/errors`): `RecordDeserializationException`, which extends `SerializationException`, and `InoperativeRecordException`, which extends `KafkaException`. These new exceptions will hold two additional fields - `TopicPartition partition` and `long offset` - allowing the user to catch them and access metadata about the record that caused the exception. Both will implement the new `UnconsumableRecordException` interface.
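As an illustration (not the exact implementation), here is a sketch of what such an internal exception class might look like. The `TopicPartition` and `SerializationException` classes below are simplified stand-ins for the real Kafka types, so that the snippet compiles on its own:

```java
// Simplified stand-ins for org.apache.kafka.common.TopicPartition and
// org.apache.kafka.common.errors.SerializationException, so this sketch
// compiles on its own.
class TopicPartition {
    private final String topic;
    private final int partition;
    TopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }
    public String topic() { return topic; }
    public int partition() { return partition; }
}

class SerializationException extends RuntimeException {
    SerializationException(String message) { super(message); }
}

// The interface proposed by this KIP (see Public Interfaces above).
interface UnconsumableRecordException {
    TopicPartition partition();
    long offset();
}

// Sketch of the proposed internal exception: it extends the existing
// SerializationException, so current catch blocks keep working, and it
// carries the metadata needed to seek past the faulty record.
class RecordDeserializationException extends SerializationException
        implements UnconsumableRecordException {
    private final TopicPartition partition;
    private final long offset;

    RecordDeserializationException(TopicPartition partition, long offset, String message) {
        super(message);
        this.partition = partition;
        this.offset = offset;
    }

    @Override
    public TopicPartition partition() { return partition; }

    @Override
    public long offset() { return offset; }
}
```

Because the new class is a `SerializationException`, any existing `catch (SerializationException e)` block continues to work unchanged.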

These exceptions will be raised instead of `SerializationException` and `KafkaException` in the deserialization and record-validation logic of the `Fetcher.java` class, so they propagate to the Consumer where the user can handle them. The user only needs to check whether the raised exception implements the `UnconsumableRecordException` interface and, if so, cast it and read the partition/offset in order to seek past the faulty record.
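The consumer-side handling pattern would then look roughly like the following sketch. The Kafka types are replaced by minimal stand-ins so the snippet compiles by itself, and `pollOnce()` simulates a `KafkaConsumer.poll()` call that hits a corrupt record; in real code the commented line would be an actual `consumer.seek(...)` call:

```java
// Minimal stand-ins for the Kafka types this proposal touches.
class TopicPartition {
    private final String topic;
    private final int partition;
    TopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }
    public String topic() { return topic; }
    public int partition() { return partition; }
}

class KafkaException extends RuntimeException {
    KafkaException(String message) { super(message); }
}

interface UnconsumableRecordException {
    TopicPartition partition();
    long offset();
}

// Stand-in for the proposed InoperativeRecordException.
class InoperativeRecordException extends KafkaException
        implements UnconsumableRecordException {
    private final TopicPartition tp;
    private final long offset;
    InoperativeRecordException(TopicPartition tp, long offset, String message) {
        super(message);
        this.tp = tp;
        this.offset = offset;
    }
    public TopicPartition partition() { return tp; }
    public long offset() { return offset; }
}

class ConsumerSketch {
    // Simulates KafkaConsumer.poll() encountering a corrupt record at offset 10.
    static void pollOnce() {
        throw new InoperativeRecordException(
                new TopicPartition("test", 0), 10L,
                "Error deserializing key/value for partition test-0 at offset 10");
    }

    // The handling pattern: catch the existing exception type, check for the
    // new interface, and seek past the faulty record. Returns the offset the
    // consumer would seek to, for illustration.
    static long recoverFromBadRecord() {
        try {
            pollOnce();
            return -1L; // poll succeeded, nothing to recover from
        } catch (KafkaException e) {
            if (e instanceof UnconsumableRecordException) {
                UnconsumableRecordException u = (UnconsumableRecordException) e;
                long next = u.offset() + 1;
                // Real code: consumer.seek(u.partition(), next);
                return next;
            }
            throw e; // no record metadata available, rethrow
        }
    }
}
```

Note that the catch clause still targets the existing exception type; the `instanceof` check is the only new code a user has to add.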

Compatibility, Deprecation, and Migration Plan

This is a fully backwards-compatible change. Since the new exceptions extend the original exception classes, existing logic which handles the original exceptions will continue handling the new ones.

Rejected Alternatives

  • Add `TopicPartition partition` and `long offset` attributes to the existing `SerializationException` and `KafkaException`
    • This would still be backwards-compatible, but could easily result in a `NullPointerException`, since not every use case (especially for `KafkaException`) has an appropriate offset.
    • Users would need to check for null before using the new attributes, which is very error-prone.
  • Expose the `RecordDeserializationException` and `InoperativeRecordException` instead of the interface
    • Kafka's exception hierarchy is currently not ideal, and it is best not to expose too many public exceptions, as that limits what can be changed in the internals.
  • Create a new `DeserializationException` which does not extend `SerializationException`.
    • This would be backwards-incompatible.

Follow-up KIP Idea

It would be useful to seek past such failed records automatically. It is worth investigating a configurable default callback or a config option that lets users do this without having to implement the error-handling code manually.

