Versions Compared


...

Code Block
{
  "apiKey": 57,
  "type": "request",
  "listeners": ["zkBroker", "broker", "controller"], <-- New listener "controller" for KRaft
  "name": "UpdateFeaturesRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "timeoutMs", "type": "int32", "versions": "0+", "default": "60000",
      "about": "How long to wait in milliseconds before timing out the request." },
    { "name": "FeatureUpdates", "type": "[]FeatureUpdateKey", "versions": "0+",
      "about": "The list of updates to finalized features.", "fields": [
      {"name": "Feature", "type": "string", "versions": "0+", "mapKey": true,
        "about": "The name of the finalized feature to be updated."},
      {"name": "MaxVersionLevel", "type": "int16", "versions": "0+",
        "about": "The new maximum version level for the finalized feature. A value >= 1 is valid. A value < 1, is special, and can be used to request the deletion of the finalized feature."},
      ------------------ Remove Field ------------------
      {"name": "AllowDowngrade", "type": "bool", "versions": "0+",
        "about": "When set to true, the finalized feature version level is allowed to be downgraded/deleted. The downgrade request will fail if the new maximum version level is a value that's not lower than the existing maximum finalized version level."}
      ------------------ Begin New Field ------------------
      {"name": "DowngradeType", "type": "int8", "versions": "0+",
        "about": "The type of downgrade to perform. Three types are supported: 0 is NONE (no downgrade will be performed), 1 is SAFE, and 2 is UNSAFE. The safety of a downgrade is determined by the controller and is specific to each feature flag."}
      ------------------ End New Field ------------------
     ]}
  ]
}

One new error code for the ErrorCode field in UpdateFeaturesResponse results:

  • UNSAFE_FEATURE_DOWNGRADE: indicates that a requested feature level downgrade cannot safely be performed.
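
As a rough sketch of how the controller might act on the new field and error code (the function name and boolean flag below are illustrative, not from the Kafka codebase):

```python
from enum import IntEnum

class DowngradeType(IntEnum):
    """The three downgrade types carried by the new DowngradeType field."""
    NONE = 0    # no downgrade will be performed
    SAFE = 1    # only proceed if the controller deems the downgrade safe
    UNSAFE = 2  # proceed even if metadata may be lost

def handle_downgrade(downgrade_type: DowngradeType, controller_says_safe: bool) -> str:
    """Sketch of the controller-side decision; returns the response error code."""
    if downgrade_type == DowngradeType.NONE:
        return "NONE"  # not a downgrade request, nothing to check
    if downgrade_type == DowngradeType.SAFE and not controller_says_safe:
        # The new error code: a SAFE downgrade was requested but the
        # controller determined it cannot be performed safely.
        return "UNSAFE_FEATURE_DOWNGRADE"
    return "NONE"
```

Note that the safety decision itself is per feature flag and lives in the controller; the request merely states how much risk the operator is willing to accept.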

Add --force flag to the kafka-features.sh tool

Code Block
This tool describes and updates finalized features.
Option                                 Description
------                                 -----------
--bootstrap-server <String: server to  REQUIRED: A comma-separated list of
  connect to>                            host:port pairs to use for
                                         establishing the connection to the
                                         Kafka cluster.
--command-config [String: command      Property file containing configs to be
  config property file]                  passed to Admin Client. This is used
                                         with --bootstrap-server option when
                                         required.
--describe                             Describe supported and finalized
                                         features from a random broker.
--downgrade-all                        Downgrades all finalized features to
                                         the maximum version levels known to
                                         the tool. This command deletes
                                         unknown features from the list of
                                         finalized features in the cluster,
                                         but it is guaranteed to not add a
                                         new feature.
--dry-run                              Performs a dry-run of
                                         upgrade/downgrade mutations to
                                         finalized feature without applying
                                         them.
------------------ Begin New Flags ------------
--force                                Perform an operation even if the
                                         controller determines that it is
                                         unsafe.
------------------ End New Flags --------------
--help                                 Print usage information.
--upgrade-all                          Upgrades all finalized features to the
                                         maximum version levels known to the
                                         tool. This command finalizes new
                                         features known to the tool that were
                                         never finalized previously in the
                                         cluster, but it is guaranteed to not
                                         delete any existing feature.
--version                              Display Kafka version.

Note that the current implementation of kafka-features.sh lacks the --upgrade and --downgrade arguments that are defined in KIP-584. Both of these will be needed for this KIP.
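
One way to picture how the tool's flags could translate into a feature update request (an illustrative mapping only; the tool's real internals are not specified here, and the field names in the returned dict are stand-ins):

```python
def plan_feature_update(downgrading: bool, dry_run: bool = False, force: bool = False) -> dict:
    """Hypothetical translation of kafka-features.sh flags into request
    options: --force selects an UNSAFE downgrade, otherwise downgrades
    are SAFE; --dry-run asks the controller to validate without applying."""
    if not downgrading:
        downgrade_type = 0           # NONE: this is an upgrade
    elif force:
        downgrade_type = 2           # UNSAFE: operator overrides the safety check
    else:
        downgrade_type = 1           # SAFE: the controller may reject it as unsafe
    return {"downgrade_type": downgrade_type, "dry_run": dry_run}
```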


Add --metadata-version option to "format" sub-command of kafka-storage.sh

Code Block
usage: kafka-storage format [-h] --config CONFIG --cluster-id CLUSTER_ID [--metadata-version VERSION] [--ignore-formatted]

optional arguments:
  -h, --help             show this help message and exit
  --config CONFIG, -c CONFIG
                         The Kafka configuration file to use.
  --cluster-id CLUSTER_ID, -t CLUSTER_ID
                         The cluster ID to use.
  --metadata-version VERSION
                         The initial value for metadata.version feature flag.
  --ignore-formatted, -g
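
The default-selection behaviour described for the format sub-command amounts to the following (a sketch; LATEST_KNOWN_VERSION is a stand-in constant, not a real identifier):

```python
LATEST_KNOWN_VERSION = 3  # stand-in for the newest metadata.version this build supports

def initial_metadata_version(operator_value=None):
    """If --metadata-version was given, use it; otherwise default to the
    latest metadata.version known to this version of the software."""
    return LATEST_KNOWN_VERSION if operator_value is None else operator_value
```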
 


Proposed Changes

Overview

The sections below go into more detail, but the overall workflow of an upgrade is:

  • Operator performs rolling restart of cluster with a new software version
  • Operator increases metadata.version feature flag using kafka-features.sh tool
    • UpdateFeaturesRequest is sent to the active controller
    • The controller validates that the cluster can be upgraded to this version
    • FeatureLevelRecord is written to the metadata log
    • Metadata snapshot is generated and sent to the other nodes
    • Components reload their state with new version
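
The upgrade steps above can be sketched end to end (names are illustrative; the real controller logic is far more involved):

```python
def online_upgrade(supported_versions, metadata_log, target):
    """Sketch of the controller handling an UpdateFeaturesRequest that
    raises metadata.version: validate that every registered node can
    support the target, then append a FeatureLevelRecord to the log."""
    for node, max_supported in supported_versions.items():
        if max_supported < target:
            raise RuntimeError(f"{node} cannot support metadata.version {target}")
    metadata_log.append(("FeatureLevelRecord", "metadata.version", target))
    # A snapshot would now be generated and shipped to the other nodes,
    # and components would reload their state with the new version.
    return target
```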

The downgrade workflow is similar:

  • Operator decreases metadata.version feature flag using kafka-features.sh tool
    • UpdateFeaturesRequest is sent to the active controller
    • The controller validates that the cluster can be safely downgraded to this version (override with --force)
    • FeatureLevelRecord is written to the metadata log
    • Metadata snapshot is generated and sent to the other inactive controllers and to brokers (this snapshot may be lossy!)
    • Components reload their state to recognize the new (old) version
  • Operator performs rolling restart of cluster with downgraded software version
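
Why the downgrade snapshot may be lossy can be illustrated with a toy record filter (the record shapes here are invented for the example):

```python
def downgrade_snapshot(records, target_version):
    """Toy model of a lossy downgrade snapshot: any metadata record type
    introduced after the target metadata.version has no representation
    at that version and must be dropped from the snapshot."""
    kept = [(name, introduced_in) for name, introduced_in in records
            if introduced_in <= target_version]
    dropped = len(records) - len(kept)
    return kept, dropped
```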

New Feature Flag

We will introduce a new feature flag named metadata.version which takes over and expands on the role of inter.broker.protocol.version. This new feature flag will track changes to the metadata record format and RPCs. Whenever a new record or RPC is introduced, or an incompatible change is made to an existing record or RPC, we will increase this version. The metadata.version is free to increase many times between Kafka releases. This is similar to the IV (inter-version) versions of the IBP.

The metadata.version feature flag will be defined and configured using the facilities introduced by KIP-584 (feature versions). As brokers and controllers upgrade to new software, their maximum supported metadata.version will increase automatically. However, the “finalized” version that can be used by the cluster will only be increased by an operator once all the nodes have upgraded. In other words, the basic workflow of an upgrade is:

  • Rolling upgrade software of each node (broker and controller)
  • Online upgrade of metadata.version to the desired supported version

In the absence of an operator defined value for metadata.version, we cannot safely assume anything about which metadata.version to use. If we simply assumed the highest supported value, it could lead to unintended downgrades in the event that a broker with a lower supported version joined the cluster. To avoid this, and other upgrade complications, we will need to bootstrap metadata.version with some initial version.
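
The unintended-downgrade hazard mentioned above is easy to see in a toy model of the naive rule (this is the behaviour the KIP is avoiding, not proposing):

```python
def naively_assumed_version(node_max_versions):
    """The unsafe rule: assume the highest version every current member
    can support, i.e. the minimum of the nodes' maximum versions."""
    return min(node_max_versions)

cluster = [4, 4, 4]
assert naively_assumed_version(cluster) == 4
cluster.append(2)  # a broker with a lower supported version joins...
assert naively_assumed_version(cluster) == 2  # ...and the version silently drops
```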


Initialization

When the quorum leader is starting up for the first time after this feature flag has been introduced, it will need a way to initialize the finalized version. After the leader finishes loading its state from disk, if it has not encountered a FeatureLevelRecord, it will read an initial value for this feature from its local meta.properties file and generate a FeatureLevelRecord. We will extend the format sub-command of kafka-storage.sh to allow operators to specify which version is initialized. If no value has been specified by the operator, the tool will select the latest known value for that version of the software.
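
The first-start logic can be sketched as follows (the function name and record shapes are illustrative):

```python
def bootstrap_metadata_version(replayed_log, meta_properties, latest_known):
    """Sketch of the quorum leader after loading its state from disk:
    if no FeatureLevelRecord was encountered, take the initial value
    from the local meta.properties (written by 'kafka-storage format'),
    falling back to the latest version this software knows."""
    for record_type, value in reversed(replayed_log):
        if record_type == "FeatureLevelRecord":
            return value  # already initialized; most recent value wins
    return meta_properties.get("metadata.version", latest_known)
```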

Compatibility

It is possible that brokers and controllers attempt to join the cluster or quorum, but cannot support the current metadata.version. For brokers, this is already handled by the controller during registration. If a broker attempts to register with the controller, but the controller determines that the broker cannot support the current set of finalized features (which includes metadata.version), it will reject the registration request. For controllers, it is more complicated since we need to allow the quorum to be established in order to allow records to be exchanged and learn about the new metadata.version. A controller running old software will join the quorum and begin replicating the metadata log. If this inactive controller encounters a FeatureLevelRecord for metadata.version that it cannot support, it should terminate.
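
The per-role handling described here can be summarized in a small sketch (the string results are illustrative labels, not Kafka return values):

```python
def on_version_check(role, max_supported, finalized):
    """Sketch of the compatibility rules above for a node whose maximum
    supported metadata.version is compared against the finalized one."""
    if max_supported >= finalized:
        return "ok"
    if role == "broker":
        return "registration rejected"   # the controller refuses the broker
    if role == "inactive controller":
        return "terminate"               # hit an unsupported FeatureLevelRecord
    return "resign and terminate"        # active controller, unlikely case
```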

In the unlikely event that an active controller encounters an unsupported metadata.version, it should resign and terminate.

If a broker encounters an unsupported metadata.version, it should unregister itself and terminate.

Upgrades

KRaft upgrades are done in two steps with only a single rolling restart of the cluster required. After all the nodes of the cluster are running the new software version, they will continue using the previous version of RPCs and record formats. Only after increasing the metadata.version will these new RPCs and records be used. Since a software upgrade may span across multiple metadata.version versions, it should be possible to perform many online upgrades without restarting any nodes. This provides a mechanism for incrementally increasing metadata.version to try out new features introduced between the initial software version and the upgraded software version.
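
The two-step shape of a KRaft upgrade, including several incremental online bumps after a single restart, can be sketched as (names are illustrative):

```python
def kraft_online_upgrades(software_max, finalized, targets):
    """Sketch: after the one rolling restart raises the software's maximum
    supported version, each target version is applied as a separate online
    metadata.version upgrade, with no further node restarts."""
    applied = []
    for target in targets:
        if not (finalized < target <= software_max):
            raise ValueError(f"cannot move from {finalized} to {target}")
        finalized = target   # one UpdateFeaturesRequest per step
        applied.append(target)
    return finalized, applied
```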

...