Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
// ---- START: Proposed Admin API definitions ----
/**
 * Describes supported and finalized features. You may anticipate 
 * certain exceptions when calling get() on the future obtained from the returned
 * DescribeFeaturesResult.
 *
 * @param options   options for the describeFeatures API, 
 * 
 * @return          a result object containing:
 *                    1. List of cluster-wide finalized feature versions.
 *                    2. List of supported feature versions specific to the broker.
 */
DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);

/**
 * Update the feature versions supported cluster-wide. You may
 * anticipate certain exceptions when calling get() on the futures
 * obtained from the returned UpdateFeaturesResult.
 *
 * @param updates   map of feature updates, keyed by the
 *                  name of the feature
 * @param options   options for the updateFeatures API
 *
 * @return          the results of the FeatureUpdate provided in the request
 */
UpdateFeaturesResult updateFeatures(Map<String, FeatureUpdate> featureUpdates, UpdateFeaturesOptions options);

/**
 * Options for {@link AdminClient#describeFeatures(DescribeFeaturesOptions)}.
 *
 * The API of this class is evolving.
 */
@InterfaceStability.Evolving
public class DescribeFeaturesOptions extends AbstractOptions<DescribeFeaturesOptions> {

    //**
 Currently empty, but can *be Setspopulated a flag indicating that in the describefuture features request must be issued only to the controller.
     * - True means the {@link Admin#describeFeatures(DescribeFeaturesOptions)} request must be
     *   issued only to the controller.
     * - False means the {@link Admin#describeFeatures(DescribeFeaturesOptions)} request can be
     *   issued to any random broker.
     */
    public DescribeFeaturesOptions sendRequestToController(boolean sendRequestToController).

    /**
     * - Returns true if the Admin#describeFeatures(DescribeFeaturesOptions) request must be
     *   issued only to the controller.
     * - Returns false if the Admin#describeFeatures(DescribeFeaturesOptions) request can be
     *   issued to any random broker.
     */
    public boolean sendRequestToController();
}

// ---- END: Proposed Admin API definitions ----as needed.
}

/**
 * Options for {@link AdminClient#describeFeatures(UpdateFeaturesOptions)}.
 *
 * The API of this class is evolving.
 */
@InterfaceStability.Evolving
public class UpdateFeaturesOptions extends AbstractOptions<UpdateFeaturesOptions> {
  // Currently empty, but can be populated in the future as needed.
}

// ---- END: Proposed Admin API definitions ----

// Represents a range of version levels supported by every broker in a cluster for some feature.
class FinalizedVersionRange {
    // The cluster-wide finalized value of the feature min version level (value >= 1).
    short minVersionLevel();

    // The cluster-wide finalized value of the feature max version level (value >=1 and value >= minVersionLevel).
    short maxVersionLevel();
}

// Represents a range of versionversions levelsthat supporteda byparticular every broker in a clustersupports for some feature.
class FinalizedVersionRangeSupportedVersionRange {
    	// The cluster-wide finalized value of the feature min minimum version level (value >= 1).
 of the supported feature.
	short minVersionLevelminVersion();

    	// The cluster-wide finalized value of the feature max maximum version level (value >=1 and value >= minVersionLevelminVersion).
 of the  short maxVersionLevel();
}

// Represents a range of versions that a particular broker supports for some feature.
class SupportedVersionRange {
	// The minimum version (value >= 1) of the supported feature.
	short minVersion();

	// The maximum version (value >=1 and value >= minVersion) of the supported feature.
	short supported feature.
	short maxVersion();
}

/**
 * Represents an update to a Feature, which can be sent to the controller
 * for processing.
 */
class FeatureUpdate {
    /**
     * The cluster-wide finalized NEW value of the feature max version level.
     * - When >= 1, it's the new value to-be-updated for the finalized feature.
	 * - When < 1, it indicates the deletion of a finalized feature.
     */
    short maxVersionLevel();
    
    /**
     * Return true only if downgrade/deletion of a feature should be allowed.
     * Note that despite this allowDowngrade flag being set, certain downgrades
     * may be rejected by the controller if it is deemed unsafe to downgrade
     * the max version level of some specific feature.
     */
    bool allowDowngrade();
}

/**
 * Encapsulates details about finalized as well as supported features. This is particularly useful
 * to hold the result returned by the `Admin#describeFeatures(DescribeFeaturesOptions)` API.
 */
class FeatureMetadata {
    /**
     * Returns a map of finalized feature versions. Each entry in the map contains a key being a
     * feature name and the value being a range of version levels supported by every broker in the
     * cluster.
     */
	Map<String, FinalizedVersionRange> finalizedFeatures();

