...
```scala
/**
 * RemoteStorageManager is an interface that allows plugging in different remote storage implementations to copy and
 * retrieve the log segments.
 *
 * All these APIs are still experimental.
 */
trait RemoteStorageManager extends Configurable with AutoCloseable {

  /**
   * Earliest log offset, if it exists, for the given topic partition in the remote storage. Returns -1 if there are
   * no segments in the remote storage.
   */
  @throws(classOf[IOException])
  def earliestLogOffset(tp: TopicPartition): Long

  /**
   * Copies the LogSegment provided by [[RemoteLogManager]] for the given topic partition with the given leader epoch.
   * Returns the RDIs of the remote data. This method is invoked by the leader of the topic partition.
   *
   * //todo LogSegment is not public, this will be changed with an interface which provides base and end offset of the
   * segment, log and offset/time indexes.
   */
  @throws(classOf[IOException])
  def copyLogSegment(topicPartition: TopicPartition, logSegment: LogSegment, leaderEpoch: Int): util.List[RemoteLogIndexEntry]

  /**
   * Cancels the unfinished LogSegment copying of the given topic-partition.
   */
  def cancelCopyingLogSegment(topicPartition: TopicPartition): Unit

  /**
   * Lists the remote log segment files of the given topicPartition.
   * The RemoteLogManager of a follower uses this method to find out the remote data for the given topic partition.
   *
   * @return List of remote segments, sorted by baseOffset in ascending order.
   */
  @throws(classOf[IOException])
  def listRemoteSegments(topicPartition: TopicPartition): util.List[RemoteLogSegmentInfo] = {
    listRemoteSegments(topicPartition, 0)
  }

  /**
   * Lists the remote log segment files of the specified topicPartition starting from the base offset minBaseOffset.
   * The RLM of a follower uses this method to find out the remote data.
   *
   * @param minBaseOffset The minimum base offset for a segment to be returned.
   * @return List of remote segments starting from the base offset minBaseOffset, sorted by baseOffset in ascending order.
   */
  @throws(classOf[IOException])
  def listRemoteSegments(topicPartition: TopicPartition, minBaseOffset: Long): util.List[RemoteLogSegmentInfo]

  /**
   * Returns a List of RemoteLogIndexEntry for the given RemoteLogSegmentInfo. This is used by the follower to store
   * remote log indexes locally.
   */
  @throws(classOf[IOException])
  def getRemoteLogIndexEntries(remoteLogSegment: RemoteLogSegmentInfo): util.List[RemoteLogIndexEntry]

  /**
   * Deletes the remote LogSegment file and indexes for the given remoteLogSegmentInfo.
   */
  @throws(classOf[IOException])
  def deleteLogSegment(remoteLogSegmentInfo: RemoteLogSegmentInfo): Boolean

  /**
   * Deletes all the log segments for the given topic partition. This can be done by renaming the existing locations
   * and deleting them later in an asynchronous manner.
   */
  @throws(classOf[IOException])
  def deleteTopicPartition(topicPartition: TopicPartition): Boolean

  /**
   * Removes the log segments which are older than the given cleanUpTillMs. Returns the log start offset of the
   * earliest remote log segment if it exists, or -1 if there are no log segments in the remote storage.
   */
  @throws(classOf[IOException])
  def cleanupLogUntil(topicPartition: TopicPartition, cleanUpTillMs: Long): Long

  /**
   * Reads up to maxBytes of data from remote storage, starting from the first batch whose offset is greater than or
   * equal to startOffset. It will read at least one batch if the first batch's size is larger than maxBytes.
   *
   * @param remoteLogIndexEntry The first remoteLogIndexEntry such that remoteLogIndexEntry.lastOffset >= startOffset
   * @param maxBytes            maximum bytes to fetch for the given entry
   * @param startOffset         initial offset to be read from the given RDI in remoteLogIndexEntry
   * @param minOneMessage       if true, read at least one record even if its size is more than maxBytes
   */
  @throws(classOf[IOException])
  def read(remoteLogIndexEntry: RemoteLogIndexEntry, maxBytes: Int, startOffset: Long, minOneMessage: Boolean): Records

  /**
   * Releases any system resources used by this instance.
   */
  def close(): Unit
}
```
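The listing contract above (ascending order by base offset, a `minBaseOffset` filter, and the no-arg overload delegating to offset 0) can be illustrated with a small self-contained sketch. The `InMemoryRemoteStorage` class and its fields are hypothetical stand-ins, not part of the proposed interface:

```scala
// Hypothetical in-memory sketch (not part of the KIP) illustrating the
// listRemoteSegments and earliestLogOffset contracts.
case class RemoteLogSegmentInfo(baseOffset: Long, endOffset: Long)

class InMemoryRemoteStorage {
  private var segments = Vector.empty[RemoteLogSegmentInfo]

  def addSegment(s: RemoteLogSegmentInfo): Unit = segments :+= s

  // Mirrors RemoteStorageManager.earliestLogOffset: -1 when no segments exist.
  def earliestLogOffset: Long =
    if (segments.isEmpty) -1L else segments.map(_.baseOffset).min

  // The no-arg overload delegates to minBaseOffset = 0, as in the trait.
  def listRemoteSegments(): Seq[RemoteLogSegmentInfo] = listRemoteSegments(0L)

  // Filter by the minimum base offset, sorted by baseOffset ascending.
  def listRemoteSegments(minBaseOffset: Long): Seq[RemoteLogSegmentInfo] =
    segments.filter(_.baseOffset >= minBaseOffset).sortBy(_.baseOffset)
}
```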
Proposed Changes
When an RLM class is configured and all the required configs are present, RLM will send a list of topic-partitions and invoke the RemoteLogManager.addTopicPartitions(). This function’s responsibility is to delegate these topic-partitions to RLM. RLM will monitor the log.dirs for these topic-partitions and copy the rolled over LogSegment to the configured remote storage. Once a LogSegment is copied over it should mark it as done.
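The copy-then-mark step can be sketched as follows. This is a minimal illustration under stated assumptions: the `SegmentCopier` object, the `.copy-done` marker file name, and the use of a local directory as a stand-in for remote storage are all hypothetical, not specified by the KIP:

```scala
import java.nio.file.{Files, Path}

// Hypothetical sketch of "copy a rolled segment, then mark it as done".
// A sibling ".copy-done" marker records that the copy completed, so a
// partially copied segment is never mistaken for a finished one.
object SegmentCopier {
  def copyDoneMarker(segment: Path): Path =
    segment.resolveSibling(segment.getFileName.toString + ".copy-done")

  def copySegment(segment: Path, remoteDir: Path): Unit = {
    val target = remoteDir.resolve(segment.getFileName)
    Files.copy(segment, target)               // stand-in for the remote upload
    Files.createFile(copyDoneMarker(segment)) // mark the segment as copied
  }

  def isCopied(segment: Path): Boolean = Files.exists(copyDoneMarker(segment))
}
```

Creating the marker only after the copy succeeds means a crash mid-copy leaves no marker, so the segment is retried rather than treated as done.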
Replica Manager
If RLM is configured, ReplicaManager will call RLM to assign topic-partitions or remove topic-partitions similar to how the replicaFetcherManager works today.
If the broker changes its state from Leader to Follower for a topic-partition while RLM is in the process of copying the segment, it will finish the copy before it relinquishes the copy for that topic-partition. This might leave duplicated messages in remote storage.
...
```scala
def readFromLocalLog(): Seq[(TopicPartition, LogReadResult)] = {
  try {
    // ... existing local log read path ...
  } catch {
    case e @ (_: OffsetOutOfRangeException) =>
      RemoteLogManager.read(fetchMaxBytes: Int,
                            hardMaxBytesLimit: Boolean,
                            tp: TopicPartition,
                            fetchInfo: PartitionData,
                            quota: ReplicaQuota)
  }
}
```
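The catch-and-delegate pattern above can be shown end to end with stub types. Everything here (`FetchPath`, `ReadResult`, the record strings) is illustrative, not Kafka's actual classes; the point is only the control flow: serve from the local log, and fall back to the remote read when the requested offset is below the local log start:

```scala
// Self-contained sketch of the fallback: local read first, remote on
// OffsetOutOfRangeException. Stub types, not Kafka's real API.
class OffsetOutOfRangeException(msg: String) extends RuntimeException(msg)

case class ReadResult(records: Seq[String], fromRemote: Boolean)

class FetchPath(localStartOffset: Long, remoteRead: Long => Seq[String]) {
  private def readLocal(offset: Long): Seq[String] =
    if (offset < localStartOffset)
      throw new OffsetOutOfRangeException(s"offset $offset below local start")
    else Seq(s"local-record@$offset")

  def read(offset: Long): ReadResult =
    try ReadResult(readLocal(offset), fromRemote = false)
    catch {
      case _: OffsetOutOfRangeException =>
        // Stand-in for RemoteLogManager.read(...)
        ReadResult(remoteRead(offset), fromRemote = true)
    }
}
```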
Proposed Changes
...
Log Retention
Log retention will continue to work as it does today, except for one case: if a LogSegment is in the process of being copied over and does not yet have the associated "copy-done" file, the LogCleaner will skip these LogSegments until the marker denotes that they are copied over to remote storage and are safe to delete.
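The retention rule above reduces to a simple eligibility check. This sketch is hypothetical (the `Segment` fields and `RetentionCheck` name are illustrative): a segment is deletable locally only when it is both past retention and marked as copied:

```scala
// Hypothetical sketch of the retention rule: segments still being copied
// (no copy-done marker yet) are skipped by the cleaner even if expired.
case class Segment(name: String, copyDone: Boolean, expired: Boolean)

object RetentionCheck {
  def deletable(segments: Seq[Segment]): Seq[Segment] =
    segments.filter(s => s.expired && s.copyDone) // skip unfinished copies
}
```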
...