...


Remote Log Manager (RLM) is a new component that copies completed LogSegments and their corresponding OffsetIndex files to the remote tier.

  • The RLM component keeps track of topic-partitions and their segments. It delegates copying and reading these segments to a pluggable storage manager implementation.

...

  • RLM has two modes:

      ...

        • RLM Leader - In this mode, the RLM on the broker that leads a topic-partition checks for rolled-over LogSegments and copies them, along with their OffsetIndex, to the remote tier. RLM creates one index file per topic-partition, called RemoteLogSegmentIndex, to track remote LogSegments. The RLM leader also serves read requests for older data from the remote tier.

      ...

        • RLM Follower - In this mode, RLM keeps track of the segments and index files on the remote tier and updates its per-topic-partition RemoteLogSegmentIndex file. The RLM follower does not serve reads of old data from the remote tier.

      Core Kafka changes

      To satisfy the goal of keeping Kafka changes minimal when RLM is not configured, Kafka behavior remains unchanged for existing users.

      ...

      Serving Data from Remote Storage

       Approach 1

      For each topic-partition that has RLM configured, RLM ships all LogSegments and their corresponding OffsetIndex files to remote storage. A new index file, `RemoteLogSegmentIndex`, is maintained locally on the Kafka broker per topic-partition, alongside the existing index files, as shown below:

       

      ...

      the log segment files that are older than a configurable time to remote storage. The active segment file (the last segment file of each partition, to which new records are appended) is never shipped to remote storage.

      After successfully copying a segment file to remote storage, RLM appends a set of index entries to three local index files: remotelogindex, remoteoffsetindex, and remotetimeindex. RLM rotates these index files at a configurable time interval (or when they reach a configurable size).
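The copy rule and the rotation rule above can be sketched as follows. This is a minimal illustration under stated assumptions, not RLM's implementation: `LocalSegment`, `CopyPolicy`, and every parameter name are hypothetical, segment age is assumed to be judged by last-modified time, and segments are assumed to be copied in offset order so that the remote copy never has gaps.

```scala
// Hypothetical view of a local log segment; names are illustrative only.
final case class LocalSegment(baseOffset: Long, lastModifiedMs: Long)

object CopyPolicy {
  // Segments RLM would ship next, oldest first. Assumes `segments` is sorted by
  // baseOffset with the active segment last, and that segments whose base offset
  // is <= lastCopiedBaseOffset are already in remote storage. takeWhile (rather
  // than filter) stops at the first segment that is still too young, so copies
  // stay contiguous even if a later segment would qualify on its own.
  def segmentsToCopy(segments: Seq[LocalSegment],
                     lastCopiedBaseOffset: Option[Long],
                     nowMs: Long,
                     minAgeMs: Long): Seq[LocalSegment] =
    segments
      .dropRight(1) // the active segment is never shipped
      .filter(s => lastCopiedBaseOffset.forall(copied => s.baseOffset > copied))
      .takeWhile(s => nowMs - s.lastModifiedMs >= minAgeMs)

  // Rotation rule for the local remote index files: roll when either the
  // configured interval has elapsed or the configured size has been reached.
  def shouldRotate(createdMs: Long, sizeBytes: Long, nowMs: Long,
                   rotateIntervalMs: Long, rotateBytes: Long): Boolean =
    nowMs - createdMs >= rotateIntervalMs || sizeBytes >= rotateBytes
}
```

For example, with segments at base offsets 0, 100, 200, and 300 (the active one), a last-copied offset of 0, and only segment 100 old enough, the sketch returns just segment 100.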

      (active segment)
      {log.dirs}/{topic-partition}/0000002400013.index
      {log.dirs}/{topic-partition}/0000002400013.timeindex
      {log.dirs}/{topic-partition}/0000002400013.log

      (inactive segments)
      {log.dirs}/{topic-partition}/0000002000238.index
      {log.dirs}/{topic-partition}/0000002000238.timeindex
      {log.dirs}/{topic-partition}/0000002000238.log
      {log.dirs}/{topic-partition}/0000001600100.index
      {log.dirs}/{topic-partition}/0000001600100.timeindex
      {log.dirs}/{topic-partition}/0000001600100.log

      (active remote segment)
      {log.dirs}/{topic-partition}/0000001000121.remoteoffsetindex
      {log.dirs}/{topic-partition}/0000001000121.remotetimeindex
      {log.dirs}/{topic-partition}/0000001000121.remotelogindex

      (inactive remote segments)
      {log.dirs}/{topic-partition}/0000000512002.remoteoffsetindex
      {log.dirs}/{topic-partition}/0000000512002.remotetimeindex
      {log.dirs}/{topic-partition}/0000000512002.remotelogindex

      Each entry in the remotelogindex file describes a sequence of records in the remote log segment file. A remotelogindex entry has the following format:

      magic: int16 (current magic value is 0)
      length: int16 (length of this entry)
      crc: int32 (checksum of this entry)
      startOffset: int64 (the Kafka offset of the 1st record)
      lastOffset: int64 (the Kafka offset of the last record)
      firstTimestamp: int64
      maxTimestamp: int64
      dataLength: int32 (length of the data)
      rdiLength: int16
      rdi: byte[] (Remote data identifier)
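A minimal sketch of how an entry in this layout could be serialized and validated. Several details are assumptions not stated in the draft: fields are big-endian (as in Kafka's other on-disk formats), `length` counts the whole entry, and `crc` is a CRC32 over every byte after the crc field. `RemoteLogIndexEntry` and `RemoteLogIndexEntryCodec` are hypothetical names.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.util.zip.CRC32

// In-memory form of one remotelogindex entry (field names follow the draft).
final case class RemoteLogIndexEntry(startOffset: Long, lastOffset: Long,
                                     firstTimestamp: Long, maxTimestamp: Long,
                                     dataLength: Int, rdi: Array[Byte])

object RemoteLogIndexEntryCodec {
  val CurrentMagic: Short = 0
  // magic(2) + length(2) + crc(4) + four int64 fields(32) + dataLength(4) + rdiLength(2)
  val FixedSize: Int = 46
  // Assumption: the CRC covers every byte after the crc field (offset 8 onward).
  private val CrcStart = 8

  private def crcOf(bytes: Array[Byte]): Int = {
    val crc = new CRC32()
    crc.update(bytes, CrcStart, bytes.length - CrcStart)
    crc.getValue().toInt
  }

  def encode(e: RemoteLogIndexEntry): Array[Byte] = {
    val size = FixedSize + e.rdi.length
    require(size <= Short.MaxValue, "entry does not fit an int16 length")
    val buf = ByteBuffer.allocate(size) // big-endian by default
    buf.putShort(CurrentMagic).putShort(size.toShort).putInt(0) // crc filled in below
    buf.putLong(e.startOffset).putLong(e.lastOffset)
    buf.putLong(e.firstTimestamp).putLong(e.maxTimestamp)
    buf.putInt(e.dataLength).putShort(e.rdi.length.toShort).put(e.rdi)
    val bytes = buf.array()
    ByteBuffer.wrap(bytes).putInt(4, crcOf(bytes)) // absolute write into the crc slot
    bytes
  }

  def decode(bytes: Array[Byte]): RemoteLogIndexEntry = {
    val buf = ByteBuffer.wrap(bytes)
    val magic = buf.getShort()
    require(magic == CurrentMagic, s"unsupported magic $magic")
    require(buf.getShort().toInt == bytes.length, "length field mismatch")
    require(buf.getInt() == crcOf(bytes), "crc mismatch")
    val startOffset = buf.getLong()
    val lastOffset = buf.getLong()
    val firstTimestamp = buf.getLong()
    val maxTimestamp = buf.getLong()
    val dataLength = buf.getInt()
    val rdi = new Array[Byte](buf.getShort().toInt)
    buf.get(rdi)
    RemoteLogIndexEntry(startOffset, lastOffset, firstTimestamp, maxTimestamp, dataLength, rdi)
  }
}
```

Each entry costs 46 fixed bytes plus the RDI, so the RDI length largely determines how much local disk finer-grained indexing consumes.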


      RDI (Remote data identifier) is the "pointer" or "URI" of the remote data. The format of an RDI depends on the implementation. For example, an RDI can be an HDFS file path and offset, or an S3 key and offset. When reading remote records, RLM uses the RDI to retrieve the remote data.
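To make "depends on the implementation" concrete, here is one possible RDI encoding, sketched under the assumption of a plain-text `scheme://location#offset` form stored as UTF-8. `Rdi`, `HdfsRdi`, `S3Rdi`, and `RdiCodec` are hypothetical; a real storage manager could just as well use a compact binary encoding.

```scala
import java.nio.charset.StandardCharsets

// Hypothetical RDI shapes for the two examples in the text.
sealed trait Rdi
final case class HdfsRdi(path: String, offset: Long) extends Rdi
final case class S3Rdi(bucket: String, key: String, offset: Long) extends Rdi

object RdiCodec {
  // Illustrative text form: "<scheme>://<location>#<byte offset>".
  def toBytes(rdi: Rdi): Array[Byte] = {
    val text = rdi match {
      case HdfsRdi(path, offset)      => s"hdfs://$path#$offset"
      case S3Rdi(bucket, key, offset) => s"s3://$bucket/$key#$offset"
    }
    text.getBytes(StandardCharsets.UTF_8)
  }

  def fromBytes(bytes: Array[Byte]): Rdi = {
    val text = new String(bytes, StandardCharsets.UTF_8)
    val hash = text.lastIndexOf('#') // the offset follows the last '#'
    require(hash > 0, s"no offset in RDI: $text")
    val location = text.substring(0, hash)
    val offset = text.substring(hash + 1).toLong
    if (location.startsWith("hdfs://")) HdfsRdi(location.stripPrefix("hdfs://"), offset)
    else if (location.startsWith("s3://")) {
      val rest = location.stripPrefix("s3://")
      val slash = rest.indexOf('/')
      require(slash > 0, s"missing S3 key: $text")
      S3Rdi(rest.substring(0, slash), rest.substring(slash + 1), offset)
    } else throw new IllegalArgumentException(s"unknown RDI scheme: $text")
  }
}
```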

      Depending on the implementation, RLM may append one or more entries to the remotelogindex file for each remote segment file. More entries provide finer-grained indexing of the remote data at the cost of more local disk space.

      The RemoteLogIndex entries are shipped to remote storage along with the segment data. The followers will retrieve those index entries from remote storage to build their own indices.

      The remoteoffsetindex and remotetimeindex files are similar to the existing .index file (offset index) and .timeindex file (timestamp index). The only difference is that they index the corresponding remotelogindex file instead of a log segment file.
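The two-level lookup this implies can be sketched as follows: a floor search in remoteoffsetindex yields a starting point in remotelogindex, and a short forward scan finds the entry whose offset range covers the target. For simplicity the sketch models both files as in-memory vectors and treats a remoteoffsetindex position as an entry slot rather than a byte position; all names are hypothetical.

```scala
// Hypothetical in-memory models; the real files are binary and memory-mapped.
final case class LogIndexEntry(startOffset: Long, lastOffset: Long, rdi: String)
final case class OffsetIndexEntry(offset: Long, position: Int) // slot in remotelogindex

object RemoteIndexLookup {
  // Largest entry with offset <= target (the same floor search .index files use).
  def floor(index: IndexedSeq[OffsetIndexEntry], target: Long): Option[OffsetIndexEntry] = {
    var lo = 0
    var hi = index.length - 1
    var found: Option[OffsetIndexEntry] = None
    while (lo <= hi) {
      val mid = (lo + hi) >>> 1
      if (index(mid).offset <= target) { found = Some(index(mid)); lo = mid + 1 }
      else hi = mid - 1
    }
    found
  }

  // Jump via remoteoffsetindex, then scan remotelogindex for the covering entry.
  def locate(offsetIndex: IndexedSeq[OffsetIndexEntry],
             logIndex: IndexedSeq[LogIndexEntry],
             target: Long): Option[LogIndexEntry] = {
    val start = floor(offsetIndex, target).map(_.position).getOrElse(0)
    logIndex.iterator
      .drop(start)
      .find(_.lastOffset >= target)
      .filter(_.startOffset <= target) // None if the target falls in a gap or before the log
  }
}
```

The sparser remoteoffsetindex is, the longer the forward scan; this is the same space-versus-lookup trade-off that the existing offset index makes.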

      Code Block
      FileName : 00000001000121.remoteindex 
      
      Contents:
      SegmentStartOffset
      1000121 
      1500024 
      2000011 
      …
      2999999
      
      FileName : 00000003000000.remoteindex 
      Contents:
      SegmentStartOffset
      3000000 
      3500024 
      4000011 
      …

      RLM maintains these RemoteLogSegmentIndex files per topic-partition locally on the Kafka broker. The files are rolled periodically, and each file name carries the starting offset of the first LogSegment it indexes. Note that the RemoteLogSegmentIndex can be reconstructed by listing all the log segments stored in remote storage; maintaining a local file is an optimization that avoids such listing operations, which may be slow and expensive depending on the external store. RemoteLogSegmentIndex files are memory-mapped and use a binary search similar to that of OffsetIndex files to find the LogSegment that serves a read operation.

      On `OutOfRangeOffsetException`, ReplicaManager delegates the read request to RLM which does the following:

      1. RLM performs a binary search on the memory-mapped `RemoteLogSegmentIndex` file to find the starting offset of the LogSegment that contains the requested offset.
      2. RLM uses that starting offset to build the file names (or object names) of the LogSegment and its OffsetIndex.
      3. RLM fetches the segment and offset index files on demand, seeks into the LogSegment to the requested offset, and serves the data to the client.
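Steps 1 and 2 can be sketched with `java.util.Arrays.binarySearch`, whose negative result encodes the insertion point, so the floor entry is recovered without a hand-written loop. The array stands in for the memory-mapped RemoteLogSegmentIndex. The 20-digit zero-padded naming is an assumption borrowed from Kafka's local segment files (the examples on this page use shorter names), and `RemoteSegmentLocator` is a hypothetical name.

```scala
import java.util.Arrays

object RemoteSegmentLocator {
  // Step 1: floor search over the sorted segment start offsets.
  def segmentStartFor(startOffsets: Array[Long], target: Long): Option[Long] = {
    val i = Arrays.binarySearch(startOffsets, target)
    // A negative result is -(insertionPoint) - 1; the floor entry sits just before
    // the insertion point, i.e. at index -i - 2.
    val floorIdx = if (i >= 0) i else -i - 2
    if (floorIdx >= 0) Some(startOffsets(floorIdx)) else None
  }

  // Step 2: derive the names of the segment and its offset index.
  // Assumption: remote names reuse Kafka's 20-digit zero-padded base offset.
  def remoteFileNames(segmentStart: Long): (String, String) = {
    val base = f"$segmentStart%020d"
    (s"$base.log", s"$base.index")
  }
}
```

An offset below the first start offset yields `None`, which is the case where the requested data is older than anything tracked remotely.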

      Approach 2

      As in Approach 1, RLM stores LogSegment and OffsetIndex files in remote storage. In addition, it keeps a copy of the OffsetIndex files in local storage so that offsets need not be read from remote storage. This lets RLM seek to the right position ahead of time instead of fetching the entire LogSegment file from remote storage.
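A sketch of the benefit: with the OffsetIndex available locally, RLM can turn a fetch into a ranged read of the remote segment rather than a full download. `IndexEntry`, `RangedRemoteRead`, and the `maxBytes` cap are hypothetical, and the sketch assumes the remote store supports ranged (positioned) reads. The floor search is a linear scan for brevity.

```scala
// Hypothetical local copy of a remote segment's OffsetIndex:
// sparse (offset, byte position) pairs, sorted by offset.
final case class IndexEntry(offset: Long, position: Int)

object RangedRemoteRead {
  // Returns the [start, end) byte range of the remote segment to fetch for `target`.
  def byteRange(index: IndexedSeq[IndexEntry], segmentSize: Int,
                target: Long, maxBytes: Int): (Int, Int) = {
    // Position of the last index entry at or before the target; 0 if none.
    val start = index.takeWhile(_.offset <= target).lastOption.map(_.position).getOrElse(0)
    // Cap the read at maxBytes without running past the end of the segment.
    val end = math.min(segmentSize.toLong, start.toLong + maxBytes).toInt
    (start, end)
  }
}
```

The range starts at the indexed position at or before the target, so the reader still skips forward over any records preceding the requested offset, just as it does with a local OffsetIndex.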


      Public Interfaces

      Compacted topics will not have remote storage support. 

      Configs

      System-Wide
      Per Topic Configuration
      • remote.retention.period

      • remote.retention.bytes
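The draft does not define these two settings. Assuming they behave like the local `retention.ms` and `retention.bytes` topic settings, the decision of which remote segments to delete could be sketched as below. `RemoteSegment` and `RemoteRetention` are hypothetical names, and treating a negative value as "no limit" is also an assumption.

```scala
// Hypothetical metadata for a segment already in remote storage.
final case class RemoteSegment(baseOffset: Long, sizeBytes: Long, maxTimestampMs: Long)

object RemoteRetention {
  // Remote segments to delete, oldest first; `segments` must be sorted oldest first.
  // Assumed semantics (modeled on local retention.ms / retention.bytes):
  //  - period: a segment expires once its newest record is older than retentionPeriodMs;
  //  - bytes: drop the oldest segments while the rest still holds >= retentionBytes.
  // A negative limit disables that rule.
  def segmentsToDelete(segments: Seq[RemoteSegment], nowMs: Long,
                       retentionPeriodMs: Long, retentionBytes: Long): Seq[RemoteSegment] = {
    val byTime =
      if (retentionPeriodMs < 0) 0
      else segments.takeWhile(s => nowMs - s.maxTimestampMs > retentionPeriodMs).length
    val bySize =
      if (retentionBytes < 0) 0
      else {
        val total = segments.map(_.sizeBytes).sum
        // Bytes removed after dropping the first 1, 2, ... segments.
        val removed = segments.scanLeft(0L)(_ + _.sizeBytes).tail
        removed.takeWhile(r => total - r >= retentionBytes).length
      }
    // Both rules delete a prefix of the log, so the longer prefix wins.
    segments.take(math.max(byTime, bySize))
  }
}
```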


      RemoteLogManager (RLM)

      RemoteLogManager is a new interface added to the broker. It is responsible for copying completed log segments to remote storage and updating the RemoteOffsetIndex file. The default implementation of the interface supports HDFS as remote storage. Support for additional remote storage, such as S3, can be added later by providing other implementations through the `remote.log.manager.class` configuration.
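The configuration-driven plug-in mechanism could look like the following sketch, which loads the class named by a setting such as `remote.log.manager.class` through standard Java reflection and checks that it implements the expected interface before instantiating it. It assumes the implementation has a public no-argument constructor; `PluginLoader` is a hypothetical helper, not a Kafka API.

```scala
object PluginLoader {
  // Instantiates `className`, failing fast if it does not implement `expected`.
  def instantiate[T](className: String, expected: Class[T]): T = {
    val cls = Class.forName(className) // throws ClassNotFoundException if absent
    require(expected.isAssignableFrom(cls),
      s"$className does not implement ${expected.getName}")
    val instance: Any = cls.getDeclaredConstructor().newInstance()
    expected.cast(instance.asInstanceOf[AnyRef])
  }
}
```

Checking `isAssignableFrom` before construction means a misconfigured class name is rejected at startup with a clear message rather than failing later with a `ClassCastException`.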

      ...

      Note: Early proposal. To be finalized during implementation.


      Code Block
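      language: scala
      
      import java.util.{Map => JMap}
      import org.apache.kafka.common.Configurable
      import kafka.log.LogSegment
      // LogReadInfo: the broker's read-result type; its package varies across Kafka versions.
      
      trait RemoteStorageManager extends Configurable {
      
        // Configures this instance (declared by Configurable)
        def configure(configs: JMap[String, _]): Unit
      
        // Copies the LogSegments provided by the RLM; returns true on success
        def copyLogSegments(logSegments: Set[LogSegment]): Boolean
      
        // Deletes the remote LogSegment files provided by the RLM; returns true on success
        def deleteLogSegments(logSegments: Set[LogSegment]): Boolean
      
        // Reads topic-partition data from the remote tier, up to maxBytes
        def read(logSegment: LogSegment, maxBytes: Int): LogReadInfo
      
        // Stops all the threads and closes the instance
        def shutdown(): Unit
      }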


      Replica Manager

      If RLM is configured, ReplicaManager will call RLM to assign or remove topic-partitions, similar to how the replicaFetcherManager works today.
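A minimal sketch of the bookkeeping ReplicaManager would drive, assuming it passes the leader and follower partitions on each leadership change. `TopicPartition` here is a local stand-in (not Kafka's class of the same name), and `RemoteLogManagerSketch`, `RlmLeader`, and `RlmFollower` are hypothetical. The mode recorded per partition decides whether that broker's RLM may serve reads from the remote tier, matching the leader/follower split described for the two RLM modes.

```scala
import scala.collection.mutable

// Hypothetical partition key and RLM modes (names are illustrative).
final case class TopicPartition(topic: String, partition: Int)
sealed trait RlmMode
case object RlmLeader extends RlmMode   // copies segments, serves remote reads
case object RlmFollower extends RlmMode // tracks remote segments, no remote reads

final class RemoteLogManagerSketch {
  private val partitions = mutable.Map.empty[TopicPartition, RlmMode]

  // Called on leadership changes: (re)assigns each partition to the right mode.
  def assignPartitions(leaders: Set[TopicPartition], followers: Set[TopicPartition]): Unit = {
    leaders.foreach(tp => partitions(tp) = RlmLeader)
    followers.foreach(tp => partitions(tp) = RlmFollower)
  }

  // Called when the broker stops hosting partitions (e.g. after a reassignment).
  def removePartitions(tps: Set[TopicPartition]): Unit = {
    partitions --= tps
  }

  def modeOf(tp: TopicPartition): Option[RlmMode] = partitions.get(tp)

  // Only the leader's RLM serves reads of older data from the remote tier.
  def canServeRemoteRead(tp: TopicPartition): Boolean = modeOf(tp).contains(RlmLeader)
}
```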

      ...