Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

RLM receives a callback and unassigns the partition for leader/follower task, If the delete option is enabled then remote log segment metadata is updated in RLMM by enabling the delete marker. RLMM will eventually delete these segments by using RemoteStorageManager. As the segments are deleted asynchronously, creation of a topic with the same name later may cause issues because of the existing metadata about the earlier generation of the topic. It was discussed in the community earlier for adding UUID to represent a topic along with the name as part of KIP-516. That enhancement will be useful to address the issue mentioned here. 

OffsetForLeaderEpoch

Look into leader epoch checkpoint cache. This is stored in tiered storage and it may be fetched by followers from tiered storage as part of the fetch protocol. 

LogStartOffset

LogStartOffset of a topic can be either in local or in remote storage. This is already maintained in `Log` class while loading the logs and it can also be fetched from RemoteLogMetadataManager.  

JBOD related changes

Currently, JBOD is supported by altering log dirs in two ways.

  • Altering to a different dir on the local broker

    • This is not supported in this KIP but we may plan this in future releases.

  • Altering to a dir on a remote broker

    • This is equivalent to reassigning partitions to a different broker, which is already supported in this KIP as part of how followers behave with respect to remote tiered storage.


There are no changes with other protocol APIs because of tiered storage. 

RLM/RSM tasks and thread pools
Anchor
rlm-rsm-tasks
rlm-rsm-tasks

Remote storage (e.g. HDFS/S3/GCP) is likely to have higher I/O latency and lower availability than local storage.

When the remote storage becoming temporarily unavailable (up to several hours) or having high latency (up to minutes), Kafka should still be able to operate normally. All the Kafka operations (produce, consume local data, create/expand topics, etc.) that do not rely on remote storage should not be impacted. The consumers that try to consume the remote data should get reasonable errors, when remote storage is unavailable or the remote storage requests timeout.

To achieve this, we have to handle remote storage operations in dedicated threads pools, instead of Kafka I/O threads and fetcher threads.

1. Remote Log Manager (RLM) Thread Pool

RLM maintains a list of the topic-partitions it manages. The list is updated in Kafka I/O threads, when topic-partitions are added to / removed from RLM. Each topic-partition in the list is assigned a scheduled processing time. The RLM thread pool processes the topic-partitions that the "scheduled processing time" is less than or equal to the current time.

When a new topic-partition is assigned to the broker, the topic-partition is added to the list, with scheduled processing time = 0, which means the topic-partition has to be processed immediately, to retrieve information from remote storage.

After a topic-partition is successfully processed by the thread pool, it's scheduled processing time is set to ( now() + rlm_process_interval_ms ). rlm_process_interval_ms can be configured in broker config file.

If the process of a topic-partition is failed due to remote storage error, its scheduled processing time is set to ( now() + rlm_retry_interval_ms ). rlm_retry_interval_ms can be configured in broker config file.

When a topic-partition is unassigned from the broker, the topic-partition is not currently processed by the thread pool, the topic-partition is directly removed from the list; otherwise, the topic-partition is marked as "deleted", and will be removed after the current process is done.

Each thread in the thread pool processes one topic-partition at a time in the following steps:

Copy log segments to remote storage (leader)

Copy the log segment files that are

       - inactive and

       - the offset range is not covered completely by the segments on the remote storage and

      - those segments have the last offset < last-stable-offset of the partition.

If multiple log segment files are ready, they are copied to remote storage one by one, from the earliest to the latest. It generates a universally unique RemoteLogSegmentId for each segment, it calls RLMM.putRemoteLogSegmentData(RemoteLogSegmentId remoteLogSegmentId, RemoteLogSegmentMetadata remoteLogSegmentMetadata) and it invokes copyLogSegment(RemoteLogSegmentId remoteLogSegmentId, LogSegmentData logSegmentData) on RSMIf it is successful then  it calls RLMM.putRemoteLogSegmentData with the updated RemoteLogSegmentMetadata instance else it removes the entry. Any dangling entries will be removed while removing expired log segments based on remote retention. 

Handle expired remote segments (leader and follower)

RLM leader computes the log segments to be deleted based on the remote retention config. It updates the earliest offset for the given topic partition in RLMM. It gets all the remote log segment ids and removes them from remote storage using RemoteStorageManager. It also removes respective metadata using RemoteLogMetadataManager. If there are any failures in removing remote log segments then those are stored in a specific topic (default as __remote_segments_to_be_deleted) and user can consume the events(which contain remote-log-segment-id) from that topic and clean them up from remote storage.  This can be improved upon in later versions. 

RLM follower fetches the earliest offset by calling RLMM.earliestLogOffset(tp: TopicPartition).Both leader and follower cleanup the existing indexes till that offset and updates start offset with the received value.

2. Remote Storage Fetcher Thread Pool

When handling consumer fetch request, if the required offset is in remote storage, the request is added into "RemoteFetchPurgatory", to handle timeout. RemoteFetchPurgatory is an instance of kafka.server.DelayedOperationPurgatory, and is similar to the existing produce/fetch purgatories. At the same time, the request is put into the task queue of "remote storage fetcher thread pool".

Each thread in the thread pool processes one remote fetch request at a time. The remote storage fetch thread will

  1. find out the corresponding RemoteLogSegmentId from RLMM and startPosition and endPosition from the offset index.
  2. try to build Records instance data fetched from RSM.fetchLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata, Long startPosition, Optional<Long> endPosition)
    1. if success, RemoteFetchPurgatory will be notified to return the data to the client
    2. if the remote segment file is already deleted, RemoteFetchPurgatory will be notified to return an error to the client.
    3. if the remote storage operation failed (remote storage is temporarily unavailable), the operation will be retried with Exponential Back-Off, until the original consumer fetch request timeout.

RemoteLogMetadataManager implemented with an internal topic

...