...

Code Block
languagejava
titleRemoteStorageManager
/**
 * RemoteStorageManager provides the lifecycle of remote log segments which includes copy, fetch, and delete operations.
 *
 * {@link RemoteLogMetadataManager} is responsible for storing and fetching metadata about the remote log segments in a
 * strongly consistent manner.
 *
 * Each upload or copy of a segment is identified by a {@link RemoteLogSegmentId}, which is universally unique, even for
 * the same topic partition and offsets. Once the copy or upload is successful, a {@link RemoteLogSegmentMetadata} is
 * created with the RemoteLogSegmentId and other log segment information, and it is stored in {@link RemoteLogMetadataManager}.
 * This allows RemoteStorageManager to store segments even in an eventually consistent manner, as the metadata is already
 * stored in a consistent store.
 *
 * All these APIs are still experimental.
 */
@InterfaceStability.Unstable
public interface RemoteStorageManager extends Configurable, Closeable {

    /**
     * Copies the given LogSegmentData for the given RemoteLogSegmentId and returns any contextual
     * information about this copy operation, such as the path to the object in the store.
     *
     * The invoker of this API should always send a unique id as part of {@link RemoteLogSegmentId#id()}, even when it
     * retries this method for the same log segment data.
     *
     * @param remoteLogSegmentId
     * @param logSegmentData
     * @return
     * @throws RemoteStorageException
     */
    RemoteLogSegmentContext copyLogSegment(RemoteLogSegmentId remoteLogSegmentId, LogSegmentData logSegmentData)
            throws RemoteStorageException;

    /**
     * Returns the remote log segment data file/object as an InputStream for the given RemoteLogSegmentMetadata, starting
     * from the given startPosition. The stream will end at the smaller of endPosition and the end of the remote log
     * segment data file/object.
     *
     * @param remoteLogSegmentMetadata
     * @param startPosition
     * @param endPosition
     * @return
     * @throws RemoteStorageException
     */
    InputStream fetchLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
                                    Long startPosition, Long endPosition) throws RemoteStorageException;

    /**
     * Returns the offset index for the respective log segment of {@link RemoteLogSegmentMetadata}.
     *
     * @param remoteLogSegmentMetadata
     * @return
     * @throws RemoteStorageException
     */
    InputStream fetchOffsetIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;

    /**
     * Returns the timestamp index for the respective log segment of {@link RemoteLogSegmentMetadata}.
     *
     * @param remoteLogSegmentMetadata
     * @return
     * @throws RemoteStorageException
     */
    InputStream fetchTimestampIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;

    /**
     * Returns the transaction index for the respective log segment of {@link RemoteLogSegmentMetadata}.
     *
     * @param remoteLogSegmentMetadata
     * @return
     * @throws RemoteStorageException
     */
    default InputStream fetchTransactionIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException {
        throw new UnsupportedOperationException();
    }

    /**
     * Returns the producer snapshot index for the respective log segment of {@link RemoteLogSegmentMetadata}.
     *
     * @param remoteLogSegmentMetadata
     * @return
     * @throws RemoteStorageException
     */
    default InputStream fetchProducerSnapshotIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException {
        throw new UnsupportedOperationException();
    }

    /**
     * Deletes the remote log segment for the given remoteLogSegmentMetadata. Deletion is considered successful if
     * this call returns without throwing an exception. It will throw {@link RemoteStorageException} if there are
     * any errors in deleting the file.
     *
     * Broker pushes an event to __delete_failed_remote_log_segments topic for failed segment deletions so that users
     * can do the cleanup later.
     *
     * @param remoteLogSegmentMetadata
     * @throws RemoteStorageException
     */
    void deleteLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;

}


/**
 * This represents a universally unique id associated to a topic partition's log segment. This will be regenerated for
 * every attempt of copying a specific log segment in {@link RemoteStorageManager#copyLogSegment(RemoteLogSegmentId, LogSegmentData)}.
 */
public class RemoteLogSegmentId {
    private TopicPartition topicPartition;
    private UUID id;

    public RemoteLogSegmentId(TopicPartition topicPartition, UUID id) {
        this.topicPartition = requireNonNull(topicPartition);
        this.id = requireNonNull(id);
    }

    public TopicPartition topicPartition() {
        return topicPartition;
    }

    public UUID id() {
        return id;
    }
...
}



public class LogSegmentData {

    private final File logSegment;
    private final File offsetIndex;
    private final File timeIndex;
    private final File txnIndex;
    private final File leaderEpochCache;

    public LogSegmentData(File logSegment, File offsetIndex, File timeIndex, File txnIndex, File leaderEpochCache) {
        this.logSegment = logSegment;
        this.offsetIndex = offsetIndex;
        this.timeIndex = timeIndex;
        this.txnIndex = txnIndex;
        this.leaderEpochCache = leaderEpochCache;
    }
...
}


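To make the intended call sequence concrete, the sketch below shows how a broker-side component such as the RemoteLogManager might tie RemoteStorageManager and RemoteLogMetadataManager together when shipping a segment to remote storage. This is an illustrative sketch, not part of the proposed API: the helper class, its parameters, and the serialization of RemoteLogSegmentContext into bytes are assumptions made only for the example.

Code Block
languagejava
titleCopying a segment (illustrative sketch)
import java.io.IOException;
import java.util.UUID;

import org.apache.kafka.common.TopicPartition;

// Illustrative sketch only: shows the order of calls implied by the interfaces above.
// RemoteLogManagerCopyTask is a hypothetical helper, not part of this proposal.
public class RemoteLogManagerCopyTask {

    private final RemoteStorageManager remoteStorageManager;
    private final RemoteLogMetadataManager remoteLogMetadataManager;

    public RemoteLogManagerCopyTask(RemoteStorageManager remoteStorageManager,
                                    RemoteLogMetadataManager remoteLogMetadataManager) {
        this.remoteStorageManager = remoteStorageManager;
        this.remoteLogMetadataManager = remoteLogMetadataManager;
    }

    public void copySegment(TopicPartition topicPartition, LogSegmentData segmentData,
                            long startOffset, long endOffset, long maxTimestamp,
                            int leaderEpoch) throws RemoteStorageException, IOException {
        // Every copy attempt, including retries for the same segment, must use a fresh
        // universally unique id, as required by copyLogSegment().
        RemoteLogSegmentId segmentId = new RemoteLogSegmentId(topicPartition, UUID.randomUUID());

        // Copy the segment and its indexes to the remote store. The returned context is
        // opaque to the broker and is only kept alongside the metadata.
        RemoteLogSegmentContext context = remoteStorageManager.copyLogSegment(segmentId, segmentData);

        // Only after the copy succeeds is the segment published to the strongly consistent
        // metadata store, where leaders and followers can discover it.
        RemoteLogSegmentMetadata metadata = new RemoteLogSegmentMetadata(segmentId, startOffset, endOffset,
                maxTimestamp, leaderEpoch, System.currentTimeMillis(),
                false /* markedForDeletion */, serializeContext(context));
        remoteLogMetadataManager.putRemoteLogSegmentData(metadata);
    }

    // Hypothetical serialization hook: how RemoteLogSegmentContext is turned into the
    // byte[] carried by RemoteLogSegmentMetadata is not defined in the listings above.
    private byte[] serializeContext(RemoteLogSegmentContext context) {
        return new byte[0];
    }
}
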
RemoteStorageManager_Old


Code Block
languagejava
titleRemoteStorageManager_Old
collapsetrue
/**
 * RemoteStorageManager is an interface that allows to plugin different remote storage implementations to copy and
 * retrieve the log segments.
 *
 * All these APIs are still experimental.
 */
@InterfaceStability.Unstable
trait RemoteStorageManager extends Configurable with AutoCloseable {

  /**
   * Earliest log offset if exists for the given topic partition in the remote storage. Return -1 if there are no
   * segments in the remote storage.
   *
   * @param tp
   * @return
   */
  @throws(classOf[IOException])
  def earliestLogOffset(tp: TopicPartition): Long

  /**
   * Copies LogSegment provided by [[RemoteLogManager]] for the given topic partition with the given leader epoch.
   * Returns the RDIs of the remote data. This method is invoked by the leader of topic partition.
   *
   * //todo LogSegment is not public, this will be changed with an interface which provides base and end offset of the
   * segment, log and offset/time indexes.
   *
   * @param topicPartition
   * @param logSegment
   * @return
   */
  @throws(classOf[IOException])
  def copyLogSegment(topicPartition: TopicPartition, logSegment: LogSegment,
                     leaderEpoch: Int): util.List[RemoteLogIndexEntry]

  /**
   * List the remote log segment files of the given topicPartition.
   * The RemoteLogManager of a follower uses this method to find out the remote data for the given topic partition.
   *
   * @return List of remote segments, sorted by baseOffset in ascending order.
   */
  @throws(classOf[IOException])
  def listRemoteSegments(topicPartition: TopicPartition): util.List[RemoteLogSegmentInfo] = {
    listRemoteSegments(topicPartition, 0)
  }

  /**
   * List the remote log segment files of the specified topicPartition with offset >= minOffset.
   * The RLM of a follower uses this method to find out the remote data.
   *
   * @param minOffset The minimum offset for a segment to be returned.
   * @return List of remote segments that contain offsets >= minOffset, sorted by baseOffset in ascending order.
   */
  @throws(classOf[IOException])
  def listRemoteSegments(topicPartition: TopicPartition, minOffset: Long): util.List[RemoteLogSegmentInfo]

  /**
   * Returns a List of RemoteLogIndexEntry for the given RemoteLogSegmentInfo. This is used by the follower to store
   * remote log indexes locally.
   *
   * @param remoteLogSegment
   * @return
   */
  @throws(classOf[IOException])
  def getRemoteLogIndexEntries(remoteLogSegment: RemoteLogSegmentInfo): util.List[RemoteLogIndexEntry]

  /**
   * Deletes the remote LogSegment file and indexes for the given remoteLogSegmentInfo.
   *
   * @param remoteLogSegmentInfo
   * @return
   */
  @throws(classOf[IOException])
  def deleteLogSegment(remoteLogSegmentInfo: RemoteLogSegmentInfo): Boolean

  /**
   * Deletes all the log segments for the given topic partition. This can be done by renaming the existing locations
   * and deleting them later in an asynchronous manner.
   *
   * @param topicPartition
   * @return
   */
  @throws(classOf[IOException])
  def deleteTopicPartition(topicPartition: TopicPartition): Boolean

  /**
   * Removes the log segments which are older than the given cleanUpTillMs. Returns the log start offset of the
   * earliest remote log segment if it exists, or -1 if there are no log segments in the remote storage.
   *
   * @param topicPartition
   * @param cleanUpTillMs
   * @return
   */
  @throws(classOf[IOException])
  def cleanupLogUntil(topicPartition: TopicPartition, cleanUpTillMs: Long): Long

  /**
   * Reads up to maxBytes of data from remote storage, starting from the first batch whose offset is greater than or
   * equal to the startOffset. It will read at least one batch, even if the first batch size is larger than maxBytes.
   *
   * @param remoteLogIndexEntry The first remoteLogIndexEntry such that remoteLogIndexEntry.lastOffset >= startOffset
   * @param maxBytes            The maximum bytes to fetch for the given entry
   * @param startOffset         The initial offset to be read from the given rdi in remoteLogIndexEntry
   * @param minOneMessage       If true, read at least one record even if the size is more than maxBytes
   * @return
   */
  @throws(classOf[IOException])
  def read(remoteLogIndexEntry: RemoteLogIndexEntry, maxBytes: Int, startOffset: Long, minOneMessage: Boolean): Records

  /**
   * Search forward for the first message that meets the following requirements:
   * - Message's timestamp is greater than or equals to the targetTimestamp.
   * - Message's offset is greater than or equals to the startingOffset.
   *
   * @param targetTimestamp The timestamp to search for.
   * @param startingOffset  The starting offset to search.
   * @return The timestamp and offset of the message found. Null if no message is found.
   */
  @throws(classOf[IOException])
  def findOffsetByTimestamp(remoteLogIndexEntry: RemoteLogIndexEntry,
                            targetTimestamp: Long,
                            startingOffset: Long): TimestampAndOffset

  /**
   * Release any system resources used by this instance.
   */
  def close(): Unit
}


RemoteLogMetadataManager

`RemoteLogMetadataManager` is an interface to provide the lifecycle of metadata about remote log segments with strongly consistent semantics. There is a default implementation that uses an internal topic. Users can plug in their own implementation if they intend to use another system to store remote log segment metadata.


Code Block
languagejava
titleRemoteLogMetadataManager


/**
 * This interface provides storing and fetching remote log segment metadata with strongly consistent semantics.
 *
 * When {@link #configure(Map)} is invoked on this instance, the {@link #BROKER_ID} and {@link #CLUSTER_ID} properties
 * are passed and can be used by this instance if needed. These properties are useful when a single storage system is
 * shared across different clusters. For example, a MySQL store can serve as the metadata store for all the clusters
 * across the org.
 *
 * todo-tier cleanup the abstractions in this interface.
 */
@InterfaceStability.Unstable
public interface RemoteLogMetadataManager extends Configurable, Closeable {

    /**
     * Property name of the broker id passed in via {@link #configure(Map)}.
     */
    String BROKER_ID = "broker.id";

    /**
     * Property name of the cluster id passed in via {@link #configure(Map)}.
     */
    String CLUSTER_ID = "cluster.id";

    /**
     * Stores RemoteLogSegmentMetadata with the given RemoteLogSegmentId.
     *
     * @param remoteLogSegmentMetadata
     * @throws IOException
     */
    void putRemoteLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws IOException;

    /**
     * Fetches RemoteLogSegmentId for the given topic partition which contains the given offset.
     *
     * @param topicPartition
     * @param offset
     * @return
     * @throws IOException
     */
    RemoteLogSegmentId getRemoteLogSegmentId(TopicPartition topicPartition, long offset) throws IOException;

    /**
     * Fetches RemoteLogSegmentMetadata for the given RemoteLogSegmentId.
     *
     * @param remoteLogSegmentId
     * @return
     * @throws IOException
     */
    RemoteLogSegmentMetadata getRemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId) throws IOException;

    /**
     * Returns the earliest log offset, if one exists, for the given topic partition in the remote storage. Returns
     * {@link Optional#empty()} if there are no segments in the remote storage.
     *
     * @param tp
     * @return
     */
    Optional<Long> earliestLogOffset(TopicPartition tp) throws IOException;

    /**
     * Returns the highest log offset, if one exists, for the given topic partition in the remote storage.
     *
     * @param tp
     * @return
     * @throws IOException
     */
    Optional<Long> highestLogOffset(TopicPartition tp) throws IOException;

    /**
     * Deletes the log segment metadata for the given remoteLogSegmentId.
     *
     * @param remoteLogSegmentId
     * @throws IOException
     */
    void deleteRemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId) throws IOException;

    /**
     * List the remote log segment files of the given topicPartition.
     * The RemoteLogManager of a follower uses this method to find out the remote data for the given topic partition.
     *
     * @return List of remote segments, sorted by baseOffset in ascending order.
     */
    default List<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicPartition topicPartition) {
        return listRemoteLogSegments(topicPartition, 0);
    }

    /**
     * List the remote log segment files of the given topicPartition with offset >= minOffset.
     *
     * @param topicPartition
     * @param minOffset
     * @return List of remote segments, sorted by baseOffset in ascending order.
     */
    List<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicPartition topicPartition, long minOffset);

    /**
     * This method is invoked only when there are changes in leadership of the topic partitions that this broker is
     * responsible for.
     *
     * @param leaderPartitions   partitions that have become leaders on this broker.
     * @param followerPartitions partitions that have become followers on this broker.
     */
    void onPartitionLeadershipChanges(Set<TopicPartition> leaderPartitions, Set<TopicPartition> followerPartitions);

    /**
     * This method is invoked only when the given topic partitions are stopped on this broker. This can happen when a
     * partition is emigrated to another broker or a partition is deleted.
     *
     * @param partitions
     */
    void onStopPartitions(Set<TopicPartition> partitions);

    /**
     * Callback to receive once the server is started so that this class can run tasks which should be run only when
     * the server is started.
     */
    void onServerStarted(final String serverEndpoint);
}



/**
 * Metadata about the log segment stored in remote tier storage.
 */
public class RemoteLogSegmentMetadata implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * Universally unique remote log segment id.
     */
    private final RemoteLogSegmentId remoteLogSegmentId;

    /**
     * Start offset of this segment.
     */
    private final long startOffset;

    /**
     * End offset of this segment.
     */
    private final long endOffset;

    /**
     * Leader epoch of the broker.
     */
    private final int leaderEpoch;

    /**
     * Maximum timestamp in the segment.
     */
    private final long maxTimestamp;

    /**
     * Epoch time at which the remote log segment is copied to the remote tier storage.
     */
    private long createdTimestamp;

    /**
     * It indicates that this segment is marked for deletion.
     */
    private boolean markedForDeletion;

    /**
     * Any context returned by {@link RemoteStorageManager#copyLogSegment(RemoteLogSegmentId, LogSegmentData)} for
     * the given remoteLogSegmentId.
     */
    private final byte[] remoteLogSegmentContext;

    /**
     * @param remoteLogSegmentId      Universally unique remote log segment id.
     * @param startOffset             Start offset of this segment.
     * @param endOffset               End offset of this segment.
     * @param maxTimestamp            Maximum timestamp in this segment.
     * @param leaderEpoch             Leader epoch of the broker.
     * @param createdTimestamp        Epoch time at which the remote log segment is copied to the remote tier storage.
     * @param markedForDeletion       The respective segment of remoteLogSegmentId is marked for deletion.
     * @param remoteLogSegmentContext Any context returned by
     *                                {@link RemoteStorageManager#copyLogSegment(RemoteLogSegmentId, LogSegmentData)}
     */
    public RemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId, long startOffset, long endOffset,
                                    long maxTimestamp, int leaderEpoch, long createdTimestamp,
                                    boolean markedForDeletion, byte[] remoteLogSegmentContext) {
        this.remoteLogSegmentId = remoteLogSegmentId;
        this.startOffset = startOffset;
        this.endOffset = endOffset;
        this.leaderEpoch = leaderEpoch;
        this.maxTimestamp = maxTimestamp;
        this.createdTimestamp = createdTimestamp;
        this.markedForDeletion = markedForDeletion;
        this.remoteLogSegmentContext = remoteLogSegmentContext;
    }
...
}

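For the read side, the following sketch (again illustrative rather than normative) shows how a fetch for an offset that has been moved off local storage could be served: the broker resolves the segment through RemoteLogMetadataManager and then streams the data through RemoteStorageManager. The byte positions would in practice be derived from the offset index returned by fetchOffsetIndex(); fixed placeholder values are used here, and error handling is omitted.

Code Block
languagejava
titleServing a remote fetch (illustrative sketch)
import java.io.IOException;
import java.io.InputStream;

import org.apache.kafka.common.TopicPartition;

// Illustrative sketch only: index lookups and error handling are omitted.
public class RemoteFetchExample {

    public InputStream readRemote(RemoteLogMetadataManager remoteLogMetadataManager,
                                  RemoteStorageManager remoteStorageManager,
                                  TopicPartition topicPartition,
                                  long fetchOffset) throws IOException, RemoteStorageException {
        // Resolve which remote segment contains the requested offset, then load its metadata.
        RemoteLogSegmentId segmentId = remoteLogMetadataManager.getRemoteLogSegmentId(topicPartition, fetchOffset);
        RemoteLogSegmentMetadata metadata = remoteLogMetadataManager.getRemoteLogSegmentMetadata(segmentId);

        // In a real broker these byte positions come from the offset index fetched via
        // fetchOffsetIndex(); placeholders are used here. Per the contract above, the stream
        // ends at the smaller of endPosition and the end of the segment file/object.
        Long startPosition = 0L;
        Long endPosition = Long.MAX_VALUE;

        return remoteStorageManager.fetchLogSegmentData(metadata, startPosition, endPosition);
    }
}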

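As a sketch of what a pluggable implementation might look like, the partial in-memory RemoteLogMetadataManager below illustrates the lookup-by-offset contract using a sorted map keyed by segment start offset. It is not the proposed default implementation (which, as noted above, is topic-based); it assumes RemoteLogSegmentMetadata exposes remoteLogSegmentId(), startOffset() and endOffset() accessors, which are elided in the listing above, and it leaves most interface methods unimplemented.

Code Block
languagejava
titleIn-memory metadata manager (illustrative sketch)
import java.io.IOException;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;

import org.apache.kafka.common.TopicPartition;

// Partial, illustrative sketch; the remaining interface methods are intentionally left abstract.
public abstract class InMemoryRemoteLogMetadataManager implements RemoteLogMetadataManager {

    // Per-partition map from segment start offset to its metadata, kept sorted so that the
    // segment containing a given offset can be found with a floor lookup.
    private final Map<TopicPartition, NavigableMap<Long, RemoteLogSegmentMetadata>> segments =
            new ConcurrentHashMap<>();

    @Override
    public void putRemoteLogSegmentData(RemoteLogSegmentMetadata metadata) throws IOException {
        // Assumes remoteLogSegmentId() and startOffset() accessors, which are elided above.
        segments.computeIfAbsent(metadata.remoteLogSegmentId().topicPartition(),
                        tp -> new ConcurrentSkipListMap<>())
                .put(metadata.startOffset(), metadata);
    }

    @Override
    public RemoteLogSegmentId getRemoteLogSegmentId(TopicPartition topicPartition, long offset) throws IOException {
        NavigableMap<Long, RemoteLogSegmentMetadata> partitionSegments = segments.get(topicPartition);
        if (partitionSegments == null) {
            return null; // assumption: null when no remote segment covers the offset
        }
        // The candidate is the segment with the greatest start offset <= the requested offset;
        // it matches only if the requested offset also falls within its end offset.
        Map.Entry<Long, RemoteLogSegmentMetadata> candidate = partitionSegments.floorEntry(offset);
        if (candidate == null || candidate.getValue().endOffset() < offset) {
            return null;
        }
        return candidate.getValue().remoteLogSegmentId();
    }

    @Override
    public Optional<Long> earliestLogOffset(TopicPartition tp) throws IOException {
        NavigableMap<Long, RemoteLogSegmentMetadata> partitionSegments = segments.get(tp);
        return (partitionSegments == null || partitionSegments.isEmpty())
                ? Optional.empty()
                : Optional.of(partitionSegments.firstKey());
    }
}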
Proposed Changes

High-level design

...