...

  1. Client Discovery:

    • By scalable, we mean the solution should scale horizontally with the metadata read and write workload as the cluster grows with more features, brokers, and clients. We expect metadata discovery to be read-heavy, while write traffic is very low (max feature versions are typically finalized during releases or other configuration pushes, which can happen a few times a day per cluster).

    • The metadata served to the client will contain an epoch value. These epochs are integers that increase monotonically whenever the metadata changes. At any given time, the highest epoch returned to the client identifies the valid metadata, and the cluster operates according to it. Note: the metadata epoch applies to the entire metadata contents, and is not related to any individual feature version – these are two separate things. Please read this section for more information.

    • We want to be careful and specific about what we mean by consistent here. The metadata we serve to the client during discovery will be eventually consistent. It turns out that scaling to strongly consistent metadata reads is not easy in the current Kafka setup, and the cost of an eventually consistent solution can be kept minimal, as follows. Due to eventual consistency, an older, lower epoch of the metadata may briefly be returned during discovery after a more recent, higher epoch was returned at an earlier point in time. We expect clients to always apply the rule that the highest epoch of metadata received so far trumps any older, lower epoch (see the epoch-cache sketch after this list). Clients that are external to Kafka should strongly consider discovering the latest metadata from the brokers once during startup, and, if required, refreshing the metadata periodically (to get the latest metadata).

  2. Feature gating:

    • By safe, we mean: when processing a request to finalize a set of feature versions, the system will dynamically verify that all brokers in the cluster support the intended versions. If not, the request will be rejected. Also, when a broker starts up, it will verify that it is compatible with the configured feature versions. If it is not, it will either refuse to start up, or eventually die as soon as it discovers a feature incompatibility, before it becomes an active member of the cluster (by active, we mean the point when the broker starts serving API requests). A sketch of this compatibility check appears after this list.

    • By durable, we mean that the finalized features should be persisted durably and remembered across broker restarts. Generally, the system should tolerate faults as much as it does for the storage of other critical metadata (such as topic configuration).

    • By dynamic, we mean that the finalized features can be mutated in a cheap/easy way without compromising the uptime of broker/controller processes, or the health of the cluster.
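
To make the epoch rule from the client-discovery section concrete, below is a minimal, hypothetical client-side cache in Java. The class and method names (FinalizedFeatureCache, maybeUpdate) are illustrative only and not part of this proposal; the point is that newly discovered metadata is accepted only when it carries a strictly higher epoch, so stale eventually consistent reads are ignored.

Code Block
// Hypothetical client-side cache illustrating the "highest epoch wins" rule.
// Names are illustrative only; this is a sketch, not proposed client code.
import java.util.Map;

public final class FinalizedFeatureCache {
    private long epoch = -1;                                 // highest epoch seen so far
    private Map<String, Long> finalizedFeatures = Map.of();  // feature name -> finalized max version

    // Accept newly discovered metadata only if its epoch is strictly higher
    // than what we currently hold; otherwise treat it as a stale read.
    public synchronized boolean maybeUpdate(long newEpoch, Map<String, Long> newFeatures) {
        if (newEpoch <= epoch) {
            return false; // older or equal epoch: ignore (eventual consistency)
        }
        epoch = newEpoch;
        finalizedFeatures = Map.copyOf(newFeatures);
        return true;
    }
}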
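
Similarly, the safety check from the feature-gating section can be sketched as follows. This is a minimal illustration assuming each broker advertises a supported [minVersion, maxVersion] range per feature; the class and method names are hypothetical.

Code Block
// Hypothetical broker-side compatibility check: a finalized max version is
// compatible only if it lies within this broker's supported range for the feature.
import java.util.Map;

class FeatureCompatibility {
    static boolean isCompatible(Map<String, long[]> supported,  // feature -> {minVersion, maxVersion}
                                Map<String, Long> finalized) {  // feature -> finalized max version
        for (Map.Entry<String, Long> entry : finalized.entrySet()) {
            long[] range = supported.get(entry.getKey());
            if (range == null || entry.getValue() < range[0] || entry.getValue() > range[1]) {
                return false; // unknown feature, or finalized version outside supported range
            }
        }
        return true;
    }
}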

...

Code Block
{ 
   "listener_security_protocol_map":{ 
      "INTERNAL":"PLAINTEXT",
      "REPLICATION":"PLAINTEXT",
      "EXTERNAL":"SASL_SSL"
   },
   "endpoints":[ 
      "INTERNAL://kafka-0.kafka.def.cluster.local:9071",
      "REPLICATION://kafka-0.kafka.abc.cluster.local:9072",
      "EXTERNAL://b3-kfc-mnope.us.clusterx:9092"
   ],
   "rack":"0",
   "jmx_port":7203,
   "host":"kafka-0.kafka.def.cluster.local",
   "timestamp":"1579041366882",
   "port":9071,
   "version":5,
   // ----- START: PROPOSED ADDITIONAL/MODIFIED METADATA -----
   "version":5,  // existing key whose value has been bumped by 1
   "features": {  // new key
      "exactly_once_semantics": {  // string -> name of the feature
          "minVersion": 0,   // int64 -> represents the min supported version of this feature
          "maxVersion": 3  // int64 -> represents the max supported version of this feature
      },
      "consumer_offsets_topic_schema": { 
          "minVersion": 1,
          "maxVersion": 4
      }
   }
   // ----- END: PROPOSED ADDITIONAL METADATA -----
}

...

  • The value for the version key is an int64 that contains the version of the schema of the data stored in the ZK node.

  • The value for the data key is a dictionary that contains the following:
    • A key called features whose value is a dictionary. This contains a mapping from feature names to their metadata (such as finalized versions); it's a map{string → map{string → <string | number>}}. See the example node contents after this list.

      <feature_name>
         |--> <metadata_key>
                 |--> <metadata_value>

      • Top-level key <feature_name> is a non-empty string that’s a feature name.

        • <metadata_key> is the second-level nested key: a non-empty string naming a piece of feature metadata.

        • <metadata_value> is either a string or a number – it's the value for the <metadata_key>.

        • Note: this nested dictionary would contain the key 'version', whose value is an int64 representing the finalized cluster-wide max version for the feature.
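
For illustration, below is a hypothetical example of what the '/features' ZK node contents could look like under the schema described above. The feature names are borrowed from the broker registration example earlier in this document, and the version values are made up for this sketch.

Code Block
{
   "version": 0,          // int64 -> version of the schema of this node's data
   "data": {
      "features": {       // map{string -> map{string -> <string | number>}}
         "exactly_once_semantics": {
            "version": 3  // int64 -> finalized cluster-wide max version for the feature
         },
         "consumer_offsets_topic_schema": {
            "version": 4
         }
      }
   }
}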

New controller

...

APIs

We introduce 2 new Admin APIs, served only by the controller and identified by the new API keys ApiKeys.UPDATE_FEATURES and ApiKeys.DELETE_FEATURES. These APIs enable safe application of a set of cluster-wide feature updates:

  • ApiKeys.UPDATE_FEATURES:

    • The input to the API is a list of FeatureUpdate entries that need to be applied. Each item specifies an operation (as a FeatureUpdateType), such as:

      • Adding a new feature, or updating an existing one, with a finalized max version. Addition of a feature happens when the cluster-wide max feature version is being finalized for the first time for that feature.

      • Optionally also enabling downgrades as part of a particular FeatureUpdate (see the request sketch after this list).

    • The response contains an error code and an error message.
  • ApiKeys.DELETE_FEATURES:

    • The input to the API is a list of features that need to be deleted.
    • The response contains an error code and an error message.
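
As a concrete illustration of assembling an ApiKeys.UPDATE_FEATURES request, here is a minimal Java sketch. The FeatureUpdate and FeatureUpdateType shapes below are inferred from this proposal's description and are assumptions, not a shipped API; the feature names are reused from the earlier examples.

Code Block
// Hypothetical request-assembly sketch for ApiKeys.UPDATE_FEATURES.
// FeatureUpdate/FeatureUpdateType mirror this proposal's description only.
import java.util.List;

enum FeatureUpdateType { ADD_OR_UPDATE, ADD_OR_UPDATE_ALLOW_DOWNGRADE }

record FeatureUpdate(String feature, long maxVersion, FeatureUpdateType type) {}

class UpdateFeaturesExample {
    public static void main(String[] args) {
        List<FeatureUpdate> updates = List.of(
            // Finalize exactly_once_semantics at max version 3 for the first time.
            new FeatureUpdate("exactly_once_semantics", 3,
                              FeatureUpdateType.ADD_OR_UPDATE),
            // Lower consumer_offsets_topic_schema to max version 2,
            // explicitly opting in to a downgrade.
            new FeatureUpdate("consumer_offsets_topic_schema", 2,
                              FeatureUpdateType.ADD_OR_UPDATE_ALLOW_DOWNGRADE));
        System.out.println(updates); // would be sent to the controller
    }
}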

...

  1. Each FeatureUpdate provided in the request was valid, and all updates were persisted to the '/features' ZK node by the controller (along with a bump to the ZK node version).

  2. Brokers in the cluster have gradually started receiving notifications (via ZK watches) of changes to the '/features' ZK node. They react by reading the latest contents of the node from ZK and re-establishing the ZK watch, as sketched below. This mechanism also enables brokers to shut down when incompatibilities are detected (see Broker protections section).
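
The read-then-re-watch cycle mentioned above can be sketched with the raw ZooKeeper client as follows. Kafka's actual ZK layer differs in detail, and the class and method names here are hypothetical.

Code Block
// Hypothetical watch loop on the '/features' node. ZooKeeper watches are
// one-shot, so each notification triggers a re-read that installs a new watch.
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

class FeaturesWatcher {
    void watchFeatures(ZooKeeper zk) throws Exception {
        Watcher watcher = (WatchedEvent event) -> {
            try {
                watchFeatures(zk); // re-read latest contents and re-establish the watch
            } catch (Exception e) {
                // retry / log; omitted in this sketch
            }
        };
        byte[] data = zk.getData("/features", watcher, null); // read + set watch
        // ...parse 'data', update the local cache, and shut the broker down
        // if a feature incompatibility is detected...
    }
}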

...

  1. Each feature provided in the request was valid, and all provided features were removed from the '/features' ZK node by the controller (along with a bump to the ZK node version).

  2. Brokers in the cluster have gradually started receiving notifications (via ZK watches) of changes to the '/features' ZK node. They react by reading the latest contents of the node from ZK and re-establishing the ZK watch.

When a call to the ApiKeys.DELETE_FEATURES API has failed on the client side, the following is guaranteed:

...