...

The ApiVersionsRequest enables a client to ask a broker which APIs it supports. For each API that the broker enumerates in the response, it also returns the range of versions it supports. This response can be insufficient when a client needs to commence usage of a specific feature version in the cluster, based on whether all brokers in the cluster support the feature at that version. Consider the following problem. Users could upgrade their client apps before or after upgrading their brokers. If it happens before the broker upgrade, any new features on the client apps that depend on certain broker-side support (from all brokers in the cluster) must stay disabled until all the brokers have been upgraded to the required version. The question then becomes: at what point in time can these new features be safely enabled on the client side? Today, this has to be decided manually by a human, and it is error-prone. Clients don’t have a way to programmatically learn what version of a certain feature is guaranteed to be supported by all brokers (i.e. cluster-wide finalized feature versions), and take suitable decisions.

...

  1. Client discovery: Provide an API to programmatically access the feature metadata. Here, the “metadata” refers to the feature version levels (i.e. the cluster-wide finalized maximum and minimum versions of broker features). We would like to serve this metadata to clients in an eventually consistent and scalable way.

  2. Feature gating: Provide an API to safely, durably and dynamically finalize upgrades to cluster-wide feature version levels. The API (and its related tooling) can be used by the cluster operator (i.e. typically a human) to finalize feature max version level upgrades/downgrades.

  3. IBP deprecation: As a beneficial side effect of the above, we would like to provide a path to eventually deprecate the need for the cumbersome inter.broker.protocol configuration, and the broker “double roll” mechanism. A very tangible benefit of the solution is that broker upgrades should need just a single rolling restart.

...

  1. Client Discovery:

    • By scalable, we mean the solution should horizontally scale to the metadata read and write workload as the cluster grows with more features, brokers and clients. Metadata discovery is expected to be read-heavy, while write traffic is very low (feature version levels are finalized typically during releases or other configuration pushes, which can happen a few times a day per cluster).

    • The metadata served to the client will contain an epoch value. These epoch values are integers and will increase monotonically whenever the metadata changes. At any given time, the latest epoch for the metadata returned to the client is valid, and the cluster functions in accordance with it. Note: the metadata epoch applies to the entire metadata contents, and is not tied to any individual feature version – these are 2 separate things. Please read this section for more information.

    • We want to be careful and specific about what we mean by consistent here. The metadata we serve to the client during discovery will be eventually consistent. It turns out that scaling to strongly consistent metadata reads is not easy (in the current Kafka setup), and the cost of a solution that’s eventually consistent can be made minimal, in the following way. Due to eventual consistency, there can be cases where a lower (older) epoch of the metadata is briefly returned during discovery, after a higher (more recent) epoch was returned at a previous point in time. We expect clients to always employ the rule that a higher received metadata epoch always trumps a lower one (see the sketch after this list). Clients that are external to Kafka should strongly consider discovering the latest metadata once during startup from the brokers, and, if required, refresh the metadata periodically (to get the latest metadata).

  2. Feature gating:

    • Only the max feature version level can be modified using the newly provided API; the min feature version level cannot be modified with it.

    • By safe, we mean: when processing a request to finalize a set of feature version levels, the system will dynamically verify that all brokers in the cluster support the intended version. If not, the request will be rejected. Also, when a broker starts up, it will verify that it is compatible with the configured feature versions. If it is not, it will either refuse to start up or eventually die as soon as it discovers a feature incompatibility.

    • By durable, we mean that the finalized features should be persisted durably and remembered across broker restarts. Generally the system should tolerate faults, as much as it does for storage of other critical metadata (such as topic configuration).

    • By dynamic, we mean that the finalized features can be mutated in a cheap/easy way without compromising the uptime of broker/controller processes, or the health of the cluster.
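
To make the epoch rule from the client discovery notes above concrete, the following minimal Java sketch (assumed names; not part of the proposal) shows how a client could cache feature metadata and ignore stale (lower-epoch) copies:

Code Block
languagejava
import java.util.Map;

class FeatureMetadataCache {
    // Hypothetical snapshot of the finalized feature metadata returned by a broker.
    static final class Snapshot {
        final long epoch;                         // monotonically increasing metadata epoch
        final Map<String, Long> maxVersionLevels; // feature name -> finalized max version level

        Snapshot(long epoch, Map<String, Long> maxVersionLevels) {
            this.epoch = epoch;
            this.maxVersionLevels = maxVersionLevels;
        }
    }

    private Snapshot current = null;

    // Accept an incoming snapshot only if it is at least as recent as the one we
    // hold; a lower (stale) epoch is ignored, per the "higher epoch trumps lower
    // epoch" rule above.
    synchronized void maybeUpdate(Snapshot incoming) {
        if (current == null || incoming.epoch >= current.epoch) {
            current = incoming;
        }
    }

    synchronized Snapshot current() {
        return current;
    }
}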

Non-goals

Following are problems surrounding downgrades/deprecation that we don’t intend to solve with this KIP:

...

Within the scope of this KIP, we provide only certain support related to feature downgrades and deprecation. These are described below:

  1. Downgrade of feature version level:
    A feature "downgrade" refers to dropping support across the entire cluster for a feature version level. This means reducing the finalized maximum feature version level X to a version level Y, where Y < X. In other words, dropping cluster-wide support for an existing feature that was already finalized at a newer version level. Firstly, we leave it to the cluster operator (i.e. human) to decide whether the above actions are backwards compatible. It is not within the scope of this KIP to provide help to the cluster operator to achieve this. After the cluster operator is past this step, we do provide the following support:

    1. Just like with upgrades, a downgrade request to reduce a feature version level is rejected by the system, unless all brokers support the downgraded versions of the feature. In the example above, the system expects all brokers to support the downgraded feature version Y.

    2. We assume that downgrades of finalized max feature version levels are rare. For safety reasons, we require the human to specify an explicit "allow downgrade" flag (in the API/tool) to safeguard against accidental downgrades of version levels.
  2. Deprecation of feature version level:

    1. A need can arise to deprecate the usage of a certain version of one or more broker features. A feature "deprecation" refers to increasing the finalized minimum feature version level X to a version level Y, where Y > X. We note that feature versions are typically deprecated during Kafka Broker releases. This is very unlike max feature version level upgrades, which can happen dynamically, after broker bits are deployed to a cluster.

    2. Firstly, the cluster operator (i.e. human) should use external means to establish that it is safe to stop supporting a particular version of a broker feature. For example, verify (if needed) that no clients are actively using the version, before deciding to stop supporting it. It is not within the scope of this KIP to provide help to the cluster operator to achieve this.

Proposed changes

Below is a TL;DR of the changes:

...

The proposal is that cluster-wide finalized max/min feature version levels will be persisted in a specific common ZK node. The path to the ZK node is proposed as: '/features'. The node content type is JSON (string), and the size is expected to be small (typically a few KBs). A couple of high-level details:

...

Code Block
{
   "version": 0, // int64 -> Represents the version of the schema for the data stored in the ZK node
   "features": {
        "group_coordinator": {        // string -> name of the feature
            "min_version_level": 1,   // int64 -> Represents the cluster-wide finalized minimum version level (>=1) of this feature
            "max_version_level": 3    // int64 -> Represents the cluster-wide finalized maximum version level (>=1 and >= min_version_level) of this feature
        },
        "consumer_offsets_topic_schema": {
            "min_version_level": 1,
            "max_version_level": 4
        }
   }
}

The schema is a JSON dictionary with a few different keys and values, as explained below:

  • The value for the version key is an int64 that contains the version of the schema of the data stored in the ZK node.

  • The value for the features key is a dictionary that contains a mapping from feature names to their metadata (such as finalized version levels). It's a map{string → map{string → <string | number>}}

     <feature_name>
                   |-->  <metadata_key>
                                        |-->  <metadata_value>

    • Top-level key <feature_name> is a non-empty string that’s a feature name.

      • <metadata_key> refers to the second nested level key that’s a non-empty string that refers to some feature metadata.

      • <metadata_value> is either a string or a number – it's the value for the <metadata_key>.

      • Note: this nested dictionary contains the following keys:

        • 'min_version_level', whose value is an int64 representing the minimum finalized cluster-wide version level for the feature.

        • 'max_version_level', whose value is an int64 representing the maximum finalized cluster-wide version level for the feature.

        • The following rule always holds true: min_version_level >= 1, max_version_level >= 1, and min_version_level <= max_version_level.
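
For illustration, here is a minimal validation sketch (assumed names; not part of the proposal) that enforces this rule for a single feature entry read from the '/features' node:

Code Block
languagejava
class FeatureVersionLevelValidator {
    // Enforces: min_version_level >= 1, max_version_level >= 1,
    // and min_version_level <= max_version_level.
    static void validate(String featureName, long minVersionLevel, long maxVersionLevel) {
        if (minVersionLevel < 1 || maxVersionLevel < 1 || minVersionLevel > maxVersionLevel) {
            throw new IllegalArgumentException(String.format(
                "Invalid version levels for feature '%s': min_version_level=%d, max_version_level=%d "
                    + "(require min >= 1, max >= 1 and min <= max)",
                featureName, minVersionLevel, maxVersionLevel));
        }
        // Otherwise the entry satisfies the rule stated above.
    }
}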

Controller: ZK node bootstrap with default values

Imagine a case where the '/features' ZK node is non-existent. In such a case, when the controller starts up, it will create the ZK node for the first time (this is a blocking write that needs to succeed for the controller to continue its startup sequence). The data used to create the node will be a map of {feature_name → {min_feature_version, max_feature_version}}, obtained by the controller from the broker's supported features. This approach brings convenience to users bootstrapping a Kafka cluster for the first time (with a specific Kafka Broker release): the controller finalizes the default min/max feature version levels automatically.
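
For illustration, the bootstrap computation could look like the following minimal Java sketch (assumed names; the actual controller code is not specified here): every supported feature is finalized at its full supported range.

Code Block
languagejava
import java.util.HashMap;
import java.util.Map;

class FeaturesZNodeBootstrap {
    // Supported [minVersion, maxVersion] range of one feature, as advertised by the broker.
    record SupportedRange(long minVersion, long maxVersion) { }

    // [min_version_level, max_version_level] pair to be finalized in the new ZK node.
    record FinalizedRange(long minVersionLevel, long maxVersionLevel) { }

    // Derive the default '/features' node content from the broker's supported features.
    static Map<String, FinalizedRange> defaultFinalizedFeatures(Map<String, SupportedRange> supported) {
        Map<String, FinalizedRange> finalized = new HashMap<>();
        supported.forEach((name, range) ->
            finalized.put(name, new FinalizedRange(range.minVersion(), range.maxVersion())));
        return finalized;
    }
}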

...

We introduce 1 new Admin API that’s served only by the controller, and identified by the new API key: ApiKeys.UPDATE_FEATURES.
This API enables transactional application of a set of cluster-wide feature updates to the ZK '/features' node (i.e. either all provided FeatureUpdate are applied to ZK, or none):

  • The API request is a list of FeatureUpdate that need to be applied, as explained below:

    • Each item specifies the finalized feature to be added, updated or deleted, along with the new max feature version level value.

    • Max feature version level downgrades are not a regular operation. Each item can optionally specify an allowDowngrade flag, which can be used to allow version level downgrades (or deletions).

    • To add a new finalized feature version level, or update an existing one, the user must specify the version level starting from 1 (and increasing).

    • If a finalized feature needs to be permanently deleted, the user must specify a max version level value < 1, and should also set the allowDowngrade flag.

  • The response contains an error code and an error message.

...

Code Block
{
  "apiKey": 48,
  "type": "request",
  "name": "UpdateFeaturesRequest",
  "validVersions": "0-1",
  "flexibleVersions": "1+",
  "fields": [
    { "name": "timeoutMs", "type": "int32", "versions": "0+", "default": "60000",
	  "about": "How long to wait in milliseconds before timing out the request." },
    { "name": "FeatureUpdate", "type": "[]FeatureUpdateKey", "versions": "0+",
      "about": "The list of updates to features.", "fields": [
      {"name":  "AllowDowngrade", "type":  "bool", "versions":  "0+",
        "about": "When set to true, the feature version level is allowed to be downgraded/deleted."},
      {"name":  "Feature", "type":  "[]FeatureKey", "versions":  "0+",
        "about":  "The feature to be updated.",
        "fields":  [
          {"name": "Name", "type":  "string", "versions":  "0+",
            "about": "The name of the feature."},
          {"name":  "VersionLevelMaxVersionLevel", "type":  "int64", "versions":  "0+",
            "about": "The new cluster-wide finalized maximum version level for the feature. A value >= 1 indicates the new feature version levelis valid/regular. A value < 1, is special, and can be used to request the deletion of the feature."}
      ]}
    ]}
  ]
}

...

For any <feature_name>, the above API implementation guards against a change for the related entry in '/features' from {"max_version_level": X} to {"max_version_level": X’}, unless it notices that each live broker in the deployment advertises {"maxVersion": Y >= X’} and {"minVersion": Z <= X’} in its BrokerIdZnode (for the same <feature_name>). A similar check is also applied to changes in the "min_version_level" for a feature.

  1. Related to the above guarding checks:

    1. By default, the API disallows cluster-wide feature version level downgrades and deletions. These are allowed only if the allowDowngrade flag is specified.
    2. If any broker does not contain a required feature, this is considered an incompatibility → such a case will fail the API request.

    3. If any broker contains an additional feature that’s not required → this is not considered an incompatibility.

  2. Some/all of the above logic will also be used by the broker (not just the controller) for its protections (see this section of this KIP).

  3. Activating the effects of a feature version cluster-wide is left to the discretion of the logic implementing the feature (ex: can be done via dynamic broker config).

  4. Deprecating/eliminating the presence/effects of a specific feature version cluster-wide is left to the discretion of the logic backing the feature (ex: can be done via dynamic broker config). However, it may need further external verification to ensure no entity (ex: a consumer, or a broker) is actively using the feature (see Non-goals section).
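
For illustration, the broker-compatibility portion of these guarding checks could look like the following minimal Java sketch (assumed types and names; not the actual controller implementation):

Code Block
languagejava
import java.util.Collection;
import java.util.Map;

class FeatureUpdateGuard {
    // Supported [minVersion, maxVersion] range advertised by a broker for one feature.
    record SupportedRange(long minVersion, long maxVersion) { }

    // Returns true only if every live broker advertises a range covering the
    // proposed max version level X' for the feature, i.e. minVersion <= X' <= maxVersion.
    static boolean compatibleWithAllLiveBrokers(
            String featureName,
            long proposedMaxVersionLevel,
            Collection<Map<String, SupportedRange>> liveBrokerFeatures) {
        for (Map<String, SupportedRange> brokerFeatures : liveBrokerFeatures) {
            SupportedRange range = brokerFeatures.get(featureName);
            // A broker that lacks the required feature entirely is an incompatibility.
            if (range == null
                    || range.minVersion() > proposedMaxVersionLevel
                    || range.maxVersion() < proposedMaxVersionLevel) {
                return false;
            }
        }
        return true;
    }
}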

...

Code Block
{
  "apiKey": 18,
  "type": "response", "name": "ApiVersionsResponse",
  "validVersions": "0-3",
  "flexibleVersions": "3+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code." },
    { "name": "ApiKeys", "type": "[]ApiVersionsResponseKey", "versions": "0+",
      "about": "The APIs supported by the broker.", "fields": [
      { "name": "ApiKey", "type": "int16", "versions": "0+", "mapKey": true,
        "about": "The API index." },
      { "name": "MinVersion", "type": "int16", "versions": "0+",
        "about": "The minimum supported version, inclusive." },
      { "name": "MaxVersion", "type": "int16", "versions": "0+",
        "about": "The maximum supported version, inclusive." }]
    },
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    // ----- START: PROPOSED ADDITIONAL METADATA -----
    { "name":  "SupportedFeatures", "type": "[]FeatureKey",
      "versions":  "3+", "tag": 10000, "taggedVersions": "3+",
      "about": "Features supported by the broker.",
      "fields":  [
        { "name": "Name", "type": "string", "versions": "3+",
          "about": "The name of the feature." },
        { "name": "MinVersion", "type": "int64", "versions": "3+",
          "about": "The minimum supported version, inclusive." },
        { "name": "MaxVersion", "type": "int64", "versions": "3+",
          "about": "The maximum supported version, inclusive." }
      ]
    },
    {"name": "FinalizedFeaturesEpoch", "type": "int64", "versions": "3+",
      "tag": 10001, "taggedVersions": "3+",
      "about": "The monotonically increasing epoch for the features information."},
    { "name":  "FinalizedFeatures", "type": "[]FinalizedFeatureKey",
      "versions":  "3+", "tag": 10002, "taggedVersions": "3+",
      "about": "List of cluster-wide finalized features.",
      "fields":  [
        {"name": "Name", "type": "string", "versions":  "3+",
          "about": "The name of the feature."},
        {"name":  "VersionLevelMaxVersionLevel", "type": "int64", "versions":  "3+",
          "about": "The cluster-wide finalized max version level for the feature."},
        {"name":   ]
    }
"MinVersionLevel", "type": "int64", "versions":  "3+",
      // ----- END: PROPOSED ADDITIONAL METADATA -----
  ]
}"about": "The cluster-wide finalized min version level for the feature."}
      ]
    }
    // ----- END: PROPOSED ADDITIONAL METADATA -----
  ]
}

Broker protections against race conditions

...

Tooling support

We shall introduce a FeatureCommand CLI tool backed by a new admin command library. This CLI tool will be maintained in the Kafka repository, alongside the code for the Kafka Broker.

Note the following: the CLI tool internally has knowledge about a map of features to their respective max versions supported by the Broker. The tool's knowledge of features and their version values is limited to the version of the CLI tool itself, i.e. the information is packaged into the CLI tool when it is released. Whenever a Kafka release introduces a new feature version, or modifies an existing feature version, the CLI tool shall also be updated with this information. Newer versions of the CLI tool will be released as part of the Kafka releases.

The CLI tool will be backed by the type called kafka.admin.FeatureCommand. The implementation will be inspired by the various command classes written in Scala that already enable us to carry out things such as CreateTopics, DeleteTopics, AlterConfigs etc. from the CLI. The new FeatureCommand will be used by the cluster operator (i.e. a human), and will enable us to do the following:

  1. Read cluster-wide finalized feature versions from a broker or a controller via its ApiKeys.API_VERSIONS API.
  2. Add/update/delete specific or all cluster-wide finalized feature versions by exercising the newly introduced ApiKeys.UPDATE_FEATURES API on a controller.

...

Later below, we demonstrate regular as well as advanced usage of the CLI tool.

We shall introduce 2 new APIs in the Admin interface, which enable us to read the feature versions and finalize feature version upgrades/downgrades. Below is Java-ish pseudocode for the same.

...

Code Block
languagejava
// ---- START: Proposed Admin API definitions ----
/**
 * Return the following:
 * 1. List of cluster-wide finalized feature versions.
 * 2. List of supported feature versions specific to the broker.
 *
 * You may anticipate certain exceptions when calling get() on the
 * futures obtained from the returned DescribeFeaturesResult.
 */
DescribeFeaturesResult describeFeatures();

/**
 * Update the feature versions supported cluster-wide. You may
 * anticipate certain exceptions when calling get() on the futures
 * obtained from the returned UpdateFeaturesResult. For example,
 * if a feature update was in progress already, the controller
 * could return a suitable error.
 *
 * @param updates   set of feature updates, keyed by the
 *                  name of the feature
 * @return          the result of the updateFeatures request
 */
UpdateFeaturesResult updateFeatures(Set<FeatureUpdate> updates);

// ---- END: Proposed Admin API definitions ----

// Represents a cluster-wide finalized feature, with min/max feature version levels.
class FinalizedFeature {
	// The name of the feature.
	String name();

    // The cluster-wide finalized value of the feature min version level (value >= 1).
    long minVersionLevel();

    // The cluster-wide finalized value of the feature max version level
    // (value >= 1 and value >= minVersionLevel).
    long maxVersionLevel();
}

// Represents a feature that is supported by a broker, with a specific
// feature version range [minVersion, maxVersion].
class SupportedFeature {
	// The name of the feature.
	String name();

	// The minimum version (value >= 1) of the supported feature.
	long minVersion();

	// The maximum version (value >=1 and value >= minVersion) of the supported feature.
	long maxVersion();
}

// Represents an update to a Feature, which can be sent to the controller
// for processing.
class FeatureUpdate {
	// The name of the feature to be updated.
	String name();

    // The cluster-wide finalized NEW value of the feature max version level.
    // - When >= 1, it's the new value to-be-updated for the finalized feature.
	// - When < 1, it indicates the deletion of a finalized feature.
    long maxVersionLevel();

    // Return true only if downgrade/deletion of a feature should be allowed.
    bool allowDowngrade();
}

// Represents a collection of feature metadata, along with the host:port
// of the broker serving the metadata.
class FeatureMetadata {
    // The set of cluster-wide finalized features, keyed by feature name.
	Set<FinalizedFeature> finalizedFeatures();

    // The monotonically increasing epoch for the finalized features.
    long epoch();

	// The set of features supported by a broker, keyed by feature name.
    Set<SupportedFeature> supportedFeatures();

	// The hostname of the broker.
	String host();

	// The port of the broker.
	int32 port();   
}

class DescribeFeaturesResult {
    /**
     * The data returned in the future contains the latest entire set of
     * finalized cluster-wide features, as well as the entire set of 
     * features supported by the broker serving this read request.
     */
    KafkaFuture<FeatureMetadata> all();
}

class UpdateFeaturesResult {
    /**
     * Returns a future which indicates success/failure.
     * 1. If the future has succeeded (i.e. no exceptions),
     *    then the request was 100% successful, and no top level or
     *    individual FeatureUpdate errors were seen. The data
     *    returned in the future contains the latest entire set of
     *    finalized cluster-wide features (after all updates were applied),
     *    as well as the entire set of features supported by the controller
     *    serving this write request.
     * 2. If the future has failed, the top level error (if any)
     *    or the error from the FeatureUpdate(s) that failed
     *    (if any) is raised to the caller.
     */
    KafkaFuture<FeatureMetadata> all();
}
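
To illustrate how the above APIs fit together, here is a minimal usage sketch in the same Java-ish style. It assumes a FeatureUpdate constructor taking (name, maxVersionLevel, allowDowngrade), which the pseudocode above does not spell out; the feature name and version level are illustrative only.

Code Block
languagejava
import java.util.Collections;

class FeatureAdminExample {
    static void finalizeGroupCoordinatorV2(Admin admin) throws Exception {
        // 1. Discover the current cluster-wide finalized features and the metadata epoch.
        FeatureMetadata before = admin.describeFeatures().all().get();
        System.out.println("epoch before update: " + before.epoch());

        // 2. Finalize the group_coordinator feature at max version level 2,
        //    with downgrades disallowed (allowDowngrade = false).
        FeatureUpdate update = new FeatureUpdate("group_coordinator", 2L, false); // assumed constructor
        FeatureMetadata after = admin.updateFeatures(Collections.singleton(update)).all().get();
        System.out.println("epoch after update: " + after.epoch());
    }
}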

...

Regular CLI tool usage

Following are examples of regular usage of the CLI tool, which involves the following activities:

  1. Read cluster-wide finalized feature versions from a broker or a controller via its ApiKeys.API_VERSIONS API.

  2. Upgrade the max version levels of all features to their latest values, as known internally to the CLI tool. This becomes useful after completing the deployment of a new Kafka Broker release onto an existing cluster, and removes the burden to individually finalize feature upgrades.
  3. Downgrade the max version levels of all features to the values known internally to the CLI tool. This becomes useful during a cluster downgrade.

Code Block
=== DESCRIBE FEATURES ===

# Get cluster-wide finalized features, and features supported by a specific broker.
#  - Use `--bootstrap-server` to provide a broker host:port to which queries should be issued.
#  - Optionally, provide `--controller` flag directing the tool to issue the query to the
#    controller (while discovering the controller via the bootstrap server).
#    This can be useful for debugging purposes.

$> kafka-features.sh describe \
     --bootstrap-server kafka-broker0.prn1:9071 \
     [--controller]

{
	"status": "OK",
	"supported_features": {
		"group_coordinator": {
            "min_version": 1,
            "max_version": 2
        },
        "transaction_coordinator": {
        	"min_version": 1,
        	"max_version": 5
        },
        "consumer_offsets_topic_schema": { 
            "min_version": 1,
        	"max_version": 1
        }
	},
	"finalized_features": {
        "epoch": 0,
        "group_coordinator": {
			"min_version_level": 1,
            "max_version_level": 1
        },
        "transaction_coordinator": {
  			"min_version_level": 1,
        	"max_version_level": 4
        }
   },
   "host": "kafka-broker0.prn1",
   "port": 9071
}

=== UPGRADE TO ALL LATEST FEATURES ===

# Upgrade to the max version levels of all features, as internally known to the CLI tool.
#
# This command removes the burden to individually finalize feature upgrades.
# This becomes handy to a cluster operator intending to finalize a cluster with all the latest
# available feature version levels. This usually happens after completing the deployment
# of a newer Kafka Broker release onto an existing cluster.

$> kafka-features.sh finalize-all-latest \
     --bootstrap-server kafka-broker0.prn1:9071

{
	"status": "OK",
	"supported_features": {
		"group_coordinator": {
            "min_version": 1,
            "max_version": 3
        },
        "transaction_coordinator": {
        	"min_version": 1,
        	"max_version": 6
        },
        "consumer_offsets_topic_schema": { 
            "min_version": 1,
        	"max_version": 3
        }
	},
	"finalized_features": {
		"epoch": 3,
		"group_coordinator": {
			"min_version_level": 1,
            "max_version_level": 3
        },
        "transaction_coordinator": {
			"min_version_level": 1,
            "max_version_level": 6
        },
        "consumer_offsets_topic_schema": { 
			"min_version_level": 1,
            "max_version_level": 3
        }
   },
   "host": "kafka-broker0.prn1",
   "port": 9071
}

=== DOWNGRADE ALL FEATURES ===

# Downgrade to the max version levels of all features known to the CLI tool.
#
# This command removes the burden to individually finalize feature version
# downgrades. This becomes handy to a cluster operator intending to downgrade all
# feature version levels, just prior to rolling back a Kafka Broker deployment
# on a cluster, to a previous Broker release.

$> kafka-features.sh downgrade-all \
     --bootstrap-server kafka-broker0.prn1:9071

{
	"status": "OK",
	"supported_features": {
		"group_coordinator": {
            "min_version": 1,
            "max_version": 3
        },
        "transaction_coordinator": {
        	"min_version": 1,
        	"max_version": 6
        },
        "consumer_offsets_topic_schema": { 
            "min_version": 1,
        	"max_version": 3
        }
	},
	"finalized_features": {
		"epoch": 3,
		"group_coordinator": {
			"min_version_level": 1,
            "max_version_level": 3
        },
        "transaction_coordinator": {
			"min_version_level": 1,
            "max_version_level": 6
        },
        "consumer_offsets_topic_schema": { 
			"min_version_level": 1,
            "max_version_level": 3
        }
   },
   "host": "kafka-broker0.prn1",
   "port": 9071
}

Advanced CLI tool usage

Following are examples of advanced usage of the CLI tool. Going beyond regular usage, advanced usage involves adding/updating/deleting specific cluster-wide finalized feature versions.

Code Block
=== ADD_OR_UPDATE FEATURES ===

# Add or update a list of cluster-wide finalized features.
#  - Use `--bootstrap-server` to provide a broker host:port to which MetadataRequest query should be issued.
#    The MetadataResponse will be used to discover the Controller, to which the actual ADD_OR_UPDATE request is issued.
#  - Use `--upgrade` to provide a comma-separated list of features and new finalized max version to ADD_OR_UPDATE.
#  - Use `--allow-downgrade` to allow a downgrade for feature version levels. This should be used only when required.

$> kafka-features.sh update \
     --bootstrap-server kafka-broker0.prn1:9071 \
     --upgrade group_coordinator:2,consumer_offsets_topic_schema:1 \
     --allow-downgrade transaction_coordinator:3 \

Please confirm before downgrading the following features:
1.transaction_coordinator from v4 (existing) to v3 (new)

[Y/n]? Y

{
	"status": "OK",
	"supported_features": {
		"group_coordinator": {
            "min_version": 1,
            "max_version": 2
        },
        "transaction_coordinator": {
        	"min_version": 1,
        	"max_version": 5
        },
        "consumer_offsets_topic_schema": { 
            "min_version": 1,
        	"max_version": 1
        }
	},
	"finalized_features": {
        "epoch": 1,
        "group_coordinator": {
			"min_version_level": 1,
            "max_version_level": 2
        },
        "transaction_coordinator": {
			"min_version_level": 1,
            "max_version_level": 3
        },
        "consumer_offsets_topic_schema": { 
			"min_version_level": 1,
            "max_version_level": 1
        }
   },
   "host": "kafka-broker0.prn1",
   "port": 9071
}

=== DELETE FEATURES ===

# Delete a list of cluster-wide finalized features.
#  - Use `--bootstrap-server` to provide a broker host:port to which MetadataRequest query should be issued.
#    The MetadataResponse will be used to discover the Controller, to which the actual delete request is issued.
#  - Use `--features` to provide a comma-separated list of finalized features to be deleted.

$> kafka-features.sh delete \
     --bootstrap-server kafka-broker0.prn1:9071 \
     --features group_coordinator,transaction_coordinator

Please confirm deletion of the following finalized features:
1. group_coordinator
2. transaction_coordinator

[Y/n] Y

{
	"status": "OK",
	"supported_features": {
		"group_coordinator": {
            "min_version": 1,
            "max_version": 2
        },
        "transaction_coordinator": {
        	"min_version": 1,
        	"max_version": 5
        },
        "consumer_offsets_topic_schema": { 
            "min_version": 1,
        	"max_version": 1
        }
	},
	"finalized_features": {
		"epoch": 2,
        "consumer_offsets_topic_schema": { 
			"min_version_level": 1,
            "max_version_level": 1
        }
   },
   "host": "kafka-broker0.prn1",
   "port": 9071
}

...

  • One of these (the old/existing broker bits) would only advertise the group_coordinator feature with max version 1 (shortly referred to as v1 below). v1 doesn’t contain the latest EOS semantics described in KIP-447.

  • The other (the new broker bits) would advertise the group_coordinator feature at max version 2 (i.e. v1 as well as v2). v2 contains the EOS semantics described in KIP-447.

...