Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

On-premise Kafka deployments use for Kafka broker nodes hardware SKUs with multiple high capacity disks to maximize the i/o throughput and to store the data for the retention period. Equivalent SKUs with similar local storage options are either not available unavailable or they are very expensive in the cloud. There are more available options for SKUs with lesser local storage capacity as Kafka broker nodes have more available options and they are more suitable in the cloud.

...

In the tiered storage approach, Kafka cluster is configured with two tiers of storage - local and remote. Local tier is the same as the current Kafka that uses the local disks on the Kafka brokers to store the log segments. The new remote tier uses systems, such as HDFS or S3 to store the completed log segments. Two separate retention periods are defined corresponding to each of the tiers. With remote tier enabled, the retention period for the local tier can be significantly reduced from days to few hours. The retention period for remote tier can be much longer, days or even months. When a log segment is rolled on the local tier, it is copied to the remote tier along with the corresponding offset index. Applications that are latency Latency sensitive applications perform tail reads and are served from local tier leveraging the existing Kafka mechanism of efficiently using page cache to serve the data. Backfill and other applications recovering from a failure that needs data older than what is in the local tier are served from the remote tier.

...

Tiered storage does not replace ETL pipelines and jobs. Existing ETL pipelines continue to consume data from Kafka as is, albeit with data in Kafka having a much longer retention period.

It does not support compact topics.   

High-level design


Remote Log Manager (RLM) is a new component that copies the completed LogSegments and corresponding OffsetIndex to remote tier.keeps track of remote log segments 

  • RLM component will keep track of topic-partition and its segments. It will delegate the copy and read of these segments to pluggable storage manager implementation.
  • RLM has two modes:
    • RLM Leader - In this mode, RLM is the leader for topic-partition, checks for rolled over LogSegments and copies it along with OffsetIndex to the remote tier. RLM creates an index file, called RemoteLogSegmentIndex, per topic-partition to track remote LogSegments. Additionally, the RLM leader also serves the read requests for older data from the remote tier.
    • RLM Follower - In this mode, RLM keeps track of the segments and index files on remote tier and updates its RemoteLogSegmentIndex file per topic-partition. RLM follower does not serve reading old data from the remote tier.

...

System-Wide
  • remote.log.storage.enable = false (to support backward compatibility)
  • remote.log.storage.manager.class.name =  org.apache.kafka.rsm.hdfs.HDFSRemoteStorageManager
RemoteStorageManager

(These configs are dependent on remote storage manager implementation)

  • remote.log.storage.*
Per Topic Configuration
  • remote.log.retention.minutes

  • remote.log.retention.bytes

Remote Storage Manager:

        RemoteStorageManager         `RemoteStorageManager` is an interface that allows to plugin different remote storage implementations to copy provide the lifecycle of remote log segments. The default implementation of the interface supports HDFS as remote storage. Additional remote storage support, such as S3 can be added later by providing other implementations using the configuration remote.storage.manager.classNo implementation will be provioded as part of Apache Kafka binaries. HDFS and S3 implementation are planned to be hosted in external repos and these will not be part of Apache Kafka repo. This is inline with the approach taken for Kafka connectors.


Code Block
languagescala

/**
 * RemoteStorageManager is an interface that allows to plugin different remote storage implementations to copy andprovide
 * the lifecycle retrieveof theremote log segments.
 *
 * All these APIs are still experimental.
 */
@InterfaceStability.Unstable
trait RemoteStorageManager extends Configurable with AutoCloseable {

  /**
   * Earliest log offset if exists for the given topic partition in the remote storage. Return -1 if there are no
   * segments in the remote storage.
   *
   * @param tp
   * @return
   */
  @throws(classOf[IOException])
  def earliestLogOffset(tp: TopicPartition): Long

  /**
   * Copies LogSegment provided by [[RemoteLogManager]] for the given topic partition with the given leader epoch.
   * Returns the RDIs of the remote data. This method is invoked by the leader of topic partition.
   *
   * //todo LogSegment is not public, this will be changed with an interface which provides base and end offset of the
   * segment, log and offset/time indexes.
   *
   * @param topicPartition
   * @param logSegment
   * @return
   */
  @throws(classOf[IOException])
  def copyLogSegment(topicPartition: TopicPartition, logSegment: LogSegment, leaderEpoch: Int): util.List[RemoteLogIndexEntry]

  /**
   * CancelsList the unfinishedremote LogSegment copying of the given topic-partition.
   *
   * @param topicPartition
   */
  def cancelCopyingLogSegment(topicPartition: TopicPartition): Unit

