Comment: Updating metrics to reflect what has been implemented

Status

Current state: Accepted

Discussion thread: https://lists.apache.org/thread/phnrz31dj0jz44kcjmvzrrmhhsmbx945

...

| MBean name | Description |
|---|---|
| kafka.server:type=KafkaServer,name=MetadataType | An enumeration of: ZooKeeper (1) or KRaft (2). Each broker reports this. |
| kafka.controller:type=KafkaController,name=MetadataType | An enumeration of: ZooKeeper (1), KRaft (2), or Dual (3). The active controller reports this. |
| kafka.controller:type=KafkaController,name=Features,feature={feature},level={level} | The finalized set of features with their level as seen by the controller. Used to help operators see the cluster's current metadata.version. |
| kafka.controller:type=KafkaController,name=ZkMigrationState | An enumeration of the possible migration states the cluster can be in. This is only reported by the active controller. |
| kafka.controller:type=KafkaController,name=MigratingZkBrokerCount | A count of ZK brokers that are registered with KRaft and ready for migration. This will only be reported by the active controller. |
| kafka.controller:type=KafkaController,name=ZkWriteBehindLag | The amount of lag in records that ZooKeeper is behind relative to the highest committed record in the metadata log. This metric will only be reported by the active KRaft controller. |
| kafka.controller:type=KafkaController,name=ZkWriteSnapshotTimeMs | The number of milliseconds the KRaft controller took reconciling a snapshot into ZK. |
| kafka.controller:type=KafkaController,name=ZkWriteDeltaTimeMs | The number of milliseconds the KRaft controller took writing a delta into ZK. |

MetadataVersion (IBP)

A new MetadataVersion in the 3.4 line will be added. This version will be used for a few things in this design.

  • Enable forwarding on all brokers (KIP-590: Redirect Zookeeper Mutation Protocols to The Controller)
  • Usage of new BrokerRegistration RPC version
  • Usage of new controller RPC versions
  • Usage of new ApiVersions RPC version (by KRaft controller only)
  • Usage of new ZkMigrationStateRecord
  • Enable the migration components on KRaft controller and special migration behavior on ZK brokers

...

For the three ZK controller RPCs UpdateMetadataRequest, LeaderAndIsrRequest, and StopReplicaRequest, a new IsKRaftController field will be added. This field is used to indicate that the controller sending this RPC is a KRaft controller.

Code Block
{
  "apiKey": 4,
  "type": "request",
  "listeners": ["zkBroker"],
  "name": "LeaderAndIsrRequest",
  "validVersions": "0-7",  // <-- New version 7
  "flexibleVersions": "4+",
  "fields": [
    { "name": "ControllerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "about": "The controller id." },
    { "name": "isKRaftController", "type": "bool", "versions": "7+", "default": "false",
      "about": "If KRaft controller id is used during migration. See KIP-866" }, // <-- New field
    { "name": "ControllerEpoch", "type": "int32", "versions": "0+",
      "about": "The controller epoch." },
    ...
   ]
}

...

Code Block
{
  "apiKey": 5,
  "type": "request",
  "listeners": ["zkBroker"],
  "name": "StopReplicaRequest",
  "validVersions": "0-4",  // <-- New version 4
  "flexibleVersions": "2+",
  "fields": [
    { "name": "ControllerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "about": "The controller id." },
    { "name": "isKRaftController", "type": "bool", "versions": "4+", "default": "false",
      "about": "If KRaft controller id is used during migration. See KIP-866" }, // <-- New field
    { "name": "ControllerEpoch", "type": "int32", "versions": "0+",
      "about": "The controller epoch." },
    ...
   ]
}

...

Code Block
{
  "apiKey": 6,
  "type": "request",
  "listeners": ["zkBroker"],
  "name": "UpdateMetadataRequest",
  "validVersions": "0-8",  // <-- New version 8
  "flexibleVersions": "6+",
  "fields": [
    { "name": "ControllerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "about": "The controller id." },
    { "name": "isKRaftController", "type": "bool", "versions": "8+", "default": "false",
      "about": "If KRaft controller id is used during migration. See KIP-866" }, // <-- New field
    { "name": "ControllerEpoch", "type": "int32", "versions": "0+",
      "about": "The controller epoch." },
    ...
   ]
}

...


Migration Metadata Record

A new metadata record is added to indicate if a ZK migration has been started or finalized.

Code Block
{
  "apiKey": 21,
  "type": "metadata",
  "name": "ZkMigrationStateRecord",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ZkMigrationState", "type": "int8", "versions": "0+",
      "about": "One of the possible migration states." }
  ]
}

The possible values for ZkMigrationState are: None (0), Pre-Migration (1), Migration (2), and Post-Migration (3). An int8 type is used to leave room for additional states in the future.

...

A new version of the broker registration RPC will be added to support ZK brokers registering with the KRaft quorum. A new boolean field is added to indicate that the sender of the RPC is a ZK broker that is ready for migration. The usage of this RPC by a ZK broker indicates that it has "zookeeper.metadata.migration.enable" and quorum connection configs properly set.

...

Code Block
{
  "apiKey": 62,
  "type": "request",
  "listeners": ["controller"],
  "name": "BrokerRegistrationRequest",
  "validVersions": "0-1", // <-- New version 1
  "flexibleVersions": "0+",
  "fields": [
    // ...
    { "name": "IsMigratingZkBroker", "type": "bool", "versions": "1+", "default": "false",
      "about": "If the required configurations for ZK migration are present, this value is set to true" } // <-- New field
  ]
}

RegisterBrokerRecord

A new field is added to signify that a registered broker is a ZooKeeper broker.

Code Block
{
  "apiKey": 0,
  "type": "metadata",
  "name": "RegisterBrokerRecord",
  "validVersions": "0-2",  // <-- New version 2
  "flexibleVersions": "0+",
  "fields": [
    { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "about": "The broker id." },
    { "name": "IsMigratingZkBroker", "type": "bool", "versions": "2+", "default": "false",
      "about": "True if the broker is a ZK broker in migration mode. Otherwise, false" },  // <-- New field
    // ...
  ]
}


Migration State ZNode

As part of the propagation of KRaft metadata back to ZooKeeper while in dual-write mode, we need to keep track of what has been synchronized. A new ZNode will be introduced to keep track of which KRaft record offset has been written back to ZK. This will be used to recover the synchronization state following a KRaft controller failover. 

...

A new version of the JSON schema for "/controller" will be added to include a "kraftControllerEpoch" field.

Code Block
{
  "version": 2, // <-- New version 2
  "brokerid": 3000,
  "timestamp": 1234567890,
  "kraftControllerEpoch": 42       // <-- New field
}

This field is intended to be informational to aid with debugging.
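As a rough illustration of how a debugging tool might read this ZNode, the sketch below parses the JSON payload and reports the KRaft controller epoch when it is present. The helper name `parse_controller_znode`, the `ControllerInfo` type, and the assumption that older payloads simply omit the field are all hypothetical, not part of this proposal.

```python
import json
from typing import NamedTuple, Optional


class ControllerInfo(NamedTuple):
    broker_id: int
    kraft_controller_epoch: Optional[int]  # None when the field is absent


def parse_controller_znode(payload: str) -> ControllerInfo:
    """Parse the "/controller" JSON, tolerating payloads without the new field."""
    data = json.loads(payload)
    return ControllerInfo(
        broker_id=int(data["brokerid"]),
        kraft_controller_epoch=data.get("kraftControllerEpoch"),
    )


# Payload written by a KRaft controller (version 2, as in the example above).
v2 = parse_controller_znode(
    '{"version": 2, "brokerid": 3000, "timestamp": 1234567890, "kraftControllerEpoch": 42}'
)
# Hypothetical older payload without the new field.
v1 = parse_controller_znode('{"version": 1, "brokerid": 1, "timestamp": 1234567890}')
```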

...

Here is a state machine description of the migration. There will likely be more internal states that the controller uses, but these four will be exposed as the ZkMigrationState metric.


| State | Enum | Description |
|---|---|---|
| None | 0 | The cluster is in KRaft mode and was never migrated from ZooKeeper. |
| PreMigration | 1 | A KRaft controller has been provisioned and has migration enabled. |
| Migration | 2 | The KRaft controller has begun the data migration, brokers are being restarted, dual-writes are in progress. |
| PostMigration | 3 | The cluster is in KRaft mode. |


The active ZooKeeper controller will not report this metric. Only the active KRaft controller reports it, with a value corresponding to the current state of the migration.
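For operators who scrape the ZkMigrationState metric, a small decoder along these lines may be convenient. It assumes the four values None (0), Pre-Migration (1), Migration (2), and Post-Migration (3) given for ZkMigrationState in this document; the class and function names are illustrative only.

```python
from enum import IntEnum


class ZkMigrationState(IntEnum):
    NONE = 0
    PRE_MIGRATION = 1
    MIGRATION = 2
    POST_MIGRATION = 3


def describe(metric_value: int) -> str:
    """Map a raw metric value to a state name, tolerating values added later."""
    try:
        return ZkMigrationState(metric_value).name
    except ValueError:
        return f"UNKNOWN({metric_value})"
```

Falling back to an "unknown" label rather than raising keeps a dashboard usable if further states are added in a later version.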

...

A new set of nodes will be provisioned to host the controller quorum. These controllers will be started with zookeeper.metadata.migration.enable set to “true”. Once the quorum is established and a leader is elected, the active controller will check that the whole quorum is ready to begin the migration. This is done by examining the new tagged field on ApiVersionsResponse that is exchanged between controllers. Following this, the controller will examine the state of the ZK broker registrations to determine the set of extant ZK brokers and wait for incoming BrokerRegistration requests (see section on ZK Broker Presence). Once all known ZK brokers have registered with the KRaft controller (and they are in a valid state) the migration process will begin.

...

The metadata copied from ZK will be encapsulated in a single metadata transaction (KIP-868). A ZkMigrationStateRecord will also be included in this transaction.

...

Once the operator has decided to commit to KRaft mode, the final step is to restart the controller quorum and take it out of migration mode by setting zookeeper.metadata.migration.enable to "false" (or unsetting it). The active controller will only finalize the migration once it detects that all members of the quorum have signaled that they are finalizing the migration (again, using the tagged field in ApiVersionsResponse). Once the controller leaves migration mode, it will write a ZkMigrationStateRecord to the log and no longer perform writes to ZK. It will also disable its special handling of ZK RPCs.

...

UpdateMetadata: for metadata changes, the KRaft controller will need to send UpdateMetadataRequests to the ZK brokers.

StopReplicas: following reassignments and topic deletions, we will need to send StopReplicas to ZK brokers for them to stop managing certain replicas. 

Each of these RPCs will include a new IsKRaftController field that indicates if the sending controller is a KRaft controller. Using this field, and the zookeeper.metadata.migration.enable config, the brokers can enable migration specific behavior.
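The broker-side decision described above can be sketched as follows. This is a minimal illustration, not the broker implementation: the `ControllerRequest` type and `use_migration_behavior` function are hypothetical, and treating a KRaft-originated request as ordinary when the config is disabled is an assumption, since the text does not specify that case.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ControllerRequest:
    controller_id: int
    controller_epoch: int
    is_kraft_controller: bool = False  # mirrors the schema default of "false"


def use_migration_behavior(request: ControllerRequest, migration_enabled: bool) -> bool:
    """True only when the sender is a KRaft controller AND the broker has
    zookeeper.metadata.migration.enable set."""
    return migration_enabled and request.is_kraft_controller


from_kraft = ControllerRequest(controller_id=3000, controller_epoch=6, is_kraft_controller=True)
from_zk = ControllerRequest(controller_id=1, controller_epoch=5)
```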

Controller Leadership

In order to prevent further writes to ZK, the first thing the new KRaft quorum must do is take over leadership of the ZK controller. This can be achieved by unconditionally overwriting two values in ZK. The "/controller" ZNode indicates the current active controller. By overwriting it, a watch will fire on all the ZK brokers to inform them of a new controller election. The active KRaft controller will write its node ID (e.g., 3000) and epoch into this ZNode to claim controller leadership. This write will be persistent rather than the usual ephemeral write used by the ZK controller election algorithm. This will ensure that no ZK broker can claim leadership during a KRaft controller failover.

The second ZNode we will write to is "/controller_epoch". This ZNode is used for fencing writes from old controllers in ZK mode. Each write from a ZK controller is actually a conditional multi-write with a "check" operation on the "/controller_epoch" ZNode's version. By altering this node, we can ensure any in-flight writes from the previous ZK controller epoch will fail.

Every time a KRaft controller election occurs, the newly elected controller will overwrite the values in “/controller” and “/controller_epoch”. The first epoch generated by the KRaft quorum must be greater than the last ZK epoch in order to maintain the monotonic epoch invariant.
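To make the fencing argument concrete, here is a toy in-memory model of the two ZNodes. `FakeZk` is a stand-in written for this example and is not a ZooKeeper client API; it only tracks a value and a version per path and does not model persistent versus ephemeral nodes. Writing the KRaft epoch as the value of "/controller_epoch" is also an assumption of the sketch.

```python
class VersionMismatch(Exception):
    """Raised when a conditional write's version check fails."""


class FakeZk:
    def __init__(self):
        self.nodes = {}  # path -> (value, version)

    def set_unconditionally(self, path, value):
        _, version = self.nodes.get(path, (None, -1))
        self.nodes[path] = (value, version + 1)

    def version(self, path):
        return self.nodes[path][1]

    def multi_write(self, check_path, expected_version, path, value):
        # Models the ZK controller's "check" op on /controller_epoch plus a write.
        if self.version(check_path) != expected_version:
            raise VersionMismatch(check_path)
        self.set_unconditionally(path, value)


def kraft_takeover(zk, node_id, kraft_epoch):
    last_zk_epoch = zk.nodes["/controller_epoch"][0]
    if kraft_epoch <= last_zk_epoch:
        raise ValueError("KRaft epoch must exceed the last ZK controller epoch")
    zk.set_unconditionally("/controller", {"brokerid": node_id, "kraftControllerEpoch": kraft_epoch})
    zk.set_unconditionally("/controller_epoch", kraft_epoch)


zk = FakeZk()
zk.set_unconditionally("/controller", {"brokerid": 1})
zk.set_unconditionally("/controller_epoch", 5)
cached_version = zk.version("/controller_epoch")  # held by the old ZK controller

kraft_takeover(zk, node_id=3000, kraft_epoch=6)

try:
    # In-flight write from the previous ZK controller epoch.
    zk.multi_write("/controller_epoch", cached_version, "/brokers/topics/foo", "stale")
    stale_write_succeeded = True
except VersionMismatch:
    stale_write_succeeded = False
```

Because the takeover bumps the version of "/controller_epoch", the old controller's conditional write is rejected without any cooperation from that controller.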

Broker Registration

While running in migration mode, the KRaft controller must know about KRaft brokers as well as ZK brokers. This will be accomplished by having the ZK brokers send the broker lifecycle RPCs to the KRaft controller.

A new version of the BrokerRegistration RPC will be used by the ZK brokers to register themselves with KRaft. The ZK brokers will set the new IsMigratingZkBroker field and populate the Features field with a "metadata.version" min and max supported equal to their IBP. The KRaft controller will only accept the registration if the given "metadata.version" is equal to the IBP/MetadataVersion of the quorum.

After successfully registering, the ZK brokers will send BrokerHeartbeat RPCs to indicate liveness. The ZK brokers will learn about other brokers in the usual way through UpdateMetadataRequest.

If a ZK broker attempts to register with an invalid node ID, cluster ID, or IBP, the KRaft controller will reject the registration and the broker will terminate.

If a KRaft broker attempts to register itself with the node ID of an existing ZK broker, the controller will reject the registration and the broker will terminate.
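The acceptance rules in this section could be checked roughly as shown below. This is a sketch under stated assumptions: feature levels are modeled as plain integers, a negative ID stands in for "invalid node ID", and the `Registration` type, `rejection_reason` function, and reason strings are invented for illustration.

```python
from dataclasses import dataclass
from typing import AbstractSet, Optional


@dataclass(frozen=True)
class Registration:
    broker_id: int
    cluster_id: str
    is_migrating_zk_broker: bool
    min_metadata_version: int  # feature levels modeled as plain ints (assumption)
    max_metadata_version: int


def rejection_reason(reg: Registration, cluster_id: str, quorum_metadata_version: int,
                     registered_zk_broker_ids: AbstractSet[int]) -> Optional[str]:
    """Return why the registration is rejected, or None if it is accepted."""
    if reg.broker_id < 0:
        return "invalid node ID"
    if reg.cluster_id != cluster_id:
        return "cluster ID mismatch"
    if reg.is_migrating_zk_broker:
        # ZK brokers advertise min == max == their IBP, which must match the quorum.
        if not (reg.min_metadata_version == reg.max_metadata_version == quorum_metadata_version):
            return "metadata.version does not match the quorum"
    elif reg.broker_id in registered_zk_broker_ids:
        # A KRaft broker may not reuse the node ID of an existing ZK broker.
        return "node ID already belongs to a ZK broker"
    return None
```

In every rejected case the text says the broker terminates, so a caller would map any non-None result to an error response.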

KRaft Controller Pre-Migration State

When the KRaft quorum is first established prior to starting a migration, it should not handle most RPCs until the initial data migration from ZooKeeper has completed. This is necessary to prevent divergence of metadata during the initial data migration. The controller will need to process RPCs related to Raft as well as BrokerRegistration and BrokerHeartbeat. Other RPCs (such as CreateTopics) will be rejected with a NOT_CONTROLLER error.

Once the metadata migration is complete, the KRaft controller will begin operating normally.
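The gating described here amounts to an allow-list that is lifted once the migration completes. In the sketch below, the specific Raft API names in the allow-list are an assumption (the text only says "RPCs related to Raft"), and `error_for` is a hypothetical helper.

```python
# RPCs the controller keeps serving before the initial data migration completes.
# The Raft API names listed here are assumed for illustration.
ALLOWED_DURING_PRE_MIGRATION = frozenset({
    "Vote", "BeginQuorumEpoch", "EndQuorumEpoch", "Fetch", "FetchSnapshot",
    "BrokerRegistration", "BrokerHeartbeat",
})


def error_for(api_name: str, migration_complete: bool) -> str:
    """Return the error code name a request would receive ("NONE" means handled)."""
    if migration_complete or api_name in ALLOWED_DURING_PRE_MIGRATION:
        return "NONE"
    return "NOT_CONTROLLER"
```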

ZK Broker Presence

When the KRaft controller comes up in migration mode, it will wait for all known ZK brokers to register themselves before starting the migration. The problem with this is that we cannot know precisely which ZK brokers exist. The broker registrations in ZK are ephemeral and only show the brokers that are currently alive. If an operator had the brokers offline and started a migration, this would lead the controller to think no brokers exist. To improve on this, we can add a heuristic based on the cluster metadata to better capture the full set of ZK brokers. If we look at the topic assignments and configurations, we can calculate a set of brokers which have partitions assigned to them or have a dynamic config. This approach is still imperfect since brokers could be offline and have no assignments, but it will at least prevent any partition unavailability due to a broker running old software and not being able to participate in the migration.
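The heuristic can be expressed as a union of three sets. The function name and the input shapes below (topic to partition to replica list, and a collection of broker IDs with dynamic configs) are assumptions chosen to keep the example self-contained.

```python
from typing import Dict, Iterable, List, Set


def known_zk_brokers(live_registrations: Iterable[int],
                     topic_assignments: Dict[str, Dict[int, List[int]]],
                     brokers_with_dynamic_config: Iterable[int]) -> Set[int]:
    """Union of live brokers, brokers holding replicas, and brokers with a dynamic config."""
    brokers = set(live_registrations)
    for partitions in topic_assignments.values():
        for replicas in partitions.values():
            brokers.update(replicas)
    brokers.update(brokers_with_dynamic_config)
    return brokers


# Broker 3 is offline (no ephemeral registration) but still holds a replica,
# and broker 4 is offline but has a dynamic config.
expected = known_zk_brokers(
    live_registrations=[1, 2],
    topic_assignments={"foo": {0: [1, 2], 1: [2, 3]}},
    brokers_with_dynamic_config=[4],
)
```

As the text notes, an offline broker with no assignments and no dynamic config would still be missed by this calculation.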

AdminClient, MetadataRequest, and Forwarding

...

For inter-broker requests such as AlterPartitions and ControlledShutdown, we do not want to add the overhead of forwarding so we'll want to include the actual controller in the UpdateMetadataRequest. However, we cannot simply include the KRaft controller as the ControllerId. The ZK brokers connect to a ZK controller by using the "inter.broker.listener.name" config and the node information from LiveBrokers in the UpdateMetadataRequest. For connecting to a KRaft controller, the ZK brokers will need to use the "controller.listener.names" and "controller.quorum.voters" configs. To allow this, we will use the new IsKRaftController field in UpdateMetadataRequest to indicate different controller types to the channel managers.

Topic Deletions

The ZK migration logic will need to deal with asynchronous topic deletions when migrating data. Normally, the ZK controller will complete these asynchronous deletions via TopicDeletionManager. If the KRaft controller takes over before a deletion has occurred, we will need to complete the deletion as part of the ZK to KRaft state migration. Once the migration is complete, we will need to finalize the deletion in ZK so that the state is consistent.

...

Since these two versions contain the same data, but with different field names, we can simply support v0 and v1 in KRaft brokers and avoid modifying the file on disk. By leaving this file unchanged, we better facilitate a downgrade to ZK during the migration. Once the controller has completed the migration and written the final ZkMigrationStateRecord, the brokers can rewrite their meta.properties files as v1 in their log directories.

Rollback to ZK

As mentioned above, it should be possible for the operator to roll back to ZooKeeper at any point in the migration process prior to taking the KRaft controllers out of migration mode. The procedure for rolling back is to reverse the steps of the migration that had been completed so far.

...

If a migration has been started, but a KRaft controller is elected that is misconfigured (does not have zookeeper.metadata.migration.enable or ZK configs) this controller should resign. When replaying the metadata log during its initialization phase, this controller can see that a migration is in progress by seeing the initial ZkMigrationStateRecord. Since it does not have the required configs, it can resign leadership and throw an error.

If a migration has been finalized, but the KRaft quorum comes up with zookeeper.metadata.migration.enable, we must not re-enter the migration mode. In this case, while replaying the log, the controller can see the second ZkMigrationStateRecord and know that the migration is finalized and should not be resumed. This should result in errors being thrown, but the quorum can continue operating as normal.
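The two misconfiguration cases can be summarized as a decision taken after log replay. The sketch below assumes that the last replayed ZkMigrationState value of Migration (2) means "in progress" and Post-Migration (3) means "finalized"; that mapping, the function name, and the returned action strings are illustrative assumptions rather than part of the design.

```python
from typing import Optional

MIGRATION, POST_MIGRATION = 2, 3  # ZkMigrationState values from this document


def after_log_replay(last_migration_state: Optional[int], migration_configured: bool) -> str:
    """Decide what a newly elected controller does once it has replayed the log."""
    if last_migration_state == MIGRATION and not migration_configured:
        # Migration in progress but required configs are missing: give up leadership.
        return "resign"
    if last_migration_state == POST_MIGRATION and migration_configured:
        # Migration already finalized: do not re-enter migration mode.
        return "stay-in-kraft-mode-and-log-error"
    return "proceed"
```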

...

Since combined mode is primarily intended for developer environments, support for migrations under combined mode was not considered a priority for this design. By excluding it from this initial design, we can simplify the implementation and exclude an entire system configuration from the testing matrix. The migration design is already very complex, so any reduction in scope is beneficial. In the future, it is possible that we could add support for combined mode migrations based on this design.

...