Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Note: Early proposal. To be finalized during implementation.

Code Block
language: scala
/**
 * Operations the RemoteLogManager (RLM) uses to copy, list, read and delete
 * log segments kept in remote storage.
 *
 * Early proposal; to be finalized during implementation.
 */
trait RemoteStorageManager extends Configurable with AutoCloseable {

  /**
   * Copies the LogSegment provided by the [[RemoteLogManager]].
   * This method is used by the leader.
   *
   * @param topicPartition topic-partition the segment belongs to
   * @param logSegment     the segment to copy
   * @return the RDIs of the remote data, as remote log index entries
   */
  @throws(classOf[IOException])
  def copyLogSegment(topicPartition: TopicPartition, logSegment: LogSegment): util.List[RemoteLogIndexEntry]

  /**
   * Cancels the unfinished LogSegment copying of the given topic-partition.
   *
   * @param topicPartition topic-partition whose in-progress copy should be cancelled
   */
  def cancelCopyingLogSegment(topicPartition: TopicPartition): Unit

  /**
   * Lists the remote log segment files of the specified topicPartition.
   * The RLM of a follower uses this method to find out the remote data.
   *
   * Delegates to the two-argument overload with a minimum base offset of 0.
   *
   * @param topicPartition topic-partition whose remote segments are listed
   * @return List of remote segments, sorted by baseOffset in ascending order.
   */
  @throws(classOf[IOException])
  def listRemoteSegments(topicPartition: TopicPartition): util.List[RemoteLogSegmentInfo] =
    listRemoteSegments(topicPartition, minBaseOffset = 0L)

  /**
   * Lists the remote log segment files of the specified topicPartition, starting from
   * the base offset minBaseOffset.
   * The RLM of a follower uses this method to find out the remote data.
   *
   * @param topicPartition topic-partition whose remote segments are listed
   * @param minBaseOffset  The minimum base offset for a segment to be returned.
   * @return List of remote segments starting from the base offset minBaseOffset,
   *         sorted by baseOffset in ascending order.
   */
  @throws(classOf[IOException])
  def listRemoteSegments(topicPartition: TopicPartition, minBaseOffset: Long): util.List[RemoteLogSegmentInfo]

  /**
   * Called by the RLM to retrieve the RemoteLogIndex entries of the specified remoteLogSegment.
   *
   * @param remoteLogSegment the remote segment whose index entries are requested
   * @return the index entries of that remote segment
   */
  @throws(classOf[IOException])
  def getRemoteLogIndexEntries(remoteLogSegment: RemoteLogSegmentInfo): util.List[RemoteLogIndexEntry]

  /**
   * Deletes the remote LogSegment file provided by the RLM.
   *
   * @param remoteLogSegment the remote segment to delete
   * @return NOTE(review): presumably true when the segment was deleted; the proposal
   *         does not define the result, confirm during implementation
   */
  @throws(classOf[IOException])
  def deleteLogSegment(remoteLogSegment: RemoteLogSegmentInfo): Boolean

  /**
   * Deletes all the log segments for the given topic partition. This can be done by
   * renaming the existing locations and deleting them later in an asynchronous manner.
   *
   * @param topicPartition topic-partition whose remote segments are deleted
   * @return NOTE(review): presumably true on success; the proposal does not define
   *         the result, confirm during implementation
   */
  @throws(classOf[IOException])
  def deleteTopicPartition(topicPartition: TopicPartition): Boolean

  /**
   * Removes the log segments which are older than the given cleanUpTillMs.
   *
   * @param topicPartition topic-partition to clean up
   * @param cleanUpTillMs  segments older than this value are removed
   * @return NOTE(review): the meaning of the returned Long is not specified in the
   *         proposal, confirm during implementation
   */
  @throws(classOf[IOException])
  def cleanupLogUntil(topicPartition: TopicPartition, cleanUpTillMs: Long): Long

  /**
   * Reads up to maxBytes of data from remote storage, starting from the first batch
   * whose offset is greater than or equal to startOffset.
   *
   * Will read at least one batch if the first batch size is larger than maxBytes.
   *
   * @param remoteLogIndexEntry The first remoteLogIndexEntry for which
   *                            remoteLogIndexEntry.lastOffset >= startOffset
   * @param maxBytes            maximum bytes to fetch for the given entry
   * @param startOffset         initial offset to be read from the given rdi in remoteLogIndexEntry
   * @param minOneMessage       if true, read at least one record even if the size is more than maxBytes
   * @return the records read from remote storage
   */
  @throws(classOf[IOException])
  def read(remoteLogIndexEntry: RemoteLogIndexEntry, maxBytes: Int, startOffset: Long, minOneMessage: Boolean): Records

  /**
   * Stops all the threads and closes the instance.
   */
  def close(): Unit
}


Replica Manager

If RLM is configured, ReplicaManager will call RLM to assign or remove topic-partitions, similar to how the replicaFetcherManager works today.

...