Authors Sriharsha Chintalapani, Satish Duggana, Suresh Srinivas, Ying Zheng (alphabetical order by the last names)
Status
Current State: Discussion
Discussion Thread: Discuss Thread
JIRA:
Google doc version of this wiki is located here.
Motivation
Kafka is an important part of data infrastructure and is seeing significant adoption and growth. As the Kafka cluster size grows and more data is stored in Kafka for a longer duration, several issues related to scalability, efficiency, and operations become important to address.
Kafka stores the messages in append-only log segments on local disks on Kafka brokers. The retention period for the log is based on `log.retention` that can be set system-wide or per topic. Retention gives the guarantee to consumers that even if their application failed or was down for maintenance, it can come back within the retention period to read from where it left off without losing any data.
The total storage required on a cluster is proportional to the number of topics/partitions, the rate of messages, and most importantly the retention period. A Kafka broker typically has a large number of disks with the total storage capacity of 10s of TBs. The amount of data locally stored on a Kafka broker presents many operational challenges.
Kafka as a long-term storage service
Kafka has grown in adoption to become the entry point for all data. It allows users not only to consume data in real time but also to fetch older data, subject to retention policies. Given the simplicity of the Kafka protocol and the wide adoption of the consumer API, allowing users to store and fetch data with longer retention helps make Kafka the one true source of data.
Currently, Kafka is configured with a shorter retention period in days (typically 3 days) and data older than the retention period is copied using data pipelines to a more scalable external storage for long-term use, such as HDFS. This results in data consumers having to build different versions of applications to consume the data from different systems depending on the age of the data.
Kafka cluster storage is typically scaled by adding more broker nodes to the cluster. But this also adds needless memory and CPUs to the cluster, making overall storage less cost-efficient than storing the older data in external storage. A larger cluster with more nodes also adds to the complexity of deployment and increases operational costs.
Kafka local storage and operational complexity
When a broker fails, the failed node is replaced by a new node. The new node must copy all the data that was on the failed broker from other replicas. Similarly, when a new Kafka node is added to scale the cluster storage, cluster rebalancing assigns partitions to the new node which also requires copying a lot of data. The time for recovery and rebalancing is proportional to the amount of data stored locally on a Kafka broker. In setups that have many Kafka clusters running 100s of brokers, a node failure is a common occurrence, with a lot of time spent in recovery making operations difficult and time-consuming.
Reducing the amount of data stored on each broker can reduce the recovery/rebalancing time. But it would also necessitate reducing the log retention period impacting the time available for application maintenance and failure recovery.
Kafka in cloud
On-premise Kafka deployments use hardware SKUs with multiple high-capacity disks to maximize I/O throughput and to store the data for the retention period. Equivalent SKUs with similar local storage options are either unavailable or very expensive in the cloud. SKUs with less local storage capacity are more widely available there and are better suited for use as Kafka broker nodes in the cloud.
Solution - Tiered storage for Kafka
Kafka data is mostly consumed in a streaming fashion using tail reads. Tail reads leverage the OS page cache to serve the data instead of disk reads. Older data is typically read from the disk for backfill or failure recovery purposes, and such reads are infrequent.
In the tiered storage approach, a Kafka cluster is configured with two tiers of storage: local and remote. The local tier is the same as in current Kafka and uses the local disks on the Kafka brokers to store the log segments. The new remote tier uses systems such as HDFS or S3 to store the completed log segments. Two separate retention periods are defined, one for each tier. With the remote tier enabled, the retention period for the local tier can be significantly reduced, from days to a few hours. The retention period for the remote tier can be much longer, days or even months. When a log segment is rolled on the local tier, it is copied to the remote tier along with the corresponding offset index. Latency-sensitive applications perform tail reads and are served from the local tier, leveraging the existing Kafka mechanism of efficiently using the page cache to serve the data. Backfill and other applications recovering from a failure that need data older than what is in the local tier are served from the remote tier.
This solution allows scaling storage independently of memory and CPUs in a Kafka cluster, enabling Kafka to be a long-term storage solution. It also reduces the amount of data stored locally on Kafka brokers and hence the amount of data that needs to be copied during recovery and rebalancing. Log segments that are available in the remote tier either need not be restored on the broker or can be restored lazily, and they are served from the remote tier. With this, increasing the retention period no longer requires scaling the Kafka cluster storage and adding new nodes. At the same time, the overall data retention can still be much longer, eliminating the need for separate data pipelines to copy the data from Kafka to external stores, as is done currently in many deployments.
Goals
Extend Kafka's storage beyond the local storage available on the Kafka cluster by retaining the older data in an external store, such as HDFS or S3, with minimal impact on the internals of Kafka. Kafka behavior and operational complexity must not change for existing users that do not have the tiered storage feature configured.
Non-Goals
Tiered storage does not replace ETL pipelines and jobs. Existing ETL pipelines continue to consume data from Kafka as is, albeit with data in Kafka having a much longer retention period.
It does not support compacted topics.
Public Interfaces
Compacted topics will not have remote storage support.
Configs
System-Wide | `remote.log.storage.enable`: Whether to enable remote log storage. Valid values are `true` or `false`; the default is `false`. This property provides backward compatibility. `remote.log.storage.manager.class.name`: Mandatory if `remote.log.storage.enable` is set to `true`. `remote.log.metadata.manager.class.name`: Optional. If it is not configured, Kafka uses a built-in metadata manager backed by an internal topic. |
RemoteStorageManager | `remote.log.storage.*` (these configs depend on the remote storage manager implementation) |
RemoteLogMetadataManager | `remote.log.metadata.*` (these configs depend on the remote log metadata manager implementation) |
Thread pools | `remote.log.manager.thread.pool.size`, `remote.log.manager.task.interval.ms`, `remote.log.reader.threads`, `remote.log.reader.max.pending.tasks` |
Per Topic Configuration | `remote.log.retention.minutes`, `remote.log.retention.bytes` |
Remote Storage Manager
`RemoteStorageManager` is an interface to provide the lifecycle of remote log segments and indexes. More details about how we arrived at this interface are discussed in the document. We will provide a simple implementation of RSM to give a better understanding of the APIs. HDFS and S3 implementations are planned to be hosted in external repos and will not be part of the Apache Kafka repo. This is in line with the approach taken for Kafka connectors.
    /**
     * RemoteStorageManager provides the lifecycle of remote log segments which includes copy, fetch, and delete operations.
     *
     * {@link RemoteLogMetadataManager} is responsible for storing and fetching metadata about the remote log segments in a
     * strongly consistent manner.
     *
     * Each upload or copy of a segment is given with a {@link RemoteLogSegmentId} which is universally unique even for the
     * same topic partition and offsets. Once the copy or upload is successful, {@link RemoteLogSegmentMetadata} is
     * created with RemoteLogSegmentId and other log segment information and it is stored in {@link RemoteLogMetadataManager}.
     * This allows RemoteStorageManager to store segments even in eventually consistent manner as the metadata is already
     * stored in a consistent store.
     *
     * All these APIs are still experimental.
     */
    @InterfaceStability.Unstable
    public interface RemoteStorageManager extends Configurable, Closeable {

        /**
         * Copies LogSegmentData provided for the given RemoteLogSegmentId and returns any contextual
         * information about this copy operation. This can include path to the object in the store etc.
         *
         * Invoker of this API should always send a unique id as part of {@link RemoteLogSegmentId#id()} even when it
         * retries to invoke this method for the same log segment data.
         *
         * @param remoteLogSegmentId
         * @param logSegmentData
         * @return
         * @throws RemoteStorageException
         */
        RemoteLogSegmentContext copyLogSegment(RemoteLogSegmentId remoteLogSegmentId, LogSegmentData logSegmentData)
                throws RemoteStorageException;

        /**
         * Returns the remote log segment data file/object as InputStream for the given RemoteLogSegmentMetadata starting
         * from the given startPosition. The stream will end at the smaller of endPosition and the end of the remote log
         * segment data file/object.
         *
         * @param remoteLogSegmentMetadata
         * @param startPosition
         * @param endPosition
         * @return
         * @throws RemoteStorageException
         */
        InputStream fetchLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
                                        Long startPosition, Long endPosition) throws RemoteStorageException;

        /**
         * Returns the offset index for the respective log segment of {@link RemoteLogSegmentMetadata}.
         *
         * @param remoteLogSegmentMetadata
         * @return
         * @throws RemoteStorageException
         */
        InputStream fetchOffsetIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;

        /**
         * Returns the timestamp index for the respective log segment of {@link RemoteLogSegmentMetadata}.
         *
         * @param remoteLogSegmentMetadata
         * @return
         * @throws RemoteStorageException
         */
        InputStream fetchTimestampIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;

        /**
         * Returns the transaction index for the respective log segment of {@link RemoteLogSegmentMetadata}.
         * The default implementation throws {@link UnsupportedOperationException}.
         *
         * @param remoteLogSegmentMetadata
         * @return
         * @throws RemoteStorageException
         */
        default InputStream fetchTransactionIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException {
            throw new UnsupportedOperationException();
        }

        /**
         * Returns the producer snapshot index for the respective log segment of {@link RemoteLogSegmentMetadata}.
         * The default implementation throws {@link UnsupportedOperationException}.
         *
         * @param remoteLogSegmentMetadata
         * @return
         * @throws RemoteStorageException
         */
        default InputStream fetchProducerSnapshotIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException {
            throw new UnsupportedOperationException();
        }

        /**
         * Deletes the remote log segment for the given remoteLogSegmentMetadata. Deletion is considered as successful if
         * this call returns successfully without any exceptions. It will throw {@link RemoteStorageException} if there are
         * any errors in deleting the file.
         *
         * Broker pushes an event to __delete_failed_remote_log_segments topic for failed segment deletions so that users
         * can do the cleanup later.
         *
         * @param remoteLogSegmentMetadata
         * @throws RemoteStorageException
         */
        void deleteLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;
    }

    /**
     * This represents a universally unique id associated to a topic partition's log segment. This will be regenerated for
     * every attempt of copying a specific log segment in {@link RemoteStorageManager#copyLogSegment(RemoteLogSegmentId, LogSegmentData)}.
     */
    public class RemoteLogSegmentId {
        private TopicPartition topicPartition;
        private UUID id;

        public RemoteLogSegmentId(TopicPartition topicPartition, UUID id) {
            this.topicPartition = requireNonNull(topicPartition);
            this.id = requireNonNull(id);
        }

        public TopicPartition topicPartition() {
            return topicPartition;
        }

        public UUID id() {
            return id;
        }
        ...
    }

    public class LogSegmentData {
        private final File logSegment;
        private final File offsetIndex;
        private final File timeIndex;
        private final File txnIndex;
        private final File producerIdSnapshotIndex;
        private final File leaderEpochIndex;

        public LogSegmentData(File logSegment, File offsetIndex, File timeIndex, File txnIndex,
                              File producerIdSnapshotIndex, File leaderEpochIndex) {
            this.logSegment = logSegment;
            this.offsetIndex = offsetIndex;
            this.timeIndex = timeIndex;
            this.txnIndex = txnIndex;
            this.producerIdSnapshotIndex = producerIdSnapshotIndex;
            this.leaderEpochIndex = leaderEpochIndex;
        }
        ...
    }
RemoteLogMetadataManager
`RemoteLogMetadataManager` is an interface to provide the lifecycle of metadata about remote log segments with strongly consistent semantics. There is a default implementation that uses an internal topic. Users can plug in their own implementation if they intend to use another system to store remote log segment metadata.
    /**
     * This interface provides storing and fetching remote log segment metadata with strongly consistent semantics.
     *
     * When {@link #configure(Map)} is invoked on this instance, {@link #BROKER_ID}, {@link #CLUSTER_ID} properties are
     * passed which can be used by this instance if needed. These props can be used if there is a single storage used for
     * different clusters. For ex: MySQL storage can be used as metadata store for all the clusters across the org.
     *
     * todo-tier cleanup the abstractions in this interface.
     */
    @InterfaceStability.Unstable
    public interface RemoteLogMetadataManager extends Configurable, Closeable {

        /**
         * Property name under which the broker id is passed to {@link #configure(Map)}.
         */
        String BROKER_ID = "broker.id";

        /**
         * Property name under which the cluster id is passed to {@link #configure(Map)}.
         */
        String CLUSTER_ID = "cluster.id";

        /**
         * Stores RemoteLogSegmentMetadata with the given RemoteLogSegmentId.
         *
         * @param remoteLogSegmentMetadata
         * @throws IOException
         */
        void putRemoteLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws IOException;

        /**
         * Fetches RemoteLogSegmentId for the given topic partition which contains the given offset.
         *
         * @param topicPartition
         * @param offset
         * @return
         * @throws IOException
         */
        RemoteLogSegmentId getRemoteLogSegmentId(TopicPartition topicPartition, long offset) throws IOException;

        /**
         * Fetches RemoteLogSegmentMetadata for the given RemoteLogSegmentId.
         *
         * @param remoteLogSegmentId
         * @return
         * @throws IOException
         */
        RemoteLogSegmentMetadata getRemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId) throws IOException;

        /**
         * Earliest log offset if exists for the given topic partition in the remote storage. Return {@link Optional#empty()}
         * if there are no segments in the remote storage.
         *
         * @param tp
         * @return
         * @throws IOException
         */
        Optional<Long> earliestLogOffset(TopicPartition tp) throws IOException;

        /**
         * Highest log offset if exists for the given topic partition in the remote storage.
         *
         * @param tp
         * @return
         * @throws IOException
         */
        Optional<Long> highestLogOffset(TopicPartition tp) throws IOException;

        /**
         * Deletes the log segment metadata for the given remoteLogSegmentId.
         *
         * @param remoteLogSegmentId
         * @throws IOException
         */
        void deleteRemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId) throws IOException;

        /**
         * List the remote log segment files of the given topicPartition.
         * The RemoteLogManager of a follower uses this method to find out the remote data for the given topic partition.
         *
         * @return List of remote segments, sorted by baseOffset in ascending order.
         */
        default List<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicPartition topicPartition) {
            return listRemoteLogSegments(topicPartition, 0);
        }

        /**
         * @param topicPartition
         * @param minOffset
         * @return List of remote segments, sorted by baseOffset in ascending order.
         */
        List<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicPartition topicPartition, long minOffset);

        /**
         * This method is invoked only when there are changes in leadership of the topic partitions that this broker is
         * responsible for.
         *
         * @param leaderPartitions   partitions that have become leaders on this broker.
         * @param followerPartitions partitions that have become followers on this broker.
         */
        void onPartitionLeadershipChanges(Set<TopicPartition> leaderPartitions, Set<TopicPartition> followerPartitions);

        /**
         * This method is invoked only when the given topic partitions are stopped on this broker. This can happen when a
         * partition is emigrated to other broker or a partition is deleted.
         *
         * @param partitions
         */
        void onStopPartitions(Set<TopicPartition> partitions);

        /**
         * Callback to receive once server is started so that this class can run tasks which should be run only when the
         * server is started.
         */
        void onServerStarted(final String serverEndpoint);
    }

    /**
     * Metadata about the log segment stored in remote tier storage.
     */
    public class RemoteLogSegmentMetadata implements Serializable {

        private static final long serialVersionUID = 1L;

        /**
         * Universally unique remote log segment id.
         */
        private final RemoteLogSegmentId remoteLogSegmentId;

        /**
         * Start offset of this segment.
         */
        private final long startOffset;

        /**
         * End offset of this segment.
         */
        private final long endOffset;

        /**
         * Leader epoch of the broker.
         */
        private final int leaderEpoch;

        /**
         * Maximum timestamp in the segment
         */
        private final long maxTimestamp;

        /**
         * Epoch time at which the remote log segment is copied to the remote tier storage.
         */
        private long createdTimestamp;

        /**
         * Size of the segment in bytes.
         */
        private long segmentSizeInBytes;

        /**
         * It indicates that this is marked for deletion.
         */
        private boolean markedForDeletion;

        /**
         * Any context returned by {@link RemoteStorageManager#copyLogSegment(RemoteLogSegmentId, LogSegmentData)} for
         * the given remoteLogSegmentId
         */
        private final byte[] remoteLogSegmentContext;

        /**
         * @param remoteLogSegmentId      Universally unique remote log segment id.
         * @param startOffset             Start offset of this segment.
         * @param endOffset               End offset of this segment.
         * @param maxTimestamp            maximum timestamp in this segment
         * @param leaderEpoch             Leader epoch of the broker.
         * @param createdTimestamp        Epoch time at which the remote log segment is copied to the remote tier storage.
         * @param markedForDeletion       The respective segment of remoteLogSegmentId is marked for deletion.
         * @param remoteLogSegmentContext Any context returned by {@link RemoteStorageManager#copyLogSegment(RemoteLogSegmentId, LogSegmentData)}
         * @param segmentSizeInBytes      size of this segment in bytes.
         */
        public RemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId, long startOffset, long endOffset,
                                        long maxTimestamp, int leaderEpoch, long createdTimestamp,
                                        boolean markedForDeletion, byte[] remoteLogSegmentContext, long segmentSizeInBytes) {
            this.remoteLogSegmentId = remoteLogSegmentId;
            this.startOffset = startOffset;
            this.endOffset = endOffset;
            this.leaderEpoch = leaderEpoch;
            this.maxTimestamp = maxTimestamp;
            this.createdTimestamp = createdTimestamp;
            this.markedForDeletion = markedForDeletion;
            this.remoteLogSegmentContext = remoteLogSegmentContext;
            this.segmentSizeInBytes = segmentSizeInBytes;
        }
        ...
    }
Proposed Changes
High-level design
The earlier approach consisted of pulling the remote log segment metadata from remote log storage APIs, as mentioned in the earlier RemoteStorageManager_Old section. This approach worked fine for storage systems like HDFS. One of the problems of relying on the remote storage to maintain metadata is that tiered storage needs to be strongly consistent, which affects not only the metadata itself (e.g. LIST in S3) but also the segment data (e.g. GET after a DELETE in S3). In addition to consistency and availability, the cost (and to a lesser extent the performance) of maintaining metadata in remote storage needs to be factored in. In the case of S3, frequent LIST API calls incur huge costs.
So, remote storage is separated from the remote log metadata store, and RemoteStorageManager and RemoteLogMetadataManager are introduced for them respectively. You can see the discussion details in the doc located here.
RemoteLogManager (RLM) is a new component which
- receives callback events for leadership changes and stop/delete events of topic partitions on a broker.
- delegates copy, read, and delete of topic partition segments to a pluggable storage manager (viz. RemoteStorageManager) implementation and maintains the respective remote log segment metadata through RemoteLogMetadataManager.
RLM creates tasks for each leader or follower topic partition, which are explained in detail here.
- RLM Leader Task
- It checks for rolled over LogSegments (which have the last message offset less than last stable offset of that topic partition) and copies them along with their offset/time/transaction indexes and leader epoch cache to the remote tier. It also serves the fetch requests for older data from the remote tier. Local logs are not cleaned up till those segments are copied successfully to remote even though their retention time/size is reached.
[We proposed an approach of creating a RemoteLogSegmentIndex per topic-partition to track remote LogSegments. These indexes are described in more detail here. This allows having a larger index interval for remote log segments instead of a large number of small index files. It also supports encrypted segments by encrypting individual record batches and building the respective indexes. We may want to explore this approach by enhancing RemoteStorageManager in later versions.]
- RLM Follower Task
- It keeps track of the segments and index files on the remote tier by looking into RemoteLogMetadataManager. The RLM follower can also serve reads of old data from the remote tier.
RLM maintains a bounded cache (possibly LRU) of the index files of remote log segments to avoid multiple index fetches from the remote storage. These indexes can be used in the same way as local segment indexes are used.
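One way such a bounded LRU cache could look is sketched below. It is only an illustration: the class name `RemoteIndexCache` and the choice of `LinkedHashMap` in access order are assumptions, not part of this proposal, and the sketch is not thread-safe.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical bounded LRU cache for remote index files; not part of the proposed API.
class RemoteIndexCache<K, V> extends LinkedHashMap<K, V> {
    private static final long serialVersionUID = 1L;

    private final int maxEntries;

    RemoteIndexCache(int maxEntries) {
        // accessOrder = true keeps entries ordered from least to most recently accessed.
        super(16, 0.75f, true);
        this.maxEntries = maxEntries;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        // Evict the least recently used entry once the bound is exceeded.
        return size() > maxEntries;
    }
}
```

A broker-side cache would also need synchronization (for example by wrapping the map with `Collections.synchronizedMap`) and would have to release the underlying index files on eviction; both are left out here.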
Local and Remote log offset constraints
Below are the leader topic partition's log offsets
- Lx = Local log start offset
- Ly = Last stable offset (LSO)
- Lz = Local log end offset
- Rx = Remote log start offset
- Ry = Remote log end offset

Lz >= Ly >= Lx and Ly >= Ry >= Rx
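The constraints above can be expressed as a small check. This is a sketch for illustration only; `PartitionOffsets` and its field names are hypothetical and do not correspond to an existing Kafka class.

```java
// Hypothetical holder for the five offsets of a leader topic partition.
class PartitionOffsets {
    final long localStart;   // Lx
    final long lastStable;   // Ly
    final long localEnd;     // Lz
    final long remoteStart;  // Rx
    final long remoteEnd;    // Ry

    PartitionOffsets(long localStart, long lastStable, long localEnd, long remoteStart, long remoteEnd) {
        this.localStart = localStart;
        this.lastStable = lastStable;
        this.localEnd = localEnd;
        this.remoteStart = remoteStart;
        this.remoteEnd = remoteEnd;
    }

    // True when Lz >= Ly >= Lx and Ly >= Ry >= Rx both hold.
    boolean satisfiesConstraints() {
        boolean localOrder = localEnd >= lastStable && lastStable >= localStart;
        boolean remoteOrder = lastStable >= remoteEnd && remoteEnd >= remoteStart;
        return localOrder && remoteOrder;
    }
}
```

For example, a partition with Lx = 100, Ly = 180, Lz = 200, Rx = 0 and Ry = 150 satisfies the constraints, while the same partition with Ry = 190 does not, because the remote log would then extend past the last stable offset.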
Manage Remote Log Segments
The leader may fail to ship segment data to remote storage on time. In such a situation, the follower has to keep its local segment files, even if the configured retention time is reached. The local segment files (and the corresponding index files) can only be deleted in the following 2 cases:
- the follower received the corresponding segment data info from a remote storage and updated its index files and
- the local files are already older than the configured remote retention time
Replica Manager
If RLM is configured, ReplicaManager will call RLM to assign or remove topic-partitions.
If the broker changes its state from Leader to Follower for a topic-partition while RLM is in the process of copying a segment, it will finish the copy before it relinquishes copying for that topic-partition. This might leave duplicated segments, but these will be cleaned up when the segments become eligible for deletion based on the remote retention configs.
Follower Requests/Replication
For follower fetch, the leader only returns the data that is still in the leader's local storage. LogSegments that exist only on remote storage are not replicated to followers, as they are already present in remote storage. We will discuss different cases of fetch offsets.
Followers fetch offsets and truncate their local logs if needed with the current mechanism based on leader's local-logStartOffset.
Leaders will send a new error code `OFFSET_MOVED_TO_TIERED_STORAGE` to followers if the fetch offset is not available in the local log. In this case, the follower needs to get auxiliary state of the remote log segments, which contains leader epochs and producer state. This can be done in two ways.
- introduce a new protocol (or API) to fetch this state from the leader partition.
- fetch this state from the remote storage.
The latter is preferred here because remote storage can hold this state, and it is simpler than introducing a new protocol with the leader.
This involves two steps in getting the required state of the respective log segment for the requested fetch offset.
- it should fetch the respective remote log segment metadata and
- it should fetch respective state like leader epochs from remote storage for the respective remote log segment metadata.
So, we need to add a ReplicaState for each step, which can be called `FetchingRemoteLogMetadata` and `FetchingRemoteLogAuxiliaryState`. The fetcher thread processes both of these states in every run. In the FetchingRemoteLogMetadata state, it checks whether RLMM on the follower has caught up far enough to have the remote log segment metadata for the desired fetch offset. If the metadata is available, the state moves to FetchingRemoteLogAuxiliaryState and a task is submitted to the RLM thread pool to get the auxiliary state from RemoteStorageManager. Once the auxiliary state has been received in FetchingRemoteLogAuxiliaryState, the state moves to Fetching with the fetch offset set to the remote log segment's endOffset+1.
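The state transitions described above can be sketched as a small state machine. This is a simplified illustration under stated assumptions: the enum constants, `FollowerFetchState`, and its method names are hypothetical, and the asynchronous task submission to the RLM thread pool is reduced to a boolean flag.

```java
import java.util.OptionalLong;

// Hypothetical names; the proposal only names the two new states.
enum ReplicaState {
    FETCHING,
    FETCHING_REMOTE_LOG_METADATA,
    FETCHING_REMOTE_LOG_AUXILIARY_STATE
}

class FollowerFetchState {
    ReplicaState state = ReplicaState.FETCHING;
    long fetchOffset;

    FollowerFetchState(long fetchOffset) {
        this.fetchOffset = fetchOffset;
    }

    // The leader answered a fetch with OFFSET_MOVED_TO_TIERED_STORAGE.
    void onOffsetMovedToTieredStorage() {
        if (state == ReplicaState.FETCHING) {
            state = ReplicaState.FETCHING_REMOTE_LOG_METADATA;
        }
    }

    // One run of the fetcher thread. remoteSegmentEndOffset is present once the
    // follower's RLMM has the segment metadata covering the fetch offset.
    void onFetcherRun(OptionalLong remoteSegmentEndOffset, boolean auxiliaryStateReceived) {
        switch (state) {
            case FETCHING_REMOTE_LOG_METADATA:
                if (remoteSegmentEndOffset.isPresent()) {
                    // In the real flow a task would be submitted to the RLM thread pool here.
                    state = ReplicaState.FETCHING_REMOTE_LOG_AUXILIARY_STATE;
                }
                break;
            case FETCHING_REMOTE_LOG_AUXILIARY_STATE:
                if (auxiliaryStateReceived && remoteSegmentEndOffset.isPresent()) {
                    // Resume normal fetching right after the remote segment.
                    fetchOffset = remoteSegmentEndOffset.getAsLong() + 1;
                    state = ReplicaState.FETCHING;
                }
                break;
            default:
                break;
        }
    }
}
```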
Transactional support
RemoteLogManager copies the transaction index and producer-id-snapshot along with each log segment that lies before the last-stable-offset. These are used by the followers to return aborted transactions in fetch requests with isolation level READ_COMMITTED.
Consumer Fetch Requests
For any fetch request, ReplicaManager will first call readFromLocalLog. If this method throws an OffsetOutOfRange exception, ReplicaManager will delegate the read call to RemoteLogManager. More details are explained in the RLM/RSM tasks section.
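The local-first read with a remote fallback can be sketched as follows. This is a synchronous simplification for illustration only: `TieredReader` and `LocalOffsetOutOfRange` are hypothetical stand-ins, the two readers are plain functions rather than ReplicaManager and RemoteLogManager, and the purgatory-based asynchronous handling is omitted.

```java
import java.util.function.LongFunction;

// Hypothetical stand-in for the out-of-range error raised by the local read path.
class LocalOffsetOutOfRange extends RuntimeException {
    private static final long serialVersionUID = 1L;

    LocalOffsetOutOfRange(long offset) {
        super("offset " + offset + " is not in the local log");
    }
}

class TieredReader {
    private final LongFunction<String> localReader;   // stands in for readFromLocalLog
    private final LongFunction<String> remoteReader;  // stands in for the RemoteLogManager read

    TieredReader(LongFunction<String> localReader, LongFunction<String> remoteReader) {
        this.localReader = localReader;
        this.remoteReader = remoteReader;
    }

    String read(long fetchOffset) {
        try {
            // Always try the local log first.
            return localReader.apply(fetchOffset);
        } catch (LocalOffsetOutOfRange e) {
            // Delegate to the remote tier only when the offset is no longer local.
            return remoteReader.apply(fetchOffset);
        }
    }
}
```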
Fetch from follower
There are no changes required for this to work in the case of tiered storage.
Other APIs
DeleteRecords
There is no change in the semantics of this API. It deletes records until the given offset if possible. This is equivalent to updating logStartOffset of the partition log with the given offset if it is greater than the current log-start-offset and it is less than or equal to high-watermark. If needed, it will clean remote logs asynchronously after updating the log-start-offset of the log.
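The offset update rule above can be sketched as below. The class `LogStartOffsetTracker` is hypothetical, and returning `false` for an out-of-range offset is a simplification made for this sketch; the asynchronous remote cleanup is only indicated by a comment.

```java
// Hypothetical sketch of the DeleteRecords offset rule; not an existing Kafka class.
class LogStartOffsetTracker {
    private long logStartOffset;
    private final long highWatermark;

    LogStartOffsetTracker(long logStartOffset, long highWatermark) {
        this.logStartOffset = logStartOffset;
        this.highWatermark = highWatermark;
    }

    long logStartOffset() {
        return logStartOffset;
    }

    // Advances the log start offset when the requested offset is greater than the current
    // log start offset and not beyond the high watermark. Returns true if it advanced.
    boolean deleteRecordsUpTo(long offset) {
        if (offset > logStartOffset && offset <= highWatermark) {
            logStartOffset = offset;
            // A real implementation would now schedule asynchronous cleanup of remote segments.
            return true;
        }
        return false;
    }
}
```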
ListOffsets
ListOffsets API gives the offset(s) for the given timestamp either by looking into the local log or remote log time indexes.
If the target timestamp is
- ListOffsetRequest.EARLIEST_TIMESTAMP (value -2), it returns the logStartOffset of the log.
- ListOffsetRequest.LATEST_TIMESTAMP (value -1), it returns the last-stable-offset or log-end-offset, based on the isolation level in the request.
This API is enhanced to support a new target timestamp value of -3, which is called NEXT_LOCAL_TIMESTAMP. No new fields will be added to the request and response schemas, but there will be a version bump to indicate the change. This request asks for the offset from which followers should start fetching to replicate the local logs. All the records earlier than this offset can be considered as copied to the remote storage. This is used by follower replicas to avoid fetching records that are already copied to remote tier storage.
When a follower replica needs to fetch the earliest messages that are to be replicated then it sends a request with target timestamp as NEXT_LOCAL_TIMESTAMP.
For timestamps >= 0, it returns the offset of the first message whose timestamp is greater than or equal to the timestamp given in the request. That means it checks the remote log time indexes first, and then the local log time indexes.
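The lookup order described for ListOffsets can be sketched as follows. Everything here is a simplified assumption made for illustration: the class `ListOffsetsSketch`, the representation of a time index as a list of `{timestamp, offset}` pairs ordered by offset, and passing the answers for the special timestamps in as parameters.

```java
import java.util.List;
import java.util.OptionalLong;

// Hypothetical sketch of ListOffsets resolution across remote and local time indexes.
class ListOffsetsSketch {
    static final long LATEST_TIMESTAMP = -1L;
    static final long EARLIEST_TIMESTAMP = -2L;
    static final long NEXT_LOCAL_TIMESTAMP = -3L;

    // Each entry is {timestamp, offset}; entries are ordered by offset.
    static OptionalLong lookup(List<long[]> timeIndex, long targetTimestamp) {
        for (long[] entry : timeIndex) {
            if (entry[0] >= targetTimestamp) {
                return OptionalLong.of(entry[1]);
            }
        }
        return OptionalLong.empty();
    }

    static OptionalLong resolve(long targetTimestamp, long logStartOffset, long nextLocalOffset,
                                long latestOffset, List<long[]> remoteIndex, List<long[]> localIndex) {
        if (targetTimestamp == EARLIEST_TIMESTAMP) {
            return OptionalLong.of(logStartOffset);
        }
        if (targetTimestamp == LATEST_TIMESTAMP) {
            // The caller picks last-stable-offset or log-end-offset by isolation level.
            return OptionalLong.of(latestOffset);
        }
        if (targetTimestamp == NEXT_LOCAL_TIMESTAMP) {
            return OptionalLong.of(nextLocalOffset);
        }
        // Remote time indexes hold the older data, so they are consulted first.
        OptionalLong remote = lookup(remoteIndex, targetTimestamp);
        return remote.isPresent() ? remote : lookup(localIndex, targetTimestamp);
    }
}
```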
LeaderAndIsr
This is received by RLM to register for new leaders so that the data can be copied to the remote storage. RLMM will also register the respective metadata partitions for the leader/follower partitions if they are not yet subscribed.
StopReplica
RLM receives a callback and unassigns the partition from the leader/follower task. If the delete option is enabled, the remote log segment metadata is updated in RLMM by enabling the delete marker. RLMM will eventually delete these segments by using RemoteStorageManager. As the segments are deleted asynchronously, creating a topic with the same name later may cause issues because of the existing metadata about the earlier generation of the topic. Adding a UUID to represent a topic along with its name was discussed earlier in the community as part of KIP-516. That enhancement will be useful to address the issue mentioned here.
OffsetForLeaderEpoch
Look into leader epoch checkpoint cache. This is stored in tiered storage and it may be fetched by followers from tiered storage as part of the fetch protocol.
LogStartOffset
LogStartOffset of a topic can be either in local or in remote storage. This is already maintained in `Log` class while loading the logs and it can also be fetched from RemoteLogMetadataManager.
JBOD related changes
Currently, JBOD is supported by altering log dirs in two ways.
Altering to a different dir on the local broker
This is not supported in this KIP but we may plan this in future releases.
Altering to a dir on a remote broker
This is equivalent to reassigning partitions to a different broker, which is already supported in this KIP as part of how followers behave with respect to remote tiered storage.
There are no changes with other protocol APIs because of tiered storage.
RLM/RSM tasks and thread pools
Remote storage (e.g. HDFS/S3/GCP) is likely to have higher I/O latency and lower availability than local storage.
When remote storage becomes temporarily unavailable (for up to several hours) or has high latency (up to minutes), Kafka should still be able to operate normally. All the Kafka operations (produce, consume local data, create/expand topics, etc.) that do not rely on remote storage should not be impacted. Consumers that try to consume the remote data should get reasonable errors when remote storage is unavailable or the remote storage requests time out.
To achieve this, we have to handle remote storage operations in dedicated thread pools, instead of Kafka I/O threads and fetcher threads.
1. Remote Log Manager (RLM) Thread Pool
RLM maintains a list of the topic-partitions it manages. The list is updated in Kafka I/O threads when topic-partitions are added to or removed from RLM. Each topic-partition in the list is assigned a scheduled processing time. The RLM thread pool processes the topic-partitions whose "scheduled processing time" is less than or equal to the current time.
When a new topic-partition is assigned to the broker, the topic-partition is added to the list, with scheduled processing time = 0, which means the topic-partition has to be processed immediately, to retrieve information from remote storage.
After a topic-partition is successfully processed by the thread pool, its scheduled processing time is set to ( now() + rlm_process_interval_ms ). rlm_process_interval_ms can be configured in the broker config file.
If processing of a topic-partition fails due to a remote storage error, its scheduled processing time is set to ( now() + rlm_retry_interval_ms ). rlm_retry_interval_ms can be configured in the broker config file.
When a topic-partition is unassigned from the broker, if it is not currently being processed by the thread pool, it is directly removed from the list; otherwise, it is marked as "deleted" and will be removed after the current processing is done.
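The scheduling rules above can be sketched as a small bookkeeping class. This is a single-threaded illustration under stated assumptions: `RlmSchedule` and its methods are hypothetical, topic-partitions are represented as plain strings, and the "deleted" marker for partitions that are unassigned while being processed is left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of per-partition scheduled processing times.
class RlmSchedule {
    private final Map<String, Long> nextRunMs = new HashMap<>();
    private final long processIntervalMs;  // plays the role of rlm_process_interval_ms
    private final long retryIntervalMs;    // plays the role of rlm_retry_interval_ms

    RlmSchedule(long processIntervalMs, long retryIntervalMs) {
        this.processIntervalMs = processIntervalMs;
        this.retryIntervalMs = retryIntervalMs;
    }

    // A newly assigned partition is scheduled at time 0, so it is processed immediately.
    void assign(String topicPartition) {
        nextRunMs.put(topicPartition, 0L);
    }

    void unassign(String topicPartition) {
        nextRunMs.remove(topicPartition);
    }

    // Partitions whose scheduled processing time is less than or equal to the current time.
    List<String> due(long nowMs) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Long> entry : nextRunMs.entrySet()) {
            if (entry.getValue() <= nowMs) {
                result.add(entry.getKey());
            }
        }
        return result;
    }

    // Reschedules after a run: the normal interval on success, the retry interval on failure.
    void onProcessed(String topicPartition, long nowMs, boolean success) {
        if (nextRunMs.containsKey(topicPartition)) {
            nextRunMs.put(topicPartition, nowMs + (success ? processIntervalMs : retryIntervalMs));
        }
    }
}
```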
Each thread in the thread pool processes one topic-partition at a time in the following steps:
Copy log segments to remote storage (leader)
Copy the log segment files that satisfy all of the following:
- the segment is inactive,
- its offset range is not completely covered by the segments already on the remote storage, and
- its last offset is less than the last-stable-offset of the partition.
If multiple log segment files are ready, they are copied to remote storage one by one, from the earliest to the latest. For each segment, RLM generates a universally unique RemoteLogSegmentId, calls RLMM.putRemoteLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata), and then invokes copyLogSegment(RemoteLogSegmentId remoteLogSegmentId, LogSegmentData logSegmentData) on RSM. If the copy is successful, it calls RLMM.putRemoteLogSegmentData again with the updated RemoteLogSegmentMetadata instance; otherwise it removes the entry. Any dangling entries will be removed while removing expired log segments based on remote retention.
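The copy sequence for a single segment can be sketched as follows. This is an in-memory illustration only: `SegmentCopier` is hypothetical, the map stands in for RLMM, the `copyFn` function stands in for copyLogSegment on RSM, and failures are modeled as unchecked exceptions rather than RemoteStorageException.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.function.BiFunction;

// Hypothetical sketch of the put-metadata, copy, update-or-remove sequence.
class SegmentCopier {
    // Stand-in for RLMM: segment id -> context bytes (empty until the copy succeeds).
    final Map<UUID, byte[]> metadata = new HashMap<>();

    Optional<UUID> copy(String segmentName, BiFunction<UUID, String, byte[]> copyFn) {
        // A fresh, universally unique id is generated for every copy attempt.
        UUID id = UUID.randomUUID();
        // Record the initial metadata entry before the copy starts.
        metadata.put(id, new byte[0]);
        try {
            byte[] context = copyFn.apply(id, segmentName);
            // Copy succeeded: store the updated metadata with the returned context.
            metadata.put(id, context);
            return Optional.of(id);
        } catch (RuntimeException e) {
            // Copy failed: remove the entry so that it does not dangle.
            metadata.remove(id);
            return Optional.empty();
        }
    }
}
```

Because a new id is generated per attempt, a retry after a failure never collides with objects that a previous attempt may have partially written to the remote store.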
Handle expired remote segments (leader and follower)
RLM leader computes the log segments to be deleted based on the remote retention config. It updates the earliest offset for the given topic partition in RLMM. It gets all the remote log segment ids and removes them from remote storage using RemoteStorageManager. It also removes the respective metadata using RemoteLogMetadataManager. If there are any failures in removing remote log segments, those are stored in a specific topic (by default, __remote_segments_to_be_deleted), and the user can consume the events (which contain the remote-log-segment-id) from that topic and clean them up from remote storage. This can be improved upon in later versions.
RLM follower fetches the earliest offset by calling RLMM.earliestLogOffset(tp: TopicPartition). Both leader and follower clean up the existing indexes up to that offset and update the start offset with the received value.
2. Remote Storage Fetcher Thread Pool
When handling a consumer fetch request, if the required offset is in remote storage, the request is added to "RemoteFetchPurgatory" to handle timeout. RemoteFetchPurgatory is an instance of kafka.server.DelayedOperationPurgatory, and is similar to the existing produce/fetch purgatories. At the same time, the request is put into the task queue of the "remote storage fetcher thread pool".
Each thread in the thread pool processes one remote fetch request at a time. The remote storage fetch thread will
- find the corresponding RemoteLogSegmentId from RLMM, and startPosition and endPosition from the offset index.
- try to build a Records instance from the data fetched via RSM.fetchLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata, Long startPosition, Optional<Long> endPosition)
- on success, RemoteFetchPurgatory will be notified to return the data to the client
- if the remote segment file is already deleted, RemoteFetchPurgatory will be notified to return an error to the client.
- if the remote storage operation failed (remote storage is temporarily unavailable), the operation will be retried with exponential back-off until the original consumer fetch request times out.
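The retry behaviour in the last step can be sketched as follows. This is an illustration under stated assumptions, not the proposal's implementation: `RemoteFetchRetry`, `RemoteStorageUnavailableException`, and the back-off parameters are hypothetical names, and the fetch itself is abstracted as a `Supplier`. Any other exception (for example, one signalling a deleted segment) propagates immediately without retry.

```java
import java.util.function.Supplier;

/** Hypothetical sketch of retrying a remote fetch with exponential back-off. */
class RemoteFetchRetry {
    /** Hypothetical marker for a temporary remote storage failure. */
    static final class RemoteStorageUnavailableException extends RuntimeException {
        RemoteStorageUnavailableException(String message) {
            super(message);
        }
    }

    /**
     * Calls fetch until it succeeds or deadlineMs (wall-clock time) passes.
     * The wait doubles after each failure, capped at maxBackoffMs.
     */
    static <T> T fetchWithBackoff(Supplier<T> fetch,
                                  long initialBackoffMs,
                                  long maxBackoffMs,
                                  long deadlineMs) {
        long backoffMs = initialBackoffMs;
        while (true) {
            try {
                return fetch.get();
            } catch (RemoteStorageUnavailableException e) {
                long remainingMs = deadlineMs - System.currentTimeMillis();
                if (remainingMs <= 0) {
                    throw e; // the original fetch request has timed out
                }
                try {
                    // Never sleep past the deadline.
                    Thread.sleep(Math.min(backoffMs, remainingMs));
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw e;
                }
                backoffMs = Math.min(backoffMs * 2, maxBackoffMs);
            }
        }
    }
}
```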
RemoteLogMetadataManager implemented with an internal topic
Metadata of remote log segments is stored in an internal topic called `__remote_log_metadata`. This topic can be created with a default partition count of 50. Users can configure the topic name, partition count, replication factor, etc.
In this design, RemoteLogMetadataManager(RLMM) is responsible for storing and fetching remote log metadata. It provides
- Storing remote log metadata for a partition based on offsets
- Fetching remote log segment metadata for an offset
- Registering a topic partition to build a cache for remote log metadata by reading from the remote log metadata topic
RemoteLogMetadataManager(RLMM) mainly has the below components
- Cache
- Producer
- Consumer
Remote log metadata topic partition for a given user topic-partition is:
user-topic-partition.toString().hashCode() % no_of_remote_log_metadata_topic_partitions
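A small sketch of this mapping is shown below. One detail worth noting is that in Java `hashCode()` may be negative, and the `%` operator then yields a negative remainder, which is not a valid partition number. The sketch therefore uses `Math.floorMod`. That choice, the class name `MetadataPartitioner`, and the string form of the topic-partition are assumptions made for illustration, not something the proposal specifies.

```java
/** Hypothetical sketch of mapping a user topic-partition to a metadata topic partition. */
class MetadataPartitioner {
    /**
     * Maps the string form of a user topic-partition to a partition of the
     * remote log metadata topic. Math.floorMod keeps the result in
     * [0, numMetadataPartitions) even when hashCode() is negative.
     */
    static int metadataPartition(String userTopicPartition, int numMetadataPartitions) {
        return Math.floorMod(userTopicPartition.hashCode(), numMetadataPartitions);
    }
}
```

For example, `MetadataPartitioner.metadataPartition("orders-3", 50)` always returns the same value between 0 and 49 for that topic-partition, so all of its metadata events land in one metadata partition.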
RLMM registers the topic partitions for which the broker is either a leader or a follower.
For leader replicas, RemoteLogManager(RLM) copies the log segment and indexes to the remote storage with the given RemoteLogSegmentId (RemoteStorageManager#copyLogSegment API). After this, RLM calls RLMM to store the remote log metadata. RLMM stores this metadata in the remote log metadata topic and updates the cache.
For follower replicas, RLMM maintains the metadata cache by subscribing to the respective remote log metadata topic partitions. Whenever a topic partition is reassigned to a new broker and RLMM on that broker is not yet subscribed to the respective remote log metadata topic partition, it subscribes to that partition and adds all the entries to the cache. So, in the worst case, RLMM on a broker may be consuming from most of the remote log metadata topic partitions. This requires the cache to be backed by disk storage such as RocksDB to avoid a high memory footprint on the broker. It also allows us to commit offsets of the partitions that have already been read. Committed offsets can be stored in a local file to avoid reading the messages again when a broker is restarted.
[We may add more details later about how the resultant state for each topic partition is computed ]
Truncation of remote segments under unclean leader election
When unclean leader election is enabled for a partition, the log segments to be truncated may already be in tiered storage. Segments offloaded to tiered storage (which were eligible because their end offset < LSO) may need to be truncated when leadership of a topic-partition moves to a previously lagging follower. The following diagram illustrates a basic scenario in which a remote segment would require truncation to exhibit the same replica lineage as local segments.
todo: We will update this diagram with other scenarios in handling log truncation.
Local log segments are truncated in such a situation. We may not want to fetch and update the remote segment, as that may add a lot of complexity in handling the followers that have already started reading that segment. One way to address this is to update the respective remote log segment metadata with truncation offsets.
Performance Test Results
We have tested the performance of the initial implementation of this proposal.
The cluster configuration:
- 5 brokers
- 20 CPU cores, 256GB RAM (each broker)
- 2TB * 22 hard disks in RAID0 (each broker)
- Hardware RAID card with NV-memory write cache
- 20Gbps network
- snappy compression
- 6300 topic-partitions with 3 replicas
- remote storage uses HDFS
Each test case is tested under 2 types of workload (acks=all and acks=1)
Clients | Workload-1 (at-least-once, acks=all) | Workload-2 (acks=1) |
---|---|---|
Producers | 10 producers; 30MB / sec / broker (leader); ~62K messages / sec / broker (leader) | 10 producers; 55MB / sec / broker (leader); ~120K messages / sec / broker (leader) |
In-sync Consumers | 10 consumers; 120MB / sec / broker; ~250K messages / sec / broker | 10 consumers; 220MB / sec / broker; ~480K messages / sec / broker |
Test case 1 (Normal case):
Normal traffic as described above.
Workload | Metric | with tiered storage | without tiered storage |
---|---|---|---|
Workload-1 (acks=all, low traffic) | Avg P99 produce latency | 25ms | 21ms |
Workload-1 (acks=all, low traffic) | Avg P95 produce latency | 14ms | 13ms |
Workload-2 (acks=1, high traffic) | Avg P99 produce latency | 9ms | 9ms |
Workload-2 (acks=1, high traffic) | Avg P95 produce latency | 4ms | 4ms |
We can see a small overhead when tiered storage is turned on. This is expected, as the brokers have to ship segments to remote storage and sync the remote segment metadata between brokers. With at-least-once (acks=all) produce, the produce latency increases slightly when tiered storage is turned on. With acks=1 produce, the produce latency is almost unchanged when tiered storage is turned on.
Test case 2 (out-of-sync consumers catching up):
In addition to the normal traffic, 9 out-of-sync consumers consume 180MB/s per broker (or 900MB/s in total) of old data.
With tiered storage, the old data is read from HDFS. Without tiered storage, the old data is read from local disk.
Workload | Metric | with tiered storage | without tiered storage |
---|---|---|---|
Workload-1 (acks=all, low traffic) | Avg P99 produce latency | 42ms | 60ms |
Workload-1 (acks=all, low traffic) | Avg P95 produce latency | 18ms | 30ms |
Workload-2 (acks=1, high traffic) | Avg P99 produce latency | 10ms | 10ms |
Workload-2 (acks=1, high traffic) | Avg P95 produce latency | 5ms | 4ms |
Consuming old data has a significant performance impact on acks=all producers. Without tiered storage, the P99 produce latency is almost tripled. With tiered storage, the performance impact is relatively lower, because remote storage reads do not compete with produce requests for local hard disk bandwidth.
Consuming old data has little impact on acks=1 producers.
Test case 3 (rebuild broker):
Under the normal traffic, stop a broker, remove all the local data, and rebuild it without replication throttling. This case simulates replacing a broken broker server.
Workload | Metric | with tiered storage | without tiered storage |
---|---|---|---|
Workload-1 (acks=all, 12TB data per broker) | Max avg P99 produce latency | 56ms | 490ms |
Workload-1 (acks=all, 12TB data per broker) | Max avg P95 produce latency | 23ms | 290ms |
Workload-1 (acks=all, 12TB data per broker) | Duration | 2min | 230min |
Workload-2 (acks=1, 34TB data per broker) | Max avg P99 produce latency | 12ms | 10ms |
Workload-2 (acks=1, 34TB data per broker) | Max avg P95 produce latency | 6ms | 5ms |
Workload-2 (acks=1, 34TB data per broker) | Duration | 4min | 520min |
With tiered storage, the rebuilding broker only needs to fetch the latest data that has not been shipped to remote storage. Without tiered storage, the rebuilt broker has to fetch all the data that has not expired from the other brokers. With the same log retention time, tiered storage reduced the rebuilding time by more than 100 times.
Without tiered storage, the rebuilding broker has to read a large amount of data from the local hard disks of the leaders. This competes with the normal traffic for page cache and local disk bandwidth, and dramatically increases the acks=all produce latency.
Alternatives considered
The following alternatives were considered:
- Replace all local storage with remote storage - Instead of using local storage on Kafka brokers, only remote storage is used for storing log segments and offset index files. While this has the benefit of reducing local storage, it has the problem of not leveraging the local disk for efficient reads of the latest data, as is done in Kafka today.
- Implement Kafka API on another store - This is an approach taken by some vendors, where the Kafka API is implemented on a different distributed, scalable storage (for example, HDFS). Such an option does not leverage Kafka other than API compliance and requires the much riskier option of replacing the entire Kafka cluster with another system.
- Client directly reads remote log segments from the remote storage - The log segments on the remote storage can be directly read by the client instead of serving it from Kafka broker. This reduces Kafka broker changes and has benefits of removing an extra hop. However, this bypasses Kafka security completely, increases Kafka client library complexity and footprint and hence is not considered.
Remote Log Indexes
For each topic partition that has RLM configured, the RLM leader copies to remote storage the log segments whose last message offset is less than the last stable offset of that topic partition. The active segment file (the last segment file of each partition, to which new records are appended) is never shipped to remote storage.
After a segment file is copied to remote storage, RLM will append a set of index entries to 3 local index files: remoteLogIndex, remoteOffsetIndex, remoteTimeIndex. These index files are rotated by RLM at a configurable time interval (or a configurable size).
(active segment)
{log.dirs}/{topic-partition}/0000002400013.index
{log.dirs}/{topic-partition}/0000002400013.timeindex
{log.dirs}/{topic-partition}/0000002400013.log
(inactive segments)
{log.dirs}/{topic-partition}/0000002000238.index
{log.dirs}/{topic-partition}/0000002000238.timeindex
{log.dirs}/{topic-partition}/0000002000238.log
{log.dirs}/{topic-partition}/0000001600100.index
{log.dirs}/{topic-partition}/0000001600100.timeindex
{log.dirs}/{topic-partition}/0000001600100.log
(remote segments)
{log.dirs}/{topic-partition}/0000001000121.remoteOffsetIndex
{log.dirs}/{topic-partition}/0000001000121.remoteTimeIndex
{log.dirs}/{topic-partition}/0000001000121.remoteLogIndex
(remote segments)
{log.dirs}/{topic-partition}/0000000512002.remoteOffsetIndex
{log.dirs}/{topic-partition}/0000000512002.remoteTimeIndex
{log.dirs}/{topic-partition}/0000000512002.remoteLogIndex
Each index entry of the remoteLogIndex file contains the information of a sequence of records in the remote log segment file. The format of a remoteLogIndex entry:
magic: int16 (current magic value is 0)
length: int16 (length of this entry)
crc: int32 (checksum from firstOffset to the end of this entry)
firstOffset: int64 (the Kafka offset of the 1st record)
lastOffset: int64 (the Kafka offset of the last record)
firstTimestamp: int64
lastTimestamp: int64
dataLength: int32 (length of the remote data)
rdiLength: int16
rdi: byte[] (Remote data identifier)
RDI (Remote data identifier) is the "pointer" or "URI" of the remote data. The format of RDI depends on the implementation. For example, RDI can be an HDFS file path and offset, or an S3 key and offset. When reading the remote records, RLM will use RDI to retrieve the remote data.
Depending on the implementation, RLM may append 1 or more entries to the remoteLogIndex file for each remote segment file. More entries provide finer-grained indexing of the remote data at the cost of local disk space.
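The remoteLogIndex entry layout described above can be illustrated with a small encoder. This is a sketch under assumptions that the text does not settle: the `length` field is taken to count the bytes that follow it (crc through rdi), big-endian byte order is used, and the checksum is computed with `java.util.zip.CRC32`. The class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

/** Hypothetical sketch of encoding one remoteLogIndex entry. */
class RemoteLogIndexEntryCodec {
    static final short MAGIC = 0;
    // crc (4) + four int64 fields (32) + dataLength (4) + rdiLength (2)
    static final int FIXED_BODY_BYTES = 4 + 32 + 4 + 2;
    // magic (2) + length (2) + crc (4): the checksummed region starts here
    static final int CHECKSUM_START = 8;

    static byte[] encode(long firstOffset, long lastOffset,
                         long firstTimestamp, long lastTimestamp,
                         int dataLength, byte[] rdi) {
        int length = FIXED_BODY_BYTES + rdi.length; // assumed meaning of "length"
        if (length > Short.MAX_VALUE) {
            throw new IllegalArgumentException("rdi too long for an int16 length");
        }
        ByteBuffer buf = ByteBuffer.allocate(2 + 2 + length);
        buf.putShort(MAGIC);
        buf.putShort((short) length);
        buf.putInt(0); // crc placeholder, filled in below
        buf.putLong(firstOffset);
        buf.putLong(lastOffset);
        buf.putLong(firstTimestamp);
        buf.putLong(lastTimestamp);
        buf.putInt(dataLength);
        buf.putShort((short) rdi.length);
        buf.put(rdi);
        buf.putInt(4, checksum(buf.array()));
        return buf.array();
    }

    /** Recomputes the checksum and compares it with the stored crc field. */
    static boolean crcMatches(byte[] entry) {
        return ByteBuffer.wrap(entry).getInt(4) == checksum(entry);
    }

    // Checksum over firstOffset through the end of the entry.
    private static int checksum(byte[] entry) {
        CRC32 crc = new CRC32();
        crc.update(entry, CHECKSUM_START, entry.length - CHECKSUM_START);
        return (int) crc.getValue();
    }
}
```

With an RDI such as the UTF-8 bytes of an HDFS path plus offset, each entry in this sketch occupies 46 bytes plus the RDI length, which gives a rough sense of the local disk cost of finer-grained indexing.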
The RemoteLogIndex entries are shipped to remote storage along with the segment data. The followers will retrieve those index entries from remote storage to build their own indices.
The remoteOffsetIndex file and the remoteTimeIndex file are similar to the existing .index file (offset index) and .timeindex file (timestamp index). The only difference is that they point to the index in the corresponding remoteLogIndex file instead of a log segment file.
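As with the existing offset index, a lookup finds the last index entry whose offset is less than or equal to the requested offset. A minimal sketch of such a lookup is shown below, assuming the remoteOffsetIndex has been loaded into two parallel in-memory arrays. The class name, the array representation, and the -1 "not found" convention are hypothetical.

```java
/** Hypothetical sketch of a lookup in an in-memory view of remoteOffsetIndex. */
class RemoteOffsetIndexLookup {
    /**
     * offsets is sorted ascending; positions[i] is the position in the
     * remoteLogIndex file of the entry that starts at offsets[i].
     * Returns the position for the largest offset <= target, or -1 if
     * the target is below the first indexed offset.
     */
    static int floorPosition(long[] offsets, int[] positions, long target) {
        int lo = 0;
        int hi = offsets.length - 1;
        int found = -1;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (offsets[mid] <= target) {
                found = mid;  // candidate; keep looking to the right
                lo = mid + 1;
            } else {
                hi = mid - 1;
            }
        }
        return found < 0 ? -1 : positions[found];
    }
}
```

The returned position identifies the remoteLogIndex entry to read, and that entry's RDI is then used to locate the data in remote storage.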