Comment: Replace ZkMigrationHeartbeat with new version of BrokerRegistration

...

  • Enable forwarding on all brokers (KIP-590: Redirect Zookeeper Mutation Protocols to The Controller)
  • Usage of new BrokerRegistration RPC version
  • Usage of new controller RPC versions
  • Usage of new ApiVersions RPC version (by KRaft controller only)
  • Usage of new ZkMigrationRecord
  • Enable the migration components on KRaft controller and special migration behavior on ZK brokers

All brokers must be running at least this MetadataVersion before the migration can begin. ZK brokers will specify their MetadataVersion using the inter.broker.protocol.version as usual. The KRaft controller will bootstrap with the same MetadataVersion (which is stored in the metadata log as a feature flag – see KIP-778: KRaft to KRaft Upgrades).

...

The possible values for ZkMigrationState are: Started (0) and Finalized (1). An int8 type is used to allow for additional states in the future.
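
As an illustration only, the two states could be modelled as follows. The enum mirrors the values listed above; the `parse_migration_state` helper and its int8 range check are assumptions made for this sketch and are not part of the proposal.

```python
from enum import IntEnum


class ZkMigrationState(IntEnum):
    """Migration states as listed above; members carry the int8 wire values."""
    STARTED = 0
    FINALIZED = 1


def parse_migration_state(value: int) -> ZkMigrationState:
    """Hypothetical helper: decode an int8 value into a known state.

    Raises ValueError for values outside the int8 range and for states
    this sketch does not know about (for example, ones added later).
    """
    if not -128 <= value <= 127:
        raise ValueError(f"{value} does not fit in an int8")
    return ZkMigrationState(value)
```

Rejecting unknown values is one possible choice; a reader that must tolerate future states might instead map them to a placeholder.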

...

Broker Registration RPC

A new version of the broker registration RPC will be added to support ZK brokers registering with the KRaft quorum. A new tagged field is added to signify the "inter.broker.protocol.version" that a ZK broker was configured with. The presence of this field is used to indicate that the sending broker is a ZK broker. The usage of this RPC by a ZK broker indicates that it has "zookeeper.metadata.migration.enable" and quorum connection configs properly set.


Code Block
{
  "apiKey": 62,
  "type": "request",
  "listeners": ["controller"],
  "name": "BrokerRegistrationRequest",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "about": "The broker ID." },
    { "name": "ClusterId", "type": "string", "versions": "0+",
      "about": "The cluster id of the broker process." },
    { "name": "IncarnationId", "type": "uuid", "versions": "0+",
      "about": "The incarnation id of the broker process." },
    { "name": "Listeners", "type": "[]Listener",
      "about": "The listeners of this broker", "versions": "0+", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
        "about": "The name of the endpoint." },
      { "name": "Host", "type": "string", "versions": "0+",
        "about": "The hostname." },
      { "name": "Port", "type": "uint16", "versions": "0+",
        "about": "The port." },
      { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
        "about": "The security protocol." }
    ]
    },
    { "name": "Features", "type": "[]Feature",
      "about": "The features on this broker", "versions": "0+", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
        "about": "The feature name." },
      { "name": "MinSupportedVersion", "type": "int16", "versions": "0+",
        "about": "The minimum supported feature level." },
      { "name": "MaxSupportedVersion", "type": "int16", "versions": "0+",
        "about": "The maximum supported feature level." }
    ]
    },
    { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The rack which this broker is in." },
--> { "name": "InterBrokerProtocolVersion", "type": "string", "versions": "1+", "taggedVersions": "1+", "tag": 1, "ignorable": true,
      "about": "The static IBP that the broker was started with. This is only used by ZK brokers during a migration."}  <--- new field
  ]
}


Migration State ZNode

As part of the propagation of KRaft metadata back to ZooKeeper while in dual-write mode, we need to keep track of what has been synchronized. A new ZNode will be introduced to keep track of which KRaft record offset has been written back to ZK. This will be used to recover the synchronization state following a KRaft controller failover. 

...

The two controller ZNodes "/controller" and "/controller_epoch" will be managed by the KRaft quorum during the migration. More details in "Controller Leadership" section below.

A new version of the JSON schema for "/controller" will be added to include an "isKRaft" boolean field.

Code Block
{
  "version": 2,
  "brokerid": 3000,
  "timestamp": 1234567890,
  "isKRaft": true          <-- new field
}

This field is intended to be informational to aid with debugging.
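
A minimal sketch of how a debugging tool might read this ZNode payload is shown below. The function name `parse_controller_znode` and the choice to treat a missing "isKRaft" as false (for payloads written with an older schema version) are assumptions of this example, not something the proposal specifies.

```python
import json


def parse_controller_znode(data: bytes) -> dict:
    """Decode a "/controller" ZNode payload into a plain dict.

    Assumption: payloads written before this change lack "isKRaft",
    in which case the controller is reported as a ZK controller.
    """
    payload = json.loads(data.decode("utf-8"))
    return {
        "version": payload["version"],
        "brokerid": payload["brokerid"],
        "timestamp": payload["timestamp"],
        "is_kraft": bool(payload.get("isKRaft", False)),
    }


if __name__ == "__main__":
    raw = b'{"version": 2, "brokerid": 3000, "timestamp": 1234567890, "isKRaft": true}'
    print(parse_controller_znode(raw))
```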

Operational Changes

Forwarding Enabled on Brokers

...

A new set of nodes will be provisioned to host the controller quorum. These controllers will be started with zookeeper.metadata.migration.enable set to “true”. Once the quorum is established and a leader is elected, the active controller will check that the whole quorum is ready to begin the migration. This is done by examining the new tagged field on ApiVersionsResponse that is exchanged between controllers. Following this, the controller will examine the state of the ZK broker registrations and wait for incoming BrokerRegistration requests. Once all ZK brokers have registered with the KRaft controller (and they are in a valid state), the migration process will begin.

...

While running in migration mode, ZK brokers, KRaft brokers, and the KRaft controller must all know about every broker in the cluster, both ZK and KRaft. The current set of live brokers will be sent to ZK brokers using UpdateMetadataRequest and sent to KRaft brokers using BrokerRegistration[Change]Record in the metadata log.

The ZK brokers use this information for the replication protocols. Similarly, the KRaft controller must know about ZK and KRaft brokers when performing operations like assignments and leader election.

A new version of the BrokerRegistration RPC will be used by the ZK brokers to register themselves with KRaft. The usage of this RPC by a ZK broker indicates that it is properly configured for the migration. The new InterBrokerProtocolVersion tagged field in the RPC is used by the KRaft controller to verify that the whole cluster is using the same IBP/MetadataVersion before starting the migration.
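
The readiness check described here can be sketched roughly as follows. This is not the controller's actual logic: the `Registration` type, the `migration_can_begin` function, and the use of plain string equality to compare the IBP with the controller's MetadataVersion are all assumptions made for illustration.

```python
from dataclasses import dataclass
from typing import Iterable, Optional, Set


@dataclass(frozen=True)
class Registration:
    """Hypothetical view of one received BrokerRegistration request."""
    broker_id: int
    # Value of the InterBrokerProtocolVersion tagged field. None means the
    # field was absent, which this sketch takes to mean "not a ZK broker".
    inter_broker_protocol_version: Optional[str] = None


def migration_can_begin(
    known_zk_broker_ids: Set[int],
    registrations: Iterable[Registration],
    controller_metadata_version: str,
) -> bool:
    """Return True once every known ZK broker has registered with a matching IBP."""
    zk_registrations = {
        r.broker_id: r
        for r in registrations
        if r.inter_broker_protocol_version is not None
    }
    # Every ZK broker discovered in ZooKeeper must have registered.
    if not known_zk_broker_ids.issubset(zk_registrations):
        return False
    # Every registered ZK broker must report the controller's version.
    return all(
        zk_registrations[b].inter_broker_protocol_version == controller_metadata_version
        for b in known_zk_broker_ids
    )
```

Here `known_zk_broker_ids` stands in for the broker IDs the controller has discovered from ZooKeeper; the version strings are treated as opaque values.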

After registering, ZK brokers will send BrokerHeartbeat RPCs to indicate liveness. In order to discover which ZK brokers exist, the KRaft controller will need to read the “/brokers” state from ZK and copy it into the metadata log.

If a ZK broker comes online and registers itself with a nodeId of an existing KRaft broker, we will log an error and fence the errant ZK broker by not sending it UpdateMetadataRequests.
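
One way to picture this fencing rule is a filter over the ZK brokers that are eligible to receive UpdateMetadataRequests. The function name `update_metadata_targets` and the logger name are hypothetical, and the sketch only models the ID conflict, not the RPC itself.

```python
import logging
from typing import Iterable, List

log = logging.getLogger("migration-sketch")


def update_metadata_targets(
    zk_broker_ids: Iterable[int], kraft_broker_ids: Iterable[int]
) -> List[int]:
    """Return the ZK broker IDs that should receive UpdateMetadataRequests.

    A ZK broker whose ID collides with an existing KRaft broker is logged
    as an error and left out, which effectively fences it.
    """
    kraft = set(kraft_broker_ids)
    targets = []
    for broker_id in sorted(set(zk_broker_ids)):
        if broker_id in kraft:
            log.error(
                "ZK broker %d reuses the node ID of a KRaft broker; fencing it",
                broker_id,
            )
            continue
        targets.append(broker_id)
    return targets
```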

...

If a migration has been started, but a KRaft controller is elected that is misconfigured (does not have zookeeper.metadata.migration.enable or ZK configs), this controller should resign. When replaying the metadata log during its initialization phase, this controller can see that a migration is in progress by seeing the initial MigrationRecord. Since it does not have the required configs, it can resign leadership and throw an error.

If a migration has been finalized, but the KRaft quorum comes up with zookeeper.metadata.migration.enable, we must not re-enter the migration mode. In this case, while replaying the log, the controller can see the second MigrationRecord and know that the migration is finalized and should not be resumed. This should result in errors being thrown, but the quorum can continue operating as normal.
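
The two scenarios above amount to a decision made while replaying the metadata log. The sketch below is a simplification under stated assumptions: it assumes the first migration record carries the Started state (0) and the second carries Finalized (1), and the `Action` enum and `decide_on_replay` function are names invented for this example.

```python
from enum import Enum
from typing import Iterable

STARTED, FINALIZED = 0, 1  # ZkMigrationState values


class Action(Enum):
    RESUME_MIGRATION = "resume_migration"
    RESIGN = "resign"
    OPERATE_NORMALLY = "operate_normally"
    NO_MIGRATION_RECORD = "no_migration_record"


def decide_on_replay(
    replayed_states: Iterable[int], migration_enabled: bool, zk_configured: bool
) -> Action:
    """Pick a controller action from the migration states seen during replay."""
    seen = set(replayed_states)
    if FINALIZED in seen:
        # Finalized migrations are never re-entered, even if the migration
        # config is still set; the quorum keeps operating as normal.
        return Action.OPERATE_NORMALLY
    if STARTED in seen:
        # A migration is in progress: a controller lacking the required
        # configs cannot continue it and should resign.
        if migration_enabled and zk_configured:
            return Action.RESUME_MIGRATION
        return Action.RESIGN
    # No migration record in the log; not covered by the scenarios above.
    return Action.NO_MIGRATION_RECORD
```

A real controller would also raise or log an error in the resign case and in the finalized case with the migration config still set; that side effect is left out here to keep the decision itself visible.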

Other scenarios likely exist and will be examined as the migration feature is implemented. 

Test Plan

In addition to basic "happy path" tests, we will also want to test that the migration can tolerate failures of brokers and KRaft controllers. We will also want to have tests for the correctness of the system if ZooKeeper becomes unavailable during the migration. Another class of tests for this process is metadata consistency at the broker level. Since we are supporting ZK and KRaft brokers simultaneously, we need to ensure their metadata does not stay inconsistent for very long.

Rejected Alternatives

Offline Migration

The main alternative to this design is to do an offline migration. While this would be much simpler, it would be a non-starter for many Kafka users who require minimal downtime of their cluster. By allowing for an online migration from ZK to KRaft, we can provide a path towards KRaft for all Kafka users – even ones where Kafka is critical infrastructure. 

Online Broker Migration

Once KRaft has taken over leadership of the controller and migrated the ZK data, the design calls for a restart of the ZK brokers into KRaft mode. An alternative to this is to dynamically switch the brokers from using controller RPCs (UpdateMetadata and LeaderAndISR) to the metadata log. This would alleviate the need for a rolling restart of the brokers to bring them into KRaft mode. The difficulty with this approach is that there is a vast difference in the implementations between KafkaServer (ZK) and BrokerServer (KRaft). It is possible to reconcile these differences, but the effort would be very large. This option would also increase the risk of the migration since we would be modifying the "safe" state of the broker code. By leaving the ZK implementation mostly unchanged, we give ourselves a safety net for rolling back during the migration.

No Dual Writes

Another simplifying alternative would be to only write metadata into KRaft while in the migration mode. This has a few disadvantages. Primarily, it makes rolling back to ZK much more difficult, if at all possible. Secondly, we actually have a few remaining ZK read usages on the brokers that need the data in ZK to be up-to-date (see above section on Dual Metadata Writes).

...