Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

The total storage required on a cluster is proportional to the number of topics/partitions, the rate of messages, and most importantly the retention period. A Kafka broker typically has a large number of disks with the total storage capacity of 10s of TBs. The amount of data locally stored on a Kafka broker presents many operational challenges.


Kafka as a long-term storage service

Kafka has grown in adoption to become the entry point of all of data. It allows users to not only consume data in real-time but also gives flexibility to fetch older data based on retention policies. Given the simplicity of Kafka protocol and wide adoption of consumer API, allowing users to store and fetch data with longer retention help make Kafka one true source of data.

Currently Kafka is configured with a shorter retention period in days (typically 3 days) and data older than the retention period is copied using data pipelines to a more scalable external storage for long-term use, such as HDFS. This results in data consumers having to build different versions of applications to consume the data from different systems depending on the age of the data.

Kafka cluster storage is typically scaled by adding more broker nodes to the cluster. But this also adds needless memory and CPUs to the cluster making overall storage cost less efficient compared to storing the older data in an external storage. Larger cluster with more nodes also adds to complexity of deployment and increases the operational costs.

Kafka local storage and operational complexity

When a broker fails, the failed node is replaced by a new node. The new node must copy all the data that was on the failed broker from other replicas. Similarly, when a new Kafka node is added to scale the cluster storage, cluster rebalancing assigns partitions to the new node which also requires copying a lot of data. The time for recovery and rebalancing is proportional to the amount of data stored locally on a Kafka broker. In setups that have many Kafka clusters running 100s of brokers, a node failure is a common occurrence, with a lot of time spent in recovery making operations difficult and time-consuming.

Reducing the amount of data stored on each broker can reduce the recovery/rebalancing time. But it would also necessitate reducing the log retention period impacting the time available for application maintenance and failure recovery.

Kafka in cloud

On-premise Kafka deployments use for Kafka broker nodes hardware SKUs with multiple high capacity disks to maximize the i/o throughput and to store the data for retention period. Equivalent SKUs with similar local storage options are either not available or they are very expensive in the cloud. SKUs with lesser local storage capacity as Kafka broker nodes have more available options and are more suitable in the cloud.

Solution - Tiered storage for Kafka

...

In tiered storage approach, Kafka cluster is configured with two tiers of storage - local and remote. Local tier is the same as the current Kafka that uses the local disks on the Kafka brokers to store the log segments. The new remote tier uses systems, such as HDFS or S3 to store the completed log segments. Two separate retention periods are defined corresponding to each of the tiers. With remote tier enabled, the retention period for the local tier can be significantly reduced from days to few hours. The retention period for remote tier can be much longer, days or even months. When a log segment is rolled on the local tier, it is copied to the remote tier along with the corresponding offset index. Applications that are latency sensitive perform tail reads and are served from local tier leveraging the existing Kafka mechanism of efficiently using page cache to serve the data. Backfill and other applications recovering from a failure that need data older than what is in the local tier are served from the remote tier.

This solution allows scaling storage independent of memory and CPUs in a Kafka cluster enabling Kafka to be a long-term storage solution. This also reduces the amount of data stored locally on Kafka brokers and hence the amount of data that needs to be copied during recovery and rebalancing. Log segments that are available in the remote tier need not be restored on the broker or restored lazily and are served from the remote tier. With this, increasing the retention period no longer requires scaling the Kafka cluster storage and the addition of new nodes. At the same time, the overall data retention can still be much longer eliminating the need for separate data pipelines to copy the data from Kafka to external stores, as done currently in many deployments.

Goal

Extend Kafka's storage beyond the local storage available on the Kafka cluster by retaining the older data in an external store, such as HDFS or S3 with minimal impact on the internals of Kafka. Kafka behavior and operational complexity must not change for existing users that do not have tiered storage feature configured.


Non-Goals

Tiered storage does not replace ETL pipelines and jobs. Existing ETL pipelines continue to consume data from Kafka as is, albeit with data in Kafka having much longer retention period.

High-level design



Remote Log Manager (RLM) is a new component that copies the completed LogSegments and corresponding OffsetIndex to remote tier.

* An interface will be introduced using which different RLM implementations that perform integration with different types of remote storages can be made available. RLM implementation code can also be kept outside Kafka core if the community chooses so RLM component will keep tracks of topic-partition and its segments. It will delegate the copy and read of these segments to pluggable storage manager implementation.
* RLM has two modes:
* RLM Leader - In this mode, RLM that is the leader for topic-partition, checks for rolled over LogSegments and copies it along with OffsetIndex to the remote tier. RLM creates an index file, called RemoteLogSegmentIndex, per topic-partition to track remote LogSegments. Additionally, RLM leader also serves the read requests for older data from the remote tier.
* RLM Follower - In this mode, RLM keeps track of the segments and index files on remote tier and updates its RemoteLogSegmentIndex file per topic-partition. RLM follower does not serve reading old data from the remote tier.

...

Only the broker that is the Leader for topic-partitions is allowed to copy to the remote storage.

Note: Early proposal. To be finalized during implementation.

Code Block
languagescala
class RemoteLogManager extends Configurable {
     val RemoteStorageManager
     // configure
     def configure(Map<String, ?> configs)

     // Copies LogSegments yet to be copied to remote storage for the given set of TopicPartitions
     def addPartitions(topicPartitions: Set[TopicPartition]): boolean 

     //  Deletes LogSegments based on remote.log.retention.period or  remote.log.retention.bytes configuration
     def removePartitions(topicPartitions: Set[TopicPartition]): boolean

     // Marks partitions as copied
     def  markPartitionsAsCopied(topicPartitions: Set[TopicPartition])

     // Read topic partition data from remote
     def read(fetchMaxByes: Int, hardMaxBytesLimit:Boolean, readPartitionInfo: Seq[(TopicPartition, PartitionData)]): LogReadResult

     // Stops all the threads and closes the instance.
     def shutdown(): Unit   
}


Remote Storage Manager:

        RemoteStorageManager is an interface that allows to plugin different remote storage implementations to copy the log segments. The default implementation of the interface supports HDFS as remote storage. Additional remote storage support, such as S3 can be added later by providing other implementations using the configuration remote.storage.manager.class.

Note: Early proposal. To be finalized during implementation.

Code Block
languagescala

Trait RemoteStorageManager extends Configurable {
    
     // Configure
     def configure(Map<String, ?> configs)

     // Copies LogSegments provided by the RLM
     def copyLogSegments(logSegments: Set[LogSegment]): boolean 

     // Deletes remote LogSegment files provided by the RLM
     def deleteLogSegments(logSegments: Set[LogSegment]): boolean

     // read topic partition data from remote
     def read(logSegment:LogSegment, maxBytes: Int): LogReadInfo

     // stops all the threads and closes the instance.
     def shutdown(): Unit   
}


Replica Manager

If RLM is configured, ReplicaManager will call RLM to assign topic-partitions or remove topic-partitions similar to how the replicaFetcherManager works today.

...

For any fetch requests, ReplicaManager will proceed with making a call to readFromLocalLog, if this method returns OffsetOutOfRange exception it will delegate the read call to RemoteLogManager.readFromRemoteLog and returns the LogReadResult.


Follower Requests/Replication

If a local LogSegment copied into Remote Storage by a Leader Broker and updated its RemoteOffsetIndex, it's not necessary for Follower to copy this segment which is already present in Remote Storage. Instead a follower will sync RemoteOffsetIndex for a given Topic-Partition from the Leader broker.  If a Replica becomes a Leader, It can still locate and serve data from Remote storage.

Alternatives considered

Following alternatives were considered:

...