...

  1. Downgrade of feature version level:
    A feature "downgrade" refers to dropping support across the entire cluster for a feature version level. This means reducing the finalized maximum feature version level X to a version level Y, where Y < X. In other words, dropping cluster-wide support for an existing feature that , after the feature was already finalized at a newer version level XFirstly, we leave it to the cluster operator (i.e. human) to decide whether the above actions are backwards compatible. It is not within the scope of this KIP to provide help to the cluster operator to achieve this step. After the cluster operator is past this step, we do provide the following support:

    1. Just like with upgrades, a downgrade request to reduce a feature version level is rejected by the system unless all brokers support the downgraded version of the feature. In the example above, the system expects all brokers to support the downgraded feature version Y.

    2. We assume that downgrades of finalized max feature version levels are rare. For safety, we require the user to set an explicit "allow downgrade" flag in the API request, to safeguard against accidental attempts to downgrade version levels. Note that even with this flag set, certain downgrades may be rejected by the system if they are impossible.
  2. Deprecation of feature version level:

    1. A need can arise to deprecate the usage of a certain version of one or more broker features. A feature "deprecation" refers to increasing the finalized minimum feature version level from X to a version level Y, where Y > X. We note that feature versions are typically deprecated during Kafka broker releases. This is unlike max feature version level upgrades, which can happen dynamically, after broker bits are deployed to a cluster.

    2. Firstly, the cluster operator (i.e. human) should use external means to establish that it is safe to stop supporting a particular version of a broker feature. For example, they should verify (if needed) that no clients are actively using the version before deciding to stop supporting it. It is not within the scope of this KIP to provide help to the cluster operator to achieve this step. After the cluster operator is past this step, we do provide the following support: during a specific release of Kafka, the system would mutate the persisted cluster-wide finalized feature versions to the desired values, signaling feature deprecation.

...

  • Each broker will advertise the version range of its supported features in its own BrokerIdZnode during startup. The contents of the advertisement are specific to the broker binary, but the schema is common to all brokers.
  • The controller already watches BrokerIdZnode updates, and will serve as a gateway (via a newly introduced API) to safely finalize version upgrades to features – the controller is well positioned to serve this requirement since it has a global view of all BrokerIdZnodes in the cluster, and can avoid most race conditions in the solution.

  • The metadata about cluster-wide finalized version levels of features is maintained in ZK by the controller. As a general rule, any active finalized feature version value is an int64 that's always >= 1. The value can be increased to indicate a version upgrade, or decreased to indicate a downgrade (or, if the value is < 1, it is treated as a request for feature version level deletion).

  • The controller will be the one and only entity modifying the information about finalized feature version levels. We expect metadata write traffic to be very low. All brokers in the cluster set up watches on the ZK node containing the finalized features information, so they get notified whenever the contents of the ZK node are changed by the controller.

  • Finally, client discovery about finalized feature versions will be performed via ApiKeys.API_VERSIONS API – the response will be modified to contain the necessary information (served from broker cache). Using this decentralized approach, we scale for metadata reads, albeit with eventually consistent results.
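To make the broker advertisement concrete, here is a rough sketch of what the supported-features portion of a BrokerIdZnode could look like. This is only an illustration: the surrounding BrokerIdZnode fields are omitted, the feature name is borrowed from the group_coordinator example later in this document, and the authoritative BrokerIdZnode schema is defined elsewhere in this KIP.

Code Block
{
  // Existing BrokerIdZnode fields (host, port, endpoints, etc.) are omitted here.
  "features": {
    "group_coordinator": {
      // The [min_version, max_version] range of the feature supported by this broker binary.
      "min_version": 1,
      "max_version": 2
    }
  }
}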

...

  • The value for the version key is an int64 that contains the version of the schema of the data stored in the ZK node.

  • The value for the features key is a dictionary that contains a mapping from feature names to their metadata (such as finalized version levels). 
    It's a map{string → map{string → <string | number>}}

     <feature_name>
                   |-->  <metadata_key>
                                        |-->  <metadata_value>

    • Top-level key <feature_name> is a non-empty string that’s a feature name.

      • <metadata_key> refers to the second nested level key that’s a non-empty string that refers to some feature metadata.

      • <metadata_value> is either a string or a number – it's the value for the <metadata_key>.

      • Note: this nested dictionary would contain the following keys:

        • 'min_version_level' whose value is an int64 representing the minimum finalized cluster-wide version level for the feature.

        • 'max_version_level' whose value is an int64 representing the maximum finalized cluster-wide version level for the feature.
        • the following rule always holds true: min_version_level >= 1 and max_version_level >= 1 and min_version_level <= max_version_level.
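As an illustration of the schema described above, the contents of the finalized features ZK node could look like the following sketch (the feature name is borrowed from the group_coordinator example later in this document, and the values are made up):

Code Block
{
  // Version of the schema of the data stored in this ZK node.
  "version": 1,
  // Mapping from feature name to its finalized version level metadata.
  "features": {
    "group_coordinator": {
      "min_version_level": 1,   // minimum finalized cluster-wide version level
      "max_version_level": 2    // maximum finalized cluster-wide version level
    }
  }
}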

...

  • Upgrade case: Y > X and Y falls in the  [min_version, max_version] range advertised by each live broker in the cluster.

             [OR]

  • Downgrade case: Y < X, and Y falls in the  [min_version, max_version] range advertised by each live broker, and allowDowngrade flag is set in the request.

             [OR]

  • Deletion case: Y < 1 is special and indicates feature deletion, only if allowDowngrade flag is also set in the request.

Note the following important points:

  1. The regular usage of the API is a feature upgrade, achieved by specifying a higher new max feature version level value (i.e. Y > X).

  2. By default, the API disallows max feature version level downgrades (Y < X) and finalized feature deletions (Y < 1). Such changes can be attempted only if the allowDowngrade flag is additionally set in the request (see the illustrative sketch after this list). Note that even with the allowDowngrade flag set, certain downgrades may be rejected by the controller if the action is deemed impossible.

  3. If any broker does not contain a required feature present in the FeatureUpdate, this is considered an incompatibility → such a case will fail the API request.

  4. If any broker contains an additional feature that’s not required → this is not considered an incompatibility.

  5. Some/all of the above logic will also be used by the broker (not just the controller) for its protections (see this section of this KIP).
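To tie the above cases together, below is a purely illustrative sketch of the data an ApiKeys.UPDATE_FEATURES request could carry for a downgrade. The field names (FeatureUpdates, Feature, MaxVersionLevel, AllowDowngrade) are assumed here only for illustration, mirroring the FeatureUpdate and allowDowngrade concepts described in this section; the authoritative request schema is defined elsewhere in this KIP.

Code Block
{
  "FeatureUpdates": [
    {
      "Feature": "group_coordinator",   // the feature whose finalized max version level is being changed
      "MaxVersionLevel": 1,             // Y < X in this example, so this is a downgrade attempt...
      "AllowDowngrade": true            // ...which is considered only because this flag is set
    }
  ]
}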

...

Instead it is sufficient if such changes are done directly by the controller, i.e. during a certain Kafka release we would change the controller code to mutate the '/features' ZK node, increasing the minimum version level of one or more finalized features (this will be a planned change, as determined by Kafka developers). Then, as this broker release gets rolled out to a cluster, the feature versions will become permanently deprecated.
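As a concrete illustration of such a deprecation (values and feature name made up, reusing the '/features' node schema described earlier in this document), the controller code in the new release could rewrite the node so that the finalized min_version_level of a feature is raised while its max_version_level stays unchanged:

Code Block
{
  "version": 1,
  "features": {
    "group_coordinator": {
      // min_version_level raised from 1 to 2 by the controller in the new release,
      // permanently deprecating version 1 of this feature cluster-wide.
      "min_version_level": 2,
      "max_version_level": 2
    }
  }
}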

...

Code Block
{
  "apiKey": 18,
  "type": "response", "name": "ApiVersionsResponse",
  "validVersions": "0-3",
  "flexibleVersions": "3+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code." },
    { "name": "ApiKeys", "type": "[]ApiVersionsResponseKey", "versions": "0+",
      "about": "The APIs supported by the broker.", "fields": [
      { "name": "ApiKey", "type": "int16", "versions": "0+", "mapKey": true,
        "about": "The API index." },
      { "name": "MinVersion", "type": "int16", "versions": "0+",
        "about": "The minimum supported version, inclusive." },
      { "name": "MaxVersion", "type": "int16", "versions": "0+",
        "about": "The maximum supported version, inclusive." }]
    },
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    // ----- START: PROPOSED ADDITIONAL METADATA -----
    { "name":  "SupportedFeatures", "type": "[]FeatureKey",
      "versions":  "3+", "tag": 10000, "taggedVersions": "3+",
      "about": "Features supported by the broker.",
      "fields":  [
        { "name": "Name", "type": "string", "versions": "3+",
          "about": "The name of the feature." },
        { "name": "MinVersion", "type": "int64", "versions": "3+",
          "about": "The minimum supported version for the feature." },
        { "name": "MaxVersion", "type": "int64", "versions": "3+",
          "about": "The maximum supported version for the feature." }
      ]
    },
    {"name": "FinalizedFeaturesEpoch", "type": "int64", "versions": "3+",
      "tag": 10001, "taggedVersions": "3+",
      "about": "The monotonically increasing epoch for the features information."},
    { "name":  "FinalizedFeatures", "type": "[]FinalizedFeatureKey",
      "versions":  "3+", "tag": 10002, "taggedVersions": "3+",
      "about": "List of cluster-wide finalized features.",
      "fields":  [
        {"name": "Name", "type": "string", "versions":  "3+",
          "about": "The name of the feature."},
        {"name":  "MaxVersionLevel", "type": "int64", "versions":  "3+",
          "about": "The cluster-wide finalized max version level for the feature."},
        {"name":  "MinVersionLevel", "type": "int64", "versions":  "3+",
          "about": "The cluster-wide finalized min version level for the feature."}
      ]
    }
    // ----- END: PROPOSED ADDITIONAL METADATA -----
  ]
}
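To make the proposed tagged fields concrete, a decoded response from a broker might look roughly like the following (purely an illustration with made-up values, using the group_coordinator feature name referenced elsewhere in this document):

Code Block
{
  // Only the proposed additional metadata is shown; the usual ApiKeys entries are omitted.
  "SupportedFeatures": [
    { "Name": "group_coordinator", "MinVersion": 1, "MaxVersion": 2 }
  ],
  "FinalizedFeaturesEpoch": 5,
  "FinalizedFeatures": [
    { "Name": "group_coordinator", "MaxVersionLevel": 2, "MinVersionLevel": 1 }
  ]
}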

...

  • T1: Imagine at time T1 the following event E1 occurs: A broker B starts up, passes feature validation, and registers its presence in ZK in its BrokerIdZnode, along with advertising its supported features. Assume that broker B is just about to become incompatible in its feature set, in comparison to the cluster-wide finalized feature versions.
  • T1: At the same time T1, the following event E2 occurs concurrently with E1: a feature version level upgrade is finalized in the controller, which causes broker B to become incompatible in its feature set.
  • T2: At a future time T2 the following event E3 occurs: The incompatible broker B receives a ZK notification about a change to the '/features' node. The broker validates the new contents of the '/features' node against its supported features, finds an incompatibility, and shuts down immediately.

Question: In between E1 and E3, the broker B containing incompatible features could linger in the cluster. This window is very small (milliseconds), and the situation is rare – it can only happen when an incompatible broker comes up in the cluster around the time that a feature version level upgrade is finalized. More specifically: what if the controller finalizes features via E2 while the processing of E1 is still in-flight/queued on the controller? Would this cause harm to the cluster?

Solution: We intend to handle the race condition by careful ordering of events in the controller. In the controller, the thread that handles the ApiKeys.UPDATE_FEATURES request (E2) is the ControllerEventThread. This is also the same thread that updates the controller's cache of broker info whenever a new broker joins the cluster (E1). In this setup, if an ApiKeys.UPDATE_FEATURES request (E2) is processed ahead of the notification from ZK about an incompatible broker joining the cluster (E1), then the controller can certainly detect the incompatibility when it processes E1 after E2 (since it knows the latest finalized features). The controller would handle the incompatible broker by blocking the rest of the new broker's startup sequence: it refuses to send an UpdateMetadataRequest to bootstrap the new broker. Then, it is only a matter of time (milliseconds) before the new broker receives a ZK notification (E3) about a change to the '/features' node and automatically shuts itself down due to the incompatibility.

...

  1. The stream clients shall not downgrade to group_coordinator:v1 once they upgrade to group_coordinator:v2. The solution is to persist the group_coordinator flag as part of the consumers' metadata when the rebalance completes, so that on a second rebalance, as long as some members are already on the newer version, the leader of the group will not kick off another feature request, because it knows (based on the eventual consistency guarantee) that an upgrade was ongoing in the last generation. It will just continue to inform all the members to upgrade to the newer version.

  2. It is important to note that the user is only upgrading the cluster from enabling group_coordinator:v1 to enabling group_coordinator:v1-v2. Therefore, once a client sees that v2 is enabled on any broker, it is a clear signal that all broker binaries within the cluster support the group_coordinator:v2 semantics; it's just a matter of time before the feature version update metadata propagates to every broker from the controller.

...