    /**
     * The monotonically increasing epoch for the finalized features.
     * If the returned value is empty, it means the finalized features are absent/unavailable.
     */
    Optional<Long> finalizedFeaturesEpoch();

	/**
     * Returns a map of supported feature versions. Each entry in the map contains a key being a
     * feature name and the value being a range of versions supported by a particular broker in the
     * cluster.
     */
    Map<String, SupportedVersionRange> supportedFeatures();
}

class DescribeFeaturesResult {
    /**
     * The data returned in the future contains the latest entire set of
     * finalized cluster-wide features, as well as the entire set of 
     * features supported by the broker serving this read request.
     */
    KafkaFuture<FeatureMetadata> featureMetadata();
}

class UpdateFeaturesResult {
    /**
     * Returns a future which succeeds only if all the FeatureUpdate in the request succeed.
     */
    KafkaFuture<Void> all();

    /**
     * Returns a map with key being feature name and value being
     * the future which can be used to check the status of the FeatureUpdate
     * in the request.
     *
     * Possible error codes:
     * - NONE: The FeatureUpdate succeeded.
     * - NOT_CONTROLLER: The FeatureUpdate failed since the request was processed by a broker that's not the controller.
     * - CLUSTER_AUTHORIZATION_FAILED: The FeatureUpdate failed since there wasn't sufficient permission to perform the update.
     * - INVALID_REQUEST: The FeatureUpdate failed because it is invalid.
     * - FEATURE_UPDATE_FAILED: The FeatureUpdate failed because it can not be applied (ex: due to version incompatibilities)
     */
    Map<String, KafkaFuture<Void>> values();
}

...

Code Block
=== DESCRIBE FEATURES ===

# Get cluster-wide finalized features, and features supported by a specific broker.
#  - Use `--bootstrap-server` to provide a broker host:port to which queries should be issued.

$> kafka-features.sh
#  - Optionally, provide `--controller`describe flag\
 directing the tool to issue the query to the
#    controller (while discovering the controller via the bootstrap server).
#    This can be useful for debugging purposes.

$> kafka-features.sh describe \
     --bootstrap-server kafka-broker0.prn1:9071 \
     [--controller]

{
	"status": "OK",
	"supported_features": {
		"group_coordinator": {
            "min_version": 1,
            "max_version": 2
        },
        "transaction_coordinator": {
        	"min_version": 1,
        	"max_version": 5
        },
        "consumer_offsets_topic_schema": { 
            "min_version": 1,
        	"max_version": 1
        }
	},
	"finalized_features": {
        "epoch": 0,
        "group_coordinator": {
			"min_version_level": 1,
            "max_version_level": 1
        },
        "transaction_coordinator": {
			"min_version_level": 1,
        	"max_version_level": 4
        }
   },
   "host": "kafka-broker0.prn1",
   "port": 9071
}

=== UPGRADE TO ALL LATEST --bootstrap-server kafka-broker0.prn1:9071 \

Feature: consumer_offsets_topic_schema  SupportedMinVersion: 1  SupportedMaxVersion: 1  FinalizedMinVersionLevel: -     FinalizedMaxVersionLevel: -     Epoch: 1
Feature: group_coordinator  SupportedMinVersion: 1  SupportedMaxVersion: 2  FinalizedMinVersionLevel: 1     FinalizedMaxVersionLevel: 1     Epoch: 1
Feature: transaction_coordinator  SupportedMinVersion: 1  SupportedMaxVersion: 5  FinalizedMinVersionLevel: 1     FinalizedMaxVersionLevel: 4     Epoch: 1

=== UPGRADE TO ALL LATEST FEATURES ===

# Upgrade to the max version levels of all features, as internally known to the CLI tool.
#
# - This command removes the burden to individually finalize feature upgrades.
#   This becomes handy to a cluster operator intending to finalize a cluster with all the latest
#   available feature version levels. This usually happens after completing the deployment
#   of a newer Kafka Broker release onto an existing cluster.
# - Use `--bootstrap-server` to provide a broker host:port to which queries should be issued.
# - Optionally, use the `--dry-run` flag to list the feature updates without applying them.

$> kafka-features.sh \
     --upgrade-all \
     --bootstrap-server kafka-broker0.prn1:9071 \
     [--dry-run]

