...

Code Block
{
  "apiKey": 48,
  "type": "response",
  "name": "UpdateFeaturesResponse",
  "validVersions": "0-1",
  "flexibleVersions": "1+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The error message, or null if there was no error." }
  ]
}

Feature advertisements from each broker in the cluster are picked up reactively by the controller via ZK watches. These watches are already set up today by the controller on every broker’s ephemeral ZK node (at '/brokers/ids/<id>'), and the contents of the broker nodes are cached in memory in the controller.

Under the hood, the implementation of the ApiKeys.UPDATE_FEATURES API does the following:

  1. For correctness reasons, the controller shall allow only one in-flight ApiKeys.UPDATE_FEATURES API call to be processed at any given time.

  2. Prior to mutating '/features' in ZK, the implementation verifies that none of the broker advertisements (as seen by the controller thus far) is incompatible with the provided List<FeatureUpdate> (see Validations section below).

  3. Once the validations in #2 succeed, the List<FeatureUpdate> is applied to the contents of the '/features' ZK node. Upon a successful write, ZK automatically increments the version of the node by 1.

  4. Finally, an UpdateFeaturesResponse is returned to the client.
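The four steps above can be illustrated with a small, self-contained Java sketch. It is an assumption-laden model, not the controller's actual code: the '/features' ZK node is simulated by an in-memory map plus a version counter, broker advertisements are a map from broker id to supported ranges, and all names (`FeatureUpdateHandler`, `handleUpdateFeatures`, `ErrorCode`) are hypothetical. Only the range check is shown; the downgrade rule from the Validations section is omitted.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of the controller-side UPDATE_FEATURES flow.
// The '/features' ZK node is simulated by an in-memory map and a version counter.
class FeatureUpdateHandler {
    // Simplified stand-ins for the error codes named in this KIP.
    enum ErrorCode { NONE, FEATURE_UPDATE_IN_PROGRESS, FEATURE_UPDATE_FAILED }

    record Update(String name, long newVersion) {}
    record Range(long min, long max) {}

    private final AtomicBoolean inFlight = new AtomicBoolean(false);
    private final Map<String, Long> featuresNode = new HashMap<>(); // simulated '/features' contents
    private int zkNodeVersion = 0;                                   // simulated ZK node version
    // Broker id -> (feature name -> supported range), as cached from '/brokers/ids/<id>'.
    private final Map<Integer, Map<String, Range>> brokerAdvertisements;

    FeatureUpdateHandler(Map<Integer, Map<String, Range>> brokerAdvertisements) {
        this.brokerAdvertisements = brokerAdvertisements;
    }

    ErrorCode handleUpdateFeatures(List<Update> updates) {
        // Step 1: allow only one in-flight request; a concurrent caller is rejected.
        if (!inFlight.compareAndSet(false, true)) {
            return ErrorCode.FEATURE_UPDATE_IN_PROGRESS;
        }
        try {
            // Step 2: validate every update against every cached broker advertisement.
            for (Update u : updates) {
                for (Map<String, Range> supported : brokerAdvertisements.values()) {
                    Range r = supported.get(u.name());
                    if (r == null || u.newVersion() < r.min() || u.newVersion() > r.max()) {
                        return ErrorCode.FEATURE_UPDATE_FAILED; // node left unmodified
                    }
                }
            }
            // Step 3: apply all updates in one write; ZK bumps the node version by 1.
            for (Update u : updates) {
                featuresNode.put(u.name(), u.newVersion());
            }
            zkNodeVersion++;
            // Step 4: report success to the caller.
            return ErrorCode.NONE;
        } finally {
            inFlight.set(false);
        }
    }

    int zkNodeVersion() { return zkNodeVersion; }
    Map<String, Long> featuresNode() { return featuresNode; }
}
```

The compareAndSet on the AtomicBoolean rejects a second concurrent caller instead of queueing it, which mirrors the FEATURE_UPDATE_IN_PROGRESS error code described for this API.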

Validations

For any <feature_name>, the above API implementation guards against a change of the related entry in '/features' from {"version": X} to {"version": X’}, unless it finds that each live broker in the deployment advertises {"maxVersion": Y >= X’} and {"minVersion": Z <= X’} in its BrokerIdZnode (for the same <feature_name>).

  1. For the above guarding checks:

    1. Cluster-wide feature max version downgrades are disallowed by default, unless FeatureUpdateType is explicitly set to ADD_OR_UPDATE_ALLOW_DOWNGRADE.
    2. If any broker does not contain a required feature, this is considered an incompatibility, and the API request fails.

    3. If any broker contains an additional feature that is not required, this is not considered an incompatibility.

  2. Some or all of the above logic will also be used by the broker (not just the controller) for its own protections (see the Broker protections section of this KIP).

  3. Activating the effects of a feature version cluster-wide is left to the discretion of the logic implementing the feature (e.g. this can be done via dynamic broker config).

  4. Deprecating or eliminating the presence/effects of a specific feature version cluster-wide is left to the discretion of the logic backing the feature (e.g. this can be done via dynamic broker config). However, it may need further external verification to ensure that no entity (e.g. a consumer or a broker) is actively using the feature (see Non-goals section).
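The guarding checks above, for a single feature, might look like the sketch below. The method name, the `SupportedRange` type, and modelling ADD_OR_UPDATE_ALLOW_DOWNGRADE as a boolean flag are assumptions made for illustration; the result is empty when the update may proceed and otherwise describes the first incompatibility found.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of the guarding checks for one feature update X -> X'.
class FeatureUpdateValidator {
    record SupportedRange(long minVersion, long maxVersion) {}

    /**
     * @param feature         name of the feature being updated
     * @param existingVersion currently finalized version X, or null if not finalized yet
     * @param newVersion      requested finalized version X'
     * @param allowDowngrade  true when the update type is ADD_OR_UPDATE_ALLOW_DOWNGRADE
     * @param liveBrokers     broker id -> advertised supported features (from each BrokerIdZnode)
     * @return empty if the update is compatible, otherwise a description of the first problem
     */
    static Optional<String> validate(String feature, Long existingVersion, long newVersion,
                                     boolean allowDowngrade,
                                     Map<Integer, Map<String, SupportedRange>> liveBrokers) {
        // Max version downgrades are rejected unless explicitly allowed.
        if (existingVersion != null && newVersion < existingVersion && !allowDowngrade) {
            return Optional.of("downgrade of " + feature + " is not allowed");
        }
        for (Map.Entry<Integer, Map<String, SupportedRange>> broker : liveBrokers.entrySet()) {
            SupportedRange range = broker.getValue().get(feature);
            // A broker that does not advertise the feature at all is incompatible.
            if (range == null) {
                return Optional.of("broker " + broker.getKey() + " does not support " + feature);
            }
            // Each live broker must advertise minVersion <= X' <= maxVersion.
            if (newVersion < range.minVersion() || newVersion > range.maxVersion()) {
                return Optional.of("broker " + broker.getKey() + " cannot support "
                        + feature + " v" + newVersion);
            }
            // Extra features advertised by the broker are simply not looked at.
        }
        return Optional.empty();
    }
}
```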

Guarantees

When a call to the ApiKeys.UPDATE_FEATURES API returns success to the client, the following is guaranteed:

  1. Each FeatureUpdate provided in the request was valid, and all updates were persisted to the '/features' ZK node by the controller (along with a bump to the ZK node version).

  2. Brokers in the cluster will gradually start receiving notifications (via ZK watches) about the changes to the '/features' ZK node. They react by reading the latest contents of the node from ZK and re-establishing the ZK watch. This mechanism also enables brokers to shut down when incompatibilities are detected (see Broker protections section).
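A broker's reaction to a '/features' notification could be sketched as follows, assuming (hypothetically) that each ZK read yields the node contents together with the node's ZK version. The sketch keeps only the newest snapshot and reports whether the broker still supports every finalized version; re-registering the watch and the shutdown itself are out of scope, and `BrokerFeatureCache` and `FeaturesSnapshot` are illustrative names.

```java
import java.util.Map;

// Hypothetical sketch of a broker handling a '/features' change notification.
class BrokerFeatureCache {
    record Range(long min, long max) {}
    // Contents of '/features' plus the ZK node version they were read at.
    record FeaturesSnapshot(Map<String, Long> finalized, int zkVersion) {}

    private final Map<String, Range> supported; // this broker's supported features
    private FeaturesSnapshot latest = new FeaturesSnapshot(Map.of(), -1);

    BrokerFeatureCache(Map<String, Range> supported) { this.supported = supported; }

    /** Called after re-reading '/features'. Returns false if this broker is incompatible. */
    synchronized boolean onFeaturesChanged(FeaturesSnapshot snapshot) {
        // Ignore a snapshot that is not newer than the cached one.
        if (snapshot.zkVersion() <= latest.zkVersion()) {
            return isCompatible(latest);
        }
        latest = snapshot;
        return isCompatible(snapshot);
    }

    private boolean isCompatible(FeaturesSnapshot snapshot) {
        for (Map.Entry<String, Long> e : snapshot.finalized().entrySet()) {
            Range r = supported.get(e.getKey());
            if (r == null || e.getValue() < r.min() || e.getValue() > r.max()) {
                return false; // per Broker protections, the broker would shut down here
            }
        }
        return true;
    }

    synchronized FeaturesSnapshot latest() { return latest; }
}
```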

When a call to the ApiKeys.UPDATE_FEATURES API has failed on the client side, the following is guaranteed:

  1. A server-side error means the '/features' ZK node is guaranteed to be left unmodified. The server-side error codes returned in the UpdateFeaturesResponse include:
     • NOT_CONTROLLER (existing error code), if the request was processed by a broker that is not the controller.
     • FEATURE_UPDATE_IN_PROGRESS (new error code), if another ApiKeys.UPDATE_FEATURES request is being processed concurrently by the controller.
     • FEATURE_UPDATE_FAILED (new error code), if the request contained at least one FeatureUpdate that cannot be applied; the error is reported for the first such FeatureUpdate.

  2. If a network error occurs, or the client dies during the request, or the server dies while processing the request, the user should assume that the application of List<FeatureUpdate> may have either succeeded or failed.
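One way a client might resolve the ambiguous case above is to re-read the finalized features and check whether the requested versions are already in place before retrying. This is a hypothetical helper (`UpdateReconciler.alreadyApplied`), not something this KIP prescribes; because brokers refresh '/features' via ZK watches, a false result may only mean the responding broker has not caught up yet, so reading from the controller or comparing FinalizedFeaturesEpoch values may help.

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical helper: after an ambiguous failure (network error, client or server crash),
// re-read the finalized features and check whether the requested state already holds.
class UpdateReconciler {
    /** Returns true if every requested feature is already finalized at the requested version. */
    static boolean alreadyApplied(Map<String, Long> requested, Map<String, Long> finalizedNow) {
        for (Map.Entry<String, Long> e : requested.entrySet()) {
            if (!Objects.equals(e.getValue(), finalizedNow.get(e.getKey()))) {
                return false; // not (yet) visible; the caller may decide to retry
            }
        }
        return true; // extra finalized features are irrelevant to this request
    }
}
```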

...

Code Block
{
  "apiKey": 48,
  "type": "response",
  "name": "DeleteFeaturesResponse",
  "validVersions": "0-1",
  "flexibleVersions": "1+",
  "fields": [
    // - If the request was processed by a broker that's not the controller, then
    //   this response will contain the existing NOT_CONTROLLER error code.
    // - If the request contained a feature that was never present, then this
    //   response will contain the FEATURE_NOT_EXISTS error code (a newly added
    //   error code).
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The error message, or null if there was no error." }
  ]
}


Under the hood, the implementation of the ApiKeys.DELETE_FEATURES API does the following:

  1. For correctness reasons, the controller shall allow only one in-flight ApiKeys.DELETE_FEATURES API call to be processed at any given time.

  2. This API should only be used in circumstances where a finalized feature needs to be garbage collected.

  3. Unlike the ApiKeys.UPDATE_FEATURES API, this API mutates '/features' in ZK without any guarding checks on feature versions. Upon a successful write, ZK automatically increments the version of the node by 1.
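The delete path can be modelled in the same hypothetical style: no version checks, a FEATURE_NOT_EXISTS result when a requested name is not finalized, and a single version bump per successful write. Rejecting the whole request when any name is missing follows the guarantee that a server-side error leaves '/features' unmodified; `FeatureDeleteHandler` and its methods are illustrative names only, and `synchronized` is used here as a simple stand-in for "one in-flight call at a time".

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the controller-side DELETE_FEATURES flow against a
// simulated '/features' node (map contents + ZK node version).
class FeatureDeleteHandler {
    enum ErrorCode { NONE, FEATURE_NOT_EXISTS }

    private final Map<String, Long> featuresNode = new HashMap<>();
    private int zkNodeVersion = 0;

    FeatureDeleteHandler(Map<String, Long> initial) { featuresNode.putAll(initial); }

    // 'synchronized' serializes requests so only one is processed at a time.
    synchronized ErrorCode handleDeleteFeatures(Set<String> names) {
        // Reject the whole request if any name is not finalized, leaving the node unmodified.
        for (String name : names) {
            if (!featuresNode.containsKey(name)) {
                return ErrorCode.FEATURE_NOT_EXISTS;
            }
        }
        // No version checks: remove the features and bump the node version once.
        featuresNode.keySet().removeAll(names);
        zkNodeVersion++;
        return ErrorCode.NONE;
    }

    synchronized int zkNodeVersion() { return zkNodeVersion; }
    synchronized Map<String, Long> featuresNode() { return Map.copyOf(featuresNode); }
}
```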

Guarantees

When a call to the ApiKeys.DELETE_FEATURES API returns success to the client, the following is guaranteed:

  1. Each feature provided in the request was valid, and all provided features were removed from the '/features' ZK node by the controller (along with a bump to the ZK node version).

  2. Brokers in the cluster will gradually start receiving notifications (via ZK watches) about the changes to the '/features' ZK node. They react by reading the latest contents of the node from ZK and re-establishing the ZK watch.

When a call to the ApiKeys.DELETE_FEATURES API has failed on the client side, the following is guaranteed:

  1. A server-side error means the '/features' ZK node is guaranteed to be left unmodified.

  2. If a network error occurs, or the client dies during the request, or the server dies while processing the request, the user should assume that the deletions may have either succeeded or failed.

Client discovery of finalized feature versions

The latest list of cluster-wide finalized feature versions will be served via the existing ApiKeys.API_VERSIONS API. This is used for client discovery, as well as for debuggability of the system. We will introduce tagged optional fields in the schema of the ApiVersionsResponse to accommodate the information. A couple of implementation details:

  • Whenever ZK notifies the broker about a change to '/features' node, the broker reads the latest list of finalized features from ZK and caches it in memory (also see Broker protections section in this doc).

  • We will modify the implementation of the ApiKeys.API_VERSIONS API to populate the information about finalized features in the response. This will contain the cluster-wide finalized versions from the broker’s internal ZK cache of '/features' node contents (as explained above), as well as the broker-level supported versions.

  • The finalized feature versions populated in the response will be eventually consistent, since they are refreshed in the brokers via ZK watches.

Below is the new ApiVersionsResponse schema, showing the three new tagged optional fields for feature metadata: SupportedFeatures, FinalizedFeaturesEpoch and FinalizedFeatures.

Note that the field FinalizedFeaturesEpoch contains the latest version of the '/features' node in ZK. On the client side, the epoch can be used to ignore stale feature metadata returned in an ApiVersionsResponse once newer feature metadata has been seen (eventual consistency).
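Client-side, the epoch rule might be implemented as in this sketch. `ClientFeatureCache` is a hypothetical name, and representing a response that lacks the new tagged fields (e.g. from an older broker) as a null epoch is an assumption; the only property relied on is that FinalizedFeaturesEpoch increases with every change to '/features'.

```java
import java.util.Map;

// Hypothetical client-side cache that keeps only the newest finalized-features view.
class ClientFeatureCache {
    private Map<String, Long> finalized = Map.of();
    private long epoch = -1; // no metadata seen yet

    /**
     * Offer the metadata from one ApiVersionsResponse. Returns true if it was accepted.
     * A null epoch models a response without the new tagged fields.
     */
    synchronized boolean offer(Long responseEpoch, Map<String, Long> responseFinalized) {
        if (responseEpoch == null || responseEpoch <= epoch) {
            return false; // not newer than what is already cached: ignore it
        }
        epoch = responseEpoch;
        finalized = Map.copyOf(responseFinalized);
        return true;
    }

    synchronized long epoch() { return epoch; }
    synchronized Map<String, Long> finalized() { return finalized; }
}
```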

ApiVersionsResponse schema (updated)


Code Block
{
  "apiKey": 18,
  "type": "response", "name": "ApiVersionsResponse",
  "validVersions": "0-3",
  "flexibleVersions": "3+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code." },
    { "name": "ApiKeys", "type": "[]ApiVersionsResponseKey", "versions": "0+",
      "about": "The APIs supported by the broker.", "fields": [
      { "name": "ApiKey", "type": "int16", "versions": "0+", "mapKey": true,
        "about": "The API index." },
      { "name": "MinVersion", "type": "int16", "versions": "0+",
        "about": "The minimum supported version, inclusive." },
      { "name": "MaxVersion", "type": "int16", "versions": "0+",
        "about": "The maximum supported version, inclusive." }]
    },
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    // ----- START: PROPOSED ADDITIONAL METADATA -----
    { "name":  "SupportedFeatures", "type": "[]FeatureKey",
      "versions":  "3+", "tag": 10000, "taggedVersions": "3+",
      "about": "Features supported by the broker.",
      "fields":  [
        { "name": "Name", "type": "string", "versions": "3+",
          "about": "The name of the feature." },
        { "name": "MinVersion", "type": "int64", "versions": "3+",
          "about": "The minimum supported version, inclusive." },
        { "name": "MaxVersion", "type": "int64", "versions": "3+",
          "about": "The maximum supported version, inclusive." }
      ]
    },
    {"name": "FinalizedFeaturesEpoch", "type": "int64", "versions": "3+",
      "tag": 10001, "taggedVersions": "3+",
      "about": "The monotonically increasing epoch for the features information."},
    { "name":  "FinalizedFeatures", "type": "[]FinalizedFeatureKey",
      "versions":  "3+", "tag": 10002, "taggedVersions": "3+",
      "about": "List of cluster-wide finalized features.",
      "fields":  [
        {"name": "Name", "type": "string", "versions":  "3+",
          "about": "The name of the feature."},
        {"name":  "Version", "type": "int64", "versions":  "3+",
          "about": "The cluster-wide finalized max version for the feature."}
      ]
    }
    // ----- END: PROPOSED ADDITIONAL METADATA -----
  ]
}

...

  • Read cluster-wide finalized feature versions from a broker or a controller via its ApiKeys.API_VERSIONS API.

  • Add/update cluster-wide finalized feature versions by exercising the newly introduced ApiKeys.UPDATE_FEATURES API on a controller.

  • Delete cluster-wide finalized features by exercising the newly introduced ApiKeys.DELETE_FEATURES API on a controller.

We shall introduce 3 new APIs in the Admin interface, which enable us to read the feature versions, finalize feature version upgrades, and delete finalized features. Below is Java-ish pseudocode for the same.

...

Code Block
// ---- START: Proposed Admin API definitions ----
/**
 * Return the following:
 * 1. List of cluster-wide finalized feature versions.
 * 2. List of supported feature versions specific to the broker.
 *
 * You may anticipate certain exceptions when calling get() on the
 * futures obtained from the returned DescribeFeaturesResult.
 */
DescribeFeaturesResult describeFeatures();

/**
 * Update the feature versions supported cluster-wide. You may
 * anticipate certain exceptions when calling get() on the futures
 * obtained from the returned UpdateFeaturesResult. For example,
 * if a feature update was in progress already, the controller
 * would return a suitable exception.
 *
 * @param updates   set of feature updates (at most one update
 *                  per feature name)
 * @return          the result of the updateFeatures request
 */
UpdateFeaturesResult updateFeatures(Set<FeatureUpdate> updates);

/**
 * Delete the specified cluster-wide finalized features.
 * You may anticipate certain exceptions when calling get() on the
 * futures obtained from the returned DeleteFeaturesResult. For example,
 * if feature deletion was in progress already, the controller
 * would return a suitable exception.
 *
 * @param features   set of feature names to be deleted.
 * @return           the result of the deleteFeatures request
 */
DeleteFeaturesResult deleteFeatures(Set<String> features);

// ---- END: Proposed Admin API definitions ----

interface FeatureBase {
    // The name of the feature.
    String getName();
}

// Represents a cluster-wide finalized feature, with a feature version.
interface FinalizedFeature extends FeatureBase {
    // The cluster-wide finalized value of the feature max version.
    long getVersion();
}

enum FeatureUpdateType {
    // Either add the feature (if not present), or update it (if present).
    ADD_OR_UPDATE,

    // Same as ADD_OR_UPDATE, but with downgrade allowed.
    ADD_OR_UPDATE_ALLOW_DOWNGRADE
}

interface FeatureUpdate {
    // Return the feature to be updated.
    // The version returned via 'getFeature().getVersion()' is the new value to be set.
    FinalizedFeature getFeature();

    // Return the type of update to be made.
    FeatureUpdateType getUpdateType();
}

// Represents a feature that is supported by a broker, with a specific
// feature version range [minVersion, maxVersion].
interface SupportedFeature extends FeatureBase {
    // The minimum version (value >= 0) of the supported feature.
    long getMinVersion();

    // The maximum version (value >= 0 and value >= minVersion) of the supported feature.
    long getMaxVersion();
}

interface FeatureMetadata {
    // The set of cluster-wide finalized features.
    Set<FinalizedFeature> getFinalizedFeatures();

    // The monotonically increasing epoch for the finalized features.
    long getEpoch();

    // The set of features supported by a broker.
    Set<SupportedFeature> getSupportedFeatures();

    // The hostname of the broker.
    String getHost();

    // The port of the broker.
    int getPort();
}

interface DescribeFeaturesResult {
    /**
     * The data returned in the future contains the latest entire set of
     * finalized cluster-wide features, as well as the entire set of 
     * features supported by the broker serving this read request.
     */
    KafkaFuture<FeatureMetadata> all();
}

interface UpdateFeaturesResult {
    /**
     * Returns a future which indicates success/failure.
     * 1. If the future has succeeded (i.e. no exceptions),
     *    then the request was 100% successful, and no top level or
     *    individual FeatureUpdate errors were seen. The data
     *    returned in the future contains the latest entire set of
     *    finalized cluster-wide features (after all updates were applied),
     *    as well as the entire set of features supported by the controller
     *    serving this write request.
     * 2. If the future has failed, the top level error (if any)
     *    or the error from the first failed FeatureUpdate
     *    (if any) is raised to the caller.
     */
    KafkaFuture<FeatureMetadata> all();
}

interface DeleteFeaturesResult {
    /**
     * Returns a future which indicates success/failure.
     * 1. If the future has succeeded (i.e. no exceptions),
     *    then the request was 100% successful, and no top level or
     *    individual feature deletion errors were seen.
     * 2. If the future has failed, the top level error (if any)
     *    or the error from the first failed deletion
     *    (if any) is raised to the caller.
     */
    KafkaFuture<FeatureMetadata> all();
}
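If the interfaces above were backed by concrete classes, Java records would be a compact fit. The sketch below re-declares the interfaces so it compiles on its own and checks the SupportedFeature invariants (minVersion >= 0, maxVersion >= minVersion) at construction time. The enclosing `FeatureTypes` class and the record names `Finalized`, `Supported` and `Update` are hypothetical; records also supply equals/hashCode, which matter for the Set-based signatures.

```java
// Hypothetical value classes for the proposed interfaces (re-declared here so the
// sketch compiles on its own).
class FeatureTypes {
    interface FeatureBase { String getName(); }
    interface FinalizedFeature extends FeatureBase { long getVersion(); }
    interface SupportedFeature extends FeatureBase { long getMinVersion(); long getMaxVersion(); }
    enum FeatureUpdateType { ADD_OR_UPDATE, ADD_OR_UPDATE_ALLOW_DOWNGRADE }
    interface FeatureUpdate { FinalizedFeature getFeature(); FeatureUpdateType getUpdateType(); }

    record Finalized(String name, long version) implements FinalizedFeature {
        public String getName() { return name; }
        public long getVersion() { return version; }
    }

    record Supported(String name, long minVersion, long maxVersion) implements SupportedFeature {
        Supported {
            // Invariants from the SupportedFeature comments; maxVersion >= 0 then follows.
            if (minVersion < 0) throw new IllegalArgumentException("minVersion must be >= 0");
            if (maxVersion < minVersion) throw new IllegalArgumentException("maxVersion must be >= minVersion");
        }
        public String getName() { return name; }
        public long getMinVersion() { return minVersion; }
        public long getMaxVersion() { return maxVersion; }
    }

    record Update(FinalizedFeature feature, FeatureUpdateType type) implements FeatureUpdate {
        public FinalizedFeature getFeature() { return feature; }
        public FeatureUpdateType getUpdateType() { return type; }
    }
}
```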

...

Code Block
=== DESCRIBE FEATURES ===

# Get cluster-wide finalized features, and features supported by a specific broker.
#  - Use `--bootstrap-server` to provide a broker host:port to which queries should be issued.
#  - Optionally, provide `--controller` flag directing the tool to issue the query to the
#    controller (while discovering the controller via the bootstrap server).
#    This can be useful for debugging purposes.
$> kafka-features.sh describe --bootstrap-server kafka-broker0.prn1:9071 [--controller]

{
    "status": "OK",
    "supported_features": {
        "group_coordinator": {
            "minVersion": 1,
            "maxVersion": 2
        },
        "transaction_coordinator": {
            "minVersion": 0,
            "maxVersion": 5
        },
        "consumer_offsets_topic_schema": {
            "minVersion": 0,
            "maxVersion": 0
        }
    },
    "finalized_features": {
        "epoch": 0,
        "group_coordinator": {
            "version": 1
        },
        "transaction_coordinator": {
            "version": 4
        }
    },
    "host": "kafka-broker0.prn1",
    "port": 9071
}

=== ADD_OR_UPDATE FEATURES ===

# Add or update a list of cluster-wide finalized features.
#  - Use `--bootstrap-server` to provide a broker host:port to which MetadataRequest query should be issued.
#    The MetadataResponse will be used to discover the Controller, to which the actual ADD_OR_UPDATE request is issued.
#  - Use `--upgrade` to provide a comma-separated list of features and new finalized max version to ADD_OR_UPDATE.
#  - Use `--force-downgrade` to provide a comma-separated list of features and new finalized max version, with a downgrade allowed for feature versions. This should be used only when required.

$> kafka-features.sh update \
      --bootstrap-server kafka-broker0.prn1:9071 \
      --upgrade group_coordinator:2,consumer_offsets_topic_schema:0 \
      --force-downgrade transaction_coordinator:3

Please confirm before downgrading the following features:
1. transaction_coordinator from v4 (existing) to v3 (new)

[Y/n]? Y

{
    "status": "OK",
    "supported_features": {
        "group_coordinator": {
            "minVersion": 1,
            "maxVersion": 2
        },
        "transaction_coordinator": {
            "minVersion": 0,
            "maxVersion": 5
        },
        "consumer_offsets_topic_schema": {
            "minVersion": 0,
            "maxVersion": 0
        }
    },
    "finalized_features": {
        "epoch": 1,
        "group_coordinator": {
            "version": 2
        },
        "transaction_coordinator": {
            "version": 3
        },
        "consumer_offsets_topic_schema": {
            "version": 0
        }
    },
    "host": "kafka-broker0.prn1",
    "port": 9071
}

=== DISABLE FEATURES ===

# Disable a list of cluster-wide finalized features.
#  - Use `--bootstrap-server` to provide a broker host:port to which MetadataRequest query should be issued.
#    The MetadataResponse will be used to discover the Controller, to which the actual delete request is issued.
#  - Use `--features` to provide a comma-separated list of features to be disabled.

$> kafka-features.sh disable --bootstrap-server kafka-broker0.prn1:9071 --features group_coordinator,transaction_coordinator

Please confirm disabling of the following features. Note that the latest finalized version of each disabled feature will be lost forever:
1. group_coordinator
2. transaction_coordinator

[Y/n]? Y

{
    "status": "OK",
    "supported_features": {
        "group_coordinator": {
            "minVersion": 1,
            "maxVersion": 2
        },
        "transaction_coordinator": {
            "minVersion": 0,
            "maxVersion": 5
        },
        "consumer_offsets_topic_schema": {
            "minVersion": 0,
            "maxVersion": 0
        }
    },
    "finalized_features": {
        "epoch": 2,
        "consumer_offsets_topic_schema": {
            "version": 0
        }
    },
    "host": "kafka-broker0.prn1",
    "port": 9071
}
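As an illustration of the tool's input format, the `--upgrade` and `--force-downgrade` values above (comma-separated `name:version` pairs) could be parsed into per-feature updates as sketched below. The `FeatureArgsParser` class, its error messages, and the rule that a feature may not appear in both lists are assumptions, not behaviour this KIP specifies.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical parser for the tool's "name:version,name:version" arguments.
class FeatureArgsParser {
    enum UpdateType { ADD_OR_UPDATE, ADD_OR_UPDATE_ALLOW_DOWNGRADE }
    record Update(long version, UpdateType type) {}

    static Map<String, Update> parse(String upgradeArg, String forceDowngradeArg) {
        Map<String, Update> updates = new LinkedHashMap<>(); // keeps command-line order
        addAll(updates, upgradeArg, UpdateType.ADD_OR_UPDATE);
        addAll(updates, forceDowngradeArg, UpdateType.ADD_OR_UPDATE_ALLOW_DOWNGRADE);
        return updates;
    }

    private static void addAll(Map<String, Update> updates, String arg, UpdateType type) {
        if (arg == null || arg.isBlank()) return;
        for (String pair : arg.split(",")) {
            String[] parts = pair.trim().split(":");
            if (parts.length != 2 || parts[0].isEmpty()) {
                throw new IllegalArgumentException("expected name:version, got '" + pair + "'");
            }
            // Long.parseLong throws NumberFormatException (an IllegalArgumentException).
            long version = Long.parseLong(parts[1]);
            if (updates.putIfAbsent(parts[0], new Update(version, type)) != null) {
                throw new IllegalArgumentException("feature listed twice: " + parts[0]);
            }
        }
    }
}
```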

New or Changed Public Interfaces

Summary of changes:

  1. We introduce 2 new APIs in the broker RPC interface (see this section). These APIs can only be served by the controller: ApiKeys.UPDATE_FEATURES (schema) and ApiKeys.DELETE_FEATURES (schema).
  2. We introduce a few tagged optional fields in the ApiVersionsResponse, containing the cluster-wide finalized feature metadata, the feature metadata epoch, and the broker's supported features (see this section).
  3. We introduce 3 new APIs in the Admin interface to describe/addOrUpdate/delete features (see this section). Under the covers, these exercise the APIs mentioned above.

Use case: group_coordinator feature flag (for KIP-447)

...