...

  1. Sometimes there can be a need to downgrade the cluster-wide finalized max version of one or more broker features from some version X to a version Y, where Y < X. Firstly, we leave it to the cluster operator (i.e. a human) to decide whether the downgrade is backwards compatible in the first place, i.e. to externally ensure that the changes introduced/exercised in versions higher than Y are no longer relevant in the system ahead of the downgrade. As for the support we provide:

    1. The rules are the same as with upgrades → such a downgrade request is rejected by the system, unless all brokers support the downgraded version Y of the feature.

    2. We assume downgrades of finalized feature max versions are rare. For safety reasons, we require the human to specify an explicit "force downgrade" flag (in the API/tool) to avoid accidental downgrades of finalized max versions.
  2. A need can arise to deprecate the usage of a certain version of one or more broker features. We do not provide any special support for this. Instead, the cluster operator (i.e. a human) should use external means to establish that it is safe to stop supporting a particular version of a broker feature. For example, verify (if needed) that no clients are actively using the version, before deciding to stop supporting it.
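The downgrade rule above can be sketched as a simple check across the advertised broker ranges. This is a minimal sketch only; the class and method names are illustrative and not part of the proposal:

```java
import java.util.Map;

public class DowngradeCheck {
    // Supported version range [min, max] advertised by one broker for a feature.
    record VersionRange(long min, long max) {
        boolean contains(long v) { return v >= min && v <= max; }
    }

    // A request to move a feature's finalized max version down to newLevel is
    // admissible only if every broker's advertised range contains newLevel
    // (the same rule as for upgrades; the tool additionally demands an explicit
    // "force downgrade" flag before issuing such a request).
    static boolean isDowngradeAdmissible(Map<String, VersionRange> rangePerBroker, long newLevel) {
        return rangePerBroker.values().stream().allMatch(r -> r.contains(newLevel));
    }
}
```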

...

  • Each broker will advertise the version range of its features in its own BrokerIdZnode during startup. The contents of the advertisement are specific to the broker binary, but the schema is common to all brokers.

  • The controller already watches BrokerIdZnode updates, and will serve as a gateway (via a newly introduced API) to finalize version upgrades to features – controller is well positioned to serve this requirement since it has a global view of all BrokerIdZnode in the cluster, and can avoid most race conditions in the solution.

  • As for the metadata about cluster-wide finalized maximum versions of features (also referred to as feature version levels), this information will be persisted in ZK by the controller. The controller will be the one and only entity modifying this information (we expect metadata write traffic to be very low). All brokers in the cluster set up watches on the ZK node containing the finalized features information, so they get notified whenever the contents of the node are changed by the controller.

  • Finally, client discovery about finalized feature versions will be performed via ApiKeys.API_VERSIONS API – the response will be modified to contain the necessary information (served from broker cache). Using this decentralized approach, we scale for metadata reads, albeit with eventually consistent results.

...

Code Block
{ 
   "listener_security_protocol_map":{ 
      "INTERNAL":"PLAINTEXT",
      "REPLICATION":"PLAINTEXT",
      "EXTERNAL":"SASL_SSL"
   },
   "endpoints":[ 
      "INTERNAL://kafka-0.kafka.def.cluster.local:9071",
      "REPLICATION://kafka-0.kafka.abc.cluster.local:9072",
      "EXTERNAL://b3-kfc-mnope.us.clusterx:9092"
   ],
   "rack":"0",
   "jmx_port":7203,
   "host":"kafka-0.kafka.def.cluster.local",
   "timestamp":"1579041366882",
   "port":9071,
   // ----- START: PROPOSED ADDITIONAL/MODIFIED METADATA -----
   "version":5,  // existing key whose value has been bumped by 1
   "features": {  // new key
      "exactly_once_semantics": {  // string -> name of the feature
           "min_version": 0,   // int64 -> represents the min supported version of this feature
           "max_version": 3  // int64 -> represents the max supported version of this feature
       },
       "consumer_offsets_topic_schema": { 
           "min_version": 1,
           "max_version": 4
      }
   }
   // ----- END: PROPOSED ADDITIONAL METADATA -----
}

...

Code Block
{
   "version": 0, // int64 -> Represents the version of the schema for the data stored in the ZK node
   "data": {
		"features": {
        	"exactly_once_semantics": {   // string -> name of the feature
            	"version_level": 3 // int64 -> Represents the cluster-wide finalized max version of this feature
	        },
    	    "consumer_offsets_topic_schema": { 
        	    "version_level": 4
	        }
		}
   }
}

The schema is a JSON dictionary with a few different keys and values, as explained below:

  • The value for the version key is an int64 that contains the version of the schema of the data stored in the ZK node.

  • The value for the data key is a dictionary that contains the following:
    • A key called features whose value is a dictionary. This contains a mapping from feature names to their metadata (such as finalized versions). It's a map{string → map{string → <string | number>}}

      <feature_name>
                     |-->  <metadata_key>
                                          |-->  <metadata_value>

      • Top-level key <feature_name> is a non-empty string that’s a feature name.

        • <metadata_key> is the second-level nested key: a non-empty string that refers to some feature metadata.

        • <metadata_value> is either a string or a number – it's the value for the <metadata_key>.

        • Note: this nested dictionary contains the key 'version_level', whose value is an int64 representing the finalized cluster-wide max version for the feature.
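As an illustration of the schema above, a controller-side serializer for the node contents could look roughly like this. This is a sketch only; the class name and helper are hypothetical, but the 'version', 'data', 'features', and 'version_level' keys follow the schema described here:

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class FeaturesZNode {
    // Render the '/features' ZK node payload: each finalized feature maps to
    // its cluster-wide finalized max version under the 'version_level' key.
    static String render(int schemaVersion, Map<String, Long> finalizedFeatures) {
        String features = new TreeMap<>(finalizedFeatures).entrySet().stream()
            .map(e -> String.format("\"%s\": {\"version_level\": %d}", e.getKey(), e.getValue()))
            .collect(Collectors.joining(", "));
        return String.format("{\"version\": %d, \"data\": {\"features\": {%s}}}",
                             schemaVersion, features);
    }
}
```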

New controller

...

API

We introduce 1 new Admin API that’s served only by the controller, identified by the new API key: ApiKeys.UPDATE_FEATURES.
This API enables safe, transactional application of a set of cluster-wide feature updates

...

to the ZK '/features' node (i.e. either all provided FeatureUpdate are applied to ZK, or none):


...

  • The API request is a list of FeatureUpdate that need to be applied. Each item specifies an operation (as a FeatureUpdateType) such as:

...

    • FeatureUpdateType.ADD_OR_UPDATE: Adds a feature (if not present) or updates it with a new finalized max version. Addition of a feature happens when its cluster-wide max version is being finalized for the first time.

...

Optionally also enable downgrades as part of a certain FeatureUpdate.

...

    • FeatureUpdateType.ADD_OR_UPDATE_ALLOW_DOWNGRADE: Same as above, but also allows for feature version downgrades.

    • FeatureUpdateType.DELETE: Deletes a specified feature. The version number provided is ignored for deletions.
  • The response contains an error code and an error message.

To help explain things better, here are the request and response definitions for the new API to update features (also see section showing related pseudocode for the Admin API):

UpdateFeaturesRequest schema

...

Code Block
{
  "apiKey": 48,
  "type": "request",
  "name": "UpdateFeaturesRequest",
  "validVersions": "0-1",
  "flexibleVersions": "1+",
  "fields": [
    { "name": "timeoutMs", "type": "int32", "versions": "0+", "default": "60000",
	  "about": "How long to wait in milliseconds before timing out the request." },
    { "name": "FeatureUpdate", "type": "[]FeatureUpdateKey", "versions": "0+",
      "about": "The list of updates to features.", "fields": [
      {"name":  "FeatureUpdateType", "type":  "int16", "versions":  "0+",
        "about": "The type of feature update (ex: ADD_OR_UPDATE, ADD_OR_UPDATE_ALLOW_DOWNGRADE, DELETE etc.)."},
      {"name":  "Feature", "type":  "[]FeatureKey", "versions":  "0+",
        "about":  "The feature to be updated.",
        "fields":  [
          {"name": "Name", "type":  "string", "versions":  "0+",
            "about": "The name of the feature."},
          {"name":  "VersionLevel", "type":  "int64", "versions":  "0+",
            "about": "The new cluster-wide finalized max version for the feature."}
      ]}
    ]}
  ]
}

...

Code Block
{
  "apiKey": 48,
  "type": "response",
  "name": "UpdateFeaturesResponse",
  "validVersions": "0-1",
  "flexibleVersions": "1+",
  "fields": [	
    // - If the request was processed by a broker that's not the controller,
    //   then this response will contain the existing NOT_CONTROLLER error code.
    // - If the request is being concurrently processed by the controller,
    //   then this response will contain the FEATURE_UPDATE_IN_PROGRESS
    //   error code (a new error code).
    // - If the request contained at least one FeatureUpdate that cannot
    //   be applied, then this response will contain the error code FEATURE_UPDATES_FAILED
    //   (a new error code) for the first such FeatureUpdate that failed.
	{ "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." },
    { "name": "ErrorMessage", "type": "string", "versions": "0+",
      "about": "The error message, or null if there was no error." }
  ]
}

...

  1. For correctness reasons, the controller shall allow processing of only one inflight ApiKeys.UPDATE_FEATURES API call at any given time.

  2. Prior to mutating '/features' in ZK, the implementation verifies that all broker advertisements (as seen by it thus far) contained no incompatibilities with the provided List<FeatureUpdate> (see Validations section below). Note that FeatureUpdateType.DELETE should only be used in circumstances where a finalized feature needs to be garbage collected. This type mutates '/features' in ZK without any guarding checks on feature versions.

  3. Once the validations in #2 are successful, the List<FeatureUpdate> is applied to the contents of the '/features' ZK node. Upon a successful write, ZK automatically increments the version of the node by 1.

  4. Finally, an UpdateFeaturesResponse is returned to the user.
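Step 1 above (at most one inflight call) can be enforced with a simple compare-and-set guard in the controller's request handler. A minimal sketch, assuming a hypothetical handler that returns error codes as strings:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

public class UpdateFeaturesGuard {
    private final AtomicBoolean inProgress = new AtomicBoolean(false);

    // Returns FEATURE_UPDATE_IN_PROGRESS if another UPDATE_FEATURES request is
    // already being processed; otherwise runs the update (validate, then write
    // '/features' in ZK) and returns its result.
    String process(Supplier<String> applyUpdates) {
        if (!inProgress.compareAndSet(false, true)) {
            return "FEATURE_UPDATE_IN_PROGRESS";
        }
        try {
            return applyUpdates.get();
        } finally {
            inProgress.set(false);
        }
    }
}
```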

...

  1. A server-side error means the '/features' ZK node is guaranteed to be left unmodified.

  2. In case of a network error, or if the client dies during the request, or if the server dies while processing the request, the user should assume that the application of the List<FeatureUpdate> may have either succeeded or failed.


...


Client discovery of finalized feature versions

The latest list of cluster-wide finalized feature versions will be served via the existing ApiKeys.API_VERSIONS API. This is used for client discovery, as well as for debuggability of the system. We will introduce a tagged optional field in the schema of the ApiVersionsResponse to accommodate the information. A couple of implementation details:

  • Whenever ZK notifies the broker about a change to '/features' node, the broker reads the latest list of finalized features from ZK and caches it in memory (also see Broker protections section in this doc).

  • We will modify the implementation of the ApiKeys.API_VERSIONS API to populate the information about finalized features in the response. This will contain the cluster-wide finalized versions from the broker’s internal ZK cache of '/features' node contents (as explained above), as well as the broker-level supported versions.

  • The finalized feature versions populated in the response will be eventually consistent, since they are refreshed in the brokers via ZK watches.
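The broker-side cache described above can be sketched as an atomically swapped, immutable snapshot keyed by the node's epoch. The names here are illustrative, not part of the proposal:

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

public class FinalizedFeatureCache {
    // Immutable snapshot: the '/features' node epoch plus feature -> version_level.
    record Snapshot(long epoch, Map<String, Long> finalized) {}

    private final AtomicReference<Snapshot> current =
        new AtomicReference<>(new Snapshot(-1, Map.of()));

    // Called from the ZK watch handler after re-reading '/features'.
    // Stale notifications are ignored: the epoch must move forward.
    void refresh(Snapshot latest) {
        current.updateAndGet(prev -> latest.epoch() > prev.epoch() ? latest : prev);
    }

    // Read path used when serving ApiVersionsResponse (eventually consistent).
    Snapshot get() { return current.get(); }
}
```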

...

Code Block
{
  "apiKey": 18,
  "type": "response", "name": "ApiVersionsResponse",
  "validVersions": "0-3",
  "flexibleVersions": "3+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code." },
    { "name": "ApiKeys", "type": "[]ApiVersionsResponseKey", "versions": "0+",
      "about": "The APIs supported by the broker.", "fields": [
      { "name": "ApiKey", "type": "int16", "versions": "0+", "mapKey": true,
        "about": "The API index." },
      { "name": "MinVersion", "type": "int16", "versions": "0+",
        "about": "The minimum supported version, inclusive." },
      { "name": "MaxVersion", "type": "int16", "versions": "0+",
        "about": "The maximum supported version, inclusive." }]
    },
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    // ----- START: PROPOSED ADDITIONAL METADATA -----
    { "name":  "SupportedFeatures", "type": "[]FeatureKey",
      "versions":  "3+", "tag": 10000, "taggedVersions": "3+",
      "about": "Features supported by the broker.",
      "fields":  [
        { "name": "Name", "type": "string", "versions": "3+",
          "about": "The name of the feature." },
        { "name": "MinVersion", "type": "int64", "versions": "3+",
          "about": "The minimum supported version, inclusive." },
        { "name": "MaxVersion", "type": "int64", "versions": "3+",
          "about": "The maximum supported version, inclusive." }
      ]
    },
    {"name": "FinalizedFeaturesEpoch", "type": "int64", "versions": "3+",
      "tag": 10001, "taggedVersions": "3+",
      "about": "The monotonically increasing epoch for the features information."},
    { "name":  "FinalizedFeatures", "type": "[]FinalizedFeatureKey",
      "versions":  "3+", "tag": 10002, "taggedVersions": "3+",
      "about": "List of cluster-wide finalized features.",
      "fields":  [
        {"name": "Name", "type": "string", "versions":  "3+",
          "about": "The name of the feature."},
        {"name":  "VersionLevel", "type": "int64", "versions":  "3+",
          "about": "The cluster-wide finalized max version for the feature."}
      ]
    }
    // ----- END: PROPOSED ADDITIONAL METADATA -----
  ]
}

...

  1. Validation shall be introduced during broker startup. This involves synchronously reading the cluster-wide feature versions from the '/features' ZK node just after initializing the ZK client, and before creating the broker’s own ephemeral node (roughly here). The feature versions available in the broker are checked against the contents of the node to ensure there are no incompatibilities. If an incompatibility is detected, the broker will shut down immediately.

  2. A watch is set up on the '/features' ZK node. Then, the above validation will be reused in the code path that reads the contents of the '/features' ZK node whenever the watch fires. This affirms that the feature versions available in the broker always remain compatible with the cluster-wide feature versions read from ZK.

NOTE: The logic for the validations will be exactly the same as the one described under Validations section under Controller API.
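The incompatibility predicate shared by both code paths boils down to a per-feature range check. A sketch only; representing the supported range as a two-element array is purely for illustration:

```java
import java.util.Map;

public class BrokerFeatureValidator {
    // A finalized feature is incompatible with this broker if the broker does not
    // support the feature at all, or its supported range {min, max} does not
    // contain the finalized version_level. Any incompatibility found at startup,
    // or when the '/features' watch fires, causes an immediate shutdown.
    static boolean isIncompatible(Map<String, long[]> supported,
                                  String feature, long versionLevel) {
        long[] range = supported.get(feature);   // range = {minVersion, maxVersion}
        return range == null || versionLevel < range[0] || versionLevel > range[1];
    }
}
```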

Incompatible broker lifetime race condition

Description of a rare race condition:

  • Imagine at time T1 a broker B starts up, passes feature validation, and registers its presence in ZK in its BrokerIdZnode, along with advertising its supported features. Assume that broker B is about to become incompatible in its feature set, in comparison to the cluster-wide finalized feature versions.
  • Concurrent to T1, a feature version upgrade is finalized in the controller, which causes broker B to become incompatible in its feature set.
  • Imagine at a future time T2 broker B receives a ZK notification about a change to the '/features' node. The broker validates the new contents of the node against its supported features, finds an incompatibility, and shuts down immediately.

Question: What if, between T1 and T2, broker B containing incompatible features lingers in the cluster? Would this cause harm to the cluster?

Answer: The current behavior of the broker (which we intend to preserve) is that between the two events T1 and T2, the broker is almost a silent entity in the cluster. It does not carry out any important broker activities: it is not doing mutations on its persistence, not mutating critical in-memory state, and won’t be serving produce/fetch requests. Note it doesn’t even know its assigned partitions until it receives an UpdateMetadataRequest from the controller. This window is really very small (milliseconds), and can only occur in the rare case where an incompatible broker comes up while a write is inflight to ZK to finalize a feature version upgrade.

Tooling support

We shall introduce a CLI tool backed by a new admin command type called kafka.admin.FeatureCommand. The implementation will be inspired by the various command classes written in Scala that already enable us to carry out things such as CreateTopics, DeleteTopics, AlterConfigs etc. from the CLI. The new FeatureCommand will be used by the cluster operator (i.e. a human), and will enable us to do the following:

  • Read cluster-wide finalized feature versions from a broker or a controller via its ApiKeys.API_VERSIONS API.

  • Add/update cluster-wide finalized feature versions by exercising the newly introduced ApiKeys.UPDATE_FEATURES API on a controller.

  • Delete cluster-wide finalized features by exercising the ApiKeys.UPDATE_FEATURES API (with FeatureUpdateType.DELETE) on a controller.

We shall introduce 2 new APIs in the Admin interface, which enable us to read the feature versions and finalize feature version upgrades. Below is Java-ish pseudocode for the same.

...


Admin API changes

Code Block
languagejava
// ---- START: Proposed Admin API definitions ----
/**
 * Return the following:
 * 1. List of cluster-wide finalized feature versions.
 * 2. List of supported feature versions specific to the broker.
 *
 * You may anticipate certain exceptions when calling get() on the
 * futures obtained from the returned DescribeFeaturesResult.
 */
DescribeFeaturesResult describeFeatures();

/**
 * Update the feature versions supported cluster-wide. You may
 * anticipate certain exceptions when calling get() on the futures
 * obtained from the returned UpdateFeaturesResult. For example,
 * if a feature update was in progress already, the controller
 * could return a suitable error.
 *
 * @param updates   set of feature updates, keyed by the
 *                  name of the feature
 * @return          the result of the updateFeatures request
 */
UpdateFeaturesResult updateFeatures(Set<FeatureUpdate> updates);

// ---- END: Proposed Admin API definitions ----

// Represents a cluster-wide finalized feature, with a feature version.
class FinalizedFeature {
	// The name of the feature.
	String name();

    // The cluster-wide finalized value of the feature max version.
    long versionLevel();
}

// Represents a feature that is supported by a broker, with a specific
// feature version range [minVersion, maxVersion].
class SupportedFeature {
	// The name of the feature.
	String name();

	// The minimum version (value >= 0) of the supported feature.
	long getMinVersion();

	// The maximum version (value >= 0 and value >= minVersion) of the supported feature.
	long getMaxVersion();
}

enum FeatureUpdateType {
    // Either add the feature (if not present), or update it (if present).
    ADD_OR_UPDATE,

	// Same as ADD_OR_UPDATE, but with downgrade allowed.
	ADD_OR_UPDATE_ALLOW_DOWNGRADE,

	// Delete the feature.
	DELETE
}

class FeatureUpdate {
    // Return the feature to be updated.
	// The version returned via 'getFeature().versionLevel()' is the new value to-be-updated.
	// For deletions, the provided version level is ignored.
    FinalizedFeature getFeature();

    // Return the type of update to be made.
    FeatureUpdateType getUpdateType();
}

class FeatureMetadata {
    // The set of cluster-wide finalized features, keyed by feature name.
	Set<FinalizedFeature> finalizedFeatures;

    // The monotonically increasing epoch for the finalized features.
    long getEpoch();

	// The set of features supported by a broker, keyed by feature name.
    Set<SupportedFeature> supportedFeatures;

	// The hostname of the broker.
	String host;

	// The port of the broker.
	int32 port;
}

class DescribeFeaturesResult {
    /**
     * The data returned in the future contains the latest entire set of
     * finalized cluster-wide features, as well as the entire set of
     * features supported by the broker serving this read request.
     */
    KafkaFuture<FeatureMetadata> all();
}

class UpdateFeaturesResult {
    /**
     * Returns a future which indicates success/failure.
     * 1. If the future has succeeded (i.e. no exceptions),
     *    then the request was 100% successful, and no top level or
     *    individual FeatureUpdate errors were seen. The data
     *    returned in the future contains the latest entire set of
     *    finalized cluster-wide features (after all updates were applied),
     *    as well as the entire set of features supported by the controller
     *    serving this write request.
     * 2. If the future has failed, the top level error (if any)
     *    or the error from the FeatureUpdate(s) that failed
     *    (if any) is raised to the caller.
     */
    KafkaFuture<FeatureMetadata> all();
}

...

Code Block
=== DESCRIBE FEATURES ===

# Get cluster-wide finalized features, and features supported by a specific broker.
#  - Use `--bootstrap-server` to provide a broker host:port to which queries should be issued.
#  - Optionally, provide `--controller` flag directing the tool to issue the query to the
#    controller (while discovering the controller via the bootstrap server).
#    This can be useful for debugging purposes.

$> kafka-features.sh describe \
     --bootstrap-server kafka-broker0.prn1:9071 \
     [--controller]

{
	"status": "OK",
	"supported_features": {
		"group_coordinator": {
            "min_version": 1,
            "max_version": 2
        },
        "transaction_coordinator": {
            "min_version": 0,
            "max_version": 5
        },
        "consumer_offsets_topic_schema": { 
            "min_version": 0,
            "max_version": 0
        }
	},
	"finalized_features": {
        "epoch": 0,
        "group_coordinator": {
            "version_level": 1
        },
        "transaction_coordinator": {
        	"version_level": 4
        }
   },
   "host": "kafka-broker0.prn1",
   "port": 9071
}

=== ADD_OR_UPDATE FEATURES ===

# Add or update a list of cluster-wide finalized features.
#  - Use `--bootstrap-server` to provide a broker host:port to which MetadataRequest query should be issued.
#    The MetadataResponse will be used to discover the Controller, to which the actual ADD_OR_UPDATE request is issued.
#  - Use `--upgrade` to provide a comma-separated list of features and new finalized max version to ADD_OR_UPDATE.
#  - Use `--force-downgrade` to provide a comma-separated list of features and new finalized max version, with a downgrade allowed for feature versions. This should be used only when required.

$> kafka-features.sh update \
     --bootstrap-server kafka-broker0.prn1:9071 \
     --upgrade group_coordinator:2,my_new_feature:0 \
     --force-downgrade transaction_coordinator:3

Please confirm before downgrading the following features:
1. transaction_coordinator from v4 (existing) to v3 (new)

[Y/n]? Y

{
	"status": "OK",
	"supported_features": {
		"group_coordinator": {
            "min_version": 1,
            "max_version": 2
        },
        "transaction_coordinator": {
            "min_version": 0,
            "max_version": 5
        },
        "consumer_offsets_topic_schema": { 
            "min_version": 0,
            "max_version": 0
        }
	},
	"finalized_features": {
        "epoch": 1,
        "group_coordinator": {
            "version_level": 2
        },
        "transaction_coordinator": {
        	"version_level": 3
        },
        "consumer_offsets_topic_schema": { 
            "version_level": 0
        }
   },
   "host": "kafka-broker0.prn1",
   "port": 9071
}

=== DELETE FEATURES ===

# Delete a list of cluster-wide finalized features.
#  - Use `--bootstrap-server` to provide a broker host:port to which MetadataRequest query should be issued.
#    The MetadataResponse will be used to discover the Controller, to which the actual delete request is issued.
#  - Use `--features` to provide a comma-separated list of finalized features to be deleted.

$> kafka-features.sh delete \
     --bootstrap-server kafka-broker0.prn1:9071 \
     --features group_coordinator,transaction_coordinator

Please confirm deletion of the following finalized features:
1. group_coordinator
2. transaction_coordinator

[Y/n] Y

{
	"status": "OK",
	"supported_features": {
		"group_coordinator": {
            "min_version": 1,
            "max_version": 2
        },
        "transaction_coordinator": {
            "min_version": 0,
            "max_version": 5
        },
        "consumer_offsets_topic_schema": { 
            "min_version": 0,
            "max_version": 0
        }
	},
	"finalized_features": {
		"epoch": 2,
        "consumer_offsets_topic_schema": { 
            "version_level": 0
        }
   },
   "host": "kafka-broker0.prn1",
   "port": 9071
}

...

Summary of changes:

  1. We introduce 1 new API in the broker RPC interface (see this section). This is the new ApiKeys.UPDATE_FEATURES (schema), which can only be served successfully by the controller.
  2. We introduce a few tagged optional fields in the ApiVersionsResponse containing the cluster-wide finalized feature metadata, the feature metadata epoch, and the broker's supported features (see this section).
  3. We introduce 2 new APIs in the Admin interface to describe and update features (see this section). Under the covers, these exercise the APIs mentioned above.

...