Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

For follower partitions, it maintains metadata cache by subscribing to the respective remote log metadata topic partitions. Whenever a topic partition is reassigned to a new broker and RLMM on that broker is not subscribed to the respective remote log metadata topic partition then it will subscribe to the respective remote log metadata topic partition and adds all the entries to the cache. So, in the worst case, RLMM on a broker may be consuming from most of the remote log metadata topic partitions. This requires the cache to be based on disk storage like RocksDB to avoid a high memory footprint on a broker. This will allow us to commit offsets of the partitions that are already read. Committed offsets can be stored in a local file to avoid reading the messages again when a broker is restarted.

Manage Remote Log

...

For each topic partition that has RLM configured, RLM leader for a topic partition copies log segments which have last message offset less than last stable offset of that topic partition to remote storage. The active segment file (the last segment file of each partition, to which the new records are appending) is never shipped to remote storage.

After a segment file is copied to remote storage, RLM will append a set of index entries to 3 local index files: remoteLogIndex, remoteOffsetIndex, remoteTimeIndex. These index files are rotated by RLM at a configurable time interval (or a configurable size).

(active segment)

{log.dirs}/{topic-partition}/0000002400013.index

{log.dirs}/{topic-partition}/0000002400013.timeindex

{log.dirs}/{topic-partition}/0000002400013.log

(inactive segments)

{log.dirs}/{topic-partition}/0000002000238.index

{log.dirs}/{topic-partition}/0000002000238.timeindex

{log.dirs}/{topic-partition}/0000002000238.log

{log.dirs}/{topic-partition}/0000001600100.index

{log.dirs}/{topic-partition}/0000001600100.timeindex

{log.dirs}/{topic-partition}/0000001600100.log

( remote segment)

{log.dirs}/{topic-partition}/0000001000121.remoteOffsetIndex

{log.dirs}/{topic-partition}/0000001000121.remoteTimeIndex

{log.dirs}/{topic-partition}/0000001000121.remoteLogIndex

( remote segments)

{log.dirs}/{topic-partition}/0000000512002.remoteOffsetIndex

{log.dirs}/{topic-partition}/0000000512002.remoteTimeIndex

{log.dirs}/{topic-partition}/0000000512002.remoteLogIndex

Each index entry of the remoteLogIndex file contains the information of a sequence of records in the remote log segment file. The format of a remoteLogIndex entry:

...

magic: int16 (current magic value is 0)

length: int16 (length of this entry)

crc: int32 (checksum from firstOffset to the end of this entry)

firstOffset: int64 (the Kafka offset of the 1st record)

lastOffset: int64 (the Kafka offset of the last record)

firstTimestamp: int64

lastTimestamp: int64

dataLength: int32 (length of the remote data)

rdiLength: int16

rdi: byte[] (Remote data identifier)

RDI (Remote data identifier) is the "pointer" or "URI" of the remote data. The format of RDI depends on the implementation. For example, RDI can be HDFS file path and offset, or S3 key and offset. When reading the remote records, RLM will use RDI to retrieve the remote data.

Depends on the implementation, RLM may append 1 or more entries to the remoteLogIndex file for each remote segment file. More entries will provide fine-grained indexing of the remote data with the cost of local disk space.

The RemoteLogIndex entries are shipped to remote storage along with the segment data. The followers will retrieve those index entries from remote storage to build their own indices.

Remoteoffsetindex file and remoteTimestampIndex file are similar with the existing .index file (offset index) and .timeindex file (timestamp index). The only difference is that they point to the index in the corresponding remoteLogIndex file instead of a log segment file.

Image Removed

Manage Remote Log Segments

The leader may fail to ship segment data to remote storage on time. In such a situation, the follower has to keep its local segment files, even if the configured retention time is reached. The local segment files (and the corresponding index files) can only be deleted in the following 2 cases:

  1. the follower received the corresponding segment data info from a remote storage and updated its index files and
  2. the local files are already older than the configured remote retention time

This is explained in detail here.

Replica Manager

If RLM is configured, ReplicaManager will call RLM to assign topic-partitions or remove topic-partitions similar to how the ReplicaFetcherManager works today.

If the broker changes its state from Leader to Follower for a topic-partition and RLM is in the process of copying the segment, it will finish the copy before it relinquishes the copy for topic-partition. This might leave duplicated messages.

ReplicaManager.readLocalLog works as it does today. But only in case of OffsetOutOfRange of exception and RLM is configured we will delegate the read request to RLM which returns LogReadResult.

