Comment: Renamings and clarifications from discussion threads

...

MBean name

Description

kafka.server:type=KafkaServer,name=MetadataType

An enumeration of: ZooKeeper (1) or KRaft (2). Each broker reports this.

kafka.controller:type=KafkaController,name=MetadataType

An enumeration of: ZooKeeper (1), KRaft (2), or Dual (3). The active controller reports this.

kafka.controller:type=KafkaController,name=Features,feature={feature},level={level}

The finalized set of features with their level as seen by the controller. Used to help operators see the cluster's current metadata.version.

kafka.controller:type=KafkaController,name=ZkMigrationState

An enumeration of the possible migration states the cluster can be in. This is only reported by the active controller.

kafka.controller:type=KafkaController,name=MigrationIneligibleBrokerCount

A count of ZK brokers that are not eligible for migration. This metric will only be reported by the active KRaft controller while in the "MigrationIneligible" ZkMigrationState. If not in that state, it will report zero.

kafka.controller:type=KafkaController,name=MigrationIneligibleControllerCount

A count of KRaft quorum controllers that are not eligible for migration. This metric will only be reported by the active KRaft controller while in the "MigrationIneligible" ZkMigrationState. If not in that state, it will report zero.

kafka.controller:type=KafkaController,name=ZooKeeperWriteBehindLag

The amount of lag in records that ZooKeeper is behind relative to the highest committed record in the metadata log. This metric will only be reported by the active KRaft controller.

kafka.controller:type=KafkaController,name=ZooKeeperBlockingKRaftMillis

The number of milliseconds a write to KRaft has been blocked due to lagging ZooKeeper writes. This metric will only be reported by the active KRaft controller.

...

All brokers must be running at least this MetadataVersion before the migration can begin. ZK brokers will specify their MetadataVersion using the inter.broker.protocol.version as usual. The KRaft controller will bootstrap with the same MetadataVersion (which is stored in the metadata log as a feature flag – see KIP-778: KRaft to KRaft Upgrades).

Configuration

A new “zookeeper.metadata.migration.enable” config will be added for the ZK broker and KRaft controller. Its default will be “false”. Setting this config to “true” on each broker is a prerequisite to starting the migration. Setting this to "true" on the KRaft controllers is the trigger for starting the migration (more on that below). Setting this to "true" (or "false") on a KRaft broker has no effect.

...

Code Block
{
  "apiKey": 18,
  "type": "response",
  "name": "ApiVersionsResponse",
  "validVersions": "0-4",   // <-- New version 4
  "flexibleVersions": "3+",
  "fields": [
    ...
    { "name": "ZkMigrationReady", "type": "int8", "versions": "4+", "taggedVersions": "4+", "tag": 3, "ignorable": true,
      "about": "Set by a KRaft controller if the required configurations for ZK migration are present" }
  ]
}

This field will only be set by the KRaft controller when sending ApiVersionsResponse to other KRaft controllers. Since this migration does not support combined mode KRaft nodes, this field will never be seen by clients when receiving ApiVersionsResponse sent by brokers.

LeaderAndIsrRequest

A new RPC version will be added which adds the field KRaftControllerId. This field will point to the active KRaft controller. If this field is set, the ControllerId field should be -1.

Code Block
{
  "apiKey": 4,
  "type": "request",
  "listeners": ["zkBroker"],
  "name": "LeaderAndIsrRequest",
  "validVersions": "0-7", 	// <-- New version 7
  "flexibleVersions": "4+",
  "fields": [
    { "name": "ControllerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "about": "The current controller ID." },
    { "name": "KRaftControllerId", "type": "int32", "versions": "7+", "entityType": "brokerId",
      "about": "The KRaft controller id, used during migration." },   // <-- New field
    { "name": "ControllerEpoch", "type": "int32", "versions": "0+",
      "about": "The current controller epoch." },
    ...
  ]
}
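
The relationship between the two controller ID fields can be illustrated with a small check. This is a minimal Python sketch, not Kafka code: `LeaderAndIsrHeader` and `effective_controller` are hypothetical names, and treating a negative KRaftControllerId as "not set" is an assumption made for the example.

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class LeaderAndIsrHeader:
    """Hypothetical stand-in for the controller fields of LeaderAndIsrRequest."""
    controller_id: int
    controller_epoch: int
    kraft_controller_id: int = -1  # assumption: -1 means "not set"


def effective_controller(header: LeaderAndIsrHeader) -> Tuple[str, int]:
    """Return which kind of controller sent the request, and its ID."""
    if header.kraft_controller_id >= 0:
        # When KRaftControllerId is set, ControllerId should be -1.
        if header.controller_id != -1:
            raise ValueError("ControllerId must be -1 when KRaftControllerId is set")
        return ("kraft", header.kraft_controller_id)
    return ("zk", header.controller_id)
```

A ZK broker receiving the request could use a check like this to decide whether the sender is the ZK controller or the active KRaft controller.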


Migration Metadata Record

...

Code Block
{
  "apiKey": <NEXT KEY>,
  "type": "metadata",
  "name": "ZkMigrationRecord",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ZkMigrationState", "type": "int8", "versions": "0+",
      "about": "One of the possible migration states." }
  ]
}

The possible values for ZkMigrationState are: Started (0) and Finalized (1). An int8 type is used to leave room for additional states in the future.

...

Code Block
ZNode /migration

{
  "version": 0,
  "last_update_time_ms": "2022-01-01T00:00:00.000Z",
  "kraft_controller_id": 3000,
  "kraft_controller_epoch": 1,
  "kraft_metadata_offset": 1234,
  "kraft_metadata_epoch": 10
}

...

  1. Brokers have inter.broker.protocol.version set to the version added by this KIP to enable forwarding and indicate they are at the minimum software version.
  2. Brokers have zookeeper.metadata.migration.enable set to “true”. This indicates an operator has declared some intention to start the migration.
  3. Brokers have the configs in "Additional ZK Broker Configs" set. This allows them to connect to the KRaft controller.
  4. No brokers are offline (we will use offline replicas as a proxy for this).
  5. The KRaft quorum is online and all members have zookeeper.metadata.migration.enable set to "true" as well as ZK configs set.
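
The five conditions above can be sketched as a single readiness check. This is an illustrative Python sketch under stated assumptions, not the controller's actual logic: `BrokerInfo`, `ControllerInfo`, `ineligible_counts`, and `ready_for_migration` are hypothetical names, and each boolean field stands in for however the real controller would determine that condition.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class BrokerInfo:
    """Hypothetical view of one ZK broker."""
    ibp_at_least_minimum: bool  # condition 1: inter.broker.protocol.version
    migration_enabled: bool     # condition 2: zookeeper.metadata.migration.enable
    kraft_configs_set: bool     # condition 3: "Additional ZK Broker Configs"
    online: bool                # condition 4: broker is not offline


@dataclass
class ControllerInfo:
    """Hypothetical view of one KRaft quorum member (condition 5)."""
    online: bool
    migration_enabled: bool
    zk_configs_set: bool


def ineligible_counts(brokers: List[BrokerInfo],
                      controllers: List[ControllerInfo]) -> Tuple[int, int]:
    """Count brokers and controllers that fail at least one condition."""
    bad_brokers = sum(
        1 for b in brokers
        if not (b.ibp_at_least_minimum and b.migration_enabled
                and b.kraft_configs_set and b.online)
    )
    bad_controllers = sum(
        1 for c in controllers
        if not (c.online and c.migration_enabled and c.zk_configs_set)
    )
    return bad_brokers, bad_controllers


def ready_for_migration(brokers: List[BrokerInfo],
                        controllers: List[ControllerInfo]) -> bool:
    """True only when a quorum exists and every node meets its conditions."""
    if not controllers:
        return False  # condition 5 requires an online quorum
    return ineligible_counts(brokers, controllers) == (0, 0)
```

Returning counts rather than a bare boolean loosely mirrors the MigrationIneligibleBrokerCount and MigrationIneligibleControllerCount metrics, although the exact mapping is an assumption.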

...

Here is a state machine description of the migration. There will likely be more internal states that the controller uses, but the states listed below will be exposed as the ZkMigrationState metric.


State

Enum

Description

None

0

This cluster started out as KRaft and was not migrated.

MigrationIneligible

1

The brokers and controllers do not meet the migration criteria. The cluster is operating in ZooKeeper mode.

MigratingZkData

2

The controller is copying data from ZooKeeper into KRaft.

DualWriteMetadata

3

The controller is in KRaft mode making dual writes to ZooKeeper.

MigrationFinalized

4

The cluster has been migrated to KRaft mode.
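
For operators reading the metric, the table above maps directly onto an enumeration. The following Python sketch is illustrative only: the class and helper names are hypothetical, and only the numeric values and state meanings come from the table.

```python
from enum import IntEnum


class ZkMigrationState(IntEnum):
    """Metric values taken from the state table."""
    NONE = 0
    MIGRATION_INELIGIBLE = 1
    MIGRATING_ZK_DATA = 2
    DUAL_WRITE_METADATA = 3
    MIGRATION_FINALIZED = 4


def describe(metric_value: int) -> str:
    """Translate a raw metric value into a state name."""
    try:
        return ZkMigrationState(metric_value).name
    except ValueError:
        # Values outside the table are reported rather than guessed at.
        return f"UNKNOWN({metric_value})"


def zk_still_in_use(state: ZkMigrationState) -> bool:
    """True for the states in which the cluster still reads or writes ZooKeeper."""
    return state in (
        ZkMigrationState.MIGRATION_INELIGIBLE,  # still operating in ZooKeeper mode
        ZkMigrationState.MIGRATING_ZK_DATA,     # copying data out of ZooKeeper
        ZkMigrationState.DUAL_WRITE_METADATA,   # dual-writing to ZooKeeper
    )
```

A monitoring script could use `zk_still_in_use` to decide whether the ZooKeeper ensemble is still needed.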

...

A new set of nodes will be provisioned to host the controller quorum. These controllers will be started with zookeeper.metadata.migration.enable set to “true”. Once the quorum is established and a leader is elected, the active controller will check that the whole quorum is ready to begin the migration. This is done by examining the new tagged field on ApiVersionsResponse that is exchanged between controllers. Following this, the controller will examine the broker registrations in ZK. If all ZK brokers are ready for migration, the migration process will begin.

...

Once the operator has decided to commit to KRaft mode, the final step is to restart the controller quorum and take it out of migration mode by setting zookeeper.metadata.migration.enable to "false" (or unsetting it). The active controller will only finalize the migration once it detects that all members of the quorum have signaled that they are finalizing the migration (again, using the tagged field in ApiVersionsResponse). Once the controller leaves migration mode, it will write a ZkMigrationRecord to the log and no longer perform writes to ZK. It will also disable its special handling of ZK RPCs.

...

If a migration has been started, but a KRaft controller is elected that is misconfigured (it does not have zookeeper.metadata.migration.enable or the ZK configs set), this controller should resign. When replaying the metadata log during its initialization phase, this controller can see that a migration is in progress by seeing the initial ZkMigrationRecord. Since it does not have the required configs, it can resign leadership and throw an error.

If a migration has been finalized, but the KRaft quorum comes up with zookeeper.metadata.migration.enable set to "true", we must not re-enter the migration mode. In this case, while replaying the log, the controller can see the second ZkMigrationRecord and know that the migration is finalized and should not be resumed. This should result in errors being thrown, but the quorum can continue operating as normal.
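
The two failure cases above can be sketched as a decision made after log replay. This Python sketch is an illustration, not the controller implementation: `Decision`, `last_migration_state`, and `decide` are hypothetical names, the log is reduced to the sequence of migration state values found in its migration records, and the finalization handshake described earlier (where the whole quorum drops the flag on purpose) is deliberately left out.

```python
from enum import Enum
from typing import Iterable, Optional

STARTED, FINALIZED = 0, 1  # migration state values from the record definition


class Decision(Enum):
    CONTINUE_MIGRATION = "continue_migration"
    RESIGN = "resign"                      # misconfigured during a migration
    IGNORE_MIGRATION_CONFIG = "ignore"     # finalized: raise errors, keep running
    NORMAL_KRAFT = "normal_kraft"
    NO_MIGRATION_RECORD = "no_record"      # outside the cases discussed above


def last_migration_state(states: Iterable[int]) -> Optional[int]:
    """Return the state carried by the last migration record replayed, if any."""
    last = None
    for value in states:
        last = value
    return last


def decide(states: Iterable[int], migration_enabled: bool,
           zk_configs_set: bool) -> Decision:
    state = last_migration_state(states)
    if state == STARTED:
        # A migration is in progress: the controller needs both the flag and ZK configs.
        if migration_enabled and zk_configs_set:
            return Decision.CONTINUE_MIGRATION
        return Decision.RESIGN
    if state == FINALIZED:
        # Never re-enter migration mode once the second record has been seen.
        if migration_enabled:
            return Decision.IGNORE_MIGRATION_CONFIG
        return Decision.NORMAL_KRAFT
    return Decision.NO_MIGRATION_RECORD
```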

...

An alternative to write-behind for ZooKeeper would be to write first to ZooKeeper and then write to the metadata log. The main problem with this approach is that it will make KRaft writes much slower since ZK will always be in the write path. By doing a write-behind with offset tracking, we can amortize the ZK write latency and possibly be more efficient about making bulk writes to ZK.
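
To make the write-behind idea concrete, here is a minimal Python sketch of offset tracking with batched ZK writes. It is an illustration under stated assumptions, not the proposed implementation: `WriteBehindTracker` and its methods are hypothetical, `write_batch` stands in for whatever performs the bulk ZooKeeper write, and metadata log offsets are assumed to be contiguous so that an offset difference equals a record count.

```python
from typing import Callable, List, Tuple


class WriteBehindTracker:
    """Tracks how far ZooKeeper trails the committed metadata log."""

    def __init__(self) -> None:
        self.committed_offset = -1  # highest committed offset in the metadata log
        self.zk_offset = -1         # highest offset already reflected in ZooKeeper
        self.pending: List[Tuple[int, str]] = []  # (offset, record) awaiting a ZK write

    def on_commit(self, offset: int, record: str) -> None:
        """Called after a record commits to the log; ZK is not in the write path."""
        self.pending.append((offset, record))
        self.committed_offset = offset

    def lag(self) -> int:
        """Number of committed records not yet written to ZooKeeper."""
        return self.committed_offset - self.zk_offset

    def flush(self, write_batch: Callable[[List[str]], None],
              max_batch: int = 100) -> int:
        """Write up to max_batch pending records to ZK in one bulk call."""
        batch = self.pending[:max_batch]
        if not batch:
            return 0
        write_batch([record for _, record in batch])
        self.zk_offset = batch[-1][0]  # advance only after the write succeeds
        del self.pending[:len(batch)]
        return len(batch)
```

The value returned by `lag()` corresponds in spirit to the ZooKeeperWriteBehindLag metric; a real controller would also need a bound on this lag before blocking KRaft writes.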