...

  1. Sometimes there can be a need to downgrade the cluster-wide availability of one or more broker features from some finalized feature version level X to a version level Y, where Y < X. We leave it to the cluster operator (i.e. human) to decide whether the downgrade is backwards compatible in the first place, i.e. to ensure externally that the changes introduced/exercised in version Y are no longer relevant in the system ahead of the downgrade. As for the support we provide:

    1. The rules are the same as with upgrades → such a downgrade request is rejected by the system, unless all brokers support the downgraded version Y of the feature.

    2. We assume downgrades of finalized feature version levels are rare. For safety reasons, we require the human to specify an explicit "allow downgrade" flag (in the API/tool) to safeguard against accidental downgrades of finalized version levels.
  2. A need can arise to deprecate the usage of a certain version of one or more broker features. We do not provide any special support for this. Instead, the cluster operator (i.e. human) should use external means to establish that it is safe to stop supporting a particular version of a broker feature. For example, verify (if needed) that no clients are actively using the version, before deciding to stop supporting it.
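
The downgrade rules in item 1 can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the controller's actual logic: `VersionRange`, `DowngradeCheck` and `validate` are hypothetical names, and the deletion case (a requested level < 1) is deliberately left out.

```java
import java.util.List;

// Hypothetical helper type: the [min, max] version range one broker supports.
final class VersionRange {
    final long min;
    final long max;

    VersionRange(long min, long max) {
        if (min < 1 || max < min) {
            throw new IllegalArgumentException("invalid range [" + min + ", " + max + "]");
        }
        this.min = min;
        this.max = max;
    }

    boolean contains(long level) {
        return level >= min && level <= max;
    }
}

// Hypothetical validation of a finalized version level change (assumes targetLevel >= 1).
final class DowngradeCheck {
    // Returns null when the change is acceptable, otherwise a rejection reason.
    static String validate(long currentLevel, long targetLevel, boolean allowDowngrade,
                           List<VersionRange> brokerRanges) {
        // A downgrade must be requested explicitly via the flag.
        if (targetLevel < currentLevel && !allowDowngrade) {
            return "downgrade requested without the allow-downgrade flag";
        }
        // Same rule as for upgrades: every broker must support the target level.
        for (VersionRange range : brokerRanges) {
            if (!range.contains(targetLevel)) {
                return "at least one broker does not support version level " + targetLevel;
            }
        }
        return null;
    }
}
```

With two brokers supporting [1, 3] and [2, 4], a flagged downgrade from 3 to 2 passes this check, while a downgrade to 1 is rejected because the second broker's minimum is 2.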

...

  • Each broker will advertise the version range of its supported features in its own BrokerIdZnode during startup. The contents of the advertisement are specific to the broker binary, but the schema is common to all brokers.
  • The controller already watches BrokerIdZnode updates, and will serve as a gateway (via a newly introduced API) to finalize version upgrades to features. The controller is well positioned to serve this requirement since it has a global view of all BrokerIdZnode in the cluster, and can avoid most race conditions in the solution.

  • The metadata about cluster-wide finalized version levels of features is maintained in ZK by the controller. As a general rule, any active finalized feature version value is an int64 that's always >= 1. The value can be increased to indicate a version upgrade, or decreased to indicate a downgrade (or, if the value is < 1, it is treated as a request to delete the finalized feature version level).

  • The controller will be the one and only entity modifying the information about finalized feature version levels. We expect metadata write traffic to be very low. All brokers in the cluster set up watches on the ZK node containing the finalized features information, so they get notified whenever the contents of the ZK node are changed by the controller.

  • Finally, client discovery of finalized feature versions will be performed via the ApiKeys.API_VERSIONS API – the response will be modified to contain the necessary information (served from the broker cache). Using this decentralized approach, we scale for metadata reads, albeit with eventually consistent results.
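
The general rule for finalized version level values (>= 1 is an active level, < 1 asks for deletion) can be illustrated with a small classifier. This is only a sketch: `FeatureChange`, `FinalizedLevelRules` and `classify` are hypothetical names, and representing a not-yet-finalized feature as a current level below 1 is an assumption made for the example.

```java
// Hypothetical classification of a requested finalized version level.
enum FeatureChange { UPGRADE, DOWNGRADE, DELETE, NONE }

final class FinalizedLevelRules {
    // Assumption: currentLevel < 1 means the feature is not finalized yet.
    static FeatureChange classify(long currentLevel, long requestedLevel) {
        if (requestedLevel < 1) {
            // A value below 1 asks for deletion of the finalized level, if one exists.
            return currentLevel < 1 ? FeatureChange.NONE : FeatureChange.DELETE;
        }
        if (requestedLevel > currentLevel) {
            return FeatureChange.UPGRADE;
        }
        if (requestedLevel < currentLevel) {
            return FeatureChange.DOWNGRADE;
        }
        return FeatureChange.NONE;
    }
}
```

Here a first-time finalization (current level below 1, requested level >= 1) is treated as an upgrade, which is a choice of this sketch rather than something the text prescribes.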

...

Each broker’s supported dictionary of {feature → version range} will be defined in the broker code. For each supported feature, the supported version range is defined by a min_version (an int64 that is always >= 1) and a max_version (an int64 that is >= 1 and >= min_version).

The controller needs a way to discover this metadata from each broker. To facilitate this, during startup, each broker will advertise its supported {feature → version range} in its own ephemeral BrokerIdZnode (this is the existing ZK node at the path '/brokers/ids/<id>'), under a nested dictionary keyed 'features'. The controller already watches all BrokerIdZnode for updates, and thus can get its ZK cache populated with the per-broker versioning information (using existing means).
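
As a rough illustration of the advertisement described above, the sketch below validates a supported version range and nests it under a 'features' key. The class names, the feature name `example_feature`, and the inner keys `min_version`/`max_version` are assumptions for the example; the actual znode schema is defined elsewhere in the proposal and may differ.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical holder for one feature's supported range, enforcing
// min_version >= 1 and max_version >= min_version.
final class SupportedRange {
    final long minVersion;
    final long maxVersion;

    SupportedRange(long minVersion, long maxVersion) {
        if (minVersion < 1 || maxVersion < minVersion) {
            throw new IllegalArgumentException(
                "expected 1 <= min_version <= max_version, got [" + minVersion + ", " + maxVersion + "]");
        }
        this.minVersion = minVersion;
        this.maxVersion = maxVersion;
    }
}

// Hypothetical builder for the part of the broker advertisement that carries features.
final class BrokerAdvertisement {
    // Returns a map of the form {"features": {<name>: {"min_version": .., "max_version": ..}}}.
    static Map<String, Object> build(Map<String, SupportedRange> supported) {
        Map<String, Object> features = new LinkedHashMap<>();
        for (Map.Entry<String, SupportedRange> entry : supported.entrySet()) {
            Map<String, Long> range = new LinkedHashMap<>();
            range.put("min_version", entry.getValue().minVersion);
            range.put("max_version", entry.getValue().maxVersion);
            features.put(entry.getKey(), range);
        }
        Map<String, Object> znode = new LinkedHashMap<>();
        znode.put("features", features);
        return znode;
    }
}
```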

...

// ---- START: Proposed Admin API definitions ----
/**
 * Return the following:
 * 1. List of cluster-wide finalized feature versions.
 * 2. List of supported feature versions specific to the broker.
 *
 * You may anticipate certain exceptions when calling get() on the
 * futures obtained from the returned DescribeFeaturesResult.
 */
DescribeFeaturesResult describeFeatures();

/**
 * Update the feature versions supported cluster-wide. You may
 * anticipate certain exceptions when calling get() on the futures
 * obtained from the returned UpdateFeaturesResult. For example,
 * if a feature update was in progress already, the controller
 * could return a suitable error.
 *
 * @param updates   set of feature updates, keyed by the
 *                  name of the feature
 * @return          the result of the updateFeatures request
 */
UpdateFeaturesResult updateFeatures(Set<FeatureUpdate> updates);

// ---- END: Proposed Admin API definitions ----

// Represents a cluster-wide finalized feature, with a feature version.
class FinalizedFeature {
    // The name of the feature.
    String name();

    // The cluster-wide finalized value of the feature version level.
    long versionLevel();
}

// Represents a feature that is supported by a broker, with a specific
// feature version range [minVersion, maxVersion].
class SupportedFeature {
    // The name of the feature.
    String name();

    // The minimum version (value >= 1) of the supported feature.
    long minVersion();

    // The maximum version (value >= 1 and value >= minVersion) of the supported feature.
    long maxVersion();
}

// Represents an update to a Feature, which can be sent to the controller
// for processing.
class FeatureUpdate {
    // Return the feature to be updated.
    // The version returned via 'feature().versionLevel()':
    // - When >= 1, it's the new value to-be-updated for the finalized feature.
    // - When < 1, it indicates the deletion of a finalized feature.
    FinalizedFeature feature();

    // Return true only if downgrade/deletion of a feature should be allowed.
    boolean allowDowngrade();
}

// Represents a collection of feature metadata, along with the host:port
// of the broker serving the metadata.
class FeatureMetadata {
    // The set of cluster-wide finalized features, keyed by feature name.
    Set<FinalizedFeature> finalizedFeatures();

    // The monotonically increasing epoch for the finalized features.
    long epoch();

    // The set of features supported by a broker, keyed by feature name.
    Set<SupportedFeature> supportedFeatures();

    // The hostname of the broker.
    String host();

    // The port of the broker.
    int port();
}

class DescribeFeaturesResult {
    /**
     * The data returned in the future contains the latest entire set of
     * finalized cluster-wide features, as well as the entire set of 
     * features supported by the broker serving this read request.
     */
    KafkaFuture<FeatureMetadata> all();
}

class UpdateFeaturesResult {
    /**
     * Returns a future which indicates success/failure.
     * 1. If the future has succeeded (i.e. no exceptions),
     *    then the request was 100% successful, and no top level or
     *    individual FeatureUpdate errors were seen. The data
     *    returned in the future contains the latest entire set of
     *    finalized cluster-wide features (after all updates were applied),
     *    as well as the entire set of features supported by the controller
     *    serving this write request.
     * 2. If the future has failed, the top level error (if any)
     *    or the error from the FeatureUpdate(s) that failed
     *    (if any) is raised to the caller.
     */
    KafkaFuture<FeatureMetadata> all();
}
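
The doc comments above mention that exceptions may surface when calling get() on the returned futures. The sketch below shows one way a caller might handle that. It uses `java.util.concurrent.CompletableFuture` purely as a stand-in for `KafkaFuture` so the example is self-contained; `UpdateOutcome` and `describe` are hypothetical names, and the error message in the usage is invented for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical caller-side helper: waits on a future such as the one returned
// by all() and summarizes the outcome as a short string.
final class UpdateOutcome {
    static <T> String describe(CompletableFuture<T> future) {
        try {
            future.get();
            // No exception: the request succeeded as a whole.
            return "success";
        } catch (ExecutionException e) {
            // The top-level or per-update error is carried as the cause.
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report that the wait was cut short.
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }
}
```

For example, a future completed exceptionally with `new IllegalStateException("feature update already in progress")` would be reported by `describe` as `failed: feature update already in progress`.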

...