...

When shipping a log segment to remote storage, the leader broker will store the leader epoch sequence and the producer id snapshot, up to the end of that segment, in the same remote directory (or under the same remote object key prefix). Followers can use this data to rebuild their leader epoch sequences and producer id snapshots when needed.
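To make the shared-prefix layout concrete, here is a minimal Java sketch that derives every object key for one segment from a single prefix. The class name `RemoteSegmentKeys` and the key format (topic-partition plus the 20-digit zero-padded base offset, echoing Kafka's local segment file names) are hypothetical assumptions; this section does not fix a concrete naming scheme, and the suffixes are illustrative only.

```java
import java.util.List;

// Hypothetical key scheme: every object shipped for one segment shares a single prefix.
final class RemoteSegmentKeys {
    private final String prefix;

    RemoteSegmentKeys(String topic, int partition, long baseOffset) {
        // "<topic>-<partition>/<20-digit zero-padded base offset>", echoing local segment file names.
        this.prefix = String.format("%s-%d/%020d", topic, partition, baseOffset);
    }

    String prefix()              { return prefix; }
    String segmentKey()          { return prefix + ".log"; }
    String leaderEpochKey()      { return prefix + ".leader-epoch-checkpoint"; }
    String producerSnapshotKey() { return prefix + ".snapshot"; }

    // Objects a follower would read to rebuild its leader epoch sequence and producer id snapshot.
    List<String> auxiliaryKeys() {
        return List.of(leaderEpochKey(), producerSnapshotKey());
    }
}
```

Because the auxiliary objects live under the segment's own prefix, a follower that knows which segment it needs can find the matching leader epoch and producer state without a separate index.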



We therefore need a corresponding ReplicaState for building this auxiliary state, called `BuildingRemoteLogAuxState`. The fetcher thread processes this state on every run, just as it does for the Fetching and Truncating states.
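The following Java sketch models how a fetcher thread could handle the new state alongside the existing ones on each run. It is illustrative only: `FetcherThreadSketch`, the enum constant names, and the transitions (a partition leaves Truncating or BuildingRemoteLogAuxState for Fetching once that step is done) are assumptions, and the real work of truncating, rebuilding, and fetching is reduced to log entries.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical replica states; BUILDING_REMOTE_LOG_AUX_STATE mirrors BuildingRemoteLogAuxState.
enum ReplicaState { TRUNCATING, BUILDING_REMOTE_LOG_AUX_STATE, FETCHING }

final class FetcherThreadSketch {
    // Partition name -> current replica state; LinkedHashMap keeps iteration order stable.
    final Map<String, ReplicaState> states = new LinkedHashMap<>();
    // Records the action taken for each partition, purely for illustration.
    final List<String> log = new ArrayList<>();

    // One run of the fetcher loop: every state is handled on every run.
    void doWork() {
        for (Map.Entry<String, ReplicaState> e : states.entrySet()) {
            String tp = e.getKey();
            switch (e.getValue()) {
                case TRUNCATING:
                    log.add("truncate " + tp);
                    e.setValue(ReplicaState.FETCHING);
                    break;
                case BUILDING_REMOTE_LOG_AUX_STATE:
                    // Assumed: read the leader epoch sequence and producer id snapshot from remote storage.
                    log.add("rebuild aux state " + tp);
                    e.setValue(ReplicaState.FETCHING);
                    break;
                case FETCHING:
                    log.add("fetch " + tp);
                    break;
            }
        }
    }
}
```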

...

1. Remote Log Manager (RLM) Thread Pool

RLM maintains a list of the topic-partitions it manages. The list is updated in Kafka I/O threads when topic-partitions are added to or removed from RLM. Each topic-partition in the list is assigned a scheduled processing time. The RLM thread pool processes the topic-partitions whose scheduled processing time is less than or equal to the current time.

When a new topic-partition is assigned to the broker, it is added to the list with a scheduled processing time of 0, which means it has to be processed immediately to retrieve its information from remote storage.

After a topic-partition is successfully processed by the thread pool, its scheduled processing time is set to (now() + rlm_process_interval_ms). rlm_process_interval_ms can be configured in the broker config file.

If processing a topic-partition fails due to a remote storage error, its scheduled processing time is set to (now() + rlm_retry_interval_ms). rlm_retry_interval_ms can be configured in the broker config file.

When a topic-partition is unassigned from the broker, it is removed from the list directly if the thread pool is not currently processing it; otherwise, it is marked as "deleted" and removed after the current processing is done.
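The scheduling rules above can be sketched in Java as follows. `RlmSchedule` and its methods are hypothetical names; the two constructor arguments stand in for rlm_process_interval_ms and rlm_retry_interval_ms, and a single `synchronized` lock is assumed to be enough to coordinate the I/O threads with the pool threads.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the RLM's list of topic-partitions and their scheduled processing times.
final class RlmSchedule {
    private final long processIntervalMs; // stands in for rlm_process_interval_ms
    private final long retryIntervalMs;   // stands in for rlm_retry_interval_ms
    private final Map<String, Long> scheduledTimeMs = new HashMap<>();
    private final Set<String> inProgress = new HashSet<>();
    private final Set<String> deleted = new HashSet<>();

    RlmSchedule(long processIntervalMs, long retryIntervalMs) {
        this.processIntervalMs = processIntervalMs;
        this.retryIntervalMs = retryIntervalMs;
    }

    // I/O thread: a newly assigned partition gets scheduled time 0, so it is due immediately.
    synchronized void add(String tp) {
        deleted.remove(tp);
        scheduledTimeMs.put(tp, 0L);
    }

    // Pool: claim every partition whose scheduled time is <= now and that is not already in progress.
    synchronized List<String> pollDue(long nowMs) {
        List<String> due = new ArrayList<>();
        for (Map.Entry<String, Long> e : scheduledTimeMs.entrySet()) {
            if (e.getValue() <= nowMs && !inProgress.contains(e.getKey())) {
                due.add(e.getKey());
            }
        }
        inProgress.addAll(due);
        return due;
    }

    // Pool thread: drop the partition if it was marked deleted, otherwise reschedule it.
    synchronized void complete(String tp, long nowMs, boolean remoteStorageError) {
        inProgress.remove(tp);
        if (deleted.remove(tp)) {
            scheduledTimeMs.remove(tp);
            return;
        }
        scheduledTimeMs.put(tp, nowMs + (remoteStorageError ? retryIntervalMs : processIntervalMs));
    }

    // I/O thread: unassigned partitions are removed directly unless they are being processed.
    synchronized void remove(String tp) {
        if (inProgress.contains(tp)) {
            deleted.add(tp);
        } else {
            scheduledTimeMs.remove(tp);
        }
    }

    synchronized boolean contains(String tp) { return scheduledTimeMs.containsKey(tp); }

    synchronized long scheduledTimeOf(String tp) { return scheduledTimeMs.get(tp); }
}
```

A pool thread would call `pollDue(now)`, process each returned partition, and then call `complete`. Marking in-progress partitions as deleted, rather than removing them outright, keeps a pool thread from rescheduling a partition that was unassigned while it was being processed.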

Each thread in the thread pool processes one topic-partition at a time in the following steps:

Copy log segments to remote storage (leader)

...

2. Remote Storage Fetcher Thread Pool

When handling a consumer fetch request, if the requested offset is in remote storage, the request is added to "RemoteFetchPurgatory" to handle timeouts. RemoteFetchPurgatory is an instance of kafka.server.DelayedOperationPurgatory and is similar to the existing produce/fetch purgatories. At the same time, the request is put into the task queue of the "remote storage fetcher thread pool".
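One way to picture the two halves of this hand-off is with plain JDK primitives (Java 9+ for `orTimeout`). In this sketch, a `CompletableFuture` with `orTimeout` stands in for the RemoteFetchPurgatory entry that expires the request, and an `ExecutorService` stands in for the remote storage fetcher thread pool. `RemoteFetchDispatch` and the `remoteRead` supplier are hypothetical, and this is not how DelayedOperationPurgatory itself is implemented.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical stand-ins: the future plays the purgatory entry, the executor plays the fetcher pool.
final class RemoteFetchDispatch {
    static CompletableFuture<byte[]> submit(ExecutorService remoteFetcherPool,
                                            Supplier<byte[]> remoteRead,
                                            long fetchTimeoutMs) {
        CompletableFuture<byte[]> pending = new CompletableFuture<>();
        // "Purgatory" side: complete with a TimeoutException if no result arrives in time.
        pending.orTimeout(fetchTimeoutMs, TimeUnit.MILLISECONDS);
        // "Thread pool" side: perform the remote read and complete the pending response.
        remoteFetcherPool.execute(() -> {
            try {
                pending.complete(remoteRead.get());
            } catch (RuntimeException e) {
                pending.completeExceptionally(e);
            }
        });
        return pending;
    }
}
```

Whichever side finishes first wins: a late result from the pool is simply ignored because `complete` has no effect on an already completed future.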

Each thread in the thread pool processes one remote fetch request at a time. The remote storage fetcher thread will:

  1. look up the corresponding RemoteLogSegmentId from RLMM, and the startPosition and endPosition from the offset index.
  2. try to build a Records instance from the data fetched by RSM.fetchLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata, Long startPosition, Long endPosition):
    1. if this succeeds, RemoteFetchPurgatory is notified to return the data to the client.
    2. if the remote segment file has already been deleted, RemoteFetchPurgatory is notified to return an error to the client.
    3. if the remote storage operation fails (remote storage is temporarily unavailable), the operation is retried with exponential back-off until the original consumer fetch request times out.
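The retry rule in step 2.3, together with the no-retry case in step 2.2, can be sketched as a small helper. The exception classes and `RemoteFetchRetry.fetchWithBackoff` are hypothetical names chosen for illustration; the back-off doubles up to a cap, and `deadlineMs` is assumed to be the absolute time at which the original consumer fetch request times out.

```java
import java.util.function.Supplier;

// Hypothetical error types: a transient outage vs. a segment that was already deleted.
class RemoteStorageUnavailableException extends RuntimeException {
    RemoteStorageUnavailableException(String msg) { super(msg); }
}

class RemoteSegmentDeletedException extends RuntimeException {
    RemoteSegmentDeletedException(String msg) { super(msg); }
}

final class RemoteFetchRetry {
    /**
     * Calls fetch, retrying only RemoteStorageUnavailableException with exponential
     * back-off until deadlineMs. Any other exception, such as RemoteSegmentDeletedException,
     * propagates to the caller immediately so an error can be returned to the client.
     */
    static <T> T fetchWithBackoff(Supplier<T> fetch, long deadlineMs,
                                  long initialBackoffMs, long maxBackoffMs) {
        long backoffMs = initialBackoffMs;
        while (true) {
            try {
                return fetch.get();
            } catch (RemoteStorageUnavailableException e) {
                // Give up if sleeping again would run past the fetch request timeout.
                if (System.currentTimeMillis() + backoffMs >= deadlineMs) {
                    throw e;
                }
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw e;
                }
                backoffMs = Math.min(backoffMs * 2, maxBackoffMs);
            }
        }
    }
}
```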

Remote Log Segment Metadata State transitions


When the leader broker copies a log segment to remote storage, it puts the remote log segment metadata with the state “COPY_STARTED” and updates the state to “COPY_FINISHED” once the copy succeeds. The leader also removes remote log segments based on the retention policy. Before a log segment is removed using RSM.deleteLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata), the leader updates the remote log segment's state to “DELETE_STARTED”, and it updates the state to “DELETE_FINISHED” once the deletion succeeds.
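The leader-driven lifecycle above can be encoded as a small state machine. This Java sketch is an assumption read from the prose: it only allows COPY_STARTED → COPY_FINISHED → DELETE_STARTED → DELETE_FINISHED, and the enum name `RemoteLogSegmentState` and its helper methods are hypothetical.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical encoding of the leader-driven lifecycle; the transition table is read from the prose.
enum RemoteLogSegmentState {
    COPY_STARTED, COPY_FINISHED, DELETE_STARTED, DELETE_FINISHED;

    // States that may directly follow this one.
    Set<RemoteLogSegmentState> nextStates() {
        switch (this) {
            case COPY_STARTED:   return EnumSet.of(COPY_FINISHED);
            case COPY_FINISHED:  return EnumSet.of(DELETE_STARTED); // retention-based deletion
            case DELETE_STARTED: return EnumSet.of(DELETE_FINISHED);
            default:             return EnumSet.noneOf(RemoteLogSegmentState.class); // DELETE_FINISHED is terminal
        }
    }

    boolean canTransitionTo(RemoteLogSegmentState next) {
        return nextStates().contains(next);
    }

    // True if the given states form a legal path through the lifecycle.
    static boolean isValidSequence(RemoteLogSegmentState... states) {
        for (int i = 1; i < states.length; i++) {
            if (!states[i - 1].canTransitionTo(states[i])) {
                return false;
            }
        }
        return true;
    }
}
```

Checking each update against `canTransitionTo` before writing it to RLMM is one way a leader could reject out-of-order updates, for example a DELETE_STARTED for a segment whose copy never finished.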

When a partition is deleted, the leader updates its state in RLMM to “DELETE_MARKED”. On each leader of a “__remote_log_segment_metadata_topic” partition, a task consumes the messages, checks for messages with the state “DELETE_MARKED”, and schedules them for deletion. The task commits consumer offsets up to the point where these markers have been handled, so that when leadership switches to another broker, the new leader can continue from where the previous one left off. The controller considers the topic partition deleted only when it determines, using RLMM, that there are no remaining log segments for that topic partition.
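The offset-tracking part of this task can be sketched as follows. `DeleteMarkerTask`, `MetadataMessage`, and the in-memory message list are hypothetical stand-ins for a consumer of “__remote_log_segment_metadata_topic”. Following Kafka's consumer convention, the committed offset is assumed to be the offset of the next message to read, so a new leader resumes right after the last handled marker.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the marker-handling task for one metadata topic partition.
final class DeleteMarkerTask {
    enum State { COPY_STARTED, COPY_FINISHED, DELETE_MARKED, DELETE_STARTED, DELETE_FINISHED }

    // One consumed message: its offset, the topic partition it describes, and its state.
    static final class MetadataMessage {
        final long offset;
        final String topicPartition;
        final State state;

        MetadataMessage(long offset, String topicPartition, State state) {
            this.offset = offset;
            this.topicPartition = topicPartition;
            this.state = state;
        }
    }

    // Next offset to handle; committing it lets a new leader resume from here.
    private long committedOffset;
    final List<String> scheduledForDeletion = new ArrayList<>();

    DeleteMarkerTask(long committedOffset) {
        this.committedOffset = committedOffset;
    }

    // Handles messages at or after the committed offset and returns the new offset to commit.
    long process(List<MetadataMessage> messages) {
        for (MetadataMessage m : messages) {
            if (m.offset < committedOffset) {
                continue; // already handled before the leader switch
            }
            if (m.state == State.DELETE_MARKED) {
                scheduledForDeletion.add(m.topicPartition);
            }
            committedOffset = m.offset + 1;
        }
        return committedOffset;
    }
}
```

The actual deletion of the scheduled segments, and the controller's RLMM check that no segments remain, are outside this sketch.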

RemoteLogMetadataManager implemented with an internal topic

...