...

Code Block
{
  "apiKey": 52,
  "type": "request",
  "name": "UpdateFeaturesRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "timeoutMs", "type": "int32", "versions": "0+", "default": "60000",
      "about": "How long to wait in milliseconds before timing out the request." },
    { "name": "FeatureUpdates", "type": "[]FeatureUpdateKey", "versions": "0+",
      "about": "The list of updates to finalized features.", "fields": [
      {"name": "Feature", "type": "string", "versions": "0+", "mapKey": true,
        "about": "The name of the finalized feature to be updated."},
      {"name": "MaxVersionLevel", "type": "int16", "versions": "0+",
        "about": "The new maximum version level for the finalized feature. A value >= 1 is valid. A value < 1, is special, and can be used to request the deletion of the finalized feature."},
      {"name": "AllowDowngrade", "type": "bool", "versions": "0+",
        "about": "When set to true, the finalized feature version level is allowed to be downgraded/deleted. The downgraded request will fail if the new maximum version level is a value that's not lower than the existing maximum finalized version level."}
    ]}
  ]
}
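
To make the request semantics above concrete, here is a minimal, hypothetical sketch (not part of the KIP) of how a controller could validate a single FeatureUpdateKey against the currently finalized range: a MaxVersionLevel >= 1 finalizes that level, a value < 1 requests deletion, and downgrades/deletions must be explicitly permitted via AllowDowngrade. The class and method names below are illustrative only.

Code Block
languagejava
// Hypothetical sketch of the validation implied by UpdateFeaturesRequest;
// 'FinalizedRange' and 'validateUpdate' are illustrative names, not KIP APIs.
public final class FeatureUpdateValidation {

    record FinalizedRange(short minVersionLevel, short maxVersionLevel) {}

    // Returns a human-readable verdict for one FeatureUpdateKey entry.
    static String validateUpdate(String feature,
                                 short newMaxVersionLevel,
                                 boolean allowDowngrade,
                                 FinalizedRange existing) {
        if (newMaxVersionLevel < 1) {
            // A value < 1 requests deletion of the finalized feature.
            return allowDowngrade
                ? "OK: delete finalized feature '" + feature + "'"
                : "ERROR: deletion of '" + feature + "' requires AllowDowngrade=true";
        }
        boolean isDowngrade = existing != null && newMaxVersionLevel < existing.maxVersionLevel();
        if (isDowngrade && !allowDowngrade) {
            return "ERROR: downgrading '" + feature + "' requires AllowDowngrade=true";
        }
        if (!isDowngrade && allowDowngrade) {
            // Per the schema, a downgrade request must actually lower the max version level.
            return "ERROR: AllowDowngrade=true but " + newMaxVersionLevel + " does not lower the level";
        }
        return "OK: finalize '" + feature + "' at max version level " + newMaxVersionLevel;
    }

    public static void main(String[] args) {
        FinalizedRange existing = new FinalizedRange((short) 1, (short) 4);
        System.out.println(validateUpdate("transaction_coordinator", (short) 5, false, existing));
        System.out.println(validateUpdate("transaction_coordinator", (short) 3, true, existing));
    }
}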

...

Deprecating a feature version is an incompatible change, which requires a major release of Kafka (see Non-goals section). This requirement translates to increasing the cluster-wide finalized minimum version level of one or more features in the ZK '/features' node. It is important to note that the minimum version level cannot be mutated via the Controller API. This is because the minimum version level is usually increased only to indicate the intent to stop support for a certain feature version. We would usually deprecate features during broker releases, after prior announcements. Therefore, this is not a dynamic operation, and such a mutation is not supported through the ApiKeys.UPDATE_FEATURES controller API.

Instead, it is best if the version deprecation is activated through the controller: during a certain Kafka release, we would change the controller code to mutate the '/features' ZK node, increasing the minimum version level of one or more finalized features (this will be a planned change, as determined by Kafka developers). Then, as this broker release gets rolled out to a cluster, the feature versions will become permanently deprecated. The following is the proposed approach to achieve this:
Feature advertisements from each broker in the cluster are reactively picked up by the controller via ZK watches. Today these watches are already set up by the controller on every broker’s ephemeral ZK node (at '/brokers/ids/<id>'). The contents of the broker nodes are cached in memory in the controller. Using these contents, the controller can track the highest supported feature minimum version advertised by any broker in the cluster. Then, whenever a Kafka release advances the supported feature minimum version (to deprecate feature versions), the controller picks up this change and persists the highest supported minimum version to ZK as the finalized minimum version level for the feature. This change by the controller deprecates the finalized feature versions up to 1 below the highest supported minimum version.
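
To illustrate the mechanism in the previous paragraph, the sketch below (hypothetical code, not actual controller logic) computes the value the controller would persist as the new finalized minimum version level: the highest supported minimum version for the feature advertised by any broker whose ZK node contents are cached.

Code Block
languagejava
import java.util.List;
import java.util.Map;

// Hypothetical sketch: derive the finalized min version level for a feature
// from the supported min versions advertised by the brokers (cached from ZK).
public final class MinVersionLevelSketch {

    static short newFinalizedMinVersionLevel(String feature,
                                             List<Map<String, Short>> supportedMinsPerBroker) {
        short highest = 1;
        for (Map<String, Short> perBroker : supportedMinsPerBroker) {
            Short min = perBroker.get(feature);
            if (min != null && min > highest) {
                highest = min;  // highest supported minimum across all brokers
            }
        }
        // Persisting 'highest' as the finalized min version level deprecates
        // all versions of the feature up to (highest - 1).
        return highest;
    }

    public static void main(String[] args) {
        List<Map<String, Short>> brokers = List.of(
            Map.of("group_coordinator", (short) 2),
            Map.of("group_coordinator", (short) 2),
            Map.of("group_coordinator", (short) 2));
        System.out.println(newFinalizedMinVersionLevel("group_coordinator", brokers));  // 2
    }
}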

It is important that an admin makes sure that no clients are using a deprecated feature version (e.g. using the client version metric) before deploying a release that deprecates it.


Client discovery of finalized feature versions

...

Code Block
{
  "apiKey": 18,
  "type": "response", "name": "ApiVersionsResponse",
  "validVersions": "0-3",
  "flexibleVersions": "3+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code." },
    { "name": "ApiKeys", "type": "[]ApiVersionsResponseKey", "versions": "0+",
      "about": "The APIs supported by the broker.", "fields": [
      { "name": "ApiKey", "type": "int16", "versions": "0+", "mapKey": true,
        "about": "The API index." },
      { "name": "MinVersion", "type": "int16", "versions": "0+",
        "about": "The minimum supported version, inclusive." },
      { "name": "MaxVersion", "type": "int16", "versions": "0+",
        "about": "The maximum supported version, inclusive." }]
    },
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    // ----- START: PROPOSED ADDITIONAL METADATA -----
    { "name":  "SupportedFeatures", "type": "[]FeatureKey",
      "versions":  "3+", "tag": 10000, "taggedVersions": "3+",
      "about": "Features supported by the broker.",
      "fields":  [
        { "name": "Name", "type": "string", "versions": "3+",
          "about": "The name of the feature." },
        { "name": "MinVersion", "type": "int16", "versions": "3+",
          "about": "The minimum supported version for the feature." },
        { "name": "MaxVersion", "type": "int16", "versions": "3+",
          "about": "The maximum supported version for the feature." }
      ]
    },
    {"name": "FinalizedFeaturesEpoch", "type": "int64", "versions": "3+",
      "tag": 10001, "taggedVersions": "3+", "default": "-1", "ignorable": true,
      "about": "The monotonically increasing epoch for the features information."},
    { "name":  "FinalizedFeatures", "type": "[]FinalizedFeatureKey",
      "versions":  "3+", "tag": 10002, "taggedVersions": "3+",
      "about": "List of cluster-wide finalized features.",
      "fields":  [
        {"name": "Name", "type": "string", "versions":  "3+",
          "about": "The name of the feature."},
        {"name":  "MaxVersionLevel", "type": "int16", "versions":  "3+",
          "about": "The cluster-wide finalized max version level for the feature."},
        {"name":  "MinVersionLevel", "type": "int16", "versions":  "3+",
          "about": "The cluster-wide finalized min version level for the feature."}
      ]
    }
    // ----- END: PROPOSED ADDITIONAL METADATA -----
  ]
}
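
As an example of how a client might consume the proposed fields above, the following sketch intersects a cluster-wide finalized version range with the client's own supported range to decide which feature version (if any) it can enable. The types are local to the example, standing in for the generated protocol classes.

Code Block
languagejava
import java.util.Map;
import java.util.Optional;

// Illustrative only: client-side use of the FinalizedFeatures data carried in
// ApiVersionsResponse (the types below are stand-ins for generated classes).
public final class FeatureGate {

    record FinalizedFeature(short minVersionLevel, short maxVersionLevel) {}

    // Returns the highest feature version the client may use, if any.
    static Optional<Short> usableVersion(Map<String, FinalizedFeature> finalizedFeatures,
                                         String feature,
                                         short clientMinSupported,
                                         short clientMaxSupported) {
        FinalizedFeature finalized = finalizedFeatures.get(feature);
        if (finalized == null) {
            return Optional.empty();  // feature not finalized cluster-wide
        }
        short candidate = (short) Math.min(finalized.maxVersionLevel(), clientMaxSupported);
        if (candidate < finalized.minVersionLevel() || candidate < clientMinSupported) {
            return Optional.empty();  // no overlap between finalized and supported ranges
        }
        return Optional.of(candidate);
    }

    public static void main(String[] args) {
        Map<String, FinalizedFeature> finalized =
            Map.of("group_coordinator", new FinalizedFeature((short) 1, (short) 2));
        // A client supporting versions 1-2 can enable version 2.
        System.out.println(usableVersion(finalized, "group_coordinator", (short) 1, (short) 2));
    }
}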

...

Code Block
languagejava
// ---- START: Proposed Admin API definitions ----
/**
 * Describes supported and finalized features. You may anticipate 
 * certain exceptions when calling get() on the future obtained from the returned
 * DescribeFeaturesResult.
 *
 * @param options   options for the describeFeatures API, 
 * 
 * @return          a result object containing:
 *                    1. List of cluster-wide finalized feature versions.
 *                    2. List of supported feature versions specific to the broker.
 */
DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);

/**
 * Update the feature versions supported cluster-wide. You may
 * anticipate certain exceptions when calling get() on the futures
 * obtained from the returned UpdateFeaturesResult.
 *
 * @param updates   map of feature updates, keyed by the
 *                  name of the feature
 * @param options   options for the updateFeatures API
 *
 * @return          the results of the FeatureUpdate provided in the request
 */
UpdateFeaturesResult updateFeatures(Map<String, FeatureUpdate> featureUpdates, UpdateFeaturesOptions options);

// ---- END: Proposed Admin API definitions ----

// Represents a range of version levels supported by every broker in a cluster for some feature.
class FinalizedVersionRange {
    // The cluster-wide finalized value of the feature min version level (value >= 1).
    short minVersionLevel();

    // The cluster-wide finalized value of the feature max version level (value >=1 and value >= minVersionLevel).
    short maxVersionLevel();
}

/**
 * Options for {@link AdminClient#describeFeatures(DescribeFeaturesOptions)}.
 *
 * The API of this class is evolving.
 */
@InterfaceStability.Evolving
public class DescribeFeaturesOptions extends AbstractOptions<DescribeFeaturesOptions> {
  // Currently empty, but can be populated in the future as needed.
}

/**
 * Options for {@link AdminClient#updateFeatures(Map, UpdateFeaturesOptions)}.
 *
 * The API of this class is evolving.
 */
@InterfaceStability.Evolving
public class UpdateFeaturesOptions extends AbstractOptions<UpdateFeaturesOptions> {
  // Currently empty, but can be populated in the future as needed.
}

// Represents a range of versions that a particular broker supports for some feature.
class SupportedVersionRange {
	// The minimum version (value >= 1) of the supported feature.
	short minVersion();

	// The maximum version (value >=1 and value >= minVersion) of the supported feature.
	short maxVersion();
}

/**
 * Represents an update to a Feature, which can be sent to the controller
 * for processing.
 */
class FeatureUpdate {
    /**
     * The cluster-wide finalized NEW value of the feature max version level.
     * - When >= 1, it's the new value to-be-updated for the finalized feature.
	 * - When < 1, it indicates the deletion of a finalized feature.
     */
    short maxVersionLevel();
    
    /**
     * Return true only if downgrade/deletion of a feature should be allowed.
     * Note that despite this allowDowngrade flag being set, certain downgrades
     * may be rejected by the controller if it is deemed unsafe to downgrade
     * the max version level of some specific feature.
     */
    boolean allowDowngrade();
}

/**
 * Encapsulates details about finalized as well as supported features. This is particularly useful
 * to hold the result returned by the `Admin#describeFeatures(DescribeFeaturesOptions)` API.
 */
class FeatureMetadata {
    /**
     * Returns a map of finalized feature versions. Each entry in the map contains a key being a
     * feature name and the value being a range of version levels supported by every broker in the
     * cluster.
     */
	Map<String, FinalizedVersionRange> finalizedFeatures();

    /**
     * The monotonically increasing epoch for the finalized features.
     * If the returned value is empty, it means the finalized features are absent/unavailable.
     */
    Optional<Long> finalizedFeaturesEpoch();

	/**
     * Returns a map of supported feature versions. Each entry in the map contains a key being a
     * feature name and the value being a range of versions supported by a particular broker in the
     * cluster.
     */
    Map<String, SupportedVersionRange> supportedFeatures();
}

class DescribeFeaturesResult {
    /**
     * The data returned in the future contains the latest entire set of
     * finalized cluster-wide features, as well as the entire set of 
     * features supported by the broker serving this read request.
     */
    KafkaFuture<FeatureMetadata> featureMetadata();
}

class UpdateFeaturesResult {
    /**
     * Returns a future which succeeds only if all the FeatureUpdate in the request succeed.
     */
    KafkaFuture<Void> all();

    /**
     * Returns a map with key being feature name and value being
     * the future which can be used to check the status of the FeatureUpdate
     * in the request.
     *
     * Possible error codes:
     * - NONE: The FeatureUpdate succeeded.
     * - NOT_CONTROLLER: The FeatureUpdate failed since the request was processed by a broker that's not the controller.
     * - CLUSTER_AUTHORIZATION_FAILED: The FeatureUpdate failed since there wasn't sufficient permission to perform the update.
     * - INVALID_REQUEST: The FeatureUpdate failed because it is invalid.
     * - FEATURE_UPDATE_FAILED: The FeatureUpdate failed because it can not be applied (ex: due to version incompatibilities)
     */
    Map<String, KafkaFuture<Void>> values();
}
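
Assuming the proposed Admin APIs above land roughly as written, client usage might look like the sketch below. Admin and AdminClientConfig are the existing Kafka admin client entry points; the feature-related classes are the ones proposed above, and the exact FeatureUpdate constructor shape is an assumption made for illustration.

Code Block
languagejava
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;

// Sketch of how the proposed describeFeatures/updateFeatures APIs might be used;
// the FeatureUpdate constructor shape is an assumption for illustration.
public final class FeatureAdminExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker0.prn1:9071");

        try (Admin admin = Admin.create(props)) {
            // Read cluster-wide finalized features and the broker's supported features.
            FeatureMetadata metadata =
                admin.describeFeatures(new DescribeFeaturesOptions()).featureMetadata().get();
            System.out.println("Finalized: " + metadata.finalizedFeatures());
            System.out.println("Supported: " + metadata.supportedFeatures());

            // Finalize group_coordinator at max version level 2; downgrade not allowed.
            Map<String, FeatureUpdate> updates =
                Map.of("group_coordinator", new FeatureUpdate((short) 2, false));
            admin.updateFeatures(updates, new UpdateFeaturesOptions()).all().get();
        }
    }
}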


Basic CLI tool usage

Following are examples of regular usage of the CLI tool, which involves the following activities:

  1. Read cluster-wide finalized feature versions from a broker or a controller via its ApiKeys.API_VERSIONS API.

  2. Upgrade the max version levels of all features, to their latest values, as known to the CLI tool internally. This becomes useful after completing the deployment of a new Kafka Broker release onto an existing cluster. This removes the burden to individually finalize feature upgrades. 
  3. Downgrade the max version levels of all features, to the values known to the CLI tool internally. This becomes useful during an emergency cluster downgrade, after finalizing feature levels from a previous upgrade.

Code Block
=== DESCRIBE FEATURES ===

# Get cluster-wide finalized features, and features supported by a specific broker.
# - Use `--bootstrap-server` to provide a broker host:port to which queries should be issued.

$> kafka-features.sh \
     --describe \
     --bootstrap-server kafka-broker0.prn1:9071

Feature: consumer_offsets_topic_schema  SupportedMinVersion: 1  SupportedMaxVersion: 1  FinalizedMinVersionLevel: -     FinalizedMaxVersionLevel: -     Epoch: 1
Feature: group_coordinator  SupportedMinVersion: 1  SupportedMaxVersion: 2  FinalizedMinVersionLevel: 1     FinalizedMaxVersionLevel: 1     Epoch: 1
Feature: transaction_coordinator  SupportedMinVersion: 1  SupportedMaxVersion: 5  FinalizedMinVersionLevel: 1     FinalizedMaxVersionLevel: 4     Epoch: 1

=== UPGRADE TO ALL LATEST FEATURES ===

# Upgrade to the max version levels of all features, as internally known to the CLI tool.
#
# - This command removes the burden to individually finalize feature upgrades.
#   This becomes handy to a cluster operator intending to finalize a cluster with all the latest
#   available feature version levels. This usually happens after completing the deployment
#   of a newer Kafka Broker release onto an existing cluster.
# - Use `--bootstrap-server` to provide a broker host:port to which queries should be issued.
# - Optionally, use the `--dry-run` flag to list the feature updates without applying them.

$> kafka-features.sh \
     --upgrade-all \
     --bootstrap-server kafka-broker0.prn1:9071 \
     [--dry-run]

    [Add] Feature: consumer_offsets_topic_schema       ExistingFinalizedMaxVersion: -  NewFinalizedMaxVersion: 1       Result: OK
[Upgrade] Feature: group_coordinator       ExistingFinalizedMaxVersion: 1  NewFinalizedMaxVersion: 2       Result: OK
[Upgrade] Feature: transaction_coordinator      ExistingFinalizedMaxVersion: 4  NewFinalizedMaxVersion: 5       Result: OK

=== EMERGENCY DOWNGRADE ALL FEATURES ===

# Downgrade to the max version levels of all features known to the CLI tool.
#
# - This command removes the burden to individually finalize feature version
#   downgrades. This becomes handy to a cluster operator intending to downgrade all
#   feature version levels, just prior to rolling back a Kafka Broker deployment
#   on a cluster, to a previous Broker release.
# - Optionally, use the `--dry-run` flag to list the feature updates without applying them.

$> kafka-features.sh \
     --downgrade-all \
     --bootstrap-server kafka-broker0.prn1:9071 \
     [--dry-run]

   [Delete] Feature: consumer_offsets_topic_schema       ExistingFinalizedMaxVersion: 1  NewFinalizedMaxVersion: -       Result: OK
[Downgrade] Feature: group_coordinator      ExistingFinalizedMaxVersion: 2  NewFinalizedMaxVersion: 1       Result: OK
[Downgrade] Feature: transaction_coordinator      ExistingFinalizedMaxVersion: 5  NewFinalizedMaxVersion: 4       Result: OK

Advanced CLI tool usage

Following are examples of advanced usage of the CLI tool. Going beyond regular usage, advanced usage involves adding/upgrading/downgrading/deleting specific cluster-wide finalized feature versions.

Code Block
=== ADD OR UPGRADE OR DOWNGRADE OR DELETE FEATURES ===

# Add or update a list of cluster-wide finalized features.
# - Use `--bootstrap-server` to provide a broker host:port to which the queries should be issued.
# - Optionally, use `--upgrade` to provide a comma-separated list of features and new finalized max version to add or upgrade.
# - Optionally, use `--downgrade` to provide a comma-separated list of features and new finalized max version to downgrade to. This should be used only when required.
# - Optionally, use `--delete` to provide a comma-separated list of finalized features to be deleted.
# - Optionally, use the `--dry-run` flag to list the feature updates without applying them.

$> kafka-features.sh update \
     --bootstrap-server kafka-broker0.prn1:9071 \
     --upgrade group_coordinator:2,consumer_offsets_topic_schema:1 \
     --downgrade transaction_coordinator:3 \
     --delete replication_throttling \
     [--dry-run]

      [Add] Feature: consumer_offsets_topic_schema       ExistingFinalizedMaxVersion: -  NewFinalizedMaxVersion: 1       Result: OK
  [Upgrade] Feature: group_coordinator      ExistingFinalizedMaxVersion: 1  NewFinalizedMaxVersion: 2       Result: OK
[Downgrade] Feature: transaction_coordinator      ExistingFinalizedMaxVersion: 4  NewFinalizedMaxVersion: 3       Result: OK
   [Delete] Feature: replication_throttling       ExistingFinalizedMaxVersion: 2  NewFinalizedMaxVersion: -       Result: OK

New or changed public interfaces

...

  • T0: The broker cluster is running with the old binary, and a stream application is communicating with it. The stream clients will be using the group_coordinator:v1 feature. The leader stream thread would use its admin client to periodically call DescribeFeatures on a random broker in the cluster (presumably every minute), to get information about the latest cluster-wide enabled features. Thus, it would learn about the group_coordinator feature flag, with the finalized version at v1.
  • T1: The cluster undergoes a rolling upgrade to the new broker binary. Once the upgrade is complete, every broker in the cluster is expected to support v1-v2 versions for the group_coordinator feature flag.
  • T2: The controller still has only group_coordinator:v1 enabled in the '/features' ZK node. So, the above rolling upgrade has no effect (yet) on the streams client side.
  • T3: Knowing that the cluster upgrade has completed, the cluster operator (i.e. a human) sends an ApiKeys.UPDATE_FEATURES request to the controller. This is to finalize the cluster-wide group_coordinator feature version level upgrade from v1 to v2. When this request is successful, it means the following:
    → The controller has checked that all brokers in the cluster advertise support for group_coordinator:v2, and it has persisted the upgrade to the ZK '/features' node.
    → The brokers are gradually receiving ZK notifications about the update to the '/features' ZK node. When each broker refreshes the contents of the ZK node, it will become aware that group_coordinator:v2 has been finalized cluster-wide.
  • T3-4: The streams leader thread, at a certain point in time, will see the above change (via the periodic DescribeFeatures call). It will then trigger a rebalance of the streams group, plumbing down the group_coordinator:v2 feature flag to all members. It is key to note that this event could happen concurrently with T3, which is why the event is labelled T3-4.
  • T5: Every member of the streams group will be gradually upgrading towards group_coordinator:v2 just as the leader instructed above. This is when the stream client will switch from the old per-task producer model to the new thread level producer model (for KIP-447).

There are some edge cases in this happy upgrade path we shall explain below.

Delay of feature version propagation

Due to eventual consistency in metadata propagation, it is possible that some brokers get notified about group_coordinator:v2 later than others. At T3-4, the streams leader thread first upgrades to group_coordinator:v2 when it learns of a feature version level bump from the broker. Then, after T5, there exists a case where the leader streams thread could query a stale broker and learn that the feature support only contains v1. If this situation is not handled properly, we are in danger of a downgrade. This is because in KIP-447 we don’t support downgrading the processing model, unless the user wipes out the old stream instances and restarts from scratch. The following is done to make sure the above scenario never affects the stream use case:

  1. The stream clients shall not downgrade to group_coordinator:v1 once they upgrade to group_coordinator:v2. The solution is to persist the group_coordinator flag as part of the consumers' metadata when the rebalance completes, so that on a second rebalance, as long as some members are already on the newer version, the leader of the group will not kick off another feature request, because it knows an upgrade was ongoing in the last generation (based on the eventual consistency guarantee). It will just continue to inform all the members to upgrade to the newer version.

  2. It is important to note that the user is only upgrading the cluster, from enabling group_coordinator:v1 to enabling group_coordinator:v1-v2. Therefore, once a client sees that v2 is enabled on any broker, it is a clear signal that all broker binaries within the cluster support the group_coordinator:v2 semantics; it is just a matter of time before the feature version update metadata propagates from the controller to every broker.

The above ensures that the upgrade logic is reliable/safe, as there is no case of a downgrade affecting the stream client (during the upgrade path).
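
To sketch the no-downgrade rule above in code (illustrative only; the real logic lives in the streams/consumer rebalance protocol), a group leader could track the highest feature version it has ever committed to the group's metadata and ignore any lower value reported by a stale broker:

Code Block
languagejava
// Illustrative only: once the group has moved to a newer group_coordinator
// version, the leader never proposes a lower one, even if a stale broker
// still reports the old finalized version.
public final class GroupFeatureVersionTracker {

    private short committedVersion;  // version persisted in the group metadata

    GroupFeatureVersionTracker(short initialVersion) {
        this.committedVersion = initialVersion;
    }

    // Called with the finalized max version level learned from a periodic DescribeFeatures call.
    short versionToPropose(short finalizedMaxVersionLevel) {
        if (finalizedMaxVersionLevel > committedVersion) {
            committedVersion = finalizedMaxVersionLevel;  // upgrade, never downgrade
        }
        return committedVersion;
    }

    public static void main(String[] args) {
        GroupFeatureVersionTracker tracker = new GroupFeatureVersionTracker((short) 1);
        System.out.println(tracker.versionToPropose((short) 2));  // 2: upgrade
        System.out.println(tracker.versionToPropose((short) 1));  // 2: stale read is ignored
    }
}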

Downgrade of group_coordinator feature

As such, we don’t support downgrades in this versioning scheme/system (see Non-goals and Future work). So, there is a danger if the cluster operator tries to downgrade the group_coordinator feature flag from v2 to v1 using the downgrade support in the tooling. This is because the stream client side shall not downgrade at all, and this would crash the EOS applications. A safe practice is to first downgrade the stream clients, before downgrading the group_coordinator feature flag.

Potential features in Kafka

Many features could benefit from the above system; the following are a select few high-level examples:

  1. group_coordinator: As described in the above section.

  2. transaction_coordinator: This feature flag could cover changes to the message format for the transaction state internal topic schema, and protocol changes related to the transaction coordinator. The advantages are similar to #1.

  3. consumer_offsets_topic_schema: Currently the message format used in the consumer internal topics is tied to IBP. It’s a candidate for a feature, because we will be able to dynamically finalize a new version of the topic schema, without having to roll the brokers twice.

  4. inter_broker_protocol: For transitional purposes, the inter.broker.protocol itself can be made a coarse-grained feature. This could be a way to operationally migrate away from the double roll during IBP bumps.


Deployment, IBP deprecation and avoidance of double rolls

There is a configuration key in Kafka called inter.broker.protocol (IBP). Currently, the IBP can only be set by changing the static configuration file supplied to the broker during startup. Here are the various phases of work that would be required to use this KIP to eventually avoid Broker double rolls in the cluster (whenever IBP values are advanced).

Phase #1

Post development of the feature versioning system described in this KIP, we hit phase #1 where the new versioning system is ready to be used:

  • The feature versioning system will be released under a new IBP version: migration_ibp_version.

  • For the initial roll out, we should only operate the versioning system after the second IBP roll → this brings the IBP of the cluster to migration_ibp_version (that’s when the versioning system is fully deployed).

    • As a safeguard, each broker will validate that its IBP version is at least at migration_ibp_version before applying broker validations for feature versions, and before advertising its features in ZK.

    • As a safeguard, the controller will validate that its IBP version is at least at migration_ibp_version before allowing feature version changes to be finalized in a cluster (a sketch of this guard follows below).
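
A minimal sketch of the guard described in the bullets above (the names and the numeric IBP representation are simplified stand-ins, not broker code):

Code Block
languagejava
// Simplified stand-in for the Phase #1 safeguard: feature-versioning behavior
// (advertising supported features, finalizing feature versions) is enabled only
// once the statically configured IBP has reached migration_ibp_version.
public final class IbpGuard {

    static boolean featureVersioningEnabled(int configuredIbp, int migrationIbp) {
        return configuredIbp >= migrationIbp;
    }

    public static void main(String[] args) {
        int migrationIbp = 27;  // hypothetical numeric stand-in for migration_ibp_version
        System.out.println(featureVersioningEnabled(26, migrationIbp));  // false: stay on IBP-only behavior
        System.out.println(featureVersioningEnabled(27, migrationIbp));  // true: advertise and finalize features
    }
}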

Phase #2

This is the phase when both the new feature versioning system and the existing IBP setup (in the form of static configuration) are active in the Kafka broker. Feature flags may optionally be defined in the code as part of Kafka development. During deployment of newer Kafka releases that use feature flags, there will be a requirement to finalize such feature flags using the provided API/tooling (as required by this KIP). By the end of this phase, we still have not eliminated the requirement for a double roll during broker upgrades.


Phase #3

This phase is about realizing the benefit of this KIP to avoid Broker double rolls during upgrades. For this, we would like to move away from using the existing IBP-based setup (in the form of a static configuration) in the broker code base. This requires several steps, as proposed below:

  1. We need a way to map the usage of IBP in the code (in the form of a static configuration) to the usage of IBP in the new feature versioning system. To achieve this, we introduce one or more feature flags in the code. These will be used to release features which were otherwise released under the static IBP config. We illustrate the write-up below using one such feature flag, called ibp-feature. For example, we will use the ibp-feature flag in the code at places where newer IBP values (from static configuration) would otherwise be used:
    1. The max version values for this flag will start from 1 and continue increasing for future IBP version advancements.
    2. The min version value for this flag will start from 1, and it is unlikely to be modified (since we rarely or almost never deprecate IBP versions).
  2. By this point, IBP-based decision making in the broker code will be such that:
    1. If the ibp-feature flag is finalized and the static IBP config value is >= migration_ibp_version, then the value of the ibp-feature flag is preferred for decision making over the IBP value from static configuration (see the sketch after this list).
    2. Otherwise if the ibp-feature flag is not finalized yet, we continue to use the latest IBP value based on static configuration for decision making.
  3. We then release the ibp-feature flag as part of a subsequent Kafka release. The release would eventually get deployed to Kafka clusters, and the ibp-feature flag is expected to be finalized in the cluster (via the provided tooling).
  4. Once #3 happens, all future Kafka code changes can continue to use the ibp-feature flag, thereby effectively stopping the use of IBP as a static configuration.
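
The decision rule in step 2 could be summarized by the following sketch (simplified stand-ins; real broker code would compare actual IBP versions and read the finalized ibp-feature level from its features cache):

Code Block
languagejava
// Simplified stand-in for the Phase #3 decision logic: prefer the finalized
// ibp-feature flag once it exists and the static IBP is already at or beyond
// migration_ibp_version; otherwise keep using the static IBP value.
public final class IbpDecision {

    static boolean useIbpFeatureFlag(boolean ibpFeatureFinalized, int staticIbp, int migrationIbp) {
        return ibpFeatureFinalized && staticIbp >= migrationIbp;
    }

    public static void main(String[] args) {
        int migrationIbp = 27;  // hypothetical numeric stand-in for migration_ibp_version
        System.out.println(useIbpFeatureFlag(true, 27, migrationIbp));   // true: ibp-feature drives decisions
        System.out.println(useIbpFeatureFlag(false, 27, migrationIbp));  // false: fall back to static IBP
    }
}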

Phase #4

This is the phase when we no longer advance the IBP values in the old IBP-based setup (in the form of static broker configuration) and have completely switched to the ibp-feature flag(s) in the Kafka code. We do not foresee switching back once the migration is over and IBP has been deprecated. However, it can happen that we would want to switch back to using the IBP-based setup while we are in Phase #3 above (for example, due to an unprecedented issue). This will be supported, because we deprecate IBP only in Phase #4 above. To "downgrade", a user would first need one rolling restart of the cluster to reduce the IBP to the required version, and then another rolling restart to change the binary. Note that each feature will have its own set of requirements to make this possible (in some cases, it may not be possible), but this is outside the scope of this document.

Future work

As part of future work, we could consider adding better support for downgrades & deprecation. We may consider providing a mechanism through which the cluster operator could learn whether a feature is being actively used by clients at a certain feature version. This information can be useful to decide whether the feature is safe to be deprecated. Note that this could involve collecting data from clients about which feature versions are actively being used, and the scheme for this needs to be thought through.

...