...

For follower fetch, the leader only returns the data that is still in the leader's local storage. If a LogSegment was copied into remote storage by a leader broker, it is not necessary for a follower to copy this segment, since it is already present in remote storage. Instead, a follower retrieves the information about the segment from remote storage. If a replica becomes a leader, it can still locate and serve data from remote storage.
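As an illustration, here is a minimal sketch of that routing decision. All names here (FetchRouter, localLogStartOffset, the fetch-type flag) are hypothetical, not the actual broker code:

```java
// Hypothetical sketch of the fetch-routing rule described above; not the
// actual broker code. localLogStartOffset is the first offset still present
// in the leader's local log.
class FetchRouter {
    enum Source { LOCAL, REMOTE, NONE }

    static Source route(boolean isFollowerFetch, long fetchOffset, long localLogStartOffset) {
        if (fetchOffset >= localLogStartOffset) {
            return Source.LOCAL;   // data still on local disk, serve it directly
        }
        if (isFollowerFetch) {
            // The leader returns only local data to followers; a follower learns
            // about older segments from remote storage itself, instead of copying them.
            return Source.NONE;
        }
        return Source.REMOTE;      // consumer fetch of tiered data goes to remote storage
    }
}
```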


Thread Pools

Remote storage (e.g. S3 / HDFS) is likely to have higher I/O latency and lower availability than local storage.

When remote storage becomes temporarily unavailable (up to several hours) or has high latency (up to minutes), Kafka should still be able to operate normally. All the Kafka operations (produce, consume local data, create / expand topics, etc.) that do not rely on remote storage should not be impacted. Consumers that try to consume remote data should get reasonable errors when remote storage is unavailable or the remote storage requests time out.

To achieve this, we have to handle remote storage operations in dedicated thread pools, instead of in Kafka I/O threads and fetcher threads.


1. Remote Log Manager (RLM) Thread Pool


RLM maintains a list of the topic-partitions it manages. The list is updated in Kafka I/O threads when topic-partitions are added to / removed from RLM. Each topic-partition in the list is assigned a scheduled processing time. The RLM thread pool processes the topic-partitions whose scheduled processing time is less than or equal to the current time.

When a new topic-partition is assigned to the broker, the topic-partition is added to the list with scheduled processing time = 0, which means the topic-partition has to be processed immediately, to retrieve information from remote storage.

After a topic-partition is successfully processed by the thread pool, its scheduled processing time is set to ( now() + rlm_process_interval_ms ). rlm_process_interval_ms can be configured in the broker config file.

If the processing of a topic-partition fails due to a remote storage error, its scheduled processing time is set to ( now() + rlm_retry_interval_ms ). rlm_retry_interval_ms can be configured in the broker config file.

When a topic-partition is unassigned from the broker: if the topic-partition is not currently being processed by the thread pool, it is directly removed from the list; otherwise, it is marked as "deleted" and will be removed after the current processing is done.
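A minimal sketch of this scheduling model, with illustrative names (RlmScheduler, pollOnce) and a stand-in for Kafka's TopicPartition; the real implementation would differ:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch only: scheduled processing times per topic-partition,
// polled by the RLM thread pool workers.
public class RlmScheduler {
    // Stand-in for org.apache.kafka.common.TopicPartition.
    record TopicPartition(String topic, int partition) {}

    private final Map<TopicPartition, Long> scheduledTimes = new ConcurrentHashMap<>();
    private final long rlmProcessIntervalMs;   // broker config: rlm_process_interval_ms
    private final long rlmRetryIntervalMs;     // broker config: rlm_retry_interval_ms

    public RlmScheduler(long processIntervalMs, long retryIntervalMs) {
        this.rlmProcessIntervalMs = processIntervalMs;
        this.rlmRetryIntervalMs = retryIntervalMs;
    }

    // Called from Kafka I/O threads on assignment; time 0 = process immediately.
    public void addPartition(TopicPartition tp) {
        scheduledTimes.put(tp, 0L);
    }

    // Called from Kafka I/O threads on unassignment. (The "mark as deleted while
    // being processed" case is approximated below: replace() does not re-add a
    // partition that was removed during processing.)
    public void removePartition(TopicPartition tp) {
        scheduledTimes.remove(tp);
    }

    // Executed by each RLM worker thread: pick one due partition, process it,
    // then reschedule it based on success or failure.
    public void pollOnce(long now) {
        for (Map.Entry<TopicPartition, Long> e : scheduledTimes.entrySet()) {
            if (e.getValue() <= now) {
                TopicPartition tp = e.getKey();
                try {
                    process(tp);   // steps 1-3 below
                    scheduledTimes.replace(tp, now + rlmProcessIntervalMs);
                } catch (Exception remoteStorageError) {
                    scheduledTimes.replace(tp, now + rlmRetryIntervalMs);
                }
                return;            // one topic-partition per poll
            }
        }
    }

    void process(TopicPartition tp) { /* delete / copy / sync, see the steps below */ }
}
```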

Each thread in the thread pool processes one topic-partition at a time in the following steps:

Step 1) Delete old remote segments (leader)

Call RSM.listRemoteSegments() to get the list of remote segments.

If any of the remote segments are older than the configured remote log retention time, delete the segment from remote storage.

Ask RSM to delete the temporary files in remote storage that are older than the configured remote retention time. Failed RSM operations may leave some temporary files in remote storage. Instead of introducing a "garbage collection time" configuration, we will just delete those temporary files after the remote log retention time.
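A sketch of this deletion pass. RSM.listRemoteSegments() is named in this document; the rest of the interface (deleteSegment, deleteTempFilesOlderThan, the segment fields) is assumed for illustration:

```java
import java.util.List;

// Assumed minimal RSM surface for this step; only listRemoteSegments() is
// named in this document, the other methods are placeholders.
interface RemoteStorageManager {
    List<RemoteSegmentInfo> listRemoteSegments(String topic, int partition);
    void deleteSegment(RemoteSegmentInfo segment);
    void deleteTempFilesOlderThan(String topic, int partition, long ageMs);
}

interface RemoteSegmentInfo {
    long lastModifiedMs();   // assumed: when the segment was written to remote storage
}

class RemoteRetentionPass {
    // Delete remote segments, and leftover temporary files from failed copies,
    // that are older than the configured remote log retention time.
    static void run(RemoteStorageManager rsm, String topic, int partition,
                    long remoteRetentionMs, long nowMs) {
        for (RemoteSegmentInfo seg : rsm.listRemoteSegments(topic, partition)) {
            if (nowMs - seg.lastModifiedMs() > remoteRetentionMs) {
                rsm.deleteSegment(seg);
            }
        }
        // Temporary files are reaped on the same retention schedule, instead of
        // a separate "garbage collection time" configuration.
        rsm.deleteTempFilesOlderThan(topic, partition, remoteRetentionMs);
    }
}
```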

Step 2) Copy log segments to remote storage (leader)

Copy the log segment files that 1) are inactive and 2) have an offset range that is not covered by the segments on remote storage.

If multiple log segment files are ready, they are copied to remote storage one by one, from the earliest to the latest.
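A sketch of this copy step, under assumed LogSegment / RemoteCopier types; only the two eligibility conditions and the earliest-to-latest ordering come from this document:

```java
import java.util.Comparator;
import java.util.List;

class SegmentCopyPass {
    // Stand-ins for the broker's segment type and the RSM copy call.
    interface LogSegment {
        long baseOffset();
        long endOffset();
        boolean isActive();
    }

    interface RemoteCopier {
        void copySegment(LogSegment segment);
    }

    // Copy inactive segments whose offsets are not yet covered remotely,
    // one by one, from the earliest to the latest.
    static void run(List<LogSegment> localSegments, long latestRemoteOffset,
                    RemoteCopier copier) {
        localSegments.stream()
            .filter(s -> !s.isActive())
            .filter(s -> s.endOffset() > latestRemoteOffset)   // not covered remotely yet
            .sorted(Comparator.comparingLong(LogSegment::baseOffset))
            .forEach(copier::copySegment);
    }
}
```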

Step 3) Retrieve the latest remote storage information (both leader and follower)

Call RSM.listRemoteSegments() to get the list of remote segments.

If any of the remote segments are not yet added to the local index, call RSM.getRemoteLogIndexEntries to retrieve the remote log index data, append it to the local remoteLogIndex file, and build the local index files accordingly. Roll the local remoteLogIndex file when needed. Update the "latest remote offset" of the topic-partition.

Update the "earliest remote offset" of the topic-partition to the base offset of the earliest remote segment, when old remote segments are removed from the remote storage. Delete the old local remoteLogIndex file, if the "earliest remote offset" is larger than the last offset of the file.


2. Remote Storage Fetcher Thread Pool


When handling a consumer fetch request, if the required offset is in remote storage, the request is added into "RemoteFetchPurgatory" to handle the timeout. RemoteFetchPurgatory is an instance of kafka.server.DelayedOperationPurgatory, and is similar to the existing produce / fetch purgatories. At the same time, the request is put into the task queue of the "remote storage fetcher thread pool".
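A rough sketch of this hand-off; kafka.server.DelayedOperationPurgatory is an existing class, but the RemoteFetchTask type and queue wiring below are illustrative, with a CompletableFuture standing in for the purgatory entry:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

class RemoteFetchDispatch {
    // Illustrative task type: the future plays the role of the purgatory entry.
    record RemoteFetchTask(String topic, int partition, long fetchOffset,
                           long deadlineMs, CompletableFuture<byte[]> result) {}

    private final BlockingQueue<RemoteFetchTask> fetchQueue = new LinkedBlockingQueue<>();

    // Called from the request handler when the requested offset is only on
    // remote storage. The future is completed by the fetcher pool, or failed
    // with a timeout error when the deadline passes (timeout wiring omitted).
    CompletableFuture<byte[]> submit(String topic, int partition,
                                     long offset, long fetchMaxWaitMs) {
        long deadlineMs = System.currentTimeMillis() + fetchMaxWaitMs;
        CompletableFuture<byte[]> result = new CompletableFuture<>();
        fetchQueue.add(new RemoteFetchTask(topic, partition, offset, deadlineMs, result));
        return result;
    }

    BlockingQueue<RemoteFetchTask> queue() { return fetchQueue; }
}
```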

Each thread in the thread pool processes one remote fetch request at a time (a sketch of the worker loop follows these steps). The remote storage fetch thread will:

1) find out the corresponding RDI from the remote log index

2) try to retrieve the data from remote storage

2.1) if success, RemoteFetchPurgatory will be notified to return the data to the client

2.2) if the remote segment file is already deleted, RemoteFetchPurgatory will be notified to return an error to the client.

2.3) if the remote storage operation failed (e.g. remote storage is temporarily unavailable), the operation will be retried with exponential back-off, until the original consumer fetch request times out.
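A sketch of one fetcher thread's loop, reusing the illustrative RemoteFetchDispatch types from the previous sketch; for brevity every failure is treated as retriable here, and the non-retriable "segment deleted" case (2.2) is only noted in a comment:

```java
class RemoteFetchWorker implements Runnable {
    // Stand-in for "look up the RDI in the remote log index, then read the
    // data from remote storage"; not the real RSM API.
    interface RemoteDataSource {
        byte[] read(String topic, int partition, long offset) throws Exception;
    }

    private final RemoteFetchDispatch dispatch;
    private final RemoteDataSource remote;

    RemoteFetchWorker(RemoteFetchDispatch dispatch, RemoteDataSource remote) {
        this.dispatch = dispatch;
        this.remote = remote;
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                var task = dispatch.queue().take();   // one remote fetch at a time
                long backoffMs = 100;
                while (true) {
                    try {
                        // Steps 1-2: resolve the RDI and fetch from remote storage.
                        // A "segment already deleted" error (2.2) should complete
                        // the future exceptionally right away instead of retrying.
                        task.result().complete(
                            remote.read(task.topic(), task.partition(), task.fetchOffset()));
                        break;
                    } catch (Exception transientError) {
                        if (System.currentTimeMillis() + backoffMs >= task.deadlineMs()) {
                            // 2.3: give up once the client's fetch timeout is reached.
                            task.result().completeExceptionally(transientError);
                            break;
                        }
                        Thread.sleep(backoffMs);
                        backoffMs = Math.min(backoffMs * 2, 10_000);   // exponential back-off
                    }
                }
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();   // shut down the worker
            }
        }
    }
}
```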

Alternatives considered

The following alternatives were considered:

...