`RemoteStorageManager` (RSM) is an interface that manages the lifecycle of remote log segments and their indexes. More details about how we arrived at this interface are discussed in the document. We will provide a simple implementation of RSM to give a better understanding of the APIs. HDFS and S3 implementations are planned to be hosted in external repos and will not be part of the Apache Kafka repo. This is in line with the approach taken for Kafka connectors.

The copy and delete APIs are expected to be idempotent, so that plugin implementations can be retried safely: a retried copy overwrites any partially copied content, and a delete does not fail when the content is already deleted.
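
The idempotency expectation can be illustrated with a minimal sketch. `InMemorySegmentStore` and its methods are hypothetical names invented for this example, and an in-memory map stands in for the remote store; it is not the proposed `RemoteStorageManager` API, only one way the retry behavior might look.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical in-memory stand-in for a remote store, used only to illustrate
 * idempotent copy and delete. It is not part of the RemoteStorageManager API.
 */
class InMemorySegmentStore {
    // Segment bytes keyed by the id of the remote log segment.
    private final Map<UUID, byte[]> segments = new ConcurrentHashMap<>();

    /** Copies the segment; a retry with the same id overwrites whatever an earlier attempt left behind. */
    void copy(UUID segmentId, byte[] data) {
        segments.put(segmentId, Arrays.copyOf(data, data.length));
    }

    /** Deletes the segment; returns normally even if the segment is already gone. */
    void delete(UUID segmentId) {
        segments.remove(segmentId);
    }

    /** Returns a copy of the stored bytes, or null if the segment is absent. */
    byte[] read(UUID segmentId) {
        byte[] data = segments.get(segmentId);
        return data == null ? null : Arrays.copyOf(data, data.length);
    }
}
```

In this sketch, calling `copy` twice with the same id leaves only the content of the last call, and calling `delete` twice does not raise an error, which is the behavior the paragraph above asks of plugin implementations.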

/**
 * RemoteStorageManager provides the lifecycle of remote log segments that includes copy, fetch, and delete from remote
 * storage.
 * <p>
 * Each upload or copy of a segment is initiated with {@link RemoteLogSegmentMetadata} containing {@link RemoteLogSegmentId}
 * which is universally unique even for the same topic partition and offsets.
 * <p>
 * RemoteLogSegmentMetadata is stored in {@link RemoteLogMetadataManager} before and after copy/delete operations on
 * RemoteStorageManager with the respective {@link RemoteLogSegmentState}. {@link RemoteLogMetadataManager} is
 * responsible for storing and fetching metadata about the remote log segments in a strongly consistent manner.
 * This allows RemoteStorageManager to store segments even in an eventually consistent manner as the metadata is already
 * stored in a consistent store.
 * <p>
 * All these APIs are still evolving.
 */
public interface RemoteStorageManager extends Configurable, Closeable {

    /**
     * Type of the index file.
     */
    enum IndexType {
        /**
         * Represents offset index.
         */

        /**
         * Represents timestamp index.
         */

        /**
         * Represents producer snapshot index.
         */

        /**
         * Represents transaction index.
         */

        /**
         * Represents leader epoch index.
         */
    }

    /**
     * Copies the given {@link LogSegmentData} provided for the given {@code remoteLogSegmentMetadata}. This includes
     * log segment and its auxiliary indexes like offset index, time index, transaction index, leader epoch index, and
     * producer snapshot index.
     * <p>
     * Invoker of this API should always send a unique id as part of {@code RemoteLogSegmentMetadata#remoteLogSegmentId()#id()}
     * even when it retries to invoke this method for the same log segment data.
     *
     * @param remoteLogSegmentMetadata metadata about the remote log segment.
     * @param logSegmentData           data to be copied to tiered storage.
     * @throws RemoteStorageException if there are any errors in storing the data of the segment.
     */
    void copyLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
                            LogSegmentData logSegmentData)
            throws RemoteStorageException;

    /**
     * Returns the remote log segment data file/object as InputStream for the given {@link RemoteLogSegmentMetadata}
     * starting from the given startPosition. The stream will end at the end of the remote log segment data file/object.
     *
     * @param remoteLogSegmentMetadata metadata about the remote log segment.
     * @param startPosition            start position of log segment to be read, inclusive.
     * @return input stream of the requested log segment data.
     * @throws RemoteStorageException          if there are any errors while fetching the desired segment.
     * @throws RemoteResourceNotFoundException if the requested log segment is not found in the remote storage.
     */
    InputStream fetchLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
                                int startPosition) throws RemoteStorageException;

    /**
     * Returns the remote log segment data file/object as InputStream for the given {@link RemoteLogSegmentMetadata}
     * starting from the given startPosition. The stream will end at the smaller of endPosition and the end of the
     * remote log segment data file/object.
     *
     * @param remoteLogSegmentMetadata metadata about the remote log segment.
     * @param startPosition            start position of log segment to be read, inclusive.
     * @param endPosition              end position of log segment to be read, inclusive.
     * @return input stream of the requested log segment data.
     * @throws RemoteStorageException          if there are any errors while fetching the desired segment.
     * @throws RemoteResourceNotFoundException if the requested log segment is not found in the remote storage.
     */
    InputStream fetchLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
                                int startPosition,
                                int endPosition) throws RemoteStorageException;

    /**
     * Returns the index for the respective log segment of {@link RemoteLogSegmentMetadata}.
     * <p>
     * If the index is not present (e.g. the transaction index may not exist because segments created prior to
     * version 2.8.0 will not have a transaction index associated with them), this method
     * throws {@link RemoteResourceNotFoundException}.
     *
     * @param remoteLogSegmentMetadata metadata about the remote log segment.
     * @param indexType                type of the index to be fetched for the segment.
     * @return input stream of the requested index.
     * @throws RemoteStorageException          if there are any errors while fetching the index.
     * @throws RemoteResourceNotFoundException if the requested index is not found in the remote storage.
     *                                         Callers of this method are encouraged to re-create the index from
     *                                         the segment as the suggested way of handling this error.
     */
    InputStream fetchIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata, IndexType indexType) throws RemoteStorageException;

    /**
     * Deletes the resources associated with the given {@code remoteLogSegmentMetadata}. Deletion is considered
     * successful if this call returns without any errors. It will throw {@link RemoteStorageException} if
     * there are any errors in deleting the file.
     * <p>
     * This operation is expected to be idempotent. If resources are not found, it is not expected to
     * throw {@link RemoteResourceNotFoundException} as they may have already been removed by a previous attempt.
     *
     * @param remoteLogSegmentMetadata metadata about the remote log segment to be deleted.
     * @throws RemoteStorageException if any storage related errors occur.
     */
    void deleteLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;
}
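
The position semantics of the ranged fetch (both positions inclusive, with the stream ending at the smaller of `endPosition` and the end of the object) can be sketched over a plain byte array. `RangeFetchSketch` and `fetchRange` are hypothetical names used for illustration only, and the argument validation shown is an assumption; the interface above does not specify how invalid ranges are handled.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;

/** Hypothetical helper illustrating inclusive start/end positions; not part of the proposed API. */
class RangeFetchSketch {
    /**
     * Returns a stream over segment[startPosition .. min(endPosition, segment.length - 1)],
     * treating both positions as inclusive.
     */
    static InputStream fetchRange(byte[] segment, int startPosition, int endPosition) {
        // Assumed validation: the start must lie inside the segment and not exceed the end.
        if (startPosition < 0 || startPosition >= segment.length || endPosition < startPosition) {
            throw new IllegalArgumentException("invalid range: " + startPosition + ".." + endPosition);
        }
        // Clip the end to the last byte of the segment.
        int lastIndex = Math.min(endPosition, segment.length - 1);
        // Both ends are inclusive, hence the +1.
        int length = lastIndex - startPosition + 1;
        return new ByteArrayInputStream(segment, startPosition, length);
    }
}
```

For a five-byte segment, `fetchRange(segment, 1, 3)` yields three bytes, while `fetchRange(segment, 3, 100)` is clipped to the last two bytes of the segment.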


package org.apache.kafka.common;

import java.util.Objects;
import java.util.UUID;

public class TopicIdPartition {

    private final UUID topicId;
    private final TopicPartition topicPartition;

    public TopicIdPartition(UUID topicId, TopicPartition topicPartition) {
        Objects.requireNonNull(topicId, "topicId can not be null");
        Objects.requireNonNull(topicPartition, "topicPartition can not be null");
        this.topicId = topicId;
        this.topicPartition = topicPartition;
    }

    public UUID topicId() {
        return topicId;
    }

    public TopicPartition topicPartition() {
        return topicPartition;
    }
}

/**
 * This represents a universally unique identifier associated with a topic partition's log segment. This will be
 * regenerated for every attempt of copying a specific log segment in {@link RemoteStorageManager#copyLogSegmentData(RemoteLogSegmentMetadata, LogSegmentData)}.
 */
public class RemoteLogSegmentId implements Comparable<RemoteLogSegmentId>, Serializable {
    private static final long serialVersionUID = 1L;

    private final TopicIdPartition topicIdPartition;
    private final UUID id;

    public RemoteLogSegmentId(TopicIdPartition topicIdPartition, UUID id) {
        this.topicIdPartition = Objects.requireNonNull(topicIdPartition);
        this.id = Objects.requireNonNull(id);
    }

    /**
     * Returns TopicIdPartition of this remote log segment.
     *
     * @return TopicIdPartition of this remote log segment.
     */
    public TopicIdPartition topicIdPartition() {
        return topicIdPartition;
    }

    /**
     * Returns Universally Unique Id of this remote log segment.
     *
     * @return Universally Unique Id of this remote log segment.
     */
    public UUID id() {
        return id;
    }
}

/**
 * It describes the metadata about the log segment in the remote storage.
 */
public class RemoteLogSegmentMetadata implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * Universally unique remote log segment id.
     */
    private final RemoteLogSegmentId remoteLogSegmentId;

    /**
     * Start offset of this segment.
     */
    private final long startOffset;

    /**
     * End offset of this segment.
     */
    private final long endOffset;

    /**
     * Leader epoch of the broker.
     */
    private final int leaderEpoch;

    /**
     * Maximum timestamp in the segment.
     */
    private final long maxTimestamp;

    /**
     * Epoch time at which the respective {@link #state} is set.
     */
    private final long eventTimestamp;

    /**
     * Leader epoch vs. offset for messages within this segment.
     */
    private final Map<Integer, Long> segmentLeaderEpochs;

    /**
     * Size of the segment in bytes.
     */
    private final int segmentSizeInBytes;

    /**
     * It indicates the state in which the action is executed on this segment.
     */
    private final RemoteLogSegmentState state;

    /**
     * @param remoteLogSegmentId  Universally unique remote log segment id.
     * @param startOffset         Start offset of this segment.
     * @param endOffset           End offset of this segment.
     * @param maxTimestamp        Maximum timestamp in this segment.
     * @param leaderEpoch         Leader epoch of the broker.
     * @param eventTimestamp      Epoch time at which the respective state is set.
     * @param segmentSizeInBytes  Size of this segment in bytes.
     * @param state               State of the respective segment of remoteLogSegmentId.
     * @param segmentLeaderEpochs Leader epochs that occurred within this segment.
     */
    public RemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId, long startOffset, long endOffset,
                                    long maxTimestamp, int leaderEpoch, long eventTimestamp,
                                    int segmentSizeInBytes, RemoteLogSegmentState state, Map<Integer, Long> segmentLeaderEpochs) {
        this.remoteLogSegmentId = remoteLogSegmentId;
        this.startOffset = startOffset;
        this.endOffset = endOffset;
        this.leaderEpoch = leaderEpoch;
        this.maxTimestamp = maxTimestamp;
        this.eventTimestamp = eventTimestamp;
        this.segmentLeaderEpochs = segmentLeaderEpochs;
        this.state = state;
        this.segmentSizeInBytes = segmentSizeInBytes;
    }

    /**
     * @return unique id of this segment.
     */
    public RemoteLogSegmentId remoteLogSegmentId() {
        return remoteLogSegmentId;
    }

    /**
     * @return Start offset of this segment (inclusive).
     */
    public long startOffset() {
        return startOffset;
    }

    /**
     * @return End offset of this segment (inclusive).
     */
    public long endOffset() {
        return endOffset;
    }

    /**
     * @return Leader epoch of the broker from where this event occurred.
     */
    public int leaderEpoch() {
        return leaderEpoch;
    }

    /**
     * @return Epoch time at which this event occurred.
     */
    public long eventTimestamp() {
        return eventTimestamp;
    }

    /**
     * @return Size of this segment in bytes.
     */
    public int segmentSizeInBytes() {
        return segmentSizeInBytes;
    }

    public RemoteLogSegmentState state() {
        return state;
    }

    public long maxTimestamp() {
        return maxTimestamp;
    }

    public Map<Integer, Long> segmentLeaderEpochs() {
        return segmentLeaderEpochs;
    }
}


public class LogSegmentData {

    private final File logSegment;
    private final File offsetIndex;
    private final File timeIndex;
    private final File txnIndex;
    private final File producerIdSnapshotIndex;
    private final ByteBuffer leaderEpochIndex;

    public LogSegmentData(File logSegment, File offsetIndex, File timeIndex, File txnIndex, File producerIdSnapshotIndex,
                          ByteBuffer leaderEpochIndex) {
        this.logSegment = logSegment;
        this.offsetIndex = offsetIndex;
        this.timeIndex = timeIndex;
        this.txnIndex = txnIndex;
        this.producerIdSnapshotIndex = producerIdSnapshotIndex;
        this.leaderEpochIndex = leaderEpochIndex;
    }

    public File logSegment() {
        return logSegment;
    }

    public File offsetIndex() {
        return offsetIndex;
    }

    public File timeIndex() {
        return timeIndex;
    }

    public File txnIndex() {
        return txnIndex;
    }

    public File producerIdSnapshotIndex() {
        return producerIdSnapshotIndex;
    }

    public ByteBuffer leaderEpochIndex() {
        return leaderEpochIndex;
    }
}
