Code Block

/**
 * RemoteStorageManager provides the lifecycle of remote log segments, which includes copy, fetch, and delete
 * operations on remote storage.
 * <p>
 * Each upload or copy of a segment is initiated with a {@link RemoteLogSegmentMetadata} containing a
 * {@link RemoteLogSegmentId}, which is universally unique even for the same topic partition and offsets.
 * <p>
 * RemoteLogSegmentMetadata is stored in {@link RemoteLogMetadataManager} before and after copy/delete operations on
 * RemoteStorageManager with the respective {@link RemoteLogSegmentMetadata.State}. {@link RemoteLogMetadataManager} is
 * responsible for storing and fetching metadata about the remote log segments in a strongly consistent manner.
 * This allows RemoteStorageManager to store segments even in an eventually consistent manner, as the metadata is
 * already stored in a consistent store.
 * <p>
 * All these APIs are still evolving.
 */
public interface RemoteStorageManager extends Configurable, Closeable {
    InputStream EMPTY_INPUT_STREAM = new ByteArrayInputStream(new byte[0]);

    /**
     * Copies the LogSegmentData provided for the given {@code remoteLogSegmentMetadata}.
     * <p>
     * The invoker of this API should always send a unique id as part of
     * {@code RemoteLogSegmentMetadata#remoteLogSegmentId().id()}, even when it retries invoking this method for the
     * same log segment data.
     *
     * @param remoteLogSegmentMetadata metadata about the remote log segment.
     * @param logSegmentData           data to be copied to tiered storage.
     * @throws RemoteStorageException if there are any errors in storing the data of the segment.
     */
    void copyLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata, LogSegmentData logSegmentData)
            throws RemoteStorageException;

    /**
     * Returns the remote log segment data file/object as an InputStream for the given RemoteLogSegmentMetadata, starting
     * from the given startPosition. The stream will end at the smaller of endPosition and the end of the remote log
     * segment data file/object.
     *
     * @param remoteLogSegmentMetadata metadata about the remote log segment.
     * @param startPosition            start position of log segment to be read, inclusive.
     * @param endPosition              end position of log segment to be read, inclusive.
     * @return input stream of the requested log segment data.
     * @throws RemoteStorageException if there are any errors while fetching the desired segment.
     */
    InputStream fetchLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
                                    Long startPosition, Long endPosition) throws RemoteStorageException;

    /**
     * Returns the offset index for the respective log segment of {@link RemoteLogSegmentMetadata}.
     *
     * @param remoteLogSegmentMetadata metadata about the remote log segment.
     * @return input stream of the offset index.
     * @throws RemoteStorageException if there are any errors while fetching the index.
     */
    InputStream fetchOffsetIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;

    /**
     * Returns the timestamp index for the respective log segment of {@link RemoteLogSegmentMetadata}.
     *
     * @param remoteLogSegmentMetadata metadata about the remote log segment.
     * @return input stream of the timestamp index.
     * @throws RemoteStorageException if there are any errors while fetching the index.
     */
    InputStream fetchTimestampIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;

    /**
     * Returns the transaction index for the respective log segment of {@link RemoteLogSegmentMetadata}.
     *
     * @param remoteLogSegmentMetadata metadata about the remote log segment.
     * @return input stream of the transaction index.
     * @throws RemoteStorageException if there are any errors while fetching the index.
     */
    default InputStream fetchTransactionIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException {
        return EMPTY_INPUT_STREAM;
    }

    /**
     * Returns the producer snapshot index for the respective log segment of {@link RemoteLogSegmentMetadata}.
     *
     * @param remoteLogSegmentMetadata metadata about the remote log segment.
     * @return input stream of the producer snapshot index.
     * @throws RemoteStorageException if there are any errors while fetching the index.
     */
    default InputStream fetchProducerSnapshotIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException {
        return EMPTY_INPUT_STREAM;
    }

    /**
     * Returns the leader epoch index for the respective log segment of {@link RemoteLogSegmentMetadata}.
     *
     * @param remoteLogSegmentMetadata metadata about the remote log segment.
     * @return input stream of the leader epoch index.
     * @throws RemoteStorageException if there are any errors while fetching the index.
     */
    default InputStream fetchLeaderEpochIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException {
        return EMPTY_INPUT_STREAM;
    }

    /**
     * Deletes the resources associated with the given {@code remoteLogSegmentMetadata}. Deletion is considered as
     * successful if this call returns successfully without any exceptions. It will throw
     * {@link RemoteStorageException} if there are any errors in deleting the file.
     *
     * @param remoteLogSegmentMetadata metadata about the remote log segment to be deleted.
     * @throws RemoteStorageException if there are any storage related errors.
     */

        this.id = requireNonNull(id);

public UUID id() {
    return id;
    }
}
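
The read-range rule stated for `fetchLogSegmentData` above (both positions inclusive, with the stream ending at the smaller of `endPosition` and the end of the segment) can be illustrated with a minimal sketch over an in-memory byte array. The class `RangeFetchSketch` and its helpers `rangeBytes` and `fetchRange` are hypothetical names introduced only for this illustration; they are not part of the proposed API, and a real implementation would read from a remote object instead of an array.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.Arrays;

final class RangeFetchSketch {

    // Hypothetical helper: returns the bytes in [startPosition, min(endPosition, lastIndex)].
    static byte[] rangeBytes(byte[] segment, long startPosition, long endPosition) {
        if (startPosition < 0 || endPosition < startPosition) {
            throw new IllegalArgumentException("invalid range");
        }
        if (startPosition >= segment.length) {
            // Assumption of this sketch: a start past the end yields an empty result.
            return new byte[0];
        }
        int from = (int) startPosition;
        // Both positions are inclusive, so clamp the end to the last byte of the segment.
        int toInclusive = (int) Math.min(endPosition, segment.length - 1L);
        return Arrays.copyOfRange(segment, from, toInclusive + 1);
    }

    // Wraps the selected range in an InputStream, mirroring the return type of the API.
    static InputStream fetchRange(byte[] segment, long startPosition, long endPosition) {
        return new ByteArrayInputStream(rangeBytes(segment, startPosition, endPosition));
    }

    public static void main(String[] args) {
        byte[] segment = {10, 11, 12, 13, 14};
        System.out.println(Arrays.toString(rangeBytes(segment, 1, 3)));   // [11, 12, 13]
        System.out.println(Arrays.toString(rangeBytes(segment, 3, 100))); // [13, 14]
    }
}
```

The second call shows the clamping case: an `endPosition` beyond the segment simply reads to the last byte rather than failing.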

`RemoteLogMetadataManager` is an interface to provide the lifecycle of metadata about remote log segments with strongly consistent semantics. There is a default implementation that uses an internal topic. Users can plug in their own implementation if they intend to use another system to store remote log segment metadata.
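
As a rough illustration of that metadata lifecycle, the sketch below follows the rule from the `RemoteStorageManager` Javadoc: metadata is recorded before and after each copy, and every attempt, including a retry, uses a fresh unique id. Everything here is a stand-in: the class `CopyLifecycleSketch`, the two maps, the fault-injection counter, and the state names `COPY_STARTED` and `COPY_FINISHED` are assumptions made for the example, not the actual implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;

final class CopyLifecycleSketch {

    // Stand-in for RemoteLogSegmentMetadata.State; only two assumed copy states are modelled.
    enum State { COPY_STARTED, COPY_FINISHED }

    // Stand-in for a strongly consistent metadata store, keyed by segment id.
    final Map<UUID, State> metadataStore = new LinkedHashMap<>();
    // Stand-in for the remote storage written through RemoteStorageManager.
    final Map<UUID, byte[]> remoteStore = new LinkedHashMap<>();
    // Hypothetical fault injection: the first N upload attempts fail.
    int failuresToInject;

    // Copies one segment, using a fresh id per attempt; returns the id that succeeded.
    UUID copyWithRetries(byte[] segmentData, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            UUID segmentId = UUID.randomUUID();                 // new unique id on every attempt
            metadataStore.put(segmentId, State.COPY_STARTED);   // recorded before the upload
            try {
                upload(segmentId, segmentData);
                metadataStore.put(segmentId, State.COPY_FINISHED); // recorded after the upload
                return segmentId;
            } catch (IllegalStateException e) {
                // The failed attempt stays in COPY_STARTED, so a reader that only trusts
                // COPY_FINISHED entries would ignore it.
            }
        }
        throw new IllegalStateException("copy failed after " + maxAttempts + " attempts");
    }

    private void upload(UUID segmentId, byte[] data) {
        if (failuresToInject > 0) {
            failuresToInject--;
            throw new IllegalStateException("simulated remote storage failure");
        }
        remoteStore.put(segmentId, data.clone());
    }

    public static void main(String[] args) {
        CopyLifecycleSketch sketch = new CopyLifecycleSketch();
        sketch.failuresToInject = 1;
        UUID finished = sketch.copyWithRetries(new byte[] {1, 2, 3}, 3);
        System.out.println(sketch.metadataStore.size());        // 2
        System.out.println(sketch.metadataStore.get(finished)); // COPY_FINISHED
    }
}
```

Because the metadata store is the source of truth in this sketch, the remote store itself never has to be listed or queried to decide which uploads are complete.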

Code Block

/**
 * This interface provides storing and fetching remote log segment metadata with strongly consistent semantics.
 * <p>
 * When {@link #configure(Map)} is invoked on this instance, {@link #BROKER_ID}, {@link #CLUSTER_ID} properties are
 * passed which can be used by this instance if needed. These properties can be used if there is a single storage used
 * for different clusters. For ex: MySQL storage can be used as metadata store for all the clusters across the org.
 * <p>
 * todo-tier cleanup the abstractions in this interface.
 */
public interface RemoteLogMetadataManager extends Configurable, Closeable {

    /**
     * Property name for broker id.
     */
    String BROKER_ID = "";

    /**
     * Property name for cluster id.
     */
    String CLUSTER_ID = "";

    /**
     * Stores RemoteLogSegmentMetadata with the containing RemoteLogSegmentId into RemoteLogMetadataManager.
     * RemoteLogSegmentMetadata is identified by RemoteLogSegmentId.
     *
     * @param remoteLogSegmentMetadata
     * @throws RemoteStorageException
     */
    void putRemoteLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;

    /**
     * Fetches RemoteLogSegmentId for the given topic partition which contains the given offset.
     * <p>
     * This will evolve to refactor TopicPartition to TopicPartitionId which contains a unique identifier and
     * TopicPartition.
     *
     * @param topicPartition
     * @param offset
     * @return
     * @throws RemoteStorageException
     */
    RemoteLogSegmentId remoteLogSegmentId(TopicPartition topicPartition, long offset) throws RemoteStorageException;

    /**
     * Fetches RemoteLogSegmentMetadata for the given topic partition and offset.
     * <p>
     * This will evolve to refactor TopicPartition to TopicPartitionId which contains a unique identifier and
     * TopicPartition.
     *
     * @param topicPartition
     * @param offset
     * @return
     * @throws RemoteStorageException
     */
    RemoteLogSegmentMetadata remoteLogSegmentMetadata(TopicPartition topicPartition, long offset) throws RemoteStorageException;

    /**
     * Returns earliest log offset if there are segments in the remote storage for the given topic partition, else
     * returns {@link Optional#empty()}.
     * <p>
     * This is treated as the effective log-start-offset of the topic partition's log.
     * <p>
     * todo check whether we need to pass leader-epoch.
     *
     * @param tp
     * @return
     */
    Optional<Long> earliestLogOffset(TopicPartition tp) throws RemoteStorageException;

    /**
     * Returns highest log offset of topic partition in remote storage.
     *
     * @param tp
     * @return
     * @throws RemoteStorageException
     */
    Optional<Long> highestLogOffset(TopicPartition tp) throws RemoteStorageException;

    /**
     * Deletes the log segment metadata for the given remoteLogSegmentId.
     *
     * @param remoteLogSegmentId
     * @throws RemoteStorageException
     */
    void deleteRemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId) throws RemoteStorageException;

    /**
     * Lists the remote log segment files of the given topicPartition.
     * The RemoteLogManager of a follower uses this method to find out the remote data for the given topic partition.
     * <p>
     * This is also used while deleting a given topic partition, to fetch all the remote log segments for the given
     * topic partition and set a tombstone marker for them to be deleted.
     *
     * @return List of remote segments, sorted by baseOffset in ascending order.
     */
    default List<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicPartition topicPartition) {
        return listRemoteLogSegments(topicPartition, 0);
    }

    /**
     * Returns list of remote segments, sorted by {@link RemoteLogSegmentMetadata#startOffset()} in ascending order
     * which are >= the given min Offset.
     *
     * @param topicPartition
     * @param minOffset
     * @return List of remote segments, sorted by baseOffset in ascending order.
     */
    List<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicPartition topicPartition, long minOffset);

    /**
     * This method is invoked only when there are changes in leadership of the topic partitions that this broker is
     * responsible for.
     *
     * @param leaderPartitions   partitions that have become leaders on this broker.
     * @param followerPartitions partitions that have become followers on this broker.
     */
    void onPartitionLeadershipChanges(Set<TopicPartition> leaderPartitions, Set<TopicPartition> followerPartitions);

    /**
     * This method is invoked only when the given topic partitions are stopped on this broker. This can happen when a
     * partition is migrated to another broker or a partition is deleted.
     *
     * @param partitions
     */
    void onStopPartitions(Set<TopicPartition> partitions);

    /**
     * Callback to receive once server is started so that this class can run tasks which should be run only when the
     * server is started.
     */
    void onServerStarted();
}
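
To make the lookup methods above more concrete, here is a minimal in-memory sketch for a single topic partition. It is not the default topic-based implementation. The class `InMemorySegmentIndexSketch`, the simplified `Segment` type, and the method names are hypothetical. It also assumes that segments do not overlap and that "segments >= the given min offset" means segments whose start offset is at least `minOffset`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

final class InMemorySegmentIndexSketch {

    // Minimal stand-in for RemoteLogSegmentMetadata: an id plus an inclusive offset range.
    static final class Segment {
        final String id;
        final long startOffset;
        final long endOffset;

        Segment(String id, long startOffset, long endOffset) {
            this.id = id;
            this.startOffset = startOffset;
            this.endOffset = endOffset;
        }
    }

    // Segments of one topic partition, ordered by start offset.
    private final TreeMap<Long, Segment> byStartOffset = new TreeMap<>();

    void put(Segment segment) {
        byStartOffset.put(segment.startOffset, segment);
    }

    // Id of the segment whose [startOffset, endOffset] range contains the offset, if any.
    Optional<String> segmentIdFor(long offset) {
        Map.Entry<Long, Segment> floor = byStartOffset.floorEntry(offset);
        if (floor == null || floor.getValue().endOffset < offset) {
            return Optional.empty();
        }
        return Optional.of(floor.getValue().id);
    }

    // Smallest start offset held remotely, or empty when there are no segments.
    Optional<Long> earliestLogOffset() {
        return byStartOffset.isEmpty() ? Optional.empty() : Optional.of(byStartOffset.firstKey());
    }

    // End offset of the last segment; relies on the non-overlap assumption stated above.
    Optional<Long> highestLogOffset() {
        return byStartOffset.isEmpty()
                ? Optional.empty()
                : Optional.of(byStartOffset.lastEntry().getValue().endOffset);
    }

    // Segments with startOffset >= minOffset, in ascending start-offset order.
    List<Segment> listSegments(long minOffset) {
        return new ArrayList<>(byStartOffset.tailMap(minOffset, true).values());
    }

    public static void main(String[] args) {
        InMemorySegmentIndexSketch index = new InMemorySegmentIndexSketch();
        index.put(new Segment("seg-b", 100, 199));
        index.put(new Segment("seg-a", 0, 99));
        System.out.println(index.segmentIdFor(150));        // Optional[seg-b]
        System.out.println(index.segmentIdFor(200));        // Optional.empty
        System.out.println(index.earliestLogOffset());      // Optional[0]
        System.out.println(index.highestLogOffset());       // Optional[199]
        System.out.println(index.listSegments(100).size()); // 1
    }
}
```

A sorted map keeps both the point lookup and the ordered listing cheap. A real implementation would additionally need per-partition scoping, durability, and the consistency guarantees the interface asks for.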

