...
Remote Log Manager (RLM) is a new component that copies completed LogSegments and the corresponding OffsetIndex files to the remote tier.
- The RLM component keeps track of each topic-partition and its segments. It delegates the copying and reading of these segments to a pluggable storage manager implementation.
...
- RLM has two modes:
...
- RLM Leader - In this mode, the RLM that is the leader for a topic-partition checks for rolled-over LogSegments and copies them, along with the OffsetIndex, to the remote tier. RLM creates an index file, called RemoteLogSegmentIndex, per topic-partition to track remote LogSegments. Additionally, the RLM leader serves read requests for older data from the remote tier.
...
- RLM Follower - In this mode, RLM keeps track of the segments and index files on the remote tier and updates its RemoteLogSegmentIndex file per topic-partition. An RLM follower does not serve read requests for older data from the remote tier.
Core Kafka changes
To satisfy the goal of keeping core Kafka changes minimal, Kafka behavior remains unchanged for existing users when RLM is not configured.
...
Serving Data from Remote Storage
Approach 1
For each topic-partition that has RLM configured, RLM ships all the LogSegments and the corresponding OffsetIndex files to RemoteStorage. A new index file, RemoteLogSegmentIndex, is maintained locally on the Kafka broker per topic-partition, alongside the existing index files, as shown below:
...
the log segment files that are older than a configurable time to remote storage. The active segment file (the last segment file of each partition, to which new records are appended) is never shipped to remote storage.
After successfully copying a segment file to remote storage, RLM appends a set of index entries to three local index files: remotelogindex, remoteoffsetindex, and remotetimeindex. These index files are rotated by RLM at a configurable time interval (or a configurable size).
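The eligibility rule described above can be sketched as follows. This is a minimal illustration with simplified, assumed types (the `Segment` class and method names here are not from this proposal): every inactive segment older than a configured age is shippable; the active (last) segment never is.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of segment selection for shipping to remote storage.
// Segment is a stand-in for the broker's LogSegment metadata.
public class ShippableSegments {
    static class Segment {
        final long baseOffset;
        final long lastModifiedMs;
        Segment(long baseOffset, long lastModifiedMs) {
            this.baseOffset = baseOffset;
            this.lastModifiedMs = lastModifiedMs;
        }
    }

    // segments must be ordered by baseOffset; the last one is the active segment
    static List<Segment> shippable(List<Segment> segments, long nowMs, long minAgeMs) {
        List<Segment> eligible = new ArrayList<>();
        for (int i = 0; i < segments.size() - 1; i++) { // skip the active segment
            Segment s = segments.get(i);
            if (nowMs - s.lastModifiedMs >= minAgeMs)
                eligible.add(s);
        }
        return eligible;
    }
}
```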
(active segment)
{log.dirs}/{topic-partition}/0000002400013.index
{log.dirs}/{topic-partition}/0000002400013.timeindex
{log.dirs}/{topic-partition}/0000002400013.log
(inactive segments)
{log.dirs}/{topic-partition}/0000002000238.index
{log.dirs}/{topic-partition}/0000002000238.timeindex
{log.dirs}/{topic-partition}/0000002000238.log
{log.dirs}/{topic-partition}/0000001600100.index
{log.dirs}/{topic-partition}/0000001600100.timeindex
{log.dirs}/{topic-partition}/0000001600100.log
(active remote segment)
{log.dirs}/{topic-partition}/0000001000121.remoteoffsetindex
{log.dirs}/{topic-partition}/0000001000121.remotetimeindex
{log.dirs}/{topic-partition}/0000001000121.remotelogindex
(inactive remote segments)
{log.dirs}/{topic-partition}/0000000512002.remoteoffsetindex
{log.dirs}/{topic-partition}/0000000512002.remotetimeindex
{log.dirs}/{topic-partition}/0000000512002.remotelogindex
Each index entry of the remotelogindex file contains the information of a sequence of records in the remote log segment file. The format of a remotelogindex entry:
magic: int16 (current magic value is 0)
length: int16 (length of this entry)
crc: int32 (checksum of this entry)
startOffset: int64 (the Kafka offset of the 1st record)
lastOffset: int64 (the Kafka offset of the last record)
firstTimestamp: int64
maxTimestamp: int64
dataLength: int32 (length of the data)
rdiLength: int16
rdi: byte[] (Remote data identifier)
RDI (Remote data identifier) is the "pointer" or "URI" of the remote data. The format of RDI depends on the implementation. For example, RDI can be HDFS file path and offset, or S3 key and offset. When reading the remote records, RLM will use RDI to retrieve the remote data.
Depending on the implementation, RLM may append one or more entries to the remotelogindex file for each remote segment file. More entries provide finer-grained indexing of the remote data at the cost of local disk space.
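The entry layout above can be sketched as a small serializer. This is an illustrative sketch only; the class and method names are assumptions, but the field order, widths, and the magic value 0 follow the format listed above, with the CRC computed over the fields that follow it.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

// Sketch of writing one remotelogindex entry in the layout described above:
// magic(2) length(2) crc(4) startOffset(8) lastOffset(8)
// firstTimestamp(8) maxTimestamp(8) dataLength(4) rdiLength(2) rdi(var)
public class RemoteLogIndexEntry {
    static final short MAGIC = 0;

    public static ByteBuffer write(long startOffset, long lastOffset,
                                   long firstTimestamp, long maxTimestamp,
                                   int dataLength, byte[] rdi) {
        // length covers everything after the length field itself:
        // crc(4) + 4 longs(32) + dataLength(4) + rdiLength(2) + rdi
        short length = (short) (4 + 8 + 8 + 8 + 8 + 4 + 2 + rdi.length);
        // serialize the payload that follows the crc field, so we can checksum it
        ByteBuffer payload = ByteBuffer.allocate(length - 4);
        payload.putLong(startOffset).putLong(lastOffset)
               .putLong(firstTimestamp).putLong(maxTimestamp)
               .putInt(dataLength).putShort((short) rdi.length).put(rdi);
        CRC32 crc = new CRC32();
        crc.update(payload.array());

        ByteBuffer buf = ByteBuffer.allocate(2 + 2 + length);
        buf.putShort(MAGIC);
        buf.putShort(length);
        buf.putInt((int) crc.getValue());
        buf.put(payload.array());
        buf.flip();
        return buf;
    }
}
```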
The RemoteLogIndex entries are shipped to remote storage along with the segment data. The followers will retrieve those index entries from remote storage to build their own indices.
The remoteoffsetindex file and remotetimeindex file are similar to the existing .index file (offset index) and .timeindex file (time index). The only difference is that they index the corresponding remotelogindex file instead of a log segment file.
FileName: 00000001000121.remoteindex
Contents:
SegmentStartOffset
1000121
1500024
2000011
…
2999999

FileName: 00000003000000.remoteindex
Contents:
SegmentStartOffset
3000000
3500024
4000011
…
RLM maintains these RemoteLogSegmentIndexes per topic-partition in local files on the Kafka broker. These files are rolled on a periodic basis with starting index of first LogSegment in the file name. Note that the RemoteLogSegmentIndex can be constructed by listing all the log segments stored on the remote storage. Maintaining a local file is an optimization to avoid such listing operations that may be slow and expensive depending on the external store. RemoteLogSegmentIndex files are MMAP'ed files and will follow a similar binary search mechanism as OffsetIndex files to find a LogSegment to serve a read operation.
On `OutOfRangeOffsetException`, ReplicaManager delegates the read request to RLM which does the following:
1. RLM performs a binary search on the memory mapped `RemoteLogSegmentIndex` file to find the starting offset of the LogSegment that contains the requested offset.
2. RLM uses the starting offset to build the file names (or object names) of the LogSegment and OffsetIndex.
3. RLM fetches the segment and offset index files on demand, seeks into the LogSegment for the requested offset, and serves the data to the client.
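Step 1 above is a floor binary search over the sorted SegmentStartOffsets of the RemoteLogSegmentIndex. The sketch below uses a plain array as a stand-in for the memory-mapped index file; the class and method names are illustrative, not from this proposal.

```java
import java.util.NoSuchElementException;

// Floor binary search: find the greatest segment start offset <= targetOffset,
// i.e. the remote LogSegment that contains the requested offset.
public class RemoteSegmentLookup {
    static long findSegmentStartOffset(long[] segmentStartOffsets, long targetOffset) {
        int lo = 0, hi = segmentStartOffsets.length - 1, found = -1;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (segmentStartOffsets[mid] <= targetOffset) {
                found = mid;      // candidate segment; look for a later one
                lo = mid + 1;
            } else {
                hi = mid - 1;
            }
        }
        if (found < 0)
            throw new NoSuchElementException("requested offset precedes the first remote segment");
        return segmentStartOffsets[found];
    }
}
```

For example, with the start offsets 1000121, 1500024, 2000011 from the index file shown earlier, a request for offset 1600000 resolves to the segment starting at 1500024.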
Approach 2
As in Approach 1, RLM stores the LogSegment and OffsetIndex in remote storage. In addition, it keeps a copy of each OffsetIndex file in local storage, to avoid reading offsets from remote storage. This lets RLM seek to the requested position ahead of time instead of fetching an entire LogSegment file from the remote tier.
Public Interfaces
Compacted topics will not have remote storage support.
Configs
System-Wide

Per Topic Configuration
RemoteLogManager (RLM)
RemoteLogManager is a new interface added to the broker. It is responsible for copying completed log segments to remote storage and updating the RemoteOffsetIndex file. The default implementation of the interface supports HDFS as remote storage. Additional remote storage support, such as S3, can be added later by providing other implementations via the `remote.log.manager.class` configuration.
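One plausible way the broker could load the configured implementation is plain reflection on the class name. This is a hypothetical sketch: the `Configurable` interface and `NoOpManager` class below are stand-ins for illustration, not actual Kafka types, and the config key is taken from the proposal text.

```java
import java.util.Map;

// Sketch: instantiate the pluggable manager named by remote.log.manager.class.
public class RemoteLogManagerLoader {
    public interface Configurable {
        void configure(Map<String, ?> configs);
    }

    // Trivial implementation, useful for exercising the loading path.
    public static class NoOpManager implements Configurable {
        public Map<String, ?> configs;
        @Override
        public void configure(Map<String, ?> configs) { this.configs = configs; }
    }

    public static Configurable load(Map<String, ?> configs) throws Exception {
        String className = (String) configs.get("remote.log.manager.class");
        Configurable manager = (Configurable) Class.forName(className)
                .getDeclaredConstructor()
                .newInstance();
        manager.configure(configs); // hand the full config map to the plugin
        return manager;
    }
}
```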
...
Note: Early proposal. To be finalized during implementation.
trait RemoteStorageManager extends Configurable {
  // Configures this instance
  def configure(configs: util.Map[String, _]): Unit

  // Copies LogSegments provided by the RLM
  def copyLogSegments(logSegments: Set[LogSegment]): Boolean

  // Deletes remote LogSegment files provided by the RLM
  def deleteLogSegments(logSegments: Set[LogSegment]): Boolean

  // Reads topic-partition data from the remote tier
  def read(logSegment: LogSegment, maxBytes: Int): LogReadInfo

  // Stops all the threads and closes the instance
  def shutdown(): Unit
}
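To make the RemoteStorageManager contract concrete, here is a toy in-memory implementation in Java. The `LogSegment` and `LogReadInfo` classes are stubs (the real broker types are not shown in this proposal), and an in-memory map stands in for a real remote store such as HDFS or S3.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;

// Stub for the broker's log segment: base offset plus its bytes.
class LogSegment {
    final long baseOffset;
    final byte[] data;
    LogSegment(long baseOffset, byte[] data) { this.baseOffset = baseOffset; this.data = data; }
}

// Stub for the broker's read result.
class LogReadInfo {
    final byte[] records;
    LogReadInfo(byte[] records) { this.records = records; }
}

// Toy RemoteStorageManager backed by an in-memory map keyed by base offset.
public class InMemoryRemoteStorageManager {
    private final Map<Long, byte[]> store = new HashMap<>();

    public void configure(Map<String, ?> configs) { /* nothing to configure */ }

    public boolean copyLogSegments(Set<LogSegment> segments) {
        for (LogSegment s : segments) store.put(s.baseOffset, s.data.clone());
        return true;
    }

    public boolean deleteLogSegments(Set<LogSegment> segments) {
        for (LogSegment s : segments) store.remove(s.baseOffset);
        return true;
    }

    public LogReadInfo read(LogSegment segment, int maxBytes) {
        byte[] data = store.get(segment.baseOffset);
        if (data == null) throw new NoSuchElementException("segment not in remote store");
        return new LogReadInfo(Arrays.copyOf(data, Math.min(maxBytes, data.length)));
    }

    public void shutdown() { store.clear(); }
}
```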
Replica Manager
If RLM is configured, ReplicaManager will call RLM to assign topic-partitions or remove topic-partitions similar to how the replicaFetcherManager works today.
...