...
Code Block

{ "apiKey": 57,
  "type": "request",
  "listeners": ["zkBroker", "broker", "controller"],   <-- New listener "controller" for KRaft
  "name": "UpdateFeaturesRequest",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "timeoutMs", "type": "int32", "versions": "0+", "default": "60000",
      "about": "How long to wait in milliseconds before timing out the request." },
    { "name": "FeatureUpdates", "type": "[]FeatureUpdateKey", "versions": "0+",
      "about": "The list of updates to finalized features.", "fields": [
      {"name": "Feature", "type": "string", "versions": "0+", "mapKey": true,
        "about": "The name of the finalized feature to be updated."},
      {"name": "MaxVersionLevel", "type": "int16", "versions": "0+",
        "about": "The new maximum version level for the finalized feature. A value >= 1 is valid. A value < 1 is special, and can be used to request the deletion of the finalized feature."},
------------------ Remove Field -------------
      {"name": "AllowDowngrade", "type": "bool", "versions": "0+",
        "about": "When set to true, the finalized feature version level is allowed to be downgraded/deleted. The downgrade request will fail if the new maximum version level is a value that's not lower than the existing maximum finalized version level."}
------------------ New Field ----------------
      {"name": "DowngradeType", "type": "int8", "versions": "1+",
        "about": "The type of downgrade to perform. Three types are supported: 0 is NONE (no downgrade will be performed), 1 is SAFE, and 2 is UNSAFE. The safety of a downgrade is determined by the controller and is specific to each feature flag."}
------------------ End New Field ------------
    ]},
------------------ New Field ----------------
    {"name": "DryRun", "type": "bool", "versions": "1+", "default": false}
------------------ End New Field ------------
  ]
}

One new error code for the UpdateFeaturesResponse results ErrorCode:

- UNSAFE_FEATURE_DOWNGRADE: indicates that a requested feature level downgrade cannot safely be performed

Add --force flag to the kafka-features.sh tool
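To make the new v1 fields concrete, here is a small sketch (illustrative Python with a hypothetical helper name, not the actual Admin client API) that assembles a request body as a plain dict. The DowngradeType constants come from the field's "about" text above.

```python
# Sketch: building an UpdateFeaturesRequest v1 body as a plain dict.
# Field names follow the schema above; 0/1/2 for DowngradeType are the
# NONE/SAFE/UNSAFE values described in the field's "about" text.
DOWNGRADE_NONE, DOWNGRADE_SAFE, DOWNGRADE_UNSAFE = 0, 1, 2

def build_update_features_request(feature, max_version_level,
                                  downgrade_type=DOWNGRADE_NONE,
                                  dry_run=False, timeout_ms=60000):
    """Assemble the v1 request fields; note AllowDowngrade is gone in v1."""
    return {
        "timeoutMs": timeout_ms,
        "FeatureUpdates": [{
            "Feature": feature,
            "MaxVersionLevel": max_version_level,
            "DowngradeType": downgrade_type,
        }],
        "DryRun": dry_run,
    }

# An unsafe downgrade of metadata.version to level 3, validated only:
req = build_update_features_request("metadata.version", 3,
                                    downgrade_type=DOWNGRADE_UNSAFE,
                                    dry_run=True)
```

Setting DryRun lets an operator see whether the controller would accept the change without mutating any finalized feature.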
Code Block

This tool describes and updates finalized features.
Option                                   Description
------                                   -----------
--bootstrap-server <String: server to    REQUIRED: A comma-separated list of
  connect to>                              host:port pairs to use for
                                           establishing the connection to the
                                           Kafka cluster.
--command-config [String: command        Property file containing configs to be
  config property file]                    passed to Admin Client. This is used
                                           with --bootstrap-server option when
                                           required.
--describe                               Describe supported and finalized
                                           features from a random broker.
--downgrade-all                          Downgrades all finalized features to
                                           the maximum version levels known to
                                           the tool. This command deletes
                                           unknown features from the list of
                                           finalized features in the cluster,
                                           but it is guaranteed to not add a
                                           new feature.
--dry-run                                Performs a dry-run of
                                           upgrade/downgrade mutations to
                                           finalized feature without applying
                                           them.
------------------ Begin New Flags ------------
--force                                  Perform an operation even if the
                                           controller determines that it is
                                           unsafe.
------------------ End New Flags --------------
--help                                   Print usage information.
--upgrade-all                            Upgrades all finalized features to the
                                           maximum version levels known to the
                                           tool. This command finalizes new
                                           features known to the tool that were
                                           never finalized previously in the
                                           cluster, but it is guaranteed to not
                                           delete any existing feature.
--version                                Display Kafka version.
Note that the current implementation of kafka-features.sh lacks the --upgrade and --downgrade arguments that are defined in KIP-584. Both of these will be needed for this KIP.
...
Add --metadata-version option to "format" sub-command of kafka-storage.sh
Code Block
usage: kafka-storage format [-h] --config CONFIG --cluster-id CLUSTER_ID [--metadata-version VERSION] [--ignore-formatted]
optional arguments:
-h, --help show this help message and exit
--config CONFIG, -c CONFIG
The Kafka configuration file to use.
--cluster-id CLUSTER_ID, -t CLUSTER_ID
The cluster ID to use.
--metadata-version VERSION
The initial value for metadata.version feature flag.
--ignore-formatted, -g
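The selection rule described in the Initialization section (an explicit --metadata-version wins; otherwise the tool uses the latest version known to this software release) can be sketched as follows. LATEST_KNOWN_VERSION and the function name are illustrative, not actual Kafka identifiers.

```python
# Sketch of the default-selection rule for "kafka-storage format":
# an explicit --metadata-version wins; otherwise the tool falls back to
# the latest metadata.version this software release knows about.
LATEST_KNOWN_VERSION = 4  # hypothetical value for illustration

def choose_initial_metadata_version(cli_value=None):
    """Return the version to write into meta.properties at format time."""
    if cli_value is not None:
        return cli_value
    return LATEST_KNOWN_VERSION
```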
Proposed Changes
Overview
The sections below go into more detail, but the overall workflow of an upgrade is:
- Operator performs rolling restart of cluster with a new software version
- Operator increases metadata.version feature flag using kafka-features.sh tool
- UpdateFeaturesRequest is sent to the active controller
- The controller validates that the cluster can be upgraded to this version
- FeatureLevelRecord is written to the metadata log
- Metadata snapshot is generated and sent to the other nodes
- Components reload their state with new version
The downgrade workflow is similar:
- Operator decreases metadata.version feature flag using kafka-features.sh tool
- UpdateFeaturesRequest is sent to the active controller
- The controller validates that the cluster can be safely downgraded to this version (override with --force)
- FeatureLevelRecord is written to the metadata log
- Metadata snapshot is generated and sent to the other inactive controllers and to brokers (this snapshot may be lossy!)
- Components reload their state to recognize the new (old) version
- Operator performs rolling restart of cluster with downgraded software version
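The validation step shared by both workflows can be illustrated with a toy simulation (illustrative Python, not Kafka code): the controller checks the requested level against every node's supported range before the FeatureLevelRecord is committed and components reload their state.

```python
# Toy model of the upgrade/downgrade workflow above. Node and the
# function name are illustrative stand-ins, not Kafka classes.
class Node:
    def __init__(self, name, min_supported, max_supported):
        self.name = name
        self.min_supported = min_supported
        self.max_supported = max_supported
        self.active_version = None  # finalized metadata.version in use

def update_metadata_version(nodes, new_version):
    # Controller validates that every node can support the new level.
    for n in nodes:
        if not (n.min_supported <= new_version <= n.max_supported):
            raise ValueError(
                f"{n.name} supports [{n.min_supported}, {n.max_supported}]; "
                f"cannot move to metadata.version {new_version}")
    # FeatureLevelRecord committed; each component reloads its state.
    for n in nodes:
        n.active_version = new_version

cluster = [Node("broker-0", 1, 4), Node("broker-1", 1, 4),
           Node("controller-0", 1, 4)]
update_metadata_version(cluster, 4)  # online upgrade
update_metadata_version(cluster, 3)  # downgrade follows the same path
```

Because validation runs before any state changes, a rejected request leaves every node on its previous version, which mirrors the all-or-nothing behavior of writing a single FeatureLevelRecord.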
New Feature Flag
We will introduce a new feature flag named metadata.version which takes over and expands on the role of inter.broker.protocol.version. This new feature flag will track changes to the metadata record format and RPCs. Whenever a new record or RPC is introduced, or an incompatible change is made to an existing record or RPC, we will increase this version. The metadata.version is free to increase many times between Kafka releases. This is similar to the IV (inter-version) versions of the IBP.
The metadata.version feature flag will be defined and configured using the facilities introduced by KIP-584 (feature versions). As brokers and controllers upgrade to new software, their maximum supported metadata.version will increase automatically. However, the “finalized” version that can be used by the cluster will only be increased by an operator once all the nodes have upgraded. In other words, the basic workflow of an upgrade is:
- Rolling upgrade software of each node (broker and controller)
- Online upgrade of metadata.version to the desired supported version
In the absence of an operator defined value for metadata.version, we cannot safely assume anything about which metadata.version to use. If we simply assumed the highest supported value, it could lead to unintended downgrades in the event that a broker with a lower supported version joined the cluster. To avoid this, and other upgrade complications, we will need to bootstrap metadata.version with some initial version.
Initialization
When the quorum leader is starting up for the first time after this feature flag has been introduced, it will need a way to initialize the finalized version. After the leader finishes loading its state from disk, if it has not encountered a FeatureLevelRecord, it will read an initial value for this feature from its local meta.properties file and generate a FeatureLevelRecord. We will extend the format sub-command of kafka-storage.sh to allow operators to specify which version is initialized. If no value has been specified by the operator, the tool will select the latest known value for that version of the software.
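The bootstrap decision above can be sketched like this (all names are illustrative; the real logic lives in the KRaft controller):

```python
# Sketch of leader initialization: after replaying the metadata log, a
# quorum leader that never saw a FeatureLevelRecord for metadata.version
# falls back to the value "kafka-storage format" wrote to meta.properties.
def bootstrap_metadata_version(replayed_levels, meta_properties,
                               latest_known_version):
    """Return (version, needs_record); needs_record is True when the
    leader must generate a fresh FeatureLevelRecord."""
    if "metadata.version" in replayed_levels:
        # The log already finalized a version; nothing to bootstrap.
        return replayed_levels["metadata.version"], False
    # No record yet: use the formatted value, else the software's latest.
    initial = meta_properties.get("metadata.version", latest_known_version)
    return initial, True
```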
Compatibility
It is possible that brokers and controllers attempt to join the cluster or quorum, but cannot support the current metadata.version. For brokers, this is already handled by the controller during registration. If a broker attempts to register with the controller, but the controller determines that the broker cannot support the current set of finalized features (which includes metadata.version), it will reject the registration request. For controllers, it is more complicated since we need to allow the quorum to be established in order to allow records to be exchanged and learn about the new metadata.version. A controller running old software will join the quorum and begin replicating the metadata log. If this inactive controller encounters a FeatureLevelRecord for metadata.version that it cannot support, it should terminate.
In the unlikely event that an active controller encounters an unsupported metadata.version, it should resign and terminate. If a broker encounters an unsupported metadata.version, it should unregister itself and terminate.
For the first release that supports metadata.version, we can simply initialize metadata.version with the current (and only) version. For future releases, we will need a mechanism to bootstrap a particular version. This could be done using the meta.properties file or some similar mechanism. The reason we need to allow for a specific initial version is to support the use case of starting a Kafka cluster at version X with an older metadata.version.
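The two compatibility checks described above can be sketched as follows (function names are illustrative, not Kafka APIs): the controller rejects a broker registration whose supported range excludes the finalized metadata.version, while a controller replaying the log must terminate when it hits a FeatureLevelRecord it cannot support.

```python
# Sketch of the compatibility rules for brokers and controllers.
def accept_broker_registration(finalized_version, broker_min, broker_max):
    """Controller-side check during broker registration: accept only
    brokers whose supported range covers the finalized version."""
    return broker_min <= finalized_version <= broker_max

def replay_feature_level_record(record_version, node_max_supported):
    """A controller applying a FeatureLevelRecord from the metadata log;
    an unsupported version leaves it no choice but to terminate."""
    if record_version > node_max_supported:
        raise RuntimeError(
            f"metadata.version {record_version} not supported; terminating")
    return record_version
```

The asymmetry is deliberate: a broker can be turned away before joining, but a controller only learns the finalized version by replicating the log, so its failure mode is termination rather than rejection.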
Upgrades
KRaft upgrades are done in two steps with only a single rolling restart of the cluster required. After all the nodes of the cluster are running the new software version, they will continue using the previous version of RPCs and record formats. Only after increasing the metadata.version will these new RPCs and records be used. Since a software upgrade may span across multiple metadata.version versions, it should be possible to perform many online upgrades without restarting any nodes. This provides a mechanism for incrementally increasing metadata.version to try out new features introduced between the initial software version and the upgraded software version.
...