Authors: Mehari Beyene, Divij Vaidya
Collaborators: Karthik Rajagopalan, Nagarjuna Koduru
Current state: Under Discussion
Discussion thread: here
JIRA: KAFKA-7739
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
This proposal addresses a gap in the functionality introduced by KIP-405, which allowed customers to enable Tiered Storage but provided no way to disable it. To improve flexibility and control over data storage in Kafka clusters, we propose a Tiered Storage disablement feature that lets customers selectively disable Tiered Storage on a per-topic basis. There are several reasons why customers would be interested in this feature:
The following aspects are considered as non-goals for the tiered storage disablement feature:
remote.log.storage.system.enable.
When disabling tiered storage on a topic, users will have the option of retaining or deleting the remote log at the time of disablement. This is represented by a remote log disablement policy. It will be an optional policy that users can specify. By default, we will retain the remote log.
The tiered epoch signifies the version of a tiered storage topic. This epoch establishes a boundary for the transition from the enabling to disabling state, preventing concurrent and out-of-order modifications. Its aim is to guarantee that all activities within the disablement process are fully completed before permitting the re-enablement of tiering.
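The fencing role of the tiered epoch can be illustrated with a minimal sketch. The class name `TieredEpochGuard`, the method names, and the rule that the epoch is bumped on every accepted change are assumptions made for illustration; the KIP text does not specify them.

```java
// Hypothetical sketch: the names and the bump-on-accept rule are assumptions,
// not something defined by this KIP.
final class TieredEpochGuard {
    private int tieredEpoch = 0;

    // Accepts a state change only if the caller observed the latest epoch,
    // then bumps the epoch so that any concurrent or delayed request is rejected.
    synchronized boolean tryApply(int observedEpoch) {
        if (observedEpoch != tieredEpoch) {
            return false; // stale or out-of-order request is fenced off
        }
        tieredEpoch++;
        return true;
    }

    synchronized int currentEpoch() {
        return tieredEpoch;
    }
}
```

In this sketch, two requests that both observed epoch 0 cannot both succeed: the first one bumps the epoch to 1 and the second one is rejected.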
When users disable tiered storage on a topic, the tiered storage state on a topic transitions through the following stages: ENABLED → DISABLING → DISABLED.
ENABLED
The ENABLED state represents when tiered storage is enabled on a topic or when it has been re-enabled after disablement. In this state, the RemoteLogManager (RLM) has scheduled a task for the topic-partitions with the RemoteLogManager thread pool and RemoteStorageFetcher thread pool. The possible state transition from the ENABLED state is to the DISABLING state.
DISABLING
When users initiate an alter config API call to disable tiered storage on a topic that is currently in the ENABLED state, the topic enters the DISABLING state. In this state, various tasks are performed asynchronously to complete the disablement process. These tasks include:
- With the remote.log.disable.policy=delete option, draining fetch requests in the RemoteFetchPurgatory and blocking new fetch requests from being added to the RemoteStorageFetcher thread pool.
- With the remote.log.disable.policy=retain option, canceling the tasks for the topic-partitions in the RemoteLogManager thread pool only to stop archiving logs; the task for expiring remote segments continues to run, and fetch requests from remote storage continue to be served after disablement.
While the topic is in the DISABLING state, new alter config API calls to disable tiered storage will be rejected. The possible state transition from the DISABLING state is to the DISABLED state.
DISABLED
The DISABLED state represents when tiered storage is fully disabled on a topic. In this state, the RLM does not have any scheduled tasks for the topic-partitions in the RemoteLogManager thread pool for copying segments to remote storage. If the topic was disabled with the remote.log.disable.policy=delete option, all remote data is marked for deletion and is inaccessible for reads. If the topic was disabled with the remote.log.disable.policy=retain option, there will be tasks scheduled for the topic-partitions only for expiration of remote logs but not for copying logs to remote storage. The RemoteStorageFetcher thread pool will also continue to accept remote fetch requests. The possible state transition from the DISABLED state is to the ENABLED state.
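The three states and their permitted transitions can be summarized as a small state machine. The following is a minimal sketch, assuming a hypothetical `TieredState` enum with helper methods `allowedNext` and `canTransitionTo`; it encodes only the transitions named above and is not the proposed implementation.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical model of the tiered storage states described above.
enum TieredState {
    ENABLED, DISABLING, DISABLED;

    // The only transitions named in the text:
    // ENABLED -> DISABLING -> DISABLED -> ENABLED.
    Set<TieredState> allowedNext() {
        switch (this) {
            case ENABLED:
                return EnumSet.of(DISABLING);
            case DISABLING:
                return EnumSet.of(DISABLED);
            default: // DISABLED
                return EnumSet.of(ENABLED);
        }
    }

    boolean canTransitionTo(TieredState next) {
        return allowedNext().contains(next);
    }
}

final class TieredStateDemo {
    public static void main(String[] args) {
        // A topic that is still DISABLING cannot be re-enabled directly.
        System.out.println(TieredState.DISABLING.canTransitionTo(TieredState.ENABLED));
        // A fully DISABLED topic can be re-enabled.
        System.out.println(TieredState.DISABLED.canTransitionTo(TieredState.ENABLED));
    }
}
```

Modeling the transitions as data rather than scattered conditionals makes it straightforward to reject a request, such as a second disablement while the topic is DISABLING, with a single check.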
When tiered storage is enabled in a topic, there are two sets of retention configurations available: local retention configuration (local.retention.ms and/or local.retention.bytes) and topic-wide retention configuration (retention.ms and/or retention.bytes). For the topic-wide retention configuration, if the values are not explicitly set, Kafka falls back to using the log.retention.ms and/or log.retention.bytes values.
When tiered storage is disabled on a topic, the local retention configuration becomes irrelevant, and all data expiration follows the topic-wide retention configuration exclusively.
If tiered storage is disabled using the remote.log.disable.policy=delete policy, the topic will not have any archived segments in remote storage. In this case, the topic-wide retention configuration operates similarly to any non-tiered storage topic.
On the other hand, if tiered storage is disabled with the remote.log.disable.policy=retain policy, it is possible that the topic still has segments in remote storage. In such scenarios, the topic-wide retention configuration will come into effect starting from the Remote log start offset (Rx).
As depicted in the diagram above, when tiered storage is disabled for a topic and there are still segments in the remote storage, the data expiration takes place from right (Rx) to left (LZ) based on the topic-wide retention settings of retention.ms and/or retention.bytes. This expiration process is relevant only when tiered storage is disabled with remote.log.disable.policy=retain and there are remaining segments in the remote storage. If tiered storage is disabled with remote.log.disable.policy=delete or if there is no data in the remote storage, the expiration behaves the same as for non-tiered storage topics.
This KIP supports the ability to re-enable tiered storage after it has been disabled. When tiered storage is enabled and disabled multiple times for a topic, the log offset remains continuous without any gaps. This is achieved by ensuring that data expiration always starts from the earliest offset, regardless of the storage tier being used.
To ensure offset continuity, when tiered storage is disabled with a remote.log.disable.policy=retain policy, logs are expired starting from the remote log start offset, and only after all segments from remote storage have expired, local logs are expired. By following this approach, the continuity of offsets is preserved during the re-enablement.
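The expiration order under the retain policy can be sketched as follows. The class `RetainExpirationOrder` and its representation of segments as base offsets are assumptions made for illustration, and the sketch assumes `expireNext` is only invoked once the topic-wide retention limit has already been breached.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch: picks the next segment to expire for a topic whose
// tiered storage was disabled with remote.log.disable.policy=retain.
final class RetainExpirationOrder {
    // Each deque holds segment base offsets in ascending order (assumed representation).
    private final Deque<Long> remoteSegments = new ArrayDeque<>();
    private final Deque<Long> localSegments = new ArrayDeque<>();

    void addRemote(long baseOffset) {
        remoteSegments.addLast(baseOffset);
    }

    void addLocal(long baseOffset) {
        localSegments.addLast(baseOffset);
    }

    // Remote segments expire first, starting from the remote log start offset.
    // Local segments expire only once no remote segment remains.
    // Returns null when there is nothing left to expire.
    Long expireNext() {
        if (!remoteSegments.isEmpty()) {
            return remoteSegments.pollFirst();
        }
        return localSegments.pollFirst();
    }
}
```

Because the earliest offset is always removed first, regardless of which tier holds it, the remaining log never has a gap in the middle.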
Considering different scenarios, we can examine the continuity of log offsets:
Scenario 1: Re-enabling after tiered storage is disabled with remote.log.disable.policy=delete. In the majority of cases, this is equivalent to enabling tiered storage for the first time, as there will be no remote data from the previous tiered storage enablement. However, it is possible that the asynchronous deletion of remote data from the previous disablement is still in progress when tiered storage is re-enabled. To ensure there is no data mixup between the ongoing deletion and the newly tiered data after re-enablement, we take two measures. First, during disablement, we mark the segments for deletion and update the log start offset, and second, we fence the segments with the tiered epoch. Depending on the local retention configuration, data will be tiered to remote storage. Hence, even if the segments marked for deletion from the previous tiered storage enablement are still present in the remote storage awaiting deletion, they are completely isolated from the new data that is being tiered.
Scenario 2: Re-enabling after tiered storage is disabled with remote.log.disable.policy=retain and there is no data in remote storage because all previously tiered data has expired. This scenario is similar to scenario #1, as all the data in remote storage has already expired. Following the local retention configuration, data will be tiered to remote storage.
Scenario 3: Re-enabling after tiered storage is disabled with remote.log.disable.policy=retain and there is data in remote storage. When tiered storage was disabled, the remote log end offset (Ry) = local log start offset (Lx) + 1. After re-enabling tiered storage, the offset of the newly archived segments will continue from Lx, based on the local retention settings. This continuity is ensured by expiring data starting from the remote log start offset (Rx), even while tiered storage is disabled. Local logs are only expired if there is no data in remote storage, guaranteeing offset continuity during the re-enabling process.
When the broker starts up, it checks the value of the configuration parameter remote.log.storage.system.enable. If it is enabled, the broker initializes a RemoteLogManager (RLM). The RLM then initializes two pluggable components: RemoteStorageManager (RSM) and RemoteLogMetadataManager (RLMM). It also sets up two thread pools: RemoteLogManagerScheduledThreadPool, responsible for copying log segments to remote storage and expiring remote segments, and RemoteStorageFetcherThreadPool, responsible for handling fetch requests that require offsets in remote storage.
Each topic can be configured to use remote storage by setting the configuration remote.storage.enable for that particular topic. This configuration can be set during topic creation or modified after the topic has been created.
If the cluster is using ZooKeeper (ZK) as the control plane, enabling remote storage for a topic triggers the controller to send this information to ZooKeeper. Each broker listens for changes in ZK, and when a change is detected, the broker triggers RemoteLogManager#onLeadershipChange().
In KRaft mode, when remote storage is enabled for a topic, the controller sends the information to the broker as a topic metadata change record. Each broker applies the metadata delta locally and calls RemoteLogManager#onLeadershipChange().
When RemoteLogManager#onLeadershipChange() is called, it schedules a leader or follower task in the RemoteLogManagerScheduledThreadPool for each topic partition that has tiered storage enabled.
The disablement code path differs between ZooKeeper-based clusters and KRaft-based clusters. However, the interface for communicating with the RemoteLogManager and the way the RemoteLogManager handles disablements remain the same for both types of clusters. In this section, we discuss the changes in the RemoteLogManager and the disablement code path for both ZooKeeper-based and KRaft-based clusters.
The RemoteLogManager has two separate thread pools: RemoteLogManagerScheduledThreadPool (for copying and expiring remote segments) and RemoteStorageFetcherThreadPool (for fetching remote data).
When tiered storage is disabled with remote.log.disable.policy=delete, there is no remote data to expire or read from. Therefore, all scheduled tasks for the topic-partition from the RemoteLogManagerScheduledThreadPool can be canceled. However, when tiered storage is disabled with remote.log.disable.policy=retain, even though segments are not copied to remote storage, tasks are still needed for expiring remote data and serving fetch requests for the remote data. Thus, even when tiered storage is disabled on a specific topic, there will still be tasks for expiring remote data and fetching data from the remote storage.
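The difference between the two policies can be expressed as the set of remote-storage activities that stay active for a topic-partition after disablement. This is a minimal sketch; the enums `RemoteTask` and `DisablePolicy` and the helper `RemainingTasks.afterDisablement` are hypothetical names introduced for illustration only.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical names for the three kinds of remote-storage work discussed above.
enum RemoteTask { COPY, EXPIRE, FETCH }

// Hypothetical enum mirroring the two values of remote.log.disable.policy.
enum DisablePolicy { RETAIN, DELETE }

final class RemainingTasks {
    private RemainingTasks() { }

    // Returns the activities that remain for a topic-partition once tiered
    // storage has been disabled with the given policy.
    static Set<RemoteTask> afterDisablement(DisablePolicy policy) {
        if (policy == DisablePolicy.RETAIN) {
            // Copying stops, but retained remote data must still be expired and served.
            return EnumSet.of(RemoteTask.EXPIRE, RemoteTask.FETCH);
        }
        // With delete there is no remote data left to expire or read.
        return EnumSet.noneOf(RemoteTask.class);
    }
}
```

Copying never survives disablement under either policy, which is why copy and expiration work need to be cancellable independently of each other.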
To seamlessly support this functionality, we propose splitting the RemoteLogManagerScheduledThreadPool in RemoteLogManager into two separate thread pools: RemoteStorageCopierThreadPool and RemoteDataExpirationThreadPool.
The responsibilities of these thread pools will be as follows:
During tiered storage enablement, when RemoteLogManager#onLeadershipChange() is called, tasks for the topic-partitions will be scheduled in these thread pools, similar to how we do it today in the RemoteStorageFetcherThreadPool.
During tiered storage disablement, when RemoteLogManager#stopPartitions() is called:
- If the disablement policy is delete, tasks scheduled for the topic-partitions in the RemoteDataExpirationThreadPool will also be canceled.
- If the disablement policy is retain, scheduled tasks for the topic-partitions in the RemoteDataExpirationThreadPool will remain unchanged.
To disable tiered storage on a topic in ZooKeeper-based clusters, the following steps are involved:
- Set remote.storage.enable=false for the desired topic, indicating the disablement of tiered storage.
- remote.storage.enable=false.
- If the disablement policy is delete, the log start offset (LSO) is updated to match the local log start offset and the remote log is deleted by calling RemoteStorageManager#deleteLogSegmentData().
- If the disablement policy is retain, there is no extra call required to delete remote data, as the RemoteLogManager leader partition task will periodically expire segments that are earlier than the log start offset.
- topicsToBeDeleted. This queue serves the purpose of monitoring the progress of topic deletions.
To disable tiered storage on a topic in KRaft-based clusters, the following steps are involved:
- remote.log.disable.policy=retain|delete.
- tiered_epoch and update the tiered_state to the DISABLING state.
- RemoteLogManager#stopPartitions().
- RemoteLogManager#stopPartitions():
  - If the disablement policy is delete, the log start offset (LSO) is updated to match the local log start offset and the remote log is deleted by calling RemoteStorageManager#deleteLogSegmentData().
  - If the disablement policy is retain, there is no extra call required to delete remote data, as the RemoteLogManager leader partition task will periodically expire segments that are earlier than the log start offset.
- DisableRemoteTopic, to each broker with a callback to disable the remote topic. DisableRemoteTopic is a new controller-to-broker API that will be introduced in this KIP.
- topicsToBeDeleted. This queue serves the purpose of monitoring the progress of topic deletions.
- DisableRemoteTopic request, it updates the tiered_state in the TopicMetadata to "DISABLED".
1. DisableRemoteTopic is a new controller-to-broker API with the following schema. Note that this will bump the inter.broker.protocol.version.
{
  "apiKey": TBD:(INT16),
  "type": "metadata",
  "name": "DisableRemoteTopic",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "PartitionId", "type": "int32", "versions": "0+", "default": "-1",
      "about": "The partition id." },
    { "name": "TopicId", "type": "uuid", "versions": "0+",
      "about": "The unique ID of this topic." }
  ]
}
2. The RemoteLogSegmentMetadata will have a new field, tieredEpoch.
public RemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId,
                                long startOffset,
                                long endOffset,
                                long maxTimestamp,
                                int leaderEpoch,
                                long eventTimestamp,
                                int segmentSizeInBytes,
                                RemoteLogSegmentState state,
                                Map<Integer, Long> segmentLeaderEpochs,
                                int tieredEpoch) {
    this.remoteLogSegmentId = remoteLogSegmentId;
    this.startOffset = startOffset;
    this.endOffset = endOffset;
    this.leaderEpoch = leaderEpoch;
    this.maxTimestamp = maxTimestamp;
    this.eventTimestamp = eventTimestamp;
    this.segmentLeaderEpochs = segmentLeaderEpochs;
    this.state = state;
    this.segmentSizeInBytes = segmentSizeInBytes;
+   this.tieredEpoch = tieredEpoch;
}
3. In ZooKeeper-based clusters, the topics' ZNode will have two new leaf ZNodes for storing the tiered epoch and the tiering state.
/brokers/topics/{topic-name}/partitions
                            /tieredstorage/
                                          /tiered_epoch
                                          /state
4. In KRaft-based clusters, the TopicMetadata contains a new field. The addition of this field will bump the metadata version.
MetadataResponse#TopicMetadata. Add new field tiered_epoch.
TopicMetadata => topic_error_code topic is_internal partition_metadata
  topic_error_code => int16
  topic => str
  is_internal => boolean
  partition_metadata => [PartitionMetadata]
+ tiered_epoch => int32
+ tiered_state => [TieredState]
The AlterConfigs API will be updated to accommodate the following changes:
- Updating remote.storage.enable with a new configuration value of false will disable tiered storage.
- The AlterConfigs API will also be updated to support an optional tiered storage disablement policy, remote.log.disable.policy=retain|delete. If the disablement policy is not specified, it will default to retain, which keeps the remote logs.
There are certain exceptional scenarios to consider:
- If tiered storage disablement is requested while the topic is already in the DISABLING state, an exception named TIERED_STORAGE_DISABLEMENT_IN_PROGRESS will be thrown. This exception will have the following details:
- INVALID_REQUEST (42) will be returned with an appropriate error message.
A new configuration to support the disablement policy will be introduced.
Configuration: remote.log.disable.policy
Description: Determines whether tiered data for a topic should be retained or deleted after tiered storage disablement on a topic.
Type: String
Default: retain
Valid values: retain, delete
Scope: topic wide
The configuration setting remote.log.disable.policy can also be introduced as a broker-level configuration, with the default value set to retain. For the initial implementation, we suggest making this a topic-level configuration, but we have the flexibility to make it a broker-level configuration in future iterations.
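As an illustration of how the policy value could be resolved from a topic's configuration, here is a minimal sketch. The enum `RemoteLogDisablePolicy` and the method `fromTopicConfig` are hypothetical names, and the exact-match validation is an assumption; only the configuration key, its valid values, and its default come from the definition above.

```java
import java.util.Map;

// Hypothetical helper that resolves remote.log.disable.policy for a topic.
enum RemoteLogDisablePolicy {
    RETAIN, DELETE;

    static final String CONFIG = "remote.log.disable.policy";

    // Returns RETAIN when the config is absent, matching the documented default.
    // Matching is exact (assumption): only "retain" and "delete" are accepted.
    static RemoteLogDisablePolicy fromTopicConfig(Map<String, String> topicConfig) {
        String raw = topicConfig.get(CONFIG);
        if (raw == null) {
            return RETAIN;
        }
        switch (raw.trim()) {
            case "retain":
                return RETAIN;
            case "delete":
                return DELETE;
            default:
                throw new IllegalArgumentException(
                    "Invalid value '" + raw + "' for " + CONFIG + "; valid values are retain, delete");
        }
    }
}
```

Rejecting unknown values up front keeps a mistyped policy from silently falling back to the default.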
# Defaults to remote.log.disable.policy=retain
bin/kafka-configs.sh --bootstrap-server {bootstrap-string} \
  --alter --entity-type topics --entity-name {topic-name} \
  --add-config 'remote.storage.enable=false'

# Disable with remote.log.disable.policy=retain
bin/kafka-configs.sh --bootstrap-server {bootstrap-string} \
  --alter --entity-type topics --entity-name {topic-name} \
  --add-config 'remote.log.disable.policy=retain, remote.storage.enable=false'

# Disable with remote.log.disable.policy=delete
bin/kafka-configs.sh --bootstrap-server {bootstrap-string} \
  --alter --entity-type topics --entity-name {topic-name} \
  --add-config 'remote.log.disable.policy=delete, remote.storage.enable=false'
Further details to follow as the design progresses.
The test plan for this feature includes the following:
The following alternatives were considered but ultimately rejected:
- Using StopReplicaRequest instead of creating a new API, DisableRemoteTopic, to notify brokers of the disablement of tiered storage on a topic. This approach would work from a technical standpoint by simply adding a flag to indicate that the StopReplicaRequest is for the disablement of tiered storage. However, for the sake of clarity of intent, we recommend creating the new API, DisableRemoteTopic. This choice ensures clear and unambiguous communication of the operation's purpose.
- Using a ZNode /{$AdminZNode.path}/remote_storage_topic_disablements/{$topic} in ZooKeeper (ZK) mode to serve as a lock while the disablement process is in progress. This alternative was rejected because maintaining the state within the Topics ZNode is more intuitive and consolidates topic-related state in a single location.