Code Block
languagescala
def readFromLocaLog(): Seq[(TopicPartition, LogReadResult)] = {
catch {
case e@ (_: OffsetOutOfRangeException) =>
    RemoteLogManager.read(fetchMaxBytes: Int,
                          hardMaxBytesLimit: Boolean, 
                          tp: TopicPartition, 
                          fetchInfo: PartitionData 
                          quota: ReplicaQuota)
}

Consumer Fetch Requests

For any fetch requests, ReplicaManager will proceed with making a call to readFromLocalLog, if this method returns OffsetOutOfRange exception it will delegate the read call to RemoteLogManager.readFromRemoteLog and returns the LogReadResult. More details are explained in the RLM/RSM tasks section.

Follower Requests/Replication

For follower fetch, the leader only returns the data that is still in the leader's local storage. If a LogSegment copied into remote storage by a leader broker, the follower doesn't need to copy this segment which is already present in remote storage. Instead, a follower will retrieve the information of the segment from remote storage. If a Replica becomes a leader, It can still locate and serve data from remote storage.

Other APIs

DeleteRecords

There is no change in the semantics of this API. It deletes records until the given offset if possible. This is equivalent to updating logStartOffset of the partition log with the given offset if it is greater than the current log-start-offset and it is less than or equal to high-watermark. If needed, it will clean remote logs asynchronously after updating log-start-offset of the log.

ListOffsets

ListOffsets API gives the offset(s) for the given timestamp either by looking into the local log or remote log time indexes. 

If the target timestamp is
ListOffsetRequest.EARLIEST_TIMESTAMP (value as -2) returns logStartOffset of the log.
ListOffsetRequest.LATEST_TIMESTAMP(value as-1) returns log-stable-offset or log-end-offset based on the isolation level in the request.

This API is enhanced with supporting new target timestamp value as -3 which is called NEXT_LOCAL_TIMESTAMP. There will not be any new fields added in request and response schemes but there will be a version bump to indicate the version update. This request is about the offset that the followers should start fetching to replicate the local logs. All the records earlier to this offset can be considered as copied to the remote storage. This is used by follower replicas to avoid fetching records that are already copied to remote tier storage.

When a follower replica needs to fetch the earliest messages that are to be replicated then it sends a request with target timestamp as NEXT_LOCAL_TIMESTAMP. 

For timestamps >= 0, it returns the first message offset whose timestamp is >= to the given timestamp in the request. That means it checks in remote log time indexes first, after which local log time indexes are checked. 

JBOD related changes

Currently, JBOD is supported by altering log dirs in two ways.

  • Altering to a different dir on the local broker

    • This can be done by copying remote log metadata files to the respective new topic partition directories in ReplicaAlterLogDirsThread. This will be implemented in the future.

  • Altering to a dir on a remote broker

    • This is equivalent to reassigning partitions to a different broker, which is already supported in this KIP as part of how followers behave with respect to remote tiered storage.

...

Remote storage (e.g. S3 / HDFS) is likely to have higher I/O latency and lower availability than local storage.

When the remote storage becoming temporarily unavailable (up to several hours) or having high latency (up to minutes), Kafka should still be able to operate normally. All the Kafka operations (produce, consume local data, create/expand topics, etc.) that do not rely on remote storage should not be impacted. The consumers that try to consume the remote data should get reasonable errors, when remote storage is unavailable or the remote storage requests timeout.

To achieve this, we have to handle remote storage operations in dedicated threads pools, instead of Kafka I/O threads and fetcher threads.

1. Remote Log Manager (RLM) Thread Pool

RLM maintains a list of the topic-partitions it manages. The list is updated in Kafka I/O threads, when topic-partitions are added to / removed from RLM. Each topic-partition in the list is assigned a scheduled processing time. The RLM thread pool processes the topic-partitions that the "scheduled processing time" is less than or equal to the current time.

When a new topic-partition is assigned to the broker, the topic-partition is added to the list, with scheduled processing time = 0, which means the topic-partition has to be processed immediately, to retrieve information from remote storage.

After a topic-partition is successfully processed by the thread pool, it's scheduled processing time is set to ( now() + rlm_process_interval_ms ). rlm_process_interval_ms can be configured in broker config file.

If the process of a topic-partition is failed due to remote storage error, its scheduled processing time is set to ( now() + rlm_retry_interval_ms ). rlm_retry_interval_ms can be configured in broker config file.

When a topic-partition is unassigned from the broker, the topic-partition is not currently processed by the thread pool, the topic-partition is directly removed from the list; otherwise, the topic-partition is marked as "deleted", and will be removed after the current process is done.

Each thread in the thread pool processes one topic-partition at a time in the following steps:

Copy log segments to remote storage (leader)

Copy the log segment files that are

       - inactive and

       - the offset range is not covered by the segments on the remote storage and

      - those segments have the last offset < last-stable-offset of the partition.

If multiple log segment files are ready, they are copied to remote storage one by one, from the earliest to the latest. It uses the below copy API from RSM

copyLogSegment(topicPartition: TopicPartition, logSegment: LogSegment, leaderEpoch: Int): util.List[RemoteLogIndexEntry]

Retrieve the latest remote storage information (both leader and follower)

Call RSM.listRemoteSegments() to get the list of remote segments

If any of the remote segments are not added to the local index yet, call RSM.getRemoteLogIndexEntries to retrieve the remote log index data, append to the local remoteLogIndex file, and build the local index files accordingly. Roll the local remoteLogIndex file, when needed. Update the "latest remote offset" of the topic-partition.

Handle expired remote segments (leader and follower)

For leader, it invokes RSM.cleanupLogUntil(topicPartition: TopicPartition, cleanUpTillMs: Long) to delete remote log segments and return the start offset of the earliest remote log segment.

For follower, it fetches the earliest offset by calling RSM.earliestLogOffset(tp: TopicPartition).

Both leader and follower cleansup the existing indexes till that offset and updates start offset with the received value.

2. Remote Storage Fetcher Thread Pool

When handling consumer fetch request, if the required offset is in remote storage, the request is added into "RemoteFetchPurgatory", to handle timeout. RemoteFetchPurgatory is an instance of kafka.server.DelayedOperationPurgatory, and is similar to the existing produce/fetch purgatories. At the same time, the request is put into the task queue of "remote storage fetcher thread pool".

Each thread in the thread pool processes one remote fetch request at a time. The remote storage fetch thread will

1) find out the corresponding RDI from the remote log index

