Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Deprecating a feature version is an incompatible change, which requires a major release of Kafka (see Non-goals section). This requirement translates to increasing the cluster-wide finalized minimum version level of one or more features in the ZK '/features' node. It is important to note that the minimum version level can not be mutated via the Controller API. This is because, the minimum version level is usually increased only to indicate the intent to stop support for a certain feature version. We would usually deprecate features during broker releases, after prior announcements. Therefore, this is not a dynamic operation, and such a mutation is not supported through the ApiKeys.UPDATE_FEATURES controller API.

Instead, it is best if the version deprecation is activated through the controller. This is the proposed approach to achieve itthe same:
Feature advertisements from each broker in the cluster are reactively picked up by the controller via ZK watches. Today these watches are already set up by the controller on every broker’s ephemeral ZK node (at '/brokers/ids/<id>'). The contents of the broker nodes are cached in memory in the Controller. Using these contents, the controller should be able to track the highest supported feature minimum version exported by any broker in the cluster. Then, whenever a Kafka release advances the supported feature minimum version (to deprecate feature version), the controller picks up this change and persists the highest supported minimum version to ZK as the finalized minimum version level for the feature. This change by the controller will deprecate the finalized feature versionsunto 1 below the highest supported minimum version.

It is important that an admin should make sure that no clients are using a deprecated feature version (e.g. using the client version metric) before deploying a release that deprecates it.


Client discovery of finalized feature versions

...

Code Block
languagejava
// ---- START: Proposed Admin API definitions ----
/**
 * Describes supported and finalized features. You may anticipate 
 * certain exceptions when calling get() on the future obtained from the returned
 * DescribeFeaturesResult.
 *
 * @param options   options for the describeFeatures API, 
 * 
 * @return          a result object containing:
 *                    1. List of cluster-wide finalized feature versions.
 *                    2. List of supported feature versions specific to the broker.
 */
DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);

/**
 * Update the feature versions supported cluster-wide. You may
 * anticipate certain exceptions when calling get() on the futures
 * obtained from the returned UpdateFeaturesResult.
 *
 * @param updates   map of feature updates, keyed by the
 *                  name of the feature
 * @param options   options for the updateFeatures API
 *
 * @return          the results of the FeatureUpdate provided in the request
 */
UpdateFeaturesResult updateFeatures(Map<String, FeatureUpdate> featureUpdates, UpdateFeaturesOptions options);

/**
 * Options for {@link AdminClient#describeFeatures(DescribeFeaturesOptions)}.
 *
 * The API of this class is evolving.
 */
@InterfaceStability.Evolving
public class DescribeFeaturesOptions extends AbstractOptions<DescribeFeaturesOptions> {

