...

System-Wide

remote.log.storage.enable - Whether to enable remote log storage. Valid values are `true` or `false`; the default is `false`. This property provides backward compatibility.

remote.log.storage.manager.class.name - This is mandatory if remote.log.storage.enable is set to `true`.

remote.log.metadata.manager.class.name - This is an optional property. If it is not configured, Kafka uses an inbuilt topic-based metadata manager backed by an internal topic.

RemoteStorageManager

(These configs are dependent on remote storage manager implementation)

remote.log.storage.*

RemoteLogMetadataManager

(These configs are dependent on remote log metadata manager implementation)

remote.log.metadata.*

Thread pools

remote.log.manager.thread.pool.size
Size of the thread pool used for scheduling tasks that copy segments, fetch remote log indexes, and clean up remote log segments.

remote.log.manager.task.interval.ms
The interval at which the remote log manager runs the scheduled tasks, i.e., copying segments, fetching remote log indexes, and cleaning up remote log segments.

remote.log.reader.threads
Size of the thread pool used to read remote log segments.

remote.log.reader.max.pending.tasks
Maximum size of the remote log reader thread pool's task queue. If the task queue is full, the broker stops reading remote log segments.

Per Topic Configuration

remote.log.retention.minutes

remote.log.retention.bytes
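
For illustration only, a broker configuration enabling tiered storage might look like the following. The implementation class names, the RSM/RLMM-specific keys, and all values here are hypothetical examples rather than recommended defaults, and the per-topic retention settings would be applied as topic-level configs.

Code Block
languagetext
# Hypothetical example values, not recommended defaults.

# System-wide
remote.log.storage.enable=true
remote.log.storage.manager.class.name=com.example.tiered.ExampleRemoteStorageManager
remote.log.metadata.manager.class.name=com.example.tiered.ExampleRemoteLogMetadataManager

# Passed through to the RSM / RLMM implementations (keys are made up here)
remote.log.storage.example.bucket=my-kafka-tier
remote.log.metadata.example.client.timeout.ms=30000

# Thread pools
remote.log.manager.thread.pool.size=4
remote.log.manager.task.interval.ms=30000
remote.log.reader.threads=8
remote.log.reader.max.pending.tasks=100

# Per-topic (applied via topic-level configuration)
remote.log.retention.minutes=10080
remote.log.retention.bytes=1073741824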

...

  • receives callback events for leadership changes and stop/delete events of topic partitions on a broker.
  • delegates copy, read, and delete of topic partition segments to a pluggable storage manager (viz. RemoteStorageManager) implementation and maintains respective remote log segment metadata through RemoteLogMetadataManager.
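
As a minimal sketch of these responsibilities, an RLM-facing interface could look roughly like the following; the method names and signatures here are illustrative assumptions, not the final API.

Code Block
languagescala
import org.apache.kafka.common.TopicPartition

// Illustrative sketch only; these names and signatures are assumptions,
// not the final RemoteLogManager API.
trait RemoteLogManagerSketch extends AutoCloseable {
  // Callback for leadership changes of topic partitions on this broker.
  def onLeadershipChange(partitionsBecomeLeader: Set[TopicPartition],
                         partitionsBecomeFollower: Set[TopicPartition]): Unit

  // Callback for stop/delete events; `delete = true` also removes the
  // partitions' remote log segments via the RemoteStorageManager.
  def stopPartitions(partitions: Set[TopicPartition], delete: Boolean): Unit
}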


RLM creates tasks for each leader or follower topic partition:

  • RLM Leader Task
    • It checks for rolled-over LogSegments (whose last message offset is less than the last stable offset of that topic partition) and copies them, along with their remote offset/time indexes, to the remote tier. It also serves fetch requests for older data from the remote tier. Local log segments are not cleaned up until they have been successfully copied to remote storage, even if their retention time/size has been reached.
    • We proposed an approach of creating a RemoteLogSegmentIndex per topic-partition to track remote LogSegments. These indexes are described in more detail here. This allows a larger index interval for remote log segments instead of a large number of short index files. It also supports encrypted segments, by encrypting individual record batches and building the respective indexes, as the local segment indexes would not be usable. The initial version of this approach is implemented in PR. We want to proceed with local log indexes in the initial version and may move to remote log indexes in later versions.
  • RLM Follower Task 
    • It keeps track of the segments and index files on the remote tier by looking into RemoteLogMetadataManager. The RLM follower can also serve reads of old data from the remote tier.

Core Kafka changes

To satisfy the goal of keeping Kafka changes minimal when RLM is not configured, Kafka behavior remains unchanged for existing users.

  • Core Kafka starts the RLM service if tiered storage is configured.
  • When an offset index is not found, if RLM is configured, the read request is delegated to RLM to serve the data from the remote tier.

Local and Remote log offset constraints

Below are the leader topic partition's log offsets:

[Image: leader topic partition's log offsets]

  • Lx = Local log start offset
  • Lz = Local log end offset
  • Ly = Last stable offset (LSO)

...

If RLM is configured, ReplicaManager will call RLM to assign topic-partitions or remove topic-partitions similar to how the ReplicaFetcherManager works today.

If the broker changes its state from leader to follower for a topic-partition while RLM is copying a segment, it will finish the copy before it relinquishes the copy task for that topic-partition. This might leave duplicated messages.
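
For illustration, the hand-off could be handled along the lines of the sketch below; CopyTask and the task registry are hypothetical stand-ins, not actual Kafka classes.

Code Block
languagescala
import java.util.concurrent.ConcurrentHashMap
import org.apache.kafka.common.TopicPartition

// Hypothetical sketch of relinquishing the copy task on a leader-to-follower
// transition; CopyTask and LeaderCopyTasks are illustrative stand-ins.
trait CopyTask {
  def cancel(): Unit             // stop picking up new segments to copy
  def awaitInFlightCopy(): Unit  // block until the current segment copy finishes
}

object LeaderCopyTasks {
  private val tasks = new ConcurrentHashMap[TopicPartition, CopyTask]()

  def onBecomeFollower(tp: TopicPartition): Unit = {
    val task = tasks.remove(tp)
    if (task != null) {
      task.cancel()
      // Finish the segment currently being copied before relinquishing;
      // this is why a duplicated segment may end up in remote storage.
      task.awaitInFlightCopy()
    }
  }
}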

Consumer Fetch Requests

ReplicaManager.readFromLocalLog works as it does today. Only in the case of an OffsetOutOfRange exception, and when RLM is configured, do we delegate the read request to RLM, which returns a LogReadResult.

Code Block
languagescala
def readFromLocalLog(): Seq[(TopicPartition, LogReadResult)] = {
  try {
    // ... existing local read path ...
  } catch {
    // Delegate to the RemoteLogManager when the requested offset is no
    // longer available in local storage.
    case e @ (_: OffsetOutOfRangeException) =>
      RemoteLogManager.read(fetchMaxBytes: Int,
                            hardMaxBytesLimit: Boolean,
                            tp: TopicPartition,
                            fetchInfo: PartitionData,
                            quota: ReplicaQuota)
  }
}

The delegated call, RemoteLogManager.readFromRemoteLog, returns the LogReadResult. More details are explained in the RLM/RSM tasks section.

...

For follower fetch, the leader only returns the data that is still in the leader's local storage. LogSegments that exist only in remote storage are not replicated to followers, as they are already present in remote storage. Instead, a follower retrieves the metadata of those segments from remote storage. If a replica becomes a leader, it can still locate and serve data from remote storage.
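
A rough sketch of the follower-side behaviour, assuming hypothetical stand-ins for the fetcher and the metadata manager (none of these names are actual Kafka APIs):

Code Block
languagescala
import org.apache.kafka.common.TopicPartition

// Hypothetical stand-ins; the real components have different shapes.
trait SegmentMetadata
trait Fetcher { def startFetching(tp: TopicPartition, fromOffset: Long): Unit }
trait MetadataManager { def listRemoteLogSegments(tp: TopicPartition): Seq[SegmentMetadata] }

class FollowerRemoteState(fetcher: Fetcher, rlmm: MetadataManager) {
  def onBecomeFollower(tp: TopicPartition, leaderLocalLogStartOffset: Long): Seq[SegmentMetadata] = {
    // Replicate only what is still in the leader's local storage...
    fetcher.startFetching(tp, leaderLocalLogStartOffset)
    // ...and learn about remote-only segments from their metadata, so this
    // replica can locate and serve them from remote storage if it becomes
    // the leader.
    rlmm.listRemoteLogSegments(tp)
  }
}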

...

If multiple log segment files are ready, they are copied to remote storage one by one, from the earliest to the latest, using the copy API from RSM: RLM generates a universally unique RemoteLogSegmentId and invokes copyLogSegment(RemoteLogSegmentId remoteLogSegmentId, LogSegmentData logSegmentData). If that succeeds, it calls RLMM.putRemoteLogSegmentData(RemoteLogSegmentId remoteLogSegmentId, RemoteLogSegmentMetadata remoteLogSegmentMetadata) to store the metadata.
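
Putting the two calls together, the per-segment copy pass could look roughly like this; RemoteLogSegmentId mirrors the identifier named above, while the manager traits and helpers are hypothetical stand-ins.

Code Block
languagescala
import java.util.UUID
import org.apache.kafka.common.TopicPartition

// Hypothetical stand-ins mirroring the APIs named in the text.
case class RemoteLogSegmentId(topicPartition: TopicPartition, id: UUID)
trait LogSegmentData
trait SegmentMetadata
trait Rsm { def copyLogSegment(id: RemoteLogSegmentId, data: LogSegmentData): Unit }
trait Rlmm { def putRemoteLogSegmentData(id: RemoteLogSegmentId, md: SegmentMetadata): Unit }

class SegmentCopier(rsm: Rsm, rlmm: Rlmm,
                    metadataFor: (RemoteLogSegmentId, LogSegmentData) => SegmentMetadata) {
  // Copy ready segments one by one, from the earliest to the latest.
  def copyEligibleSegments(tp: TopicPartition, segments: Seq[LogSegmentData]): Unit = {
    segments.foreach { segmentData =>
      val segmentId = RemoteLogSegmentId(tp, UUID.randomUUID()) // universally unique
      rsm.copyLogSegment(segmentId, segmentData)
      // Metadata is stored only after the copy succeeded; a failed copy
      // leaves no metadata entry behind.
      rlmm.putRemoteLogSegmentData(segmentId, metadataFor(segmentId, segmentData))
    }
  }
}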

Handle expired remote segments (leader and follower)

For the leader, RLM computes the log segments to be deleted based on the remote retention configuration, and it removes those remote log segments along with their metadata.

For the follower, RLM fetches the earliest offset by calling RLMM.earliestLogOffset(tp: TopicPartition).
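
A sketch of the leader-side retention pass follows; the metadata fields and delete methods here are hypothetical stand-ins, as the exact shape of the metadata and the APIs is not defined in this section.

Code Block
languagescala
import org.apache.kafka.common.TopicPartition

// Hypothetical stand-ins; field and method names are illustrative.
case class SegmentInfo(id: String, maxTimestampMs: Long)
trait Rsm { def deleteLogSegment(segment: SegmentInfo): Unit }
trait Rlmm {
  def listRemoteLogSegments(tp: TopicPartition): Seq[SegmentInfo]
  def deleteRemoteLogSegmentMetadata(id: String): Unit
}

class RemoteRetentionCleaner(rsm: Rsm, rlmm: Rlmm) {
  // Leader-side pass: compute expired segments from the remote retention
  // config, delete the segment data, then delete its metadata.
  def cleanup(tp: TopicPartition, retentionMs: Long, nowMs: Long): Unit = {
    val expired = rlmm.listRemoteLogSegments(tp)
      .filter(_.maxTimestampMs < nowMs - retentionMs)
    expired.foreach { segment =>
      rsm.deleteLogSegment(segment)
      rlmm.deleteRemoteLogSegmentMetadata(segment.id)
    }
  }
}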

...

  1. Find the corresponding RemoteLogSegmentId from RLMM, and the startPosition and endPosition from the offset index.
  2. Try to build a Records instance from the data fetched via RSM.fetchLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentId, Long startPosition, Optional<Long> endPosition):
    1. if successful, RemoteFetchPurgatory is notified to return the data to the client;
    2. if the remote segment file has already been deleted, RemoteFetchPurgatory is notified to return an error to the client;
    3. if the remote storage operation failed (e.g., remote storage is temporarily unavailable), the operation is retried with exponential back-off until the original consumer fetch request times out; see the sketch after this list.
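
Step 2.c amounts to a retry loop with exponential back-off bounded by the fetch deadline. A self-contained sketch (the helper name and parameters are made up for illustration):

Code Block
languagescala
// Illustrative retry helper for step 2.c: retry a remote storage read with
// exponential back-off until it succeeds or the consumer fetch request's
// deadline passes, at which point the last failure propagates.
def withExponentialBackoff[T](deadlineMs: Long,
                              initialBackoffMs: Long = 100L,
                              maxBackoffMs: Long = 10000L)(op: () => T): T = {
  var backoffMs = initialBackoffMs
  while (true) {
    try {
      return op()
    } catch {
      // Retry only while the next attempt can still start before the deadline.
      case e: Exception if System.currentTimeMillis() + backoffMs < deadlineMs =>
        Thread.sleep(backoffMs)
        backoffMs = math.min(backoffMs * 2, maxBackoffMs)
    }
  }
  throw new IllegalStateException("unreachable")
}

// Usage sketch with hypothetical names:
//   withExponentialBackoff(deadlineMs)(() => rsm.fetchLogSegmentData(id, startPos, endPos))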

...

Remote log metadata of a partition's remote log segments is stored in an internal topic called `__remote_log_metadata`. This topic can be created with a default partition count of 50. Users can configure the topic name and partition count.
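
For example, a user topic-partition could be mapped deterministically onto one of the metadata topic's partitions by hashing; the murmur2-based scheme below is an assumption for illustration, not necessarily the exact mapping used.

Code Block
languagescala
import java.nio.charset.StandardCharsets
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.utils.Utils

// Sketch: map a user topic-partition to one of the metadata topic's
// partitions. The hash scheme here is an illustrative assumption.
def metadataPartitionFor(tp: TopicPartition, numMetadataPartitions: Int = 50): Int = {
  val bytes = tp.toString.getBytes(StandardCharsets.UTF_8)
  Utils.toPositive(Utils.murmur2(bytes)) % numMetadataPartitions
}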

In this design, RemoteLogMetadataManager(RLMM) is responsible for storing and fetching remote log metadata. It provides

...