Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Remove ZK registration JSON change, add ZkMigrationHeartbeat RPC

...

All brokers must be running at least this MetadataVersion before the migration can begin. ZK brokers will specify their MetadataVersion using the inter.broker.protocol.version as usual. The KRaft controller will bootstrap with the same MetadataVersion (which is stored in the metadata log as a feature flag – see KIP-778: KRaft to KRaft Upgrades).

...

A new “zookeeper.metadata.migration.enable” config will be added for the ZK broker and KRaft controller. Its default will be “false”. Setting this config to “true” on each broker is a prerequisite to starting the migration. Setting this to "true" on the KRaft controllers is the trigger for starting the migration (more on that below). Setting this to "true" (or "false") on a KRaft broker has no affect.

ZK Broker Registration JSON

Controller RPCs

For the three ZK controller RPCs UpdateMetadataRequest, LeaderAndIsrRequest, and StopReplicaRequest a new KRaftControllerId field In order to inform the KRaft controller that a ZK broker is ready for migration, a new version of the broker registration JSON will be added. This new version (6) will add a kraftMigration object. The object will include properties needed by the KRaft controller to begin the migration. The usage of this new version (and field) will be gated on the MetadataVersion introduced by this KIP.field will point to the active KRaft controller and will only be set when the controller is in KRaft mode. If this field is set, the ControllerId field should be -1. 

Code Block
{
  "versionapiKey": 64,
  "hosttype": "broker01request",
  "portlisteners": 9092["zkBroker"],
  "jmx_portname": 9999"LeaderAndIsrRequest",
  "timestampvalidVersions": 2233345666,
  "endpoints": [],"0-7",  <-- New version 7
  "rackflexibleVersions": "4+",
  "featuresfields": {},[
    { "kraftMigrationname": { // <-- New object
    "isReady": true"ControllerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "clusterIdabout": "uKMoqJEZRSWt0uDX44O5Wg"The controller id." },
--> { "name": "KRaftControllerId", "ibptype": "3.4-IV0"
  }  
}

In addition to checking if a broker is ready for migration, these new properties are used by the KRaft controller to verify that the brokers and new KRaft controllers have valid configurations. 

Controller RPCs

For the three ZK controller RPCs UpdateMetadataRequest, LeaderAndIsrRequest, and StopReplicaRequest a new KRaftControllerId field will be added. This field will point to the active KRaft controller and will only be set when the controller is in KRaft mode. If this field is set, the ControllerId field should be -1. 

Code Block
{
  "apiKey": 4,
 int32", "versions": "7+", "entityType": "brokerId",
      "about": "The KRaft controller id, used during migration." }, <-- New field
    { "name": "ControllerEpoch", "type": "requestint32",
  "listenersversions": ["zkBroker0+"],
      "nameabout": "LeaderAndIsrRequest"The controller epoch." },
  "validVersions":  ...
   ]
}


Code Block
{"0-7",  <-- New version 7
  "flexibleVersionsapiKey": "4+"5,
  "fieldstype": ["request",
    { "namelisteners": ["ControllerIdzkBroker"],
  "typename": "int32StopReplicaRequest",
  "versionsvalidVersions": "0-4",  <-- New version 4
  "flexibleVersions": "2+",
  "fields": [
    { "name": "ControllerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "about": "The controller id." },
--> { "name": "KRaftControllerId", "type": "int32", "versions": "74+", "entityType": "brokerId",
      "about": "The KRaft controller id, used during migration." }, <-- New field
    { "name": "ControllerEpoch", "type": "int32", "versions": "0+",
      "about": "The controller epoch." },
    ...
   ]
}


Code Block
{
  "apiKey": 56,
  "type": "request",
  "listeners": ["zkBroker"],
  "name": "StopReplicaRequestUpdateMetadataRequest",
  "validVersions": "0-48",  <-- New version 48
  "flexibleVersions": "26+",
  "fields": [
    { "name": "ControllerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "about": "The controller id." },
--> { "name": "KRaftControllerId", "type": "int32", "versions": "48+", "entityType": "brokerId",
      "about": "The KRaft controller id, used during migration." }, <-- New field
    { "name": "ControllerEpoch", "type": "int32", "versions": "0+",
      "about": "The controller epoch." },
    ...
   ]
}

ApiVersionsResponse

A new tagged field on ApiVersionsResponse will be added to allow KRaft controllers to indicate their ability to perform the migration

Code Block
{
  "apiKey": 618,
  "type": "requestresponse",
  "listeners": ["zkBroker"],
  "name": "UpdateMetadataRequestApiVersionsResponse",
  "validVersions": "0-84",   // <-- New version 84
  "flexibleVersions": "63+",
  "fields": [
    ...
    { "name": "ControllerIdZkMigrationReady", "type": "int32int8", "versions": "04+", "entityTypetaggedVersions": "brokerId4+",
      "abouttag": "The controller id." },
--> {3, "nameignorable": "KRaftControllerId"true,
  "type": "int32", "versions": "8+", "entityTypeabout": "brokerId",
Set by a KRaft controller if "about": "The KRaft controller id, used during migration." }, <-- New field
    { "name": "ControllerEpoch", "type": "int32", "versionsthe required configurations for ZK migration are present" }
  ]
}

This field will only be set by the KRaft controller when sending ApiVersionsResponse to other KRaft controllers. Since this migration does not support combined mode KRaft nodes, this field will never be seen by clients when receiving ApiVersionsResponse sent by brokers.

Migration Metadata Record

A new metadata record is added to indicate if a ZK migration has been started or finalized. 

Code Block
{
  "apiKey": <NEXT KEY>,
  "type": "metadata",
  "name": "ZkMigrationRecord",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "aboutname": "The controller epoch." }ZkMigrationState", "type": "int8", "versions": "0+",
    ...
   ]
}

ApiVersionsResponse

A new tagged field on ApiVersionsResponse will be added to allow KRaft controllers to indicate their ability to perform the migration

"about": "One of the possible migration states." },
  ]
}