    //**
 Currently empty, but can *be Setspopulated a flag indicating that in the describefuture features request must be issued only to the controller.
     * - True means the {@link Admin#describeFeatures(DescribeFeaturesOptions)} request must be
     *   issued only to the controller.
     * - False means the {@link Admin#describeFeatures(DescribeFeaturesOptions)} request can be
     *   issued to any random broker.
     */
    public DescribeFeaturesOptions sendRequestToController(boolean sendRequestToController).

    /**
     * - Returns true if the Admin#describeFeatures(DescribeFeaturesOptions) request must be
     *   issued only to the controller.
     * - Returns false if the Admin#describeFeatures(DescribeFeaturesOptions) request can be
     *   issued to any random broker.
     */
    public boolean sendRequestToControlleras needed.
}

/**
 * Options for {@link AdminClient#describeFeatures(UpdateFeaturesOptions)}.
 *
 * The API of this class is evolving.
 */
@InterfaceStability.Evolving
public class UpdateFeaturesOptions extends AbstractOptions<UpdateFeaturesOptions> {
  // Currently empty, but can be populated in the future as needed.
}

// ---- END: Proposed Admin API definitions ----

// Represents a range of version levels supported by every broker in a cluster for some feature.
class FinalizedVersionRange {
    // The cluster-wide finalized value of the feature min version level (value >= 1).
    short minVersionLevel();

    // The cluster-wide finalized value of the feature max version level (value >=1 and value >= minVersionLevel).
    short maxVersionLevel();
}

// ---- END: Proposed Admin API definitions ----

// Represents a range of versionversions levelsthat supporteda by everyparticular broker insupports a cluster for some feature.
class FinalizedVersionRangeSupportedVersionRange {
    	// The cluster-wide finalized value of the feature min minimum version level (value >= 1).
 of the supported feature.
	short minVersionLevelminVersion();

    	// The cluster-wide finalized value of the feature max maximum version level (value >=1 and value >= minVersionLevelminVersion).
 of the supported feature.
	short maxVersionLevelmaxVersion();
}

//**
 * Represents an update to a range of versions that a particular broker supports for some feature.
class SupportedVersionRange {
	// The minimum version (value >= 1) of the supported feature.
	short minVersion();

	// The maximum version (value >=1 and value >= minVersion) of the supported feature.
	short maxVersion();
}

/**
 * Represents an update to a Feature, which can be sent to the controller
 * for processing.
 */
class FeatureUpdate {Feature, which can be sent to the controller
 * for processing.
 */
class FeatureUpdate {
    /**
     * The cluster-wide finalized NEW value of the feature max version level.
     * - When >= 1, it's the new value to-be-updated for the finalized feature.
	 * - When < 1, it indicates the deletion of a finalized feature.
     */
    short maxVersionLevel();
    
    /**
     * TheReturn cluster-widetrue finalizedonly NEWif valuedowngrade/deletion of thea feature maxshould versionbe levelallowed.
     * - When >= 1, it's the new value to-be-updated for the finalized feature.
	 * - When < 1, it indicates the deletion of a finalized feature.
     */
    short maxVersionLevel();
    
    /**
     * Return true only if downgrade/deletion of a feature should be allowed.
     * Note that Note that despite this allowDowngrade flag being set, certain downgrades
     * may be rejected by the controller if it is deemed unsafe to downgrade
     * the max version level of some specific feature.
     */
    bool allowDowngrade();
}

/**
 * Encapsulates details about finalized as well as supported features. This is particularly useful
 * to hold the result returned by the `Admin#describeFeatures(DescribeFeaturesOptions)` API.
 */
class FeatureMetadata {
    /**
     * Returns a map of finalized feature versions. Each entry in the map contains a key being a
     * feature name and the value being a range of version levels supported by every broker in the
     * cluster.
     */
	Map<String, FinalizedVersionRange> finalizedFeatures();

    /**
     * The monotonically increasing epoch for the finalized features.
     * If the returned value is empty, it means the finalized features are absent/unavailable.
     */
    Optional<Long> finalizedFeaturesEpoch();

	/**
     * Returns a map of supported feature versions. Each entry in the map contains a key being a
     * feature name and the value being a range of versions supported by a particular broker in the
     * cluster.
     */
    Map<String, SupportedVersionRange> supportedFeatures();
}

class DescribeFeaturesResult {
    /**
     * The data returned in the future contains the latest entire set of
     * finalized cluster-wide features, as well as the entire set of 
     * features supported by the broker serving this read request.
     */
    KafkaFuture<FeatureMetadata> featureMetadata();
}

class UpdateFeaturesResult {
    /**
     * Returns a future which succeeds only if all the FeatureUpdate in the request succeed.
     */
    KafkaFuture<Void> all();

    /**
     * Returns a map with key being feature name and value being
     * the future which can be used to check the status of the FeatureUpdate
     * in the request.
     *
     * Possible error codes:
     * - NONE: The FeatureUpdate succeeded.
     * - NOT_CONTROLLER: The FeatureUpdate failed since the request was processed by a broker that's not the controller.
     * - CLUSTER_AUTHORIZATION_FAILED: The FeatureUpdate failed since there wasn't sufficient permission to perform the update.
     * - INVALID_REQUEST: The FeatureUpdate failed because it is invalid.
     * - FEATURE_UPDATE_FAILED: The FeatureUpdate failed because it can not be applied (ex: due to version incompatibilities)
     */
    Map<String, KafkaFuture<Void>> values();
}

...

Code Block
=== DESCRIBE FEATURES ===

# Get cluster-wide finalized features, and features supported by a specific broker.
#  - Use `--bootstrap-server` to provide a broker host:port to which queries should be issued.
#
$> kafka-features.sh
  - Optionally, provide `--controller`describe flag\
 directing the tool to issue the query to the
#    controller (while discovering the controller via the bootstrap server).
#    This can be useful for debugging purposes.

$> kafka-features.sh describe \
     --bootstrap-server kafka-broker0.prn1:9071 \
     [--controller]

{
	"status": "OK",
	"supported_features": {
		"group_coordinator": {
            "min_version": 1,
            "max_version": 2
        },
        "transaction_coordinator": {
        	"min_version": 1,
        	"max_version": 5
        },
        "consumer_offsets_topic_schema": { 
            "min_version": 1,
        	"max_version": 1
        }
	},
	"finalized_features": {
        "epoch": 0,
        "group_coordinator": {
			"min_version_level": 1,
            "max_version_level": 1
        },
        "transaction_coordinator": {
			"min_version_level": 1,
        	"max_version_level": 4
        }
   },
   "host": "kafka-broker0.prn1",
   "port": 9071
}

=== UPGRADE TO ALL LATEST--bootstrap-server kafka-broker0.prn1:9071 \

Feature: consumer_offsets_topic_schema  SupportedMinVersion: 1  SupportedMaxVersion: 1  FinalizedMinVersionLevel: -     FinalizedMaxVersionLevel: -     Epoch: 1
Feature: group_coordinator  SupportedMinVersion: 1  SupportedMaxVersion: 2  FinalizedMinVersionLevel: 1     FinalizedMaxVersionLevel: 1     Epoch: 1
Feature: transaction_coordinator  SupportedMinVersion: 1  SupportedMaxVersion: 5  FinalizedMinVersionLevel: 1     FinalizedMaxVersionLevel: 4     Epoch: 1

=== UPGRADE TO ALL LATEST FEATURES ===

# Upgrade to the max version levels of all features, as internally known to the CLI tool.
#
# - This command removes the burden to individually finalize feature upgrades.
#   This becomes handy to a cluster operator intending to finalize a cluster with all the latest
#   available feature version levels. This usually happens after completing the deployment
#   of a newer Kafka Broker release onto an existing cluster.
# - Use `--bootstrap-server` to provide a broker host:port to which queries should be issued.
# - Optionally, use the `--dry-run` flag to list the feature updates without applying them.

$> kafka-features.sh \
     --upgrade-all \
     --bootstrap-server kafka-broker0.prn1:9071 \
     [--dry-run]

    [Add] Feature: consumer_offsets_topic_schema       ExistingFinalizedMaxVersion: -  NewFinalizedMaxVersion: 1       Result: OK
[Upgrade] Feature: group_coordinator      ExistingFinalizedMaxVersion: 1  NewFinalizedMaxVersion: 2       Result: OK
[Upgrade] Feature: transaction_coordinator      ExistingFinalizedMaxVersion: 4  NewFinalizedMaxVersion: 5       Result: OK

=== EMERGENCY DOWNGRADE ALL FEATURES ===

# UpgradeDowngrade to the max version levels of all features, as internally known to the CLI tool.
#
# - This command removes the burden to individually finalize feature upgrades.version
#   downgrades. This becomes handy to a cluster operator intending to finalize a cluster with all the latestdowngrade all
#  available feature version levels. This usually happens after completing the, just prior to rolling back a Kafka Broker deployment
# of a neweron Kafkaa Brokercluster, releaseto ontoa anprevious existingBroker cluster.
release.
# - Optionally, use the `--dry-run` flag to list the feature updates without applying them.

$> kafka-features.sh finalize\
     --downgrade-all \
     --bootstrap-server kafka-broker0.prn1:9071

{
	"status": "OK",
	"supported_features": {
		"group_coordinator": { \
     [--dry-run]

   [Delete] Feature: consumer_offsets_topic_schema       "min_version"ExistingFinalizedMaxVersion: 1,
  NewFinalizedMaxVersion: -       Result:  "max_version": 3
OK
[Downgrade] Feature: group_coordinator      ExistingFinalizedMaxVersion: 2 },
 NewFinalizedMaxVersion: 1      "transaction_coordinator" Result: {OK
[Downgrade] Feature: transaction_coordinator      	"min_version"ExistingFinalizedMaxVersion: 1,
5        	"max_version"NewFinalizedMaxVersion: 6
4        },
        "consumer_offsets_topic_schema": { 
            "min_version": 1,
        	"max_version": 3
        }
	},
	"finalized_features": {
		"epoch": 3,
		"group_coordinator": {
			"min_version_level": 1,
            "max_version_level": 3
        },
        "transaction_coordinator": {
			"min_version_level": 1,
            "max_version_level": 6
        },
        "consumer_offsets_topic_schema": { 
			"min_version_level": 1,
            "max_version_level": 3
        }
   },
   "host": "kafka-broker0.prn1",
   "port": 9071
}

=== EMERGENCY DOWNGRADE ALL FEATURES ===

# Downgrade to the max version levels of all features known to the CLI tool.
#
# This command removes the burden to individually finalize feature version
# downgrades. This becomes handy to a cluster operator intending to downgrade all
# feature version levels, just prior to rolling back a Kafka Broker deployment
# on a cluster, to a previous Broker release.

$> kafka-features.sh downgrade-all \
     --bootstrap-server kafka-broker0.prn1:9071

{
	"status": "OK",
	"supported_features": {
		"group_coordinator": {
            "min_version": 1,
            "max_version": 3
        },
        "transaction_coordinator": {
        	"min_version": 1,
        	"max_version": 6
        },
        "consumer_offsets_topic_schema": { 
            "min_version": 1,
        	"max_version": 3
        }
	},
	"finalized_features": {
		"epoch": 3,
		"group_coordinator": {
			"min_version_level": 1,
            "max_version_level": 3
        },
        "transaction_coordinator": {
			"min_version_level": 1,
            "max_version_level": 6
        },
        "consumer_offsets_topic_schema": { 
			"min_version_level": 1,
            "max_version_level": 3
        }
   },
   "host": "kafka-broker0.prn1",
   "port": 9071
}

Advanced CLI tool usage

...

Result: OK

Advanced CLI tool usage

Following are examples of advanced usage of the CLI tool. Going beyond regular usage, advanced usage involves adding/upgrading/downgrading/deleting specific cluster-wide finalized feature versions.

Code Block
=== ADD OR UPGRADE OR DOWNGRADE OR DELETE FEATURES ===

# Add or update a list of cluster-wide finalized features.
# - Use `--bootstrap-server` to provide a broker host:port to which the queries should be issued.
# - Optionally, use `--upgrade` to provide a comma-separated list of features and new finalized max version to add or upgrade.
# - Optionally, use `--downgrade` to provide a comma-separated list of features and new finalized max version to downgrade to. This should be used only when required.
# - Optionally, use `--delete` to provide a comma-separated list of finalized features to be deleted.
# - Optionally, use the `--dry-run` flag to list the feature updates without applying them.

$> kafka-features.sh update \
     --bootstrap-server kafka-broker0.prn1:9071 \
     --upgrade group_coordinator:2,consumer_offsets_topic_schema:1 \
     --downgrade transaction_coordinator:3 \
     --delete replication_throttling \
     [--dry-run]

      [Add] Feature: consumer_offsets_topic_schema       ExistingFinalizedMaxVersion: -  NewFinalizedMaxVersion: 1       Result: OK
  [Upgrade] Feature: group_coordinator      ExistingFinalizedMaxVersion: 1  NewFinalizedMaxVersion: 2       Result: OK
[Downgrade] Feature: transaction_coordinator      ExistingFinalizedMaxVersion: 4  NewFinalizedMaxVersion: 3       Result: OK
   [Delete] Feature: replication_throttling       ExistingFinalizedMaxVersion: 2  NewFinalizedMaxVersion: -       Result: OK
Code Block
=== ADD_OR_UPDATE FEATURES ===

# Add or update a list of cluster-wide finalized features.
#  - Use `--bootstrap-server` to provide a broker host:port to which MetadataRequest query should be issued.
#    The MetadataResponse will be used to discover the Controller, to which the actual ADD_OR_UPDATE request is issued.
#  - Use `--upgrade` to provide a comma-separated list of features and new finalized max version to ADD_OR_UPDATE.
#  - Use `--allow-downgrade` to allow a downgrade for feature version levels. This should be used only when required.

$> kafka-features.sh update \
     --bootstrap-server kafka-broker0.prn1:9071 \
     --upgrade group_coordinator:2,consumer_offsets_topic_schema:1 \
     --allow-downgrade transaction_coordinator:3 \

Please confirm before downgrading the following features:
1.transaction_coordinator from v4 (existing) to v3 (new)

[Y/n]? Y

{
	"status": "OK",
	"supported_features": {
		"group_coordinator": {
            "min_version": 1,
            "max_version": 2
        },
        "transaction_coordinator": {
        	"min_version": 1,
        	"max_version": 5
        },
        "consumer_offsets_topic_schema": { 
            "min_version": 1,
        	"max_version": 1
        }
	},
	"finalized_features": {
        "epoch": 1,
        "group_coordinator": {
			"min_version_level": 1,
            "max_version_level": 2
        },
        "transaction_coordinator": {
			"min_version_level": 1,
            "max_version_level": 3
        },
        "consumer_offsets_topic_schema": { 
			"min_version_level": 1,
            "max_version_level": 1
        }
   },
   "host": "kafka-broker0.prn1",
   "port": 9071
}

=== DELETE FEATURES ===

# Delete a list of cluster-wide finalized features.
#  - Use `--bootstrap-server` to provide a broker host:port to which MetadataRequest query should be issued.
#    The MetadataResponse will be used to discover the Controller, to which the actual delete request is issued.
#  - Use `--features` to provide a comma-separated list of finalized features to be deleted.

$> kafka-features.sh delete \
     --bootstrap-server kafka-broker0.prn1:9071 \
     --features group_coordinator,transaction_coordinator

Please confirm deletion of the following finalized features:
1. group_coordinator
2. transaction_coordinator

[Y/n] Y

{
	"status": "OK",
	"supported_features": {
		"group_coordinator": {
            "min_version": 1,
            "max_version": 2
        },
        "transaction_coordinator": {
        	"min_version": 1,
        	"max_version": 5
        },
        "consumer_offsets_topic_schema": { 
            "min_version": 1,
        	"max_version": 1
        }
	},
	"finalized_features": {
		"epoch": 2,
        "consumer_offsets_topic_schema": {
			"min_version_level": 1,
            "max_version_level": 1
        }
   },
   "host": "kafka-broker0.prn1",
   "port": 9071
}

New or changed public interfaces

...

  1. We need a way to map the usage of IBP in the code (in the form of a static configuration) to the usage of IBP in the new feature versioning system. To achieve this, we introduce one or more feature flags in the code. These will be used to release features which were otherwise released under the static IBP config. We illustrate the write-up below using one such a feature flag called as ibp-feature. For example, we will use the ibp-feature flag in the code at places wherever newer IBP values (from static configuration) are otherwise needed to be used:
    1. The max version values for this flag will start from 1 and continue increasing for future IBP version advancements.
    2. The min version value for this flag will start from 1, and it is unlikely to be modified (since we rarely or almost never deprecate IBP versions).
  2. By this point, IBP-based decision making in the broker code will be such that:
    1. If the ibp-feature flag is finalized and if static IBP config value is >= migration_ibp_version , then the value of the ibp-feature flag is preferred for decision making over the IBP value from static configuration.
    2. Otherwise if the ibp-feature flag is not finalized yet, we continue to use the latest IBP value based on static configuration for decision making.
  3. We then release the ibp-feature flag as part of a subsequent Kafka release. The release would eventually get deployed to Kafka clusters, and, the ibp-feature flag is expected to be finalized in the cluster (via provided tooling).
  4. Once #3 happens, all future Kafka code changes can continue to use the ibp-feature flag, thereby effectively stopping the use of IBP as a static configuration.

...