
Status

Current state: Under Discussion

Discussion thread: https://lists.apache.org/thread.html/rf8947e0d6aad51023b378305acc285c69030988abc7bda9b9c429b8a%40%3Cdev.kafka.apache.org%3E

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In order to facilitate rolling upgrades of Kafka in KRaft mode, we need the ability to upgrade controllers and brokers while holding back the use of new RPCs and record formats until the whole cluster has been upgraded. We would also like to perform this upgrade without the need for the infamous “double roll”. 

Additionally, we want the ability to perform downgrades in a controlled and predictable manner.

Background

IBP

Kafka clusters currently use the IBP (inter.broker.protocol.version) configuration to gate new features and RPC changes. Since this is a static configuration defined for each broker in its properties file, a broker must be restarted in order to update this value. This leads to the necessity of two rolling restarts for cluster upgrades:

  • Restart each broker with latest binary
  • Restart each broker with new IBP

Support for downgrading the IBP is also poorly defined. For versions prior to Kafka 3.0, downgrading the IBP is explicitly not supported (based on the documentation). There have been cases where a persisted format was gated by the IBP, which made downgrades impossible. In practice, however, for many versions a downgrade can often be done without any problems.

ApiVersions

Brokers and controllers advertise their API capabilities using the ApiVersions RPC. In some cases, the advertised versions of a node's RPCs are influenced by the broker’s configured record version or IBP (inter.broker.protocol.version). However, in most cases, if an RPC version is defined in the binary of a particular node, it will be advertised through ApiVersions.

One exception is when a broker is advertising controller-forwarded APIs (KIP-590). In this case, a broker connects to the active controller to learn its ApiVersions. For forwarded APIs, the broker's and controller’s ApiVersions are intersected to determine the maximal safe set of APIs to advertise.
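
The intersection described above can be sketched roughly as follows. This is only an illustration, not the broker's actual code: the VersionRange and ForwardedApiVersions names are hypothetical, and the map is keyed by the numeric API key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical holder for the min/max version of one API on one node.
final class VersionRange {
    final short min;
    final short max;

    VersionRange(short min, short max) {
        this.min = min;
        this.max = max;
    }

    // Returns the overlapping range, or empty if the two ranges are disjoint.
    Optional<VersionRange> intersect(VersionRange other) {
        short lo = (short) Math.max(min, other.min);
        short hi = (short) Math.min(max, other.max);
        return lo <= hi ? Optional.of(new VersionRange(lo, hi)) : Optional.empty();
    }
}

final class ForwardedApiVersions {
    // Keeps only the APIs both nodes support, narrowed to the versions they share.
    static Map<Short, VersionRange> intersect(Map<Short, VersionRange> broker,
                                              Map<Short, VersionRange> controller) {
        Map<Short, VersionRange> result = new HashMap<>();
        for (Map.Entry<Short, VersionRange> e : broker.entrySet()) {
            VersionRange other = controller.get(e.getKey());
            if (other != null) {
                e.getValue().intersect(other).ifPresent(r -> result.put(e.getKey(), r));
            }
        }
        return result;
    }
}
```

An API that only one of the two nodes knows about, or whose version ranges do not overlap, is simply left out of the advertised set.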

ApiVersionResponse includes:

  • ApiKeys and the min/max version supported by the broker
  • Supported features and the min/max versions for the broker
  • Finalized (cluster-wide) features and the min/max version for the cluster

Feature Flags (KIP-584)

Supported feature flags depend on the version of the code and represent the capabilities of the current binary. A broker defines a minimum and maximum supported version for each feature flag. Finalized feature flags are dynamic and set by an operator using the “kafka-features.sh” script. The operator defines a maximum finalized version for the given feature flag which is used to convey the “in-use” version of the feature within the cluster. To date, no features in Kafka are utilizing these feature flags.

KRaft Snapshots

When a KRaft client joins the quorum and begins fetching, the leader may determine that it needs to load a snapshot to catch up more quickly. This snapshot includes the entire metadata state at a given point in time and will effectively force a broker or controller to rebuild its internal data structures which derive from the metadata log. For example, on the broker, processing a snapshot will result in a new MetadataImage, which is the backing data structure for MetadataCache.

Public Interfaces

Define new KIP-584 feature flag "metadata.version". 

Introduce an IBP version to indicate the lowest software version that supports metadata.version. Below this IBP, the metadata.version is undefined and will not be examined. At or above this IBP, the metadata.version must be 0 for ZooKeeper clusters and will be initialized as 1 for KRaft clusters.


Replace "AllowDowngrade" with "DowngradeType" in UpdateFeaturesRequest. Also add a new "DryRun" field to correspond with the existing --dry-run flag.

{
  "apiKey": 57,
  "type": "request",
  "listeners": ["zkBroker", "broker", "controller"], <-- New listener "controller" for KRaft
  "name": "UpdateFeaturesRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "timeoutMs", "type": "int32", "versions": "0+", "default": "60000",
      "about": "How long to wait in milliseconds before timing out the request." },
    { "name": "FeatureUpdates", "type": "[]FeatureUpdateKey", "versions": "0+",
      "about": "The list of updates to finalized features.", "fields": [
      {"name": "Feature", "type": "string", "versions": "0+", "mapKey": true,
        "about": "The name of the finalized feature to be updated."},
      {"name": "MaxVersionLevel", "type": "int16", "versions": "0+",
        "about": "The new maximum version level for the finalized feature. A value >= 1 is valid. A value < 1, is special, and can be used to request the deletion of the finalized feature."},
      ------------------ Remove Field -------------
      {"name": "AllowDowngrade", "type": "bool", "versions": "0+",
        "about": "When set to true, the finalized feature version level is allowed to be downgraded/deleted. The downgrade request will fail if the new maximum version level is a value that's not lower than the existing maximum finalized version level."}
      ------------------ New Field ----------------
      {"name": "DowngradeType", "type": "int8", "versions": "0+", "default": 0,
        "about": "The type of downgrade to perform. Three types are supported: 0 is NONE (no downgrade will be performed), 1 is SAFE, and 2 is UNSAFE. The safety of a downgrade is determined by the controller and is specific to each feature flag."}
      ------------------ End New Field ------------
    ]},
    ------------------ New Field ------------------
    {"name": "DryRun", "type": "bool", "versions": "1+", "default": false},
    ------------------ End New Field --------------
  ]
}
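
As a rough illustration of the DowngradeType field, the three int8 values could be mapped to an enum as shown below. The DowngradeTypeCode name and the fromCode helper are hypothetical; only the codes 0, 1 and 2 come from the field description above.

```java
// Hypothetical mirror of the proposed DowngradeType field.
// The codes 0 (NONE), 1 (SAFE) and 2 (UNSAFE) are taken from the schema above.
enum DowngradeTypeCode {
    NONE((byte) 0),
    SAFE((byte) 1),
    UNSAFE((byte) 2);

    final byte code;

    DowngradeTypeCode(byte code) {
        this.code = code;
    }

    // Decodes the int8 wire value, rejecting values the schema does not define.
    static DowngradeTypeCode fromCode(byte code) {
        for (DowngradeTypeCode t : values()) {
            if (t.code == code) {
                return t;
            }
        }
        throw new IllegalArgumentException("Unknown DowngradeType code: " + code);
    }
}
```

Rejecting unknown codes is a design choice of this sketch; the KIP does not say how a controller should treat a value outside 0 to 2.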


One new error code for the ErrorCode field of the UpdateFeaturesResponse results:

  • UNSAFE_FEATURE_DOWNGRADE: indicates that a requested feature level downgrade cannot safely be performed  


KIP-584 Addendum

Re-structure the kafka-features.sh tool to provide functions described by the "Basic" and "Advanced" CLI usages as sub-commands. 


usage: kafka-features [-h] {describe, upgrade, downgrade, delete}

Options
-------
-h, --help             Show this help message and exit

Commands
--------
describe               Describe one or more feature flags
upgrade                Upgrade one or more feature flags to the given version(s)
downgrade              Downgrade one or more feature flags to the given version(s)
delete                 Delete (unset) one or more feature flags

All sub-commands require the --bootstrap-server argument and may take the --command-config argument. All operations that might mutate the state of a feature flag also include a --dry-run option. 

For all operations, multiple occurrences of the --feature (and where applicable --version) can be given to perform the operation on a set of features at once.


./kafka-features.sh describe [--feature FEATURE] [--release RELEASE]

Send an ApiVersionsRequest to the controller to learn about the range of supported versions and the current finalized version of every feature flag. If an optional --feature argument is given, only the details of that particular flag are returned. Otherwise, return the details of all the flags. If RELEASE is given, return the details of the features and their default versions for that release.

./kafka-features.sh upgrade [--release RELEASE] [--feature FEATURE --version VERSION] [--dry-run]

If FEATURE and VERSION are given, send an UpdateFeaturesRequest to the controller for the given feature FEATURE to upgrade its version to VERSION. If RELEASE is given, send an UpdateFeaturesRequest for all of the features with the default versions from the given release.

./kafka-features.sh downgrade --feature FEATURE --version VERSION [--unsafe] [--dry-run]

Send an UpdateFeaturesRequest to the controller for the given feature FEATURE to downgrade its version to VERSION. Only apply this downgrade if it is considered “safe” by the controller, or if the --unsafe flag is given. Unsafe downgrades are described in the "Downgrades" section below.

./kafka-features.sh delete --feature FEATURE [--unsafe] [--dry-run]

Send an UpdateFeaturesRequest to the controller to delete the given feature FEATURE.
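
One way to picture how the sub-commands translate into the request fields is sketched below. The FeatureUpdateSketch class is hypothetical and is not part of the tool. The mapping of "delete" to a MaxVersionLevel of 0 follows the "a value < 1 requests deletion" rule in the schema, while treating a delete without --unsafe as a SAFE downgrade is an assumption of this sketch.

```java
// Hypothetical value object mirroring one FeatureUpdates entry of UpdateFeaturesRequest.
final class FeatureUpdateSketch {
    final String feature;
    final short maxVersionLevel;
    final byte downgradeType; // 0 = NONE, 1 = SAFE, 2 = UNSAFE

    private FeatureUpdateSketch(String feature, short maxVersionLevel, byte downgradeType) {
        this.feature = feature;
        this.maxVersionLevel = maxVersionLevel;
        this.downgradeType = downgradeType;
    }

    // "upgrade": no downgrade is permitted.
    static FeatureUpdateSketch upgrade(String feature, short version) {
        return new FeatureUpdateSketch(feature, version, (byte) 0);
    }

    // "downgrade": SAFE unless the operator passed --unsafe.
    static FeatureUpdateSketch downgrade(String feature, short version, boolean unsafe) {
        return new FeatureUpdateSketch(feature, version, unsafe ? (byte) 2 : (byte) 1);
    }

    // "delete": a level below 1 requests deletion (assumed here to be 0).
    static FeatureUpdateSketch delete(String feature, boolean unsafe) {
        return new FeatureUpdateSketch(feature, (short) 0, unsafe ? (byte) 2 : (byte) 1);
    }
}
```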


New kafka-storage argument

Add a --metadata-version option to the "format" sub-command of kafka-storage.sh.

usage: kafka-storage format [-h] --config CONFIG --cluster-id CLUSTER_ID [--metadata-version VERSION] [--ignore-formatted]

optional arguments:
  -h, --help             show this help message and exit
  --config CONFIG, -c CONFIG
                         The Kafka configuration file to use.
  --cluster-id CLUSTER_ID, -t CLUSTER_ID
                         The cluster ID to use.
  --metadata-version VERSION
                         The initial value for metadata.version feature flag.
  --ignore-formatted, -g
 


AdminClient changes

To support the three possible downgrade types, we will add an enum and a new constructor to org.apache.kafka.clients.admin.FeatureUpdate 

public class FeatureUpdate {
    private final short maxVersionLevel;
    private final DowngradeType downgradeType;

    // Kept for backwards compatibility: allowDowngrade=true maps to SAFE, false to NONE.
    @Deprecated
    public FeatureUpdate(final short maxVersionLevel, final boolean allowDowngrade) {
        this(maxVersionLevel, allowDowngrade ? DowngradeType.SAFE : DowngradeType.NONE);
    }

    public FeatureUpdate(final short maxVersionLevel, final DowngradeType downgradeType) {
        this.maxVersionLevel = maxVersionLevel;
        this.downgradeType = downgradeType;
    }

    public short maxVersionLevel() {
        return maxVersionLevel;
    }

    // True for both SAFE and UNSAFE, since either type permits a downgrade.
    @Deprecated
    public boolean allowDowngrade() {
        return downgradeType != DowngradeType.NONE;
    }

    public DowngradeType downgradeType() {
        return downgradeType;
    }

    public enum DowngradeType {
        NONE, SAFE, UNSAFE
    }
}


We will also add a DryRun boolean to UpdateFeaturesOptions with a default no-arg constructor setting the boolean to false.


Proposed Changes

Overview

The sections below go into more detail, but the overall workflow of an upgrade is:

  • Operator performs rolling restart of cluster with a new software version
  • Operator increases metadata.version feature flag using kafka-features.sh tool
    • UpdateFeaturesRequest is sent to the active controller
    • The controller validates that the cluster can be upgraded to this version
    • FeatureLevelRecord is written to the metadata log
    • Components reload their state with new version

The downgrade workflow is similar:

  • Operator decreases metadata.version feature flag using kafka-features.sh tool
    • UpdateFeaturesRequest is sent to the active controller
    • The controller validates that the cluster can be safely downgraded to this version (override with --unsafe)
    • FeatureLevelRecord is written to the metadata log
    • Controller generates new snapshot and components reload their state with it (this snapshot may be lossy!)
    • Broker replicates FeatureLevelRecord for downgrade
    • Broker generates new snapshot and components reload their state with it (this snapshot may be lossy!)
  • Operator performs rolling restart of cluster with downgraded software version

New Feature Flag

We will introduce a new feature flag named metadata.version which takes over and expands on the role of inter.broker.protocol.version. This new feature flag will track changes to the metadata record format and RPCs. Whenever a new record or RPC is introduced, or an incompatible change is made to an existing record or RPC, we will increase this version. The metadata.version is free to increase many times between Kafka releases. This is similar to the IV (inter-version) versions of the IBP.

The metadata.version feature flag will be defined and configured using the facilities introduced by KIP-584 (feature versions). As brokers and controllers upgrade to new software, their maximum supported metadata.version will increase automatically. However, the “finalized” version that can be used by the cluster will only be increased by an operator once all the nodes have upgraded. In other words, the basic workflow of an upgrade is:

  • Rolling upgrade software of each node (broker and controller)
  • Online upgrade of metadata.version to the desired supported version

In the absence of an operator defined value for metadata.version, we cannot safely assume anything about which metadata.version to use. If we simply assumed the highest supported value, it could lead to unintended downgrades in the event that a broker with a lower supported version joined the cluster. To avoid this, and other upgrade complications, we will need to bootstrap metadata.version with some initial version.

Initialization

When the quorum leader is starting up for the first time after this feature flag has been introduced, it will need a way to initialize the finalized version. After the leader finishes loading its state from disk, if it has not encountered a FeatureLevelRecord, it will read an initial value for this feature from its local meta.properties file and generate a FeatureLevelRecord. We will extend the format sub-command of kafka-storage.sh to allow operators to specify which version is initialized. If no value has been specified by the operator, the tool will select the latest known value for that version of the software.
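
The bootstrap decision described above might look something like the following sketch. The MetadataVersionBootstrap class and the "metadata.version" property key are assumptions made for illustration; the KIP does not specify how the value is stored in meta.properties.

```java
import java.util.Optional;
import java.util.Properties;

final class MetadataVersionBootstrap {
    // Hypothetical property name; the actual key in meta.properties is not defined here.
    static final String KEY = "metadata.version";

    // Returns the version to write in the first FeatureLevelRecord, or empty if
    // the log already contains one and no bootstrapping is needed.
    static Optional<Short> initialVersion(Optional<Short> versionFromLog,
                                          Properties metaProperties) {
        if (versionFromLog.isPresent()) {
            return Optional.empty();
        }
        String raw = metaProperties.getProperty(KEY);
        if (raw == null) {
            // The format tool always writes a value, so a missing key is unexpected.
            throw new IllegalStateException("meta.properties has no " + KEY);
        }
        return Optional.of(Short.parseShort(raw.trim()));
    }
}
```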

Compatibility

It is possible that brokers and controllers attempt to join the cluster or quorum, but cannot support the current metadata.version. For brokers, this is already handled by the controller during registration. If a broker attempts to register with the controller, but the controller determines that the broker cannot support the current set of finalized features (which includes metadata.version), it will reject the registration request. For controllers, it is more complicated since we need to allow the quorum to be established in order to allow records to be exchanged and learn about the new metadata.version. A controller running old software will join the quorum and begin replicating the metadata log. If this inactive controller encounters a FeatureLevelRecord for metadata.version that it cannot support, it should terminate.

In the unlikely event that an active controller encounters an unsupported metadata.version, it should resign and terminate. 

If a broker encounters an unsupported metadata.version, it should unregister itself and terminate.
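
The compatibility check that drives these decisions can be sketched as below. FeatureCompatibility and SupportedRange are hypothetical names, and the sketch only answers whether a node supports every finalized feature; rejecting the registration, resigning, or terminating is left to the caller.

```java
import java.util.Map;

final class FeatureCompatibility {
    // Hypothetical supported range that a node advertises for one feature.
    static final class SupportedRange {
        final short min;
        final short max;

        SupportedRange(short min, short max) {
            this.min = min;
            this.max = max;
        }

        boolean contains(short version) {
            return version >= min && version <= max;
        }
    }

    // True only if every finalized feature falls inside the node's supported range.
    static boolean supportsAll(Map<String, Short> finalized,
                               Map<String, SupportedRange> supported) {
        for (Map.Entry<String, Short> e : finalized.entrySet()) {
            SupportedRange range = supported.get(e.getKey());
            if (range == null || !range.contains(e.getValue())) {
                return false;
            }
        }
        return true;
    }
}
```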

Upgrades

KRaft upgrades are done in two steps with only a single rolling restart of the cluster required. After all the nodes of the cluster are running the new software version, they will continue using the previous version of RPCs and record formats. Only after increasing the metadata.version will these new RPCs and records be used. Since a software upgrade may span across multiple metadata.version versions, it should be possible to perform many online upgrades without restarting any nodes. This provides a mechanism for incrementally increasing metadata.version to try out new features introduced between the initial software version and the upgraded software version.

One major difference with the static IBP-based upgrade is that the metadata.version may be changed arbitrarily at runtime. This means broker and controller components which depend on this version will need to dynamically adjust their state and behavior as the version changes. 

ApiVersions

Now that the RPCs in use by a broker or controller can change at runtime (due to a changing metadata.version), we need a way to inform a node's remote clients that new RPCs are available. Brokers can observe changes to metadata.version through the metadata log and could then submit a new ApiVersionsRequest to the other Kafka nodes. For clients to learn about new broker RPCs, closing the connection would cause the client to send a new ApiVersionsRequest when it reconnects. We may want to investigate alternative approaches here in a future KIP.

Downgrades

One of the goals of this design is to provide a clear path for metadata.version downgrades and software downgrades. Since metadata.version can introduce backwards incompatible formats of persisted data, we can classify downgrades into lossless and lossy. If the target downgrade version is fully compatible with the starting version, the downgrade can be executed without any loss of metadata. However, if a new metadata record has been introduced, or an incompatible change was made to a record, a downgrade is only possible if some metadata is removed from the log.

In order to determine if a downgrade can be lossless, developers must indicate if a newly introduced metadata.version is backwards compatible or not. This is an implementation detail, but it might look something like:

enum MetadataVersions {
  V1(version=1, isBackwardsCompatible=true, description="initial version"),
  V2(version=2, isBackwardsCompatible=true, description="Adding new RPC X"),
  V3(version=3, isBackwardsCompatible=true, description="Adding new optional field to Foo record"),
  V4(version=4, isBackwardsCompatible=false, description="New metadata record type Bar"),
  V5(version=5, isBackwardsCompatible=true, description="New optional field on Bar record")
}

In this example, a downgrade from version 5 to 4 would be lossless as would a downgrade from version 3 to any previous version. Only the downgrade from version >=4 to <=3 would be lossy.
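
A compilable version of the enum above, together with the lossless check it implies, could look like this. The MetadataVersionSketch name and the isLosslessDowngrade helper are hypothetical; the rule encoded here is that a downgrade is lossless only if every version being rolled back is backwards compatible.

```java
enum MetadataVersionSketch {
    V1(1, true, "initial version"),
    V2(2, true, "Adding new RPC X"),
    V3(3, true, "Adding new optional field to Foo record"),
    V4(4, false, "New metadata record type Bar"),
    V5(5, true, "New optional field on Bar record");

    final int version;
    final boolean isBackwardsCompatible;
    final String description;

    MetadataVersionSketch(int version, boolean isBackwardsCompatible, String description) {
        this.version = version;
        this.isBackwardsCompatible = isBackwardsCompatible;
        this.description = description;
    }

    // Lossless only if each version in (target, current] is backwards compatible.
    static boolean isLosslessDowngrade(MetadataVersionSketch current,
                                       MetadataVersionSketch target) {
        if (target.version > current.version) {
            throw new IllegalArgumentException("Target is newer than the current version");
        }
        for (MetadataVersionSketch v : values()) {
            boolean rolledBack = v.version > target.version && v.version <= current.version;
            if (rolledBack && !v.isBackwardsCompatible) {
                return false;
            }
        }
        return true;
    }
}
```

With these definitions, a controller handling a SAFE downgrade request would reject it whenever isLosslessDowngrade returns false.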

When performing a lossless downgrade, no modifications are made to the metadata records. A snapshot is generated which includes the FeatureLevelRecord that downgrades the metadata.version. Metadata consumers, including brokers and controllers, will read records at the new (lower) metadata.version level. Since a lossless downgrade requires fully backwards compatible changes, the only differences in the record formats can be tagged fields.

If a new record type or new required fields are added, a lossy downgrade is required. In this case, the snapshot generated by controllers and brokers will exclude new record types and will write metadata records at the downgraded version (thereby excluding new fields). By default, the controller should not perform this kind of downgrade since crucial metadata may be lost. The addition of --unsafe in the kafka-features.sh tool (and the corresponding DowngradeType field value of "2" in UpdateFeaturesRequest) is meant to override this behavior. Using the example above, a lossy downgrade from version 5 to version 2 would mean that "Bar" records would be omitted from the snapshot.
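
The record-type filtering part of a lossy downgrade can be sketched as follows. LossySnapshotSketch and MetadataRecord are hypothetical, and the sketch assumes each record type knows the metadata.version that introduced it. Rewriting the remaining records at the lower version (to drop new fields) is not shown.

```java
import java.util.ArrayList;
import java.util.List;

final class LossySnapshotSketch {
    // Hypothetical record model: a type name plus the metadata.version that introduced it.
    static final class MetadataRecord {
        final String type;
        final int introducedIn;

        MetadataRecord(String type, int introducedIn) {
            this.type = type;
            this.introducedIn = introducedIn;
        }
    }

    // Keeps only the records whose type already existed at the target version.
    static List<MetadataRecord> downgrade(List<MetadataRecord> snapshot, int targetVersion) {
        List<MetadataRecord> kept = new ArrayList<>();
        for (MetadataRecord r : snapshot) {
            if (r.introducedIn <= targetVersion) {
                kept.add(r);
            }
        }
        return kept;
    }
}
```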

Once the downgrade snapshot has been loaded by all nodes, a software downgrade is possible. In both lossy and lossless downgrade scenarios, there may be tagged fields present in the metadata records from the previous, newer version, but these are transparently skipped over during record deserialization.


Compatibility, Deprecation, and Migration Plan

For clusters running self-managed mode, there will be one final version of inter.broker.protocol.version. Once upgraded to this version, components will begin using metadata.version as the gatekeeper for new features, RPCs, and metadata records. The new version will be managed using Kafka’s feature flag capabilities. The final IBP version will also gate the protocol changes detailed above.

For clusters in ZooKeeper mode, there may be additional increases of inter.broker.protocol.version to introduce new RPCs. While ZooKeeper is still supported, we will need to take care that whenever an IBP (or IV) is added, a metadata.version is also added.

This design assumes a Kafka cluster in self-managed mode as a starting point. A future KIP will detail the procedure for migrating a ZooKeeper managed Kafka to a self-managed Kafka.

Rejected Alternatives

Transitional feature level

This idea utilizes the fact that KIP-584 feature flags can have a minimum and maximum finalized version. We would consider the cluster to be in a "transitional" version if these two were not equal. For example, min finalized version of 1 and max finalized version of 3. While in this transitional state, the cluster would enable new features introduced (up to the maximum version), but keep the metadata log backwards compatible to the minimum version. 

Ultimately, this was determined to be too complex and placed a lot of burden on implementers to figure out various aspects of compatibility. It also increased the burden of knowledge on the operator to understand the implications of this transitional state.

Duplicate records

Similar to the transitional approach above, but in this case the min and max versions of a record would be written to the log together within a generic composite record. For the upgrade scenario, components would only process the newer of the two. For downgrade scenarios, the older (backwards compatible) version would be read. This would allow a downgrade to occur without rewriting the metadata log.

The main downside of this approach is that there is an undetermined amount of time when both versions of a record would be needed in the metadata log. This could lead to increased space requirements for an extended period of time. 



