...

This proposal aims to address a gap in the existing functionality introduced by KIP-405, which allowed customers to enable Tiered Storage but provided no way to disable it. To give customers more flexibility and control over data storage in Kafka clusters, we propose a Tiered Storage disablement feature that allows customers to selectively disable Tiered Storage on a per-topic basis. There are several reasons why customers would be interested in this feature:

...

  1. Disable Tiering on a Topic: Customers should have the ability to disable tiering on a topic that has Tiered Storage enabled.
  2. Retention of Tiered Data: When disabling tiered storage on a topic, customers should have the option to retain the tiered data according to the configured retention policy prior to disabling tiering.
  3. Deletion of Tiered Data: When disabling tiered storage on a topic, customers should have the option to delete the tiered data in the remote storage associated with that topic.
  4. Fetching Previously Tiered Data: Customers should have the option to continue fetching data that was tiered before tiered storage was disabled. This allows them to access and consume the tiered data even after the tiering feature has been disabled.
  5. Re-enabling Tiering on a Previously Disabled Topic: Customers should be able to re-enable tiering on a topic that was previously disabled, allowing them to resume tiered storage for that specific topic.
  6. Support for KRaft-based Clusters: The tiered storage disablement feature should be designed and implemented for KRaft-based Kafka clusters. We have chosen not to put forward this feature for ZooKeeper-based clusters, because ZooKeeper is deprecated and, as of Apache Kafka 3.6, is not considered for new functionality.

Non-Goals

The following aspects are considered as non-goals for the tiered storage disablement feature:

...

Disablement - ZooKeeper Based Cluster

ZooKeeper-based clusters will not support tiered storage disablement. Since we are targeting Apache Kafka 3.8 for this KIP (which still supports ZooKeeper), setting any of the new configurations will have no effect in ZooKeeper-based clusters.

Disablement - KRaft Based Cluster

To disable tiered storage on a topic in KRaft-based clusters, the following steps are involved:

  1. Users modify the topic configuration:
    1. Users set the configuration "remote.storage.enable=false" for the desired topic, indicating the disablement of tiered storage.
    2. Optionally, users specify a disablement policy to either retain or delete the remote data associated with the topic: remote.log.disable.policy=retain|delete. If no policy is specified, it defaults to retain.
  2. Controller persists configuration change:
    1. The controller creates a ConfigRecord and persists it in the metadata topic.
    2. The controller also updates the topic metadata to increment the tiered_epoch and set the tiered_state to DISABLING.
  3. Broker detects the configuration change:
    1. Brokers replicate the metadata topic and fetch the latest ConfigRecord changes.
    2. The broker calls the ConfigHandler to process the ConfigRecord change with tiered storage disablement.
    3. The broker ConfigHandler calls RemoteLogManager#stopPartitions().
  4. Execution of RemoteLogManager#stopPartitions():
    1. Removes the scheduled tasks for the topic-partition from the thread pool responsible for archiving logs to remote storage.
    2. If the disablement policy is set to delete, the log start offset is updated to match the local log start offset, and the remote log segments are deleted by calling RemoteStorageManager#deleteLogSegmentData().
    3. If the disablement policy is set to retain, no extra call is required to delete remote data, because the RemoteLogManager task for the leader partition periodically expires segments that are earlier than the log start offset.
  5. Controller interaction:
    1. The controller enqueues the disablement intent for tiered storage to a local queue.
    2. It sends a DisableRemoteTopic request to each broker, with a callback to disable the remote topic.
    3. DisableRemoteTopic is a new controller-to-broker API that will be introduced in this KIP.
    4. Note that this follows a familiar pattern for communication between the controller and brokers: tasks are enqueued in a local queue within the controller context, and a request is sent to the brokers along with a callback. The controller currently uses this method for operations such as topic deletion. For instance, when the controller initiates the deletion of topics, it maintains a local queue called topicsToBeDeleted to monitor the progress of the deletions.
  6. Completion of disablement:
    1. Once the controller receives responses from all brokers for the DisableRemoteTopic request, it updates the tiered_state in the TopicMetadata to "DISABLED".
    2. This update marks the completion of the disablement process, indicating that tiered storage has been successfully disabled for the topic in KRaft-based clusters.
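The broker-side work in step 4 can be illustrated with a small in-memory model. This is a minimal sketch, assuming hypothetical types (StopPartitionsSketch, Partition, Policy) rather than the actual RemoteLogManager internals; remote segments are plain {start, end} offset pairs, and clearing the list stands in for calls to RemoteStorageManager#deleteLogSegmentData().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, in-memory model of the per-partition work attributed to
// RemoteLogManager#stopPartitions(); none of these types are Kafka classes.
final class StopPartitionsSketch {
    enum Policy { RETAIN, DELETE }

    static final class Partition {
        boolean copyTaskScheduled = true;   // archiving task in the thread pool
        long logStartOffset;                // earliest offset across local + remote data
        final long localLogStartOffset;     // earliest offset still on local disk
        final List<long[]> remoteSegments = new ArrayList<>(); // {start, end} offsets

        Partition(long logStartOffset, long localLogStartOffset) {
            this.logStartOffset = logStartOffset;
            this.localLogStartOffset = localLogStartOffset;
        }
    }

    /** Applies the disablement policy; returns how many remote segments were deleted. */
    static int stopPartition(Partition p, Policy policy) {
        // 1. Stop archiving: remove the scheduled copy task for this partition.
        p.copyTaskScheduled = false;
        if (policy == Policy.RETAIN) {
            // 2a. Retain: nothing else to do now; the leader's expiration task later
            // removes remote segments that fall before the log start offset.
            return 0;
        }
        // 2b. Delete: advance the log start offset to the local log start offset and
        // drop every remote segment (stand-in for RemoteStorageManager#deleteLogSegmentData()).
        p.logStartOffset = p.localLogStartOffset;
        int deleted = p.remoteSegments.size();
        p.remoteSegments.clear();
        return deleted;
    }
}
```

The two policies differ only after the copy task is cancelled: retain defers cleanup to the normal expiration path, while delete makes the remote data unreachable immediately by moving the log start offset forward.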

Failure Modes

  • Controller Failover in DISABLING State: Following a new controller election, the controller context will be reconstructed for all topic-partitions in the DISABLING state, and the controller will send new DisableRemoteTopic requests to all brokers.
  • Broker Dies in DISABLING State or Fails to Complete the DisableRemoteTopic Request: The controller maintains an internal queue to track completed DisableRemoteTopic calls. If a broker failure or leader failover occurs during this state, the controller retries the operation until it receives a successful response. This mirrors how topic deletion is currently handled.
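The retry behaviour described in these failure modes can be illustrated with a hypothetical controller-side tracker, loosely modeled on the topicsToBeDeleted pattern. DisablementTracker and its methods are illustrative names, not controller APIs. In this sketch a broker stays pending until it returns a successful response, and after a controller failover the tracker is rebuilt from the topics persisted in the DISABLING state.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical controller-side bookkeeping for DisableRemoteTopic requests.
final class DisablementTracker {
    // topic -> brokers that have not yet acknowledged DisableRemoteTopic
    private final Map<String, Set<Integer>> pending = new HashMap<>();

    /** Enqueue a disablement and return the brokers to send DisableRemoteTopic to. */
    Set<Integer> enqueue(String topic, Set<Integer> liveBrokers) {
        pending.put(topic, new HashSet<>(liveBrokers));
        return Set.copyOf(liveBrokers);
    }

    /**
     * Callback for a broker response. Returns true once every broker has
     * acknowledged, i.e. the topic can move from DISABLING to DISABLED.
     * A failed response leaves the broker pending so the request is retried.
     */
    boolean onResponse(String topic, int brokerId, boolean success) {
        Set<Integer> waiting = pending.get(topic);
        if (waiting == null) {
            return false; // unknown topic or disablement already completed
        }
        if (success) {
            waiting.remove(brokerId);
        }
        if (waiting.isEmpty()) {
            pending.remove(topic);
            return true;
        }
        return false;
    }

    /** Brokers that still need a (re)sent DisableRemoteTopic request. */
    Set<Integer> toRetry(String topic) {
        return Set.copyOf(pending.getOrDefault(topic, Set.of()));
    }

    /** After controller failover: rebuild state from topics persisted as DISABLING. */
    static DisablementTracker rebuild(Set<String> disablingTopics, Set<Integer> liveBrokers) {
        DisablementTracker tracker = new DisablementTracker();
        for (String topic : disablingTopics) {
            tracker.enqueue(topic, liveBrokers);
        }
        return tracker;
    }
}
```

Because the persisted DISABLING state is the source of truth, a new controller does not need the old in-memory queue; resending DisableRemoteTopic to every broker is acceptable as long as brokers treat a repeated request as a no-op, which is an assumption of this sketch.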

Internal Interface Changes


1. DisableRemoteTopic is a new controller-to-broker API with the following schema. Note that this will bump the inter.broker.protocol.version.

Code Block
titleDisableRemoteTopic
{
  "apiKey": TBD:(INT16),
  "type": "metadata",
  "name": "DisableRemoteTopic",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "PartitionId", "type": "int32", "versions": "0+", "default": "-1",
      "about": "The partition id." },
    { "name": "TopicId", "type": "uuid", "versions": "0+",
      "about": "The unique ID of this topic." }
  ]
}


2. The RemoteLogSegmentMetadata will have a new field, tieredEpoch.

Code Block
languagejava
// Requires java.util.Map; RemoteLogSegmentId and RemoteLogSegmentState are existing Tiered Storage types.
public RemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId,
                                long startOffset,
                                long endOffset,
                                long maxTimestamp,
                                int leaderEpoch,
                                long eventTimestamp,
                                int segmentSizeInBytes,
                                RemoteLogSegmentState state,
                                Map<Integer, Long> segmentLeaderEpochs,
                                int tieredEpoch) {
        this.remoteLogSegmentId = remoteLogSegmentId;
        this.startOffset = startOffset;
        this.endOffset = endOffset;
        this.maxTimestamp = maxTimestamp;
        this.leaderEpoch = leaderEpoch;
        this.eventTimestamp = eventTimestamp;
        this.segmentSizeInBytes = segmentSizeInBytes;
        this.state = state;
        this.segmentLeaderEpochs = segmentLeaderEpochs;
        // New field introduced by this KIP.
        this.tieredEpoch = tieredEpoch;
    }



3. In KRaft-based clusters, the TopicMetadata contains a new field. The addition of this field will bump the metadata version.
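The tiered_epoch and tiered_state updates described in the KRaft steps can be summarized as a small state machine. This sketch assumes a hypothetical TopicTieringMetadata class; the DISABLING and DISABLED states come from this KIP, while the name ENABLED for the starting state is an assumption, and the actual TopicMetadata field layout is not shown. Re-enablement is not modeled, and any transition other than ENABLED to DISABLING to DISABLED is simply rejected.

```java
// Hypothetical model of the per-topic tiering metadata (tiered_epoch, tiered_state);
// this is not the actual KRaft metadata record schema.
final class TopicTieringMetadata {
    enum TieredState { ENABLED, DISABLING, DISABLED }

    private int tieredEpoch;
    private TieredState state;

    TopicTieringMetadata(int tieredEpoch, TieredState state) {
        this.tieredEpoch = tieredEpoch;
        this.state = state;
    }

    int tieredEpoch() { return tieredEpoch; }

    TieredState state() { return state; }

    /** remote.storage.enable=false was applied: bump the epoch and enter DISABLING. */
    void beginDisablement() {
        if (state != TieredState.ENABLED) {
            throw new IllegalStateException("cannot start disablement, state=" + state);
        }
        tieredEpoch++;
        state = TieredState.DISABLING;
    }

    /** Every broker acknowledged DisableRemoteTopic: mark the topic DISABLED. */
    void completeDisablement() {
        if (state != TieredState.DISABLING) {
            throw new IllegalStateException("no disablement in progress, state=" + state);
        }
        state = TieredState.DISABLED;
    }
}
```

Only the start of disablement increments the epoch; completion changes the state but keeps the same epoch, matching the two controller updates described in the steps.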

...

  • Updating the topic-level configuration remote.storage.enable with a new configuration value of false will disable tiered storage.
  • The AlterConfigs API will also be updated to support an optional tiered storage disablement policy, remote.log.disable.policy=retain|delete. If the disablement policy is not specified, it defaults to retain, and the remote logs are kept.
  • Re-enablement after disablement will not introduce any changes to the public interface.
  • Re-enablement will not provide an option to specify a policy.
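A client-side view of these configuration changes, as a hedged sketch: the hypothetical helper TieringConfigChanges only builds the topic-level entries named above and does not submit them to a cluster. Omitting remote.log.disable.policy leaves the default (retain) in effect, and re-enablement carries no policy.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper that builds the topic-level config changes for
// disabling or re-enabling tiered storage; submitting them is out of scope.
final class TieringConfigChanges {
    static final String REMOTE_STORAGE_ENABLE = "remote.storage.enable";
    static final String DISABLE_POLICY = "remote.log.disable.policy";

    /** Disable tiering; an empty policy means the default (retain) applies. */
    static Map<String, String> disable(Optional<String> policy) {
        Map<String, String> changes = new LinkedHashMap<>();
        changes.put(REMOTE_STORAGE_ENABLE, "false");
        policy.ifPresent(p -> changes.put(DISABLE_POLICY, p));
        return changes;
    }

    /** Re-enable tiering; there is no policy option for re-enablement. */
    static Map<String, String> reEnable() {
        return Map.of(REMOTE_STORAGE_ENABLE, "true");
    }
}
```

For example, TieringConfigChanges.disable(Optional.of("delete")) yields the two entries remote.storage.enable=false and remote.log.disable.policy=delete.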

...

  • If a new disablement request is received while the topic is still in the DISABLING state, an exception named TIERED_STORAGE_DISABLEMENT_IN_PROGRESS will be thrown. This exception will have the following details:
    • Error: TIERED_STORAGE_DISABLEMENT_IN_PROGRESS
    • Code: > 71 (to be determined during implementation)
  • For other invalid formatted requests, such as an unsupported disablement policy option, the existing error INVALID_REQUEST (42) will be returned with an appropriate error message.
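These validation rules could be checked as sketched below. The sketch rests on several assumptions: DisablementValidator and its Outcome enum are hypothetical, policy values are matched exactly as written (retain or delete), and the in-progress check runs before policy validation, an ordering this KIP does not specify. The numeric code of the new error is still to be determined, so it is represented only by name.

```java
import java.util.Optional;

// Hypothetical validation of a tiered storage disablement request.
final class DisablementValidator {
    enum Policy { RETAIN, DELETE }

    enum Outcome { OK, TIERED_STORAGE_DISABLEMENT_IN_PROGRESS, INVALID_REQUEST }

    /** Parses remote.log.disable.policy; an absent value defaults to RETAIN. */
    static Optional<Policy> parsePolicy(String raw) {
        if (raw == null) {
            return Optional.of(Policy.RETAIN);
        }
        switch (raw) {
            case "retain":
                return Optional.of(Policy.RETAIN);
            case "delete":
                return Optional.of(Policy.DELETE);
            default:
                return Optional.empty(); // unsupported option
        }
    }

    static Outcome validate(boolean topicIsDisabling, String rawPolicy) {
        if (topicIsDisabling) {
            return Outcome.TIERED_STORAGE_DISABLEMENT_IN_PROGRESS;
        }
        if (parsePolicy(rawPolicy).isEmpty()) {
            return Outcome.INVALID_REQUEST; // existing error code 42
        }
        return Outcome.OK;
    }
}
```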

...

Compatibility, Deprecation, and Migration Plan

  • This feature will be compatible with all Kafka versions that support Tiered Storage.
  • This feature will be compatible only with KRaft-based clusters that support Tiered Storage.

Test Plan

The test plan for this feature includes the following:

...

  1. Introducing a new broker-to-controller API to notify the controller when the disablement process is completed. This alternative was not pursued because it represents a shift in paradigm, where a broker initiates a call to the controller, and it was deemed less suitable for the desired design approach.
  2. Reusing the existing StopReplicaRequest API instead of creating a new DisableRemoteTopic API to notify brokers of the disablement of tiered storage on a topic. This approach would work from a technical standpoint by adding a flag indicating that the StopReplicaRequest is for tiered storage disablement. However, for clarity of intent, it is recommended to create the new DisableRemoteTopic API, which communicates the operation's purpose clearly and unambiguously.
  3. Introducing a temporary ZNode in /{$AdminZNode.path}/remote_storage_topic_disablements/{$topic} in ZooKeeper (ZK) mode to serve as a lock while the disablement process is in progress. This alternative was rejected because maintaining the state within the Topics ZNode is more intuitive and consolidates topic-related state in a single location.
  4. Supporting this feature in both ZooKeeper-based and KRaft-based clusters, as proposed in an earlier iteration of this KIP. As we are approaching Apache Kafka 4.0, which will remove ZooKeeper, we decided it is better to focus our efforts on supporting this feature only in KRaft.