
Status

Current state: Under discussion

Discussion thread: <todo>

JIRA: <todo>


Background

There are two somewhat separate areas within the scope of this KIP.

Client Discovery

The first area is discovery on the client side. How do clients of Kafka become aware of broker capabilities? For example, how does the client find out whether the broker supports a new type of event delivery semantics, or a new type of broker storage functionality? Note that the "client" here may be a producer, a consumer, a command-line tool, a graphical interface, a third-party application built on top of Kafka (for purposes such as administration/monitoring), or even the broker itself communicating with other brokers via RPCs.

Today, the client relies on ApiVersions{Request|Response} to determine, per request type, which version of a request it should send to a broker. This is sufficient for client-side logic that depends only on the version of a specific request. However, for logic that requires client-side behavior to change based on broker capabilities, such a request-level ApiVersions check is not sufficient. Detailed examples can be found below.
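To make the per-request negotiation concrete, here is a minimal, hypothetical sketch (not actual Kafka client code; the class and method names are illustrative) of how a client could pick the version of a single request type from the ranges advertised in an ApiVersionsResponse:

Per-request version negotiation (illustrative sketch)

public final class ApiVersionNegotiation {
    // Picks the request version to use for one API, given the [min, max] range the broker
    // returned in its ApiVersionsResponse and the [min, max] range this client supports.
    public static short chooseVersion(short clientMin, short clientMax,
                                      short brokerMin, short brokerMax) {
        short chosen = (short) Math.min(clientMax, brokerMax);
        if (chosen < Math.max(clientMin, brokerMin)) {
            throw new IllegalStateException("No overlapping version range for this API");
        }
        return chosen;
    }
}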

Feature Gating

Development in Kafka (at a high level) is organized into features, each of which is tracked by a separate version number. Feature gating tries to answer the question: how does the broker decide which features to enable? For example, when is it safe for the brokers to begin handling EOS traffic? Should the controller send some additional information when it sends out a LeaderAndIsrRequest? Should the broker write its log data in the "new" message format or one of the "old" ones?

At first glance, it might seem that we should always enable the latest features. However, feature gating becomes necessary in some cases:

  • Consider a rolling upgrade of brokers, where a Kafka cluster is being upgraded to a new broker binary containing new versions of certain features. Until the binary upgrade completes successfully on all brokers, some brokers may support the new versions of a feature while others may not. This transitional phase can often last for hours. During this time, any feature that would prevent successful communication and cooperation between the old and the new broker software must stay disabled. For example, the controller may not want to send out a new type of request, since brokers that have not yet been upgraded would not understand it.

  • Next, after a certain version of a feature is available on all brokers (i.e. post upgrade), today there is no safety net. An incompatible broker that does not support an active version of the feature could still become active in the deployment, which can have bad consequences. For example, such a broker may be activated due to accidental operational errors, causing an unintended downgrade/rollback, and the cluster does not protect itself against this accident.

Motivation

The above areas translate into separate problems within the scope of this KIP, as described below.

Client Discovery

The ApiVersionsRequest enables the client to ask a broker which APIs it supports. For each API that the broker enumerates in the response, it also returns the version range it supports. This response may be insufficient for cases where a client needs to enable a feature based on whether all brokers in the cluster support it. Consider the following problem: users could upgrade their client apps before or after upgrading their brokers. If it happens before the broker upgrade, any new features on the client apps that depend on certain broker-side support (from all brokers in the cluster) must stay disabled until all the brokers have been upgraded to the required version. The question then becomes: at what point in time can these new features be safely enabled on the client side? Today, this has to be decided manually by a human, which is error-prone. Clients have no way to programmatically learn which version of a certain feature is guaranteed to be supported by all brokers (i.e. the cluster-wide finalized feature versions), and therefore cannot make suitable decisions.

For example, consider the Kafka Streams use case. For KIP-447, Streams would need to switch its clients to the new thread-level producer model only after all brokers support it. Switching to the new model while some brokers do not yet support it can break EOS semantics. Note that if only some brokers support the new model, Streams can simply continue to use the old per-task producer model without switching; this is not a problem.

Feature Gating

There is a configuration key in Kafka called inter.broker.protocol.version (IBP). Currently, the IBP can only be set by changing the static configuration file supplied to the broker during startup. Although the IBP started out as a mechanism to control the version of the RPCs that brokers use to communicate with each other, it has since grown to gate a large number of other features. For example, the IBP controls whether the broker will enable static consumer group membership, whether it will enable exactly-once functionality, and whether it will allow incremental fetch requests.

There are several problems with the IBP configuration. The IBP assumes a one-dimensional spectrum of versions. Managing the IBP is also cumbersome and error-prone: changing this configuration key requires a costly broker restart. This is the source of the infamous "double roll." If a customer wants to upgrade to the latest Kafka software and use the latest features, they must perform a rolling upgrade (first roll). Then, after the upgrade is complete, in order to update the IBP they need to modify every configuration file in the cluster and restart every broker (second roll), which is a costly operation. Even worse, if they leave the IBP configuration key unset, it defaults to the latest version, which will likely cause confusing cluster instability during the first roll.

Goals

The goals of this KIP are the following. For the above problems of client discovery and feature gating, we would like to propose a flexible, operationally easy, safe, and maintainable solution:

  1. Client discovery: Provide an API to programmatically access the metadata. Here, the “metadata” refers to the cluster-wide finalized versions of broker features. We would like to serve this metadata in a versioned, eventually consistent and scalable way.

  2. Feature gating: Provide an API to safely, durably and dynamically finalize the upgrades to cluster-wide supported versions of features. The API (and its related tooling) can be used by the cluster operator (i.e. typically a human) to finalize feature version upgrades.

  3. IBP deprecation: As a useful side effect of the above, we would like to provide a path to eventually deprecate the cumbersome IBP configuration and the broker "double roll" mechanism. A very tangible benefit of the solution is that broker upgrades should then require just a single rolling restart.

Explanation

  1. Client Discovery:

    • By scalable, we mean the solution should scale horizontally with the metadata read and write workload as the cluster grows with more features, brokers and clients. We expect metadata discovery to be read-heavy, while write traffic is very low (feature versions are typically finalized during releases or other configuration pushes, which can happen a few times a day per cluster).

    • The metadata served to the client will be versioned. The version numbers will increase monotonically whenever the metadata changes. At any given time, the latest version of the metadata returned to the client is valid, and the cluster functions in accordance with it. Note: the metadata version number applies to the entire metadata contents and is unrelated to the individual feature versions; these are two separate things.

    • We want to be careful and specific about what we mean by consistent here. The metadata we serve to the client during discovery will be eventually consistent. It turns out that scaling to strongly consistent metadata reads is not easy (in the current Kafka setup), and the cost of an eventually consistent solution can be kept minimal, in the following way. Due to eventual consistency, there can be cases where an older version of the metadata is briefly returned during discovery after a more recent version was returned at a previous point in time. We expect clients to tolerate staleness by no more than a difference of one metadata version number, and to always let the latest received metadata version trump an older one. The client should strongly consider discovering the latest metadata once during startup, and, if required, refresh the metadata periodically (to get the latest metadata).

  2. Feature gating:

    • By safe, we mean: when processing a request to finalize a set of feature versions, the system will dynamically verify that all brokers in the cluster support the intended versions. If not, the request will be rejected. Also, when a broker starts up, it will verify that it is compatible with the configured feature versions. If it is not, it will either refuse to start up or eventually die before it becomes an active member of the cluster (by active, we mean the point when the broker starts serving API requests).

    • By durable, we mean that the finalized features should be persisted durably and remembered across broker restarts. Generally the system should tolerate faults, as much as it does for storage of other critical metadata (such as topic configuration).

    • By dynamic, we mean that the finalized features can be mutated in a cheap/easy way without compromising the uptime of broker/controller processes, or the health of the cluster.

Non-Goals

The following are problems surrounding downgrades and deprecation that we do not intend to solve with this KIP:

  • Sometimes there can be a need to downgrade the cluster-wide availability of one or more broker features from some finalized version X to a version Y, where Y < X. Firstly, we leave it to the cluster operator (i.e. a human) to decide whether the downgrade is backwards compatible in the first place, i.e. to externally ensure that the changes introduced/exercised in versions newer than Y are no longer relevant in the system ahead of the downgrade. As for the solution we provide, the rules are the same as with upgrades → such a downgrade request is rejected by the system unless all brokers support the downgraded version Y of the feature.

  • A need can arise to deprecate the usage of a certain version of one or more broker features. We do not provide any special support for this. Instead, the cluster operator (i.e. a human) should use external means to establish that it is safe to stop supporting a particular version of a broker feature, for example by verifying (if needed) that no clients are actively using that version before deciding to stop supporting it.

Proposed Changes

Below is a TL;DR of the changes:

  • Each broker will advertise the version range of its features in its own BrokerIdZnode during startup. The contents of the advertisement are specific to the broker binary, but the schema is common to all brokers.

  • The controller already watches BrokerIdZnode updates, and will serve as a gateway (via a newly introduced API) to finalize version upgrades to features. The controller is well positioned to serve this requirement since it has a global view of all BrokerIdZnodes in the cluster, and can avoid most race conditions in the solution.

  • As for the metadata about cluster-wide finalized versions of features, this information will be persisted in ZK by the controller. The controller will be the one and only entity modifying this information (we expect metadata write traffic to be very low). It is also responsible for fanning out the ZK updates to all brokers in the cluster.

  • Finally, client discovery of finalized feature versions will be performed via the ApiKeys.API_VERSIONS API; the response will be modified to contain the necessary information (served from the broker's cache). Using this decentralized approach, we scale for metadata reads, albeit with eventually consistent results.

Broker advertisements

Each broker's supported dictionary of feature versions will be defined in the broker code, but the controller needs a way to discover this metadata from each broker. To facilitate this, during startup each broker will advertise its supported feature versions in its own ephemeral BrokerIdZnode (the existing ZK node at the path /brokers/ids/<id>), under a nested dictionary keyed features. The controller already watches all BrokerIdZnodes for updates, and thus gets its ZK cache populated with the per-broker versioning information (using existing means).

The schema for the advertised information is similar to the one in /features ZK node (see this section). Here is an example of a BrokerIdZnode with the proposed additional metadata towards the bottom.

BrokerIdZnode schema


{ 
   "listener_security_protocol_map":{ 
      "INTERNAL":"PLAINTEXT",
      "REPLICATION":"PLAINTEXT",
      "EXTERNAL":"SASL_SSL"
   },
   "endpoints":[ 
      "INTERNAL://kafka-0.kafka.def.cluster.local:9071",
      "REPLICATION://kafka-0.kafka.abc.cluster.local:9072",
      "EXTERNAL://b3-kfc-mnope.us.clusterx:9092"
   ],
   "rack":"0",
   "jmx_port":7203,
   "host":"kafka-0.kafka.def.cluster.local",
   "timestamp":"1579041366882",
   "port":9071,
   "version":4,
   // ----- START: PROPOSED ADDITIONAL METADATA -----
   "features": { 
      "exactly_once_semantics": {  // string -> name of the feature
          "minVersion": 0,   // int64 -> represents the min supported version of this feature
          "maxVersion": 3  // int64 -> represents the max supported version of this feature
      },
      "consumer_offsets_topic_schema": { 
          "minVersion": 1,
          "maxVersion": 4
      }
   }
   // ----- END: PROPOSED ADDITIONAL METADATA -----
}
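
For illustration only, the per-binary dictionary that a broker serializes into the features section above could be declared roughly as in the sketch below (the names SupportedFeatures and SupportedVersionRange are hypothetical and not part of this proposal):

Supported feature versions in broker code (illustrative sketch)

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public final class SupportedFeatures {
    public static final class SupportedVersionRange {
        public final long minVersion;
        public final long maxVersion;

        public SupportedVersionRange(long minVersion, long maxVersion) {
            if (minVersion < 0 || maxVersion < minVersion)
                throw new IllegalArgumentException("Invalid version range");
            this.minVersion = minVersion;
            this.maxVersion = maxVersion;
        }
    }

    // Defined in the broker code; the contents are specific to the broker binary.
    public static Map<String, SupportedVersionRange> get() {
        Map<String, SupportedVersionRange> features = new HashMap<>();
        features.put("exactly_once_semantics", new SupportedVersionRange(0, 3));
        features.put("consumer_offsets_topic_schema", new SupportedVersionRange(1, 4));
        return Collections.unmodifiableMap(features);
    }
}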

Persistence of finalized feature versions

The proposal is that cluster-wide finalized feature versions will be persisted in a specific common ZK node. The proposed path to the ZK node is: /features. The node content type is JSON (string), and the size is expected to be small (typically a few KB). A couple of high-level details:

  • The node data shall be readable via existing ZK tooling. An eventually consistent copy of the node data shall also be made readable via existing ApiKeys.API_VERSIONS API served by any broker in the cluster. (see this section)

  • During regular operations, the data in the ZK node can be mutated only via a specific admin API served only by the controller.

The above proposed read/write paths are described later in this doc. Below is an example of the contents of the /features ZK node; its schema is explained afterwards.

Schema

[
   {
         "__schema_version__": 0,   // int64 -> represents the version of the node schema
         "__data_version__": 0  // int64 -> represents the version of the data
   },
   {
        "exactly_once_semantics": {   // string -> name of the feature
            "version": 3 // int64 -> represents the cluster-wide finalized version of this feature
        },
        "consumer_offsets_topic_schema": { 
            "version": 4
        }
   }
]

The schema is a JSON list with 2 items, each being a dictionary and each with unique keys, as explained below:

  • First item in the list is a dictionary. It's a Map<String, Long> containing node metadata (such as data version).

  • Second item in the list is again a dictionary. It's a Map<String, Map<String, Long>>. Imagine this as being a map of:
    <feature_name>
                   |--><metadata_key>
                                        |--><metadata_value>

    • Top-level key <feature_name> is a non-empty string that’s a feature name.

      • <metadata_key> is the second-level key, a non-empty string naming a piece of feature or API metadata.

      • <metadata_value> is an int64 value for the corresponding <metadata_key>.

      • Note: this nested dictionary should contain at least the key: version, whose value represents the finalized cluster-wide version of the feature.
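
As an illustration of the read path, the sketch below deserializes the node content into its __data_version__ and a map of feature name to finalized version. It assumes the Jackson JSON library and plain JSON content (the inline comments in the example above are annotations, not part of the actual node data):

/features node parsing (illustrative sketch)

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public final class FeaturesZNodeParser {
    // Parses the /features node content into a map of feature name -> finalized version,
    // and prints the node's __data_version__.
    public static Map<String, Long> parse(String json) throws Exception {
        JsonNode root = new ObjectMapper().readTree(json);
        JsonNode nodeMetadata = root.get(0);   // first list item: node metadata
        JsonNode featureData = root.get(1);    // second list item: finalized features

        long dataVersion = nodeMetadata.get("__data_version__").asLong();

        Map<String, Long> finalized = new HashMap<>();
        Iterator<Map.Entry<String, JsonNode>> fields = featureData.fields();
        while (fields.hasNext()) {
            Map.Entry<String, JsonNode> feature = fields.next();
            finalized.put(feature.getKey(), feature.getValue().get("version").asLong());
        }

        System.out.println("__data_version__ = " + dataVersion);
        return finalized;
    }
}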

New controller API

We introduce a new Admin API that’s served only by the controller, and identified by the new API key: ApiKeys.UPDATE_FEATURES. This API enables safe application of a set of cluster-wide feature updates:

  • The input to the API is a list of FeatureUpdate that need to be applied. Each item specifies an operation (as a FeatureUpdateType ) such as: adding a new Feature, updating or deleting an existing Feature.

  • The response (on success) is the list of the latest finalized Features, along with a monotonically increasing metadata version number.

To help explain things better, here are the request and response definitions for the new API (also see section #6 showing related pseudocode for the Admin API):

UpdateFeaturesRequest schema

{
  "apiKey": 48,
  "type": "request",
  "name": "UpdateFeaturesRequest",
  "validVersions": "0-1",
  "flexibleVersions": "1+",
  "fields": [
    { "name": "FeatureUpdate", "type": "[]FeatureUpdateKey", "versions": "1+",
      "about": "The list of updates to features.", "fields": [
      {"name":  "FeatureUpdateType", "type":  "int16", "versions":  "1+",
        "about": "The type of feature update (ex: ADD_OR_UPDATE, DELETE etc.)."},
      {"name":  "Feature", "type":  "[]FeatureKey", "versions":  "1+",
        "about":  "The feature to be updated.",
        "fields":  [
          {"name": "Name", "type":  "string", "versions":  "1+",
            "about": "The name of the feature."},
          {"name":  "Version", "type":  "int64", "versions":  "1+",
            "about": "The new finalized version for the feature."}
      ]}
    ]}
  ]
}

UpdateFeaturesResponse schema

{
  "apiKey": 48,
  "type": "response",
  "name": "UpdateFeaturesResponse",
  "validVersions": "0-1",
  "flexibleVersions": "1+",
  "fields": [
    {"name": "MetadataVersion", "type": "int64", "versions": "1+",
      "about": "The monotonically increasing version of the metadata for finalized features."},
    { "name":  "Features", "type": "[]FeatureKey", "versions":  "1+",
      "about": "Cluster-wide finalized features.",
      "fields":  [
        {"name": "Name", "type":  "string", "versions":  "1+",
          "about": "The name of the feature."},
        {"name":  "Version", "type":  "int64", "versions":  "1+",
          "about": "The finalized version for the feature."}
      ]
    }
  ]
}

Feature advertisements from each broker in the cluster are reactively picked up by the controller via ZK watches. Today these watches are already set up by the controller on every broker’s ephemeral ZK node (at /brokers/ids/<id>). The contents of the broker nodes are cached in memory in the Controller.

Under the hood, the implementation of the ApiKeys.UPDATE_FEATURES API does the following:

  1. For correctness reasons, the controller shall allow processing of only one in-flight ApiKeys.UPDATE_FEATURES API call at any given time.

  2. Prior to mutating /features in ZK, the implementation verifies that all broker advertisements (as seen by it thus far) contain no incompatibilities with the provided List<FeatureUpdate> (see the Validations section below).

  3. Once the validations in #2 succeed, the List<FeatureUpdate> is applied to the contents of the /features ZK node, and its __data_version__ is incremented by 1.

  4. If #3 is successful, ApiKeys.UPDATE_METADATA requests are issued to all brokers in the cluster. This can incur a slight latency cost, so we expect this API call to succeed or fail within a few seconds in the worst case (depending on the size of the cluster).

  5. Finally, an UpdateFeaturesResponse is returned to the user. The metadata version number is the latest __data_version__ in the ZK node: /features after the mutations were applied to it.

Validations

For any <feature_name>, the above API implementation guards against a change to the related entry in /features from {"version": X} to {"version": X’}, unless it notices that each live broker in the deployment advertises {"maxVersion": Y >= X’} and {"minVersion": Z <= X’} in its BrokerIdZnode (for the same <feature_name>).

  1. For the above guarding check, in each live broker:

    1. The absence of a required feature is considered an incompatibility → such a case will fail the API request.

    2. Whereas, if a broker advertises an additional feature that is not required → this is not considered an incompatibility.

  2. Some/all of the above logic will also be used by the broker (not just the controller) for its protections (see this section of this KIP).

  3. Enabling the actual semantics of a feature version cluster-wide is left to the discretion of the logic implementing the feature (ex: can be done via dynamic broker config).

  4. Deprecating/eliminating the semantics of a specific feature version cluster-wide is left to the discretion of the logic backing the feature (ex: can be done via dynamic broker config). However, it may need further external verification to ensure no entity (ex: a consumer, or a broker) is actively using the feature (see Non-goals section).
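
For illustration, the core of the guarding check described above could look like the following sketch (a hypothetical helper, not actual controller code). Each broker advertisement is modeled here as a map of feature name to its advertised {minVersion, maxVersion} range:

Finalization compatibility check (illustrative sketch)

import java.util.Collection;
import java.util.Map;

public final class FeatureCompatibilityChecker {
    // Returns true iff every live broker advertises a [minVersion, maxVersion] range for the
    // given feature that contains the proposed finalized version. A broker that does not
    // advertise the feature at all is treated as incompatible; extra features advertised by
    // a broker are not an incompatibility.
    public static boolean canFinalize(String featureName,
                                      long proposedVersion,
                                      Collection<Map<String, Map<String, Long>>> liveBrokerAdvertisements) {
        for (Map<String, Map<String, Long>> advertised : liveBrokerAdvertisements) {
            Map<String, Long> range = advertised.get(featureName);
            if (range == null)
                return false;  // absence of a required feature fails the request
            if (proposedVersion < range.get("minVersion") || proposedVersion > range.get("maxVersion"))
                return false;
        }
        return true;
    }
}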

Guarantees

When a call to the ApiKeys.UPDATE_FEATURES API returns a success return code to the client, the following is guaranteed:

  1. Each FeatureUpdate provided in the request was valid, and all updates were persisted to the /features ZK node by the controller (along with a bump to the node’s __data_version__ ).

  2. An ApiKeys.UPDATE_METADATA call was successfully made by the controller to all live brokers (as seen by the controller) with the latest dictionary of cluster-wide finalized feature versions.

  3. Future ApiKeys.UPDATE_METADATA calls from the controller to brokers will contain the latest set of cluster-wide feature versions, enabling brokers to shut down when incompatibilities are detected (see the Broker protections section).

  4. The returned object in the ApiKeys.UPDATE_FEATURES API call contains the latest set of cluster-wide feature versions (after all updates were applied), which can be useful to the client for debugging/logging purposes.

When a call to the ApiKeys.UPDATE_FEATURES API has failed on the client side, the following is guaranteed:

  1. A server-side error means the /features ZK node is guaranteed to be left untouched.

  2. In case of a network error, or if the client dies during the request, or if the server dies while processing the request, the user should assume that the application of the List<FeatureUpdate> may have either succeeded or failed.

Here is the new schema of UpdateMetadataRequest with the finalized features added to it (the metadata version contains the latest value, after it was incremented):

UpdateMetadataRequest schema (updated)


{
  "apiKey": 6,
  "type": "request",
  "name": "UpdateMetadataRequest",
  "validVersions": "0-6",
  "flexibleVersions": "6+",
  "fields": [
    ...
    ...
    { "name": "LiveBrokers", "type": "[]UpdateMetadataBroker", "versions": "0+", "fields": [
        { "name": "Id", "type": "int32", "versions": "0+", "entityType": "brokerId",
          "about": "The broker id." },
          ...
          ...
          { "name": "SecurityProtocol", "type": "int16", "versions": "1+",
            "about": "The security protocol type." }
        ]},
        { "name": "Rack", "type": "string", "versions": "2+", "nullableVersions": "0+", "ignorable": true,
          "about": "The rack which this broker belongs to." }
    ]},
    // ----- START: PROPOSED ADDITIONAL METADATA -----
    {"name": "MetadataVersion", "type": "int64", "versions": "6+",
      "tag": 10000, "taggedVersions": "6+",
      "about": "The monotonically increasing version of the metadata for finalized features."},
    { "name":  "Features", "type": "[]FeatureKey",
      "versions":  "6+", "tag": 10001, "taggedVersions": "6+",
      "about": "The list of finalized features.",
      "fields":  [
        {"name": "Name", "type":  "string", "versions":  "3+",
          "about": "The name of the feature."},
        {"name":  "Version", "type":  "int64", "versions":  "3+",
          "about": "The finalized version for the feature."}
      ]
    }
    // ----- END: PROPOSED ADDITIONAL METADATA -----
  ],
  "commonStructs": [
    ...
    ...
  ]
}

Client discovery of finalized feature versions

The latest list of cluster-wide finalized feature versions will be served via the existing ApiKeys.API_VERSIONS API. This is used for client discovery, as well as for debuggability of the system. We will introduce tagged optional fields in the schema of the ApiVersionsResponse to accommodate the information. A couple of implementation details:

  • In the UpdateMetadataRequest request handler path, we will populate the MetadataCache with the latest list of finalized features as sent to it by the controller (also see Broker protections section in this doc).

  • We will modify the implementation of the ApiKeys.API_VERSIONS API to populate the information about finalized features in the response. This will contain the cluster-wide finalized versions from the broker’s internal MetadataCache (as explained above), as well as the broker-level supported versions.

  • The finalized feature versions populated in the response will be eventually consistent, since it is served from information propagated via ApiKeys.UPDATE_METADATA API by the controller.

Below is the new ApiVersionsResponse schema showing the new tagged optional fields for feature versions.

Note that the field FinalizedFeaturesMetadataVersion contains the latest version of the metadata, as stored in the __data_version__ field of the ZK node. On the client side, the value of this field should be used to ignore older metadata returned in an ApiVersionsResponse after newer metadata has been returned once (eventual consistency).

ApiVersionsResponse schema (updated)


{
  "apiKey": 18,
  "type": "response", "name": "ApiVersionsResponse",
  "validVersions": "0-3",
  "flexibleVersions": "3+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code." },
    { "name": "ApiKeys", "type": "[]ApiVersionsResponseKey", "versions": "0+",
      "about": "The APIs supported by the broker.", "fields": [
      { "name": "ApiKey", "type": "int16", "versions": "0+", "mapKey": true,
        "about": "The API index." },
      { "name": "MinVersion", "type": "int16", "versions": "0+",
        "about": "The minimum supported version, inclusive." },
      { "name": "MaxVersion", "type": "int16", "versions": "0+",
        "about": "The maximum supported version, inclusive." }]
    },
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    // ----- START: PROPOSED ADDITIONAL METADATA -----
    { "name":  "SupportedFeatures", "type": "[]FeatureKey",
      "versions":  "3+", "tag": 10000, "taggedVersions": "3+",
      "about": "Features supported by the broker.",
      "fields":  [
        { "name": "Name", "type": "string", "versions": "3+",
          "about": "The name of the feature." },
        { "name": "MinVersion", "type": "int64", "versions": "3+",
          "about": "The minimum supported version, inclusive." },
        { "name": "MaxVersion", "type": "int64", "versions": "3+",
          "about": "The maximum supported version, inclusive." }
      ]
    },
    {"name": "FinalizedFeaturesMetadataVersion", "type": "int64", "versions": "3+",
      "tag": 10001, "taggedVersions": "3+",
      "about": "The monotonically increasing version of the metadata for finalized features."},
    { "name":  "FinalizedFeatures", "type": "[]FinalizedFeatureKey",
      "versions":  "3+", "tag": 10002, "taggedVersions": "3+",
      "about": "List of cluster-wide finalized features.",
      "fields":  [
        {"name": "Name", "type": "string", "versions":  "3+",
          "about": "The name of the feature."},
        {"name":  "Version", "type": "int64", "versions":  "3+",
          "about": "The version for the feature."}
      ]
    }
    // ----- END: PROPOSED ADDITIONAL METADATA -----
  ]
}
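
On the client side, the guard against stale metadata mentioned above can be as simple as the following sketch (a hypothetical class, not part of any existing client): remember the highest FinalizedFeaturesMetadataVersion seen so far, and ignore any response carrying an older one.

Client-side stale metadata guard (illustrative sketch)

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public final class FinalizedFeaturesCache {
    private long latestMetadataVersion = -1L;
    private Map<String, Long> finalizedFeatures = Collections.emptyMap();

    // Called with the FinalizedFeaturesMetadataVersion and FinalizedFeatures fields
    // extracted from each ApiVersionsResponse the client receives.
    public synchronized void maybeUpdate(long metadataVersion, Map<String, Long> features) {
        if (metadataVersion <= latestMetadataVersion)
            return;  // stale or duplicate metadata due to eventual consistency: ignore it
        latestMetadataVersion = metadataVersion;
        finalizedFeatures = Collections.unmodifiableMap(new HashMap<>(features));
    }

    // Returns the finalized cluster-wide version of a feature, or -1 if unknown.
    public synchronized long finalizedVersionOf(String featureName) {
        return finalizedFeatures.getOrDefault(featureName, -1L);
    }
}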

Broker protections against race conditions

Certain validations will be introduced at a few points in the broker code. The purpose is to avoid race conditions where incompatible brokers remain active in a cluster. The validations affirm that the supported feature versions in the broker are compatible with the expected cluster-wide feature versions. If any of these checks fail, the broker shutdown sequence is triggered, and the process eventually exits with a non-zero exit code. The places where the validation will be introduced are explained below:

  1. Validation shall be introduced during broker startup. This involves synchronously reading the cluster-wide feature versions from the /features ZK node just after initializing the ZK client, and before creating the broker’s own ephemeral node (roughly here). The feature versions available in the broker are checked against the contents of the node to ensure there are no incompatibilities.

  2. Validation shall be introduced in the ApiKeys.UPDATE_METADATA request handler code path. This affirms that the feature versions available in the broker are compatible with the cluster-wide feature versions sent by the controller in the ApiKeys.UPDATE_METADATA call. The broker carries out this validation as a first step when processing this API request, so if an incompatibility is detected, the broker is shut down immediately.

NOTE: The logic for these validations will be exactly the same as the one described in the Validations section under the controller API.

Tooling support

We shall introduce a CLI tool backed by a new admin command type called kafka.admin.FeatureCommand. The implementation will be inspired by the various command classes written in Scala that already enable us to carry out operations such as CreateTopics, DeleteTopics, AlterConfigs etc. from the CLI. The new FeatureCommand will be used by the cluster operator (i.e. a human), and will enable us to do the following:

  • Read cluster-wide finalized feature versions from a broker via its ApiKeys.API_VERSIONS API.

  • Add/update/delete cluster-wide feature versions by exercising the newly introduced ApiKeys.UPDATE_FEATURES API.

We shall introduce a couple of new APIs in the Admin interface, which enable us to read and finalize feature version upgrades. Here is Java-ish pseudocode for the same:

// ---- START: Proposed Admin API definitions ----
/**
 * Update the feature versions supported cluster-wide. You may
 * anticipate certain exceptions when calling get() on the futures
 * obtained from the returned UpdateFeaturesResult.
 *
 * @param updates   set of feature updates, keyed by the
 *                  name of the feature
 * @return          the result of the updateFeatures request
 */
UpdateFeaturesResult updateFeatures(Set<FeatureUpdate> updates);

/**
 * List the feature versions supported cluster-wide (via ApiVersions API).
 */
DescribeFeaturesResult describeFeatures();

// ---- END: Proposed Admin API definitions ----

interface Feature {
    // The name of the feature to be updated.
    String getName();

    // The new value of the feature version.
    Long getVersion();
}

enum FeatureUpdateType {
    // Delete the feature. Useful to eliminate accidental junk,
    // or rarely eliminate a deprecated feature altogether.
    DELETE,

    // Either add the feature (if not present), or update it (if present).
    ADD_OR_UPDATE
}

interface FeatureUpdate {
    // Return the feature to be updated.
    Feature getFeature();
    
    // Return the type of update to be made.
    FeatureUpdateType getUpdateType();
}

interface FinalizedFeatureMetadata {
    // The set of cluster-wide finalized features.
    Set<Feature> getFeatures();

    // The monotonically increasing metadata version.
    Long getMetadataVersion();
}

interface UpdateFeaturesResult {
    /**
     * Returns a future which indicates success/failure.
     * 1. If the future has succeeded (i.e. no exceptions),
     *    then the request was 100% successful, and no top level or  
     *    individual FeatureUpdate errors were seen. The data
     *    returned in the future contains the latest entire set of
     *    finalized cluster-wide features (after all updates were applied).
     * 2. If the future has failed, the top level error (if any)
     *    or the error from the first failed FeatureUpdate
     *    (if any) is raised to the caller.
     */
    KafkaFuture<FinalizedFeatureMetadata> all();
}

interface DescribeFeaturesResult {
    /**
     * The data returned in the future contains the latest entire set of
     * finalized cluster-wide features.
     */
    KafkaFuture<FinalizedFeatureMetadata> all();
}
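
As a usage illustration, still in the same Java-ish pseudocode register (it assumes concrete implementations of the interfaces above, plus a hypothetical factory for constructing a FeatureUpdate), a cluster operator tool could read and then finalize a feature version roughly as follows:

// ---- Illustrative usage pseudocode (assumes implementations of the above) ----

Admin admin = ...;  // an Admin client connected to any broker in the cluster

// Read the current cluster-wide finalized features (served via the ApiVersions API).
FinalizedFeatureMetadata current = admin.describeFeatures().all().get();
log("Current metadata version: " + current.getMetadataVersion());

// Ask the controller to finalize group_coordinator at version 1 cluster-wide.
FeatureUpdate update = newAddOrUpdateFeatureUpdate("group_coordinator", 1L);  // hypothetical factory
FinalizedFeatureMetadata latest = admin.updateFeatures(Collections.singleton(update)).all().get();
log("Finalized features after update: " + latest.getFeatures());

// ---- END: Illustrative usage pseudocode ----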

Use case: group_coordinator feature flag (for KIP-447)

As described in the motivation for this KIP, the Kafka Streams use case will immediately benefit from programmatically learning the cluster-wide version of the EOS feature on the brokers (KIP-447). We shall use the group_coordinator feature flag to achieve this, as explained below. This feature flag will cover both schema and protocol changes related to the group coordinator (KIP-447 falls under a change to the group coordinator).

Imagine we shipped the feature versioning scheme before the shipment of KIP-447. There would then be at least two versions of the broker binary:

  • One of these (the old/existing broker bits) would only advertise group_coordinator version 0 (referred to as v0 below). v0 does not contain the EOS semantics described in KIP-447.

  • The other (the new broker bits) would advertise group_coordinator feature versions v0-v1, i.e. v0 as well as v1. v1 contains the EOS semantics described in KIP-447.

The Kafka Streams client uses the consumer rebalance protocol to propagate group metadata information, with one nominated leader in the group. The sequence of feature upgrade events in the happy path is the following (T0, T1, …, Tn refer to increasing points in time):

  • T0: The broker cluster is running the old binary, and a Streams application is communicating with it. The Streams clients will be using the v0 group_coordinator feature. The leader stream thread uses its admin client to periodically call DescribeFeatures on a random broker in the cluster (presumably every minute) to get information about the latest cluster-wide enabled features. Thus, it learns about the group_coordinator feature flag, with the finalized version at v0.
  • T1: The cluster undergoes a rolling upgrade to the new broker binary. Once the upgrade is complete, every broker in the cluster is expected to support v0-v1 versions for the group_coordinator feature flag.
  • T2: The controller still has only group_coordinator:v0 enabled in the /features ZK node. So, the above rolling upgrade has no effect (yet) on the Streams client side.
  • T3: Knowing that the cluster upgrade has completed, the cluster operator (i.e. a human) sends an ApiKeys.UPDATE_FEATURES request to the controller to finalize the cluster-wide group_coordinator feature version upgrade from v0 to v1. When this request succeeds, it means the following:
    → The controller has checked that all brokers in the cluster advertise support for group_coordinator:v1, and it has persisted the upgrade to the ZK /features node.
    → All brokers received an ApiKeys.UPDATE_METADATA request from the controller informing them that group_coordinator:v1 has been finalized cluster-wide.
  • T3-4: The Streams leader thread will, at a certain point in time, see the above change (via the periodic DescribeFeatures call). It will then trigger a rebalance of the Streams group, plumbing the group_coordinator:v1 feature flag down to all members. It is key to note that this event could happen concurrently with T3, which is why the event is labelled T3-4.
  • T5: Every member of the Streams group gradually upgrades to group_coordinator:v1, as the leader instructed above. This is when the Streams client switches from the old per-task producer model to the new thread-level producer model (for KIP-447).
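
For illustration, the leader-side polling described at T0 and T3-4 could look roughly like the sketch below. All names here are hypothetical: the supplier stands in for the periodic DescribeFeatures call, and the runnable stands in for triggering the rebalance that plumbs the new flag to all members.

Streams leader feature polling (illustrative sketch)

import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public final class GroupCoordinatorFeaturePoller {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private volatile long enabledVersion = 0L;  // start on the v0 (per-task producer) model

    public void start(Supplier<Map<String, Long>> pollFinalizedFeatures,
                      Runnable triggerRebalanceWithNewModel) {
        scheduler.scheduleAtFixedRate(() -> {
            long finalized = pollFinalizedFeatures.get().getOrDefault("group_coordinator", 0L);
            // Only ever move forward: once v1 has been observed, never downgrade to v0.
            if (finalized >= 1L && enabledVersion < 1L) {
                enabledVersion = 1L;
                triggerRebalanceWithNewModel.run();
            }
        }, 0, 1, TimeUnit.MINUTES);
    }
}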

There are some edge cases in this happy upgrade path, which we explain below.

Delay of feature version propagation

Due to eventual consistency in metadata propagation, it is possible that some brokers are notified about group_coordinator:v1 later than others. At T3-4, the Streams leader thread first upgrades to group_coordinator:v1 when it learns of the feature bump from a broker. Then, after T5, there exists a case where the leader Streams thread could query a stale broker and learn that the finalized feature set only contains v0. If this situation is not handled properly, we are in danger of a downgrade, because in KIP-447 we don't support downgrading the processing model unless the user wipes out the old stream instances and restarts from scratch. The following is done to make sure the above scenario never affects the Streams use case:

  1. The Streams clients shall not downgrade to group_coordinator:v0 once they have upgraded to group_coordinator:v1, even if the leader instructs them to do so. This is because the metadata version in the DescribeFeatures response that carried group_coordinator:v1 is higher than that of the earlier response, and the client therefore ignores the older response from the broker (see this section to learn about the monotonically increasing metadata version).

  2. It is important to note that the user is only upgrading the cluster from enabling group_coordinator:v0 to enabling group_coordinator:v0-v1. Therefore, once a client sees that v1 is enabled on any broker, that is a clear signal that all broker binaries within the cluster support the group_coordinator:v1 semantics; it is just a matter of time before the feature version update metadata propagates from the controller to every broker.

The above ensures that the upgrade logic is reliable/safe, as there is no case of a downgrade affecting the stream client (during the upgrade path).

Downgrade of group_coordinator feature

As such, we don't support downgrades in this versioning scheme/system (see Non-goals). So, there is a danger if the cluster operator tries to downgrade the group_coordinator feature flag from v1 to v0: the Streams client side will not downgrade at all, and this would crash the EOS applications. As part of future work, we may consider adding constraints in the controller to restrict the downgrade of specific flags (unless there is a sign that it is done consciously). For example, the group_coordinator feature is tied to data schemas and EOS processing semantics that are not forward compatible.

Potential Features in Kafka

Many features could benefit from the above system; the following are a selected few high-level examples:

  1. group_coordinator: As described in the above section.

  2. transaction_coordinator: This feature flag could cover changes to the message format for the transaction state internal topic schema, and protocol changes related to the transaction coordinator. The advantages are similar to #1.

  3. consumer_offsets_topic_schema: Currently the message format used in the consumer internal topics is tied to IBP. It’s a candidate for a feature, because we will be able to dynamically finalize a new version of the topic schema, without having to roll the brokers twice.

  4. inter_broker_protocol: For transitional purposes, the IBP itself can be made a coarse-grained feature. This could be a way to operationally migrate away from the double roll during IBP bumps.

Compatibility, Deprecation and Migration Plan

Migrating from IBP-based setup to versioning scheme

Phase #1

Post development, we hit phase #1, where the new versioning system is ready to be used. Here we would like to migrate clusters from IBP-based validations to the new versioning-system-based setup. In order to achieve this, we shall once again use an IBP double roll. This means that once a cluster has fully migrated to a certain IBP version, we can almost entirely switch to using the new versioning scheme. Let's say the value of such an IBP version is migration-ibp-version. Then, in order for the versioning system to safely provide a migration path, we do the following:

  • Firstly, the versioning system itself is a new thing. For the initial roll-out, we should only operate the versioning system after the second IBP roll, which brings the IBP of the cluster to migration-ibp-version (that is when the versioning system is fully deployed).

  • As a safeguard, each broker will validate that its IBP version is at least at migration-ibp-version before applying broker validations for feature versions, and before advertising its features in ZK.

  • As a safeguard, the controller will validate that its IBP version is at least at migration-ibp-version before allowing feature version upgrades to be finalized in the cluster (via the ApiKeys.UPDATE_FEATURES API).

  • All other decision-making logic based on feature versions in the broker code will always validate that the broker's current IBP version is at least at migration-ibp-version.
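
A minimal sketch of the safeguard described in this list is shown below (hypothetical names; real IBP values are richer objects than plain integers). The same comparison gates feature advertisement in ZK, broker-side feature validations, and controller-side finalization:

IBP migration gate (illustrative sketch)

public final class FeatureVersioningMigrationGate {
    // The versioning system only becomes active once the broker's (or controller's) IBP has
    // reached migration-ibp-version, i.e. after the second roll of the IBP double roll.
    public static boolean versioningSystemActive(int currentIbpLevel, int migrationIbpLevel) {
        return currentIbpLevel >= migrationIbpLevel;
    }
}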

Phase #2

There can be a transitional phase where a Kafka cluster uses both the IBP-based setup and the new versioning scheme. In order to completely deprecate the existing IBP-based setup, we would want to ensure we no longer have any references to the IBP configuration in the broker code base. Once that is done, we can stop using the IBP configuration altogether, and deprecate/remove the relevant support from the code.

Phase #3

This is the phase when we no longer use the IBP-based setup and have completely switched to using the new versioning scheme.

Phase #4

Completely deprecate the IBP, and remove all references to it from the code.

Migrating from versioning scheme back to IBP (i.e. emergency downgrade)

We do not foresee this happening once the migration is over and the IBP has been deprecated. However, it can happen that we want to switch back to the IBP-based setup while we are in Phase #3 above (for example, due to an unprecedented issue). This will be supported, because we deprecate the IBP only in Phase #4 above. To "downgrade", a user would first need one rolling restart of the cluster to reduce the IBP to the required version, and then another rolling restart to change the binary. Note that each feature will have its own set of requirements to make this possible (in some cases, it may not be possible), but this is outside the scope of this document.

Rejected Alternatives

<?>


