Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

This page is meant as a template for writing a KIP. To create a KIP choose Tools->Copy on this page and modify with your content and replace the heading with the next KIP number and a description of your issue. Replace anything in italics with your own description.

Status

Current state: Under Discussion Accepted

Discussion thread: here

JIRA: KAFKA-15876here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

When tiered storage is enabled on the cluster, Kafka brokers broker has to build the remote log metadata for all the partitions that it is either leader/follower on node restart. The remote log metadata is built in asynchronous fashion and does not interfere with the broker startup path. Once the broker becomes leader for some partitionsonline, it cannot handle the client requests (FETCH and LIST_OFFSETS) to access remote storage until the metadata gets built for those partitions. Currently, we are returning a ReplicaNotAvailable exception back to the client so that it will retry after sometime.

ReplicaNotAvailableException is applicable when there is a reassignment is in-progress and kind of deprecated with the NotLeaderOrFollowerException (PR#8979). It's good to introduce an appropriate retriable exception for remote storage errors to denote that it is not ready to accept the client requests yet.

...

Code Block
languagejava
titleRemoteStorageNotReadyException
package org.apache.kafka.common.errors;

/**
 * An exception that indicates remote storage is not ready to receive the requests yet.
 */
public class RemoteStorageNotReadyException extends RetriableException {

    private static final long serialVersionUID = 1L;

    public RemoteStorageNotReadyException(String message, Throwable cause) {
        super(message, cause);
    }

    public RemoteStorageNotReadyException(String message, Throwable cause) {
        super(message, cause);
    }

    public RemoteStorageNotReadyException(Throwable cause) {
        super(cause);
    }

    public RemoteStorageNotReadyException() {
    }
}


Proposed Changes

Describe the new thing you want to do in appropriate detail. This may be fairly extensive and have large subsections of its own. Or it may be a few sentences. Use judgement based on the scope of the change.
When the metadata is not ready, instead of returning ReplicaNotAvailableException in RemotePartitionMetadataStore, we will return the new RemoteStorageNotReadyException.

The consumer can read the local data as long as it knows the offset from where to fetch the data from. When there is no initial offset, the consumer decides the offset based on the below config: 

Code Block
auto.offset.reset = earliest / latest / none
  • For `earliest` offset policy and any offset that lies in the remote storage, the consumer (FETCH request) cannot be able to make progress until the remote log metadata gets synced.
  • In a FETCH request, when there are multiple partitions where a subset of them are consuming from local and others from remote, then only the partitions which are consuming from the remote cannot make progress and the partitions that fetch data from local storage should be able to make progress.
  • In a FETCH request, when the fetch-offset for a partition is within the local-storage, then it should be able to consume the messages.
  • All the calls to LIST_OFFETS will fail until the remote log metadata gets synced.

Compatibility, Deprecation, and Migration Plan

...