...
```scala
class RemoteLogManager extends Configurable {
  val remoteStorageManager: RemoteStorageManager

  // configure
  def configure(configs: java.util.Map[String, _]): Unit

  // Ask RLM to monitor the given TopicPartitions, and copy inactive Log
  // Segments to remote storage. RLM updates local index files once a
  // Log Segment in a TopicPartition is copied to Remote Storage.
  def addPartitions(topicPartitions: Set[TopicPartition]): Boolean

  // Stops copy of LogSegment if TopicPartition ownership is moved from a broker.
  def removePartitions(topicPartitions: Set[TopicPartition]): Boolean

  // Read topic partition data from remote storage.
  def read(fetchMaxBytes: Int, hardMaxBytesLimit: Boolean, readPartitionInfo: Seq[(TopicPartition, PartitionData)]): LogReadResult

  // Stops all the threads and closes the instance.
  def shutdown(): Unit
}
```
...
When an RLM class is configured and all the required configs are present, RLM will send a list of topic-partitions and invoke RemoteLogManager.addPartitions(). This function's responsibility is to delegate these topic-partitions to RLM. RLM will monitor the log.dirs for these topic-partitions and copy each rolled-over LogSegment to the configured remote storage. Once a LogSegment has been copied, RLM should mark it as done.
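The monitor-and-copy behaviour described above can be sketched as follows. This is only an illustration under stated assumptions, not the proposed implementation: a local directory stands in for remote storage, the marker is assumed to be a sibling file named `<segment>.copy-done`, segment file names are assumed to be numeric base offsets ending in `.log`, and the names `SegmentCopySketch`, `markerFor`, `rolledSegments` and `copyPending` are hypothetical.

```scala
import java.nio.file.{Files, Path, StandardCopyOption}
import scala.jdk.CollectionConverters._

object SegmentCopySketch {
  // Assumption: the marker lives next to the segment as "<segment file name>.copy-done".
  def markerFor(segment: Path): Path =
    segment.resolveSibling(segment.getFileName.toString + ".copy-done")

  // Rolled-over segments: every *.log file except the one with the highest
  // base offset, which is treated as the active segment.
  def rolledSegments(partitionDir: Path): Seq[Path] = {
    val stream = Files.list(partitionDir)
    try {
      val logs = stream.iterator().asScala
        .filter(_.getFileName.toString.endsWith(".log"))
        .toSeq
        .sortBy(_.getFileName.toString.stripSuffix(".log").toLong)
      logs.dropRight(1)
    } finally stream.close()
  }

  // Copies each rolled-over segment that has no marker yet, then writes its marker.
  // Returns the segments copied in this pass.
  def copyPending(partitionDir: Path, remoteDir: Path): Seq[Path] = {
    Files.createDirectories(remoteDir)
    val pending = rolledSegments(partitionDir).filterNot(s => Files.exists(markerFor(s)))
    pending.foreach { segment =>
      Files.copy(segment, remoteDir.resolve(segment.getFileName), StandardCopyOption.REPLACE_EXISTING)
      Files.createFile(markerFor(segment)) // mark the segment as done only after the copy succeeds
    }
    pending
  }
}
```

Writing the marker only after the copy returns means a crash mid-copy leaves the segment unmarked, so a later pass would copy it again rather than treat it as done.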
Log Retention
Log retention will continue to work as it does today, with one exception: if a LogSegment is still being copied and does not yet have an associated "copy-done" file, LogCleaner will skip that LogSegment until the marker is present, indicating that the segment has been copied to remote storage and is safe to delete.
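A minimal sketch of that retention check is shown below. It assumes the marker is a sibling file named `<segment>.copy-done` and that remote copying is enabled for every segment passed in. The names `RetentionSketch`, `copyDone` and `deletable` are hypothetical and are not part of LogCleaner.

```scala
import java.nio.file.{Files, Path}

object RetentionSketch {
  // Assumption: the marker lives next to the segment as "<segment file name>.copy-done".
  def copyDone(segment: Path): Boolean =
    Files.exists(segment.resolveSibling(segment.getFileName.toString + ".copy-done"))

  // Of the segments the normal retention policy has already selected,
  // keep only those whose copy to remote storage is marked as done.
  def deletable(expired: Seq[Path]): Seq[Path] = expired.filter(copyDone)
}
```

Segments filtered out here are not lost to retention. They would simply be reconsidered on the next retention pass, once their marker exists.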
...