The possible values for ZkMigrationState are: Started (0) and Finalized (1). A int8 type is used to give the possibility of additional states in the future.

Migration Heartbeat RPC

A new RPC will be introduced that is used by ZK brokers to periodically contact the KRaft quorum. The presence of this request indicates that a given broker has correctly configured its connection to the quorum. The contents of this request indicate the readiness of this broker's configuration regarding a migration. By examining incoming ZkMigrationHeartbeat requests, the KRaft controller can determine if the migration is able to begin based on the state of the ZK brokers.

Code Block
{
  "apiKey": <NEXT KEY>,
  "type": "request",
  "listeners": ["controller"],   
  "name": "ZkMigrationHeartbeat",
  "validVersions": "0",
Code Block
{
  "apiKey": 18,
  "type": "response",
  "name": "ApiVersionsResponse",
  "validVersions": "0-4",   // <-- New version 4
  "flexibleVersions": "30+",
  "fields": [
    { "name": "ClusterId", "type": "uuid", "versions": "0+",
      "about": "The Cluster ID according to the requesting broker..."}
    { "name": "ZkMigrationReadyBrokerId", "type": "int8int32", "versionsentityType": "4+brokerId", "taggedVersionsversions": "40+", "tag": 3, "ignorable": true,
      "about": "SetThe byID a KRaft controller if the required configurations for ZK migration are present" }
  ]
}

This field will only be set by the KRaft controller when sending ApiVersionsResponse to other KRaft controllers. Since this migration does not support combined mode KRaft nodes, this field will never be seen by clients when receiving ApiVersionsResponse sent by brokers.

Migration Metadata Record

A new metadata record is added to indicate if a ZK migration has been started or finalized. 

Code Block
{
  "apiKey": <NEXT KEY>,
  "type": "metadata",
 of the requesting broker."},
    { "name": "BrokerEpoch", "type": "int64", "versions": "0+",
      "about": "The epoch of the requesting broker."},
    { "name": "ZkMigrationRecordIsReady",
  "type": "bool", "validVersionsversions": "0+",
      "flexibleVersionsabout": "0+"True if this broker has the migration enable config set."},
  "fields": [
    { "name": "ZkMigrationStateInterBrokerProtocolVersion", "type": "int8string", "versions": "0+",
      "aboutname": "One ofThe IBP currently in use by the possiblerequesting migration statesbroker." },
  ]
}

The possible values for ZkMigrationState are: Started (0) and Finalized (1). A int8 type is used to give the possibility of additional states in the futureThis request will be sent periodically by ZK controllers to the KRaft quorum during the migration. This heartbeat will be mainly used for debugging and informational purposes. If a subsequent heartbeat comes with an invalid state after the migration has started, the broker will be fenced off from receiving controller RPCs. See the section on "Misconfigurations" below.

Migration State ZNode

As part of the propagation of KRaft metadata back to ZooKeeper while in dual-write mode, we need to keep track of what has been synchronized. A new ZNode will be introduced to keep track of which KRaft record offset has been written back to ZK. This will be used to recover the synchronization state following a KRaft controller failover. 

...

A new set of nodes will be provisioned to host the controller quorum. These controllers will be started with zookeeper.metadata.migration.enable set to “true”. Once the quorum is established and a leader is elected, the active controller will check that the whole quorum is ready to begin the migration. This is done by examining the new tagged field on ApiVersionsResponse that is exchanged between controllers. Following this, the controller will examine the state of the ZK broker registrations in ZK. If all ZK brokers are ready for migration, the migration process will begin. and wait for incoming ZkMigrationHeartbeat requests. Once all ZK brokers are have contacted the KRaft controller, and they are in a valid state, the migration process will begin.

There is no ordering dependency between configuring ZK brokers for the migration and bringing up the KRaft quorum. 

The first step in the migration is to copy the existing metadata from ZK and write it into the KRaft metadata log. The active controller will also establish itself as the active controller from a ZK perspective. While copying the ZK data, the controller will not handle any RPCs from brokers. 

...

StopReplicas: following reassignments and topic deletions, we will need to send StopReplicas to ZK brokers for them to stop managing certain replicas

Each of these RPCs will include a new KRaftControllerId field that points to the active KRaft controller. When this field is present, it acts as a signal to the brokers that the controller is in KRaft mode. Using this field, and the zookeeper.metadata.migration.enable config, the brokers can enable migration specific behavior

Controller Leadership

In order to prevent further writes to ZK, the first thing the new KRaft quorum must do is take over leadership of the ZK controller. This can be achieved by unconditionally overwriting two values in ZK. The "/controller" ZNode indicates the current active controller. By overwriting it, a watch will fire on all the ZK brokers to inform them of a new controller election. The active KRaft controller will write its node ID (e.g., 3000) into this ZNode to claim controller leadership. This write will be persistent rather than the usual ephemeral write used by the ZK controller election algorithm. This will ensure that no ZK broker can claim leadership during a KRaft controller failover.

...