
...

Deprecating a feature version is an incompatible change, which requires a major release of Kafka (see the Non-goals section). This translates to increasing the cluster-wide finalized minimum version level of one or more features in the ZK '/features' node. It is important to note that the minimum version level cannot be mutated via the Controller API. This is because the minimum version level is usually increased only to indicate the intent to stop support for a certain feature version. We would usually deprecate features during broker releases, after prior announcements. Therefore, this is not a dynamic operation, and such a mutation is not supported through the ApiKeys.UPDATE_FEATURES controller API.

Instead, it is best if the version deprecation is activated through the controller. The following is the proposed approach to achieve this:
Feature advertisements from each broker in the cluster are reactively picked up by the controller via ZK watches. Today, these watches are already set up by the controller on every broker’s ephemeral ZK node (at '/brokers/ids/<id>'). The contents of the broker nodes are cached in memory in the controller. Using these contents, the controller can track the highest supported feature minimum version exported by any broker in the cluster. Then, whenever a Kafka release advances the supported feature minimum version (to deprecate feature versions), the controller picks up this change and persists the highest supported minimum version to ZK as the finalized minimum version level for the feature. This change by the controller deprecates all finalized feature versions up to 1 below the highest supported minimum version.
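
To make the mechanism concrete, the following is a rough, hypothetical sketch of the controller-side bookkeeping described above; the type and helper names (BrokerFeatures, liveBrokerFeatures, finalizedMinVersionLevel, persistFinalizedMinVersionLevel) are illustrative assumptions, not part of the proposal.

Code Block
// Illustrative sketch only: compute the highest supported minimum version of a feature across
// all live brokers (from the cached contents of '/brokers/ids/<id>'), and persist it to the
// ZK '/features' node as the feature's finalized minimum version level.
short highestSupportedMinVersion = 1;
for (BrokerFeatures broker : liveBrokerFeatures()) {  // cache kept up to date via ZK watches
    highestSupportedMinVersion =
        (short) Math.max(highestSupportedMinVersion, broker.supportedMinVersion("group_coordinator"));
}
if (highestSupportedMinVersion > finalizedMinVersionLevel("group_coordinator")) {
    // Finalizing the new minimum version level deprecates every feature version below it.
    persistFinalizedMinVersionLevel("group_coordinator", highestSupportedMinVersion);
}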

It is important that an admin makes sure that no clients are using a deprecated feature version (e.g. by checking the client version metric) before deploying a release that deprecates it.


Client discovery of finalized feature versions

...

Code Block
// ---- START: Proposed Admin API definitions ----
/**
 * Describes supported and finalized features. You may anticipate 
 * certain exceptions when calling get() on the future obtained from the returned
 * DescribeFeaturesResult.
 *
 * @param options   options for the describeFeatures API, 
 * 
 * @return          a result object containing:
 *                    1. List of cluster-wide finalized feature versions.
 *                    2. List of supported feature versions specific to the broker.
 */
DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);

/**
 * Update the feature versions supported cluster-wide. You may
 * anticipate certain exceptions when calling get() on the futures
 * obtained from the returned UpdateFeaturesResult.
 *
 * @param updates   map of feature updates, keyed by the
 *                  name of the feature
 * @param options   options for the updateFeatures API
 *
 * @return          the results of the FeatureUpdate provided in the request
 */
UpdateFeaturesResult updateFeatures(Map<String, FeatureUpdate> featureUpdates, UpdateFeaturesOptions options);

/**
 * Options for {@link AdminClient#describeFeatures(DescribeFeaturesOptions)}.
 *
 * The API of this class is evolving.
 */
@InterfaceStability.Evolving
public class DescribeFeaturesOptions extends AbstractOptions<DescribeFeaturesOptions> {

    /**
     * Sets a flag indicating that the describe features request must be issued only to the controller.
     * - True means the {@link Admin#describeFeatures(DescribeFeaturesOptions)} request must be
     *   issued only to the controller.
     * - False means the {@link Admin#describeFeatures(DescribeFeaturesOptions)} request can be
     *   issued to any random broker.
     */
    public DescribeFeaturesOptions sendRequestToController(boolean sendRequestToController);

    /**
     * - Returns true if the Admin#describeFeatures(DescribeFeaturesOptions) request must be
     *   issued only to the controller.
     * - Returns false if the Admin#describeFeatures(DescribeFeaturesOptions) request can be
     *   issued to any random broker.
     */
    public boolean sendRequestToController();
}

/**
 * Options for {@link AdminClient#updateFeatures(Map<String, FeatureUpdate>, UpdateFeaturesOptions)}.
 *
 * The API of this class is evolving.
 */
@InterfaceStability.Evolving
public class UpdateFeaturesOptions extends AbstractOptions<UpdateFeaturesOptions> {
  // Currently empty, but can be populated in the future as needed.
}

// ---- END: Proposed Admin API definitions ----

// Represents a range of version levels supported by every broker in a cluster for some feature.
class FinalizedVersionRange {
    // The cluster-wide finalized value of the feature min version level (value >= 1).
    short minVersionLevel();

    // The cluster-wide finalized value of the feature max version level (value >=1 and value >= minVersionLevel).
    short maxVersionLevel();
}


// Represents a range of versions that a particular broker supports for some feature.
class SupportedVersionRange {
    // The minimum version (value >= 1) of the supported feature.
    short minVersion();

    // The maximum version (value >=1 and value >= minVersion) of the supported feature.
    short maxVersion();
}
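
// ---- NOTE: Illustrative helper only, NOT part of the proposed API definitions. ----
// Based on the definitions above, a finalized feature can only be safely used if the finalized
// range lies within a broker's supported range. A plausible compatibility check (an assumption
// of this sketch, not something specified by this KIP) could look like:
static boolean isCompatible(SupportedVersionRange supported, FinalizedVersionRange finalized) {
    return finalized.minVersionLevel() >= supported.minVersion()
        && finalized.maxVersionLevel() <= supported.maxVersion();
}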

/**
 * Represents an update to a Feature, which can be sent to the controller
 * for processing.
 */
class FeatureUpdate {
    /**
     * The cluster-wide finalized NEW value of the feature max version level.
     * - When >= 1, it's the new value to-be-updated for the finalized feature.
	 * - When < 1, it indicates the deletion of a finalized feature.
     */
    short maxVersionLevel();
    
    /**
     * Return true only if downgrade/deletion of a feature should be allowed.
     * Note that despite this allowDowngrade flag being set, certain downgrades
     * may be rejected by the controller if it is deemed unsafe to downgrade
     * the max version level of some specific feature.
     */
    boolean allowDowngrade();
}

/**
 * Encapsulates details about finalized as well as supported features. This is particularly useful
 * to hold the result returned by the `Admin#describeFeatures(DescribeFeaturesOptions)` API.
 */
class FeatureMetadata {
    /**
     * Returns a map of finalized feature versions. Each entry in the map contains a key being a
     * feature name and the value being a range of version levels supported by every broker in the
     * cluster.
     */
	Map<String, FinalizedVersionRange> finalizedFeatures();

    /**
     * The monotonically increasing epoch for the finalized features.
     * If the returned value is empty, it means the finalized features are absent/unavailable.
     */
    Optional<Long> finalizedFeaturesEpoch();

	/**
     * Returns a map of supported feature versions. Each entry in the map contains a key being a
     * feature name and the value being a range of versions supported by a particular broker in the
     * cluster.
     */
    Map<String, SupportedVersionRange> supportedFeatures();
}

class DescribeFeaturesResult {
    /**
     * The data returned in the future contains the latest entire set of
     * finalized cluster-wide features, as well as the entire set of 
     * features supported by the broker serving this read request.
     */
    KafkaFuture<FeatureMetadata> featureMetadata();
}

class UpdateFeaturesResult {
    /**
     * Returns a future which succeeds only if all the FeatureUpdate in the request succeed.
     */
    KafkaFuture<Void> all();

    /**
     * Returns a map with key being feature name and value being
     * the future which can be used to check the status of the FeatureUpdate
     * in the request.
     *
     * Possible error codes:
     * - NONE: The FeatureUpdate succeeded.
     * - NOT_CONTROLLER: The FeatureUpdate failed since the request was processed by a broker that's not the controller.
     * - CLUSTER_AUTHORIZATION_FAILED: The FeatureUpdate failed since there wasn't sufficient permission to perform the update.
     * - INVALID_REQUEST: The FeatureUpdate failed because it is invalid.
     * - FEATURE_UPDATE_FAILED: The FeatureUpdate failed because it cannot be applied (e.g., due to version incompatibilities).
     */
    Map<String, KafkaFuture<Void>> values();
}
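
// ---- NOTE: Usage sketch only, NOT part of the proposed API definitions. ----
// Assuming an Admin client instance named `admin` that implements the proposed methods above,
// a client could inspect the cluster-wide finalized features roughly as follows:
FeatureMetadata metadata =
    admin.describeFeatures(new DescribeFeaturesOptions()).featureMetadata().get();
FinalizedVersionRange groupCoordinator = metadata.finalizedFeatures().get("group_coordinator");
if (groupCoordinator != null && groupCoordinator.maxVersionLevel() >= 2) {
    // The cluster has finalized group_coordinator at version 2 or above.
}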

...

Code Block
=== DESCRIBE FEATURES ===

# Get cluster-wide finalized features, and features supported by a specific broker.
#  - Use `--bootstrap-server` to provide a broker host:port to which queries should be issued.
#
#  - Optionally, provide `--controller` flag directing the tool to issue the query to the
#    controller (while discovering the controller via the bootstrap server).
#    This can be useful for debugging purposes.

$> kafka-features.sh describe \
     --bootstrap-server kafka-broker0.prn1:9071 \
     [--controller]

Feature: consumer_offsets_topic_schema  SupportedMinVersion: 1  SupportedMaxVersion: 1  FinalizedMinVersionLevel: -     FinalizedMaxVersionLevel: -     Epoch: 1
Feature: group_coordinator  SupportedMinVersion: 1  SupportedMaxVersion: 2  FinalizedMinVersionLevel: 1     FinalizedMaxVersionLevel: 1     Epoch: 1
Feature: transaction_coordinator  SupportedMinVersion: 1  SupportedMaxVersion: 5  FinalizedMinVersionLevel: 1     FinalizedMaxVersionLevel: 4     Epoch: 1

=== UPGRADE TO ALL LATEST FEATURES ===

# Upgrade to the max version levels of all features, as internally known to the CLI tool.
#
# - This command removes the burden to individually finalize feature upgrades.
#   This becomes handy to a cluster operator intending to finalize a cluster with all the latest
#   available feature version levels. This usually happens after completing the deployment
#   of a newer Kafka Broker release onto an existing cluster.
# - Use `--bootstrap-server` to provide a broker host:port to which queries should be issued.
# - Optionally, use the `--dry-run` flag to list the feature updates without applying them.

$> kafka-features.sh \
     --upgrade-all \
     --bootstrap-server kafka-broker0.prn1:9071 \
     [--dry-run]

    [Add] Feature: consumer_offsets_topic_schema       ExistingFinalizedMaxVersion: -  NewFinalizedMaxVersion: 1       Result: OK
[Upgrade] Feature: group_coordinator      ExistingFinalizedMaxVersion: 1  NewFinalizedMaxVersion: 2       Result: OK
[Upgrade] Feature: transaction_coordinator      ExistingFinalizedMaxVersion: 4  NewFinalizedMaxVersion: 5       Result: OK

=== EMERGENCY DOWNGRADE ALL FEATURES ===

# Downgrade to the max version levels of all features, as internally known to the CLI tool.
#
# - This command removes the burden to individually finalize feature version
#   downgrades. This becomes handy to a cluster operator intending to downgrade all
#   feature version levels, just prior to rolling back a Kafka Broker deployment
#   on a cluster, to a previous Broker release.
# - Optionally, use the `--dry-run` flag to list the feature updates without applying them.

$> kafka-features.sh \
     --downgrade-all \
     --bootstrap-server kafka-broker0.prn1:9071 \
     [--dry-run]

   [Delete] Feature: consumer_offsets_topic_schema       ExistingFinalizedMaxVersion: 1  NewFinalizedMaxVersion: -       Result: OK
[Downgrade] Feature: group_coordinator      ExistingFinalizedMaxVersion: 2  NewFinalizedMaxVersion: 1       Result: OK
[Downgrade] Feature: transaction_coordinator      ExistingFinalizedMaxVersion: 5  NewFinalizedMaxVersion: 4       Result: OK


Advanced CLI tool usage

The following are examples of advanced usage of the CLI tool. Going beyond regular usage, advanced usage involves adding/upgrading/downgrading/deleting specific cluster-wide finalized feature versions.

Code Block
=== ADD OR UPGRADE OR DOWNGRADE OR DELETE FEATURES ===

# Add or update a list of cluster-wide finalized features.
# - Use `--bootstrap-server` to provide a broker host:port to which the queries should be issued.
# - Optionally, use `--upgrade` to provide a comma-separated list of features and new finalized max version to add or upgrade.
# - Optionally, use `--downgrade` to provide a comma-separated list of features and new finalized max version to downgrade to. This should be used only when required.
# - Optionally, use `--delete` to provide a comma-separated list of finalized features to be deleted.
# - Optionally, use the `--dry-run` flag to list the feature updates without applying them.

$> kafka-features.sh update \
     --bootstrap-server kafka-broker0.prn1:9071 \
     --upgrade group_coordinator:2,consumer_offsets_topic_schema:1 \
     --downgrade transaction_coordinator:3 \
     --delete replication_throttling \
     [--dry-run]

      [Add] Feature: consumer_offsets_topic_schema       ExistingFinalizedMaxVersion: -  NewFinalizedMaxVersion: 1       Result: OK
  [Upgrade] Feature: group_coordinator      ExistingFinalizedMaxVersion: 1  NewFinalizedMaxVersion: 2       Result: OK
[Downgrade] Feature: transaction_coordinator      ExistingFinalizedMaxVersion: 4  NewFinalizedMaxVersion: 3       Result: OK
   [Delete] Feature: replication_throttling       ExistingFinalizedMaxVersion: 2  NewFinalizedMaxVersion: -       Result: OK
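
For illustration, the `update` sub-command above presumably maps onto the proposed Admin API roughly as sketched below. The FeatureUpdate constructor shape and the `admin` instance are assumptions made for this sketch and are not part of this KIP.

Code Block
// Hypothetical sketch: how the CLI flags above could translate into Admin#updateFeatures().
// A FeatureUpdate constructor of the form (maxVersionLevel, allowDowngrade) is assumed.
Map<String, FeatureUpdate> updates = new HashMap<>();
updates.put("group_coordinator", new FeatureUpdate((short) 2, false));              // --upgrade
updates.put("consumer_offsets_topic_schema", new FeatureUpdate((short) 1, false));  // --upgrade
updates.put("transaction_coordinator", new FeatureUpdate((short) 3, true));         // --downgrade
updates.put("replication_throttling", new FeatureUpdate((short) 0, true));          // --delete (max version level < 1)
UpdateFeaturesResult result = admin.updateFeatures(updates, new UpdateFeaturesOptions());
result.all().get();  // succeeds only if every FeatureUpdate in the request succeeds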


New or changed public interfaces

Summary of changes:

  1. We introduce 1 new API in the broker RPC interface (see this section). This is the new ApiKeys.UPDATE_FEATURES (schema) that can only be served successfully by the controller.
  2. We introduce a few optional fields in the ApiVersionsResponse containing the cluster-wide finalized feature metadata, the feature metadata epoch, and the broker's supported features (see this section).
  3. We introduce 2 new APIs in the Admin interface to describe/addOrUpdate/delete features (see this section). Under the covers, these exercise the APIs mentioned above.

...

The Kafka Streams client uses the consumer rebalance protocol to propagate the group metadata information. There would be one nominated leader among the group. The sequence of feature upgrade events in the happy path shall be the following (T0, T1, …, Tn refer to increasing points in time):

  • T0: The broker cluster is running with the old binary, and a streams application is communicating with it. The stream clients will be using the group_coordinator:v1 feature. The leader stream thread would use its admin client to periodically call DescribeFeatures on a random broker in the cluster (presumably every minute) to get information about the latest cluster-wide enabled features (a sketch of such polling appears after this list). Thus, it would learn about the group_coordinator feature flag, with the finalized version at v1.
  • T1: The cluster undergoes a rolling upgrade to the new broker binary. Once the upgrade is complete, every broker in the cluster is expected to support v1-v2 versions for the group_coordinator feature flag.
  • T2: The controller will still have enabled only group_coordinator:v1 in the '/features' ZK node. So, the above rolling upgrade has no effect (yet) on the streams client side.
  • T3: Knowing that the cluster upgrade has completed, the cluster operator (i.e. a human) sends an ApiKeys.UPDATE_FEATURES request to the controller. This is to finalize the cluster-wide group_coordinator feature version level upgrade from v1 to v2. When this request is successful, it means the following:
    → The controller has checked that all brokers in the cluster advertise support for group_coordinator:v2, and it has persisted the upgrade to the ZK '/features' node.
    → The brokers are gradually receiving ZK notifications about the update to the '/features' ZK node. When each broker refreshes the contents of the ZK node, it will become aware that group_coordinator:v2 has been finalized cluster-wide.
  • T3-4: The streams leader thread, at a certain point in time, will see the above change (via the periodic DescribeFeatures call). It will then trigger a rebalance of the streams group, plumbing the group_coordinator:v2 feature flag down to all members. It is key to note that this event could happen concurrently with T3, which is why the event is labelled T3-4.
  • T5: Every member of the streams group will gradually upgrade towards group_coordinator:v2, just as the leader instructed above. This is when the stream client will switch from the old per-task producer model to the new thread-level producer model (for KIP-447).
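
For illustration, the periodic polling performed by the streams leader thread (referenced in T0 above) could look roughly like the sketch below. The `admin` instance and the triggerRebalanceWithFeatureVersion() hook are hypothetical names used only for this sketch.

Code Block
// Illustrative sketch only: poll the finalized feature metadata once a minute and trigger a
// rebalance once group_coordinator:v2 has been finalized cluster-wide.
ScheduledExecutorService poller = Executors.newSingleThreadScheduledExecutor();
poller.scheduleAtFixedRate(() -> {
    try {
        FeatureMetadata metadata =
            admin.describeFeatures(new DescribeFeaturesOptions()).featureMetadata().get();
        FinalizedVersionRange gc = metadata.finalizedFeatures().get("group_coordinator");
        if (gc != null && gc.maxVersionLevel() >= 2) {
            triggerRebalanceWithFeatureVersion((short) 2);  // hypothetical hook into the streams leader
        }
    } catch (Exception e) {
        // Feature metadata is eventually consistent; ignore and retry on the next tick.
    }
}, 0, 1, TimeUnit.MINUTES);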

There are some edge cases in this happy upgrade path, which we explain below.

Delay of feature version propagation

Due to eventual consistency in metadata propagation, it is possible that some brokers get notified about group_coordinator:v2 later than others. At T3-4, the streams leader thread first upgrades to group_coordinator:v2 when it learns of a feature version level bump from the broker. Then, after T5, there exists a case where the leader streams thread could query a stale broker and learn that the feature support only contains v1. If this situation is not handled properly, we are in danger of a downgrade. This is because in KIP-447 we don’t support downgrade of the processing model, unless the user wipes out the old stream instances and restarts from scratch. The following is done to make sure the above scenario never affects the stream use case:

  1. The stream clients shall not downgrade to group_coordinator:v1 once they have upgraded to group_coordinator:v2. The solution is to persist the group_coordinator flag as part of the consumers' metadata when the rebalance completes. On a subsequent rebalance, as long as some members are already on the newer version, the leader of the group will not kick off another feature request, because it knows (based on the eventual consistency guarantee) that an upgrade was ongoing in the last generation. It will just continue to inform all the members to upgrade to the newer version.

  2. It is important to note that the user is only upgrading the cluster, from enabling group_coordinator:v1 to enabling group_coordinator:v1-v2. Therefore, once a client sees that v2 is enabled on any broker, that is a clear signal that all broker binaries within the cluster support the group_coordinator:v2 semantics; it’s just a matter of time before the feature version update metadata propagates to every broker from the controller.

The above ensures that the upgrade logic is reliable/safe, as there is no case of a downgrade affecting the stream client (during the upgrade path).

Downgrade of group_coordinator feature

As such, we don’t support downgrades in this versioning scheme/system (see Non-goals and Future work). So, there is a danger if the cluster operator tries to downgrade the group_coordinator feature flag from v2 to v1 using the --allow-downgrade flag. This is because the stream client side shall not downgrade at all, and such a downgrade would crash the EOS applications. A safe practice is to first downgrade the stream clients, before downgrading the group_coordinator feature flag.

Potential features in Kafka

There are many features that could benefit from the above system; the following are a few selected high-level examples:

  1. group_coordinator: As described in the above section.

  2. transaction_coordinator: This feature flag could cover changes to the message format for the transaction state internal topic schema, as well as protocol changes related to the transaction coordinator. The advantages are similar to #1.

  3. consumer_offsets_topic_schema: Currently the message format used in the consumer internal topics is tied to IBP. It’s a candidate for a feature, because we will be able to dynamically finalize a new version of the topic schema, without having to roll the brokers twice.

  4. inter_broker_protocol: For transitional purposes, the inter.broker.protocol itself can be made a coarse-grained feature flag. This could be a way to operationally migrate away from the double roll during IBP bumps.


Deployment, IBP deprecation and avoidance of double rolls

There is a configuration key in Kafka called inter.broker.protocol (IBP). Currently, the IBP can only be set by changing the static configuration file supplied to the broker during startup. Here are the various phases of work that would be required to use this KIP to eventually avoid Broker double rolls in the cluster (whenever IBP values are advanced).

Phase #1

Post development of the feature versioning system described in this KIP, we hit phase #1 where the new versioning system is ready to be used:

  • The feature versioning system will be released under a new IBP version: migration_ibp_version.

  • For the initial roll out, we should only operate the versioning system after the second IBP roll → this brings the IBP of the cluster to migration_ibp_version (that’s when versioning system is fully deployed).

    • As a safeguard, each broker will validate that its IBP version is at least at migration_ibp_version before applying broker validations for feature versions, and before advertising its features in ZK.

    • As a safeguard, the controller will validate that its IBP version is at least at migration_ibp_version before allowing feature version changes to be finalized in a cluster.

Phase #2

This is the phase when both the new feature versioning system as well as the existing IBP setup (in the form of static configuration) are active in the Kafka broker. Feature flags may be optionally defined in the code as part of Kafka development. During deployment of newer Kafka releases that use feature flags, there will be a requirement to finalize such feature flags using the provided API/tooling (as required by this KIP). By the end of this phase, we still have not eliminated the requirement for double roll during broker upgrades.


Phase #3

This phase is about realizing the benefit of this KIP to avoid Broker double rolls during upgrades. For this, we would like to move away from using the existing IBP-based setup (in the form of a static configuration) in the broker code base. This requires several steps, as proposed below:

  1. We need a way to map the usage of IBP in the code (in the form of a static configuration) to the usage of IBP in the new feature versioning system. To achieve this, we introduce one or more feature flags in the code. These will be used to release features which were otherwise released under the static IBP config. We illustrate the write-up below using one such feature flag, called ibp-feature. For example, we will use the ibp-feature flag in the code wherever newer IBP values (from the static configuration) would otherwise be used:
    1. The max version values for this flag will start from 1 and continue increasing for future IBP version advancements.
    2. The min version value for this flag will start from 1, and it is unlikely to be modified (since we rarely or almost never deprecate IBP versions).
  2. By this point, IBP-based decision making in the broker code will be such that (see the sketch after this list):
    1. If the ibp-feature flag is finalized and the static IBP config value is >= migration_ibp_version, then the value of the ibp-feature flag is preferred for decision making over the IBP value from the static configuration.
    2. Otherwise, if the ibp-feature flag is not finalized yet, we continue to use the latest IBP value from the static configuration for decision making.
  3. We then release the ibp-feature flag as part of a subsequent Kafka release. The release would eventually get deployed to Kafka clusters, and the ibp-feature flag is expected to be finalized in the cluster (via the provided tooling).
  4. Once #3 happens, all future Kafka code changes can continue to use the ibp-feature flag, thereby effectively stopping the use of IBP as a static configuration.
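
A minimal sketch of the decision rule in item 2 above is shown below; the helper names (finalizedFeatures, compareIbp) and types are hypothetical and used purely for illustration.

Code Block
// Illustrative only: prefer the finalized ibp-feature flag over the static IBP configuration
// when the flag is finalized and the static IBP is already at least migration_ibp_version.
boolean preferIbpFeatureFlag(Map<String, FinalizedVersionRange> finalizedFeatures,
                             String staticIbpVersion,
                             String migrationIbpVersion) {
    boolean ibpFeatureFinalized = finalizedFeatures.containsKey("ibp-feature");
    boolean staticIbpAtLeastMigration = compareIbp(staticIbpVersion, migrationIbpVersion) >= 0;  // hypothetical comparator
    // When this returns false, the broker keeps using the latest IBP value from the static configuration.
    return ibpFeatureFinalized && staticIbpAtLeastMigration;
}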

Phase #4

This is the phase when we no longer advance the IBP values in the old IBP-based setup (in the form of static broker configuration) and have completely switched to ibp-feature flag(s) in the Kafka code. The former will be kept around for legacy purposes only; we do not foresee needing to switch back to it once the migration is over and IBP has been deprecated. However, it can happen that we would want to switch back to using the IBP-based setup while we are in Phase #3 above (for example, due to an unprecedented issue). This will be supported, because we deprecate IBP only in Phase #4. To "downgrade", a user would first need one rolling restart of the cluster to reduce the IBP to the required version, and then another rolling restart to change the binary. Note that each feature will have its own set of requirements to make this possible (in some cases, it may not be possible), but this is outside the scope of this document.

Future work

As part of future work, we could consider adding better support for downgrades & deprecation. We may consider providing a mechanism through which the cluster operator could learn whether a feature is being actively used by clients at a certain feature version. This information can be useful to decide whether the feature is safe to be deprecated. Note that this could involve collecting data from clients about which feature versions are actively being used, and the scheme for this needs to be thought through.

...