...

We introduce one new Admin API that is served only by the Kafka Controller, and identified by the new API key: ApiKeys.UPDATE_FEATURES. This API enables application of a set of cluster-wide feature updates to the ZK '/features' node:

  • The API requires AclOperation.ALTER on ResourceType.CLUSTER.
  • The API request contains a list of FeatureUpdate that need to be applied, as explained below (see Validations section for more details):

    • Each item specifies the finalized feature to be added or updated or deleted, along with the new max feature version level value.

    • Downgrade or deletion of a feature version level is not a regular operation/intent. It is only attempted in the controller if the item sets an allowDowngrade flag, to convey user intent to attempt a max version level downgrade/deletion. Note that despite this allowDowngrade flag being set, certain downgrades may be rejected by the controller if they are deemed impossible.
  • The API response contains the result corresponding to each FeatureUpdate in the request. Each result is described by an error code and an error message.
  • The API is not transactional, meaning that the request can be carried out partially, i.e. some of the FeatureUpdate in the request can succeed while others fail.
  • Changes to the cluster-wide finalized minimum feature version level cannot be carried out using this API. This can only be done as explained later under the Feature version deprecation section.

...

UpdateFeaturesRequest schema


Code Block
{
  "apiKey": 52,
  "type": "request",
  "name": "UpdateFeaturesRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "timeoutMs", "type": "int32", "versions": "0+", "default": "60000",
      "about": "How long to wait in milliseconds before timing out the request." },
    { "name": "FeatureUpdates", "type": "[]FeatureUpdateKey", "versions": "0+",
      "about": "The list of updates to finalized features.", "fields": [
      { "name": "Feature", "type": "string", "versions": "0+", "mapKey": true,
        "about": "The name of the finalized feature to be updated." },
      { "name": "MaxVersionLevel", "type": "int16", "versions": "0+",
        "about": "The new maximum version level for the finalized feature. A value >= 1 is valid. A value < 1 is special, and can be used to request the deletion of the finalized feature." },
      { "name": "AllowDowngrade", "type": "bool", "versions": "0+",
        "about": "When set to true, the finalized feature version level is allowed to be downgraded/deleted. The downgrade request will fail if the new maximum version level is a value that's not lower than the existing maximum finalized version level." }
    ]}
  ]
}

UpdateFeaturesResponse schema

Possible top-level errors:

  • If the request was processed by a broker that's not the controller, then NOT_CONTROLLER error code is returned.
  • If we didn't have sufficient permission to perform the update, then CLUSTER_AUTHORIZATION_FAILED error code is returned.

Individual FeatureUpdate errors:

  • If the FeatureUpdate is invalid (ex: the provided maxVersionLevel is incorrect), then the result contains the INVALID_REQUEST error code.
  • If the FeatureUpdate can not be applied (ex: due to version incompatibilities), then FEATURE_UPDATE_FAILED (a new error code) is returned.

Code Block
{
  "apiKey": 52,
  "type": "response",
  "name": "UpdateFeaturesResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code, or `0` if there was no top-level error." },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The top-level error message, or `null` if there was no top-level error." },
    { "name": "Results", "type": "[]UpdatableFeatureResult", "versions": "0+",
      "about": "Results for each feature update.", "fields": [
      { "name": "Feature", "type": "string", "versions": "0+", "mapKey": true,
        "about": "The name of the finalized feature." },
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
        "about": "The feature update error code or `0` if the feature update succeeded." },
      { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
        "about": "The feature update error, or `null` if the feature update succeeded." }
    ]}
  ]
}

Feature advertisements from each broker in the cluster are reactively picked up by the controller via ZK watches. Today these watches are already set up by the controller on every broker’s ephemeral ZK node (at '/brokers/ids/<id>'). The contents of the broker nodes are cached in memory in the Controller.

...

  1. The regular usage of the API is a feature upgrade. This is done by specifying a higher new max feature version level value (i.e. Y > X).

  2. By default, the API disallows max feature version level downgrades (Y < X) and finalized feature deletions (Y < 1). Such changes are allowed to be attempted only if the allowDowngrade flag is additionally set in the request. Note that despite the allowDowngrade flag being set, certain downgrades may be rejected by the controller, if the action is deemed impossible.

  3. If any broker does not contain a feature present in the FeatureUpdate, this is considered an incompatibility → such a case will fail the API request.

  4. Some/all of the above logic will also be used by the broker (not just the controller) for its protections (see this section of this KIP).
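
The validation rules above can be sketched as a small routine. This is a minimal illustration under stated assumptions, not the controller's implementation: the class FeatureUpdateValidator, the Outcome enum, and the representation of each broker as a set of feature names are all hypothetical, and the mapping of each rule to INVALID_REQUEST or FEATURE_UPDATE_FAILED is an assumption based on the error descriptions in the response section. Treating Y == X without the flag as a no-op is also an assumption.

```java
import java.util.List;
import java.util.Set;

// Hypothetical sketch of the validation rules; all names here are illustrative.
class FeatureUpdateValidator {
    enum Outcome { OK, INVALID_REQUEST, FEATURE_UPDATE_FAILED }

    /**
     * @param feature        name of the finalized feature in the FeatureUpdate
     * @param existingMax    X: the existing finalized max version level (0 if not finalized)
     * @param newMax         Y: the requested max version level (a value < 1 requests deletion)
     * @param allowDowngrade the allowDowngrade flag carried by the FeatureUpdate
     * @param brokerFeatures for each live broker, the names of the features it supports
     */
    static Outcome validate(String feature, short existingMax, short newMax,
                            boolean allowDowngrade, List<Set<String>> brokerFeatures) {
        boolean deletion = newMax < 1;
        boolean downgrade = deletion || newMax < existingMax;

        // Rule 2: downgrades and deletions are attempted only with allowDowngrade set.
        if (downgrade && !allowDowngrade) {
            return Outcome.INVALID_REQUEST;
        }
        // A downgrade request fails if the new level is not lower than the existing one.
        if (allowDowngrade && !downgrade) {
            return Outcome.INVALID_REQUEST;
        }
        // Rule 3: a broker that does not contain the feature is an incompatibility.
        // Assumption: the check is skipped for deletions.
        if (!deletion) {
            for (Set<String> supported : brokerFeatures) {
                if (!supported.contains(feature)) {
                    return Outcome.FEATURE_UPDATE_FAILED;
                }
            }
        }
        return Outcome.OK;
    }
}
```

With two brokers where only one supports a feature, an upgrade of that feature would be rejected, while an upgrade of a feature present on both brokers would pass.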

Guarantees

The ApiKeys.UPDATE_FEATURES API response contains a top-level result as well as a result per individual FeatureUpdate.

  • If the top-level error code indicates success, then it is guaranteed that there were no top-level failures. However, some individual FeatureUpdate may have still failed, so it is necessary to check the individual results.
  • If the top-level error code indicates failure, then it is guaranteed that the request has failed:
    • A server-side error means the '/features' ZK node is guaranteed to be left unmodified i.e. no FeatureUpdate was applied.
    • On a network error, or if the client dies during the request, or if the server dies while processing the request, the user should assume that the application of List<FeatureUpdate> may have either succeeded or failed.
  • For each individual FeatureUpdate result indicating a success in the error code, the following is guaranteed:
    • The corresponding FeatureUpdate provided in the request was valid, and the update was applied to the '/features' ZK node by the controller (along with a bump to the ZK node version).
    • Brokers in the cluster have gradually started receiving notifications (via ZK watches) on the changes to the '/features' ZK node. They react by reading the latest contents of the node from ZK, and re-establishing the ZK watch. This mechanism also enables brokers to shut down when incompatibilities are detected (see Broker protections section).

      When a call to the ApiKeys.UPDATE_FEATURES API has failed, the following is guaranteed to the client:

      • For each individual FeatureUpdate result indicating a failure in the error code, it is guaranteed that the corresponding FeatureUpdate was not applied to the '/features' ZK node by the controller.
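
A client could apply these guarantees when reading a response roughly as follows. This is a minimal sketch: UpdateFeaturesOutcome and its method are hypothetical helpers rather than Kafka classes, the response is modelled as a plain top-level error code plus a map from feature name to per-feature error code, and the only code value assumed is `0` for "no error", as stated in the response schema.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical client-side helper; it does not use the real Kafka response classes.
class UpdateFeaturesOutcome {
    static final short NONE = 0; // error code 0 means "no error" in the response schema

    /** Returns the features whose update is guaranteed to have been applied to '/features'. */
    static List<String> appliedFeatures(short topLevelError, Map<String, Short> perFeatureErrors) {
        List<String> applied = new ArrayList<>();
        // A top-level server-side error guarantees that no FeatureUpdate was applied.
        if (topLevelError != NONE) {
            return applied;
        }
        // Otherwise each FeatureUpdate has to be checked individually.
        for (Map.Entry<String, Short> result : perFeatureErrors.entrySet()) {
            if (result.getValue() == NONE) {
                applied.add(result.getKey());
            }
        }
        return applied;
    }
}
```

Network errors are deliberately not modelled here: in that case no response is available and the caller cannot tell whether the updates were applied.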

      Feature version deprecation

      ...

      Question: What if the controller finalizes features via E2 while processing of the broker registration via E1 is still in-flight/queued on the controller? Would this cause harm to the cluster? Basically, in between T1 and T2, the broker B containing incompatible features could linger in the cluster. This window is very small (milliseconds) and rare: it can only happen when an incompatible broker comes up in the cluster around the time that a feature version upgrade is finalized.


      Solution: We intend to handle the race condition by careful ordering of events in the controller. In the controller, the thread that handles the ApiKeys.UPDATE_FEATURES request (E2) will be the ControllerEventThread. This is also the same thread that updates the controller's cache of Broker info whenever a new broker joins the cluster (E1). In this setup, if an ApiKeys.UPDATE_FEATURES request (E2) is processed ahead of a notification from ZK about an incompatible broker joining the cluster (E1), then the controller can certainly detect the incompatibility when it processes E1 after E2 (since it knows the latest finalized features). The controller would handle the incompatible broker by skipping the remainder of the new broker startup sequence, refusing to send an UpdateMetadataRequest to bootstrap the new broker. Then, it is only a matter of time (milliseconds) before the new broker receives a ZK notification (E3) about a change to the '/features' node, and it automatically shuts itself down due to the incompatibility.
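
The ordering argument can be illustrated with a single-threaded event queue. This is a sketch under assumptions, not controller code: SingleThreadedController and its methods are hypothetical, a plain single-thread executor stands in for the ControllerEventThread, and a broker is treated as incompatible when it lacks a finalized feature or supports a lower max version than the finalized level.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical model: one thread processes both feature updates (E2) and broker registrations (E1).
class SingleThreadedController {
    private final ExecutorService eventThread = Executors.newSingleThreadExecutor();
    private final Map<String, Short> finalizedMax = new HashMap<>();
    private final List<Integer> bootstrapped = new ArrayList<>();

    // E2: finalize a new max version level for a feature.
    void updateFeature(String feature, short maxLevel) {
        eventThread.execute(() -> finalizedMax.put(feature, maxLevel));
    }

    // E1: a broker registers, advertising the max version it supports per feature.
    void brokerRegistered(int brokerId, Map<String, Short> supportedMax) {
        eventThread.execute(() -> {
            for (Map.Entry<String, Short> finalized : finalizedMax.entrySet()) {
                Short supported = supportedMax.get(finalized.getKey());
                if (supported == null || supported < finalized.getValue()) {
                    return; // incompatible: skip the rest of the startup sequence
                }
            }
            bootstrapped.add(brokerId); // stands in for sending UpdateMetadataRequest
        });
    }

    // Drains the queue and returns the brokers that were bootstrapped.
    List<Integer> shutdownAndGetBootstrapped() {
        eventThread.shutdown();
        try {
            eventThread.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return bootstrapped;
    }
}
```

Because both event types run on the same thread in submission order, a registration queued after a feature update always sees the updated finalized levels, which is the property the solution relies on.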

      ...

      • Basic usage typically happens when, after a Kafka release, the cluster operator wants to finalize all the latest feature versions. The tool internally has knowledge of a map of features to their respective max versions supported by the Broker. Using this information, the tool provides a facility to upgrade all feature max version levels to the latest values known to the tool.
      • Downgrade of all feature max version levels, after they are finalized using the tool, is a rare occurrence. To facilitate an emergency downgrade of all feature versions (e.g. just before an emergency rollback to a previous Kafka release), the tool provides a downgrade-all facility. To achieve this, the user needs to run the version of the tool packaged with the Kafka release that they need to downgrade to. This is because the tool's knowledge of features and their version values is limited to the version of the CLI tool itself (i.e. the information is packaged into the CLI tool when it is released).
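
The upgrade-all and downgrade-all facilities can be pictured as a comparison between the tool's built-in map and the currently finalized levels. The sketch below is illustrative only: FeatureTool and its methods are hypothetical, both inputs are modelled as maps from feature name to max version level, and the handling of finalized features that the tool does not know about is left out.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of the tool's planning step; not the actual CLI implementation.
class FeatureTool {
    /** Features whose finalized max level is below the max version known to the tool. */
    static Map<String, Short> upgradeAll(Map<String, Short> knownMax, Map<String, Short> finalizedMax) {
        Map<String, Short> updates = new TreeMap<>();
        for (Map.Entry<String, Short> known : knownMax.entrySet()) {
            // A feature that is not finalized yet is treated as level 0 here.
            short current = finalizedMax.getOrDefault(known.getKey(), (short) 0);
            if (known.getValue() > current) {
                updates.put(known.getKey(), known.getValue());
            }
        }
        return updates;
    }

    /** Features whose finalized max level is above the max version known to the tool.
     *  Each resulting update would need the allowDowngrade flag set. */
    static Map<String, Short> downgradeAll(Map<String, Short> knownMax, Map<String, Short> finalizedMax) {
        Map<String, Short> updates = new TreeMap<>();
        for (Map.Entry<String, Short> finalized : finalizedMax.entrySet()) {
            Short known = knownMax.get(finalized.getKey());
            if (known != null && known < finalized.getValue()) {
                updates.put(finalized.getKey(), known);
            }
        }
        return updates;
    }
}
```

Running downgradeAll with the map packaged in an older tool release yields lower target levels, which is why the older tool version has to be used for a rollback.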

      We shall introduce two new APIs in the Admin interface, which enable us to read the feature versions and finalize feature version upgrades/downgrades. Below is Java-ish pseudocode for the same.

      Admin API changes

      Code Block
      // ---- START: Proposed Admin API definitions ----
      /**
       * Describes supported and finalized features. You may anticipate
       * certain exceptions when calling get() on the future obtained from the returned
       * DescribeFeaturesResult.
       *
       * @param options   options for the describeFeatures API
       *
       * @return          a result object containing:
       *                    1. List of cluster-wide finalized feature versions.
       *                    2. List of supported feature versions specific to the broker.
       */
      DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);

      /**
       * Update the feature versions supported cluster-wide. You may
       * anticipate certain exceptions when calling get() on the futures
       * obtained from the returned UpdateFeaturesResult.
       *
       * @param featureUpdates   map of feature updates, keyed by the
       *                         name of the feature
       * @param options          options for the updateFeatures API
       *
       * @return                 the results of the FeatureUpdate provided in the request
       */
      UpdateFeaturesResult updateFeatures(Map<String, FeatureUpdate> featureUpdates, UpdateFeaturesOptions options);
      
      // ---- END: Proposed Admin API definitions ----
      
      // Represents a cluster-wide finalized feature, with feature version levels.
      class FinalizedFeature {
          // The name of the feature.
          String name();

          // The cluster-wide finalized value of the feature min version level (value >= 1).
          int16 minVersionLevel();

          // The cluster-wide finalized value of the feature max version level (value >= 1 and value >= minVersionLevel).
          int16 maxVersionLevel();
      }

      // Represents a feature that is supported by a broker, with a specific
      // feature version range [minVersion, maxVersion].
      class SupportedFeature {
          // The name of the feature.
          String name();

          // The minimum version (value >= 1) of the supported feature.
          int16 minVersion();

          // The maximum version (value >= 1 and value >= minVersion) of the supported feature.
          int16 maxVersion();
      }
      
      // Represents an update to a Feature, which can be sent to the controller
      // for processing.
      class FeatureUpdate {
          // The name of the feature to be updated.
          String name();

          // The cluster-wide finalized NEW value of the feature max version level.
          // - When >= 1, it's the new value to-be-updated for the finalized feature.
          // - When < 1, it indicates the deletion of a finalized feature.
          // The type matches MaxVersionLevel (int16) in the UpdateFeaturesRequest schema.
          int16 maxVersionLevel();

          // Return true only if downgrade/deletion of a feature should be allowed.
          // Note that despite this allowDowngrade flag being set, certain downgrades
          // may be rejected by the controller if it is deemed unsafe to downgrade
          // the max version level of some specific feature.
          bool allowDowngrade();
      }
      
      // Represents a collection of feature metadata, along with the host:port
      // of the broker serving the metadata.
      class FeatureMetadata {
          // The set of cluster-wide finalized features, keyed by feature name.
          Set<FinalizedFeature> finalizedFeatures();

          // The monotonically increasing epoch for the finalized features.
          int32 epoch();

          // The set of features supported by a broker, keyed by feature name.
          Set<SupportedFeature> supportedFeatures();

          // The hostname of the broker.
          String host();

          // The port of the broker.
          int32 port();
      }
      
      class DescribeFeaturesResult {
          /**
           * The data returned in the future contains the latest entire set of
           * finalized cluster-wide features, as well as the entire set of 
           * features supported by the broker serving this read request.
           */
          KafkaFuture<FeatureMetadata> featureMetadata();
      }
      
      class UpdateFeaturesResult {
          /**
           * Returns a future which succeeds only if all the FeatureUpdate in the request succeed.
           */
          KafkaFuture<Void> all();

          /**
           * Returns a map with key being feature name and value being
           * the future which can be used to check the status of the FeatureUpdate
           * in the request.
           *
           * Possible error codes:
           * - NONE: The FeatureUpdate succeeded.
           * - NOT_CONTROLLER: The FeatureUpdate failed since the request was processed by a broker that's not the controller.
           * - CLUSTER_AUTHORIZATION_FAILED: The FeatureUpdate failed since there wasn't sufficient permission to perform the update.
           * - INVALID_REQUEST: The FeatureUpdate failed because it is invalid.
           * - FEATURE_UPDATE_FAILED: The FeatureUpdate failed because it can not be applied (ex: due to version incompatibilities)
           */
          Map<String, KafkaFuture<Void>> values();
      }

      Basic CLI tool usage

      ...