2) try to retrieve the data from remote storage

2.1) if success, RemoteFetchPurgatory will be notified to return the data to the client

2.2) if the remote segment file is already deleted, RemoteFetchPurgatory will be notified to return an error to the client.

...

Segments

The leader may fail to ship segment data to remote storage on time. In such a situation, the follower has to keep its local segment files, even if the configured retention time is reached. The local segment files (and the corresponding index files) can only be deleted in the following 2 cases:

  1. the follower received the corresponding segment data info from a remote storage and updated its index files and
  2. the local files are already older than the configured remote retention time

This is explained in detail here.

Replica Manager

If RLM is configured, ReplicaManager will call RLM to assign topic-partitions or remove topic-partitions similar to how the ReplicaFetcherManager works today.

If the broker changes its state from Leader to Follower for a topic-partition and RLM is in the process of copying the segment, it will finish the copy before it relinquishes the copy for topic-partition. This might leave duplicated messages.

ReplicaManager.readLocalLog works as it does today. But only in case of OffsetOutOfRange of exception and RLM is configured we will delegate the read request to RLM which returns LogReadResult.

Code Block
languagescala
def readFromLocaLog(): Seq[(TopicPartition, LogReadResult)] = {
catch {
case e@ (_: OffsetOutOfRangeException) =>
    RemoteLogManager.read(fetchMaxBytes: Int,
                          hardMaxBytesLimit: Boolean, 
                          tp: TopicPartition, 
                          fetchInfo: PartitionData 
                          quota: ReplicaQuota)
}

Consumer Fetch Requests

For any fetch requests, ReplicaManager will proceed with making a call to readFromLocalLog, if this method returns OffsetOutOfRange exception it will delegate the read call to RemoteLogManager.readFromRemoteLog and returns the LogReadResult. More details are explained in the RLM/RSM tasks section.

Follower Requests/Replication

For follower fetch, the leader only returns the data that is still in the leader's local storage. If a LogSegment copied into remote storage by a leader broker, the follower doesn't need to copy this segment which is already present in remote storage. Instead, a follower will retrieve the information of the segment from remote storage. If a Replica becomes a leader, It can still locate and serve data from remote storage.

Other APIs

DeleteRecords

There is no change in the semantics of this API. It deletes records until the given offset if possible. This is equivalent to updating logStartOffset of the partition log with the given offset if it is greater than the current log-start-offset and it is less than or equal to high-watermark. If needed, it will clean remote logs asynchronously after updating log-start-offset of the log.

ListOffsets

ListOffsets API gives the offset(s) for the given timestamp either by looking into the local log or remote log time indexes. 

If the target timestamp is
ListOffsetRequest.EARLIEST_TIMESTAMP (value as -2) returns logStartOffset of the log.
ListOffsetRequest.LATEST_TIMESTAMP(value as-1) returns log-stable-offset or log-end-offset based on the isolation level in the request.

This API is enhanced with supporting new target timestamp value as -3 which is called NEXT_LOCAL_TIMESTAMP. There will not be any new fields added in request and response schemes but there will be a version bump to indicate the version update. This request is about the offset that the followers should start fetching to replicate the local logs. All the records earlier to this offset can be considered as copied to the remote storage. This is used by follower replicas to avoid fetching records that are already copied to remote tier storage.

When a follower replica needs to fetch the earliest messages that are to be replicated then it sends a request with target timestamp as NEXT_LOCAL_TIMESTAMP. 

For timestamps >= 0, it returns the first message offset whose timestamp is >= to the given timestamp in the request. That means it checks in remote log time indexes first, after which local log time indexes are checked. 

JBOD related changes

Currently, JBOD is supported by altering log dirs in two ways.

  • Altering to a different dir on the local broker

    • This can be done by copying remote log metadata files to the respective new topic partition directories in ReplicaAlterLogDirsThread. This will be implemented in the future.

  • Altering to a dir on a remote broker

    • This is equivalent to reassigning partitions to a different broker, which is already supported in this KIP as part of how followers behave with respect to remote tiered storage.

RLM/RSM tasks and thread pools
Anchor
rlm-rsm-tasks
rlm-rsm-tasks

Remote storage (e.g. S3 / HDFS) is likely to have higher I/O latency and lower availability than local storage.

When the remote storage becoming temporarily unavailable (up to several hours) or having high latency (up to minutes), Kafka should still be able to operate normally. All the Kafka operations (produce, consume local data, create/expand topics, etc.) that do not rely on remote storage should not be impacted. The consumers that try to consume the remote data should get reasonable errors, when remote storage is unavailable or the remote storage requests timeout.

To achieve this, we have to handle remote storage operations in dedicated threads pools, instead of Kafka I/O threads and fetcher threads.

1. Remote Log Manager (RLM) Thread Pool

RLM maintains a list of the topic-partitions it manages. The list is updated in Kafka I/O threads, when topic-partitions are added to / removed from RLM. Each topic-partition in the list is assigned a scheduled processing time. The RLM thread pool processes the topic-partitions that the "scheduled processing time" is less than or equal to the current time.

When a new topic-partition is assigned to the broker, the topic-partition is added to the list, with scheduled processing time = 0, which means the topic-partition has to be processed immediately, to retrieve information from remote storage.

After a topic-partition is successfully processed by the thread pool, it's scheduled processing time is set to ( now() + rlm_process_interval_ms ). rlm_process_interval_ms can be configured in broker config file.

If the process of a topic-partition is failed due to remote storage error, its scheduled processing time is set to ( now() + rlm_retry_interval_ms ). rlm_retry_interval_ms can be configured in broker config file.

When a topic-partition is unassigned from the broker, the topic-partition is not currently processed by the thread pool, the topic-partition is directly removed from the list; otherwise, the topic-partition is marked as "deleted", and will be removed after the current process is done.

Each thread in the thread pool processes one topic-partition at a time in the following steps:

Copy log segments to remote storage (leader)

Copy the log segment files that are

       - inactive and

       - the offset range is not covered by the segments on the remote storage and

      - those segments have the last offset < last-stable-offset of the partition.

If multiple log segment files are ready, they are copied to remote storage one by one, from the earliest to the latest. It uses the below copy API from RSM

copyLogSegment(topicPartition: TopicPartition, logSegment: LogSegment, leaderEpoch: Int): util.List[RemoteLogIndexEntry]

Retrieve the latest remote storage information (both leader and follower)

Call RSM.listRemoteSegments() to get the list of remote segments

If any of the remote segments are not added to the local index yet, call RSM.getRemoteLogIndexEntries to retrieve the remote log index data, append to the local remoteLogIndex file, and build the local index files accordingly. Roll the local remoteLogIndex file, when needed. Update the "latest remote offset" of the topic-partition.

Handle expired remote segments (leader and follower)

For leader, it invokes RSM.cleanupLogUntil(topicPartition: TopicPartition, cleanUpTillMs: Long) to delete remote log segments and return the start offset of the earliest remote log segment.

For follower, it fetches the earliest offset by calling RSM.earliestLogOffset(tp: TopicPartition).

Both leader and follower cleansup the existing indexes till that offset and updates start offset with the received value.


2. Remote Storage Fetcher Thread Pool

When handling consumer fetch request, if the required offset is in remote storage, the request is added into "RemoteFetchPurgatory", to handle timeout. RemoteFetchPurgatory is an instance of kafka.server.DelayedOperationPurgatory, and is similar to the existing produce/fetch purgatories. At the same time, the request is put into the task queue of "remote storage fetcher thread pool".

Each thread in the thread pool processes one remote fetch request at a time. The remote storage fetch thread will

1) find out the corresponding RDI from the remote log index

