...

  • The value for the version key is an int64 that contains the version of the schema of the data stored in the ZK node.

  • The value for the features key is a dictionary that contains a mapping from feature names to their metadata (such as finalized version levels). It's a map{string → map{string → <string | number>}}:

     <feature_name>
                   |-->  <metadata_key>
                                        |-->  <metadata_value>

    • Top-level key <feature_name> is a non-empty string that’s a feature name.

      • <metadata_key> is a key at the second nested level – a non-empty string naming a piece of feature metadata.

      • <metadata_value> is either a string or a number – it's the value for the <metadata_key>.

      • Note: this nested dictionary contains the key 'version_level', whose value is an int64 representing the finalized cluster-wide version level for the feature (see the example below).
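
For illustration, here is a hypothetical snapshot of the '/features' ZK node contents, consistent with the schema above. The feature names and values are illustrative only (borrowed from the CLI examples later in this document), and the schema version of 1 is an assumption:

Code Block
{
   "version": 1,
   "features": {
      "group_coordinator": {
         "version_level": 1
      },
      "transaction_coordinator": {
         "version_level": 4
      }
   }
}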

Controller: ZK node bootstrap with default values

Imagine a case where the '/features' ZK node is non-existent. In such a case, when the controller starts up, it would create the ZK node for the first time (this is a blocking write that needs to succeed for the controller to continue its startup sequence). The data used to create the node shall be a map of {feature_name → max_feature_version}, obtained by the controller from the broker's supported features. This approach brings convenience to users bootstrapping a Kafka cluster for the first time (with a specific Kafka Broker release): the controller finalizes the default feature version levels automatically.
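
For illustration, assuming the controller's broker advertises support for group_coordinator with max version 2 and transaction_coordinator with max version 5 (the values used in the CLI examples later in this document), the freshly bootstrapped node would hold content like the following (the schema version of 1 is an assumption):

Code Block
{
   "version": 1,
   "features": {
      "group_coordinator": {
         "version_level": 2
      },
      "transaction_coordinator": {
         "version_level": 5
      }
   }
}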

New controller API

We introduce one new Admin API that's served only by the controller, identified by the new API key: ApiKeys.UPDATE_FEATURES.
This API enables transactional application of a set of cluster-wide feature updates to the ZK '/features' node (i.e. either all provided FeatureUpdate are applied to ZK, or none):

  • The API request contains a list of FeatureUpdate entries to be applied, as explained below:

    • Each item specifies the finalized feature to be added, updated, or deleted, along with the new feature version level value.

    • Feature version level downgrades are not a regular operation. Each item can optionally specify an allowDowngrade flag, which can be used to allow version level downgrades (or deletions).

    • To add a new finalized feature version level, or update an existing one, the user must specify a version level starting from 1 (and increasing).

    • If a finalized feature needs to be permanently deleted, the user must specify a version level value < 1, and should also set the allowDowngrade flag.

  • The response contains an error code and an error message.

To help explain things better, below are the request and response definitions for the new API to update features (also see the section showing related pseudocode for the Admin API):

UpdateFeaturesRequest schema


Code Block
{
  "apiKey": 48,
  "type": "request",
  "name": "UpdateFeaturesRequest",
  "validVersions": "0-1",
  "flexibleVersions": "1+",
  "fields": [
    { "name": "timeoutMs", "type": "int32", "versions": "0+", "default": "60000",
      "about": "How long to wait in milliseconds before timing out the request." },
    { "name": "FeatureUpdate", "type": "[]FeatureUpdateKey", "versions": "0+",
      "about": "The list of updates to features.", "fields": [
      { "name": "AllowDowngrade", "type": "bool", "versions": "0+",
        "about": "When set to true, the feature version level is allowed to be downgraded/deleted." },
      { "name": "Feature", "type": "[]FeatureKey", "versions": "0+",
        "about": "The feature to be updated.",
        "fields": [
          { "name": "Name", "type": "string", "versions": "0+",
            "about": "The name of the feature." },
          { "name": "VersionLevel", "type": "int64", "versions": "0+",
            "about": "The new cluster-wide finalized version level for the feature. A value >= 1 indicates the new feature version level. A value < 1 can be used to request the deletion of the feature." }
      ]}
    ]}
  ]
}
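
For illustration, here is a hypothetical UpdateFeaturesRequest payload (rendered as JSON purely for readability – the wire format is binary). It finalizes group_coordinator at version level 2, and requests deletion of transaction_coordinator by combining a version level < 1 with AllowDowngrade:

Code Block
{
  "timeoutMs": 60000,
  "FeatureUpdate": [
    {
      "AllowDowngrade": false,
      "Feature": [
        { "Name": "group_coordinator", "VersionLevel": 2 }
      ]
    },
    {
      "AllowDowngrade": true,
      "Feature": [
        { "Name": "transaction_coordinator", "VersionLevel": 0 }
      ]
    }
  ]
}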

UpdateFeaturesResponse schema


Code Block
{
  "apiKey": 48,
  "type": "response",
  "name": "UpdateFeaturesResponse",
  "validVersions": "0-1",
  "flexibleVersions": "1+",
  "fields": [
    // - If the request was processed by a broker that's not the controller,
    //   then this response will contain the existing NOT_CONTROLLER error code.
    // - If the request is being concurrently processed by the controller,
    //   then this response will contain the FEATURE_UPDATE_IN_PROGRESS
    //   error code (a new error code).
    // - If the request contained at least one FeatureUpdate that cannot
    //   be applied, then this response will contain the error code
    //   FEATURE_UPDATES_FAILED (a new error code).
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." },
    { "name": "ErrorMessage", "type": "string", "versions": "0+",
      "about": "The error message, or null if there was no error." }
  ]
}

...

Code Block
{
  "apiKey": 18,
  "type": "response", "name": "ApiVersionsResponse",
  "validVersions": "0-3",
  "flexibleVersions": "3+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code." },
    { "name": "ApiKeys", "type": "[]ApiVersionsResponseKey", "versions": "0+",
      "about": "The APIs supported by the broker.", "fields": [
      { "name": "ApiKey", "type": "int16", "versions": "0+", "mapKey": true,
        "about": "The API index." },
      { "name": "MinVersion", "type": "int16", "versions": "0+",
        "about": "The minimum supported version, inclusive." },
      { "name": "MaxVersion", "type": "int16", "versions": "0+",
        "about": "The maximum supported version, inclusive." }]
    },
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    // ----- START: PROPOSED ADDITIONAL METADATA -----
    { "name":  "SupportedFeatures", "type": "[]FeatureKey",
      "versions":  "3+", "tag": 10000, "taggedVersions": "3+",
      "about": "Features supported by the broker.",
      "fields":  [
        { "name": "Name", "type": "string", "versions": "3+",
          "about": "The name of the feature." },
        { "name": "MinVersion", "type": "int64", "versions": "3+",
          "about": "The minimum supported version, inclusive." },
        { "name": "MaxVersion", "type": "int64", "versions": "3+",
          "about": "The maximum supported version, inclusive." }
      ]
    },
    {"name": "FinalizedFeaturesEpoch", "type": "int64", "versions": "3+",
      "tag": 10001, "taggedVersions": "3+",
      "about": "The monotonically increasing epoch for the features information."},
    { "name":  "FinalizedFeatures", "type": "[]FinalizedFeatureKey",
      "versions":  "3+", "tag": 10002, "taggedVersions": "3+",
      "about": "List of cluster-wide finalized features.",
      "fields":  [
        {"name": "Name", "type": "string", "versions":  "3+",
          "about": "The name of the feature."},
        {"name":  "VersionLevel", "type": "int64", "versions":  "3+",
          "about": "The cluster-wide finalized version level for the feature."}
      ]
    }
    // ----- END: PROPOSED ADDITIONAL METADATA -----
  ]
}
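
For illustration, here is a hypothetical rendering (as JSON, purely for readability) of just the proposed tagged fields in an ApiVersionsResponse; the values mirror the DESCRIBE example later in this document:

Code Block
{
  "SupportedFeatures": [
    { "Name": "group_coordinator", "MinVersion": 1, "MaxVersion": 2 },
    { "Name": "transaction_coordinator", "MinVersion": 1, "MaxVersion": 5 }
  ],
  "FinalizedFeaturesEpoch": 0,
  "FinalizedFeatures": [
    { "Name": "group_coordinator", "VersionLevel": 1 },
    { "Name": "transaction_coordinator", "VersionLevel": 4 }
  ]
}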

Broker protections against race conditions

Certain validations will be introduced at a few points in the broker code. The purpose is to avoid race conditions where incompatible brokers remain active in a cluster. The validations affirm that the feature versions supported by the broker are compatible with the expected cluster-wide feature versions. If any of these checks fails, the broker shutdown sequence is triggered, and the process eventually exits with a non-zero exit code. The places where the validation will be introduced are explained below:

  1. Validation shall be introduced during broker startup. This involves synchronously reading the cluster-wide feature versions from the '/features' ZK node just after initializing the ZK client, and before creating the broker’s own ephemeral node (roughly here). The feature versions available in the broker are checked against the contents of the '/features' ZK node to ensure there are no incompatibilities. If an incompatibility is detected, the broker is made to shut down immediately.

  2. A watch is set up on the '/features' ZK node. The validation above will be reused in the code path that reads the contents of the '/features' ZK node whenever the watch fires. This affirms that the feature versions available in the broker always remain compatible with the cluster-wide feature versions read from ZK.

NOTE: The logic for the validations will be exactly the same as the one described in the Validations section under the Controller API. A sketch of this check follows below.
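
Below is a minimal sketch of that check, assuming hypothetical class and method names (the actual implementation will live in the broker's startup and ZK-watch code paths described above):

Code Block
languagejava
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical representation of a supported feature's version range.
class SupportedVersionRange {
    final long minVersion;  // always >= 1
    final long maxVersion;  // always >= minVersion

    SupportedVersionRange(long minVersion, long maxVersion) {
        this.minVersion = minVersion;
        this.maxVersion = maxVersion;
    }
}

class FeatureCompatibilityChecker {
    /**
     * Returns the names of finalized features (feature name -> finalized
     * version_level, as read from the '/features' ZK node) that this broker
     * cannot support. A non-empty result triggers the immediate broker
     * shutdown described above.
     */
    static Set<String> findIncompatibilities(Map<String, SupportedVersionRange> supported,
                                             Map<String, Long> finalized) {
        Set<String> incompatible = new HashSet<>();
        for (Map.Entry<String, Long> entry : finalized.entrySet()) {
            SupportedVersionRange range = supported.get(entry.getKey());
            long level = entry.getValue();
            // Incompatible if the feature is unknown to the broker, or if the
            // finalized version level lies outside the supported range.
            if (range == null || level < range.minVersion || level > range.maxVersion) {
                incompatible.add(entry.getKey());
            }
        }
        return incompatible;
    }
}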

Incompatible broker lifetime race condition

Description of a rare race condition:

  • T1: Imagine that at time T1 the following event E1 occurs: a broker B starts up, passes feature validation, and registers its presence in ZK in its BrokerIdZnode, along with advertising its supported features. Assume that broker B is just about to become incompatible in its feature set, in comparison to the cluster-wide finalized feature versions.
  • T1: At the same time T1, the following event E2 occurs concurrently with E1: a feature version level upgrade is finalized in the controller, which causes broker B to become incompatible in its feature set.
  • T2: At a future time T2 the following event E3 occurs: the incompatible broker B receives a ZK notification about a change to the '/features' node. The broker validates the new contents of the '/features' node against its supported features, finds an incompatibility, and shuts down immediately.

Question: What if broker B, with its incompatible feature set, lingers in the cluster in the window between E1/E2 and E3? Would this harm the cluster?

Solution: This window is very small (milliseconds), and the situation is rare – it can only arise when an incompatible broker comes up in the cluster around the time that a feature version upgrade is finalized. Here is how we will handle the race condition: in the controller, the thread that handles the ApiKeys.UPDATE_FEATURES request will be the ControllerEventThread. This is also the thread that updates the controller's cache of broker info whenever a new broker joins the cluster. In this setup, if the ApiKeys.UPDATE_FEATURES request (E2) is processed ahead of the ZK notification about the incompatible broker B joining the cluster (E1), then the controller can certainly detect the incompatibility when it later processes the broker's registration. The controller can then block the remainder of the new broker's startup sequence by refusing to send it the UpdateMetadataRequest needed to bootstrap it. After that, it is only a matter of time (milliseconds) before the new broker receives a ZK notification (E3) about a change to the '/features' node and automatically shuts itself down due to the incompatibility.

Tooling support

We shall introduce a CLI tool backed by a new admin command type called kafka.admin.FeatureCommand. The implementation will be inspired by the various command classes written in Scala that already enable us to carry out things such as CreateTopics, DeleteTopics, AlterConfigs etc. from the CLI. The new FeatureCommand will be used by the cluster operator (i.e. a human), and will enable us to do the following:

  • Read cluster-wide finalized feature versions from a broker or a controller via its ApiKeys.API_VERSIONS API.

  • Add/update/delete cluster-wide finalized feature versions by exercising the newly introduced ApiKeys.UPDATE_FEATURES API on a controller.

We shall introduce 3 new APIs in the Admin interface, which enable us to read the feature versions and finalize feature version upgrades. Below is Java-ish pseudocode for the same.

Admin API changes

Code Block
languagejava
// ---- START: Proposed Admin API definitions ----
/**
 * Return the following:
 * 1. List of cluster-wide finalized feature versions.
 * 2. List of supported feature versions specific to the broker.
 *
 * You may anticipate certain exceptions when calling get() on the
 * futures obtained from the returned DescribeFeaturesResult.
 */
DescribeFeaturesResult describeFeatures();

/**
 * Update the feature versions supported cluster-wide. You may
 * anticipate certain exceptions when calling get() on the futures
 * obtained from the returned UpdateFeaturesResult. For example,
 * if a feature update was in progress already, the controller
 * could return a suitable error.
 *
 * @param updates   set of feature updates, keyed by the
 *                  name of the feature
 * @return          the result of the updateFeatures request
 */
UpdateFeaturesResult updateFeatures(Set<FeatureUpdate> updates);

// ---- END: Proposed Admin API definitions ----

// Represents a cluster-wide finalized feature, with a feature version.
class FinalizedFeature {
	// The name of the feature.
	String name();

    // The cluster-wide finalized value of the feature version level.
    long versionLevel();
}

// Represents a feature that is supported by a broker, with a specific
// feature version range [minVersion, maxVersion].
class SupportedFeature {
	// The name of the feature.
	String name();

	// The minimum version (value >= 1) of the supported feature.
	long minVersion();

	// The maximum version (value >=1 and value >= minVersion) of the supported feature.
	long maxVersion();
}

// Represents an update to a Feature, which can be sent to the controller
// for processing.
class FeatureUpdate {
    // Return the feature to be updated.
	// The version returned via 'feature().versionLevel()':
    // - When >= 1, it's the new value to-be-updated for the finalized feature.
	// - When < 1, it indicates the deletion of a finalized feature.
    FinalizedFeature feature();
    
    // Return true only if downgrade/deletion of a feature should be allowed.
    bool allowDowngrade();
}

// Represents a collection of feature metadata, along with the host:port
// of the broker serving the metadata.
class FeatureMetadata {
    // The set of cluster-wide finalized features, keyed by feature name.
	Set<FinalizedFeature> finalizedFeatures();

    // The monotonically increasing epoch for the finalized features.
    long epoch();

	// The set of features supported by a broker, keyed by feature name.
    Set<SupportedFeature> supportedFeatures();

	// The hostname of the broker.
	String host();

	// The port of the broker.
	int32 port();   
}

class DescribeFeaturesResult {
    /**
     * The data returned in the future contains the latest entire set of
     * finalized cluster-wide features, as well as the entire set of 
     * features supported by the broker serving this read request.
     */
    KafkaFuture<FeatureMetadata> all();
}

class UpdateFeaturesResult {
    /**
     * Returns a future which indicates success/failure.
     * 1. If the future has succeeded (i.e. no exceptions),
     *    then the request was 100% successful, and no top level or
     *    individual FeatureUpdate errors were seen. The data
     *    returned in the future contains the latest entire set of
     *    finalized cluster-wide features (after all updates were applied),
     *    as well as the entire set of features supported by the controller
     *    serving this write request.
     * 2. If the future has failed, the top level error (if any)
     *    or the error from the FeatureUpdate(s) that failed
     *    (if any) is raised to the caller.
     */
    KafkaFuture<FeatureMetadata> all();
}
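
To show how these APIs fit together, here is a hypothetical usage sketch in the same Java-ish pseudocode. The FeatureUpdate and FinalizedFeature constructors (and the 'admin' client instance) are assumed purely for illustration – the definitions above specify only the accessors:

Code Block
languagejava
// 'admin' is assumed to be an Admin client connected to the cluster.

// 1. Read the cluster-wide finalized features, and the features supported
//    by the broker serving the request.
FeatureMetadata metadata = admin.describeFeatures().all().get();
for (FinalizedFeature feature : metadata.finalizedFeatures()) {
    System.out.println(feature.name() + " finalized at level " + feature.versionLevel());
}

// 2. Finalize 'group_coordinator' at version level 2. allowDowngrade is
//    false, since this is an upgrade.
FeatureUpdate update =
    new FeatureUpdate(new FinalizedFeature("group_coordinator", 2), false);

// 3. Apply the update transactionally via the controller. get() raises an
//    exception on failure, e.g. if another feature update is in progress.
FeatureMetadata latest = admin.updateFeatures(Collections.singleton(update)).all().get();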

Examples

Following are examples of a CLI tool built on top of FeatureCommand. They demonstrate the types of options supported, and their output.

Code Block
=== DESCRIBE FEATURES ===

# Get cluster-wide finalized features, and features supported by a specific broker.
#  - Use `--bootstrap-server` to provide a broker host:port to which queries should be issued.
#  - Optionally, provide `--controller` flag directing the tool to issue the query to the
#    controller (while discovering the controller via the bootstrap server).
#    This can be useful for debugging purposes.

$> kafka-features.sh describe \
     --bootstrap-server kafka-broker0.prn1:9071 \
     [--controller]

{
	"status": "OK",
	"supported_features": {
		"group_coordinator": {
            "min_version": 1,
            "max_version": 2
        },
        "transaction_coordinator": {
        	"min_version": 1,
        	"max_version": 5
        },
        "consumer_offsets_topic_schema": { 
            "min_version": 1,
        	"max_version": 1
        }
	},
	"finalized_features": {
        "epoch": 0,
        "group_coordinator": {
            "version_level": 1
        },
        "transaction_coordinator": {
        	"version_level": 4
        }
   },
   "host": "kafka-broker0.prn1",
   "port": 9071
}

=== ADD_OR_UPDATE FEATURES ===

# Add or update a list of cluster-wide finalized features.
#  - Use `--bootstrap-server` to provide a broker host:port to which MetadataRequest query should be issued.
#    The MetadataResponse will be used to discover the Controller, to which the actual ADD_OR_UPDATE request is issued.
#  - Use `--upgrade` to provide a comma-separated list of features and new finalized max version to ADD_OR_UPDATE.
#  - Use `--allow-downgrade` to allow a downgrade for the feature version levels. This should be used only when required.

$> kafka-features.sh update \
     --bootstrap-server kafka-broker0.prn1:9071 \
     --upgrade group_coordinator:2,consumer_offsets_topic_schema:1 \
     --allow-downgrade transaction_coordinator:3

Please confirm before downgrading the following features:
1. transaction_coordinator from v4 (existing) to v3 (new)

[Y/n]? Y

{
	"status": "OK",
	"supported_features": {
		"group_coordinator": {
            "min_version": 1,
            "max_version": 2
        },
        "transaction_coordinator": {
        	"min_version": 1,
        	"max_version": 5
        },
        "consumer_offsets_topic_schema": { 
            "min_version": 1,
        	"max_version": 1
        }
	},
	"finalized_features": {
        "epoch": 1,
        "group_coordinator": {
            "version_level": 2
        },
        "transaction_coordinator": {
        	"version_level": 3
        },
        "consumer_offsets_topic_schema": { 
            "version_level": 1
        }
   },
   "host": "kafka-broker0.prn1",
   "port": 9071
}

=== DELETE FEATURES ===

# Delete a list of cluster-wide finalized features.
#  - Use `--bootstrap-server` to provide a broker host:port to which MetadataRequest query should be issued.
#    The MetadataResponse will be used to discover the Controller, to which the actual delete request is issued.
#  - Use `--features` to provide a comma-separated list of finalized features to be deleted.

$> kafka-features.sh delete \
     --bootstrap-server kafka-broker0.prn1:9071 \
     --features group_coordinator,transaction_coordinator

Please confirm deletion of the following finalized features:
1. group_coordinator
2. transaction_coordinator

[Y/n] Y

{
	"status": "OK",
	"supported_features": {
		"group_coordinator": {
            "min_version": 1,
            "max_version": 2
        },
        "transaction_coordinator": {
        	"min_version": 1,
        	"max_version": 5
        },
        "consumer_offsets_topic_schema": { 
            "min_version": 1,
        	"max_version": 1
        }
	},
	"finalized_features": {
		"epoch": 2,
        "consumer_offsets_topic_schema": { 
            "version_level": 1
        }
   },
   "host": "kafka-broker0.prn1",
   "port": 9071
}

=== ENABLE LATEST FEATURES ===

# Enable all the latest available features, as known to the controller.
#
# This command is useful to a cluster operator intending to enable all the latest
# available features, after completing the deployment of a new Kafka Broker release
# onto an existing cluster.
#
# The way the command works is that it queries the controller to read the map of
# supported features (via ApiVersionsRequest). Then the command constructs a `Set<FeatureUpdate>`
# containing all supported features, with their feature version levels set to the corresponding
# max_version (as returned in the ApiVersionsResponse). Finally, the command makes an
# UPDATE_FEATURES api call to the controller, passing to it the just constructed Set<FeatureUpdate>.
# This call should enable all the latest available features.

$> kafka-features.sh enable-latest \
     --bootstrap-server kafka-broker0.prn1:9071

{
	"status": "OK",
	"supported_features": {
		"group_coordinator": {
            "min_version": 1,
            "max_version": 2
        },
        "transaction_coordinator": {
        	"min_version": 1,
        	"max_version": 5
        },
        "consumer_offsets_topic_schema": { 
            "min_version": 1,
        	"max_version": 1
        }
	},
	"finalized_features": {
		"epoch": 3,
        "group_coordinator": {
            "version_level": 2
        },
        "transaction_coordinator": {
        	"version_level": 5
        },
        "consumer_offsets_topic_schema": { 
            "version_level": 1
        }
   },
   "host": "kafka-broker0.prn1",
   "port": 9071
}

...