Authors: Sriharsha Chintalapani, Satish Duggana, Suresh Srinivas, Ying Zheng (alphabetical order by the last names)

Table of Contents

Status

Current State: "Accepted"

...

In the tiered storage approach, the Kafka cluster is configured with two tiers of storage: local and remote. The local tier is the same as current Kafka, using the local disks on the Kafka brokers to store the log segments. The new remote tier uses external systems, such as HDFS or S3, to store the completed log segments. Two separate retention periods are defined, one for each tier. With the remote tier enabled, the retention period for the local tier can be significantly reduced from days to a few hours, while the retention period for the remote tier can be much longer: days, or even months. When a log segment is rolled on the local tier, it is copied to the remote tier along with the corresponding indexes. Latency-sensitive applications perform tail reads and are served from the local tier, leveraging the existing Kafka mechanism of efficiently using the page cache to serve the data. Backfill applications, and applications recovering from a failure, that need data older than what is in the local tier are served from the remote tier.
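For illustration, the two retention windows could be configured per topic as sketched below. The config names are the ones defined later in this KIP; the values are hypothetical, not recommendations.

Code Block
languagejava
titleTieredRetentionExample
import java.util.HashMap;
import java.util.Map;

// Illustrative only: keep 30 days of data overall, but only ~4 hours on the broker's local disks.
public class TieredRetentionExample {
    public static Map<String, String> topicConfigs() {
        Map<String, String> configs = new HashMap<>();
        configs.put("remote.storage.enable", "true");                                 // per-topic tiering switch
        configs.put("retention.ms", String.valueOf(30L * 24 * 60 * 60 * 1000));      // total retention (local + remote)
        configs.put("local.log.retention.ms", String.valueOf(4L * 60 * 60 * 1000));  // local tier retention only
        return configs;
    }
}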

...

Compacted topics are not supported with tiered storage. A topic created with the effective value for remote.storage.enable as true cannot change its retention policy from delete to compact.

...

RLM creates tasks for each leader or follower topic partition, which are explained in detail here.

  • RLM Leader Task
    • It checks for rolled-over LogSegments (which have the last message offset less than the last stable offset of that topic partition) and copies them, along with their offset/time/transaction/producer-snapshot indexes and leader epoch cache, to the remote tier. It also serves fetch requests for older data from the remote tier. Local logs are not cleaned up until those segments are copied successfully to remote storage, even though their retention time/size has been reached. A simplified sketch of this copy step is shown below.
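The sketch below shows the copy step for one rolled-over segment, expressed against the RemoteStorageManager and RemoteLogMetadataManager APIs defined later in this KIP. Building the RemoteLogSegmentMetadata and LogSegmentData from a local segment is elided; the class and method names are illustrative, not the actual broker code.

Code Block
languagejava
titleRlmLeaderCopyStep
import java.util.concurrent.ExecutionException;

class RlmLeaderCopyStep {
    static void copyOneSegment(RemoteStorageManager rsm,
                               RemoteLogMetadataManager rlmm,
                               RemoteLogSegmentMetadata startedMetadata,      // must carry COPY_SEGMENT_STARTED
                               LogSegmentData segmentData,
                               RemoteLogSegmentMetadataUpdate finishedUpdate) // carries COPY_SEGMENT_FINISHED
            throws RemoteStorageException, ExecutionException, InterruptedException {
        rlmm.addRemoteLogSegmentMetadata(startedMetadata).get();   // 1. record the copy intent in RLMM
        rsm.copyLogSegmentData(startedMetadata, segmentData);      // 2. copy the segment and its indexes
        rlmm.updateRemoteLogSegmentMetadata(finishedUpdate).get(); // 3. mark the copy as finished
    }
}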

...

1) Retrieve the Earliest Local Offset (ELO) and the corresponding leader epoch (ELO-LE) from the leader with a ListOffset request (timestamp = -4)

2) Truncate local log and local auxiliary state

...

For any fetch request, ReplicaManager proceeds with a call to readFromLocalLog; if this method returns an OffsetOutOfRange exception, it delegates the read call to RemoteLogManager. More details are explained in the RLM/RSM tasks section. If the remote storage is not available, it throws a new error, TIERED_STORAGE_NOT_AVAILABLE.
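A hypothetical sketch of that delegation is below. The real logic lives in ReplicaManager; the types here are simplified stand-ins defined inline, not the actual broker classes.

Code Block
languagejava
titleFetchDelegationSketch
class FetchDelegationSketch {
    interface LocalLog { byte[] read(long offset) throws OffsetOutOfRangeException; }
    interface RemoteLogManagerView { byte[] read(long offset); boolean isAvailable(); }
    static class OffsetOutOfRangeException extends Exception { }
    static class TieredStorageNotAvailableException extends RuntimeException { }

    byte[] read(LocalLog local, RemoteLogManagerView remote, long offset) {
        try {
            return local.read(offset);                // tail reads are served from the local tier / page cache
        } catch (OffsetOutOfRangeException e) {
            if (!remote.isAvailable())
                throw new TieredStorageNotAvailableException(); // surfaces as TIERED_STORAGE_NOT_AVAILABLE
            return remote.read(offset);               // older data is delegated to the RemoteLogManager
        }
    }
}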

...

This API is enhanced to support a new target timestamp value, -4, which is called EARLIEST_LOCAL_TIMESTAMP. There will not be any new fields added in the request and response schemas, but there will be a version bump to indicate the version update. This request is about the offset from which the followers should start fetching to replicate the local logs. It represents the log-start-offset available in the local log storage, which is also called the local-log-start-offset. All records earlier than this offset can be considered copied to the remote storage. This is used by follower replicas to avoid fetching records that are already copied to remote tier storage.
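For context, the sentinel target timestamps can be summarized as below; -1 and -2 are pre-existing Kafka sentinels, -3 comes from KIP-734, and -4 is the value added by this KIP.

Code Block
languagejava
titleListOffsetSentinels
public final class ListOffsetSentinels {
    public static final long LATEST_TIMESTAMP = -1L;
    public static final long EARLIEST_TIMESTAMP = -2L;
    public static final long MAX_TIMESTAMP = -3L;            // added by KIP-734
    public static final long EARLIEST_LOCAL_TIMESTAMP = -4L; // added by this KIP: local-log-start-offset

    private ListOffsetSentinels() { }
}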

...

When a partition is deleted, the controller updates its state in RLMM with DELETE_PARTITION_MARKED, and it expects RLMM to have a mechanism to clean up the remote log segments. This process for the default RLMM is described in detail here.

RemoteLogMetadataManager implemented with an internal topic

...

remote.log.metadata.topic.replication.factor

Replication factor of the topic

Default: 3

remote.log.metadata.topic.num.partitions

Number of partitions of the topic

Default: 50

remote.log.metadata.topic.retention.ms

Retention of the topic in milliseconds.

Default: -1, which means unlimited.

Users can configure this value based on their use cases. To avoid any data loss, this value should be more than the maximum retention period of any topic enabled with tiered storage in the cluster.

remote.log.metadata.manager.listener.name

Listener name to be used to connect to the local broker by the RemoteLogMetadataManager implementation on the broker. This is a mandatory config while using the default RLMM implementation, which is `org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager`. The respective endpoint address is passed with the "bootstrap.servers" property while invoking RemoteLogMetadataManager#configure(Map<String, ?> props).

This is used by Kafka clients created in the RemoteLogMetadataManager implementation.

remote.log.metadata.*

The default RLMM implementation creates producer and consumer instances. Common client properties can be configured with the `remote.log.metadata.common.client.` prefix. Users can also pass properties specific to the producer/consumer with the `remote.log.metadata.producer.` and `remote.log.metadata.consumer.` prefixes. These override properties with the `remote.log.metadata.common.client.` prefix.

Any other properties should be prefixed with the value of the config "remote.log.metadata.manager.impl.prefix", whose default value is "rlmm.config.". These configs will be passed to RemoteLogMetadataManager#configure(Map<String, ?> props).

For example: "rlmm.config.remote.log.metadata.producer.batch.size=100" will set the batch.size config for the producer inside the default RLMM.

remote.partition.remover.task.interval.ms

The interval at which the remote partition remover runs to delete the remote storage of the partitions marked for deletion.

Default value: 3600000 (1 hour)
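The broker-side properties below sketch how these configs fit together for the default RLMM; the values are illustrative. Note how the producer-specific prefix overrides the common client prefix, and how the "rlmm.config." prefix forwards arbitrary properties to RemoteLogMetadataManager#configure(Map<String, ?> props).

Code Block
languagejava
titleRlmmConfigExample
import java.util.Properties;

public class RlmmConfigExample {
    public static Properties brokerProps() {
        Properties props = new Properties();
        props.put("remote.log.metadata.manager.listener.name", "PLAINTEXT");           // mandatory for default RLMM
        props.put("remote.log.metadata.topic.replication.factor", "3");
        props.put("remote.log.metadata.topic.num.partitions", "50");
        props.put("remote.log.metadata.common.client.security.protocol", "PLAINTEXT"); // shared by producer and consumer
        props.put("remote.log.metadata.producer.acks", "all");                         // overrides the common client value
        props.put("rlmm.config.remote.log.metadata.producer.batch.size", "100");       // forwarded to RLMM#configure
        return props;
    }
}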

...

System-Wide

remote.log.storage.system.enable - Whether to enable tiered storage functionality in a broker or not. Valid values are `true` or `false` and the default value is false. This property provides backward compatibility. When it is true, the broker starts all the services required for tiered storage.

remote.log.storage.manager.class.name - This is mandatory if remote.log.storage.system.enable is set to true.

remote.log.metadata.manager.class.name - This is an optional property. If it is not configured, Kafka uses an inbuilt metadata manager backed by an internal topic.
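A minimal system-wide enablement might look like the sketch below; the RSM class name is a placeholder for whatever plugin is used, not a real implementation shipped with Kafka.

Code Block
languagejava
titleSystemWideEnablement
import java.util.Properties;

public class SystemWideEnablement {
    public static Properties serverProps() {
        Properties props = new Properties();
        props.put("remote.log.storage.system.enable", "true");                   // default is false
        props.put("remote.log.storage.manager.class.name", "com.example.MyRsm"); // mandatory once enabled
        // remote.log.metadata.manager.class.name is omitted, so the inbuilt topic-based RLMM is used.
        return props;
    }
}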

RemoteStorageManager

(These configs are dependent on remote storage manager implementation)

remote.log.storage.*

RemoteLogMetadataManager

(These configs are dependent on remote log metadata manager implementation)

remote.log.metadata.*

Remote log manager related configuration.

remote.log.index.file.cache.total.size.mb
The total size of the space, in megabytes, allocated in local storage for storing index files fetched from remote storage.
Default value: 1024

remote.log.manager.thread.pool.size
Remote log thread pool size, which is used in scheduling tasks to copy segments and clean up remote log segments.
Default value: 4

remote.log.manager.task.interval.ms
The interval at which the remote log manager runs scheduled tasks, such as copying segments and cleaning up remote log segments.
Default value: 30,000

Remote log manager tasks are retried with the exponential backoff algorithm mentioned here.

remote.log.manager.task.retry.backoff.ms
The amount of time in milliseconds to wait before attempting the initial retry of a failed remote storage request.
Default value: 500

remote.log.manager.task.retry.backoff.max.ms
The maximum amount of time in milliseconds to wait before attempting to retry a failed remote storage request.
Default value: 30,000

remote.log.manager.task.retry.jitter
Random jitter amount applied to the `remote.log.manager.task.retry.backoff.ms` for computing the resultant backoff interval. This will avoid reconnection storms.
Default value: 0.2

remote.log.reader.threads
Remote log reader thread pool size, which is used in scheduling tasks to fetch data from remote storage.  
Default value: 5

remote.log.reader.max.pending.tasks
Maximum remote log reader thread pool task queue size. If the task queue is full, the broker will stop reading remote log segments.
Default value: 100
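One plausible realization of this backoff policy is sketched below; the exact formula used by the broker may differ.

Code Block
languagejava
titleRetryBackoffSketch
import java.util.concurrent.ThreadLocalRandom;

public class RetryBackoffSketch {
    // attempt = 0 for the initial retry; a jitter of 0.2 randomizes the delay by +/-20%.
    public static long nextBackoffMs(int attempt, long backoffMs, long maxBackoffMs, double jitter) {
        double exponential = backoffMs * Math.pow(2, attempt);       // exponential growth per attempt
        long bounded = (long) Math.min(exponential, maxBackoffMs);   // capped by ...backoff.max.ms
        double delta = bounded * jitter;
        return (long) ThreadLocalRandom.current().nextDouble(bounded - delta, bounded + delta);
    }

    public static void main(String[] args) {
        // With the defaults above: 500 ms base, 30,000 ms cap, 0.2 jitter.
        for (int attempt = 0; attempt < 8; attempt++)
            System.out.println(attempt + " -> " + nextBackoffMs(attempt, 500, 30_000, 0.2) + " ms");
    }
}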

Per Topic Configuration

Users can set the desired value for the remote.storage.enable property on a topic; the default value is false. To enable tiered storage for a topic, set remote.storage.enable to true. You cannot disable this config once it is enabled; this feature will be provided in future versions.

The below retention configs are similar to the log retention configs. They determine how long log segments are retained in local storage. The existing log.retention.* configs are the retention configs for the topic partition, covering both local and remote storage.

local.log.retention.ms
The number of milliseconds to keep the local log segment before it gets deleted. If not set, the value in `log.retention.ms` is used. The effective value should always be less than or equal to the log.retention.ms value.

local.log.retention.bytes
The maximum size local log segments of a partition can grow to before old segments are deleted. If not set, the value in `log.retention.bytes` is used. The effective value should always be less than or equal to the log.retention.bytes value.
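Putting the per-topic configs together, a topic with tiered storage could be created as sketched below; the topic name, partition counts, and retention values are illustrative.

Code Block
languagejava
titleCreateTieredTopic
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTieredTopic {
    public static void main(String[] args) throws Exception {
        Map<String, String> configs = new HashMap<>();
        configs.put("remote.storage.enable", "true");
        configs.put("retention.ms", "604800000");           // 7 days in total (local + remote)
        configs.put("local.log.retention.ms", "14400000");  // 4 hours on local disks
        NewTopic topic = new NewTopic("tiered-demo", 6, (short) 3).configs(configs);
        try (Admin admin = Admin.create(
                Collections.<String, Object>singletonMap("bootstrap.servers", "localhost:9092"))) {
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}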

Remote Storage Manager

`RemoteStorageManager` is an interface that provides the lifecycle of remote log segments and indexes. More details about how we arrived at this interface are discussed in the document. We will provide a simple implementation of RSM to get a better understanding of the APIs. HDFS and S3 implementations are planned to be hosted in external repos, and these will not be part of the Apache Kafka repo. This is in line with the approach taken for Kafka connectors.

The copy and delete APIs are expected to be idempotent, so plugin implementations can retry safely: a retried copy overwrites any partially copied content, and a delete does not fail when the content is already deleted.


Code Block
languagejava
titleRemoteStorageManager
package org.apache.kafka.server.log.remote.storage;
...
/**
 * RemoteStorageManager provides the lifecycle of remote log segments that includes copy, fetch, and delete from remote
 * storage.
 * <p>
 * Each upload or copy of a segment is initiated with {@link RemoteLogSegmentMetadata} containing {@link RemoteLogSegmentId}
 * which is universally unique even for the same topic partition and offsets.
 * <p>
 * RemoteLogSegmentMetadata is stored in {@link RemoteLogMetadataManager} before and after copy/delete operations on
 * RemoteStorageManager with the respective {@link RemoteLogSegmentState}. {@link RemoteLogMetadataManager} is
 * responsible for storing and fetching metadata about the remote log segments in a strongly consistent manner.
 * This allows RemoteStorageManager to store segments even in eventually consistent manner as the metadata is already
 * stored in a consistent store.
 * <p>
 * All these APIs are still evolving.
 */
@InterfaceStability.Unstable
public interface RemoteStorageManager extends Configurable, Closeable {


    /**
     * Type of the index file.
     */
    enum IndexType {
        /**
         * Represents offset index.
         */
        Offset,

        /**
         * Represents timestamp index.
         */
        Timestamp,

        /**
         * Represents producer snapshot index.
         */
        ProducerSnapshot,

        /**
         * Represents transaction index.
         */
        Transaction,

        /**
         * Represents leader epoch index.
         */
        LeaderEpoch,
    }


    /**
     * Copies the given {@link LogSegmentData} provided for the given {@code remoteLogSegmentMetadata}. This includes
     * the log segment and its auxiliary indexes like offset index, time index, transaction index, leader epoch index, and
     * producer snapshot index.
     * <p>
     * Invoker of this API should always send a unique id as part of {@link RemoteLogSegmentMetadata#remoteLogSegmentId()}
     * even when it retries to invoke this method for the same log segment data.
     * <p>
     * This operation is expected to be idempotent. If a copy operation is retried and there is existing content already
     * written, it should be overwritten without throwing {@link RemoteStorageException}.
     *
     * @param remoteLogSegmentMetadata metadata about the remote log segment.
     * @param logSegmentData           data to be copied to tiered storage.
     * @throws RemoteStorageException if there are any errors in storing the data of the segment.
     */
    void copyLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
                            LogSegmentData logSegmentData)
            throws RemoteStorageException;

    /**
     * Returns the remote log segment data file/object as an InputStream for the given {@link RemoteLogSegmentMetadata},
     * starting from the given startPosition. The stream will end at the end of the remote log segment data file/object.
     *
     * @param remoteLogSegmentMetadata metadata about the remote log segment.
     * @param startPosition            start position of log segment to be read, inclusive.
     * @return input stream of the requested log segment data.
     * @throws RemoteStorageException          if there are any errors while fetching the desired segment.
     * @throws RemoteResourceNotFoundException the requested log segment is not found in the remote storage.
     */
    InputStream fetchLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
                                int startPosition) throws RemoteStorageException;

    /**
     * Returns the remote log segment data file/object as an InputStream for the given {@link RemoteLogSegmentMetadata},
     * starting from the given startPosition. The stream will end at the smaller of endPosition and the end of the
     * remote log segment data file/object.
     *
     * @param remoteLogSegmentMetadata metadata about the remote log segment.
     * @param startPosition            start position of log segment to be read, inclusive.
     * @param endPosition              end position of log segment to be read, inclusive.
     * @return input stream of the requested log segment data.
     * @throws RemoteStorageException          if there are any errors while fetching the desired segment.
     * @throws RemoteResourceNotFoundException the requested log segment is not found in the remote storage.
     */
    InputStream fetchLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
                                int startPosition,
                                int endPosition) throws RemoteStorageException;

    /**
     * Returns the index for the respective log segment of {@link RemoteLogSegmentMetadata}.
     * <p>
     * If the index is not present (e.g. the transaction index may not exist because segments created prior to
     * version 2.8.0 will not have a transaction index associated with them), throws
     * {@link RemoteResourceNotFoundException}.
     *
     * @param remoteLogSegmentMetadata metadata about the remote log segment.
     * @param indexType                type of the index to be fetched for the segment.
     * @return input stream of the requested index.
     * @throws RemoteStorageException          if there are any errors while fetching the index.
     * @throws RemoteResourceNotFoundException the requested index is not found in the remote storage.
     *                                         Callers of this function are encouraged to re-create the index from the
     *                                         segment as the suggested way of handling this error.
     */
    InputStream fetchIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
                           IndexType indexType) throws RemoteStorageException;

    /**
     * Deletes the resources associated with the given {@code remoteLogSegmentMetadata}. Deletion is considered
     * successful if this call returns successfully without any errors. It will throw {@link RemoteStorageException} if
     * there are any errors in deleting the file.
     * <p>
     * This operation is expected to be idempotent. If resources are not found, it is not expected to
     * throw {@link RemoteResourceNotFoundException} as they may have already been removed from a previous attempt.
     *
     * @param remoteLogSegmentMetadata metadata about the remote log segment to be deleted.
     * @throws RemoteStorageException if there are any storage related errors occurred.
     */
    void deleteLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;
}


package org.apache.kafka.common;
...
public class TopicIdPartition {

    private final UUID topicId;
    private final TopicPartition topicPartition;

    public TopicIdPartition(UUID topicId, TopicPartition topicPartition) {
        Objects.requireNonNull(topicId, "topicId can not be null");
        Objects.requireNonNull(topicPartition, "topicPartition can not be null");
        this.topicId = topicId;
        this.topicPartition = topicPartition;
    }

    public UUID topicId() {
        return topicId;
    }

    public TopicPartition topicPartition() {
        return topicPartition;
    }

...
}


package org.apache.kafka.server.log.remote.storage;
...
/**
 * This represents a universally unique identifier associated to a topic partition's log segment. This will be
 * regenerated for every attempt of copying a specific log segment in {@link RemoteStorageManager#copyLogSegmentData(RemoteLogSegmentMetadata, LogSegmentData)}.
 */
public class RemoteLogSegmentId implements Comparable<RemoteLogSegmentId>, Serializable {
    private static final long serialVersionUID = 1L;

    private final TopicIdPartition topicIdPartition;
    private final UUID id;

    public RemoteLogSegmentId(TopicIdPartition topicIdPartition, UUID id) {
        this.topicIdPartition = requireNonNull(topicIdPartition);
        this.id = requireNonNull(id);
    }

    /**
     * @return TopicIdPartition of this remote log segment.
     */
    public TopicIdPartition topicIdPartition() {
        return topicIdPartition;
    }

    /**
     * @return Universally Unique Id of this remote log segment.
     */
    public UUID id() {
        return id;
    }
...
}

package org.apache.kafka.server.log.remote.storage;
...
/**
 * It describes the metadata about the log segment in the remote storage.
 */
public class RemoteLogSegmentMetadata implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * Universally unique remote log segment id.
     */
    private final RemoteLogSegmentId remoteLogSegmentId;

    /**
     * Start offset of this segment.
     */
    private final long startOffset;

    /**
     * End offset of this segment.
     */
    private final long endOffset;

    /**
     * Leader epoch of the broker.
     */
    private final int leaderEpoch;

    /**
     * Maximum timestamp in the segment
     */
    private final long maxTimestamp;

    /**
     * Epoch time at which the respective {@link #state} is set.
     */
    private final long eventTimestamp;

    /**
     * LeaderEpoch vs offset for messages within this segment.
     */
    private final Map<Integer, Long> segmentLeaderEpochs;

    /**
     * Size of the segment in bytes.
     */
    private final int segmentSizeInBytes;

    /**
     * It indicates the state in which the action is executed on this segment.
     */
    private final RemoteLogSegmentState state;

    /**
     * @param remoteLogSegmentId  Universally unique remote log segment id.
     * @param startOffset         Start offset of this segment.
     * @param endOffset           End offset of this segment.
     * @param maxTimestamp        Maximum timestamp in this segment
     * @param leaderEpoch         Leader epoch of the broker.
     * @param eventTimestamp      Epoch time at which the remote log segment is copied to the remote tier storage.
     * @param segmentSizeInBytes  Size of this segment in bytes.
     * @param state               State of the respective segment of remoteLogSegmentId.
     * @param segmentLeaderEpochs leader epochs occurred within this segment
     */
    public RemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId, long startOffset, long endOffset,
                                    long maxTimestamp, int leaderEpoch, long eventTimestamp,
                                    int segmentSizeInBytes, RemoteLogSegmentState state, Map<Integer, Long> segmentLeaderEpochs) {
        this.remoteLogSegmentId = remoteLogSegmentId;
        this.startOffset = startOffset;
        this.endOffset = endOffset;
        this.leaderEpoch = leaderEpoch;
        this.maxTimestamp = maxTimestamp;
        this.eventTimestamp = eventTimestamp;
        this.segmentLeaderEpochs = segmentLeaderEpochs;
        this.state = state;
        this.segmentSizeInBytes = segmentSizeInBytes;
    }

    /**
     * @return unique id of this segment.
     */
    public RemoteLogSegmentId remoteLogSegmentId() {
        return remoteLogSegmentId;
    }

    /**
     * @return Start offset of this segment (inclusive).
     */
    public long startOffset() {
        return startOffset;
    }

    /**
     * @return End offset of this segment (inclusive).
     */
    public long endOffset() {
        return endOffset;
    }

    /**
     * @return Leader epoch of the broker from where this event occurred.
     */
    public int leaderEpoch() {
        return leaderEpoch;
    }

    /**
     * @return Epoch time at which this event occurred.
     */
    public long eventTimestamp() {
        return eventTimestamp;
    }

    /**
     * @return Size of this segment in bytes.
     */
    public int segmentSizeInBytes() {
        return segmentSizeInBytes;
    }

    public RemoteLogSegmentState state() {
        return state;
    }

    public long maxTimestamp() {
        return maxTimestamp;
    }

    public Map<Integer, Long> segmentLeaderEpochs() {
        return segmentLeaderEpochs;
    }

...
}

package org.apache.kafka.server.log.remote.storage;
...
public class LogSegmentData {

    private final File logSegment;
    private final File offsetIndex;
    private final File timeIndex;
    private final File txnIndex;
    private final File producerIdSnapshotIndex;
    private final ByteBuffer leaderEpochIndex;

    public LogSegmentData(File logSegment, File offsetIndex, File timeIndex, File txnIndex, File producerIdSnapshotIndex,
                          ByteBuffer leaderEpochIndex) {
        this.logSegment = logSegment;
        this.offsetIndex = offsetIndex;
        this.timeIndex = timeIndex;
        this.txnIndex = txnIndex;
        this.producerIdSnapshotIndex = producerIdSnapshotIndex;
        this.leaderEpochIndex = leaderEpochIndex;
    }

    public File logSegment() {
        return logSegment;
    }

    public File offsetIndex() {
        return offsetIndex;
    }

    public File timeIndex() {
        return timeIndex;
    }

    public File txnIndex() {
        return txnIndex;
    }

    public File producerIdSnapshotIndex() {
        return producerIdSnapshotIndex;
    }

    public ByteBuffer leaderEpochIndex() {
        return leaderEpochIndex;
    }
...
}
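To make the RemoteStorageManager contract concrete, below is a toy implementation backed by a shared mounted directory. It is deliberately abstract: the bounded fetchLogSegment variant and fetchIndex are left unimplemented for brevity, "remote.dir" is a made-up config key, and only the offset index is copied. It illustrates the idempotency expectations above; it is not production code.

Code Block
languagejava
titleMountedDirRemoteStorageManager
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.Map;

public abstract class MountedDirRemoteStorageManager implements RemoteStorageManager {
    private Path baseDir;

    @Override
    public void configure(Map<String, ?> configs) {
        baseDir = Paths.get(String.valueOf(configs.get("remote.dir")));
    }

    private Path segmentDir(RemoteLogSegmentMetadata metadata) {
        RemoteLogSegmentId id = metadata.remoteLogSegmentId();
        // The UUID keeps the path unique even when the same topic partition and offsets are copied again.
        return baseDir.resolve(id.topicIdPartition().topicPartition().toString())
                      .resolve(id.id().toString());
    }

    @Override
    public void copyLogSegmentData(RemoteLogSegmentMetadata metadata, LogSegmentData data)
            throws RemoteStorageException {
        try {
            Path dir = Files.createDirectories(segmentDir(metadata));
            // REPLACE_EXISTING keeps retried copies idempotent, as the contract requires.
            Files.copy(data.logSegment().toPath(), dir.resolve("segment.log"),
                       StandardCopyOption.REPLACE_EXISTING);
            Files.copy(data.offsetIndex().toPath(), dir.resolve(IndexType.Offset.name()),
                       StandardCopyOption.REPLACE_EXISTING);
            // The time, transaction, producer-snapshot and leader-epoch indexes would be copied the same way.
        } catch (IOException e) {
            throw new RemoteStorageException("failed to copy segment", e);
        }
    }

    @Override
    public InputStream fetchLogSegment(RemoteLogSegmentMetadata metadata, int startPosition)
            throws RemoteStorageException {
        try {
            InputStream in = Files.newInputStream(segmentDir(metadata).resolve("segment.log"));
            if (in.skip(startPosition) != startPosition)   // toy code: a single skip is assumed to suffice
                throw new IOException("could not seek to " + startPosition);
            return in;
        } catch (IOException e) {
            throw new RemoteStorageException("failed to fetch segment", e);
        }
    }

    @Override
    public void deleteLogSegmentData(RemoteLogSegmentMetadata metadata) throws RemoteStorageException {
        try {
            Path dir = segmentDir(metadata);
            Files.deleteIfExists(dir.resolve("segment.log"));           // deleteIfExists keeps deletes idempotent
            Files.deleteIfExists(dir.resolve(IndexType.Offset.name()));
            Files.deleteIfExists(dir);
        } catch (IOException e) {
            throw new RemoteStorageException("failed to delete segment", e);
        }
    }

    @Override
    public void close() { }
}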

...

Code Block
languagejava
titleRemoteLogMetadataManager
package org.apache.kafka.server.log.remote.storage;
...
/**
 * This interface provides storing and fetching remote log segment metadata with strongly consistent semantics.
 * <p>
 * This class can be plugged in to Kafka cluster by adding the implementation class as
 * <code>remote.log.metadata.manager.class.name</code> property value. There is an inbuilt implementation backed by
 * topic storage in the local cluster. This is used as the default implementation if
 * remote.log.metadata.manager.class.name is not configured.
 * </p>
 * <p>
 * <code>remote.log.metadata.manager.class.path</code> property is about the class path of the RemoteLogMetadataManager
 * implementation. If specified, the RemoteLogMetadataManager implementation and its dependent libraries will be loaded
 * by a dedicated classloader which searches this class path before the Kafka broker class path. The syntax of this
 * parameter is the same as the standard Java class path string.
 * </p>
 * <p>
 * <code>remote.log.metadata.manager.listener.name</code> property is about the listener name of the local broker to which
 * it should get connected if needed by the RemoteLogMetadataManager implementation. When this is configured, all other
 * required properties can be passed with the prefix 'remote.log.metadata.manager.listener.'.
 * </p>
 * "cluster.id", "broker.id" and all other properties prefixed with "remote.log.metadata." are passed when
 * {@link #configure(Map)} is invoked on this instance.
 * <p>
 * All these APIs are still evolving.
 */
@InterfaceStability.Evolving
public interface RemoteLogMetadataManager extends Configurable, Closeable {

    /**
     * Asynchronously adds {@link RemoteLogSegmentMetadata} with the containing {@link RemoteLogSegmentId} into
     * {@link RemoteLogMetadataManager}.
     * <p>
     * RemoteLogSegmentMetadata is identified by RemoteLogSegmentId and it should have the initial state, which is
     * {@link RemoteLogSegmentState#COPY_SEGMENT_STARTED}.
     * <p>
     * {@link #updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate)} should be used to update an existing
     * RemoteLogSegmentMetadata.
     *
     * @param remoteLogSegmentMetadata metadata about the remote log segment.
     * @throws RemoteStorageException   if there are any storage related errors occurred.
     * @throws IllegalArgumentException if the given metadata instance does not have the state as {@link RemoteLogSegmentState#COPY_SEGMENT_STARTED}
     * @return a Future which will complete once this operation is finished.
     */
    Future<Void> addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;

    /**
     * This method is used to update the {@link RemoteLogSegmentMetadata} asynchronously. Currently, it allows updating
     * the state based on the life cycle of the segment. It can go through the below state transitions.
     * <p>
     * <pre>
     * +---------------------+            +----------------------+
     * |COPY_SEGMENT_STARTED |----------->|COPY_SEGMENT_FINISHED |
     * +-------------------+-+            +--+-------------------+
     *                     |                 |
     *                     |                 |
     *                     v                 v
     *                  +--+-----------------+-+
     *                  |DELETE_SEGMENT_STARTED|
     *                  +-----------+----------+
     *                              |
     *                              |
     *                              v
     *                  +-----------+-----------+
     *                  |DELETE_SEGMENT_FINISHED|
     *                  +-----------------------+
     * </pre>
     * <p>
     * {@link RemoteLogSegmentState#COPY_SEGMENT_STARTED} - This state indicates that the segment copying to remote storage is started but not yet finished.
     * {@link RemoteLogSegmentState#COPY_SEGMENT_FINISHED} - This state indicates that the segment copying to remote storage is finished.
     * <br>
     * The leader broker copies the log segments to the remote storage and puts the remote log segment metadata with the
     * state as "COPY_SEGMENT_STARTED" and updates the state as "COPY_SEGMENT_FINISHED" once the copy is successful.
     * <p></p>
     * {@link RemoteLogSegmentState#DELETE_SEGMENT_STARTED} - This state indicates that the segment deletion is started but not yet finished.
     * {@link RemoteLogSegmentState#DELETE_SEGMENT_FINISHED} - This state indicates that the segment is deleted successfully.
     * <br>
     * Leader partitions publish both the above delete segment events when remote log retention is reached for the
     * respective segments. Remote Partition Removers also publish these events when a segment is deleted as part of
     * the remote partition deletion.
     *
     * @param remoteLogSegmentMetadataUpdate update of the remote log segment metadata.
     * @throws RemoteStorageException          if there are any storage related errors occurred.
     * @throws RemoteResourceNotFoundException when there are no resources associated with the given remoteLogSegmentMetadataUpdate.
     * @throws IllegalArgumentException        if the given metadata instance has the state as {@link RemoteLogSegmentState#COPY_SEGMENT_STARTED}
     * @return a Future which will complete once this operation is finished.
     */
    Future<Void> updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate remoteLogSegmentMetadataUpdate)
            throws RemoteStorageException;

    /**
     * Returns {@link RemoteLogSegmentMetadata} if it exists for the given topic partition containing the offset with
     * the given leader-epoch for the offset, else returns {@link Optional#empty()}.
     *
     * @param topicIdPartition topic partition
     * @param epochForOffset   leader epoch for the given offset
     * @param offset           offset
     * @return the requested remote log segment metadata if it exists.
     * @throws RemoteStorageException if there are any storage related errors occurred.
     */
    Optional<RemoteLogSegmentMetadata> remoteLogSegmentMetadata(TopicIdPartition topicIdPartition,
                                                                int epochForOffset,
                                                                long offset)
            throws RemoteStorageException;

    /**
     * Returns the highest log offset of the topic partition for the given leader epoch in remote storage. This is used
     * by the remote log management subsystem to know up to which offset the segments have been copied to remote storage
     * for a given leader epoch.
     *
     * @param topicIdPartition topic partition
     * @param leaderEpoch      leader epoch
     * @return the requested highest log offset if exists.
     * @throws RemoteStorageException if there are any storage related errors occurred.
     */
    Optional<Long> highestOffsetForEpoch(TopicIdPartition topicIdPartition,
                                         int leaderEpoch) throws RemoteStorageException;

    /**
     * This method is used to update the metadata about the remote partition delete event asynchronously. Currently, it
     * allows updating the state ({@link RemotePartitionDeleteState}) of a topic partition in remote metadata storage.
     * Controller invokes this method with {@link RemotePartitionDeleteMetadata} having state as
     * {@link RemotePartitionDeleteState#DELETE_PARTITION_MARKED}. So, remote partition removers can act on this event
     * to clean the respective remote log segments of the partition.
     * <p><br>
     * In the case of the default RLMM implementation, the remote partition remover processes
     * {@link RemotePartitionDeleteState#DELETE_PARTITION_MARKED} as follows:
     * <ul>
     * <li> sends an event with state as {@link RemotePartitionDeleteState#DELETE_PARTITION_STARTED}
     * <li> gets all the remote log segments and deletes them.
     * <li> sends an event with state as {@link RemotePartitionDeleteState#DELETE_PARTITION_FINISHED} once all the remote log segments are
     * deleted.
     * </ul>
     *
     * @param remotePartitionDeleteMetadata update on the delete state of a partition.
     * @throws RemoteStorageException          if there are any storage related errors occurred.
     * @throws RemoteResourceNotFoundException when there are no resources associated with the given remotePartitionDeleteMetadata.
     * @return a Future which will complete once this operation is finished.
     */
    Future<Void> putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata)
            throws RemoteStorageException;

    /**
     * Returns all the remote log segment metadata of the given topicIdPartition.
     * <p>
     * Remote Partition Removers use this method to fetch all the segments for a given topic partition, so that they
     * can delete them.
     *
     * @return Iterator of remote log segment metadata for the given topic partition.
     * @throws RemoteStorageException if there are any storage related errors occurred.
     */
    Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicIdPartition topicIdPartition)
            throws RemoteStorageException;

    /**
     * Returns an iterator of remote log segment metadata, sorted by {@link RemoteLogSegmentMetadata#startOffset()} in
     * ascending order, which contains the given leader epoch. This is used by the remote log retention management
     * subsystem to fetch the segment metadata for a given leader epoch.
     *
     * @param topicIdPartition topic partition
     * @param leaderEpoch      leader epoch
     * @return Iterator of remote segments, sorted by start offset in ascending order.
     * @throws RemoteStorageException if there are any storage related errors occurred.
     */
    Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicIdPartition topicIdPartition,
                                                             int leaderEpoch) throws RemoteStorageException;

    /**
     * This method is invoked only when there are changes in leadership of the topic partitions that this broker is
     * responsible for.
     *
     * @param leaderPartitions   partitions that have become leaders on this broker.
     * @param followerPartitions partitions that have become followers on this broker.
     */
    void onPartitionLeadershipChanges(Set<TopicIdPartition> leaderPartitions,
                                      Set<TopicIdPartition> followerPartitions);

    /**
     * This method is invoked only when the given topic partitions are stopped on this broker. This can happen when a
     * partition is emigrated to other broker or a partition is deleted.
     *
     * @param partitions topic partitions that have been stopped.
     */
    void onStopPartitions(Set<TopicIdPartition> partitions);
}

package org.apache.kafka.server.log.remote.storage;
...
/**
 * It describes the metadata about the log segment in the remote storage.
 */
public class RemoteLogSegmentMetadataUpdate implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * Universally unique remote log segment id.
     */
    private final RemoteLogSegmentId remoteLogSegmentId;

    /**
     * Epoch time at which the respective {@link #state} is set.
     */
    private final long eventTimestamp;

    /**
     * Leader epoch of the broker from where this event occurred.
     */
    private final int leaderEpoch;

    /**
     * It indicates the state in which the action is executed on this segment.
     */
    private final RemoteLogSegmentState state;

    /**
     * @param remoteLogSegmentId  Universally unique remote log segment id.
     * @param eventTimestamp      Epoch time at which the remote log segment is copied to the remote tier storage.
     * @param leaderEpoch         Leader epoch of the broker from where this event occurred.
     * @param state               state of the remote log segment.
     */
    public RemoteLogSegmentMetadataUpdate(RemoteLogSegmentId remoteLogSegmentId,
                                          long eventTimestamp,
                                          int leaderEpoch,
                                          RemoteLogSegmentState state) {
        this.remoteLogSegmentId = remoteLogSegmentId;
        this.eventTimestamp = eventTimestamp;
        this.leaderEpoch = leaderEpoch;
        this.state = state;
    }

    public RemoteLogSegmentId remoteLogSegmentId() {
        return remoteLogSegmentId;
    }

    public long eventTimestamp() {
        return eventTimestamp;
    }

    public RemoteLogSegmentState state() {
        return state;
    }
    
    public int leaderEpoch() {
        return leaderEpoch;
    }
...
}

package org.apache.kafka.server.log.remote.storage;
...
public class RemotePartitionDeleteMetadata {

    private final TopicIdPartition topicPartition;
    private final RemotePartitionDeleteState state;
    private final long eventTimestamp;
    private final int epoch;

    public RemotePartitionDeleteMetadata(TopicIdPartition topicPartition, RemotePartitionDeleteState state, long eventTimestamp, int epoch) {
        Objects.requireNonNull(topicPartition);
        Objects.requireNonNull(state);
        if(state != RemotePartitionDeleteState.DELETE_PARTITION_MARKED && state != RemotePartitionDeleteState.DELETE_PARTITION_STARTED
                && state != RemotePartitionDeleteState.DELETE_PARTITION_FINISHED) {
            throw new IllegalArgumentException("state should be one of the delete partition states");
        }
        this.topicPartition = topicPartition;
        this.state = state;
        this.eventTimestamp = eventTimestamp;
        this.epoch = epoch;
    }

    public TopicIdPartition topicPartition() {
        return topicPartition;
    }

    public RemotePartitionDeleteState state() {
        return state;
    }

    public long eventTimestamp() {
        return eventTimestamp;
    }

    public int epoch() {
        return epoch;
    }

...
}


package org.apache.kafka.server.log.remote.storage;
...
/**
 * It indicates the deletion state of the remote topic partition. This will be based on the action executed on this
 * partition by the remote log service implementation.
 */
public enum RemotePartitionDeleteState {

    /**
     * This is used when a topic/partition is deleted by controller.
     * This partition is marked for delete by controller. That means, all its remote log segments are eligible for
     * deletion so that remote partition removers can start deleting them.
     */
    DELETE_PARTITION_MARKED((byte) 0),

    /**
     * This state indicates that the partition deletion is started but not yet finished.
     */
    DELETE_PARTITION_STARTED((byte) 1),

    /**
     * This state indicates that the partition is deleted successfully.
     */
    DELETE_PARTITION_FINISHED((byte) 2);

    private static final Map<Byte, RemotePartitionDeleteState> STATE_TYPES = Collections.unmodifiableMap(
            Arrays.stream(values()).collect(Collectors.toMap(RemotePartitionDeleteState::id, Function.identity())));

    private final byte id;

    RemotePartitionDeleteState(byte id) {
        this.id = id;
    }

    public byte id() {
        return id;
    }

    public static RemotePartitionDeleteState forId(byte id) {
        return STATE_TYPES.get(id);
    }
...
}

package org.apache.kafka.server.log.remote.storage;
...
/**
 * It indicates the state of the remote log segment. This will be based on the action executed on this
 * segment by the remote log service implementation.
 */
public enum RemoteLogSegmentState {

    /**
     * This state indicates that the segment copying to remote storage is started but not yet finished.
     */
    COPY_SEGMENT_STARTED((byte) 0),

    /**
     * This state indicates that the segment copying to remote storage is finished.
     */
    COPY_SEGMENT_FINISHED((byte) 1),

    /**
     * This state indicates that the segment deletion is started but not yet finished.
     */
    DELETE_SEGMENT_STARTED((byte) 2),

    /**
     * This state indicates that the segment is deleted successfully.
     */
    DELETE_SEGMENT_FINISHED((byte) 3);

    private static final Map<Byte, RemoteLogSegmentState> STATE_TYPES = Collections.unmodifiableMap(
            Arrays.stream(values()).collect(Collectors.toMap(RemoteLogSegmentState::id, Function.identity())));

    private final byte id;

    RemoteLogSegmentState(byte id) {
        this.id = id;
    }

    public byte id() {
        return id;
    }

    public static RemoteLogSegmentState forId(byte id) {
        return STATE_TYPES.get(id);
    }
...
}
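The delete flow that the default remote partition remover follows, after observing DELETE_PARTITION_MARKED, can be sketched as below using the APIs above; the class name, epoch, and timestamps are illustrative.

Code Block
languagejava
titleRemotePartitionRemoverSketch
import java.util.Iterator;

class RemotePartitionRemoverSketch {
    static void deletePartition(RemoteLogMetadataManager rlmm,
                                RemoteStorageManager rsm,
                                TopicIdPartition partition,
                                int epoch) throws Exception {
        rlmm.putRemotePartitionDeleteMetadata(new RemotePartitionDeleteMetadata(
                partition, RemotePartitionDeleteState.DELETE_PARTITION_STARTED,
                System.currentTimeMillis(), epoch)).get();

        Iterator<RemoteLogSegmentMetadata> segments = rlmm.listRemoteLogSegments(partition);
        while (segments.hasNext())
            rsm.deleteLogSegmentData(segments.next()); // idempotent: safe to retry on failure

        rlmm.putRemotePartitionDeleteMetadata(new RemotePartitionDeleteMetadata(
                partition, RemotePartitionDeleteState.DELETE_PARTITION_FINISHED,
                System.currentTimeMillis(), epoch)).get();
    }
}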


...

The following new metrics will be added:

MBean: kafka.server:type=BrokerTopicMetrics, name=RemoteReadRequestsPerSec, topic=([-.\w]+)
Description: Number of remote storage read requests per second.

MBean: kafka.server:type=BrokerTopicMetrics, name=RemoteBytesInPerSec, topic=([-.\w]+)
Description: Number of bytes read from remote storage per second.

MBean: kafka.server:type=BrokerTopicMetrics, name=RemoteReadErrorPerSec, topic=([-.\w]+)
Description: Number of remote storage read errors per second.

MBean: kafka.log.remote:type=RemoteStorageThreadPool, name=RemoteLogReaderTaskQueueSize
Description: Number of remote storage read tasks pending for execution.

MBean: kafka.log.remote:type=RemoteStorageThreadPool, name=RemoteLogReaderAvgIdlePercent
Description: Average idle percent of the remote storage reader thread pool.

MBean: kafka.log.remote:type=RemoteLogManager, name=RemoteLogManagerTasksAvgIdlePercent
Description: Average idle percent of the RemoteLogManager thread pool.

MBean: kafka.server:type=BrokerTopicMetrics, name=RemoteBytesOutPerSec, topic=([-.\w]+)
Description: Number of bytes copied to remote storage per second.

MBean: kafka.server:type=BrokerTopicMetrics, name=RemoteWriteErrorPerSec, topic=([-.\w]+)
Description: Number of remote storage write errors per second.

Some of these metrics have been updated with new names as part of KIP-930.

Upgrade

Follow the steps mentioned in the Kafka upgrade documentation to reach the state where all brokers are running on the latest binaries with the respective "inter.broker.protocol" and "log.message.format" versions. Tiered storage requires the message format version to be greater than 0.11.

...

You can enable tiered storage by setting "remote.storage.enable" to true on the desired topics. Before enabling tiered storage, you should make sure the producer snapshots are built for all the segments of that topic on all followers. You should wait until the log retention occurs for all the segments, so that all the segments have producer snapshots. This is because follower replicas of topics with tiered storage enabled need the respective producer snapshot for each segment to reconcile the state, as mentioned in the earlier follower fetch protocol section.
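Enabling the config on an existing topic can be sketched with the Admin client as below; the topic name is illustrative.

Code Block
languagejava
titleEnableTieredStorageOnTopic
import java.util.Collections;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class EnableTieredStorageOnTopic {
    public static void main(String[] args) throws Exception {
        ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "tiered-demo");
        AlterConfigOp enable = new AlterConfigOp(
                new ConfigEntry("remote.storage.enable", "true"), AlterConfigOp.OpType.SET);
        try (Admin admin = Admin.create(
                Collections.<String, Object>singletonMap("bootstrap.servers", "localhost:9092"))) {
            admin.incrementalAlterConfigs(
                    Collections.singletonMap(topic, Collections.singleton(enable))).all().get();
        }
    }
}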

...

  • Once tiered storage is enabled for a topic, it cannot be disabled; this feature will be added in future versions. One possible workaround is to create a new topic, copy the data from the desired offset, and delete the old topic. Another possible workaround is to set local.log.retention.ms to the same value as retention.ms and wait until the local retention catches up with the complete log retention, which makes the complete data available locally. After that, set remote.storage.enable to false to disable tiered storage on the topic.
  • Multiple Log dirs on a broker are not supported (JBOD related features).
  • Tiered storage is not supported for compacted topics.

...

Consuming old data has a significant performance impact on acks=all producers. Without tiered storage, the P99 produce latency increases ~1.5 times. With tiered storage, the performance impact is relatively lower, because remote storage reads do not compete with produce requests for local hard disk bandwidth.

...

  • Discussion Recording
  • Notes
    • KIP is updated with the follower fetch protocol and is ready to be reviewed
    • Satish to capture schema of internal metadata topic in the KIP
    • We will update the KIP with details of different cases
    • Test plan will be captured in a doc and will add to the KIP
    • Add a section "Limitations" to capture the capabilities that will be introduced with this KIP and what will not be covered in this KIP.

Other associated KIPs

KIP-852: Optimize calculation of size for log in remote tier

KIP-917: Additional custom metadata for remote log segment