Versions Compared


  • This line was added.
  • This line was removed.
  • Formatting was changed.


Code Block
/**
 * RemoteStorageManager provides the lifecycle of remote log segments, which includes copy, fetch, and delete
 * operations.
 * {@link RemoteLogMetadataManager} is responsible for storing and fetching metadata about the remote log segments
 * in a strongly consistent manner.
 * Each upload or copy of a segment is assigned a {@link RemoteLogSegmentId} which is universally unique even for the
 * same topic partition and offsets. Once the copy or upload is successful, {@link RemoteLogSegmentMetadata} is
 * created with the RemoteLogSegmentId and other log segment information, and it is stored in
 * {@link RemoteLogMetadataManager}.
 * This allows RemoteStorageManager to store segments even in an eventually consistent manner, as the metadata is
 * already stored in a consistent store.
 * All these APIs are still experimental.
 */
public interface RemoteStorageManager extends Configurable, Closeable {

     * Copies LogSegmentData provided for the given RemoteLogSegmentId and returns any contextual
     * information about this copy operation. This can include path to the object in the store etc.
     * Invoker of this API should always send a unique id as part of {@link RemoteLogSegmentId#id()} even when it
     * retries to invoke this method for the same log segment data.
     * @param remoteLogSegmentId
     * @param logSegmentData
     * @return
     * @throws IOException
    RemoteLogSegmentContext copyLogSegment(RemoteLogSegmentId remoteLogSegmentId, LogSegmentData logSegmentData) throws IOException;

     * Returns the remote log segment data file/object as InputStream for the given RemoteLogSegmentMetadata starting
     * from the given startPosition. If endPosition is given then the stream will end at that position else it will be
     * given till the end of the remote log segment data file/object.
     * @param remoteLogSegmentId
     * @param startPosition
     * @param endPosition
     * @return
     * @throws IOException
    InputStream fetchLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentId, Long startPosition, Optional<Long> endPosition) throws IOException;

     * Deletes the remote log segment for the given remoteLogSegmentId. Returns true if the deletion is successful.
     * Broker pushes an event to __delete_failed_remote_log_segments topic for failed segment deletions so that users
     * can do the cleanup later.
     * @param remoteLogSegmentId
     * @return
     * @throws IOException
    boolean deleteLogSegment(RemoteLogSegmentId remoteLogSegmentId) throws IOException;

/**
 * This represents a universally unique id associated with a topic partition's log segment. It is regenerated for
 * every attempt of copying a specific log segment in
 * {@link RemoteStorageManager#copyLogSegment(RemoteLogSegmentId, LogSegmentData)}.
 */
public class RemoteLogSegmentId {
    private TopicPartition topicPartition;
    private UUID id;

