...

  1. Client discovery: Provide an API to programmatically access the metadata. Here, the “metadata” refers to the cluster-wide finalized versions of broker features. We would like to serve this metadata in a versioned, eventually consistent and scalable way.

  2. Feature gating: Provide an API to safely, durably and dynamically finalize the upgrades to cluster-wide supported versions of features. The API (and its related tooling) can be used by the cluster operator (i.e. typically a human) to finalize feature version upgrades.

  3. IBP deprecation: As a good side-effect of the above, we would like to provide a path to eventually deprecate the need for the cumbersome inter.broker.protocol configuration, and the broker “double roll” mechanism. A very tangible benefit of the solution is that we should be able to do broker upgrades with just a single rolling restart.

...

  1. Client Discovery:

    • By scalable, we mean the solution should horizontally scale to the metadata read and write workload as the cluster grows with more features, brokers and clients. We expect metadata discovery to be read heavy, while write traffic is very low (feature versions are typically finalized during releases or other configuration pushes, which can happen a few times a day per cluster).

    • The metadata served to the client will be versioned. Each version carries an epoch value, which increases monotonically whenever the metadata changes. At any given time, the latest epoch of the metadata returned to the client is valid, and the cluster functions in accordance with it. Note: the metadata epoch applies to the entire metadata contents, and is unrelated to the individual feature versions; these are 2 separate things.

    • We want to be careful and specific about what we mean by consistent here. The metadata we serve to the client during discovery will be eventually consistent. It turns out that scaling to strongly consistent metadata reads is not easy (in the current Kafka setup), and the cost of an eventually consistent solution can be kept minimal, as follows. Due to eventual consistency, there can be cases where an older epoch of the metadata is briefly returned during discovery, after a more recent epoch was returned at a previous point in time. We expect clients to tolerate staleness of no more than 1 metadata epoch bump, and to always let the latest received metadata epoch trump an older epoch. The client should strongly consider discovering the latest metadata once during startup, and, if required, refreshing the metadata periodically (to get the latest metadata).

  2. Feature gating:

    • By safe, we mean: when processing a request to finalize a set of feature versions, the system will dynamically verify that all brokers in the cluster support the intended version. If not, the request will be rejected. Also, when a broker starts up, it will verify that it is compatible with the configured feature versions. If it is not, it will either refuse to start up or eventually die before it becomes an active member of the cluster (by active, we mean the point when the broker starts serving API requests).

    • By durable, we mean that the finalized features should be persisted durably and remembered across broker restarts. Generally, the system should tolerate faults to the same degree as it does for storage of other critical metadata (such as topic configuration).

    • By dynamic, we mean that the finalized features can be mutated in a cheap/easy way without compromising the uptime of broker/controller processes, or the health of the cluster.
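The "safe" check described above can be sketched as follows. This is a minimal illustration under stated assumptions, not the KIP's implementation. The names VersionRange, FeatureCompatibility, brokerIsCompatible and safeToFinalize are hypothetical. Broker advertisements are assumed to be available as a map from broker id to supported version ranges. A broker that does not know a finalized feature at all is assumed to be incompatible with it.

```java
import java.util.Map;

// Hypothetical helper types; names are illustrative, not from the KIP.
// A supported version range [minVersion, maxVersion] advertised by one broker.
final class VersionRange {
    final long minVersion;
    final long maxVersion;

    VersionRange(long minVersion, long maxVersion) {
        this.minVersion = minVersion;
        this.maxVersion = maxVersion;
    }

    boolean contains(long version) {
        return minVersion <= version && version <= maxVersion;
    }
}

final class FeatureCompatibility {
    private FeatureCompatibility() {}

    // Broker startup check: every finalized feature must fall inside the broker's
    // supported range. Assumption: an unknown finalized feature is incompatible.
    static boolean brokerIsCompatible(Map<String, VersionRange> supported,
                                      Map<String, Long> finalized) {
        for (Map.Entry<String, Long> entry : finalized.entrySet()) {
            VersionRange range = supported.get(entry.getKey());
            if (range == null || !range.contains(entry.getValue())) {
                return false;
            }
        }
        return true;
    }

    // Controller check: accept the proposed finalized versions only if every
    // broker advertisement seen so far (keyed by broker id) is compatible.
    static boolean safeToFinalize(Map<Integer, Map<String, VersionRange>> advertisements,
                                  Map<String, Long> proposed) {
        for (Map<String, VersionRange> supported : advertisements.values()) {
            if (!brokerIsCompatible(supported, proposed)) {
                return false;
            }
        }
        return true;
    }
}
```

In this sketch, finalizing group_coordinator at v2 is rejected whenever any broker advertises only [0, 1] for it, even if other brokers already support v2.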

...

The read/write paths proposed above are described later in this doc. Below is an example of the contents of the '/features' ZK node, followed by an explanation of its schema.

Schema (JSON)


{
   "__version__": 0, // int64 -> Represents the version of the schema for the data stored in the ZK node
   "data": {
      "epoch": 0, // int64 -> Represents the version of the contents of the "data" dictionary.
      "features": {
         "exactly_once_semantics": {   // string -> name of the feature
            "version": 3 // int64 -> Represents the cluster-wide finalized version of this feature
         },
         "consumer_offsets_topic_schema": {
            "version": 4
         }
      }
   }
}

The schema is a JSON dictionary with a few different keys and values, as explained below:

  • The value for the __version__ key is an int64 that contains the version of the schema of the data stored in the ZK node.

  • The value for the data key is a dictionary that contains the following:
    • A key called epoch whose value is an int64. This represents the version of the contents of the data dictionary. For example, this gets bumped whenever the controller modifies the features via the new API (see this section).
    • A key called features whose value is a dictionary. This contains a mapping from feature names to their metadata (such as finalized versions). It's a map{string → map{string → <string | number>}}. Imagine this as being a map of:

        <feature_name>
                     |-->  <metadata_key>
                                          |-->  <metadata_value>

      • Top-level key <feature_name> is a non-empty string that’s a feature name.
      • <metadata_key> refers to the second nested level key, which is a non-empty string that refers to some feature metadata.
      • <metadata_value> is either a string or a number; it's the value for the <metadata_key>.
      • Note: this nested dictionary would contain the key 'version', whose value is an int64 representing the finalized cluster-wide version for the feature.
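As a rough illustration of the rules above, the sketch below checks an already-decoded '/features' payload and extracts each feature's finalized version. It assumes that some JSON decoder (not shown) has parsed the "features" dictionary into nested Java maps. It also assumes int64 values decode to java.lang.Long. The class name FeaturesZNode and its method are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory view of the '/features' ZK node contents.
// Assumption: JSON decoding happened elsewhere and int64 values are java.lang.Long.
final class FeaturesZNode {
    final long schemaVersion;                        // value of "__version__"
    final long epoch;                                // value of data.epoch
    final Map<String, Map<String, Object>> features; // value of data.features

    FeaturesZNode(long schemaVersion, long epoch, Map<String, Map<String, Object>> features) {
        this.schemaVersion = schemaVersion;
        this.epoch = epoch;
        this.features = features;
    }

    // Applies the schema rules and returns feature name -> finalized version.
    // Throws IllegalArgumentException on the first violation found.
    Map<String, Long> finalizedVersions() {
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Map<String, Object>> feature : features.entrySet()) {
            String name = feature.getKey();
            if (name == null || name.isEmpty()) {
                throw new IllegalArgumentException("feature name must be a non-empty string");
            }
            for (Map.Entry<String, Object> meta : feature.getValue().entrySet()) {
                if (meta.getKey() == null || meta.getKey().isEmpty()) {
                    throw new IllegalArgumentException("empty metadata key for " + name);
                }
                Object value = meta.getValue();
                if (!(value instanceof String) && !(value instanceof Number)) {
                    throw new IllegalArgumentException("value must be a string or number for " + name);
                }
            }
            // Every feature is expected to carry an int64 'version' key.
            Object version = feature.getValue().get("version");
            if (!(version instanceof Long)) {
                throw new IllegalArgumentException("missing int64 'version' for " + name);
            }
            result.put(name, (Long) version);
        }
        return result;
    }
}
```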

New controller API

We introduce a new Admin API that’s served only by the controller, and identified by the new API key: ApiKeys.UPDATE_FEATURES. This API enables safe application of a set of cluster-wide feature updates:

  • The input to the API is a list of FeatureUpdate that need to be applied. Each item specifies an operation (as a FeatureUpdateType) such as: adding a new Feature, updating or deleting an existing Feature.

  • On success, the response is the list of the latest finalized features, along with an epoch number (monotonically increasing).

...

{
  "apiKey": 48,
  "type": "response",
  "name": "UpdateFeaturesResponse",
  "validVersions": "0-1",
  "flexibleVersions": "1+",
  "fields": [
    {"name": "FeaturesEpoch", "type": "int64", "versions": "1+",
      "about": "The monotonically increasing epoch of the metadata for finalized features."},
    { "name":  "Features", "type": "[]FeatureKey", "versions":  "1+",
      "about": "Cluster-wide finalized features and their versions.",
      "fields":  [
        {"name": "Name", "type":  "string", "versions":  "1+",
          "about": "The name of the feature."},
        {"name":  "Version", "type":  "int64", "versions":  "1+",
          "about": "The finalized version for the feature."}
      ]
    }
  ]
}

...

  1. For correctness reasons, the controller shall allow processing of only one in-flight ApiKeys.UPDATE_FEATURES API call at any given time.

  2. Prior to mutating '/features' in ZK, the implementation verifies that all broker advertisements (as seen by it thus far) contained no incompatibilities with the provided List<FeatureUpdate> (see Validations section below).

  3. Once the validations in #2 are successful, the List<FeatureUpdate> is applied to the contents of the '/features' ZK node, and its epoch is incremented by 1.

  4. If #3 is successful, ApiKeys.UPDATE_METADATA requests are issued to all brokers in the cluster. This can incur a slight latency cost, so we expect this API call to succeed or fail within a few seconds in the worst case (depending on the size of the cluster).

  5. Finally, an UpdateFeaturesResponse is returned to the user. The epoch number in it is the latest epoch in the '/features' ZK node after the mutations were applied to it.
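The steps above can be sketched, under simplifying assumptions, as an update routine that admits only one in-flight request at a time. Everything here is hypothetical. FeatureUpdateProcessor, Update and UpdateType stand in for the KIP's FeatureUpdate types. The ZK write is replaced by an in-memory swap, and the UPDATE_METADATA fan-out is omitted. The validation of step 2 is supplied by the caller as a predicate over the proposed finalized versions.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical sketch of the controller steps above; not the actual implementation.
final class FeatureUpdateProcessor {
    enum UpdateType { DELETE, ADD_OR_UPDATE }

    // Stand-in for FeatureUpdate; the version is ignored for DELETE.
    static final class Update {
        final String name;
        final long version;
        final UpdateType type;

        Update(String name, long version, UpdateType type) {
            this.name = name;
            this.version = version;
            this.type = type;
        }
    }

    // In-memory stand-in for the contents of the '/features' ZK node.
    private Map<String, Long> current = new HashMap<>();
    private long currentEpoch = 0;

    synchronized long epoch() { return currentEpoch; }

    synchronized Map<String, Long> finalized() { return Map.copyOf(current); }

    // Step 1: 'synchronized' ensures only one update is processed at a time.
    // Returns the new epoch; on validation failure, throws and leaves state unmodified.
    synchronized long apply(List<Update> updates, Predicate<Map<String, Long>> validate) {
        Map<String, Long> proposed = new HashMap<>(current);
        for (Update u : updates) {
            if (u.type == UpdateType.DELETE) {
                proposed.remove(u.name);
            } else {
                proposed.put(u.name, u.version);
            }
        }
        // Step 2: reject the whole request if any broker would be incompatible.
        if (!validate.test(proposed)) {
            throw new IllegalStateException("feature update rejected by validation");
        }
        // Step 3: "persist" the new contents and bump the epoch by 1.
        current = proposed;
        currentEpoch += 1;
        // Step 4 (UPDATE_METADATA to all brokers) would be issued here; omitted.
        // Step 5: the response carries the latest epoch.
        return currentEpoch;
    }
}
```

Validating the fully merged result, rather than each update in isolation, means a rejected request never leaves a partially applied set of updates behind. That matches the guarantee that a server-side error leaves the node unmodified.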

...

  1. Each FeatureUpdate provided in the request was valid, and all updates were persisted to the '/features' ZK node by the controller (along with a bump to the node’s epoch).

  2. An ApiKeys.UPDATE_METADATA call was successfully made by the controller to all live brokers (as seen by the controller) with the latest dictionary of cluster-wide finalized feature versions.

  3. Future ApiKeys.UPDATE_METADATA calls from the controller to brokers will contain the latest set of cluster-wide feature versions, enabling brokers to shut down when incompatibilities are detected (see Broker protections section).

  4. The returned object in the ApiKeys.UPDATE_FEATURES API call contains the latest set of cluster-wide feature versions (after all updates were applied), which can be useful to the client for debugging/logging purposes.

...

  1. A server-side error means the '/features' ZK node is guaranteed to be left unmodified.

  2. A network error, the client dying during the request, or the server dying while processing the request means that the user should assume that the application of List<FeatureUpdate> may have either succeeded or failed.
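When the outcome is unknown, one possible client-side strategy is to re-read the finalized features and compare them with what the request intended. The re-read could go through describeFeatures, ideally against the controller. This strategy is an assumption, not part of the proposal. The helper name UpdateOutcome.intendedStateVisible is hypothetical. The check has limits. Because reads are eventually consistent, a false result may simply reflect stale metadata. A true result shows that the intended state is in place, but it does not prove that this particular request applied it.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical helper; compares freshly read finalized features with the
// intended ADD_OR_UPDATE versions and DELETE names of an ambiguous request.
final class UpdateOutcome {
    private UpdateOutcome() {}

    static boolean intendedStateVisible(Map<String, Long> finalizedNow,
                                        Map<String, Long> intendedAddOrUpdate,
                                        Set<String> intendedDeletes) {
        for (Map.Entry<String, Long> e : intendedAddOrUpdate.entrySet()) {
            // Each added/updated feature must be finalized at exactly the intended version.
            if (!e.getValue().equals(finalizedNow.get(e.getKey()))) {
                return false;
            }
        }
        for (String name : intendedDeletes) {
            // Each deleted feature must no longer be finalized.
            if (finalizedNow.containsKey(name)) {
                return false;
            }
        }
        return true;
    }
}
```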

Here is the new schema of UpdateMetadataRequest with the finalized features added to it (the epoch contains the latest value, after it was incremented):

...

{
  "apiKey": 6,
  "type": "request",
  "name": "UpdateMetadataRequest",
  "validVersions": "0-6",
  "flexibleVersions": "6+",
  "fields": [
    ...
    ...
    { "name": "LiveBrokers", "type": "[]UpdateMetadataBroker", "versions": "0+", "fields": [
        { "name": "Id", "type": "int32", "versions": "0+", "entityType": "brokerId",
          "about": "The broker id." },
          ...
          ...
          { "name": "SecurityProtocol", "type": "int16", "versions": "1+",
            "about": "The security protocol type." }
        ]},
        { "name": "Rack", "type": "string", "versions": "2+", "nullableVersions": "0+", "ignorable": true,
          "about": "The rack which this broker belongs to." }
    ]},
    // ----- START: PROPOSED ADDITIONAL METADATA -----
    {"name": "FeaturesEpoch", "type": "int64", "versions": "6+",
      "tag": 10000, "taggedVersions": "6+",
      "about": "The monotonically increasing epoch for the finalized features information."},
    { "name":  "Features", "type": "[]FeatureKey",
      "versions":  "6+", "tag": 10001, "taggedVersions": "6+",
      "about": "The list of cluster-wide finalized features and their versions.",
      "fields":  [
        {"name": "Name", "type":  "string", "versions":  "6+",
          "about": "The name of the feature."},
        {"name":  "Version", "type":  "int64", "versions":  "6+",
          "about": "The finalized version for the feature."}
      ]
    }
    // ----- END: PROPOSED ADDITIONAL METADATA -----
  ],
  "commonStructs": [
    ...
    ...
  ]
}

...

Below is the new ApiVersionsResponse schema, showing the new tagged optional fields for feature versions.

Note that the field FinalizedFeaturesEpoch contains the latest epoch stored in the epoch field of the '/features' ZK node. On the client side, the epoch can be used to ignore older features metadata returned in ApiVersionsResponse, after newer features metadata was returned once (eventual consistency).
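The client-side rule above can be sketched as a small cache that only moves forward in epoch. The class FinalizedFeaturesCache is hypothetical. The sketch assumes the client has already extracted FinalizedFeaturesEpoch and FinalizedFeatures from each ApiVersionsResponse into plain Java values.

```java
import java.util.Map;

// Hypothetical client-side cache of finalized features. A response whose
// epoch is older than one already accepted is ignored.
final class FinalizedFeaturesCache {
    private long epoch = -1; // assumption: -1 means "nothing seen yet"; real epochs start at 0
    private Map<String, Long> features = Map.of();

    // Returns true if the response was accepted (its epoch is not older).
    synchronized boolean maybeUpdate(long responseEpoch, Map<String, Long> responseFeatures) {
        if (responseEpoch < epoch) {
            return false; // stale: a newer epoch was already observed
        }
        epoch = responseEpoch;
        features = Map.copyOf(responseFeatures);
        return true;
    }

    synchronized long epoch() { return epoch; }

    synchronized Map<String, Long> features() { return features; }
}
```

With this rule, a client that has seen group_coordinator:v1 at epoch 1 keeps it, even if a lagging broker later reports epoch 0 with group_coordinator:v0.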

...

{
  "apiKey": 18,
  "type": "response", "name": "ApiVersionsResponse",
  "validVersions": "0-3",
  "flexibleVersions": "3+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code." },
    { "name": "ApiKeys", "type": "[]ApiVersionsResponseKey", "versions": "0+",
      "about": "The APIs supported by the broker.", "fields": [
      { "name": "ApiKey", "type": "int16", "versions": "0+", "mapKey": true,
        "about": "The API index." },
      { "name": "MinVersion", "type": "int16", "versions": "0+",
        "about": "The minimum supported version, inclusive." },
      { "name": "MaxVersion", "type": "int16", "versions": "0+",
        "about": "The maximum supported version, inclusive." }]
    },
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    // ----- START: PROPOSED ADDITIONAL METADATA -----
    { "name":  "SupportedFeatures", "type": "[]FeatureKey",
      "versions":  "3+", "tag": 10000, "taggedVersions": "3+",
      "about": "Features supported by the broker.",
      "fields":  [
        { "name": "Name", "type": "string", "versions": "3+",
          "about": "The name of the feature." },
        { "name": "MinVersion", "type": "int64", "versions": "3+",
          "about": "The minimum supported version, inclusive." },
        { "name": "MaxVersion", "type": "int64", "versions": "3+",
          "about": "The maximum supported version, inclusive." }
      ]
    },
    {"name": "FinalizedFeaturesEpoch", "type": "int64", "versions": "3+",
      "tag": 10001, "taggedVersions": "3+",
      "about": "The monotonically increasing epoch for the finalized features information."},
    { "name":  "FinalizedFeatures", "type": "[]FinalizedFeatureKey",
      "versions":  "3+", "tag": 10002, "taggedVersions": "3+",
      "about": "List of cluster-wide finalized features.",
      "fields":  [
        {"name": "Name", "type": "string", "versions":  "3+",
          "about": "The name of the feature."},
        {"name":  "Version", "type": "int64", "versions":  "3+",
          "about": "The version for the feature."}
      ]
    }
    // ----- END: PROPOSED ADDITIONAL METADATA -----
  ]
}

...

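import java.util.Set;

import org.apache.kafka.common.KafkaFuture;

// ---- START: Proposed Admin API definitions ----
// (The two methods below are proposed additions to the Admin client interface.)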
/**
 * Update the feature versions supported cluster-wide. You may
 * anticipate certain exceptions when calling get() on the futures
 * obtained from the returned UpdateFeaturesResult.
 *
 * @param updates   set of feature updates, each identified by the
 *                  name of the feature it updates
 * @return          the result of the updateFeatures request
 */
UpdateFeaturesResult updateFeatures(Set<FeatureUpdate> updates);

/**
 * Return the following:
 * 1. List of cluster-wide finalized feature versions.
 * 2. List of supported feature versions specific to the broker.
 *
 * You may anticipate certain exceptions when calling get() on the
 * futures obtained from the returned DescribeFeaturesResult.
 */
DescribeFeaturesResult describeFeatures();

// ---- END: Proposed Admin API definitions ----

interface FeatureBase {
    // The name of the feature.
    String getName();
}

// Represents a cluster-wide finalized feature, with a feature version.
interface FinalizedFeature extends FeatureBase {
    // The finalized value of the feature version.
    long getVersion();
}

enum FeatureUpdateType {
    // Delete the feature. Useful to eliminate accidental junk,
    // or rarely eliminate a deprecated feature altogether.
    DELETE,

    // Either add the feature (if not present), or update it (if present).
    ADD_OR_UPDATE
}

interface FeatureUpdate {
    // Return the feature to be updated.
    // The version returned via 'getFeature().getVersion()' is:
    //  - for the FeatureUpdateType.ADD_OR_UPDATE operation, the new value to be applied.
    //  - for the FeatureUpdateType.DELETE operation, ignored.
    FinalizedFeature getFeature();

    // Return the type of update to be made.
    FeatureUpdateType getUpdateType();
}

// Represents a feature that is supported by a broker, with a specific
// feature version range [minVersion, maxVersion].
interface SupportedFeature extends FeatureBase {
    // The minimum version (value >= 0) of the supported feature.
    long getMinVersion();

    // The maximum version (value >= 0 and value >= minVersion) of the supported feature.
    long getMaxVersion();
}

interface FeatureMetadata {
    // The set of cluster-wide finalized features.
    Set<FinalizedFeature> getFinalizedFeatures();

    // The monotonically increasing epoch for the finalized features.
    long getEpoch();

    // The set of features supported by a broker.
    Set<SupportedFeature> getSupportedFeatures();

    // The hostname of the broker.
    String getHost();

    // The port of the broker.
    int getPort();
}

interface UpdateFeaturesResult {
    /**
     * Returns a future which indicates success/failure.
     * 1. If the future has succeeded (i.e. no exceptions),
     *    then the request was 100% successful, and no top level or
     *    individual FeatureUpdate errors were seen. The data
     *    returned in the future contains the latest entire set of
     *    finalized cluster-wide features (after all updates were applied),
     *    as well as the entire set of features supported by the controller
     *    serving this write request.
     * 2. If the future has failed, the top level error (if any)
     *    or the error from the first failed FeatureUpdate
     *    (if any) is raised to the caller.
     */
    KafkaFuture<FeatureMetadata> all();
}

interface DescribeFeaturesResult {
    /**
     * The data returned in the future contains the latest entire set of
     * finalized cluster-wide features, as well as the entire set of 
     * features supported by the broker serving this read request.
     */
    KafkaFuture<FeatureMetadata> all();
}

...

=== DESCRIBE FEATURES ===

# Get cluster-wide finalized features, and features supported by a specific broker.
#  - Use `--bootstrap-server` to provide a broker host:port to which queries should be issued.
#  - Optionally, provide `--controller` flag directing the tool to issue the query to the
#    controller (while discovering the controller via the bootstrap server).
#    This can be useful for debugging purposes.
$> kafka-features.sh describe --bootstrap-server kafka-broker0.prn1:9071 [--controller]

{
    "status": "OK",
    "supported_features": {
        "group_coordinator": {
            "minVersion": 1,
            "maxVersion": 2
        },
        "transaction_coordinator": {
            "minVersion": 0,
            "maxVersion": 5
        },
        "consumer_offsets_topic_schema": {
            "minVersion": 3,
            "maxVersion": 7
        }
    },
    "finalized_features": {
        "epoch": 0,
        "group_coordinator": {
            "version": 1
        },
        "transaction_coordinator": {
            "version": 4
        },
        "consumer_offsets_topic_schema": {
            "version": 7
        }
    },
    "host": "kafka-broker0.prn1",
    "port": 9071
}

=== ADD_OR_UPDATE FEATURES ===

# Add or update a list of cluster-wide finalized features.
#  - Use `--bootstrap-server` to provide a broker host:port to which the MetadataRequest query should be issued.
#    The MetadataResponse will be used to discover the Controller, to which the actual ADD_OR_UPDATE request is issued.
#  - Use `--features` to provide a comma-separated list of features and their new finalized versions to ADD_OR_UPDATE.

$> kafka-features.sh update --bootstrap-server kafka-broker0.prn1:9071 --features group_coordinator:2,transaction_coordinator:5

Please confirm before finalizing the upgrade of the following features:
1. group_coordinator from v1 (existing) to v2 (new)
2. transaction_coordinator from v4 (existing) to v5 (new)

[Y/n]? Y

{
    "status": "OK",
    "supported_features": {
        "group_coordinator": {
            "minVersion": 1,
            "maxVersion": 2
        },
        "transaction_coordinator": {
            "minVersion": 0,
            "maxVersion": 5
        },
        "consumer_offsets_topic_schema": {
            "minVersion": 3,
            "maxVersion": 7
        }
    },
    "finalized_features": {
        "epoch": 1,
        "group_coordinator": {
            "version": 2
        },
        "transaction_coordinator": {
            "version": 5
        },
        "consumer_offsets_topic_schema": {
            "version": 7
        }
    },
    "host": "kafka-broker0.prn1",
    "port": 9071
}

=== DELETE FEATURES ===

# Delete a list of cluster-wide finalized features.
#  - Use `--bootstrap-server` to provide a broker host:port to which the MetadataRequest query should be issued.
#    The MetadataResponse will be used to discover the Controller, to which the actual delete request is issued.
#  - Use `--features` to provide a comma-separated list of features to be deleted.

$> kafka-features.sh delete --bootstrap-server kafka-broker0.prn1:9071 --features group_coordinator,transaction_coordinator

Please confirm deletion of the following features:
1. group_coordinator
2. transaction_coordinator

[Y/n] Y

{
    "status": "OK",
    "supported_features": {
        "group_coordinator": {
            "minVersion": 1,
            "maxVersion": 2
        },
        "transaction_coordinator": {
            "minVersion": 0,
            "maxVersion": 5
        },
        "consumer_offsets_topic_schema": {
            "minVersion": 3,
            "maxVersion": 7
        }
    },
    "finalized_features": {
        "epoch": 2,
        "consumer_offsets_topic_schema": {
            "version": 7
        }
    },
    "host": "kafka-broker0.prn1",
    "port": 9071
}
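For illustration only, the two `--features` formats used above could be parsed as follows: name:version pairs for update, and bare names for delete. This is not the tool's actual implementation. FeaturesArg and its methods are hypothetical. Rejecting negative versions assumes that the `>= 0` constraint stated for SupportedFeature also applies to finalized versions.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical parser for the --features argument formats shown above:
//   update: "group_coordinator:2,transaction_coordinator:5"
//   delete: "group_coordinator,transaction_coordinator"
final class FeaturesArg {
    private FeaturesArg() {}

    static Map<String, Long> parseUpdates(String arg) {
        Map<String, Long> updates = new LinkedHashMap<>(); // keeps command-line order
        for (String item : arg.split(",")) {
            String[] parts = item.trim().split(":");
            if (parts.length != 2 || parts[0].isEmpty()) {
                throw new IllegalArgumentException("expected <feature>:<version>, got: " + item);
            }
            // NumberFormatException is itself an IllegalArgumentException.
            long version = Long.parseLong(parts[1]);
            if (version < 0) {
                throw new IllegalArgumentException("version must be >= 0: " + item);
            }
            if (updates.put(parts[0], version) != null) {
                throw new IllegalArgumentException("duplicate feature: " + parts[0]);
            }
        }
        return updates;
    }

    static Set<String> parseDeletes(String arg) {
        Set<String> names = new LinkedHashSet<>();
        for (String item : arg.split(",")) {
            String name = item.trim();
            if (name.isEmpty() || name.contains(":")) {
                throw new IllegalArgumentException("expected <feature>, got: " + item);
            }
            if (!names.add(name)) {
                throw new IllegalArgumentException("duplicate feature: " + name);
            }
        }
        return names;
    }
}
```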

...

  1. The stream clients shall not downgrade to group_coordinator:v0 once they upgrade to group_coordinator:v1, even if the leader instructs them to do so. This is because, in the DescribeFeatures response that contains group_coordinator:v1, the epoch for the features is higher than in the earlier response. The client uses the epoch to ignore the earlier response from the broker (see this section to learn about the monotonically increasing epoch number).

  2. It is important to note that the user is only upgrading the cluster from enabling group_coordinator:v0 to enabling group_coordinator:v0-v1. Therefore, once a client sees that v1 is enabled on any broker, it is a clear signal that all broker binaries within the cluster support the group_coordinator:v1 semantics; it’s just a matter of time before the feature version update metadata propagates from the controller to every broker.

...