...

For any fetch request, ReplicaManager first calls readFromLocalLog. If this method throws an OffsetOutOfRange exception, ReplicaManager delegates the read to RemoteLogManager.readFromRemoteLog and returns the resulting LogReadResult. If the fetch request is from a consumer, RLM reads the data from remote storage and returns the result to the consumer.
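The local-first read with remote fallback for consumer fetches can be sketched as follows. This is a minimal illustration, not Kafka's actual code: `TieredReadSketch` and `OffsetOutOfRangeSketchException` are hypothetical names, the two function fields merely stand in for readFromLocalLog and RemoteLogManager.readFromRemoteLog, and a `String` stands in for LogReadResult.

```java
import java.util.function.LongFunction;

// Hypothetical stand-in for the OffsetOutOfRange exception thrown by the local read.
class OffsetOutOfRangeSketchException extends RuntimeException {
    OffsetOutOfRangeSketchException(long offset) {
        super("offset " + offset + " is not in the local log");
    }
}

// Hypothetical sketch of the read path: try the local log, fall back to remote storage.
class TieredReadSketch {
    private final LongFunction<String> localRead;   // stands in for readFromLocalLog
    private final LongFunction<String> remoteRead;  // stands in for RemoteLogManager.readFromRemoteLog

    TieredReadSketch(LongFunction<String> localRead, LongFunction<String> remoteRead) {
        this.localRead = localRead;
        this.remoteRead = remoteRead;
    }

    // Returns the local result if the offset is still local, otherwise the remote result.
    String read(long offset) {
        try {
            return localRead.apply(offset);
        } catch (OffsetOutOfRangeSketchException e) {
            // The offset is no longer in local storage, so delegate to the remote read.
            return remoteRead.apply(offset);
        }
    }
}
```

The sketch covers only the consumer path; any exception other than the out-of-range case propagates to the caller unchanged.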

Follower Requests/Replication

For a follower fetch, the leader only returns the data that is still in the leader's local storage. If a LogSegment has already been copied into remote storage by the leader broker, the follower does not need to copy this segment, because it is already present in remote storage. Instead, the follower retrieves the information about the segment from remote storage. If a replica becomes a leader, it can still locate and serve data from remote storage.

Thread Pools

Remote storage (e.g. S3 / HDFS) is likely to have higher I/O latency and lower availability than local storage.

...

To achieve this, we have to handle remote storage operations in dedicated thread pools, instead of Kafka I/O threads and fetcher threads.


1. Remote Log Manager (RLM) Thread Pool


RLM maintains a list of the topic-partitions it manages. The list is updated in Kafka I/O threads when topic-partitions are added to or removed from RLM. Each topic-partition in the list is assigned a scheduled processing time. The RLM thread pool processes the topic-partitions whose "scheduled processing time" is less than or equal to the current time.
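One way to picture the scheduling rule is a priority queue ordered by scheduled processing time. The sketch below is an assumption-laden illustration, not the RLM implementation: `ScheduledPartitionsSketch`, `Entry`, and `pollDue` are hypothetical names, and topic-partitions are represented as plain strings.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch of the list of managed topic-partitions with scheduled processing times.
class ScheduledPartitionsSketch {
    static final class Entry {
        final String topicPartition;
        final long scheduledTimeMs;

        Entry(String topicPartition, long scheduledTimeMs) {
            this.topicPartition = topicPartition;
            this.scheduledTimeMs = scheduledTimeMs;
        }
    }

    // Ordered so that the entry with the earliest scheduled time is at the head.
    private final PriorityQueue<Entry> queue =
        new PriorityQueue<>(Comparator.comparingLong((Entry e) -> e.scheduledTimeMs));

    // Methods are synchronized because I/O threads update the list while pool threads read it.
    synchronized void add(String topicPartition, long scheduledTimeMs) {
        queue.add(new Entry(topicPartition, scheduledTimeMs));
    }

    synchronized void remove(String topicPartition) {
        queue.removeIf(e -> e.topicPartition.equals(topicPartition));
    }

    // Removes and returns every topic-partition whose scheduled time is <= nowMs, earliest first.
    synchronized List<String> pollDue(long nowMs) {
        List<String> due = new ArrayList<>();
        while (!queue.isEmpty() && queue.peek().scheduledTimeMs <= nowMs) {
            due.add(queue.poll().topicPartition);
        }
        return due;
    }
}
```

In this sketch a pool thread would call `pollDue` with the current time, process the returned topic-partitions, and re-add each one with its next scheduled time.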

...

Update the "earliest remote offset" of the topic-partition to the base offset of the earliest remote segment, when old remote segments are removed from the remote storage. Delete the old local remoteLogIndex file, if the "earliest remote offset" is larger than the last offset of the file.
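The bookkeeping in this step can be sketched as below. This is only an illustration under stated assumptions: `RemoteCleanupSketch` and its methods are hypothetical, remote segments are tracked by base offset only, and each local remoteLogIndex file is represented by its name and the last offset it covers. The sketch returns the names of obsolete index files instead of deleting anything on disk.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch of the cleanup bookkeeping for one topic-partition.
class RemoteCleanupSketch {
    // Base offsets of the segments that are still in remote storage.
    private final TreeSet<Long> remoteSegmentBaseOffsets = new TreeSet<>();
    // Local remoteLogIndex files: file name -> last offset covered by that file.
    private final Map<String, Long> indexFileLastOffsets = new TreeMap<>();
    private long earliestRemoteOffset = -1L;

    void addRemoteSegment(long baseOffset) {
        remoteSegmentBaseOffsets.add(baseOffset);
        earliestRemoteOffset = remoteSegmentBaseOffsets.first();
    }

    void addIndexFile(String name, long lastOffset) {
        indexFileLastOffsets.put(name, lastOffset);
    }

    long earliestRemoteOffset() {
        return earliestRemoteOffset;
    }

    // Call after the given segments were removed from remote storage.
    // Returns the index files that no longer cover any remote offset.
    List<String> onRemoteSegmentsRemoved(Collection<Long> removedBaseOffsets) {
        remoteSegmentBaseOffsets.removeAll(removedBaseOffsets);
        List<String> obsolete = new ArrayList<>();
        if (remoteSegmentBaseOffsets.isEmpty()) {
            // Assumption: with no remote segments left, the offset is left unchanged.
            return obsolete;
        }
        earliestRemoteOffset = remoteSegmentBaseOffsets.first();
        Iterator<Map.Entry<String, Long>> it = indexFileLastOffsets.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            // An index file is obsolete once the earliest remote offset is past its last offset.
            if (earliestRemoteOffset > e.getValue()) {
                obsolete.add(e.getKey());
                it.remove();
            }
        }
        return obsolete;
    }
}
```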


2. Remote Storage Fetcher Thread Pool


When handling a consumer fetch request, if the required offset is in remote storage, the request is added to "RemoteFetchPurgatory" to handle timeout. RemoteFetchPurgatory is an instance of kafka.server.DelayedOperationPurgatory, and is similar to the existing produce / fetch purgatories. At the same time, the request is put into the task queue of the "remote storage fetcher thread pool".

Each thread in the thread pool processes one remote fetch request at a time. The remote storage fetch thread will

1) find out the corresponding RDI from the remote log index

2) try to retrieve the data from remote storage

2.1) if successful, RemoteFetchPurgatory will be notified to return the data to the client

2.2) if the remote segment file is already deleted, RemoteFetchPurgatory will be notified to return an error to the client.

2.3) if the remote storage operation failed (remote storage is temporarily unavailable), the operation will be retried with exponential back-off until the original consumer fetch request times out.
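The three outcomes in step 2 can be sketched as a retry loop. This is a simplified illustration, not the proposed implementation: all class and method names are hypothetical, a `String` stands in for the fetched data, and the loop sleeps on the calling thread, whereas a real fetcher would also notify RemoteFetchPurgatory. The initial back-off value and the doubling factor are assumptions.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical: the remote segment file has already been deleted (case 2.2).
class SegmentDeletedSketchException extends Exception {}

// Hypothetical: remote storage is temporarily unavailable (case 2.3).
class RemoteUnavailableSketchException extends Exception {}

// Hypothetical stand-in for a single read attempt against remote storage.
interface RemoteReadSketch {
    String read() throws SegmentDeletedSketchException, RemoteUnavailableSketchException;
}

class RemoteFetchRetrySketch {
    // Retries the read with exponential back-off until it succeeds, the segment
    // turns out to be deleted, or the fetch timeout elapses.
    static String fetchWithBackoff(RemoteReadSketch remote, long timeoutMs, long initialBackoffMs)
            throws SegmentDeletedSketchException, TimeoutException, InterruptedException {
        long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        long backoffMs = initialBackoffMs;
        while (true) {
            try {
                return remote.read(); // 2.1: success, return the data
            } catch (RemoteUnavailableSketchException e) {
                // 2.3: temporary failure, retry unless the request has timed out
                long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadlineNanos - System.nanoTime());
                if (remainingMs <= 0) {
                    throw new TimeoutException("remote fetch timed out");
                }
                Thread.sleep(Math.min(backoffMs, remainingMs));
                backoffMs *= 2; // double the wait before the next attempt
            }
            // 2.2: SegmentDeletedSketchException is not caught, so it reaches the caller at once
        }
    }
}
```

Capping each sleep at the remaining time keeps the loop from waiting past the consumer fetch timeout.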

Alternatives considered

The following alternatives were considered:

...