    public RemoteLogSegmentId(TopicPartition topicPartition, UUID id) {
        this.topicPartition = requireNonNull(topicPartition); = requireNonNull(id);

    public TopicPartition topicPartition() {
        return topicPartition;

    public UUID id() {
        return id;

public class LogSegmentData {

    private FileRecords logSegment;
    private File offsetIndex;
    private File timeIndex;
    //todo add other required indexes like txnIndex


Code Block
/**
 * RemoteStorageManager is an interface that allows plugging in different remote storage implementations to copy and
 * retrieve the log segments.
 * All these APIs are still experimental.
 */
trait RemoteStorageManager extends Configurable with AutoCloseable {

  /**
   * Returns the earliest log offset, if one exists, for the given topic partition in the remote storage.
   *
   * @param tp topic partition to look up
   * @return the earliest log offset, or -1 if there are no segments in the remote storage
   */
  def earliestLogOffset(tp: TopicPartition): Long

  /**
   * Copies the LogSegment provided by [[RemoteLogManager]] for the given topic partition with the given leader
   * epoch. This method is invoked by the leader of the topic partition.
   *
   * TODO: LogSegment is not public; it will be replaced with an interface which provides the base and end offset
   * of the segment, the log and the offset/time indexes.
   *
   * @param topicPartition topic partition the segment belongs to
   * @param logSegment     log segment to copy
   * @param leaderEpoch    leader epoch with which the segment is copied
   * @return the RDIs of the remote data
   */
  def copyLogSegment(topicPartition: TopicPartition, logSegment: LogSegment,
                     leaderEpoch: Int): util.List[RemoteLogIndexEntry]

  /**
   * Lists the remote log segment files of the given topicPartition.
   * The RemoteLogManager of a follower uses this method to find out the remote data for the given topic partition.
   *
   * Delegates to the two-argument overload with a minOffset of 0.
   *
   * @param topicPartition topic partition whose remote segments are listed
   * @return List of remote segments, sorted by baseOffset in ascending order.
   */
  def listRemoteSegments(topicPartition: TopicPartition): util.List[RemoteLogSegmentInfo] = {
    listRemoteSegments(topicPartition, 0L)
  }

  /**
   * Lists the remote log segment files of the specified topicPartition with offset >= minOffset.
   * The RLM of a follower uses this method to find out the remote data.
   *
   * @param topicPartition topic partition whose remote segments are listed
   * @param minOffset      The minimum offset for a segment to be returned.
   * @return List of remote segments that contain offsets >= minOffset, sorted by baseOffset in ascending order.
   */
  def listRemoteSegments(topicPartition: TopicPartition, minOffset: Long): util.List[RemoteLogSegmentInfo]

  /**
   * Returns a List of RemoteLogIndexEntry for the given RemoteLogSegmentInfo. This is used by a follower to store
   * remote log indexes locally.
   *
   * @param remoteLogSegment remote log segment whose index entries are requested
   * @return the index entries of the given remote log segment
   */
  def getRemoteLogIndexEntries(remoteLogSegment: RemoteLogSegmentInfo): util.List[RemoteLogIndexEntry]

  /**
   * Deletes the remote LogSegment file and indexes for the given remoteLogSegmentInfo.
   *
   * @param remoteLogSegmentInfo remote log segment to delete
   * @return NOTE(review): meaning of the result is not documented; presumably true if the deletion succeeded -
   *         confirm with implementers
   */
  def deleteLogSegment(remoteLogSegmentInfo: RemoteLogSegmentInfo): Boolean

  /**
   * Deletes all the log segments for the given topic partition. This can be done by renaming the existing
   * locations and deleting them later in an asynchronous manner.
   *
   * @param topicPartition topic partition whose remote log segments are deleted
   * @return NOTE(review): meaning of the result is not documented; presumably true if the deletion succeeded -
   *         confirm with implementers
   */
  def deleteTopicPartition(topicPartition: TopicPartition): Boolean

  /**
   * Removes the log segments which are older than the given cleanUpTillMs.
   *
   * @param topicPartition topic partition to clean up
   * @param cleanUpTillMs  log segments older than this value are removed
   * @return the log start offset of the earliest remote log segment if one exists, or -1 if there are no log
   *         segments in the remote storage
   */
  def cleanupLogUntil(topicPartition: TopicPartition, cleanUpTillMs: Long): Long

  /**
   * Reads up to maxBytes of data from remote storage, starting from the first batch that is greater than or equal
   * to the startOffset. It will read at least one batch if the first batch size is larger than maxBytes.
   *
   * @param remoteLogIndexEntry The first RemoteLogIndexEntry whose lastOffset >= startOffset
   * @param maxBytes            maximum bytes to fetch for the given entry
   * @param startOffset         initial offset to be read from the given rdi in remoteLogIndexEntry
   * @param minOneMessage       if true read at least one record even if the size is more than maxBytes
   * @return the records read from remote storage
   */
  def read(remoteLogIndexEntry: RemoteLogIndexEntry, maxBytes: Int, startOffset: Long, minOneMessage: Boolean): Records

  /**
   * Searches forward for the first message that meets the following requirements:
   *  - Message's timestamp is greater than or equal to the targetTimestamp.
   *  - Message's offset is greater than or equal to the startingOffset.
   *
   * @param remoteLogIndexEntry index entry handed to the search (NOTE(review): its exact role is not documented -
   *                            presumably it identifies the remote data to search; confirm)
   * @param targetTimestamp     The timestamp to search for.
   * @param startingOffset      The starting offset to search.
   * @return The timestamp and offset of the message found. Null if no message is found.
   */
  def findOffsetByTimestamp(remoteLogIndexEntry: RemoteLogIndexEntry,
                            targetTimestamp: Long,
                            startingOffset: Long): TimestampAndOffset

  /**
   * Releases any system resources used by this instance.
   */
  def close(): Unit
}
