...

It does not support compacted topics.


Proposed Changes

High-level design

(Figure: high-level design diagram)


RemoteLogManager (RLM) is a new component that keeps track of remote log segments.

  • It delegates copying and reading of these segments to a pluggable storage manager (viz. RemoteStorageManager) implementation.

RLM has two modes (a code sketch follows this list):

  • RLM Leader - In this mode, RLM is the leader for a topic-partition. It checks for rolled-over LogSegments (whose last message offset is less than the last stable offset of that topic-partition) and copies them, along with their remote offset/time indexes, to the remote tier. RLM creates an index file, called RemoteLogSegmentIndex, per topic-partition to track remote LogSegments; these indexes are described in detail later. It also serves fetch requests for older data from the remote tier. Local log segments are not cleaned up until they have been successfully copied to remote storage, even if their retention time/size has been reached.
  • RLM Follower - In this mode, RLM keeps track of the segments and index files on the remote tier and updates its RemoteLogSegmentIndex file per topic-partition. An RLM follower does not serve reads of old data from the remote tier. Local log segments are not cleaned up until their remote log indexes have been copied locally from remote storage, even if their retention time/size has been reached.
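
The sketch below makes the two modes concrete. Only the RemoteStorageManager calls (copyLogSegment, listRemoteSegments, getRemoteLogIndexEntries, defined under Public Interfaces) come from this proposal; the task class and its helper methods are hypothetical.

Code Block (Scala)

import java.util

import kafka.log.LogSegment
import org.apache.kafka.common.TopicPartition

// Hypothetical per-partition RLM task; RemoteStorageManager, RemoteLogIndexEntry
// and LogSegment are the types used elsewhere in this KIP.
class RemoteLogTask(tp: TopicPartition, rsm: RemoteStorageManager) {

  def runOnce(isLeader: Boolean): Unit =
    if (isLeader) copyRolledOverSegments() else syncIndexesFromRemote()

  // Leader mode: ship rolled-over segments (last offset < last stable offset) and
  // append the returned index entries to the local remote log index files.
  private def copyRolledOverSegments(): Unit =
    rolledOverSegments().foreach { segment =>
      val entries = rsm.copyLogSegment(tp, segment, leaderEpoch())
      appendToLocalRemoteLogIndex(entries)
    }

  // Follower mode: discover segments shipped by the leader and rebuild the local
  // remote log index files from their index entries.
  private def syncIndexesFromRemote(): Unit =
    rsm.listRemoteSegments(tp, nextBaseOffsetToIndex()).forEach { segmentInfo =>
      appendToLocalRemoteLogIndex(rsm.getRemoteLogIndexEntries(segmentInfo))
    }

  // Bookkeeping helpers elided in this sketch.
  private def rolledOverSegments(): Seq[LogSegment] = ???
  private def leaderEpoch(): Int = ???
  private def nextBaseOffsetToIndex(): Long = ???
  private def appendToLocalRemoteLogIndex(entries: util.List[RemoteLogIndexEntry]): Unit = ???
}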

Core Kafka changes

To satisfy the goal of keeping Kafka changes minimal when RLM is not configured, Kafka behavior remains unchanged for existing users.

  • Core Kafka starts the RLM service if tiered storage is configured.
  • When an offset index is not found and RLM is configured, the read request is delegated to RLM to serve the data from the remote tier, as sketched below.
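
A minimal sketch of this delegation, assuming hypothetical helpers for the local log and the remote tier read path:

Code Block (Scala)

import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.record.Records

object FetchPathSketch {
  // Stubs standing in for the real local log and the RemoteLogManager read path.
  def localLogStartOffset(tp: TopicPartition): Long = ???
  def readFromLocalLog(tp: TopicPartition, offset: Long, maxBytes: Int): Records = ???
  def readFromRemoteTier(tp: TopicPartition, offset: Long, maxBytes: Int): Records = ???

  // If the requested offset is still present locally, the existing read path is
  // used unchanged; otherwise the request is handed off to RLM.
  def read(tp: TopicPartition, fetchOffset: Long, maxBytes: Int): Records =
    if (fetchOffset >= localLogStartOffset(tp)) readFromLocalLog(tp, fetchOffset, maxBytes)
    else readFromRemoteTier(tp, fetchOffset, maxBytes)
}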

Serving Data from Remote Storage

For each topic-partition that has RLM configured, the RLM leader copies log segments whose last message offset is less than the last stable offset of that topic-partition to remote storage. The active segment file (the last segment file of each partition, to which new records are being appended) is never shipped to remote storage.

After successfully copying a segment file to remote storage, RLM appends a set of index entries to three local index files: remoteLogIndex, remoteOffsetIndex, and remoteTimeIndex. These index files are rotated by RLM at a configurable time interval (or at a configurable size).

(active segment)

{log.dirs}/{topic-partition}/0000002400013.index

{log.dirs}/{topic-partition}/0000002400013.timeindex

{log.dirs}/{topic-partition}/0000002400013.log


(inactive segments)

{log.dirs}/{topic-partition}/0000002000238.index

{log.dirs}/{topic-partition}/0000002000238.timeindex

{log.dirs}/{topic-partition}/0000002000238.log

{log.dirs}/{topic-partition}/0000001600100.index

{log.dirs}/{topic-partition}/0000001600100.timeindex

{log.dirs}/{topic-partition}/0000001600100.log


(active remote segment)

{log.dirs}/{topic-partition}/0000001000121.remoteOffsetIndex

{log.dirs}/{topic-partition}/0000001000121.remoteTimeIndex

{log.dirs}/{topic-partition}/0000001000121.remoteLogIndex


(inactive remote segments)

{log.dirs}/{topic-partition}/0000000512002.remoteOffsetIndex

{log.dirs}/{topic-partition}/0000000512002.remoteTimeIndex

{log.dirs}/{topic-partition}/0000000512002.remoteLogIndex


Each index entry of the remoteLogIndex file contains information about a sequence of records in the remote log segment file. The format of a remoteLogIndex entry:

magic: int16 (current magic value is 0)

length: int16 (length of this entry)

crc: int32 (checksum from firstOffset to the end of this entry)

firstOffset: int64 (the Kafka offset of the 1st record)

lastOffset: int64 (the Kafka offset of the last record)

firstTimestamp: int64

lastTimestamp: int64

dataLength: int32 (length of the remote data)

rdiLength: int16

rdi: byte[] (Remote data identifier)

todo: We may change this format to have magic and crc for a batch of entries instead of having them for each entry.

RDI (Remote data identifier) is the "pointer" or "URI" of the remote data. The format of an RDI depends on the implementation: for example, an RDI can be an HDFS file path and offset, or an S3 key and offset. When reading the remote records, RLM uses the RDI to retrieve the remote data.
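
As an illustration, one possible encoding of this layout is sketched below; the case class and writeTo helper are illustrative, not the final on-disk implementation.

Code Block (Scala)

import java.nio.ByteBuffer
import java.util.zip.CRC32

// Field widths follow the format above: magic(2) length(2) crc(4) firstOffset(8)
// lastOffset(8) firstTimestamp(8) lastTimestamp(8) dataLength(4) rdiLength(2) rdi(var).
case class RemoteLogIndexEntrySketch(firstOffset: Long, lastOffset: Long,
                                     firstTimestamp: Long, lastTimestamp: Long,
                                     dataLength: Int, rdi: Array[Byte]) {
  val magic: Short = 0

  // length counts everything after the length field itself:
  // crc(4) + offsets/timestamps(32) + dataLength(4) + rdiLength(2) + rdi
  def entryLength: Short = (42 + rdi.length).toShort

  def writeTo(buf: ByteBuffer): Unit = {
    // Serialize the crc-covered portion first: firstOffset through rdi.
    val payload = ByteBuffer.allocate(38 + rdi.length)
    payload.putLong(firstOffset).putLong(lastOffset)
      .putLong(firstTimestamp).putLong(lastTimestamp)
      .putInt(dataLength).putShort(rdi.length.toShort).put(rdi)
    val crc = new CRC32
    crc.update(payload.array) // checksum from firstOffset to the end of this entry
    buf.putShort(magic).putShort(entryLength).putInt(crc.getValue.toInt).put(payload.array)
  }
}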

Depending on the implementation, RLM may append one or more entries to the remoteLogIndex file for each remote segment file. More entries provide finer-grained indexing of the remote data, at the cost of local disk space.

The RemoteLogIndex entries are shipped to remote storage along with the segment data. The followers will retrieve those index entries from remote storage to build their own indices.

The remoteOffsetIndex and remoteTimeIndex files are similar to the existing .index (offset index) and .timeindex (timestamp index) files. The only difference is that they point into the corresponding remoteLogIndex file instead of into a log segment file.

(Figure: layout of the remote log index files)
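
Put together, a remote read resolves through these files roughly as follows; the two lookup helpers below are hypothetical stand-ins for the index file implementations.

Code Block (Scala)

import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.record.Records

object RemoteReadSketch {
  // Hypothetical lookups backing the local remote index files.
  def remoteOffsetIndexLookup(tp: TopicPartition, offset: Long): Long = ??? // position in remoteLogIndex
  def readRemoteLogIndexEntry(tp: TopicPartition, position: Long): RemoteLogIndexEntry = ???
  def rsm: RemoteStorageManager = ???

  def readFromRemote(tp: TopicPartition, fetchOffset: Long, maxBytes: Int): Records = {
    // 1. remoteOffsetIndex maps the fetch offset to a position in the remoteLogIndex file.
    val position = remoteOffsetIndexLookup(tp, fetchOffset)
    // 2. remoteLogIndex yields the first entry with lastOffset >= fetchOffset.
    val entry = readRemoteLogIndexEntry(tp, position)
    // 3. The entry's RDI tells the storage manager where the record data lives.
    rsm.read(entry, maxBytes, fetchOffset, minOneMessage = true)
  }
}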

Manage Remote Log Segments

The RLM of a follower broker retrieves remote segment information from remote storage. It periodically checks remote storage to discover new segments shipped by the leader.

When the follower discovers a new segment in remote storage, it retrieves the index entries from remote storage and creates the local remote log index file. The remoteOffsetIndex and remoteTimeIndex files are also created accordingly.

The leader may fail to ship segment data to remote storage on time. In such a situation, the follower has to keep its local segment files, even if the configured retention time is reached. The local segment files (and the corresponding index files) can only be deleted in the following two cases:

  1. the follower has received the corresponding segment data info from remote storage and updated its index files
  2. the local files are already older than the configured remote retention time

After a restart, or after being assigned new topic-partitions, the RLM of a follower retrieves remote log segment info from remote storage, and the fetcher threads fetch local log segment data from the leader.

When a remote segment is deleted from remote storage, the follower deletes the corresponding index files from its local disk. A follower does not delete remote index files based on remote retention time; it always waits for the leader to delete the remote segments in remote storage first.
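
The two deletion cases above can be summarized as a simple predicate; the bookkeeping names in this sketch are hypothetical.

Code Block (Scala)

object FollowerRetentionSketch {
  // Hypothetical state maintained by the follower's RLM.
  def highestRemoteIndexedOffset: Long = ??? // last offset covered by fetched remote index entries
  def localRetentionMs: Long = ???
  def remoteRetentionMs: Long = ??? // remote.log.retention.minutes, converted to millis

  // A local segment past its local retention may only be deleted if case 1
  // (remote index info received) or case 2 (older than remote retention) holds.
  def mayDeleteLocalSegment(segmentLastOffset: Long, segmentAgeMs: Long): Boolean = {
    val retentionReached = segmentAgeMs > localRetentionMs
    val indexedRemotely = segmentLastOffset <= highestRemoteIndexedOffset // case 1
    val pastRemoteRetention = segmentAgeMs > remoteRetentionMs            // case 2
    retentionReached && (indexedRemotely || pastRemoteRetention)
  }
}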


Public Interfaces

Compacted topics will not have remote storage support. 

Configs

System-Wide

  • remote.log.storage.enable = false (to support backward compatibility)
  • remote.log.storage.manager.class.name = org.apache.kafka.rsm.hdfs.HDFSRemoteStorageManager

RemoteStorageManager

(These configs are dependent on the remote storage manager implementation.)

  • remote.log.storage.*

Per Topic Configuration

  • remote.log.retention.minutes

  • remote.log.retention.bytes
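
For illustration, these settings could be wired up as follows. The two retention values are arbitrary examples, and the hdfs.base.dir key is a hypothetical RSM-specific setting.

Code Block (Scala)

import java.util.Properties

object TieredStorageConfigExample {
  // Broker-side (system-wide) settings.
  val brokerProps = new Properties()
  brokerProps.put("remote.log.storage.enable", "true") // defaults to false
  brokerProps.put("remote.log.storage.manager.class.name",
    "org.apache.kafka.rsm.hdfs.HDFSRemoteStorageManager")
  // RSM-specific settings are passed through under the remote.log.storage. prefix,
  // e.g. a hypothetical key for an HDFS-backed RSM:
  brokerProps.put("remote.log.storage.hdfs.base.dir", "/kafka-remote-logs")

  // Per-topic settings.
  val topicProps = new Properties()
  topicProps.put("remote.log.retention.minutes", "20160")       // example: ~14 days
  topicProps.put("remote.log.retention.bytes", "1099511627776") // example: 1 TiB
}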

Remote Storage Manager:

`RemoteStorageManager` is an interface that allows plugging in different remote storage implementations to provide the lifecycle of remote log segments. We will provide a simple implementation of RSM to give a better understanding of the APIs. HDFS and S3 implementations are planned to be hosted in external repos and will not be part of the Apache Kafka repo. This is in line with the approach taken for Kafka connectors.

Code Block (Scala)

import java.io.IOException
import java.util

import kafka.log.LogSegment
import org.apache.kafka.common.{Configurable, TopicPartition}
import org.apache.kafka.common.annotation.InterfaceStability
import org.apache.kafka.common.record.Records
// RemoteLogIndexEntry and RemoteLogSegmentInfo are new types introduced by this KIP
// (their final package is yet to be decided) and are assumed to be in scope here.

/**
 * RemoteStorageManager is an interface that allows plugging in different remote storage implementations to
 * provide the lifecycle of remote log segments.
 *
 * All these APIs are still experimental.
 */
@InterfaceStability.Unstable
trait RemoteStorageManager extends Configurable with AutoCloseable {

  /**
   * Returns the earliest log offset, if one exists, for the given topic-partition in the remote storage.
   * Returns -1 if there are no segments in the remote storage.
   *
   * @param tp topic-partition for which the earliest remote offset is requested
   * @return the earliest log offset in remote storage, or -1 if none exists
   */
  @throws(classOf[IOException])
  def earliestLogOffset(tp: TopicPartition): Long

  /**
   * Copies the LogSegment provided by [[RemoteLogManager]] for the given topic-partition with the given leader
   * epoch. Returns the RDIs of the remote data. This method is invoked by the leader of the topic-partition.
   *
   * //todo LogSegment is not public, this will be changed with an interface which provides base and end offset of the
   * segment, log and offset/time indexes.
   *
   * @param topicPartition topic-partition of the segment to be copied
   * @param logSegment     log segment to be copied to remote storage
   * @param leaderEpoch    leader epoch under which the segment is copied
   * @return list of remote log index entries for the copied segment
   */
  @throws(classOf[IOException])
  def copyLogSegment(topicPartition: TopicPartition, logSegment: LogSegment, leaderEpoch: Int): util.List[RemoteLogIndexEntry]

  /**
   * List the remote log segment files of the given topicPartition.
   * The RemoteLogManager of a follower uses this method to find out the remote data for the given topic partition.
   *
   * @return List of remote segments, sorted by baseOffset in ascending order.
   */
  @throws(classOf[IOException])
  def listRemoteSegments(topicPartition: TopicPartition): util.List[RemoteLogSegmentInfo] = {
    listRemoteSegments(topicPartition, 0)
  }

  /**
   * List the remote log segment files of the specified topicPartition starting from the base offset minBaseOffset.
   * The RLM of a follower uses this method to find out the remote data.
   *
   * @param minBaseOffset The minimum base offset for a segment to be returned.
   * @return List of remote segments starting from the base offset minBaseOffset, sorted by baseOffset in ascending order.
   */
  @throws(classOf[IOException])
  def listRemoteSegments(topicPartition: TopicPartition, minBaseOffset: Long): util.List[RemoteLogSegmentInfo]

  /**
   * Returns a List of RemoteLogIndexEntry for the given RemoteLogSegmentInfo. This is used by the follower to
   * store remote log indexes locally.
   *
   * @param remoteLogSegment remote log segment for which index entries are requested
   * @return list of remote log index entries for the given segment
   */
  @throws(classOf[IOException])
  def getRemoteLogIndexEntries(remoteLogSegment: RemoteLogSegmentInfo): util.List[RemoteLogIndexEntry]

  /**
   * Deletes the remote LogSegment file and indexes for the given remoteLogSegmentInfo.
   *
   * @param remoteLogSegmentInfo remote log segment to be deleted
   * @return true if the segment and its indexes were deleted
   */
  @throws(classOf[IOException])
  def deleteLogSegment(remoteLogSegmentInfo: RemoteLogSegmentInfo): Boolean

  /**
   * Deletes all the log segments for the given topic-partition. This can be done by renaming the existing
   * locations and deleting them later in an asynchronous manner.
   *
   * @param topicPartition topic-partition whose remote log segments are to be deleted
   * @return true if the deletion was initiated successfully
   */
  @throws(classOf[IOException])
  def deleteTopicPartition(topicPartition: TopicPartition): Boolean

  /**
   * Removes the log segments which are older than the given cleanUpTillMs. Returns the log start offset of the
   * earliest remote log segment if one exists, or -1 if there are no log segments in the remote storage.
   *
   * @param topicPartition topic-partition to clean up
   * @param cleanUpTillMs  timestamp (in milliseconds) up to which segments are removed
   * @return log start offset of the earliest remaining remote segment, or -1 if none remain
   */
  @throws(classOf[IOException])
  def cleanupLogUntil(topicPartition: TopicPartition, cleanUpTillMs: Long): Long

  /**
   * Reads up to maxBytes of data from remote storage, starting from the first batch whose offset is greater
   * than or equal to startOffset. It will read at least one batch, even if the first batch is larger than maxBytes.
   *
   * @param remoteLogIndexEntry the first remoteLogIndexEntry such that remoteLogIndexEntry.lastOffset >= startOffset
   * @param maxBytes            maximum bytes to fetch for the given entry
   * @param startOffset         initial offset to be read from the given rdi in remoteLogIndexEntry
   * @param minOneMessage       if true, read at least one record even if its size is more than maxBytes
   * @return records read from remote storage
   */
  @throws(classOf[IOException])
  def read(remoteLogIndexEntry: RemoteLogIndexEntry, maxBytes: Int, startOffset: Long, minOneMessage: Boolean): Records

  /**
   * Release any system resources used by this instance.
   */
  def close(): Unit
}
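
For a feel of the call sequence, a leader-mode RLM could drive this interface roughly as follows; the segment, epoch and retention values are placeholders.

Code Block (Scala)

import kafka.log.LogSegment
import org.apache.kafka.common.TopicPartition

object RsmUsageSketch {
  def leaderCycle(rsm: RemoteStorageManager, tp: TopicPartition,
                  segment: LogSegment, leaderEpoch: Int, retentionMs: Long): Unit = {
    // 1. Ship a rolled-over segment; the returned index entries are kept locally.
    val entries = rsm.copyLogSegment(tp, segment, leaderEpoch) // may throw IOException
    println(s"shipped segment, got ${entries.size()} remote log index entries")

    // 2. Apply remote retention; the result becomes the new log start offset.
    val earliest = rsm.cleanupLogUntil(tp, System.currentTimeMillis() - retentionMs)
    println(s"remote log for $tp now starts at offset $earliest")

    // A follower would instead call listRemoteSegments(tp) and
    // getRemoteLogIndexEntries(segmentInfo) to rebuild its local indexes.
  }
}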



Replica Manager

If RLM is configured, ReplicaManager will call RLM to assign topic-partitions or remove topic-partitions, similar to how the replicaFetcherManager works today; a sketch follows.
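
A sketch of that hook; the RemoteLogManager surface shown here (assignPartitions/removePartitions) is hypothetical.

Code Block (Scala)

import org.apache.kafka.common.TopicPartition

// Hypothetical RLM surface for this sketch.
trait RemoteLogManager {
  def assignPartitions(partitions: Set[TopicPartition], isLeader: Boolean): Unit
  def removePartitions(partitions: Set[TopicPartition]): Unit
}

class ReplicaManagerSketch(rlmOpt: Option[RemoteLogManager]) {
  // Invoked alongside the replicaFetcherManager updates on leadership changes.
  def onLeadershipChange(leaders: Set[TopicPartition], followers: Set[TopicPartition]): Unit =
    rlmOpt.foreach { rlm =>
      rlm.assignPartitions(leaders, isLeader = true)
      rlm.assignPartitions(followers, isLeader = false)
    }

  // Invoked when partitions are deleted or reassigned away from this broker.
  def onPartitionsRemoved(partitions: Set[TopicPartition]): Unit =
    rlmOpt.foreach(_.removePartitions(partitions))
}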

...