    [Add] Feature: consumer_offsets_topic_schema       ExistingFinalizedMaxVersion: -  NewFinalizedMaxVersion: 1       Result: OK
[Upgrade] Feature: group_coordinator      ExistingFinalizedMaxVersion: 1  NewFinalizedMaxVersion: 2       Result: OK
[Upgrade] Feature: transaction_coordinator      ExistingFinalizedMaxVersion: 4  NewFinalizedMaxVersion: 5       Result: OK

=== EMERGENCY DOWNGRADE ALL FEATURES ===

# UpgradeDowngrade to the max version levels of all features, as internally known to the CLI tool.
#
# - This command removes the burden to individually finalize feature upgrades.version
#   downgrades. This becomes handy to a cluster operator intending to finalize a cluster with all the latestdowngrade all
#  available feature version levels. This usually happens after completing the, just prior to rolling back a Kafka Broker deployment
# of a neweron Kafkaa Brokercluster, releaseto ontoa anprevious existingBroker clusterrelease.

$> kafka-features.sh finalize-all \
     --bootstrap-server kafka-broker0.prn1:9071

{
	"status": "OK",
	"supported_features": {
		"group_coordinator": {
            "min_version": 1,# - Optionally, use the `--dry-run` flag to list the feature updates without applying them.

$> kafka-features.sh \
     --downgrade-all \
     --bootstrap-server kafka-broker0.prn1:9071 \
     "max_version": 3[--dry-run]

   [Delete] Feature: consumer_offsets_topic_schema   },
    ExistingFinalizedMaxVersion: 1   "transaction_coordinator"NewFinalizedMaxVersion: {
-        	"min_version": 1,
Result: OK
[Downgrade] Feature: group_coordinator      ExistingFinalizedMaxVersion:  	"max_version"2  NewFinalizedMaxVersion: 6
1       Result: },OK
[Downgrade] Feature: transaction_coordinator      "consumer_offsets_topic_schema"ExistingFinalizedMaxVersion: {5 
 NewFinalizedMaxVersion: 4          "min_version": 1,
        	"max_version": 3
        }
	},
	"finalized_features": {
		"epoch": 3,
		"group_coordinator": {
			"min_version_level": 1,
            "max_version_level": 3
        },
        "transaction_coordinator": {
			"min_version_level": 1,
            "max_version_level": 6
        },
        "consumer_offsets_topic_schema": { 
			"min_version_level": 1,
            "max_version_level": 3
        }
   },
   "host": "kafka-broker0.prn1",
   "port": 9071
}

=== EMERGENCY DOWNGRADE ALL FEATURES ===

# Downgrade to the max version levels of all features known to the CLI tool.
#
# This command removes the burden to individually finalize feature version
# downgrades. This becomes handy to a cluster operator intending to downgrade all
# feature version levels, just prior to rolling back a Kafka Broker deployment
# on a cluster, to a previous Broker release.

$> kafka-features.sh downgrade-all \
     --bootstrap-server kafka-broker0.prn1:9071

{
	"status": "OK",
	"supported_features": {
		"group_coordinator": {
            "min_version": 1,
            "max_version": 3
        },
        "transaction_coordinator": {
        	"min_version": 1,
        	"max_version": 6
        },
        "consumer_offsets_topic_schema": { 
            "min_version": 1,
        	"max_version": 3
        }
	},
	"finalized_features": {
		"epoch": 3,
		"group_coordinator": {
			"min_version_level": 1,
            "max_version_level": 3
        },
        "transaction_coordinator": {
			"min_version_level": 1,
            "max_version_level": 6
        },
        "consumer_offsets_topic_schema": { 
			"min_version_level": 1,
            "max_version_level": 3
        }
   },
   "host": "kafka-broker0.prn1",
   "port": 9071
}

Advanced CLI tool usage

...

Result: OK

Advanced CLI tool usage

Following are examples of advanced usage of the CLI tool. Going beyond regular usage, advanced usage involves adding/upgrading/downgrading/deleting specific cluster-wide finalized feature versions.

Code Block
=== ADD OR UPGRADE OR DOWNGRADE OR DELETE FEATURES ===

# Add or update a list of cluster-wide finalized features.
# - Use `--bootstrap-server` to provide a broker host:port to which the queries should be issued.
# - Optionally, use `--upgrade` to provide a comma-separated list of features and new finalized max version to add or upgrade.
# - Optionally, use `--downgrade` to provide a comma-separated list of features and new finalized max version to downgrade to. This should be used only when required.
# - Optionally, use `--delete` to provide a comma-separated list of finalized features to be deleted.
# - Optionally, use the `--dry-run` flag to list the feature updates without applying them.

$> kafka-features.sh update \
     --bootstrap-server kafka-broker0.prn1:9071 \
     --upgrade group_coordinator:2,consumer_offsets_topic_schema:1 \
     --downgrade transaction_coordinator:3 \
     --delete replication_throttling \
     [--dry-run]

