
...

  1. Client Discovery:

    • By scalable, we mean the solution should horizontally scale to the metadata read and write workload as the cluster grows with more features, brokers and clients. It is expected that metadata discovery is read heavy, while write traffic is very low (max feature versions are finalized typically during releases or other configuration pushes, which can happen a few times a day per cluster).

    • The metadata served to the client will contain an epoch value. These epoch values are integers and will increase monotonically whenever the metadata changes. At any given time, the latest epoch for the metadata returned to the client is valid, and the cluster functions in accordance with the same. Note: here, the metadata epoch # applies for the entire metadata contents, and is not particularly related to the individual feature version – these are 2 separate things. Please read this section for more information.

    • We want to be careful and specific about what we mean by consistent here. The metadata we serve to the client during discovery will be eventually consistent. It turns out that scaling to strongly consistent metadata reads is not easy (in the current Kafka setup), and the cost of an eventually consistent solution can be kept minimal, in the following way. Due to eventual consistency, there can be cases where an older, lower epoch of the metadata is briefly returned during discovery, after a more recent, higher epoch was returned at a previous point in time. We expect clients to tolerate staleness by no more than a difference of 1 metadata epoch bump, and to always employ the rule that a higher epoch of metadata always trumps a smaller epoch. Those clients that are external to Kafka should strongly consider discovering the latest metadata once during startup from the brokers, and, if required, refresh the metadata periodically (to get the latest metadata). A minimal client-side sketch of this rule appears after this list.

  2. Feature gating:

    • By safe, we mean: when processing a request to finalize a set of feature versions, the system will dynamically verify that all brokers in the cluster support the intended version. If not, the request will be rejected. Also, when a broker starts up, it will verify that it is compatible with the configured feature versions. If it is not, it will either refuse to start up or eventually die before it becomes an active member of the cluster (by active, we mean the point when the broker starts serving API requests).

    • By durable, we mean that the finalized features should be persisted durably and remembered across broker restarts. Generally the system should tolerate faults, as much as it does for storage of other critical metadata (such as topic configuration).

    • By dynamic, we mean that the finalized features can be mutated in a cheap/easy way without compromising the uptime of broker/controller processes, or the health of the cluster.
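
To make the epoch rule above concrete, here is a minimal client-side sketch. It is illustrative only (the class name FinalizedFeaturesCache and its methods are hypothetical, not part of this proposal); it simply ignores any metadata whose epoch is older than one that was already seen.

Code Block
languagejava
import java.util.Collections;
import java.util.Map;

// Illustrative client-side cache enforcing the "higher epoch always trumps a
// smaller epoch" rule described above. Not part of the proposed public API.
class FinalizedFeaturesCache {
    private long latestEpoch = -1L;
    private Map<String, Long> finalizedMaxVersions = Collections.emptyMap();

    // Accept newly discovered metadata only if its epoch is at least as new as
    // the one already seen; an older epoch is a stale read and is ignored.
    synchronized boolean maybeUpdate(long epoch, Map<String, Long> finalized) {
        if (epoch < latestEpoch) {
            return false; // stale metadata from an eventually consistent read
        }
        latestEpoch = epoch;
        finalizedMaxVersions = finalized;
        return true;
    }

    synchronized long latestEpoch() {
        return latestEpoch;
    }

    synchronized Map<String, Long> finalizedMaxVersions() {
        return finalizedMaxVersions;
    }
}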

...

  • Each broker will advertise the version range of its features in its own BrokerIdZnode during startup. The contents of the advertisement are specific to the broker binary, but the schema is common to all brokers.

  • The controller already watches BrokerIdZnode updates, and will serve as a gateway (via a newly introduced API) to finalize version upgrades to features – the controller is well positioned to serve this requirement since it has a global view of all BrokerIdZnodes in the cluster, and can avoid most race conditions in the solution.

  • As for the metadata about cluster-wide finalized maximum versions of features, this information will be persisted in ZK by the controller. The controller will be the one and only entity modifying this information (we expect metadata write traffic to be very low). All brokers in the cluster set up watches on the ZK node containing the finalized features information, so that they get notified whenever the contents of the node are changed via the controller.

  • Finally, client discovery about finalized feature versions will be performed via ApiKeys.API_VERSIONS API – the response will be modified to contain the necessary information (served from broker cache). Using this decentralized approach, we scale for metadata reads, albeit with eventually consistent results.

...

Code Block
{ 
   "listener_security_protocol_map":{ 
      "INTERNAL":"PLAINTEXT",
      "REPLICATION":"PLAINTEXT",
      "EXTERNAL":"SASL_SSL"
   },
   "endpoints":[ 
      "INTERNAL://kafka-0.kafka.def.cluster.local:9071",
      "REPLICATION://kafka-0.kafka.abc.cluster.local:9072",
      "EXTERNAL://b3-kfc-mnope.us.clusterx:9092"
   ],
   "rack":"0",
   "jmx_port":7203,
   "host":"kafka-0.kafka.def.cluster.local",
   "timestamp":"1579041366882",
   "port":9071,
   "version":45,
   // ----- START: PROPOSED ADDITIONAL METADATA -----
   "features": { 
      "exactly_once_semantics": {  // string -> name of the feature
          "minVersion": 0,   // int64 -> represents the min supported version of this feature
          "maxVersion": 3  // int64 -> represents the max supported version of this feature
      },
      "consumer_offsets_topic_schema": { 
          "minVersion": 1,
          "maxVersion": 4
      }
   }
   // ----- END: PROPOSED ADDITIONAL METADATA -----
}
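
For illustration, a broker binary could assemble the advertised "features" map shown above along the following lines. This is a sketch only; the class and helper names are hypothetical, and the actual broker would derive the ranges from its own supported feature definitions.

Code Block
languagejava
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative builder for the "features" map a broker advertises in its
// BrokerIdZnode (matching the JSON layout above). Names are placeholders.
final class SupportedFeaturesAdvertisement {
    static Map<String, Map<String, Long>> build() {
        Map<String, Map<String, Long>> features = new LinkedHashMap<>();
        features.put("exactly_once_semantics", range(0L, 3L));
        features.put("consumer_offsets_topic_schema", range(1L, 4L));
        return features;
    }

    // Represents the [minVersion, maxVersion] supported range for one feature.
    private static Map<String, Long> range(long minVersion, long maxVersion) {
        Map<String, Long> range = new LinkedHashMap<>();
        range.put("minVersion", minVersion);
        range.put("maxVersion", maxVersion);
        return range;
    }
}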

...

Code Block
{
   "version": 0, // int64 -> Represents the version of the schema for the data stored in the ZK node
   "data": {
		"features": {
        	"epochexactly_once_semantics": {  0, // int64string -> name Representsof the versionfeature
 of the contents of the "data" dictionary.
		"features": {
        	"exactly_once_semantics": {   // string -> name of the feature
            	"version": 3 // int64 -> Represents the cluster-wide finalized max version of this feature
	        },
    	    "consumer_offsets_topic_schema": { 
        	    "version": 4
	        }
		}
   }
}

...

  • The value for the version key is an int64 that contains the version of the schema of the data stored in the ZK node.

  • The value for the data key is a dictionary that contains the following:
    • A key called features whose value is a dictionary. This contains a mapping from feature names to their metadata (such as finalized versions). It's a map{string → map{string → <string | number>}} (see the parsing sketch after this list).

      <feature_name>
                     |-->  <metadata_key>
                                          |-->  <metadata_value>

      • Top-level key <feature_name> is a non-empty string that’s a feature name.

        • <metadata_key> refers to the second-level nested key; it is a non-empty string naming a piece of feature metadata.

        • <metadata_value> is either a string or a number – it's the value for the <metadata_key>.

        • Note: this nested dictionary would contain the key: 'version' whose value is an int64 representing the finalized cluster-wide max version for the feature.
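
As a parsing illustration of the layout above, the following sketch uses Jackson to pull the finalized max version per feature out of the '/features' node contents. The class and method names here are hypothetical, not the proposed implementation.

Code Block
languagejava
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

final class FeaturesZNodeParser {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Returns a map of feature name -> finalized cluster-wide max version, extracted
    // from the '/features' ZK node JSON layout shown above.
    static Map<String, Long> parseFinalizedFeatures(byte[] zkNodeBytes) throws IOException {
        JsonNode root = MAPPER.readTree(zkNodeBytes);
        JsonNode features = root.path("data").path("features");
        Map<String, Long> finalized = new HashMap<>();
        Iterator<Map.Entry<String, JsonNode>> it = features.fields();
        while (it.hasNext()) {
            Map.Entry<String, JsonNode> entry = it.next();
            finalized.put(entry.getKey(), entry.getValue().path("version").asLong());
        }
        return finalized;
    }
}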

New controller API

We introduce 2 new Admin APIs that are served only by the controller, identified by the new API keys: ApiKeys.UPDATE_FEATURES and ApiKeys.DELETE_FEATURES. These APIs enable safe application of a set of cluster-wide feature updates:

  • ApiKeys.UPDATE_FEATURES:

    • The input to the API is a list of FeatureUpdate that need to be applied. Each item specifies an operation (as a FeatureUpdateType) such as:

      • Adding or updating a Feature with a finalized max version. Addition of a feature can happen when the cluster-wide max feature version is being finalized for the first time for that feature.

      • Updating the max finalized feature version of an existing Feature.

      • Optionally also enabling downgrades as part of a certain FeatureUpdate.

    • The response contains an error code and an error message.

  • ApiKeys.DELETE_FEATURES:

    • The input to the API is a list of features that need to be deleted.

    • The response contains an error code and an error message.

To help explain things better, here are the request and response definitions for the new API to update features (also see the section below showing related pseudocode for the Admin API):

...

Code Block
{
  "apiKey": 48,
  "type": "request",
  "name": "UpdateFeaturesRequest",
  "validVersions": "0-1",
  "flexibleVersions": "1+",
  "fields": [
    { "name": "FeatureUpdatetimeoutMs", "type": "[]FeatureUpdateKeyint32", "versions": "10+",
   "default": "60000",
	   "about": "TheHow listlong of to wait in milliseconds before timing out the request." },
    { "name": "FeatureUpdate", "type": "[]FeatureUpdateKey", "versions": "0+",
      "about": "The list of updates to features.", "fields": [
      {"name":  "FeatureUpdateType", "type":  "int16", "versions":  "10+",
        "about": "The type of feature update (ex: ADD_OR_UPDATE, ADD_OR_UPDATE_WITH_DOWNGRADE, DELETE etc.)."},
      {"name":  "Feature", "type":  "[]FeatureKey", "versions":  "10+",
        "about":  "The feature to be updated.",
        "fields":  [
          {"name": "Name", "type":  "string", "versions":  "10+",
            "about": "The name of the feature."},
          {"name":  "Version", "type":  "int64", "versions":  "10+",
            "about": "The new cluster-wide finalized max version for the feature."}
      ]}
    ]}
  ]
}

...

Code Block
{
  "apiKey": 48,
  "type": "response",
  "name": "UpdateFeaturesResponse",
  "validVersions": "0-1",
  "flexibleVersions": "1+",
  "fields": [	
	{    {"name": "FeaturesEpochErrorCode", "type": "int64int16", "versions": "10+",
      "about": "The error monotonicallycode, increasingor epoch0 ofif thethere metadatawas forno featureserror." },
    { "name":  "FeaturesErrorMessage", "type": "[]FeatureKeystring", "versions":  "10+",
      "about": "Cluster-wideThe finalizederror featuresmessage, andor their versions.",
      "fields":  [
        {"name": "Name", "type":  "string", "versions":  "1+",
          "about": "The name of the feature."},
        {"name":  "Version", "type":  "int64", "versions":  "1+",
          "about": "The cluster-wide finalized max version for the feature."}
      ]
    }
  ]
}null if there was no error." }
  ]
}

Feature advertisements from each broker in the cluster are reactively picked up by the controller via ZK watches. Today these watches are already set up by the controller on every broker’s ephemeral ZK node (at '/brokers/ids/<id>'). The contents of the broker nodes are cached in memory in the Controller.

...

  1. For correctness reasons, the controller shall allow processing of only one inflight ApiKeys.UPDATE_FEATURES API call at any given time.

  2. Prior to mutating '/features' in ZK, the implementation verifies that all broker advertisements (as seen by it thus far) contained no incompatibilities with the provided List<FeatureUpdate> (see Validations section below).

  3. Once validations in #2 are successful, the List<FeatureUpdate> is applied to the contents of the '/features' ZK node. Upon a successful write, ZK automatically increments the version of the node by 1.

  4. Finally, an UpdateFeaturesResponse is returned to the user. The metadata epoch number is the latest version of the '/features' ZK node after the mutations were applied to it. A sketch of the conditional ZK write described in #3 appears below.
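
The conditional ZK write in step #3 could look roughly like the sketch below, which uses the raw ZooKeeper API purely for illustration (the actual controller would go through its own ZK client wrapper). It shows how the node version, bumped automatically by ZK on every successful write, doubles as the monotonically increasing features epoch.

Code Block
languagejava
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

// Illustrative only: a conditional write to '/features'. If another writer slipped in
// between the read and the write, ZK fails the setData() with a BadVersionException,
// which preserves the single-writer behavior expected of the controller.
final class FeaturesZNodeWriter {
    static int applyUpdates(ZooKeeper zk, byte[] newContents)
            throws KeeperException, InterruptedException {
        Stat stat = new Stat();
        zk.getData("/features", false, stat);                 // read current contents + version
        Stat newStat = zk.setData("/features", newContents, stat.getVersion());
        return newStat.getVersion();                          // new version == new features epoch
    }
}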

Validations

For any <feature_name>, the above API implementation guards against a change for the related entry in '/features' from {"version": X} to {"version": X’}, unless it notices that each live broker in the deployment advertises {"maxVersion": Y >= X’} and {"minVersion": Z <= X’} in its BrokerIdZnode (for the same <feature_name>). A sketch of this check appears after the list below.

  1. For the above guarding checks:

    1. Cluster-wide feature max version downgrades are disallowed by default, unless FeatureUpdateType is explicitly set to ADD_OR_UPDATE_ALLOW_DOWNGRADE.
    2. If any broker does not contain a required feature, this is considered an incompatibility → such a case will fail the API request.

    3. If any broker contains an additional feature that’s not required → this is not considered an incompatibility.

  2. Some/all of the above logic will also be used by the broker (not just the controller) for its protections (see this section of this KIP).

  3. Activating the effects of a feature version cluster-wide is left to the discretion of the logic implementing the feature (ex: can be done via dynamic broker config).

  4. Deprecating/eliminating the presence/effects of a specific feature version cluster-wide is left to the discretion of the logic backing the feature (ex: can be done via dynamic broker config). However, it may need further external verification to ensure no entity (ex: a consumer, or a broker) is actively using the feature (see Non-goals section).
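
For illustration, below is a minimal sketch of the guarding range check described above. The data structures and names are placeholders, not the actual controller code.

Code Block
languagejava
import java.util.Map;

// Minimal sketch of the guard: a new finalized max version X' for a feature is
// acceptable only if every live broker advertises minVersion <= X' <= maxVersion
// for that same feature in its BrokerIdZnode.
final class FeatureCompatibilityCheck {
    // supported: broker id -> (feature name -> [minVersion, maxVersion])
    static boolean isCompatibleWithAllBrokers(String featureName,
                                              long newFinalizedMaxVersion,
                                              Map<Integer, Map<String, long[]>> supported) {
        for (Map.Entry<Integer, Map<String, long[]>> broker : supported.entrySet()) {
            long[] range = broker.getValue().get(featureName);
            if (range == null) {
                return false; // broker does not support the feature at all -> incompatibility
            }
            if (newFinalizedMaxVersion < range[0] || newFinalizedMaxVersion > range[1]) {
                return false; // finalized version falls outside this broker's supported range
            }
        }
        return true; // extra features supported by a broker are not an incompatibility
    }
}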

...

  1. Each FeatureUpdate provided in the request was valid, and all updates were persisted to the '/features' ZK node by the controller (along with a bump to the ZK node version).

  2. Brokers in the cluster have gradually started receiving notifications (via ZK watches) on the changes to the '/features' ZK node. They react by reading the latest contents of the node from ZK. This mechanism also enables brokers to shutdown when incompatibilities are detected (see Broker protections section).

  3. The returned object in the ApiKeys.UPDATE_FEATURES API call contains the latest set of cluster-wide feature versions (after all updates were applied), which can be useful to the client for debugging/logging purposes.

When a call to the ApiKeys.UPDATE_FEATURES API has failed on the client side, the following is guaranteed:

  1. A server-side error means the '/features' ZK node is guaranteed to be left unmodified.

  2. In the case of a network error, or if the client dies during the request, or if the server dies while processing the request, the user should assume that the application of the List<FeatureUpdate> may have either succeeded or failed.


Next, below are the request and response definitions for the new API to delete features:

DeleteFeaturesRequest schema


Code Block
{
  "apiKey": 649,
  "type": "request",
  "name": "UpdateMetadataRequestDeleteFeaturesRequest",
  "validVersions": "0-61",
  "flexibleVersions": "61+",
  "fields": [
    ...
    ...
    { "name": "LiveBrokerstimeoutMs", "type": "[]UpdateMetadataBrokerint32", "versions": "0+", "fieldsdefault": ["60000",
	  "about": "How long to wait in milliseconds before timing out the request." },
    { "name": "IdFeature", "type": "int32[]FeatureKey", "versions": "0+", "entityType": "brokerId",
          "about": "The list brokerof id." },
          ...
  features to be deleted.", "fields": [        ...
          { "name": "SecurityProtocolName", "type":  "int16string", "versions":  "10+",
            "about": "The name securityof protocolthe typefeature." }
        ]},
    }
    {]
}

DeleteFeaturesResponse schema


Code Block
{
  "nameapiKey": "Rack"48,
  "type": "stringresponse",
  "versionsname": "2+DeleteFeaturesResponse",
  "nullableVersionsvalidVersions": "0+-1",
  "ignorableflexibleVersions": true"1+",
          "aboutfields": "The rack which this broker belongs to." }
    ]},
    // ----- START: PROPOSED ADDITIONAL METADATA -----
    {[	
	{ "name": "FeaturesEpochErrorCode", "type": "int64int16", "versions": "60+",
      "tagabout": 10000, "taggedVersions": "6+",
      "about": "The monotonically increasing epoch for features information.""The error code, or 0 if there was no error." },
    { "name":  "FeaturesErrorMessage", "type": "[]FeatureKeystring",
      "versions":  "60+", "tag": 10001, "taggedVersions": "6+",
      "about": "The error listmessage, ofor cluster-widenull finalizedif featuresthere andwas theirno versionserror.", }
      "fields":  [
        {"name": "Name", "type":  "string", "versions":  "3+",
          "about": "The name of the feature."},
        {"name":  "Version", "type":  "int64", "versions":  "3+",
          "about": "The cluster-wide max  version for the feature."}
      ]
    }
    // ----- END: PROPOSED ADDITIONAL METADATA -----
  ],
  "commonStructs": [
    ...
    ...
  ]
}]
}


Under the hood, the implementation of the ApiKeys.DELETE_FEATURES API does the following:

  1. For correctness reasons, the controller shall allow processing of only one inflight ApiKeys.DELETE_FEATURES API call at any given time.

  2. This API should only be used in circumstances where a finalized feature needs to be garbage collected.
  3. Unlike the ApiKeys.UPDATE_FEATURES API, this API mutates '/features' in ZK without any guarding checks on feature versions. Upon a successful write, ZK automatically increments the version of the node by 1.

Guarantees

When a call to the ApiKeys.DELETE_FEATURES API returns a success return code to the client, the following is guaranteed:

  1. Each feature provided in the request was valid, and all provided features were removed from the '/features' ZK node by the controller (along with a bump to the ZK node version).

  2. Brokers in the cluster have gradually started receiving notifications (via ZK watches) on the changes to '/features' ZK node. They react by reading the latest contents of the node from ZK.

When a call to the ApiKeys.DELETE_FEATURES API has failed on the client side, the following is guaranteed:

  1. A server-side error means the '/features' ZK node is guaranteed to be left unmodified.

  2. In the case of a network error, or if the client dies during the request, or if the server dies while processing the request, the user should assume that the application of the deletions may have either succeeded or failed.

Client discovery of finalized feature versions

The latest list of cluster-wide finalized feature versions will be served via the existing ApiKeys.API_VERSIONS API. This is used for client discovery, as well as for debuggability of the system. We will introduce tagged optional fields in the schema of the ApiVersionsResponse to accommodate the information. A couple of implementation details:

  • Whenever ZK notifies the broker about a change to the '/features' node, the broker reads the latest list of finalized features from ZK and caches it in memory (also see Broker protections section in this doc, and the sketch after this list).

  • We will modify the implementation of the ApiKeys.API_VERSIONS API to populate the information about finalized features in the response. This will contain the cluster-wide finalized versions from the broker’s internal cache of the '/features' ZK node contents (as explained above), as well as the broker-level supported versions.

  • The finalized feature versions populated in the response will be eventually consistent, since they are served from information refreshed in the brokers via ZK watches.
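
A rough broker-side sketch of this ZK-watch-driven refresh is shown below. It uses the raw ZooKeeper API and hypothetical names purely for illustration; the real broker would plug into its existing ZK client and metadata plumbing.

Code Block
languagejava
import java.util.concurrent.atomic.AtomicReference;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

// Illustrative broker-side cache of the '/features' node, refreshed via a ZK watch.
// The cached contents and epoch are what the ApiKeys.API_VERSIONS handler reads to
// populate the FinalizedFeatures and FinalizedFeaturesEpoch fields.
final class FinalizedFeaturesBrokerCache implements Watcher {
    private final ZooKeeper zk;
    private final AtomicReference<byte[]> cachedContents = new AtomicReference<>();
    private volatile int cachedEpoch = -1; // ZK node version doubles as the epoch

    FinalizedFeaturesBrokerCache(ZooKeeper zk) throws Exception {
        this.zk = zk;
        refresh();
    }

    // Called by ZK whenever the watched node changes; refresh() re-registers the watch.
    @Override
    public void process(WatchedEvent event) {
        try {
            refresh();
        } catch (Exception e) {
            // In the real broker, a failed refresh or a detected incompatibility here
            // can trigger the shutdown sequence (see Broker protections section).
        }
    }

    private void refresh() throws Exception {
        Stat stat = new Stat();
        byte[] contents = zk.getData("/features", this, stat); // watch re-registered here
        cachedContents.set(contents);
        cachedEpoch = stat.getVersion();
    }

    int epoch() { return cachedEpoch; }

    byte[] contents() { return cachedContents.get(); }
}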

Below is the new ApiVersionsResponse schema showing the new tagged optional fields for feature versions.

Note that the field FinalizedFeaturesEpoch contains the latest version of the '/features' node in ZK. On the client side, the epoch can be used to ignore older features metadata returned in ApiVersionsResponse, after newer features metadata was returned once (eventual consistency).

...

Code Block
{
  "apiKey": 18,
  "type": "response", "name": "ApiVersionsResponse",
  "validVersions": "0-3",
  "flexibleVersions": "3+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code." },
    { "name": "ApiKeys", "type": "[]ApiVersionsResponseKey", "versions": "0+",
      "about": "The APIs supported by the broker.", "fields": [
      { "name": "ApiKey", "type": "int16", "versions": "0+", "mapKey": true,
        "about": "The API index." },
      { "name": "MinVersion", "type": "int16", "versions": "0+",
        "about": "The minimum supported version, inclusive." },
      { "name": "MaxVersion", "type": "int16", "versions": "0+",
        "about": "The maximum supported version, inclusive." }]
    },
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    // ----- START: PROPOSED ADDITIONAL METADATA -----
    { "name":  "SupportedFeatures", "type": "[]FeatureKey",
      "versions":  "3+", "tag": 10000, "taggedVersions": "3+",
      "about": "Features supported by the broker.",
      "fields":  [
        { "name": "Name", "type": "string", "versions": "3+",
          "about": "The name of the feature." },
        { "name": "MinVersion", "type": "int64", "versions": "3+",
          "about": "The minimum supported version, inclusive." },
        { "name": "MaxVersion", "type": "int64", "versions": "3+",
          "about": "The maximum supported version, inclusive." }
      ]
    },
    {"name": "FinalizedFeaturesEpoch", "type": "int64", "versions": "3+",
      "tag": 10001, "taggedVersions": "3+",
      "about": "The monotonically increasing epoch for the features information."},
    { "name":  "FinalizedFeatures", "type": "[]FinalizedFeatureKey",
      "versions":  "3+", "tag": 10002, "taggedVersions": "3+",
      "about": "List of cluster-wide finalized features.",
      "fields":  [
        {"name":  [ "Name", "type": "string", "versions":  "3+",
          "about": "The name of the feature."},
        {"name":  "NameVersion", "type": "stringint64", "versions":  "3+",
          "about": "The name of the feature."},
        {"name":  "Version", "type": "int64", "versions":  "3+",
          "about": "The cluster-wide finalized max version for the feature."}
      ]
    }
    // ----- END: PROPOSED ADDITIONAL METADATA -----
  ]
}
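
As a usage illustration of the fields above, a client could gate a code path on the cluster-wide finalized version of a feature roughly as follows. The map is assumed to be populated by the client from the FinalizedFeatures field of ApiVersionsResponse; the class and method names are hypothetical.

Code Block
languagejava
import java.util.Map;

// Sketch of a client-side capability check: enable a code path only when the
// cluster-wide finalized max version of a feature is at least the version required.
final class ClientFeatureGate {
    static boolean clusterSupports(Map<String, Long> finalizedMaxVersions,
                                   String featureName,
                                   long requiredVersion) {
        Long finalizedMaxVersion = finalizedMaxVersions.get(featureName);
        return finalizedMaxVersion != null && finalizedMaxVersion >= requiredVersion;
    }
}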


Broker protections against race conditions

Certain validations will be introduced at a few points in the broker code. The purpose is to avoid race conditions where incompatible brokers remain active in a cluster. The validations affirm that the supported feature versions in the broker are compatible with the expected cluster-wide feature versions. If any of these checks fail, the broker triggers its shutdown sequence, and the process eventually exits with a non-zero exit code. The places where the validation will be introduced are explained below:

  1. Validation shall be introduced during broker startup. This involves synchronously reading the cluster-wide feature versions from the '/features' ZK node just after initializing the ZK client, and before creating the broker’s own ephemeral node (roughly here). The feature versions available in the broker are checked against the contents of the node to ensure there are no incompatibilities. If an incompatibility is detected, the broker will be made to shutdown immediately.

  2. A watch is set up on the '/features' ZK node. The above validation will be reused in the code path that reads the contents of the '/features' ZK node whenever that watch fires. This affirms that the feature versions available in the broker always remain compatible with the cluster-wide feature versions read from ZK.

NOTE: The logic for the validations will be exactly the same as the one described under Validations section under Controller API.

Incompatible broker lifetime

One of the expectations about broker behavior, which is already followed in the broker, is the following.

  • Imagine at time T1 the broker starts up and registers its presence in ZK in its BrokerIdZnode, along with advertising its supported features.
  • Imagine at a future time T2 the broker receives a ZK notification about a change to the '/features' node. The broker validates the new contents of the '/features' node against its supported features to make sure there is no mismatch (it will shutdown if there is an incompatibility).

Question: What if, between T1 and T2, the broker contains incompatible features? Would this cause harm to the cluster?

Answer: The current behavior of the broker (which we intend to preserve) is that in between the 2 events T1 and T2, the broker is almost a silent entity in the cluster. It does not add any value to the cluster, or carry out any important broker activities. By “important”, we mean it is not doing mutations on its persistence, not mutating critical in-memory state, and won’t be serving produce/fetch requests. Note it doesn’t even know its assigned partitions until it receives an UpdateMetadataRequest from the controller. It is important to note that anything the broker is doing up until this point is neither damaging nor useful.

Tooling support

We shall introduce a CLI tool backed by a new admin command type called kafka.admin.FeatureCommand. The implementation will be inspired by the various command classes written in Scala that already enable us to carry out things such as CreateTopics, DeleteTopics, AlterConfigs etc. from the CLI. The new FeatureCommand will be used by the cluster operator (i.e. a human), and will enable us to do the following:

  • Read cluster-wide finalized feature versions from a broker or a controller via its ApiKeys.API_VERSIONS API.

  • Add/update cluster-wide finalized feature versions by exercising the newly introduced ApiKeys.UPDATE_FEATURES API on a controller.

  • Delete cluster-wide finalized features by exercising the newly introduced ApiKeys.DELETE_FEATURES API on a controller.

We shall introduce a couple of new APIs in the Admin interface, which enable us to read the feature versions and finalize feature version upgrades. Below is Java-ish pseudocode for the same.

Admin API changes

Code Block
languagejava
// ---- START: Proposed Admin API definitions ----
/**
 * Return the following:
 * 1. List of cluster-wide finalized feature versions.
 * 2. List of supported feature versions specific to the broker.
 *
 * You may anticipate certain exceptions when calling get() on the
 * futures obtained from the returned DescribeFeaturesResult.
 */
DescribeFeaturesResult describeFeatures();

/**
 * Update the feature versions supported cluster-wide. You may
 * anticipate certain exceptions when calling get() on the futures
 * obtained from the returned UpdateFeaturesResult. For example,
 * if a feature update was in progress already, the controller
 * would return a suitable exception.
 *
 * @param updates   set of feature updates, keyed by the
 *                  name of the feature
 * @return          the result of the updateFeatures request
 */
UpdateFeaturesResult updateFeatures(Set<FeatureUpdate> updates);

/**
 * Delete the specified cluster-wide finalized features.
 * You may anticipate certain exceptions when calling get() on the
 * futures obtained from the returned DeleteFeaturesResult. For example,
 * if a feature deletion was in progress already, the controller
 * would return a suitable exception.
 *
 * @param features   set of feature names to be deleted
 * @return           the result of the deleteFeatures request
 */
DeleteFeaturesResult deleteFeatures(Set<String> features);

// ---- END: Proposed Admin API definitions ----

interface FeatureBase {
	// The name of the feature.
	String getName();
}

// Represents a cluster-wide finalized feature, with a feature version.
interface FinalizedFeature extends FeatureBase {
    // The cluster-wide finalized value of the feature max version.
    long getVersion();
}

enum FeatureUpdateType {
    // Either add the feature (if not present), or update it (if present).
    ADD_OR_UPDATE,

	// Same as ADD_OR_UPDATE, but with downgrade allowed.
	ADD_OR_UPDATE_ALLOW_DOWNGRADE
}

interface FeatureUpdate {
    // Return the feature to be updated.
	// The version returned via 'getFeature().getVersion()' is the new value to-be-updated.
    FinalizedFeature getFeature();

    // Return the type of update to be made.
    FeatureUpdateType getUpdateType();
}

// Represents a feature that is supported by a broker, with a specific
// feature version range [minVersion, maxVersion].
interface SupportedFeature extends FeatureBase {
	// The minimum version (value >= 0) of the supported feature.
	long getMinVersion();

	// The maximum version (value >= 0 and value >= minVersion) of the supported feature.
	long getMaxVersion();
}

interface FeatureMetadata {
    // The set of cluster-wide finalized features.
	Set<FinalizedFeature> finalizedFeatures;

    // The monotonically increasing epoch for the finalized features.
    long getEpoch();

	// The set of features supported by a broker.
    Set<SupportedFeature> supportedFeatures;

	// The hostname of the broker.
	String host;

	// The port of the broker.
	int32 port;
}

interface DescribeFeaturesResult {
    /**
     * The data returned in the future contains the latest entire set of
     * finalized cluster-wide features, as well as the entire set of
     * features supported by the broker serving this read request.
     */
    KafkaFuture<FeatureMetadata> all();
}

interface UpdateFeaturesResult {
    /**
     * Returns a future which indicates success/failure.
     * 1. If the future has succeeded (i.e. no exceptions),
     *    then the request was 100% successful, and no top level or
     *    individual FeatureUpdate errors were seen. The data
     *    returned in the future contains the latest entire set of
     *    finalized cluster-wide features (after all updates were applied),
     *    as well as the entire set of features supported by the controller
     *    serving this write request.
     * 2. If the future has failed, the top level error (if any)
     *    or the error from the first failed FeatureUpdate
     *    (if any) is raised to the caller.
     */
    KafkaFuture<FeatureMetadata> all();
}

interface DeleteFeaturesResult {
    /**
     * Returns a future which indicates success/failure.
     * 1. If the future has succeeded (i.e. no exceptions),
     *    then the request was 100% successful, and no top level or
     *    individual feature deletion errors were seen.
     * 2. If the future has failed, the top level error (if any)
     *    or the error from the first failed deletion
     *    (if any) is raised to the caller.
     */
    KafkaFuture<FeatureMetadata> all();
}

...

Code Block
=== DESCRIBE FEATURES ===

# Get cluster-wide finalized features, and features supported by a specific broker.
#  - Use `--bootstrap-server` to provide a broker host:port to which queries should be issued.
#  - Optionally, provide `--controller` flag directing the tool to issue the query to the
#    controller (while discovering the controller via the bootstrap server).
#    This can be useful for debugging purposes.
$> kafka-features.sh describe --bootstrap-server kafka-broker0.prn1:9071 [--controller]

{
	"status": "OK",
	"supported_features": {
		"group_coordinator": {
            "minVersion": 1,
            "maxVersion": 2
        },
        "transaction_coordinator": {
        	"minVersion": 0,
        	"maxVersion": 5
        },
        "consumer_offsets_topic_schema": { 
            "minVersion": 30,
        	"maxVersion": 70
        }
	},
	"finalized_features": {
        "epoch": 0,
        "group_coordinator": {
            "version": 1
	"finalized_features": {
        "epoch": }0,
        "transactiongroup_coordinator": {
            	"version": 41
        },
        "consumer_offsets_topic_schematransaction_coordinator": { 
            	"version": 74
        }
   },
   "host": "kafka-broker0.prn1",
   "port": 9071
}

=== ADD_OR_UPDATE FEATURES ===

# Add or update a list of cluster-wide finalized features.
#  - Use `--bootstrap-server` to provide a broker host:port to which MetadataRequest query should be issued.
#    The MetadataResponse will be used to discover the Controller, to which the actual ADD_OR_UPDATE request is issued.
#  - Use `--feature-upgrade` to provide a feature and its new finalized max version to ADD_OR_UPDATE. Note that the flag can be used multiple times to specify multiple features.
#  - Use `--feature-force-downgrade` to force downgrade of a finalized feature version. This should be used only when required. Note that the flag can be used multiple times to specify multiple features.

$> kafka-features.sh update \
      --bootstrap-server kafka-broker0.prn1:9071 \
      --feature-upgrade group_coordinator:2 \
      --feature-force-downgrade transaction_coordinator:3 \
      --feature-upgrade consumer_offsets_topic_schema:0

Please confirm before finalizing the upgrade of the following features:
1. group_coordinator from v1 (existing) to v2 (new)
2. consumer_offsets_topic_schema from none (existing) to v0 (new)

[Y/n]? Y

Please confirm before downgrading the following features:
1. transaction_coordinator from v4 (existing) to v3 (new)

[Y/n]? Y

{
	"status": "OK",
	"supported_features": {
		"group_coordinator": {
            "minVersion": 1,
            "maxVersion": 2
        },
        "transaction_coordinator": {
        	"minVersion": 0,
        	"maxVersion": 5
        },
        "consumer_offsets_topic_schema": { 
            "minVersion": 30,
        	"maxVersion": 70
        }
	},
	"finalized_features": {
        "epoch": 1,
        "group_coordinator": {
            "version": 2
        },
        "transaction_coordinator": {
        	"version": 53
        },
        "consumer_offsets_topic_schema": { 
            "version": 70
        }
   },
   "host": "kafka-broker0.prn1",
   "port": 9071
}

=== DISABLE FEATURES ===

# Disable/delete a list of cluster-wide finalized features.
#  - Use `--bootstrap-server` to provide a broker host:port to which MetadataRequest query should be issued.
#    The MetadataResponse will be used to discover the Controller, to which the actual delete request is issued.
#  - Use `--features` to provide a comma-separated list of features to be deleted.

$> kafka-features.sh disable --bootstrap-server kafka-broker0.prn1:9071 --features group_coordinator,transaction_coordinator

Please confirm deletion of the following features:
1. group_coordinator
2. transaction_coordinator

[Y/n] Y

{
	"status": "OK",
	"supported_features": {
		"group_coordinator": {
            "minVersion": 1,
            "maxVersion": 2
        },
        "transaction_coordinator": {
        	"minVersion": 0,
        	"maxVersion": 5
        },
        "consumer_offsets_topic_schema": { 
            "minVersion": 30,
        	"maxVersion": 70
        }
	},
	"finalized_features": {
		"epoch": 2,
        "consumer_offsets_topic_schema": { 
            "version": 70
        }
   },
   "host": "kafka-broker0.prn1",
   "port": 9071
}

...

  1. We considered the idea of using the existing AlterConfigsRequest instead of introducing a new API to update features. Reasons to decide against using it:
    1. The AlterConfigsRequest format is not well equipped to elegantly express the notion of Set<FeatureUpdate> operations. The request format is more favorable towards expressing a list of key-value pairs, where each new value replaces the existing value for the specified key. Furthermore, the response format doesn't let us conveniently return the list of finalized feature versions to the caller.
    2. AlterConfigsRequest can be issued to any broker, which does not fit our requirement. Going forward, anything that modifies ZK needs to go through the controller. This is part of the work that we are doing to build the bridge release for KIP-500.
  2. As opposed to fine-granular features, we considered the idea of exposing the notion of IBP directly to the client for discovery. There are a few issues here:

    1. We are introducing a leaky abstraction here. Users/clients (excluding clients that are brokers) would now have to operate with the notion of a "broker protocol version" – this is merely a detail that's internal to a cluster of brokers behind an API.
    2. This still doesn't solve the "double roll" problem related to IBP; we would still need to solve that problem.

...