  /**
   * List the remote log log segment files of the given topicPartition.
   * The RemoteLogManager of a follower uses this method to find out the remote data for the given topic partition.
   *
   * @return List of remote segments, sorted by baseOffset in ascending order.
   */
  @throws(classOf[IOException])
  def listRemoteSegments(topicPartition: TopicPartition): util.List[RemoteLogSegmentInfo] = {
    listRemoteSegments(topicPartition, 0)
  }

  /**
   * List the remote log segment files of the specified topicPartition starting from the base offset minBaseOffset.
   * The RLM of a follower uses this method to find out the remote data
   *
   * @param minBaseOffset The minimum base offset for a segment to be returned.
   * @return List of remote segments starting from the base offset minBaseOffset, sorted by baseOffset in ascending order.
   */
  @throws(classOf[IOException])
  def listRemoteSegments(topicPartition: TopicPartition, minBaseOffset: Long): util.List[RemoteLogSegmentInfo]

  /**
   * Returns a List of RemoteLogIndexEntry for the given RemoteLogSegmentInfo. This is used by follower to store remote
   * log indexes locally.
   *
   * @param remoteLogSegment
   * @return
   */
  @throws(classOf[IOException])
  def getRemoteLogIndexEntries(remoteLogSegment: RemoteLogSegmentInfo): util.List[RemoteLogIndexEntry]

  /**
   * Deletes remote LogSegment file and indexes for the given remoteLogSegmentInfo
   *
   * @param remoteLogSegmentInfo
   * @return
   */
  @throws(classOf[IOException])
  def deleteLogSegment(remoteLogSegmentInfo: RemoteLogSegmentInfo): Boolean

  /**
   * Delete all the log segments for the given topic partition. This can be done by rename the existing locations
   * and delete them later in asynchronous manner.
   *
   * @param topicPartition
   * @return
   */
  @throws(classOf[IOException])
  def deleteTopicPartition(topicPartition: TopicPartition): Boolean

  /**
   * Remove the log segments which are older than the given cleanUpTillMs. Return the log start offset of the
   * earliest remote log segment if exists or -1 if there are no log segments in the remote storage.
   *
   * @param topicPartition
   * @param cleanUpTillMs
   * @return
   */
  @throws(classOf[IOException])
  def cleanupLogUntil(topicPartition: TopicPartition, cleanUpTillMs: Long): Long

  /**
   * Read up to maxBytes data from remote storage, starting from the 1st batch that is greater than or equals to the
   * startOffset,. It will read at least one batch, if the 1st batch size is larger than maxBytes.
   *
   * @param remoteLogIndexEntry The first remoteLogIndexEntry that remoteLogIndexEntry.lastOffset >= startOffset
   * @param maxBytes            maximum bytes to fetch for the given entry
   * @param startOffset         initial offset to be read from the given rdi in remoteLogIndexEntry
   * @param minOneMessage       if true read at least one record even if the size is more than maxBytes
   * @return
   */
  @throws(classOf[IOException])
  def read(remoteLogIndexEntry: RemoteLogIndexEntry, maxBytes: Int, startOffset: Long, minOneMessage: Boolean): Records

  /**
   * Release any system resources used by this instance.
   */
  def close(): Unit
}



Proposed Changes

When `remote.log.storage.enable` is set as true and respective properties are configured 

When an RLM class an RLM class is configured and all the required configs are present, RLM will send a list of topic-partitions and invoke the RemoteLogManager.addTopicPartitions(). This function’s responsibility is to delegate these topic-partitions to RLM. RLM will monitor the log.dirs for these topic-partitions and copy the rolled over LogSegment to the configured remote storage. Once a LogSegment is copied over it should mark it as done.

...

Log retention will continue to work as it is today except for one case, where If a LogSegment is in the process of getting copied over and it doesn't have associated "copy-done" file, LogCleaner will skips these LogSegments until it has the marker to denote its copied over to remote and its safe to deletelike earlier, local log segments are deleted only when the respective log segments are copied over to remote storage successfully even though their local retention time/size is reached.

Consumer Fetch Requests

For any fetch requests, ReplicaManager will proceed with making a call to readFromLocalLog, if this method returns OffsetOutOfRange exception it will delegate the read call to RemoteLogManager.readFromRemoteLog and returns the LogReadResult. If the fetch request is from a consumer, RLM will read the data from remote storage, and return the result to the consumer.

...