Versions Compared


...

Each thread in the thread pool processes one topic-partition at a time in the following steps:

Copy log segments to remote storage (leader)

Copy the log segment files that 1) are inactive and 2) have an offset range that is not covered by the segments already in remote storage.

If multiple log segment files are ready, they are copied to remote storage one by one, from the earliest to the latest.
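The selection rule above can be sketched as follows. This is a minimal illustration, not the broker's implementation: `LocalSegment`, `SegmentCopyPlanner`, and `latestRemoteOffset` are hypothetical names, and "not covered" is assumed to mean that the segment's last offset lies beyond the highest offset already in remote storage.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical, simplified view of a local log segment (not a Kafka class).
final class LocalSegment {
    final long baseOffset;
    final long endOffset;  // last offset in the segment, inclusive
    final boolean active;  // true for the segment still being appended to

    LocalSegment(long baseOffset, long endOffset, boolean active) {
        this.baseOffset = baseOffset;
        this.endOffset = endOffset;
        this.active = active;
    }
}

final class SegmentCopyPlanner {
    // Returns the segments to copy, earliest first: inactive segments whose
    // offsets extend beyond the highest offset already in remote storage.
    // Assumption: coverage is judged against a single "latest remote offset".
    static List<LocalSegment> segmentsToCopy(List<LocalSegment> local, long latestRemoteOffset) {
        List<LocalSegment> result = new ArrayList<>();
        for (LocalSegment s : local) {
            if (!s.active && s.endOffset > latestRemoteOffset) {
                result.add(s);
            }
        }
        result.sort(Comparator.comparingLong((LocalSegment s) -> s.baseOffset));
        return result;
    }

    public static void main(String[] args) {
        List<LocalSegment> local = Arrays.asList(
            new LocalSegment(100, 199, false),
            new LocalSegment(0, 99, false),
            new LocalSegment(200, 250, true));
        // Offsets up to 99 are already remote, so only the 100-199 segment qualifies.
        for (LocalSegment s : segmentsToCopy(local, 99)) {
            System.out.println("copy segment with base offset " + s.baseOffset);
        }
    }
}
```

Sorting by base offset keeps the copy order deterministic, so remote storage never holds a later segment without the earlier ones.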

Step 1) Delete old remote segments (leader)

Call RSM.listRemoteSegments() to get the list of remote segments.

If any of the remote segments are older than the configured remote log retention time, delete the segment from remote storage.

Ask RSM to delete the temporary files in remote storage that are older than the configured remote retention time. Failed RSM operations may leave some temporary files in remote storage. Instead of introducing a "garbage collection time" configuration, we will just delete those temporary files after the remote log retention time.
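A rough sketch of the retention check described above, under stated assumptions: `RemoteSegmentInfo` and `RemoteRetentionSketch` are hypothetical names, and each remote segment is assumed to carry a single timestamp against which its age is measured. The text does not say which timestamp the RSM uses, so treat that choice as illustrative.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the metadata of one remote segment.
final class RemoteSegmentInfo {
    final long baseOffset;
    final long timestampMs;  // assumed: the time used to judge the segment's age

    RemoteSegmentInfo(long baseOffset, long timestampMs) {
        this.baseOffset = baseOffset;
        this.timestampMs = timestampMs;
    }
}

final class RemoteRetentionSketch {
    // Returns the remote segments whose age at nowMs exceeds retentionMs.
    // The caller would then ask remote storage to delete each of them.
    static List<RemoteSegmentInfo> expiredSegments(
            List<RemoteSegmentInfo> remote, long nowMs, long retentionMs) {
        List<RemoteSegmentInfo> expired = new ArrayList<>();
        for (RemoteSegmentInfo s : remote) {
            if (nowMs - s.timestampMs > retentionMs) {
                expired.add(s);
            }
        }
        return expired;
    }
}
```

The same age test could be applied to leftover temporary files, which is how the text avoids a separate "garbage collection time" setting.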

Step 2) Copy log segments to remote storage (leader)

Copy the log segment files that 1) are inactive and 2) have an offset range that is not covered by the segments already in remote storage.

If multiple log segment files are ready, they are copied to remote storage one by one, from the earliest to the latest.

Step 3) Retrieve the latest remote storage information (both leader and follower)

Call RSM.listRemoteSegments() to get the list of remote segments.

If any of the remote segments have not yet been added to the local index, call RSM.getRemoteLogIndexEntries to retrieve the remote log index data, append it to the local remoteLogIndex file, and build the local index files accordingly. Roll the local remoteLogIndex file when needed. Update the "latest remote offset" of the topic-partition.

When old remote segments are removed from remote storage, update the "earliest remote offset" of the topic-partition to the base offset of the earliest remote segment. Delete an old local remoteLogIndex file if the "earliest remote offset" is larger than the last offset of that file.
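The deletion rule for old local remoteLogIndex files can be illustrated with a small sketch. `IndexFileInfo` and `RemoteIndexCleanup` are hypothetical names introduced here; the only rule taken from the text is that a file becomes obsolete once the "earliest remote offset" is larger than the file's last offset.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical description of one local remoteLogIndex file.
final class IndexFileInfo {
    final String name;
    final long lastOffset;  // last offset covered by entries in this file

    IndexFileInfo(String name, long lastOffset) {
        this.name = name;
        this.lastOffset = lastOffset;
    }
}

final class RemoteIndexCleanup {
    // Returns the names of index files that no longer cover any remote data,
    // i.e. files whose last offset is below the earliest remote offset.
    static List<String> obsoleteIndexFiles(List<IndexFileInfo> files, long earliestRemoteOffset) {
        List<String> obsolete = new ArrayList<>();
        for (IndexFileInfo f : files) {
            if (earliestRemoteOffset > f.lastOffset) {
                obsolete.add(f.name);
            }
        }
        return obsolete;
    }
}
```

A file whose last offset equals the earliest remote offset is kept, because it still indexes one offset that exists in remote storage.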

Handle expired remote segments (leader and follower)

The leader invokes RSM.cleanupLogUntil(topicPartition: TopicPartition, cleanUpTillMs: Long) to delete remote log segments and return the start offset of the earliest remote log segment.

The follower fetches the earliest offset by calling RSM.earliestLogOffset(tp: TopicPartition).

Both leader and follower clean up the existing indexes up to that offset and update the start offset with the received value.
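The leader and follower paths differ only in how they obtain the new start offset, which the sketch below tries to show. It is an illustration under assumptions: `RemoteStorageSketch` is a hypothetical Java rendering of the two RSM calls named above, the topic-partition is modeled as a plain `String`, and the index is modeled as a sorted map keyed by offset.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical Java rendering of the two RSM calls named in the text.
interface RemoteStorageSketch {
    long cleanupLogUntil(String topicPartition, long cleanUpTillMs);
    long earliestLogOffset(String topicPartition);
}

final class ExpiredSegmentHandler {
    // The leader deletes expired remote segments and learns the new start offset;
    // the follower only asks for the current earliest offset.
    static long resolveStartOffset(RemoteStorageSketch rsm, String topicPartition,
                                   boolean isLeader, long cleanUpTillMs) {
        return isLeader
            ? rsm.cleanupLogUntil(topicPartition, cleanUpTillMs)
            : rsm.earliestLogOffset(topicPartition);
    }

    // Both roles then drop index entries below the start offset.
    // Simplification: each entry is keyed by the offset it refers to.
    static void trimIndex(NavigableMap<Long, String> index, long startOffset) {
        index.headMap(startOffset, false).clear();
    }

    public static void main(String[] args) {
        NavigableMap<Long, String> index = new TreeMap<>();
        index.put(0L, "entry-0");
        index.put(100L, "entry-100");
        index.put(200L, "entry-200");
        trimIndex(index, 150L);
        System.out.println(index.keySet());  // prints [200]
    }
}
```

Keeping the deletion on the leader means only one broker per partition mutates remote storage, while followers stay read-only against it.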


2. Remote Storage Fetcher Thread Pool

...