
Status

Current state: Under discussion

Discussion thread: here



Background

Within the scope of this KIP, there are two related but largely separate areas of concern, described below.

Client discovery

The first area is discovery on the client side. How do clients of Kafka become aware of broker capabilities? For example, how does the client find out whether the broker supports a new type of event delivery semantics, or a new type of broker storage functionality? Note that the "client" here may be a producer, a consumer, a command-line tool, a graphical interface, a third-party application built on top of Kafka (for purposes such as administration/monitoring), or even the broker itself trying to communicate with other brokers via RPCs.

Today, the client relies on ApiVersions{Request|Response} to determine, at a per-request level, which version of each request should be sent to the brokers. This is sufficient for client-side logic that depends only on specific request versions; however, for logic that additionally requires client-side behavior to differ based on broker capabilities, such a request-level ApiVersions check is not sufficient. Detailed examples can be found below.
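
For illustration, below is a minimal sketch of what today's request-level negotiation amounts to (the class and method names here are simplified stand-ins, not actual client internals): the client intersects its own supported version range for an API key with the range the broker advertised, and nothing in this exchange names a feature.

Request-level version negotiation (sketch)

class ApiVersionRange {
    final short minVersion;
    final short maxVersion;
    ApiVersionRange(short minVersion, short maxVersion) {
        this.minVersion = minVersion;
        this.maxVersion = maxVersion;
    }
}

class VersionNegotiation {
    // Pick the highest request version understood by both sides, or fail.
    static short latestUsableVersion(ApiVersionRange client, ApiVersionRange broker) {
        short usable = (short) Math.min(client.maxVersion, broker.maxVersion);
        if (usable < Math.max(client.minVersion, broker.minVersion)) {
            throw new IllegalStateException("No overlapping request version");
        }
        return usable;
    }
}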

Feature gating

Development in Kafka (from a high level) is typically organized into features, each of which is tracked by a separate version number. Feature gating tries to answer the question: how does the broker decide which features to enable? For example, when is it safe for the brokers to start serving the new Exactly-Once (EOS) semantics? Should the controller send some additional information when it sends out a LeaderAndIsrRequest? Should the broker write its log data in the "new" message format or one of the "old" ones?

At first glance, it might seem that we should always enable the latest features. However, feature gating becomes necessary in certain cases:

  • Imagine a rolling upgrade of brokers, where a Kafka cluster is being upgraded to a new broker binary containing new versions of certain features. Until the binary upgrade completes successfully on all brokers, some brokers may support the new versions of a feature while others may not. This transitional phase can often continue for hours. During this time, any feature that would prevent successful communication and cooperation between the old and the new broker software must stay disabled. For example, the controller may not want to send out a new type of request, since the brokers which have not yet been upgraded would not understand this request.

  • Next, after a certain version of a feature is available on all brokers (i.e. post upgrade), today there is no safety net. An incompatible broker that does not support an active version of the feature could still become active in the deployment. This could have bad consequences. For example, such a broker may be activated due to accidental operational errors causing an unintended downgrade/rollback. Today, the cluster does not protect itself against such accidents.

Motivation

The above areas translate into separate problems within the scope of this KIP, as described below.

Client discovery

In general, today in Kafka, clients have no way to discover the min/max feature versions supported by a broker, or by all brokers in the cluster.

The ApiVersionsRequest enables a client to ask a broker which APIs it supports; for each API that the broker enumerates in the response, it also returns the version range it supports. An obvious limitation is that the response schema doesn't provide any details about features and their versions. Even if it did, such a response may be insufficient in cases where a client needs to commence usage of a specific feature version in the cluster only once all brokers in the cluster support the feature at that version. Consider the following problem. Users could upgrade their client apps before or after upgrading their brokers. If it happens before the broker upgrade, any new features on the client apps that depend on certain broker-side support (from all brokers in the cluster) must stay disabled until all the brokers have been upgraded to the required version. The question then becomes: at what point in time can these new features be safely enabled on the client side? Today, this has to be decided manually by a human, which is error-prone. Clients have no way to programmatically learn which version of a certain feature is guaranteed to be supported by all brokers (i.e. the cluster-wide finalized feature versions), and to take suitable decisions.

For example, consider the Kafka Streams use case. For KIP-447, Streams would need to switch its clients to the new thread-level producer model only after all brokers support it. Switching to the new model at a time when some brokers don’t support it can have consequences that break EOS semantics. Note that if only some brokers support the new model, Streams can continue to use the old per-task producer model without switching to the new one — this is not a problem.

Feature gating

There is a configuration key in Kafka called inter.broker.protocol.version (IBP). Currently, the IBP can only be set by changing the static configuration file supplied to the broker during startup. Although the IBP started out as a mechanism to control the version of RPCs that the brokers used for communication with each other, it has grown since then to gate a huge number of other features. For example, the IBP controls whether the broker will enable static consumer group membership, whether it will enable exactly-once functionality, and whether it will allow incremental fetch requests.

There are certain problems with the IBP configuration. The IBP assumes a one-dimensional spectrum of versions. Also, managing the IBP is cumbersome and error-prone: changes to this configuration key require a costly broker restart. This is the source of the infamous "double roll." If a customer wants to upgrade to the latest Kafka software and use the latest features, they must perform a rolling upgrade (first roll). But then, after the upgrade is complete, in order to update the IBP, they must modify every configuration file in the cluster and restart every broker (second roll) – a costly operation. Even worse, if they leave the IBP configuration key unset, it will default to the latest version, which will probably cause confusing cluster instability during the first roll.

Goals

The goals of this KIP are the following. For the above problems of client discovery and feature gating, we would like to propose a flexible, operationally easy, safe, and maintainable solution:

  1. Client discovery: Provide an API to programmatically access the feature metadata. Here, the “metadata” refers to the feature version levels (i.e. the cluster-wide finalized maximum and minimum versions of broker features). We would like to serve this metadata to clients in an eventually consistent and scalable way.

  2. Feature gating: Provide an API to safely, durably and dynamically finalize the upgrades to cluster-wide feature version levels. The API (and its related tooling) can be used by the cluster operator (i.e. typically a human) to finalize feature maximum version level upgrades/downgrades.

  3. IBP deprecation: As a good side-effect of the above, we would like to provide a path to eventually deprecate the need for the cumbersome inter.broker.protocol configuration, and the broker “double roll” mechanism. A very tangible benefit of the solution is that we should be able to do broker upgrades with just a single rolling restart.

Explanation

  1. Client Discovery:

    • By scalable, we mean the solution should scale horizontally to the metadata read and write workload as the cluster grows with more features, brokers and clients. It is expected that metadata discovery is read-heavy, while write traffic is very low (feature version levels are finalized typically during releases or other configuration pushes, which can happen a few times a day per cluster).

    • The metadata served to the client will contain an epoch value. These epoch values are integers and will increase monotonically whenever the metadata changes. At any given time, the latest epoch of the metadata returned to the client is valid, and the cluster functions in accordance with it. Note: the metadata epoch applies to the entire metadata contents, and is not related to any individual feature version – these are 2 separate things. Please read this section for more information.

    • We want to be careful and specific about what we mean by consistent here. The metadata we serve to the client during discovery will be eventually consistent. It turns out that scaling to strongly consistent metadata reads is not easy (in the current Kafka setup). And the cost of an eventually consistent solution can be made minimal, in the following way. Due to eventual consistency, there can be cases where an older lower epoch of the metadata is briefly returned during discovery, after a more recent higher epoch was returned at a previous point in time. We expect clients to always employ the rule that the latest received higher epoch of metadata always trumps an older smaller epoch (see the sketch after this list). Those clients that are external to Kafka should strongly consider discovering the latest metadata once during startup from the brokers, and, if required, refresh the metadata periodically (to get the latest metadata).

  2. Feature gating:

    • Only the max feature version level may be modified using the newly provided API. The min feature version level can not be modified with the new API (see the note on deprecation in the Non-goals section).
    • By safe, we mean: when processing a request to finalize a set of feature version levels, the system will dynamically verify that all brokers in the cluster support the intended version. If not, the request will be rejected. Also, when a broker starts up, it will verify that it is compatible with the configured feature versions. If it is not, it will either refuse to start up or eventually die as soon as it discovers a feature incompatibility.

    • By durable, we mean that the finalized features should be persisted durably and remembered across broker restarts. Generally the system should tolerate faults, as much as it does for storage of other critical metadata (such as topic configuration).

    • By dynamic, we mean that the finalized features can be mutated in a cheap/easy way without compromising the uptime of broker/controller processes, or the health of the cluster.
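
To make the epoch rule from the client discovery notes above concrete, here is a minimal client-side sketch (all names are illustrative stand-ins, not actual Kafka client classes): the client caches the features metadata together with its epoch, and ignores any snapshot carrying an older epoch.

Client-side epoch handling (sketch)

import java.util.Map;

class FeaturesMetadataCache {
    private long epoch = -1;                          // epoch of the cached snapshot
    private Map<String, long[]> finalized = Map.of(); // feature -> {min, max} version levels

    // Apply a newly discovered snapshot only if it is at least as recent.
    synchronized void maybeUpdate(long newEpoch, Map<String, long[]> newFinalized) {
        if (newEpoch < epoch) {
            return; // stale read from a lagging broker; ignore it
        }
        epoch = newEpoch;
        finalized = newFinalized;
    }
}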

Non-goals

Within the scope of this KIP, we provide only certain support related to feature downgrades and deprecation. These are described below:

  1. Downgrade of feature version level:
    A feature "downgrade" refers to dropping support across the entire cluster for a feature version level. This means reducing the finalized maximum feature version level X to a version level Y, where Y < X. In other words, dropping cluster-wide support for an existing feature that was already finalized at a newer version level. Firstly, we leave it to the cluster operator (i.e. human) to decide whether the above actions are backwards compatible. It is not within the scope of this KIP to provide help to the cluster operator to achieve this step. After the cluster operator is past this step, we do provide the following support:

    1. Just like with upgrades, a downgrade request to reduce a feature version level is rejected by the system unless all brokers support the downgraded versions of the feature. In the example above, the system expects all brokers to support the downgraded feature version Y.

    2. We assume that downgrades of finalized max feature version levels are rare. For safety reasons, we require the human to specify an explicit "allow downgrade" flag (in the API/tool) to safeguard against easy accidental downgrades of version levels.
  2. Deprecation of feature version level:

    1. A need can arise to deprecate the usage of a certain version of one or more broker features. A feature "deprecation" refers to increasing the finalized minimum feature version level X to a version level Y, where Y > X. We note that feature versions are typically deprecated during Kafka Broker releases. This is very unlike max feature version level upgrades, which can happen dynamically, after broker bits are deployed to a cluster.

    2. Firstly, the cluster operator (i.e. a human) should use external means to establish that it is safe to stop supporting a particular version of a broker feature. For example, they should verify (if needed) that no clients are actively using the version, before deciding to stop supporting it. It is not within the scope of this KIP to provide help to the cluster operator to achieve this step. After the cluster operator is past this step, we do provide the following support: during a specific release of Kafka, the system would mutate the persisted cluster-wide finalized feature versions to the desired values, signaling feature deprecation.

Proposed changes

Below is a TL;DR of the changes:

  • Each broker will advertise the version range of its supported features in its own BrokerIdZnode during startup. The contents of the advertisement are specific to the broker binary, but the schema is common to all brokers.
  • The controller already watches BrokerIdZnode updates, and will serve as a gateway (via a newly introduced API) to finalize version upgrades to features – the controller is well positioned to serve this requirement, since it has a global view of all BrokerIdZnode contents in the cluster, and can avoid most race conditions in the solution.

  • The metadata about cluster-wide finalized version levels of features is maintained in ZK by the controller. As a general rule, any active finalized feature version value is an int64 that's always >= 1. The value can be increased to indicate a version upgrade, or decreased to indicate a downgrade (or, if the value is < 1, it is considered an ask for feature version level deletion).

  • The controller will be the one and only entity modifying the information about finalized feature version levels. We expect metadata write traffic to be very low. All brokers in the cluster set up watches on the ZK node containing the finalized features information; thereby, they get notified whenever the contents of the ZK node are changed via the controller.

  • Finally, client discovery of finalized feature versions will be performed via the ApiKeys.API_VERSIONS API – the response will be modified to contain the necessary information (served from the broker cache). Using this decentralized approach, we scale for metadata reads, albeit with eventually consistent results.

Broker advertisements

Each broker’s supported dictionary of {feature → version range} will be defined in the broker code. For each supported feature, the supported version range is defined by a min_version (an int64, always starting from 1) and a max_version (an int64 >= 1 and >= min_version).

The controller needs a way to discover this metadata from each broker. To facilitate this, during startup, each broker will advertise its supported {feature → version range} in its own ephemeral BrokerIdZnode (this is the existing ZK node at the path '/brokers/ids/<id>'), under a nested dictionary keyed 'features'. The controller already watches all BrokerIdZnode for updates, and thus can get its ZK cache populated with the per-broker versioning information (using existing means).

The schema for the advertised information is similar to the one in '/features' ZK node (see this section). Here is an example of a BrokerIdZnode with the proposed additional metadata towards the bottom.

BrokerIdZnode schema


{ 
   "listener_security_protocol_map":{ 
      "INTERNAL":"PLAINTEXT",
      "REPLICATION":"PLAINTEXT",
      "EXTERNAL":"SASL_SSL"
   },
   "endpoints":[ 
      "INTERNAL://kafka-0.kafka.def.cluster.local:9071",
      "REPLICATION://kafka-0.kafka.abc.cluster.local:9072",
      "EXTERNAL://b3-kfc-mnope.us.clusterx:9092"
   ],
   "rack":"0",
   "jmx_port":7203,
   "host":"kafka-0.kafka.def.cluster.local",
   "timestamp":"1579041366882",
   "port":9071,
   // ----- START: PROPOSED ADDITIONAL/MODIFIED METADATA -----
   "version":5,  // existing key whose value has been bumped by 1
   "features": {  // new key
      "group_coordinator": {  // string -> name of the feature
          "min_version": 1,   // int64 -> represents the min supported version (>=1) of this feature
          "max_version": 3  // int64 -> represents the max supported version of this feature (>=1 and >= min_version)
      },
      "transaction_coordinator": { 
          "min_version": 1,
          "max_version": 4
      }
   }
   // ----- END: PROPOSED ADDITIONAL METADATA -----
}

Persistence of finalized feature version levels

The proposal is that cluster-wide finalized max/min feature version levels will be persisted in a specific common ZK node. The path to the ZK node is proposed as: '/features'. The node content type is JSON (string), and the size is expected to be typically small (a few KBs). A couple of high-level details:

  • During regular operations, the data in the ZK node can be mutated only via a specific admin API served only by the controller.
  • An eventually consistent copy of the node data shall be made readable via existing ApiKeys.API_VERSIONS API served by any broker in the cluster (see this section). The latest copy of the node data will be cached in the controller, and if needed this can be obtained by directing the ApiKeys.API_VERSIONS API to the controller.

The above proposed read/write paths are described later in this doc. Below is an example of the contents of the '/features' ZK node, with its schema explained below.

Schema (JSON)

{
   "version": 0,   // int64 -> Represents the version of the schema for the data stored in the ZK node
   "features": {
      "group_coordinator": {       // string -> name of the feature
         "min_version_level": 1,   // int64 -> Represents the cluster-wide finalized minimum version level (>=1) of this feature
         "max_version_level": 3    // int64 -> Represents the cluster-wide finalized maximum version level (>=1 and >= min_version_level) of this feature
      },
      "consumer_offsets_topic_schema": {
         "min_version_level": 1,
         "max_version_level": 4
      }
   }
}

The schema is a JSON dictionary with a few different keys and values, as explained below:

  • The value for the version key is an int64 that contains the version of the schema of the data stored in the ZK node.

  • The value for the features key is a dictionary that contains a mapping from feature names to their metadata (such as finalized version levels). It's a map{string → map{string → <string | number>}}

     <feature_name>
                   |-->  <metadata_key>
                                        |-->  <metadata_value>

    • Top-level key <feature_name> is a non-empty string that’s a feature name.

      • <metadata_key> refers to the second nested level key that’s a non-empty string that refers to some feature metadata.

      • <metadata_value> is either a string or a number – it's the value for the <metadata_key>.

      • Note: this nested dictionary would contain the following keys:

        • 'min_version_level' whose value is an int64 representing the minimum finalized cluster-wide version level for the feature.

        • 'max_version_level' whose value is an int64 representing the maximum finalized cluster-wide version level for the feature.
        • the following rule always holds true: min_version_level >= 1 and max_version_level >= 1 and min_version_level <= max_version_level.
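
For illustration, the invariant above can be expressed as the following check (a sketch, not actual broker code):

Version level invariant (sketch)

class VersionLevelRule {
    // True only if the finalized levels satisfy the rule stated above.
    static boolean validLevels(long minVersionLevel, long maxVersionLevel) {
        return minVersionLevel >= 1
            && maxVersionLevel >= 1
            && minVersionLevel <= maxVersionLevel;
    }
}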

Controller: ZK node bootstrap with default values

Imagine a case where the '/features' ZK node is non-existent. In such a case, when the controller starts up, it would create the ZK node for the first time (this is a blocking write that must succeed for the controller to continue its startup sequence). The data used to create the node will be a map of {feature_name → {min_feature_version, max_feature_version}}, obtained by the controller service from the broker's supported features. This approach brings convenience to users bootstrapping a Kafka cluster for the first time (with a specific Kafka Broker release). The controller finalizes the default min/max feature version levels automatically.
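
Below is a minimal sketch of this bootstrap step, with a plain map standing in for ZooKeeper (all names are illustrative, not the actual controller code):

Controller bootstrap of '/features' (sketch)

import java.util.HashMap;
import java.util.Map;

class FeatureZNodeBootstrap {
    // fakeZk maps a ZK path to the node's feature data; supported maps a
    // feature name to its {min_version, max_version} from the broker binary.
    static void bootstrapIfAbsent(Map<String, Map<String, long[]>> fakeZk,
                                  Map<String, long[]> supported) {
        if (fakeZk.containsKey("/features")) {
            return; // node already exists; nothing to bootstrap
        }
        // Default finalized levels are taken from the broker's supported ranges.
        fakeZk.put("/features", new HashMap<>(supported)); // a blocking write in reality
    }
}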

Changes to Kafka Controller

We introduce 1 new Admin API that’s served only by the Kafka Controller, and identified by the new API key: ApiKeys.UPDATE_FEATURES. This API enables transactional application of a set of cluster-wide feature updates to the ZK '/features' node (i.e. either all provided FeatureUpdate are applied to ZK, or none):

  • The API requires AclOperation.ALTER on ResourceType.CLUSTER.
  • The API request contains a list of FeatureUpdate that need to be applied, as explained below:

    • Each item specifies the finalized feature to be added or updated or deleted, along with the new max feature version level value.

    • Max feature version level downgrades are not a regular operation. Each item optionally can specify an allowDowngrade flag, which can be used to allow version level downgrades (or deletions).
    • To add a new finalized feature version level, or update an existing one, the user must specify the version level starting from 1 (and increasing).
    • If a finalized feature needs to be permanently deleted, the user must specify a max version level value < 1, and should also set the allowDowngrade flag.

  • The API response contains an error code and an error message.
  • Changes to the cluster-wide finalized minimum feature version level can not be carried out using this API. This can only be done as explained later in the Feature version deprecation section.

To help explain things better, below are the request and response definitions for the new API to update features (also see section showing related pseudocode for the Admin API):

UpdateFeaturesRequest schema

{
  "apiKey": 48,
  "type": "request",
  "name": "UpdateFeaturesRequest",
  "validVersions": "0-1",
  "flexibleVersions": "1+",
  "fields": [
    { "name": "timeoutMs", "type": "int32", "versions": "0+", "default": "60000",
      "about": "How long to wait in milliseconds before timing out the request." },
    { "name": "FeatureUpdate", "type": "[]FeatureUpdateKey", "versions": "0+",
      "about": "The list of updates to features.", "fields": [
      { "name": "AllowDowngrade", "type": "bool", "versions": "0+",
        "about": "When set to true, the feature version level is allowed to be downgraded/deleted." },
      { "name": "Feature", "type": "[]FeatureKey", "versions": "0+",
        "about": "The feature to be updated.", "fields": [
        { "name": "Name", "type": "string", "versions": "0+",
          "about": "The name of the feature." },
        { "name": "MaxVersionLevel", "type": "int64", "versions": "0+",
          "about": "The new cluster-wide finalized maximum version level for the feature. A value >= 1 is valid/regular. A value < 1 is special, and can be used to request the deletion of the feature." }
      ]}
    ]}
  ]
}

UpdateFeaturesResponse schema

Possible top-level errors:

  • If the request was processed by a broker that's not the controller, then NOT_CONTROLLER error code is returned.
  • If we didn't have sufficient permission to perform the update, then CLUSTER_AUTHORIZATION_FAILED error code is returned.
  • If the request is being concurrently processed by the controller, then FEATURE_UPDATE_IN_PROGRESS error code (a new error code) is returned.
  • If the request contained at least one FeatureUpdate that cannot be applied, then FEATURE_UPDATES_FAILED (a new error code) is returned.

{
  "apiKey": 48,
  "type": "response",
  "name": "UpdateFeaturesResponse",
  "validVersions": "0-1",
  "flexibleVersions": "1+",
  "fields": [
	{ "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." },
    { "name": "ErrorMessage", "type": "string", "versions": "0+",
      "about": "The error message, or null if there was no error." }
  ]
}

Feature advertisements from each broker in the cluster are reactively picked up by the controller via ZK watches. Today these watches are already set up by the controller on every broker’s ephemeral ZK node (at '/brokers/ids/<id>'). The contents of the broker nodes are cached in memory in the Controller.

Under the hood, the implementation of the ApiKeys.UPDATE_FEATURES API does the following (a sketch follows this list):

  1. For correctness reasons, the controller shall allow processing of only one inflight ApiKeys.UPDATE_FEATURES API call at any given time.

  2. Prior to mutating '/features' in ZK, the implementation verifies that all broker advertisements (as seen by it thus far) contained no incompatibilities with the provided List<FeatureUpdate> (see Validations section below).

    1. Note that feature version deletions (using a version level < 1) can be used in circumstances where a finalized feature needs to be garbage collected. This type of update mutates '/features' in ZK without any guarding checks on feature versions.

  3. Once the validations in #2 are successful, the List<FeatureUpdate> is applied to the contents of the '/features' ZK node. Upon a successful write, ZK automatically increments the version of the node by 1.

  4. Finally, an UpdateFeaturesResponse is returned to the user.
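
Putting these steps together, below is a hedged sketch of the controller-side handling (all types and names are stand-ins for the actual controller internals; the precise compatibility rules are spelled out in the Validations section that follows):

ApiKeys.UPDATE_FEATURES handling (sketch)

import java.util.List;
import java.util.Map;

class UpdateFeaturesHandler {
    record Update(String feature, long maxVersionLevel, boolean allowDowngrade) {}
    record Range(long min, long max) {} // a broker's advertised supported range

    private final Map<Integer, Map<String, Range>> liveBrokers; // brokerId -> advertised features
    private final Map<String, Long> finalizedMaxLevels;         // stands in for '/features' contents
    private boolean inProgress = false;                         // only one inflight call allowed

    UpdateFeaturesHandler(Map<Integer, Map<String, Range>> liveBrokers,
                          Map<String, Long> finalizedMaxLevels) {
        this.liveBrokers = liveBrokers;
        this.finalizedMaxLevels = finalizedMaxLevels;
    }

    synchronized String handle(List<Update> updates) {
        if (inProgress) return "FEATURE_UPDATE_IN_PROGRESS";
        inProgress = true;
        try {
            for (Update u : updates) { // validate everything before mutating anything
                long current = finalizedMaxLevels.getOrDefault(u.feature(), 0L);
                boolean downgrade = u.maxVersionLevel() < current; // includes deletion (< 1)
                if (downgrade && !u.allowDowngrade()) return "FEATURE_UPDATES_FAILED";
                if (u.maxVersionLevel() >= 1 && !supportedByAllLiveBrokers(u)) {
                    return "FEATURE_UPDATES_FAILED"; // an incompatible broker exists
                }
            }
            for (Update u : updates) { // apply all updates, or none (validated above)
                if (u.maxVersionLevel() < 1) finalizedMaxLevels.remove(u.feature());
                else finalizedMaxLevels.put(u.feature(), u.maxVersionLevel());
            }
            return "NONE"; // success; the real write also bumps the ZK node version
        } finally {
            inProgress = false;
        }
    }

    private boolean supportedByAllLiveBrokers(Update u) {
        for (Map<String, Range> advertised : liveBrokers.values()) {
            Range r = advertised.get(u.feature());
            if (r == null || u.maxVersionLevel() < r.min() || u.maxVersionLevel() > r.max()) {
                return false; // feature missing, or requested level outside the broker's range
            }
        }
        return true;
    }
}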

Validations

For any <feature_name>, the above API implementation guards against a change of the related entry in '/features' from {"max_version_level": X} to {"max_version_level": X’}, unless it notices that each live broker in the deployment advertises {"max_version": Y >= X’} and {"min_version": Z <= X’} in its BrokerIdZnode (for the same <feature_name>). A similar check is also applied to changes in the "min_version_level" of a feature.

  1. Related to the above guarding checks:

    1. By default, the API disallows cluster-wide feature version level downgrades and deletions. These are allowed only if the allowDowngrade flag is specified.
    2. If any broker does not contain a required feature, this is considered an incompatibility → such a case will fail the API request.

    3. If any broker contains an additional feature that’s not required → this is not considered an incompatibility.

  2. Some/all of the above logic will also be used by the broker (not just the controller) for its protections (see this section of this KIP).

  3. Activating the effects of a feature version cluster-wide is left to the discretion of the logic implementing the feature (ex: can be done via dynamic broker config).

  4. Deprecating/eliminating the presence/effects of a specific feature version cluster-wide is left to the discretion of the logic backing the feature (ex: can be done via dynamic broker config). However, it may need further external verification to ensure no entity (ex: a consumer, or a broker) is actively using the feature (see Non-goals section).

Guarantees

When a call to the ApiKeys.UPDATE_FEATURES API returns a success return code to the client, the following is guaranteed:

  1. Each FeatureUpdate provided in the request was valid, and all updates were persisted to the '/features' ZK node by the controller (along with a bump to the ZK node version).

  2. Brokers in the cluster have gradually started receiving notifications (via ZK watches) on the changes to the '/features' ZK node. They react by reading the latest contents of the node from ZK, and re-establishing the ZK watch. This mechanism also enables brokers to shut down when incompatibilities are detected (see Broker protections section).

When a call to the ApiKeys.UPDATE_FEATURES API has failed on the client side, the following is guaranteed:

  1. A server-side error means the '/features' ZK node is guaranteed to be left unmodified.

  2. In the case of a network error, or if the client dies during the request, or if the server dies while processing the request, the user should assume that the application of the List<FeatureUpdate> may have either succeeded or failed.
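
One reasonable client-side pattern under these guarantees (a sketch in the style of the Admin pseudocode below; nothing here is prescribed by the KIP) is to re-read the finalized levels whenever the outcome of an update is ambiguous:

Handling an ambiguous failure (sketch)

// Java-ish pseudocode, using the Admin interfaces proposed later in this doc.
void updateWithAmbiguityCheck(Admin admin, Set<FeatureUpdate> updates) throws Exception {
    try {
        admin.updateFeatures(updates).all().get();
    } catch (Exception failure) {
        // A clean server-side error means '/features' was left unmodified. But on a
        // network error or client/server death, the update may or may not have been
        // applied, so re-read the finalized levels to learn the actual outcome.
        FeatureMetadata latest = admin.describeFeatures().all().get();
        System.out.println("Finalized features now: " + latest.finalizedFeatures());
    }
}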

Feature version deprecation

Sometimes a need can arise to deprecate a specific feature version (see Non-goals section). This requirement translates to increasing the cluster-wide finalized minimum version level of one or more features in the ZK '/features' node. It is important to note that we only allow the finalized feature maximum version level to be increased/decreased dynamically via the new controller API. In contrast, the minimum version level can not be mutated via the controller API. This is because the minimum version level is usually increased only to indicate the intent to stop support for a certain feature version. We would usually deprecate features during broker releases, after prior announcements. Therefore, this is not a dynamic operation, and such a mutation is not supported through the ApiKeys.UPDATE_FEATURES controller API.

Instead, it is sufficient if such changes are made directly by the controller, i.e. during a certain Kafka release we would change the controller code to mutate the '/features' ZK node, increasing the minimum version level of one or more finalized features (this will be a planned change, as determined by Kafka developers). Then, as this broker release gets rolled out to a cluster, the feature versions will become permanently deprecated.

Client discovery of finalized feature versions

The latest list of cluster-wide finalized feature version levels will be served via the existing ApiKeys.API_VERSIONS API. This is used for client discovery, as well as for debuggability of the system. We will introduce tagged optional fields in the schema of the ApiVersionsResponse to accommodate the information. A couple of implementation details:

  • Whenever ZK notifies the broker about a change to the '/features' node, the broker will read the latest list of finalized features from ZK and cache it in memory (see the sketch after these bullets, and the Broker protections section in this doc).

  • We will modify the implementation of the ApiKeys.API_VERSIONS API to populate the information about finalized features in the response. This will contain the cluster-wide finalized versions from the broker’s internal ZK cache of '/features' node contents (as explained above), as well as the broker-level supported versions (an additional piece of information that's useful for debugging).

  • The finalized feature versions populated in the response will be eventually consistent, since it is refreshed in the brokers via ZK watches.
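
Below is a minimal sketch of the broker-side cache refresh mentioned above (names are stand-ins, not actual broker code): the watch handler installs a new immutable snapshot, and ApiVersionsResponse is served from whatever snapshot is current.

Broker-side features cache (sketch)

import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

class FinalizedFeatureCache {
    // A snapshot pairs the finalized features with the ZK node version (the epoch).
    record Snapshot(long epoch, Map<String, long[]> finalized) {}

    private final AtomicReference<Snapshot> current =
        new AtomicReference<>(new Snapshot(-1, Map.of()));

    // Invoked from the ZK watch handler with the freshly read node contents.
    void onFeaturesZNodeChange(long nodeVersion, Map<String, long[]> finalized) {
        current.set(new Snapshot(nodeVersion, finalized));
        // The real broker also re-validates its own compatibility at this point
        // (see the Broker protections section).
    }

    // Read cheaply from memory when building an ApiVersionsResponse.
    Snapshot latest() {
        return current.get();
    }
}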

Below is the new ApiVersionsResponse schema showing the three new tagged optional fields for feature versions.

Note that the field FinalizedFeaturesEpoch contains the latest version of the '/features' node in ZK. On the client side, the epoch can be used to ignore older features metadata returned in ApiVersionsResponse, after newer features metadata was returned once (eventual consistency).

ApiVersionsResponse schema (updated)


{
  "apiKey": 18,
  "type": "response", "name": "ApiVersionsResponse",
  "validVersions": "0-3",
  "flexibleVersions": "3+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code." },
    { "name": "ApiKeys", "type": "[]ApiVersionsResponseKey", "versions": "0+",
      "about": "The APIs supported by the broker.", "fields": [
      { "name": "ApiKey", "type": "int16", "versions": "0+", "mapKey": true,
        "about": "The API index." },
      { "name": "MinVersion", "type": "int16", "versions": "0+",
        "about": "The minimum supported version, inclusive." },
      { "name": "MaxVersion", "type": "int16", "versions": "0+",
        "about": "The maximum supported version, inclusive." }]
    },
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    // ----- START: PROPOSED ADDITIONAL METADATA -----
    { "name":  "SupportedFeatures", "type": "[]FeatureKey",
      "versions":  "3+", "tag": 10000, "taggedVersions": "3+",
      "about": "Features supported by the broker.",
      "fields":  [
        { "name": "Name", "type": "string", "versions": "3+",
          "about": "The name of the feature." },
        { "name": "MinVersion", "type": "int64", "versions": "3+",
          "about": "The minimum supported version." },
        { "name": "MaxVersion", "type": "int64", "versions": "3+",
          "about": "The maximum supported version." }
      ]
    },
    {"name": "FinalizedFeaturesEpoch", "type": "int64", "versions": "3+",
      "tag": 10001, "taggedVersions": "3+",
      "about": "The monotonically increasing epoch for the features information."},
    { "name":  "FinalizedFeatures", "type": "[]FinalizedFeatureKey",
      "versions":  "3+", "tag": 10002, "taggedVersions": "3+",
      "about": "List of cluster-wide finalized features.",
      "fields":  [
        {"name": "Name", "type": "string", "versions":  "3+",
          "about": "The name of the feature."},
        {"name":  "MaxVersionLevel", "type": "int64", "versions":  "3+",
          "about": "The cluster-wide finalized max version level for the feature."},
        {"name":  "MinVersionLevel", "type": "int64", "versions":  "3+",
          "about": "The cluster-wide finalized min version level for the feature."}
      ]
    }
    // ----- END: PROPOSED ADDITIONAL METADATA -----
  ]
}

Broker protections against race conditions

Certain validations will be introduced at a few points in the broker code. The purpose is to avoid race conditions where incompatible brokers remain active in a cluster. The validations affirm that the feature versions supported by the broker are compatible with the expected cluster-wide feature versions. If any of these checks fails, the broker shutdown sequence is triggered, and the process eventually exits with a non-zero exit code. The places where the validation will be introduced are explained below:

  1. Validation shall be introduced during broker startup. This involves synchronously reading the cluster-wide feature versions from the '/features' ZK node just after initializing the ZK client, and before creating the broker’s own ephemeral node (roughly here). The feature versions available in the broker are checked against the contents of the '/features' ZK node to ensure there are no incompatibilities. If an incompatibility is detected, the broker will shut down immediately.

  2. A watch is set up on the '/features' ZK node. The above validation will then be reused in the code path that reads the contents of the '/features' ZK node whenever a watch fires. This affirms that the feature versions available in the broker always remain compatible with the cluster-wide feature versions read from ZK.

NOTE: The logic for the validations will be exactly the same as the one described under the Validations section of the Controller API (see the sketch below).
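
For illustration, the shared validation could look like the following sketch (names are stand-ins; the real broker triggers its shutdown sequence rather than calling System.exit directly):

Broker compatibility validation (sketch)

import java.util.Map;

class BrokerFeatureValidator {
    // supported: this broker's {feature -> {min_version, max_version}}
    // finalized: cluster-wide {feature -> {min_version_level, max_version_level}}
    static void validateOrExit(Map<String, long[]> supported, Map<String, long[]> finalized) {
        for (Map.Entry<String, long[]> entry : finalized.entrySet()) {
            long[] range = supported.get(entry.getKey());
            long maxLevel = entry.getValue()[1];
            if (range == null || maxLevel < range[0] || maxLevel > range[1]) {
                System.err.println("Incompatible feature: " + entry.getKey());
                System.exit(1); // stands in for the broker shutdown sequence
            }
        }
    }
}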

Incompatible broker lifetime race condition

Description of a rare race condition:

  • T1: Imagine at time T1 the following event E1 occurs: a broker B starts up, passes feature validation, and registers its presence in ZK in its BrokerIdZnode, along with advertising its supported features. Assume that broker B is just about to become incompatible in its feature set, in comparison to the cluster-wide finalized feature versions.
  • T1: At the same time T1, the following event E2 occurs, concurrent with E1: a feature version level upgrade is finalized in the controller, which causes broker B to become incompatible in its feature set.
  • T2: At a future time T2 the following event E3 occurs: the incompatible broker B receives a ZK notification about a change to the '/features' node. The broker validates the new contents of the '/features' node against its supported features, finds an incompatibility, and shuts down immediately.

Question: What if, between E1/E2 and E3, the broker B containing incompatible features lingers in the cluster? Would this cause harm to the cluster?

Solution: This window is very small (milliseconds), and the situation is rare – it can only happen if an incompatible broker comes up in the cluster around the time that a feature version upgrade is finalized. Here is how we will handle the race condition: in the controller, the thread that handles the ApiKeys.UPDATE_FEATURES request will be the ControllerEventThread. This is also the same thread that updates the controller's cache of broker info whenever a new broker joins the cluster. In this setup, if the ApiKeys.UPDATE_FEATURES request (E2) is processed ahead of the ZK notification about the incompatible broker joining the cluster (E1), then the controller can certainly detect the incompatibility when it processes E1 after E2. The controller could block the remainder of the new broker's startup sequence by refusing to send it an UpdateMetadataRequest to bootstrap it. Then, it is only a matter of time (milliseconds) before the new broker receives a ZK notification (E3) about a change to the '/features' node, and automatically shuts itself down due to the incompatibility.

Tooling support

We shall introduce a FeatureCommand CLI tool backed by a new admin command library. This CLI tool will be maintained in the Kafka repository, alongside the code for the Kafka Broker.
The section on Guidelines on feature versions and workflows is a recommended read ahead of using the CLI tool in practice.

The CLI tool will be used by the cluster operator (i.e. a human), and will enable us to do the following:

  1. Read cluster-wide finalized feature versions from a broker or a controller via its ApiKeys.API_VERSIONS API.
  2. Add/update/delete specific or all cluster-wide finalized feature versions by exercising the newly introduced ApiKeys.UPDATE_FEATURES API on a controller.

The CLI tool is versioned. Whenever a Kafka release introduces a new feature version, or modifies an existing feature version, the CLI tool shall also be updated with this information. Newer versions of the CLI tool will be released as part of Kafka releases. The tool is handy for both regular usage (upgrade/downgrade all feature max version levels) as well as advanced usage (change specific feature version levels). Later below, we demonstrate both such usages of the CLI tool.

  • Regular usage typically happens after a Kafka release, when the cluster operator wants to finalize all the latest feature versions. The tool internally has knowledge about a map of features to their respective max versions supported by the Broker. Using this information, the tool provides a facility to upgrade all feature max version levels to the latest values known to the tool.
  • Downgrades of feature max version levels, after they are finalized using the tool, are a rare occurrence. To facilitate an emergency downgrade of all feature versions (ex: during an emergency rollback to a previous Kafka release), the tool provides a downgrade-all facility. To achieve this, the user needs to run the version of the tool packaged with the Kafka release that they need to downgrade to. This is because the tool's knowledge of features and their version values is limited to the version of the CLI tool itself (i.e. the information is packaged into the CLI tool when it is released).

We shall introduce 2 new APIs in the Admin interface, which enable us to read the feature versions and finalize feature version upgrades/downgrades. Below is Java-ish pseudocode for the same.

Admin API changes

// ---- START: Proposed Admin API definitions ----
/**
 * Return the following:
 * 1. List of cluster-wide finalized feature versions.
 * 2. List of supported feature versions specific to the broker.
 *
 * You may anticipate certain exceptions when calling get() on the
 * futures obtained from the returned DescribeFeaturesResult.
 */
DescribeFeaturesResult describeFeatures();

/**
 * Update the feature versions supported cluster-wide. You may
 * anticipate certain exceptions when calling get() on the futures
 * obtained from the returned UpdateFeaturesResult. For example,
 * if a feature update was in progress already, the controller
 * could return a suitable error.
 *
 * @param updates   set of feature updates, keyed by the
 *                  name of the feature
 * @return          the result of the updateFeatures request
 */
UpdateFeaturesResult updateFeatures(Set<FeatureUpdate> updates);

// ---- END: Proposed Admin API definitions ----

// Represents a cluster-wide finalized feature, with its finalized version levels.
class FinalizedFeature {
    // The name of the feature.
    String name();

    // The cluster-wide finalized value of the feature min version level (value >= 1).
    long minVersionLevel();

    // The cluster-wide finalized value of the feature max version level (value >= 1 and value >= minVersionLevel).
    long maxVersionLevel();
}

// Represents a feature that is supported by a broker, with a specific
// feature version range [minVersion, maxVersion].
class SupportedFeature {
    // The name of the feature.
    String name();

    // The minimum version (value >= 1) of the supported feature.
    long minVersion();

    // The maximum version (value >= 1 and value >= minVersion) of the supported feature.
    long maxVersion();
}

// Represents an update to a feature, which can be sent to the controller
// for processing.
class FeatureUpdate {
    // The name of the feature to be updated.
    String name();

    // The cluster-wide finalized NEW value of the feature max version level.
    // - When >= 1, it's the new value to be finalized for the feature.
    // - When < 1, it indicates the deletion of a finalized feature.
    long maxVersionLevel();

    // Returns true only if downgrade/deletion of a feature should be allowed.
    boolean allowDowngrade();
}

// Represents a collection of feature metadata, along with the host:port
// of the broker serving the metadata.
class FeatureMetadata {
    // The set of cluster-wide finalized features, keyed by feature name.
    Set<FinalizedFeature> finalizedFeatures();

    // The monotonically increasing epoch for the finalized features.
    long epoch();

    // The set of features supported by a broker, keyed by feature name.
    Set<SupportedFeature> supportedFeatures();

    // The hostname of the broker.
    String host();

    // The port of the broker.
    int port();
}

class DescribeFeaturesResult {
    /**
     * The data returned in the future contains the latest entire set of
     * finalized cluster-wide features, as well as the entire set of 
     * features supported by the broker serving this read request.
     */
    KafkaFuture<FeatureMetadata> all();
}

class UpdateFeaturesResult {
    /**
     * Returns a future which indicates success/failure.
     * 1. If the future has succeeded (i.e. no exceptions),
     *    then the request was 100% successful, and no top level or
     *    individual FeatureUpdate errors were seen. The data
     *    returned in the future contains the latest entire set of
     *    finalized cluster-wide features (after all updates were applied),
     *    as well as the entire set of features supported by the controller
     *    serving this write request.
     * 2. If the future has failed, the top level error (if any)
     *    or the error from the FeatureUpdate(s) that failed
     *    (if any) is raised to the caller.
     */
    KafkaFuture<FeatureMetadata> all();
}
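
For illustration, here is a possible usage of the above (Java-ish pseudocode in the same style; the FeatureUpdate constructor shown is assumed, since only accessors are defined above):

Admin API usage (sketch)

void finalizeGroupCoordinatorV2(Admin admin) throws Exception {
    FeatureMetadata before = admin.describeFeatures().all().get();
    System.out.println("Finalized features at epoch " + before.epoch());

    // Ask the controller to finalize group_coordinator at max version level 2.
    Set<FeatureUpdate> updates = Set.of(
        new FeatureUpdate("group_coordinator", 2L /* maxVersionLevel */,
                          false /* allowDowngrade */));
    FeatureMetadata after = admin.updateFeatures(updates).all().get();
    System.out.println("New features epoch: " + after.epoch());
}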

Regular CLI tool usage

Following are examples of regular usage of the CLI tool, which involves the following activities:

  1. Read cluster-wide finalized feature versions from a broker or a controller via its ApiKeys.API_VERSIONS API.

  2. Upgrade the max version levels of all features, to their latest values, as known to the CLI tool internally. This becomes useful after completing the deployment of a new Kafka Broker release onto an existing cluster. This removes the burden to individually finalize feature upgrades. 
  3. Downgrade the max version levels of all features, to the values known to the CLI tool internally. This becomes useful during an emergency cluster downgrade, after finalizing feature levels from a previous upgrade.

=== DESCRIBE FEATURES ===

# Get cluster-wide finalized features, and features supported by a specific broker.
#  - Use `--bootstrap-server` to provide a broker host:port to which queries should be issued.
#  - Optionally, provide `--controller` flag directing the tool to issue the query to the
#    controller (while discovering the controller via the bootstrap server).
#    This can be useful for debugging purposes.

$> kafka-features.sh describe \
     --bootstrap-server kafka-broker0.prn1:9071 \
     [--controller]

{
    "status": "OK",
    "supported_features": {
        "group_coordinator": {
            "min_version": 1,
            "max_version": 2
        },
        "transaction_coordinator": {
            "min_version": 1,
            "max_version": 5
        },
        "consumer_offsets_topic_schema": {
            "min_version": 1,
            "max_version": 1
        }
    },
    "finalized_features": {
        "epoch": 0,
        "group_coordinator": {
            "min_version_level": 1,
            "max_version_level": 1
        },
        "transaction_coordinator": {
            "min_version_level": 1,
            "max_version_level": 4
        }
    },
    "host": "kafka-broker0.prn1",
    "port": 9071
}

=== UPGRADE TO ALL LATEST FEATURES ===

# Upgrade to the max version levels of all features, as internally known to the CLI tool.
#
# This command removes the burden to individually finalize feature upgrades.
# This becomes handy to a cluster operator intending to finalize a cluster with all the latest
# available feature version levels. This usually happens after completing the deployment
# of a newer Kafka Broker release onto an existing cluster.

$> kafka-features.sh finalize-all \
     --bootstrap-server kafka-broker0.prn1:9071

{
    "status": "OK",
    "supported_features": {
        "group_coordinator": {
            "min_version": 1,
            "max_version": 3
        },
        "transaction_coordinator": {
            "min_version": 1,
            "max_version": 6
        },
        "consumer_offsets_topic_schema": {
            "min_version": 1,
            "max_version": 3
        }
    },
    "finalized_features": {
        "epoch": 3,
        "group_coordinator": {
            "min_version_level": 1,
            "max_version_level": 3
        },
        "transaction_coordinator": {
            "min_version_level": 1,
            "max_version_level": 6
        },
        "consumer_offsets_topic_schema": {
            "min_version_level": 1,
            "max_version_level": 3
        }
    },
    "host": "kafka-broker0.prn1",
    "port": 9071
}

=== EMERGENCY DOWNGRADE ALL FEATURES ===

# Downgrade to the max version levels of all features known to the CLI tool.
#
# This command removes the burden to individually finalize feature version
# downgrades. This becomes handy to a cluster operator intending to downgrade all
# feature version levels, just prior to rolling back a Kafka Broker deployment
# on a cluster, to a previous Broker release.

$> kafka-features.sh downgrade-all \
     --bootstrap-server kafka-broker0.prn1:9071

{
    "status": "OK",
    "supported_features": {
        "group_coordinator": {
            "min_version": 1,
            "max_version": 3
        },
        "transaction_coordinator": {
            "min_version": 1,
            "max_version": 6
        },
        "consumer_offsets_topic_schema": {
            "min_version": 1,
            "max_version": 3
        }
    },
    "finalized_features": {
        "epoch": 3,
        "group_coordinator": {
            "min_version_level": 1,
            "max_version_level": 3
        },
        "transaction_coordinator": {
            "min_version_level": 1,
            "max_version_level": 6
        },
        "consumer_offsets_topic_schema": {
            "min_version_level": 1,
            "max_version_level": 3
        }
    },
    "host": "kafka-broker0.prn1",
    "port": 9071
}

Advanced CLI tool usage

Following are examples of advanced usage of the CLI tool. Going beyond regular usage, advanced usage involves adding/updating/deleting specific cluster-wide finalized feature versions.

=== ADD_OR_UPDATE FEATURES ===

# Add or update a list of cluster-wide finalized features.
#  - Use `--bootstrap-server` to provide a broker host:port to which MetadataRequest query should be issued.
#    The MetadataResponse will be used to discover the Controller, to which the actual ADD_OR_UPDATE request is issued.
#  - Use `--upgrade` to provide a comma-separated list of features and new finalized max version to ADD_OR_UPDATE.
#  - Use `--allow-downgrade` to allow a downgrade for feature version levels. This should be used only when required.

$> kafka-features.sh update \
     --bootstrap-server kafka-broker0.prn1:9071 \
     --upgrade group_coordinator:2,consumer_offsets_topic_schema:1 \
     --allow-downgrade transaction_coordinator:3

Please confirm before downgrading the following features:
1. transaction_coordinator from v4 (existing) to v3 (new)

[Y/n]? Y

{
    "status": "OK",
    "supported_features": {
        "group_coordinator": {
            "min_version": 1,
            "max_version": 2
        },
        "transaction_coordinator": {
            "min_version": 1,
            "max_version": 5
        },
        "consumer_offsets_topic_schema": {
            "min_version": 1,
            "max_version": 1
        }
    },
    "finalized_features": {
        "epoch": 1,
        "group_coordinator": {
            "min_version_level": 1,
            "max_version_level": 2
        },
        "transaction_coordinator": {
            "min_version_level": 1,
            "max_version_level": 3
        },
        "consumer_offsets_topic_schema": {
            "min_version_level": 1,
            "max_version_level": 1
        }
    },
    "host": "kafka-broker0.prn1",
    "port": 9071
}

=== DELETE FEATURES ===

# Delete a list of cluster-wide finalized features.
#  - Use `--bootstrap-server` to provide a broker host:port to which MetadataRequest query should be issued.
#    The MetadataResponse will be used to discover the Controller, to which the actual delete request is issued.
#  - Use `--features` to provide a comma-separated list of finalized features to be deleted.

$> kafka-features.sh delete \
     --bootstrap-server kafka-broker0.prn1:9071 \
     --features group_coordinator,transaction_coordinator

Please confirm deletion of the following finalized features:
1. group_coordinator
2. transaction_coordinator

[Y/n] Y

{
    "status": "OK",
    "supported_features": {
        "group_coordinator": {
            "min_version": 1,
            "max_version": 2
        },
        "transaction_coordinator": {
            "min_version": 1,
            "max_version": 5
        },
        "consumer_offsets_topic_schema": {
            "min_version": 1,
            "max_version": 1
        }
    },
    "finalized_features": {
        "epoch": 2,
        "consumer_offsets_topic_schema": {
            "min_version_level": 1,
            "max_version_level": 1
        }
    },
    "host": "kafka-broker0.prn1",
    "port": 9071
}

New or changed public interfaces

Summary of changes:

  1. We introduce 1 new API in the broker RPC interface (see this section). This is the new ApiKeys.UPDATE_FEATURES (schema) that can only be served successfully by the controller.
  2. We introduce a few optional fields in the ApiVersionsResponse containing the cluster-wide finalized feature metadata, the feature metadata epoch, and the broker's supported features (see this section).
  3. We introduce 2 new APIs in the Admin interface to describe/addOrUpdate/delete features (see this section). Under the covers, these exercise the APIs mentioned above.

Guidelines on feature versions and workflows

With the newly proposed feature versioning system in this KIP, it becomes quite important to understand when to use it, and when not to.

When to use versioned feature flags?

  • A "breaking change" typically happens whenever the underlying Broker logic has introduced a different concept (such as a message type, or a new field in a protocol) which only certain newer versions of the Broker code can understand/process. The older Broker versions could consider such a change to be alien i.e. the older Broker versions could exhibit undefined or terminal behavior if they notice effects of the breaking change. Similarly, Kafka clients could exhibit undesirable behavior if they commence usage of a feature version on the cluster, assuming all Brokers support it, while in reality some Brokers don't.
    As part of Kafka operations, there are occasions when it becomes necessary to safely activate certain breaking changes introduced by a newer Broker version. Such changes become good candidates to be released behind a feature flag. The cluster-wide max feature version level can be safely increased/finalized (using the mechanism provided in this KIP) after the cluster-wide deployment of the required Broker version is over. With the versioning system in place, whenever the effects of the underlying breaking change are activated by the cluster operator, the system protects the cluster and Kafka clients from the bad effects of breaking changes.
  • As a guideline, max supported feature version values should be increased if and only if the change is a breaking change. Also, min feature version value should be increased if and only if a feature version needs to be deprecated. Non-breaking changes or non-deprecating changes could continue to be made to the Broker code without modifying the feature version values.

Workflows:

  • The most common workflow for the feature versioning system would involve finalizing a cluster upgrade to the max feature version levels of all features, after completing the release of a new Kafka Broker binary onto a cluster. This is explained under the Regular CLI tool usage section.
  • Emergency downgrades of max version levels of features are rare, especially after they are finalized (because these are breaking changes, and finalizing them can have consequences). For such rare situations, the user could still downgrade specific features using the CLI tool. Also, all features can be downgraded using the CLI tool belonging to a specific Broker release. For examples of these, please refer to the Tooling support section.

Use case: group_coordinator feature flag (for KIP-447)

As described in the motivation for this KIP, the Kafka Streams use case will immediately benefit from programmatically learning the cluster-wide version of the EOS feature on the brokers (KIP-447). We shall use the group_coordinator feature flag to achieve this, as explained below. This feature flag will cover both schema and protocol changes related to the group coordinator (KIP-447 falls under a change to the group coordinator).

Imagine we shipped the feature versioning scheme before the shipment of KIP-447. There would then be at least two versions of the broker binary:

  • One of these (the old/existing broker bits) would only advertise group_coordinator feature with max version 1 (shortly referred to as v1 below). v1 doesn’t contain the latest EOS semantic described in KIP-447.

  • The other (the new broker bits) would advertise group_coordinator feature v1-v2 i.e. max version v1 as well as v2. v2 contains the EOS semantic described in KIP-447.

Kafka Streams client uses the consumer rebalance protocol to propagate the group metadata information. There would be one nominated leader among the group. The sequence of feature upgrade events in the happy path shall be the following (T0, T1, …, Tn refers to increasing time):

  • T0: The broker cluster is running with the old binary, and a streams application is communicating with it. The stream clients will be using the group_coordinator:v1 feature. The leader stream thread would use its admin client to periodically call DescribeFeatures on a random broker in the cluster (presumably every minute), to get information about the latest cluster-wide finalized features. Thus, it would learn about the group_coordinator feature flag, with the finalized version at v1.
  • T1: The cluster undergoes a rolling upgrade to the new broker binary. Once the upgrade is complete, every broker in the cluster is expected to support v1-v2 versions for the group_coordinator feature flag.
  • T2: The controller still has only group_coordinator:v1 enabled in the '/features' ZK node. So, the above rolling upgrade has no effect (yet) on the streams client side.
  • T3: Knowing that the cluster upgrade has completed, the cluster operator (i.e. a human) sends an ApiKeys.UPDATE_FEATURES request to the controller. This is to finalize the cluster-wide group_coordinator feature version level upgrade from v1 to v2. When this request is successful, it means the following:
    → The controller has checked that all brokers in the cluster advertise support for group_coordinator:v2, and has persisted the upgrade to the ZK '/features' node.
    → The brokers are gradually receiving ZK notifications about the update to the '/features' ZK node. When each broker refreshes the contents of the ZK node, it will become aware that group_coordinator:v2 has been finalized cluster-wide.
  • T3-4: The Streams leader thread, at some point in time, will see the above change (via the periodic DescribeFeatures call; see the sketch after this list). It will then trigger a rebalance of the Streams group, plumbing the group_coordinator:v2 feature flag down to all members. It is key to note that this event could happen concurrently with T3, which is why it is labelled T3-4.
  • T5: Every member of the Streams group gradually upgrades to group_coordinator:v2, just as the leader instructed above. This is when the Streams client switches from the old per-task producer model to the new thread-level producer model (for KIP-447).
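
For illustration, the leader thread's periodic check at T0 and T3-4 could look like the sketch below, based on the DescribeFeatures Admin API proposed in this KIP (class and method names are assumptions and may differ in the final implementation):

    import java.util.Map;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.FeatureMetadata;
    import org.apache.kafka.clients.admin.FinalizedVersionRange;

    final class GroupCoordinatorFeatureCheck {
        // Returns the cluster-wide finalized max version level for
        // group_coordinator, or 1 if the feature has not been finalized yet.
        static short finalizedGroupCoordinatorVersion(Admin admin) throws Exception {
            FeatureMetadata metadata = admin.describeFeatures().featureMetadata().get();
            Map<String, FinalizedVersionRange> finalized = metadata.finalizedFeatures();
            FinalizedVersionRange range = finalized.get("group_coordinator");
            return range == null ? (short) 1 : range.maxVersionLevel();
        }
    }

The leader would poll this roughly every minute (as described at T0) and trigger the rebalance once the returned level reaches v2.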

There are some edge cases in this happy upgrade path, which we explain below.

Delay of feature version propagation

Due to eventual consistency in metadata propagation, it is possible that some brokers get notified about group_coordinator:v2 later than others. At T3-4, the Streams leader thread first upgrades to group_coordinator:v2 when it learns of the feature version level bump from a broker. Then, after T5, there exists a case where the leader stream thread could query a stale broker and learn that the feature support only contains v1. If this situation is not handled properly, we are in danger of a downgrade, because KIP-447 does not support downgrading the processing model unless the user wipes out the old stream instances and restarts from scratch. The following is done to make sure the above scenario never affects the Streams use case:

  1. The stream clients shall not downgrade to group_coordinator:v1 once they have upgraded to group_coordinator:v2. The solution is to persist the EOS model flag as part of the consumers' metadata when the rebalance completes, so that on a subsequent rebalance, as long as some members are already on the newer version, the leader of the group will not kick off another feature request: it knows an upgrade was ongoing in the last generation, based on the eventual consistency guarantee, and will simply continue to instruct all members to upgrade to the new (v2) version. (See the sketch after this list.)

  2. It is important to note that the user is only upgrading the cluster from enabling group_coordinator:v1 to enabling group_coordinator:v1-v2. Therefore, once a client sees that v2 is enabled on any broker, that is a clear signal that all broker binaries within the cluster support the group_coordinator:v2 semantics; it is just a matter of time before the feature version update metadata propagates to every broker from the controller.
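
A minimal sketch of the guard described in point 1 is shown below (hypothetical helper names; the real logic would live in the Streams rebalance code). The key property is monotonicity: the version a group runs at only ever moves forward, so a stale v1 reading from a lagging broker cannot trigger a downgrade:

    final class FeatureVersionGuard {
        // observedFinalizedMax: the level just read from a (possibly stale) broker.
        // lastGenerationVersion: the level persisted in the consumers' rebalance
        // metadata when the previous rebalance completed.
        static short decideGroupVersion(short observedFinalizedMax,
                                        short lastGenerationVersion) {
            // Never instruct members to move below a level the group has
            // already committed to in an earlier generation.
            return (short) Math.max(observedFinalizedMax, lastGenerationVersion);
        }
    }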

The above ensures that the upgrade logic is reliable and safe, as there is no case of a downgrade affecting the Streams client during the upgrade path.

Downgrade of group_coordinator feature

As such, we don’t support downgrades in this versioning scheme/system (see Non-goals and Future work). So, there is a danger if the cluster operator tries to downgrade the group_coordinator feature flag from v2 to v1 using the --allow-downgrade flag (see these examples): because the Streams client side will not downgrade at all, such a downgrade would crash the EOS applications. A safe practice is to first downgrade the stream clients, and only then downgrade the group_coordinator feature flag.

Potential features in Kafka

Many features could benefit from the above system; the following are a few selected high-level examples:

  1. group_coordinator: As described in the above section.

  2. transaction_coordinator: This feature flag could cover changes to the message format for the transaction state internal topic schema, and protocol changes related to the transaction coordinator. The advantages are similar to #1.

  3. consumer_offsets_topic_schema: Currently the message format used in the consumer internal topics is tied to IBP. It’s a candidate for a feature, because we will be able to dynamically finalize a new version of the topic schema, without having to roll the brokers twice.

  4. inter_broker_protocol: For transitional purposes, inter.broker.protocol itself can be made a coarse-grained feature. This could be a way to operationally migrate away from IBP, avoiding the double roll during IBP bumps.

Compatibility, deprecation and migration plan

Migrating from IBP-based setup to versioning scheme

Here are the various phases of work that would be required to deprecate IBP completely.

Phase #1

Post development, we hit phase #1, where the new versioning system is ready to be used. Here we would like to migrate clusters from IBP-based validations to the new versioning system based setup. To achieve this, we shall once again use an IBP double roll. This means that once a cluster has fully migrated to a certain IBP version, we can almost entirely switch to using the new versioning scheme. Let’s say the value of such an IBP version is migration_ibp_version. Then, for the versioning system to safely provide a migration path, we do the following:

  • Firstly, the versioning system itself is a new thing. For the initial roll out, we should only operate the versioning system after the second IBP roll → this brings the IBP of the cluster to migration_ibp_version (that’s when the versioning system is fully deployed).

  • As a safeguard, each broker will validate that its IBP version is at least at migration_ibp_version before applying broker validations for feature versions, and before advertising its features in ZK.

  • As a safeguard, the controller will validate that its IBP version is at least at migration_ibp_version before allowing feature version upgrades to be finalized in a cluster (via the ApiKeys.UPDATE_FEATURES API).

  • All other decision-making logic based on feature versions in the broker code will always validate that the broker’s current IBP version is at least at migration_ibp_version. (All of these safeguards reduce to the same check; see the sketch after this list.)
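
A hedged sketch of that shared check is below (plain ints stand in for inter.broker.protocol.version values for brevity; the broker would compare actual IBP version objects):

    final class MigrationGuard {
        // Hypothetical helper: feature advertisement (broker side), feature
        // finalization (controller side), and all feature-based decision making
        // stay disabled until the cluster's IBP has reached migration_ibp_version
        // via the second rolling restart.
        static boolean versioningSystemActive(int currentIbpOrdinal,
                                              int migrationIbpOrdinal) {
            return currentIbpOrdinal >= migrationIbpOrdinal;
        }
    }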

Phase #2

There can be a transitional phase where a Kafka cluster can use both IBP-based setup as well as the new versioning scheme. In order to completely deprecate the existing IBP-based setup, we would want to ensure we no longer have any references to IBP configuration in the broker code base. Once that is done, we can stop using IBP configuration altogether, and deprecate/remove the relevant support in the code.

Phase #3

This is the phase when we no longer use the IBP-based setup and have completely switched to using the new versioning scheme.

Phase #4 (long-term)

Completely deprecate IBP, and remove references to it from the code.

Migrating from versioning scheme back to IBP (i.e. emergency downgrade)

We do not foresee this happening once the migration is over and IBP has been deprecated. However, while we are in Phase #3 above, we may want to switch back to the IBP-based setup (for example, due to an unforeseen issue). This will be supported, because we deprecate IBP only in Phase #4 above. To "downgrade", a user would first need one rolling restart of the cluster to reduce the IBP to the required version, and then another rolling restart to change the binary. Note that each feature will have its own set of requirements to make this possible (in some cases, it may not be possible), but that is outside the scope of this document.

Future work

As part of future work, we could consider adding better support for downgrades & deprecation:

  • We may consider adding more constraints to the controller to further restrict the downgrade of specific flags. Certain feature version downgrades may be disallowed if they were previously declared to be incompatible downgrades. For example, the group_coordinator feature is tied to data schemas and EOS processing semantics that may not be forward compatible.
  • We may consider providing a mechanism through which the cluster operator could learn whether a feature is being actively used by clients at a certain feature version. This information can be useful to decide whether the feature is safe to be deprecated. Note that this could involve collecting data from clients about which feature versions are actively being used, and the scheme for this needs to be thought through.

Rejected alternatives

  1. We considered the idea of using the existing AlterConfigsRequest instead of introducing a new API to update features. Reasons to decide against using it:
    1. The AlterConfigsRequest format is not well equipped to elegantly express the notion of Set<FeatureUpdate> operations. The request format is more favorable towards expressing a list of key-value pairs, where each new value replaces the existing value for the specified key. Furthermore, the response format doesn't let us conveniently return the list of finalized feature versions to the caller.
    2. AlterConfigsRequest can be issued to any broker, which is not our requirement. For the future, anything that modifies ZK needs to go through the controller. This is part of the work that we are doing to build the bridge release for KIP-500.
  2. As opposed to fine-granular features, we considered the idea of exposing the notion of IBP directly to the client for discovery. There are a few issues here:

    1. We would be introducing a leaky abstraction. Users/clients (excluding clients that are brokers) would now have to operate with the notion of a "broker protocol version" – this is merely a detail that's internal to a cluster of brokers behind an API.
    2. This still doesn't solve the "double roll" problem related to IBP; that problem would remain to be solved.

