...

  1. Downgrade of feature version level:
    A feature "downgrade" refers to dropping support across the entire cluster for a feature version level. This means reducing the finalized maximum feature version level X to a version level Y, where Y < X, after the feature was already finalized at a newer version level X. Firstly, we leave it to the cluster operator (i.e. human) to decide whether the above actions are backwards compatible. It is not within the scope of this KIP to provide help to the cluster operator to achieve this step. After the cluster operator is past this step, we do provide the following support:

    1. Just like with upgrades, a downgrade request to reduce a feature version level is rejected by the system unless all brokers support the downgraded version of the feature. In the example above, the system expects all brokers to support the downgraded feature version Y.

    2. We assume that downgrades of finalized max feature version levels are rare. For safety, we require the user to set an explicit "allow downgrade" flag in the API request, to safeguard against accidental attempts to downgrade version levels. Note that even with this flag set, certain downgrades may be rejected by the system if they are deemed impossible.
  2. Deprecation of feature version level:

    1. A need can arise to deprecate the usage of a certain version of one or more broker features. A feature "deprecation" refers to increasing the finalized minimum feature version level from X to a version level Y, where Y > X. We note that feature versions are typically deprecated during Kafka broker releases. Deprecating a feature version is an incompatible change, which requires a major release of Kafka. This is very unlike max feature version level upgrades, which can happen dynamically, after broker bits are deployed to a cluster.

    2. Firstly, the cluster operator (i.e. a human) should use external means to establish that it is safe to stop supporting a particular version of a broker feature. For example, they can verify (if needed) that no clients are actively using the version before deciding to stop supporting it. It is not within the scope of this KIP to help the cluster operator achieve this step. Once the cluster operator is past this step, we provide the following support: during a specific release of Kafka, the system mutates the persisted cluster-wide finalized feature versions to the desired values, signaling feature deprecation. A sketch contrasting downgrade and deprecation follows below.
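
To make the distinction concrete, below is a minimal sketch in Java-ish pseudocode, mirroring the Admin API proposed later in this KIP (the exact class and method names here are illustrative): a downgrade is a dynamic, operator-initiated API call, while a deprecation is a static, release-driven change.

Code Block
// Downgrade (dynamic): reduce the finalized max version level of a feature,
// e.g. from X=3 down to Y=2. The explicit allow-downgrade flag safeguards
// against accidental downgrades, and the request is still rejected unless
// all brokers support the downgraded version Y.
Map<String, FeatureUpdate> updates = new HashMap<>();
updates.put("group_coordinator",
            new FeatureUpdate(2L /* new max version level Y */,
                              true /* allowDowngrade */));
admin.updateFeatures(updates, new UpdateFeaturesOptions()).all().get();

// Deprecation (static): NOT an API call. A future major release of Kafka
// ships with a raised finalized min version level (e.g. from X=1 to Y=2),
// and the system persists that change when the release is deployed.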

...

Each broker’s supported dictionary of {feature → version range} will be defined in the broker code. For each supported feature, the supported version range is defined by a min_version (an int64, always starting from 1) and a max_version (an int64 >= 1 and >= min_version).
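
For illustration, such a dictionary could be declared along the following lines. This is a minimal sketch; the SupportedVersionRange type and the declaration site are illustrative rather than prescribed here.

Code Block
import java.util.Map;

// Illustrative sketch of the broker-side declaration: each supported feature
// maps to a version range with min_version >= 1 and max_version >= min_version.
record SupportedVersionRange(long minVersion, long maxVersion) {
    SupportedVersionRange {
        if (minVersion < 1 || maxVersion < minVersion)
            throw new IllegalArgumentException("Invalid supported version range");
    }
}

class BrokerFeatures {
    static final Map<String, SupportedVersionRange> SUPPORTED_FEATURES = Map.of(
        "group_coordinator",       new SupportedVersionRange(1, 3),
        "transaction_coordinator", new SupportedVersionRange(1, 4));
}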

The controller needs a way to discover this metadata from each broker. To facilitate this, during startup each broker will advertise its supported dictionary of {feature → version range} in its own ephemeral BrokerIdZnode (this is the existing ZK node at the path '/brokers/ids/<id>'), under a nested dictionary keyed 'features'. The controller already watches all BrokerIdZnodes for updates, and can thus populate its ZK cache with the per-broker versioning information (using existing means).

The schema for the advertised information is similar to the one in the '/features' ZK node (see this section). Here is an example of a BrokerIdZnode with the proposed additional metadata towards the bottom.

BrokerIdZnode schema changes


Code Block
{ 
   "listener_security_protocol_map":{ 
      "INTERNAL":"PLAINTEXT",
      "REPLICATION":"PLAINTEXT",
      "EXTERNAL":"SASL_SSL"
   },
   "endpoints":[ 
      "INTERNAL://kafka-0.kafka.def.cluster.local:9071",
      "REPLICATION://kafka-0.kafka.abc.cluster.local:9072",
      "EXTERNAL://b3-kfc-mnope.us.clusterx:9092"
   ],
   "rack":"0",
   "jmx_port":7203,
   "host":"kafka-0.kafka.def.cluster.local",
   "timestamp":"1579041366882",
   "port":9071,
   // ----- START: PROPOSED ADDITIONAL/MODIFIED METADATA -----
   "version":5,  // existing key whose value has been bumped by 1
   "features": {  // new key
      "group_coordinator": {  // string -> name of the feature
          "min_version": 1,   // int64 -> represents the min supported version (>=1) of this feature
          "max_version": 3  // int64 -> represents the max supported version of this feature (>=1 and >= min_version)
      },
      "transaction_coordinator": { 
          "min_version": 1,
          "max_version": 4
      }
   }
   // ----- END: PROPOSED ADDITIONAL METADATA -----
}
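
For illustration, the controller-side extraction of this metadata could look like the following sketch. This is not the actual controller code; it assumes a Jackson-style JSON parser and a hypothetical helper class.

Code Block
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Illustrative: when the controller's watch on a BrokerIdZnode fires, parse
// the advertised "features" dictionary into a map of per-broker supported
// version ranges ({feature name -> [min_version, max_version]}).
class BrokerFeatureParser {
    static Map<String, long[]> parseFeatures(byte[] brokerIdZnodeData) throws Exception {
        JsonNode root = new ObjectMapper().readTree(brokerIdZnodeData);
        Map<String, long[]> supported = new HashMap<>();
        JsonNode features = root.get("features");
        if (features == null) return supported; // older broker: nothing advertised
        Iterator<Map.Entry<String, JsonNode>> it = features.fields();
        while (it.hasNext()) {
            Map.Entry<String, JsonNode> e = it.next();
            supported.put(e.getKey(), new long[] {
                e.getValue().get("min_version").asLong(),
                e.getValue().get("max_version").asLong() });
        }
        return supported;
    }
}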

...

Imagine a case where the '/features' ZK node is non-existent. In such a case, when the controller comes up, there are 2 bootstrapping situations:

  1. When the controller comes up and there is no '/features' znode AND the IBP is less than migration_ibp_version (see migration section), then the controller will not create a '/features' ZK node.
  2. When the controller comes up and there is no '/features' znode AND the IBP is greater than or equal to migration_ibp_version (see migration section), then the controller creates the ZK node for the first time (this is a blocking write that needs to succeed for the controller to continue its startup sequence). The data used to create the node is a map of {feature_name → {min_feature_version, max_feature_version}}, obtained by the controller from the brokers' supported features. This approach brings convenience to users
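
In Java-ish pseudocode, the bootstrap decision looks roughly as follows. The names zkClient, interBrokerProtocolVersion, migrationIbpVersion, and brokerSupportedFeatures() are illustrative stand-ins, not actual identifiers from the codebase.

Code Block
// Sketch of the controller's '/features' bootstrapping described above.
void maybeBootstrapFeaturesZnode() {
    if (zkClient.exists("/features")) {
        return; // Node already present; nothing to bootstrap.
    }
    if (interBrokerProtocolVersion < migrationIbpVersion) {
        return; // Case 1: IBP predates migration_ibp_version; do not create the node.
    }
    // Case 2: create '/features' for the first time, finalizing each feature at
    // the broker's full supported range {feature_name -> {min, max}}. This is a
    // blocking write; controller startup continues only once it succeeds.
    zkClient.createFeaturesZnode(brokerSupportedFeatures());
}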

...

  1. migrating to use a Broker containing the new versioning system (from this KIP). The controller finalizes the default min/max feature version levels automatically.

Changes to Kafka Controller

We introduce 1 new Admin API that's served only by the Kafka Controller, identified by the new API key: ApiKeys.UPDATE_FEATURES. This API enables transactional application of a set of cluster-wide feature updates to the ZK '/features' node (i.e. either all provided FeatureUpdates are applied to ZK, or none):

  • The API requires AclOperation.ALTER on ResourceType.CLUSTER.
  • The API request contains a list of FeatureUpdate that need to be applied, as explained below (see Validations section for more details):

    • Each item specifies the finalized feature to be added, updated, or deleted, along with the new max feature version level value.

    • Downgrade or deletion of a feature version level is not a regular operation/intent. It is only attempted by the controller if the item sets an allowDowngrade flag, conveying the user's intent to attempt a max version level downgrade/deletion. Note that even with this flag set, certain downgrades may be rejected by the controller if they are deemed impossible.
  • The API response contains an error code and an error message.
  • The API is transactional, meaning that if a single FeatureUpdate in the request can't be applied, then none of the feature updates are applied.
  • Changes to the cluster-wide finalized minimum feature version level cannot be carried out using this API. This can only be done as explained later under the Feature version deprecation section.

...

Code Block
{
  "apiKey": 48,
  "type": "request",
  "name": "UpdateFeaturesRequest",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "timeoutMs", "type": "int32", "versions": "0+", "default": "60000",
      "about": "How long to wait in milliseconds before timing out the request." },
    { "name": "FeatureUpdate", "type": "[]FeatureUpdateKey", "versions": "0+",
      "about": "The list of updates to features.", "fields": [
      { "name": "AllowDowngrade", "type": "bool", "versions": "0+",
        "about": "When set to true, the feature version level is allowed to be downgraded/deleted." },
      { "name": "Feature", "type": "[]FeatureKey", "versions": "0+",
        "about": "The feature to be updated.",
        "fields": [
          { "name": "Name", "type": "string", "versions": "0+",
            "about": "The name of the feature." },
          { "name": "MaxVersionLevel", "type": "int64", "versions": "0+",
            "about": "The new cluster-wide finalized maximum version level for the feature. A value >= 1 is valid/regular. A value < 1 is special, and can be used to request the deletion of the feature." }
      ]}
    ]}
  ]
}
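
As an example of the deletion semantics above, a hypothetical request built through the Java-ish Admin API could use a max version level below 1 together with the allow-downgrade flag (class and method names are illustrative, as in the earlier sketches):

Code Block
// Illustrative: requesting deletion of a finalized feature. Per the schema
// above, MaxVersionLevel < 1 is the special value requesting deletion, and
// AllowDowngrade must be set since deletion expresses a downgrade-like intent.
Map<String, FeatureUpdate> updates = Map.of(
    "group_coordinator", new FeatureUpdate(0L /* < 1 => delete */, true));
admin.updateFeatures(updates, new UpdateFeaturesOptions()).all().get();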

...

  • If the request was processed by a broker that's not the controller, then the NOT_CONTROLLER error code is returned.
  • If the caller does not have sufficient permission to perform the update, then the CLUSTER_AUTHORIZATION_FAILED error code is returned.
  • If the request is being concurrently processed by the controller, then the FEATURE_UPDATE_IN_PROGRESS error code (a new error code) is returned.
  • If the request contained at least one FeatureUpdate that cannot be applied, then the FEATURE_UPDATES_FAILED error code (a new error code) is returned.

Code Block
{
  "apiKey": 48,
  "type": "response",
  "name": "UpdateFeaturesResponse",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The error message, or null if there was no error." }
  ]
}
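
A caller of this API might distinguish these outcomes roughly as follows. This is an illustrative sketch: the exception types mirror the error codes above, and 'admin'/'updates' are as in the earlier sketches.

Code Block
// Illustrative client-side handling of the error codes listed above.
try {
    admin.updateFeatures(updates, new UpdateFeaturesOptions()).all().get();
} catch (ExecutionException e) {
    Throwable cause = e.getCause();
    if (cause instanceof NotControllerException) {
        // NOT_CONTROLLER: re-send the request to the current controller.
    } else if (cause instanceof ClusterAuthorizationException) {
        // CLUSTER_AUTHORIZATION_FAILED: caller lacks ALTER on CLUSTER.
    } else {
        // FEATURE_UPDATE_IN_PROGRESS: another update is in flight; retry later.
        // FEATURE_UPDATES_FAILED: at least one FeatureUpdate could not be
        // applied; since the API is transactional, none of them were applied.
    }
}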

...

Feature version deprecation

Deprecating a feature version is an incompatible change, which requires a major release of Kafka. Sometimes there can be a need to deprecate a specific feature version (see Non-goals section). This requirement translates to increasing the cluster-wide finalized minimum version level of one or more features in the ZK '/features' node. It is important to note that the minimum version level cannot be mutated via the Controller API. This is because the minimum version level is usually increased only to indicate the intent to stop support for a certain feature version. We would usually deprecate features during broker releases, after prior announcements. Therefore, this is not a dynamic operation, and such a mutation is not supported through the ApiKeys.UPDATE_FEATURES controller API.
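
Continuing the illustrative broker-side declaration from earlier, a deprecating major release would simply ship with a raised minimum in its supported range, for example:

Code Block
// Illustrative: a major release that deprecates versions 1-2 of a feature.
// The previous release declared "group_coordinator" -> [min=1, max=3]; this
// release raises the supported minimum, and during its rollout the system
// raises the persisted cluster-wide finalized min version level to match.
static final Map<String, SupportedVersionRange> SUPPORTED_FEATURES = Map.of(
    "group_coordinator",       new SupportedVersionRange(3, 3),
    "transaction_coordinator", new SupportedVersionRange(1, 4));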

...

  • Basic usage typically happens when, after a Kafka release, the cluster operator wants to finalize all of the latest feature versions. The tool internally has knowledge of a map of features to their respective max versions supported by the broker. Using this information, the tool provides a facility to upgrade all feature max version levels to the latest values known to the tool.
  • Downgrade of all feature max version levels, after they are finalized using the tool, is a rare occurrence. To facilitate emergency downgrade of all feature versions (e.g. just before an emergency roll back to a previous Kafka release), the tool provides a downgrade-all facility. To achieve this, the user needs to run the version of the tool packaged with the Kafka release that they need to downgrade to. This is because the tool's knowledge of features and their version values is limited to the version of the CLI tool itself (i.e. the information is packaged into the CLI tool when it is released).

We shall introduce 2 new APIs in the Admin interface, which enable us to read the feature versions and finalize feature version upgrades/downgrades. Below is Java-ish pseudocode for the same.

...