      [Add] Feature: consumer_offsets_topic_schema       ExistingFinalizedMaxVersion: -  NewFinalizedMaxVersion: 1       Result: OK
  [Upgrade] Feature: group_coordinator      ExistingFinalizedMaxVersion: 1  NewFinalizedMaxVersion: 2       Result: OK
[Downgrade] Feature: transaction_coordinator      ExistingFinalizedMaxVersion: 4  NewFinalizedMaxVersion: 3       Result: OK
   [Delete] Feature: replication_throttling       ExistingFinalizedMaxVersion: 2  NewFinalizedMaxVersion: -       Result: OK
Code Block
=== ADD_OR_UPDATE FEATURES ===

# Add or update a list of cluster-wide finalized features.
#  - Use `--bootstrap-server` to provide a broker host:port to which MetadataRequest query should be issued.
#    The MetadataResponse will be used to discover the Controller, to which the actual ADD_OR_UPDATE request is issued.
#  - Use `--upgrade` to provide a comma-separated list of features and new finalized max version to ADD_OR_UPDATE.
#  - Use `--allow-downgrade` to allow a downgrade for feature version levels. This should be used only when required.

$> kafka-features.sh update \
     --bootstrap-server kafka-broker0.prn1:9071 \
     --upgrade group_coordinator:2,consumer_offsets_topic_schema:1 \
     --allow-downgrade transaction_coordinator:3 \

Please confirm before downgrading the following features:
1.transaction_coordinator from v4 (existing) to v3 (new)

[Y/n]? Y

{
	"status": "OK",
	"supported_features": {
		"group_coordinator": {
            "min_version": 1,
            "max_version": 2
        },
        "transaction_coordinator": {
        	"min_version": 1,
        	"max_version": 5
        },
        "consumer_offsets_topic_schema": { 
            "min_version": 1,
        	"max_version": 1
        }
	},
	"finalized_features": {
        "epoch": 1,
        "group_coordinator": {
			"min_version_level": 1,
            "max_version_level": 2
        },
        "transaction_coordinator": {
			"min_version_level": 1,
            "max_version_level": 3
        },
        "consumer_offsets_topic_schema": { 
			"min_version_level": 1,
            "max_version_level": 1
        }
   },
   "host": "kafka-broker0.prn1",
   "port": 9071
}

=== DELETE FEATURES ===

# Delete a list of cluster-wide finalized features.
#  - Use `--bootstrap-server` to provide a broker host:port to which MetadataRequest query should be issued.
#    The MetadataResponse will be used to discover the Controller, to which the actual delete request is issued.
#  - Use `--features` to provide a comma-separated list of finalized features to be deleted.

$> kafka-features.sh delete \
     --bootstrap-server kafka-broker0.prn1:9071 \
     --features group_coordinator,transaction_coordinator

Please confirm deletion of the following finalized features:
1. group_coordinator
2. transaction_coordinator

[Y/n] Y

{
	"status": "OK",
	"supported_features": {
		"group_coordinator": {
            "min_version": 1,
            "max_version": 2
        },
        "transaction_coordinator": {
        	"min_version": 1,
        	"max_version": 5
        },
        "consumer_offsets_topic_schema": { 
            "min_version": 1,
        	"max_version": 1
        }
	},
	"finalized_features": {
		"epoch": 2,
        "consumer_offsets_topic_schema": {
			"min_version_level": 1,
            "max_version_level": 1
        }
   },
   "host": "kafka-broker0.prn1",
   "port": 9071
}

New or changed public interfaces

...

  1. We need a way to map the usage of IBP in the code (in the form of a static configuration) to the usage of IBP in the new feature versioning system. To achieve this, we introduce one or more feature flags in the code. These will be used to release features which were otherwise released under the static IBP config. We illustrate the write-up below using one such a feature flag called as ibp-feature. For example, we will use the ibp-feature flag in the code at places wherever newer IBP values (from static configuration) are otherwise needed to be used:
    1. The max version values for this flag will start from 1 and continue increasing for future IBP version advancements.
    2. The min version value for this flag will start from 1, and it is unlikely to be modified (since we rarely or almost never deprecate IBP versions).
  2. By this point, IBP-based decision making in the broker code will be such that:
    1. If the ibp-feature flag is finalized and if static IBP config value is >= migration_ibp_version , then the value of the ibp-feature flag is preferred for decision making over the IBP value from static configuration.
    2. Otherwise if the ibp-feature flag is not finalized yet, we continue to use the latest IBP value based on static configuration for decision making.
  3. We then release the ibp-feature flag as part of a subsequent Kafka release. The release would eventually get deployed to Kafka clusters, and, the ibp-feature flag is expected to be finalized in the cluster (via provided tooling).
  4. Once #3 happens, all future Kafka code changes can continue to use the ibp-feature flag, thereby effectively stopping the use of IBP as a static configuration.

...