2) try to retrieve the data from remote storage

2.1) if success, RemoteFetchPurgatory will be notified to return the data to the client

2.2) if the remote segment file is already deleted, RemoteFetchPurgatory will be notified to return an error to the client.

2.3) if the remote storage operation failed (remote storage is temporarily unavailable), the operation will be retried with Exponential Back-Off, until the original consumer fetch request timeout.

Remote Log Indexes
Anchor
rdi-format
rdi-format

For each topic partition that has RLM configured, RLM leader for a topic partition copies log segments which have last message offset less than last stable offset of that topic partition to remote storage. The active segment file (the last segment file of each partition, to which the new records are appending) is never shipped to remote storage.

After a segment file is copied to remote storage, RLM will append a set of index entries to 3 local index files: remoteLogIndex, remoteOffsetIndex, remoteTimeIndex. These index files are rotated by RLM at a configurable time interval (or a configurable size).

(active segment)

{log.dirs}/{topic-partition}/0000002400013.index

{log.dirs}/{topic-partition}/0000002400013.timeindex

{log.dirs}/{topic-partition}/0000002400013.log


(inactive segments)

{log.dirs}/{topic-partition}/0000002000238.index

{log.dirs}/{topic-partition}/0000002000238.timeindex

{log.dirs}/{topic-partition}/0000002000238.log

