...

Code Block
{ 
   "listener_security_protocol_map":{ 
      "INTERNAL":"PLAINTEXT",
      "REPLICATION":"PLAINTEXT",
      "EXTERNAL":"SASL_SSL"
   },
   "endpoints":[ 
      "INTERNAL://kafka-0.kafka.def.cluster.local:9071",
      "REPLICATION://kafka-0.kafka.abc.cluster.local:9072",
      "EXTERNAL://b3-kfc-mnope.us.clusterx:9092"
   ],
   "rack":"0",
   "jmx_port":7203,
   "host":"kafka-0.kafka.def.cluster.local",
   "timestamp":"1579041366882",
   "port":9071,
   // ----- START: PROPOSED ADDITIONAL/MODIFIED METADATA -----
   "version":5,  // existing key whose value has been bumped by 1
   "features": {  // new key
      "group_coordinator": {  // string -> name of the feature
          "min_version": 1,   // int16 -> represents the min supported version (>=1) of this feature
          "max_version": 3  // int16 -> represents the max supported version of this feature (>=1 and >= min_version)
      },
      "transaction_coordinator": { 
          "min_version": 1,
          "max_version": 4
      }
   }
   // ----- END: PROPOSED ADDITIONAL/MODIFIED METADATA -----
}

...

Code Block
{
   "version": 0, // int32 -> Represents the version of the schema for the data stored in the ZK node
   "status": 1, // int32 -> Represents the status of the node
   "features": {
       "group_coordinator": {   // string -> name of the feature
           "min_version_level": 1, // int16 -> Represents the cluster-wide finalized minimum version level (>=1) of this feature
           "max_version_level": 3 // int16 -> Represents the cluster-wide finalized maximum version level (>=1 and >= min_version_level) of this feature
       },
       "consumer_offsets_topic_schema": {
           "min_version_level": 1,
           "max_version_level": 4
       }
   }
}

...

  • The value for the version key is an int32 that contains the version of the schema of the data stored in the ZK node.

  • The value for the status key is an int32 that contains the status of the ZK node (see explanation below).

  • The value for the features key is a dictionary that contains a mapping from feature names to their metadata (such as finalized version levels).
    It's a map{string → map{string → <string | number>}}

     <feature_name>
                   |-->  <metadata_key>
                                        |-->  <metadata_value>

    • Top-level key <feature_name> is a non-empty string that’s a feature name.

      • <metadata_key> is the second-level nested key: a non-empty string that names a piece of feature metadata.

      • <metadata_value> is either a string or a number – it's the value for the <metadata_key>.

      • Note: this nested dictionary would contain the following keys:

        • 'min_version_level' whose value is an int16 representing the minimum finalized cluster-wide version level for the feature.

        • 'max_version_level' whose value is an int16 representing the maximum finalized cluster-wide version level for the feature.
        • The following rule always holds true: min_version_level >= 1 and max_version_level >= 1 and min_version_level <= max_version_level.
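
The invariants above can be checked mechanically. The following is a minimal sketch, not part of the proposal: the class FinalizedFeaturesValidator and its methods are hypothetical names, and the sketch assumes every metadata value is an int16 (the schema also allows string values, which are not handled here).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not part of the proposal): checks the documented
// invariants on the "features" map stored in the '/features' ZK node.
final class FinalizedFeaturesValidator {
    static final String MIN_KEY = "min_version_level";
    static final String MAX_KEY = "max_version_level";

    // Returns true only if every feature has a non-empty name and a range
    // satisfying 1 <= min_version_level <= max_version_level.
    static boolean isValid(Map<String, Map<String, Short>> features) {
        for (Map.Entry<String, Map<String, Short>> entry : features.entrySet()) {
            String name = entry.getKey();
            Map<String, Short> metadata = entry.getValue();
            if (name == null || name.isEmpty() || metadata == null) {
                return false;
            }
            Short min = metadata.get(MIN_KEY);
            Short max = metadata.get(MAX_KEY);
            if (min == null || max == null) {
                return false; // both keys are required in this sketch
            }
            if (min < 1 || max < 1 || min > max) {
                return false;
            }
        }
        return true;
    }

    // Convenience builder for the metadata of a single feature.
    static Map<String, Short> levels(int min, int max) {
        Map<String, Short> metadata = new HashMap<>();
        metadata.put(MIN_KEY, (short) min);
        metadata.put(MAX_KEY, (short) max);
        return metadata;
    }
}
```

For example, a map containing "group_coordinator" with levels(1, 3) passes the check, while levels(0, 3) or levels(4, 3) fails it.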

...

Feature ZK node

...

status

The 

...

node schema contains a field, status, that takes one of two values, as explained below.

  • Enabled (1): The feature versioning system is enabled, and the finalized features stored in the '/features' ZK node are active. The controller writes this status to the ZK node only when the broker IBP config is greater than or equal to migration_ibp_version (see migration section).
  • Disabled (0): The feature versioning system is disabled, and the finalized features stored in the '/features' ZK node are not relevant. The controller writes this status to the ZK node only when the broker IBP config is less than migration_ibp_

...

...

  • .

The purpose of the status field is to help differentiate between the following cases:

  1. New cluster bootstrap:
    For a new Kafka cluster (i.e. one deployed for the first time), we would like to start the cluster with all the supported features finalized immediately. A new cluster will almost never be started with an old IBP config that’s less than migration_ibp_version. In such a case, the controller will start up and notice that the '/features' ZK node is absent in the new cluster. To handle the requirement, the controller will create a '/features' ZK node (with enabled status) containing the entire list of supported features as its finalized features.
  2. Cluster upgrade:
    Imagine there is an existing Kafka cluster with IBP config less than migration_ibp_version, but the Broker binary has been upgraded to a state where it supports the feature versioning system. This means the user is upgrading from an earlier version of the Broker binary. In this case, we want to start with no finalized features and allow the user to enable them whenever they are ready, i.e. in the future whenever the user sets the IBP config to be greater than or equal to migration_ibp_version. The reason is that enabling all the possible features immediately after an upgrade could be harmful to the cluster. In such a case:
    • Before the Broker upgrade (i.e. IBP config set to less than migration_ibp_version), the controller will start up and check if the '/features' ZK node is absent. If it is, the controller will create a '/features' ZK node with disabled status and empty features.
    • After the Broker upgrade (i.e. IBP config set to greater than or equal to migration_ibp_version), when the controller starts up it will check if the '/features' ZK node exists and whether it is disabled. In such a case, it won’t upgrade all features immediately. Instead, it will just switch the '/features' ZK node status to enabled. This lets the user finalize the features later.
  3. Cluster downgrade:
    Imagine that a Kafka cluster exists already and the IBP config is greater than or equal to migration_ibp_version. Then, the user decides to downgrade the cluster by setting the IBP config to a value less than migration_ibp_version. This means the user is also disabling the feature versioning system. In this case, when the controller starts up with the lower IBP config, it will switch the '/features' ZK node status to disabled with empty features.
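
The three cases above reduce to a small decision on controller startup. The sketch below is one possible reading, not the proposed implementation: FeatureZkNode and FeatureZkBootstrap are hypothetical names, an absent ZK node is modelled as null, only max version levels are tracked for brevity, and leaving an already-enabled node untouched is an assumption that the text does not spell out.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the '/features' ZK node; names are illustrative only.
final class FeatureZkNode {
    static final int DISABLED = 0;
    static final int ENABLED = 1;

    final int status;
    // Feature name -> finalized max version level (min levels omitted for brevity).
    final Map<String, Short> finalizedMaxLevels;

    FeatureZkNode(int status, Map<String, Short> finalizedMaxLevels) {
        this.status = status;
        this.finalizedMaxLevels =
            Collections.unmodifiableMap(new HashMap<>(finalizedMaxLevels));
    }
}

final class FeatureZkBootstrap {
    // Decides the node state the controller ends up with on startup.
    // existing == null models an absent '/features' ZK node.
    static FeatureZkNode onControllerStartup(boolean ibpAtLeastMigrationVersion,
                                             FeatureZkNode existing,
                                             Map<String, Short> supportedMaxVersions) {
        if (!ibpAtLeastMigrationVersion) {
            // Old IBP (pre-upgrade state, or a downgrade): disabled, no finalized features.
            return new FeatureZkNode(FeatureZkNode.DISABLED,
                                     Collections.<String, Short>emptyMap());
        }
        if (existing == null) {
            // New cluster bootstrap: finalize everything that is supported.
            return new FeatureZkNode(FeatureZkNode.ENABLED, supportedMaxVersions);
        }
        if (existing.status == FeatureZkNode.DISABLED) {
            // IBP was just raised: flip to enabled, but finalize nothing new.
            return new FeatureZkNode(FeatureZkNode.ENABLED, existing.finalizedMaxLevels);
        }
        // Assumption: an already-enabled node is left as it is.
        return existing;
    }
}
```

Walking an upgrade through this sketch: a start with the old IBP and no node yields a disabled, empty node; a later start with the raised IBP flips that node to enabled while its feature set stays empty, leaving finalization to the user.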

Changes to Kafka Controller

...

Code Block
{
  "apiKey": 48,
  "type": "request",
  "name": "UpdateFeaturesRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "timeoutMs", "type": "int32", "versions": "0+", "default": "60000",
      "about": "How long to wait in milliseconds before timing out the request." },
    { "name": "FeatureUpdate", "type": "[]FeatureUpdateKey", "versions": "0+",
      "about": "The list of updates to features.", "fields": [
      {"name":  "AllowDowngrade", "type":  "bool", "versions":  "0+",
        "about": "When set to true, the feature version level is allowed to be downgraded/deleted."},
      {"name":  "Feature", "type":  "[]FeatureKey", "versions":  "0+",
        "about":  "The feature to be updated.",
        "fields":  [
          {"name": "Name", "type":  "string", "versions":  "0+",
            "about": "The name of the feature."},
          {"name":  "MaxVersionLevel", "type":  "int16", "versions":  "0+",
            "about": "The new cluster-wide finalized maximum version level for the feature. A value >= 1 is valid/regular. A value < 1, is special, and can be used to request the deletion of the feature."}
      ]}
    ]}
  ]
}
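
To illustrate how the AllowDowngrade flag and the MaxVersionLevel value interact, here is a minimal sketch of how a single update entry could be classified. FeatureUpdateCheck and its Outcome enum are hypothetical names, and representing "feature not currently finalized" as a current level of 0 is an assumption of this sketch. It does not model the controller's additional safety checks, which may still reject a downgrade even when the flag is set.

```java
// Hypothetical classifier for one entry of an UpdateFeaturesRequest.
final class FeatureUpdateCheck {
    enum Outcome { UPGRADE, DOWNGRADE, DELETION, NO_CHANGE, REJECTED }

    // currentMaxLevel: the currently finalized max version level, or 0 if the
    // feature is not finalized (assumed convention for this sketch).
    static Outcome classify(short currentMaxLevel,
                            short requestedMaxLevel,
                            boolean allowDowngrade) {
        if (requestedMaxLevel < 1) {
            // A value < 1 requests deletion, which needs AllowDowngrade.
            return allowDowngrade ? Outcome.DELETION : Outcome.REJECTED;
        }
        if (requestedMaxLevel > currentMaxLevel) {
            return Outcome.UPGRADE;
        }
        if (requestedMaxLevel < currentMaxLevel) {
            // Lowering the level is only allowed with AllowDowngrade.
            return allowDowngrade ? Outcome.DOWNGRADE : Outcome.REJECTED;
        }
        return Outcome.NO_CHANGE;
    }
}
```

Under these assumptions, classify((short) 3, (short) 2, false) is rejected, while the same request with allowDowngrade set to true is treated as a downgrade.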

...

Code Block
{
  "apiKey": 18,
  "type": "response", "name": "ApiVersionsResponse",
  "validVersions": "0-3",
  "flexibleVersions": "3+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code." },
    { "name": "ApiKeys", "type": "[]ApiVersionsResponseKey", "versions": "0+",
      "about": "The APIs supported by the broker.", "fields": [
      { "name": "ApiKey", "type": "int16", "versions": "0+", "mapKey": true,
        "about": "The API index." },
      { "name": "MinVersion", "type": "int16", "versions": "0+",
        "about": "The minimum supported version, inclusive." },
      { "name": "MaxVersion", "type": "int16", "versions": "0+",
        "about": "The maximum supported version, inclusive." }]
    },
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    // ----- START: PROPOSED ADDITIONAL METADATA -----
    { "name":  "SupportedFeatures", "type": "[]FeatureKey",
      "versions":  "3+", "tag": 10000, "taggedVersions": "3+",
      "about": "Features supported by the broker.",
      "fields":  [
        { "name": "Name", "type": "string", "versions": "3+",
          "about": "The name of the feature." },
        { "name": "MinVersion", "type": "int16", "versions": "3+",
          "about": "The minimum supported version for the feature." },
        { "name": "MaxVersion", "type": "int16", "versions": "3+",
          "about": "The maximum supported version for the feature." }
      ]
    },
    {"name": "FinalizedFeaturesEpoch", "type": "int32", "versions": "3+",
      "tag": 10001, "taggedVersions": "3+",
      "about": "The monotonically increasing epoch for the features information."},
    { "name":  "FinalizedFeatures", "type": "[]FinalizedFeatureKey",
      "versions":  "3+", "tag": 10002, "taggedVersions": "3+",
      "about": "List of cluster-wide finalized features.",
      "fields":  [
        {"name": "Name", "type": "string", "versions":  "3+",
          "about": "The name of the feature."},
        {"name":  "MaxVersionLevel", "type": "int16", "versions":  "3+",
          "about": "The cluster-wide finalized max version level for the feature."},
        {"name":  "MinVersionLevel", "type": "int16", "versions":  "3+",
          "about": "The cluster-wide finalized min version level for the feature."}
      ]
    }
    // ----- END: PROPOSED ADDITIONAL METADATA -----
  ]
}
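
Because FinalizedFeaturesEpoch is described as monotonically increasing, a reader of ApiVersionsResponse could use it to discard stale feature information. The sketch below shows one way to do that. FinalizedFeatureCache is a hypothetical class, only max version levels are cached, and using -1 to mean "nothing cached yet" is an assumption (it presumes real epochs are non-negative).

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical client-side cache of finalized features, keyed by feature name.
final class FinalizedFeatureCache {
    private int epoch = -1; // assumed sentinel: nothing cached yet
    private Map<String, Short> maxVersionLevels = Collections.emptyMap();

    // Accepts the update only if its epoch is strictly newer than the cached one.
    // Returns true if the cache was replaced.
    synchronized boolean maybeUpdate(int newEpoch, Map<String, Short> newLevels) {
        if (newEpoch <= epoch) {
            return false; // stale or duplicate information is ignored
        }
        epoch = newEpoch;
        maxVersionLevels = Collections.unmodifiableMap(new HashMap<>(newLevels));
        return true;
    }

    synchronized int epoch() {
        return epoch;
    }

    synchronized Map<String, Short> maxVersionLevels() {
        return maxVersionLevels;
    }
}
```

With this design, a response carrying epoch 1 that arrives after a response carrying epoch 2 leaves the cached features unchanged.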

...

Code Block
// ---- START: Proposed Admin API definitions ----
/**
 * Return the following:
 * 1. List of cluster-wide finalized feature versions.
 * 2. List of supported feature versions specific to the broker.
 *
 * You may anticipate certain exceptions when calling get() on the
 * futures obtained from the returned DescribeFeaturesResult.
 */
DescribeFeaturesResult describeFeatures();

/**
 * Update the feature versions supported cluster-wide. You may
 * anticipate certain exceptions when calling get() on the futures
 * obtained from the returned UpdateFeaturesResult. For example,
 * if a feature update was in progress already, the controller
 * could return a suitable error.
 *
 * @param updates   set of feature updates, keyed by the
 *                  name of the feature
 * @return          the result of the updateFeatures request
 */
UpdateFeaturesResult updateFeatures(Set<FeatureUpdate> updates);

// ---- END: Proposed Admin API definitions ----

// Represents a cluster-wide finalized feature, with its feature version levels.
class FinalizedFeature {
    // The name of the feature.
    String name();

    // The cluster-wide finalized value of the feature min version level (value >= 1).
    short minVersionLevel();

    // The cluster-wide finalized value of the feature max version level (value >= 1 and value >= minVersionLevel).
    short maxVersionLevel();
}

// Represents a feature that is supported by a broker, with a specific
// feature version range [minVersion, maxVersion].
class SupportedFeature {
    // The name of the feature.
    String name();

    // The minimum version (value >= 1) of the supported feature.
    short minVersion();

    // The maximum version (value >= 1 and value >= minVersion) of the supported feature.
    short maxVersion();
}

// Represents an update to a Feature, which can be sent to the controller
// for processing.
class FeatureUpdate {
    // The name of the feature to be updated.
    String name();

    // The cluster-wide finalized NEW value of the feature max version level.
    // - When >= 1, it's the new value to-be-updated for the finalized feature.
    // - When < 1, it indicates the deletion of a finalized feature.
    long maxVersionLevel();

    // Return true only if downgrade/deletion of a feature should be allowed.
    // Note that despite this allowDowngrade flag being set, certain downgrades
    // may be rejected by the controller if it is deemed unsafe to downgrade
    // the max version level of some specific feature.
    boolean allowDowngrade();
}

// Represents a collection of feature metadata, along with the host:port
// of the broker serving the metadata.
class FeatureMetadata {
    // The set of cluster-wide finalized features, keyed by feature name.
    Set<FinalizedFeature> finalizedFeatures();

    // The monotonically increasing epoch for the finalized features.
    int epoch();

    // The set of features supported by a broker, keyed by feature name.
    Set<SupportedFeature> supportedFeatures();

    // The hostname of the broker.
    String host();

    // The port of the broker.
    int port();
}

class DescribeFeaturesResult {
    /**
     * The data returned in the future contains the latest entire set of
     * finalized cluster-wide features, as well as the entire set of 
     * features supported by the broker serving this read request.
     */
    KafkaFuture<FeatureMetadata> all();
}

class UpdateFeaturesResult {
    /**
     * Returns a future which indicates success/failure.
     * 1. If the future has succeeded (i.e. no exceptions),
     *    then the request was 100% successful, and no top level or
     *    individual FeatureUpdate errors were seen. The data
     *    returned in the future contains the latest entire set of
     *    finalized cluster-wide features (after all updates were applied),
     *    as well as the entire set of features supported by the controller
     *    serving this write request.
     * 2. If the future has failed, the top level error (if any)
     *    or the error from the FeatureUpdate(s) that failed
     *    (if any) is raised to the caller.
     */
    KafkaFuture<FeatureMetadata> all();
}
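
The success/failure contract of all() can be illustrated from the caller's side. The sketch below uses java.util.concurrent.CompletableFuture purely as a stand-in for KafkaFuture, and a String as a stand-in for FeatureMetadata; UpdateResultHandling is a hypothetical name. It only shows the general pattern of unwrapping the cause when get() fails, not the proposed client code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical caller-side handling; CompletableFuture<String> stands in for
// KafkaFuture<FeatureMetadata> so that the sketch is self-contained.
final class UpdateResultHandling {
    // Returns a short description of how the update ended.
    static String describe(CompletableFuture<String> all) {
        try {
            // Success: no top-level error and no individual update error.
            String metadata = all.get();
            return "succeeded: " + metadata;
        } catch (ExecutionException e) {
            // Failure: the top-level or per-update error is the cause.
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            // Restore the interrupt flag for callers further up the stack.
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }
}
```

A caller would typically branch on the cause's type to decide whether to retry, for example when another feature update was already in progress.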

...