
...

Collaborators: Karthik Rajagopalan, Nagarjuna Koduru

Status

Current state: Under Discussion

Discussion thread: here

JIRA: KAFKA-7739

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

  1. Users modify the topic configuration:
    1. Users set the configuration "remote.storage.enable=false" on the desired topic to disable tiered storage.
    2. Optionally, users specify a disablement policy to either retain or delete the remote data associated with the topic: remote.log.disable.policy=retain|delete.
  2. Controller persists configuration change:
    1. The controller creates a ConfigRecord and persists it in the metadata topic.
    2. The controller also updates the topic metadata to increment the tiered_epoch and set the tiered_state to DISABLING.
  3. Broker detects the configuration change:
    1. The broker replicates the metadata topic and fetches the latest ConfigRecord changes.
    2. The broker calls the ConfigHandler to process the ConfigRecord change that disables tiered storage.
    3. The broker ConfigHandler calls RemoteLogManager#stopPartitions().
  4. Execution of RemoteLogManager#stopPartitions():
    1. Removes the scheduled tasks for the topic-partition from the thread pool responsible for archiving logs to remote storage.
    2. If the disablement policy is set to delete, the log start offset is updated to match the local log start offset, and the remote log segments are deleted by calling RemoteStorageManager#deleteLogSegmentData().
    3. If the disablement policy is set to retain, no extra call is required to delete remote data, because the RemoteLogManager task for the leader partition periodically expires segments that are earlier than the log start offset.
  5. Controller interaction:
    1. The controller enqueues the disablement intent for tiered storage to a local queue.
    2. It sends an API call, called DisableRemoteTopic, to each broker with a callback to disable the remote topic.
    3. DisableRemoteTopic is a new controller-to-broker API that will be introduced in this KIP.
    4. This follows an existing pattern for communication between the controller and brokers: tasks are enqueued in a local queue within the controller context, and a request is sent to the brokers along with a callback. The controller already uses this pattern for operations such as topic deletion, where it maintains a local queue called topicsToBeDeleted to track the progress of deletions.
  6. Completion of disablement:
    1. Once the controller receives responses from all brokers for the DisableRemoteTopic request, it updates the tiered_state in the TopicMetadata to "DISABLED".
    2. This update marks the completion of the disablement process, indicating that tiered storage has been successfully disabled for the topic in KRaft-based clusters.
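The broker-side behavior in step 4 can be sketched in Java as follows. This is a minimal illustration, not the RemoteLogManager implementation: DisablePolicy, PartitionState, RemoteLogManagerSketch and stopPartition are hypothetical names, the scheduled archiving task is modelled as a map entry, and remote segment deletion is simulated by recording segment IDs instead of calling RemoteStorageManager#deleteLogSegmentData().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical policy values mirroring remote.log.disable.policy=retain|delete.
enum DisablePolicy { RETAIN, DELETE }

// Hypothetical per-partition view holding only what step 4 touches.
class PartitionState {
    long logStartOffset;
    final long localLogStartOffset;
    final List<String> remoteSegmentIds = new ArrayList<>();

    PartitionState(long logStartOffset, long localLogStartOffset) {
        this.logStartOffset = logStartOffset;
        this.localLogStartOffset = localLogStartOffset;
    }
}

// Hypothetical stand-in for RemoteLogManager#stopPartitions(); not the real API.
class RemoteLogManagerSketch {
    // Partition name -> marker for its scheduled archiving (copy-to-remote) task.
    final Map<String, Boolean> archivingTasks = new ConcurrentHashMap<>();
    // Segment IDs "deleted" from remote storage, recorded in place of calling
    // RemoteStorageManager#deleteLogSegmentData().
    final List<String> deletedRemoteSegments = new ArrayList<>();

    void stopPartition(String partition, PartitionState state, DisablePolicy policy) {
        // Step 4.1: remove the scheduled archiving task so no more segments are copied.
        archivingTasks.remove(partition);

        if (policy == DisablePolicy.DELETE) {
            // Step 4.2: move the log start offset to the local log start offset...
            state.logStartOffset = state.localLogStartOffset;
            // ...and delete every remote segment of the partition.
            deletedRemoteSegments.addAll(state.remoteSegmentIds);
            state.remoteSegmentIds.clear();
        }
        // Step 4.3: RETAIN makes no extra call; remote segments are left for the
        // leader's periodic expiration task.
    }
}
```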

Failure Modes

  • Controller Failover in DISABLING State: After a new controller is elected, it reconstructs the controller context for all topic-partitions in the DISABLING state and sends a new DisableRemoteTopic request to all brokers.
  • Broker Dies in DISABLING State or Fails to Complete the DisableRemoteTopic Request: The controller maintains an internal queue to track completed DisableRemoteTopic calls. If a leader failover occurs while the topic is in this state, the controller retries the operation until it receives a successful response. This mirrors how topic deletion is currently handled.
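The controller-side bookkeeping from steps 5 and 6 and the failure modes above can be sketched as follows, assuming a single-threaded controller event loop. DisablementTracker, the ENABLED state, and the in-memory outbox are hypothetical; the outbox stands in for the actual network sends and callbacks.

```java
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

// DISABLING and DISABLED come from this KIP; ENABLED is an assumed initial state.
enum TieredState { ENABLED, DISABLING, DISABLED }

// Hypothetical controller bookkeeping, modelled on the topicsToBeDeleted pattern.
class DisablementTracker {
    final Map<String, TieredState> tieredState = new HashMap<>();
    // Topic -> brokers that have not yet acknowledged DisableRemoteTopic.
    final Map<String, Set<Integer>> pendingBrokers = new HashMap<>();
    // (topic, broker) requests waiting to be sent; stands in for real network I/O.
    final Queue<Map.Entry<String, Integer>> outbox = new ArrayDeque<>();

    void enqueue(String topic, Collection<Integer> liveBrokers) {
        tieredState.put(topic, TieredState.DISABLING);
        pendingBrokers.put(topic, new HashSet<>(liveBrokers));
        for (int broker : liveBrokers) {
            outbox.add(Map.entry(topic, broker));
        }
    }

    // Callback invoked with each broker's DisableRemoteTopic response.
    void onResponse(String topic, int broker, boolean success) {
        Set<Integer> pending = pendingBrokers.get(topic);
        if (pending == null) {
            return; // disablement already completed for this topic
        }
        if (!success) {
            outbox.add(Map.entry(topic, broker)); // retry until the broker succeeds
            return;
        }
        pending.remove(broker);
        if (pending.isEmpty()) {
            pendingBrokers.remove(topic);
            tieredState.put(topic, TieredState.DISABLED);
        }
    }

    // Controller failover: rebuild the context from topics persisted as DISABLING
    // and resend DisableRemoteTopic to all brokers.
    static DisablementTracker recover(Map<String, TieredState> persisted,
                                      Collection<Integer> liveBrokers) {
        DisablementTracker tracker = new DisablementTracker();
        tracker.tieredState.putAll(persisted);
        for (Map.Entry<String, TieredState> e : persisted.entrySet()) {
            if (e.getValue() == TieredState.DISABLING) {
                tracker.enqueue(e.getKey(), liveBrokers);
            }
        }
        return tracker;
    }
}
```

Completion is tracked as a set of broker IDs rather than a counter, so a duplicate success response after a retry cannot mark the topic DISABLED early.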

Internal Interface Changes


1. DisableRemoteTopic is a new controller-to-broker API with the following schema. Note that this will bump the inter.broker.protocol.version.

{
  "apiKey": TBD:(INT16),
  "type": "request",
  "name": "DisableRemoteTopic",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "PartitionId", "type": "int32", "versions": "0+", "default": "-1",
      "about": "The partition id." },
    { "name": "TopicId", "type": "uuid", "versions": "0+",
      "about": "The unique ID of this topic." }
  ]
}
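For readers more familiar with Java than with Kafka's JSON message schemas, the request can be pictured as the following data class. This is an illustrative sketch only: Kafka generates its message classes from the JSON schema, java.util.UUID stands in for Kafka's own Uuid type, and the one-request-per-partition fan-out in the hypothetical forPartitions helper is an assumption rather than something this KIP specifies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Hypothetical Java mirror of the DisableRemoteTopic request schema.
class DisableRemoteTopicRequestData {
    // Matches "default": "-1" for PartitionId in the schema.
    static final int DEFAULT_PARTITION_ID = -1;

    final int partitionId;
    final UUID topicId;

    DisableRemoteTopicRequestData(UUID topicId) {
        this(DEFAULT_PARTITION_ID, topicId);
    }

    DisableRemoteTopicRequestData(int partitionId, UUID topicId) {
        this.partitionId = partitionId;
        this.topicId = topicId;
    }

    // Assumed fan-out: one request per partition of the topic.
    static List<DisableRemoteTopicRequestData> forPartitions(UUID topicId, int partitionCount) {
        List<DisableRemoteTopicRequestData> requests = new ArrayList<>();
        for (int p = 0; p < partitionCount; p++) {
            requests.add(new DisableRemoteTopicRequestData(p, topicId));
        }
        return requests;
    }
}
```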

...

2. The RemoteLogSegmentMetadata will have a new field, tieredEpoch.

public RemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId,
                                long startOffset,
                                long endOffset,
                                long maxTimestamp,
                                int leaderEpoch,
                                long eventTimestamp,
                                int segmentSizeInBytes,
                                RemoteLogSegmentState state,
                                Map<Integer, Long> segmentLeaderEpochs,
                                int tieredEpoch) {
    this.remoteLogSegmentId = remoteLogSegmentId;
    this.startOffset = startOffset;
    this.endOffset = endOffset;
    this.leaderEpoch = leaderEpoch;
    this.maxTimestamp = maxTimestamp;
    this.eventTimestamp = eventTimestamp;
    this.segmentLeaderEpochs = segmentLeaderEpochs;
    this.state = state;
    this.segmentSizeInBytes = segmentSizeInBytes;
    this.tieredEpoch = tieredEpoch; // New field introduced by this KIP
}
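This KIP does not spell out how tieredEpoch is consumed. One plausible use, sketched below under that assumption, is to tell segments uploaded under an earlier tiering epoch apart from those uploaded under the topic's current tiered_epoch. SegmentMetadata and TieredEpochFilter are hypothetical stand-ins that carry only the fields the sketch needs.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for RemoteLogSegmentMetadata with only the fields used below.
class SegmentMetadata {
    final String segmentId;
    final long startOffset;
    final long endOffset;
    final int tieredEpoch;

    SegmentMetadata(String segmentId, long startOffset, long endOffset, int tieredEpoch) {
        this.segmentId = segmentId;
        this.startOffset = startOffset;
        this.endOffset = endOffset;
        this.tieredEpoch = tieredEpoch;
    }
}

class TieredEpochFilter {
    // Assumption: a segment stamped with a tieredEpoch lower than the topic's current
    // tiered_epoch was uploaded before the most recent tiering state change.
    static List<SegmentMetadata> fromEarlierEpochs(List<SegmentMetadata> segments,
                                                   int currentTieredEpoch) {
        return segments.stream()
                .filter(s -> s.tieredEpoch < currentTieredEpoch)
                .collect(Collectors.toList());
    }
}
```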

...

3. In ZooKeeper-based clusters, each topic's ZNode will have two new leaf ZNodes for storing the tiered epoch and the tiering state.

/brokers/topics/{topic-name}/partitions
                            /tieredstorage/
                                          /tiered_epoch
                                          /state
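The ZNode layout above translates into the paths built below. TieredStorageZNodes is a hypothetical helper, and storing the tiered epoch as a UTF-8 decimal string is an assumption; this KIP does not specify the ZNode payload format.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for the tiered storage ZNodes of a topic.
class TieredStorageZNodes {
    static String tieredStoragePath(String topic) {
        return "/brokers/topics/" + topic + "/tieredstorage";
    }

    static String tieredEpochPath(String topic) {
        return tieredStoragePath(topic) + "/tiered_epoch";
    }

    static String statePath(String topic) {
        return tieredStoragePath(topic) + "/state";
    }

    // Assumed payload encoding: the epoch as a UTF-8 decimal string.
    static byte[] encodeEpoch(int epoch) {
        return Integer.toString(epoch).getBytes(StandardCharsets.UTF_8);
    }

    static int decodeEpoch(byte[] data) {
        return Integer.parseInt(new String(data, StandardCharsets.UTF_8));
    }
}
```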

...

MetadataResponse#TopicMetadata will have two new fields, tiered_epoch and tiered_state.

TopicMetadata => topic_error_code topic is_internal partition_metadata tiered_epoch tiered_state
  topic_error_code => int16
  topic => str
  is_internal => boolean
  partition_metadata => [PartitionMetadata]
+ tiered_epoch => int32
+ tiered_state => [TieredState]


Public Interfaces

Client API Changes

The AlterConfigs API will be updated to accommodate the following changes:

  • Setting the topic-level configuration remote.storage.enable to false will disable tiered storage.
  • The AlterConfigs API will also support an optional tiered storage disablement policy, remote.log.disable.policy=retain|delete. If the disablement policy is not specified, it defaults to retain, and the remote logs are retained.
  • Re-enablement after disablement will not introduce any changes to the public interface.
  • Re-enablement will not provide an option to specify a policy.

Exceptions

There are certain exceptional scenarios to consider:

  • If a new disablement request is received while the topic is still in the DISABLING state, an exception named TIERED_STORAGE_DISABLEMENT_IN_PROGRESS will be thrown, with the following details:
    • Error: TIERED_STORAGE_DISABLEMENT_IN_PROGRESS
    • Code: > 71 (to be determined at implementation time)
  • For other malformed requests, such as an unsupported disablement policy option, the existing error INVALID_REQUEST (42) will be returned with an appropriate error message.
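These exceptional scenarios can be sketched as a validation step over the requested topic configuration. DisablementValidator and its Error enum are hypothetical names; the error names follow this KIP, and the numeric code of the new error is still to be determined. The sketch only checks disablement requests; how re-enablement interacts with an in-progress disablement is not covered.

```java
import java.util.Map;

// Hypothetical validation of an AlterConfigs request that disables tiered storage.
class DisablementValidator {
    enum Error { NONE, INVALID_REQUEST, TIERED_STORAGE_DISABLEMENT_IN_PROGRESS }

    static Error validate(Map<String, String> requestedConfigs, boolean disablementInProgress) {
        // An absent policy falls back to the documented default, retain.
        String policy = requestedConfigs.getOrDefault("remote.log.disable.policy", "retain");
        if (!policy.equals("retain") && !policy.equals("delete")) {
            return Error.INVALID_REQUEST; // unsupported policy option
        }
        boolean disabling = "false".equals(requestedConfigs.get("remote.storage.enable"));
        if (disabling && disablementInProgress) {
            // A new disablement while the topic is still DISABLING is rejected.
            return Error.TIERED_STORAGE_DISABLEMENT_IN_PROGRESS;
        }
        return Error.NONE;
    }
}
```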

Configuration

A new configuration to support the disablement policy will be introduced.


Configuration: remote.log.disable.policy
Description: Determines whether tiered data for a topic should be retained or deleted after tiered storage disablement on a topic.
Type: String
Default: retain
Valid values: retain, delete
Scope: topic wide



The configuration remote.log.disable.policy could also be introduced as a broker-level configuration, with the default value set to retain. For the initial implementation, we propose a topic-level configuration, but it can be made a broker-level configuration in future iterations.
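A possible resolution order for remote.log.disable.policy is sketched below: a topic-level value takes precedence, an optional broker-level default (the possible future extension described above) is consulted next, and retain is used otherwise. DisablePolicy, DisablePolicyResolver and the Optional-based broker default are hypothetical.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical policy type for remote.log.disable.policy=retain|delete.
enum DisablePolicy {
    RETAIN, DELETE;

    static DisablePolicy parse(String value) {
        switch (value) {
            case "retain": return RETAIN;
            case "delete": return DELETE;
            default: throw new IllegalArgumentException("Invalid remote.log.disable.policy: " + value);
        }
    }
}

class DisablePolicyResolver {
    static final String KEY = "remote.log.disable.policy";

    // Topic-level value wins; a broker-level default (hypothetical future option)
    // comes next; otherwise the documented default, retain, applies.
    static DisablePolicy resolve(Map<String, String> topicConfigs, Optional<String> brokerDefault) {
        String value = topicConfigs.get(KEY);
        if (value == null) {
            value = brokerDefault.orElse("retain");
        }
        return DisablePolicy.parse(value);
    }
}
```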

# Defaults to remote.log.disable.policy=retain
bin/kafka-configs.sh --bootstrap-server {bootstrap-string} \
   --alter --entity-type topics --entity-name {topic-name} \
   --add-config 'remote.storage.enable=false'

# Disable with remote.log.disable.policy=retain
bin/kafka-configs.sh --bootstrap-server {bootstrap-string} \
   --alter --entity-type topics --entity-name {topic-name} \
   --add-config 'remote.log.disable.policy=retain,remote.storage.enable=false'

# Disable with remote.log.disable.policy=delete
bin/kafka-configs.sh --bootstrap-server {bootstrap-string} \
   --alter --entity-type topics --entity-name {topic-name} \
   --add-config 'remote.log.disable.policy=delete,remote.storage.enable=false'

Metrics

Further details to follow as the design progresses.

Compatibility, Deprecation, and Migration Plan

  • This feature will be compatible with all Kafka versions that support Tiered Storage.
  • This feature will be compatible with both KRaft-based and ZooKeeper-based clusters.

Test Plan

The test plan for this feature includes the following:

  • Integration Tests: We will use file-based tiered storage (LocalTieredStorage) to test the disablement code path.
  • Unit Tests: Comprehensive unit test coverage will be provided to ensure the functionality of individual components involved in the disablement process.
  • System Tests: System tests will be conducted for both ZooKeeper-based and KRaft-based clusters.

Rejected Alternatives

The following alternatives were considered but ultimately rejected:

  1. Introducing a new broker-to-controller API to notify the controller when the disablement process is completed. This alternative was not pursued because it would shift the paradigm so that a broker initiates a call to the controller, which was deemed less suitable for the desired design.
  2. Reusing the existing StopReplicaRequest API, instead of creating the new DisableRemoteTopic API, to notify brokers of the disablement of tiered storage on a topic. This approach would work technically by adding a flag indicating that the StopReplicaRequest is for tiered storage disablement. However, a dedicated DisableRemoteTopic API makes the purpose of the operation clear and unambiguous, so we recommend creating the new API.
  3. Introducing a temporary ZNode in /{$AdminZNode.path}/remote_storage_topic_disablements/{$topic} in ZooKeeper (ZK) mode to serve as a lock while the disablement process is in progress. This alternative was rejected because maintaining the state within the Topics ZNode is more intuitive and consolidates topic-related state in a single location.