...

RemoteStorageManager is an interface that allows plugging in different remote storage implementations to copy the log segments. The default implementation of the interface supports HDFS as remote storage. Additional remote storage support, such as S3, can be added later by providing other implementations using the configuration remote.storage.manager.class.

Code Block
language: scala
import java.io.IOException
import java.util

import kafka.log.LogSegment
import org.apache.kafka.common.{Configurable, TopicPartition}
import org.apache.kafka.common.record.Records

// RemoteLogIndexEntry and RemoteLogSegmentInfo are new types introduced by this proposal.

/**
 * RemoteStorageManager is an interface that allows plugging in different remote storage implementations to copy and
 * retrieve the log segments.
 *
 * All these APIs are still experimental.
 */
trait RemoteStorageManager extends Configurable with AutoCloseable {

  /**
   * Returns the earliest log offset, if one exists, for the given topic partition in the remote storage. Returns -1
   * if there are no segments in the remote storage.
   *
   * @param tp topic partition to look up
   * @return the earliest log offset, or -1 if the remote storage holds no segments for the partition
   */
  @throws(classOf[IOException])
  def earliestLogOffset(tp: TopicPartition): Long

  /**
   * Copies the LogSegment provided by [[RemoteLogManager]] for the given topic partition with the given leader epoch.
   * Returns the RDIs of the remote data. This method is invoked by the leader of the topic partition.
   *
   * //todo LogSegment is not public, this will be changed with an interface which provides base and end offset of the
   * segment, log and offset/time indexes.
   *
   * @param topicPartition topic partition of the segment
   * @param logSegment     the log segment to copy to the remote storage
   * @param leaderEpoch    leader epoch of the partition when the copy is initiated
   * @return list of RemoteLogIndexEntry instances for the copied data
   */
  @throws(classOf[IOException])
  def copyLogSegment(topicPartition: TopicPartition, logSegment: LogSegment, leaderEpoch: Int): util.List[RemoteLogIndexEntry]

  /**
   * Cancels the unfinished LogSegment copying of the given topic-partition.
   *
   * @param topicPartition topic partition whose in-progress copy should be cancelled
   */
  def cancelCopyingLogSegment(topicPartition: TopicPartition): Unit

  /**
   * List the remote log segment files of the given topicPartition.
   * The RemoteLogManager of a follower uses this method to find out the remote data for the given topic partition.
   *
   * @return List of remote segments, sorted by baseOffset in ascending order.
   */
  @throws(classOf[IOException])
  def listRemoteSegments(topicPartition: TopicPartition): util.List[RemoteLogSegmentInfo] = {
    listRemoteSegments(topicPartition, 0)
  }

  /**
   * List the remote log segment files of the specified topicPartition starting from the base offset minBaseOffset.
   * The RLM of a follower uses this method to find out the remote data for the given topic partition.
   *
   * @param minBaseOffset The minimum base offset for a segment to be returned.
   * @return List of remote segments starting from the base offset minBaseOffset, sorted by baseOffset in ascending order.
   */
  @throws(classOf[IOException])
  def listRemoteSegments(topicPartition: TopicPartition, minBaseOffset: Long): util.List[RemoteLogSegmentInfo]

  /**
   * Returns a List of RemoteLogIndexEntry for the given RemoteLogSegmentInfo. This is used by follower to store remote
   * log indexes locally.
   *
   * @param remoteLogSegment the remote segment whose index entries are requested
   * @return list of RemoteLogIndexEntry instances for the given segment
   */
  @throws(classOf[IOException])
  def getRemoteLogIndexEntries(remoteLogSegment: RemoteLogSegmentInfo): util.List[RemoteLogIndexEntry]

  /**
   * Deletes the remote LogSegment file and indexes for the given remoteLogSegmentInfo.
   *
   * @param remoteLogSegmentInfo metadata of the remote log segment to delete
   * @return true if the segment was deleted
   */
  @throws(classOf[IOException])
  def deleteLogSegment(remoteLogSegmentInfo: RemoteLogSegmentInfo): Boolean

  /**
   * Deletes all the log segments for the given topic partition. This can be done by renaming the existing locations
   * and deleting them later in an asynchronous manner.
   *
   * @param topicPartition topic partition whose remote log segments are to be deleted
   * @return true if the deletion succeeded
   */
  @throws(classOf[IOException])
  def deleteTopicPartition(topicPartition: TopicPartition): Boolean

  /**
   * Removes the log segments which are older than the given cleanUpTillMs. Returns the log start offset of the
   * earliest remote log segment if one exists, or -1 if there are no log segments in the remote storage.
   *
   * @param topicPartition topic partition to clean up
   * @param cleanUpTillMs  timestamp up to which segments are removed
   * @return the log start offset of the earliest remaining remote segment, or -1 if none remain
   */
  @throws(classOf[IOException])
  def cleanupLogUntil(topicPartition: TopicPartition, cleanUpTillMs: Long): Long

  /**
   * Read up to maxBytes of data from remote storage, starting from the first batch whose offset is greater than or
   * equal to the startOffset. It will read at least one batch, even if the first batch's size is larger than maxBytes.
   *
   * @param remoteLogIndexEntry The first remoteLogIndexEntry that remoteLogIndexEntry.lastOffset >= startOffset
   * @param maxBytes            maximum bytes to fetch for the given entry
   * @param startOffset         initial offset to be read from the given rdi in remoteLogIndexEntry
   * @param minOneMessage       if true read at least one record even if the size is more than maxBytes
   * @return the records read from the remote storage
   */
  @throws(classOf[IOException])
  def read(remoteLogIndexEntry: RemoteLogIndexEntry, maxBytes: Int, startOffset: Long, minOneMessage: Boolean): Records

  /**
   * Releases any system resources used by this instance.
   */
  def close(): Unit
}
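
To illustrate how a broker might load whichever implementation the remote.storage.manager.class configuration names, here is a minimal sketch. It is not part of the proposal: the RemoteStorageManagerLoader object is a hypothetical name, and the sketch assumes only the interface above plus Utils.newInstance, an existing Kafka reflection helper.

Code Block
language: scala
import java.util

import org.apache.kafka.common.utils.Utils

// Sketch only: instantiate the configured RemoteStorageManager implementation
// reflectively and hand it the broker configs. The object name is illustrative.
object RemoteStorageManagerLoader {
  def load(configs: util.Map[String, AnyRef]): RemoteStorageManager = {
    val className = configs.get("remote.storage.manager.class").asInstanceOf[String]
    // Utils.newInstance creates an instance of className and verifies it extends the expected supertype.
    val rsm = Utils.newInstance(className, classOf[RemoteStorageManager])
    // RemoteStorageManager extends Configurable, so it receives the broker configs here.
    rsm.configure(configs)
    rsm
  }
}

The leader's RemoteLogManager could then call, for example, rsm.copyLogSegment(topicPartition, segment, leaderEpoch) and persist the returned RemoteLogIndexEntry list locally for serving remote fetches.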

...

When handling a consumer fetch request, if the required offset is in remote storage, the request is added to the "RemoteFetchPurgatory" to handle the timeout. RemoteFetchPurgatory is an instance of kafka.server.DelayedOperationPurgatory and is similar to the existing produce/fetch purgatories. At the same time, the request is put into the task queue of the "remote storage fetcher thread pool".
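
To make this flow concrete, the sketch below shows what such a delayed operation could look like. kafka.server.DelayedOperation and DelayedOperationPurgatory are the existing broker primitives; the DelayedRemoteFetch class, its callbacks, and the key wiring are hypothetical names used for illustration.

Code Block
language: scala
import kafka.server.{DelayedOperation, DelayedOperationPurgatory, TopicPartitionOperationKey}

// Sketch only: a delayed operation that completes once the remote storage fetcher
// thread has produced a result, or expires after the fetch request's max wait time.
class DelayedRemoteFetch(delayMs: Long,
                         resultReady: () => Boolean,
                         respond: () => Unit) extends DelayedOperation(delayMs) {

  // Invoked when the purgatory checks a watched key; complete if the remote read finished.
  override def tryComplete(): Boolean =
    if (resultReady()) forceComplete() else false

  // Runs exactly once, on either completion or expiration: send the response.
  override def onComplete(): Unit = respond()

  // The request timed out before the remote read finished; onComplete still sends a response.
  override def onExpiration(): Unit = ()
}

// Usage sketch:
//   val purgatory = DelayedOperationPurgatory[DelayedRemoteFetch]("RemoteFetch", brokerId = 0)
//   val key = TopicPartitionOperationKey(tp.topic, tp.partition)
//   purgatory.tryCompleteElseWatch(op, Seq(key))
//   // from the remote storage fetcher thread, once the data has been read:
//   purgatory.checkAndComplete(key)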

...