{log.dirs}/{topic-partition}/0000001600100.index

{log.dirs}/{topic-partition}/0000001600100.timeindex

{log.dirs}/{topic-partition}/0000001600100.log


( remote segment)

{log.dirs}/{topic-partition}/0000001000121.remoteOffsetIndex

{log.dirs}/{topic-partition}/0000001000121.remoteTimeIndex

{log.dirs}/{topic-partition}/0000001000121.remoteLogIndex


( remote segments)

{log.dirs}/{topic-partition}/0000000512002.remoteOffsetIndex

{log.dirs}/{topic-partition}/0000000512002.remoteTimeIndex

{log.dirs}/{topic-partition}/0000000512002.remoteLogIndex


Each index entry of the remoteLogIndex file contains the information of a sequence of records in the remote log segment file. The format of a remoteLogIndex entry:

magic: int16 (current magic value is 0)

length: int16 (length of this entry)

crc: int32 (checksum from firstOffset to the end of this entry)

firstOffset: int64 (the Kafka offset of the 1st record)

lastOffset: int64 (the Kafka offset of the last record)

firstTimestamp: int64

lastTimestamp: int64

dataLength: int32 (length of the remote data)

rdiLength: int16

rdi: byte[] (Remote data identifier)


RDI (Remote data identifier) is the "pointer" or "URI" of the remote data. The format of RDI depends on the implementation. For example, RDI can be HDFS file path and offset, or S3 key and offset. When reading the remote records, RLM will use RDI to retrieve the remote data.

Depends on the implementation, RLM may append 1 or more entries to the remoteLogIndex file for each remote segment file. More entries will provide fine-grained indexing of the remote data with the cost of local disk space.

The RemoteLogIndex entries are shipped to remote storage along with the segment data. The followers will retrieve those index entries from remote storage to build their own indices.

Remoteoffsetindex file and remoteTimestampIndex file are similar with the existing .index file (offset index) and .timeindex file (timestamp index). The only difference is that they point to the index in the corresponding remoteLogIndex file instead of a log segment file.

Image Added

Alternatives considered

Following alternatives were considered:

...