...

This KIP describes a protocol for extending KIP-595 and KIP-630 so that the operator can programmatically update the voter set in a way that is safe and keeps the cluster available. There are two important use cases that this KIP supports. One use case is that the operator wants to change the number of controllers by adding or removing a controller. The other use case is that the operator wants to replace a controller because of a disk or hardware failure.

Key terms

These are the definitions of some important terms that are used throughout the document.

...

Observers: An observer is any replica that is not in the voter set. A replica is an observer either because its ID and UUID pair is not in the voter set or because it does not have an ID or UUID.
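The observer rule can be written as a small predicate. This is a minimal sketch. It assumes the voter set is held in memory as a set of (ID, UUID) pairs and that a missing ID or UUID is represented as None. The name `is_observer` is hypothetical.

```python
import uuid
from typing import Optional, Set, Tuple

# Hypothetical in-memory representation: a voter is an (ID, UUID) pair.
VoterKey = Tuple[int, uuid.UUID]


def is_observer(replica_id: Optional[int],
                replica_uuid: Optional[uuid.UUID],
                voters: Set[VoterKey]) -> bool:
    """Return True if the replica is an observer rather than a voter."""
    # A replica without an ID or a UUID can never be a voter.
    if replica_id is None or replica_uuid is None:
        return True
    # Voters are matched on the full (ID, UUID) pair, so a replica that
    # reuses a voter's ID with a different UUID is still an observer.
    return (replica_id, replica_uuid) not in voters
```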

Proposed changes

This KIP is inspired by Chapter 4 of Consensus: Bridging Theory and Practice [2]. The description of this KIP assumes that the reader is familiar with the references enumerated at the bottom of this page. The user explanation section explains the feature as the user would interact with it. The reference explanation section goes into the implementation details of this feature.

User explanation

There will be two mechanisms for bootstrapping the KRaft cluster metadata partition. For context, the set of voters in the KRaft cluster metadata partition is currently bootstrapped and configured using the controller.quorum.voters server property. This property is also used to configure the brokers with the set of endpoints that know the location of the leader, if one exists.

This feature will instead persist the set of voters in the cluster metadata partition log, snapshots and quorum state. The new controller.quorum.bootstrap.servers property will be added to configure replicas with a set of endpoints that could be used to discover the set of voters and leader.

Bootstrapping with one voter

The recommended method for creating a new KRaft cluster metadata partition is to bootstrap it with one voter. This can be done with the following CLI command:

Code Block
kafka-storage format --cluster-id <cluster-id> --release-version 3.8 --standalone --config controller.properties

This command will 1) create a meta.properties file in metadata.log.dir with a randomly generated directory.id, and 2) create a snapshot at 00000000000000000000-0000000000.checkpoint containing the control records (KRaftVersionRecord and AddVoterRecord) needed to make this Kafka node the only voter for the quorum.
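The effect of the standalone format can be modelled as follows. This is a hypothetical sketch and not the kafka-storage implementation. Several details are assumptions made for illustration:

- the record classes;
- the meta.properties keys other than directory.id;
- using the generated directory.id as the voter UUID.

The snapshot name follows the pattern of the file named above: a 20-digit zero-padded end offset, then a 10-digit zero-padded epoch.

```python
import uuid
from dataclasses import dataclass, field
from typing import Dict, List, Tuple


@dataclass
class KRaftVersionRecord:
    kraft_version: int


@dataclass
class AddVoterRecord:
    voter_id: int
    voter_uuid: uuid.UUID
    endpoints: List[str] = field(default_factory=list)


def snapshot_file_name(end_offset: int, epoch: int) -> str:
    # Yields 00000000000000000000-0000000000.checkpoint for (0, 0).
    return f"{end_offset:020d}-{epoch:010d}.checkpoint"


def format_standalone(cluster_id: str, node_id: int, kraft_version: int,
                      endpoints: List[str]) -> Tuple[Dict[str, str], str, list]:
    """Hypothetical model of `kafka-storage format --standalone`."""
    directory_id = uuid.uuid4()  # randomly generated directory.id
    # Python's UUID string form is used here for simplicity.
    meta_properties = {
        "cluster.id": cluster_id,
        "node.id": str(node_id),
        "directory.id": str(directory_id),
    }
    # Assumption: the voter UUID is the generated directory.id.
    records = [
        KRaftVersionRecord(kraft_version),
        AddVoterRecord(node_id, directory_id, endpoints),  # the only voter
    ]
    return meta_properties, snapshot_file_name(0, 0), records
```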

Bootstrapping with multiple voters 

In some deployment scenarios, and to support a configuration similar to Apache ZooKeeper's, the KRaft cluster metadata partition can also be bootstrapped with more than one voter. This can be done with the following CLI command:

Code Block
kafka-storage format --cluster-id <cluster-id> --release-version 3.8 --controller-quorum-voters <replica-id>-<replica-uuid>@<host>:<port>,... --config controller.properties

This command is similar to the standalone version, but the snapshot at 00000000000000000000-0000000000.checkpoint will instead contain an AddVoterRecord for each of the voters specified in --controller-quorum-voters. Just like with the controller.quorum.voters property, it is important that the set of voters is the same on all of the controllers with the same cluster ID.
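Parsing the --controller-quorum-voters value, and comparing the results across controllers, could look like the following. This is a hypothetical sketch rather than the kafka-storage parser. The function names are illustrative, and the sample UUID strings in the usage are made up.

```python
from dataclasses import dataclass
from typing import Iterable, List


@dataclass(frozen=True)
class VoterSpec:
    replica_id: int
    replica_uuid: str
    host: str
    port: int


def parse_quorum_voters_flag(value: str) -> List[VoterSpec]:
    """Parse '<replica-id>-<replica-uuid>@<host>:<port>,...' entries."""
    voters = []
    for entry in value.split(","):
        entry = entry.strip()
        id_and_uuid, at, address = entry.partition("@")
        # The replica ID is an integer, so the first '-' ends it; any later
        # '-' characters belong to the UUID string.
        replica_id, dash, replica_uuid = id_and_uuid.partition("-")
        # The port is everything after the last ':'.
        host, colon, port = address.rpartition(":")
        if not (at and dash and replica_uuid and colon and host):
            raise ValueError(f"invalid voter entry: {entry!r}")
        voters.append(VoterSpec(int(replica_id), replica_uuid, host, int(port)))
    return voters


def same_voter_set(a: Iterable[VoterSpec], b: Iterable[VoterSpec]) -> bool:
    # Order is irrelevant, but every controller must agree on the members.
    return set(a) == set(b)
```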

Upgrade KRaft protocol

The changes to the protocol and replicated state to implement this feature are not compatible with the existing KRaft implementation. This feature cannot be enabled unless all of the replicas for the KRaft cluster metadata partition support this feature. The user can upgrade the KRaft version with the following command:

Code Block
kafka-feature upgrade --release-version 3.8 --bootstrap-server <endpoints>

This command will parse the release-version into the matching MetadataVersion and KRaftVersion. It will send an UpdateFeatures request to a node with both features set to the matching version. KRaft will write the KRaftVersionRecord control record if all of the controllers and brokers support the new version. KRaft will use the information in the controller registrations, broker registrations and AddVoterRecords to determine whether the new version is compatible.
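The compatibility check can be sketched as a range test over every known node. This sketch assumes that each controller registration, broker registration and AddVoterRecord exposes a minimum and maximum supported KRaft version, in the spirit of the KRaftFeature range in the AddVoterRecord schema. The class and function names are hypothetical.

```python
from dataclasses import dataclass
from typing import Iterable, List


@dataclass(frozen=True)
class SupportedKRaftVersions:
    node_id: int
    min_version: int
    max_version: int


def incompatible_nodes(target: int,
                       nodes: Iterable[SupportedKRaftVersions]) -> List[int]:
    """IDs of nodes whose supported range does not include the target."""
    return [n.node_id for n in nodes
            if not (n.min_version <= target <= n.max_version)]


def can_write_kraft_version(target: int,
                            nodes: Iterable[SupportedKRaftVersions]) -> bool:
    # The KRaftVersionRecord is written only if every controller and
    # broker supports the new version.
    return not incompatible_nodes(target, nodes)
```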

Add Controller

To increase the number of controllers, the user needs to format a controller node, start the controller node and add that node to the KRaft cluster metadata partition. Formatting a controller node can be done with the following CLI command:

Code Block
kafka-storage format --cluster-id <cluster-id> --release-version 3.8 --config controller.properties

Notice that neither the --standalone nor the --controller-quorum-voters flag is used. After the new controller node has been started, the node can be added to the KRaft cluster metadata partition with the following CLI command:

Code Block
kafka-metadata-quorum --bootstrap-server <endpoints> add --config controller.properties

Remove Controller

To decrease the number of controllers, the user needs to execute the following CLI command:

Code Block
kafka-metadata-quorum --bootstrap-server <endpoints> remove --voter-id <voter-id> --voter-uuid <voter-uuid>

Reference Explanation

Replica UUID

In addition to a replica ID, each replica assigned to a KRaft topic partition will have a replica UUID. This UUID will be generated once and persisted as part of the quorum state for the KRaft topic partition. A replica (controller or broker) may host multiple topic partitions in multiple log directories. The replica UUID will be generated per topic partition hosted by the replica.
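Generating the UUID once and reusing it on restart might look like the following. This is a minimal sketch that assumes the quorum state is a small JSON file per topic partition. The file path and the `replicaUuid` field name are hypothetical, not the actual quorum state schema.

```python
import json
import os
import uuid


def load_or_create_replica_uuid(quorum_state_path: str) -> uuid.UUID:
    """Return this partition's replica UUID, generating and persisting it once."""
    state = {}
    if os.path.exists(quorum_state_path):
        with open(quorum_state_path) as f:
            state = json.load(f)
    if "replicaUuid" in state:  # hypothetical field name
        return uuid.UUID(state["replicaUuid"])

    replica_uuid = uuid.uuid4()
    state["replicaUuid"] = str(replica_uuid)
    # Write a temporary file and rename it over the original so the
    # quorum state file is replaced in one step rather than edited in place.
    tmp_path = quorum_state_path + ".tmp"
    with open(tmp_path, "w") as f:
        json.dump(state, f)
    os.replace(tmp_path, quorum_state_path)
    return replica_uuid
```

Each topic partition has its own quorum state file, so calling the function for two partitions yields two independent UUIDs. Calling it again for the same partition returns the stored value.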

...

Replicas in a KRaft topic partition that are voters or can become voters will be identified by their ID and UUID.

Voter Changes

Adding Voters

Voters are added to the cluster metadata partition by sending an AddVoter RPC to the leader. For safety, Kafka will only allow one voter change operation at a time. If there are any pending voter change operations, the leader will wait for them to finish. If there are no pending voter change operations, the leader will write an AddVoterRecord to the log and immediately update its in-memory quorum state to include this voter as part of the quorum. Any replica that replicates and reads this AddVoterRecord will update its in-memory voter set to include the new voter. Voters will not wait for these records to be committed before updating their voter set.

Once the AddVoterRecord has been committed by a majority of the new voter set, the leader can respond to the AddVoter RPC and process new voter change operations.
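The add-voter rules can be sketched as a small leader-side state machine. The rules are: one change at a time, an immediate in-memory update, and commit measured against the new voter set. This is an illustrative sketch, not the Kafka implementation. The `LeaderState` class, its method names and the use of list indexes as log offsets are assumptions. Fetch offsets are treated as the next offset each voter will fetch, so a voter has replicated the record once its fetch offset is greater than the record's offset. The leader reports its own log end offset in the same map.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set, Tuple

Voter = Tuple[int, str]  # (replica ID, replica UUID)


@dataclass
class LeaderState:
    voters: Set[Voter]
    log: List[Tuple[str, Voter]] = field(default_factory=list)
    pending_offset: Optional[int] = None  # offset of the in-flight change

    def try_add_voter(self, voter: Voter) -> bool:
        """Append an AddVoterRecord unless another voter change is pending."""
        if self.pending_offset is not None:
            return False  # the caller must wait for the pending change
        self.log.append(("AddVoterRecord", voter))
        self.pending_offset = len(self.log) - 1
        # The leader uses the new voter set immediately, before commit.
        self.voters = self.voters | {voter}
        return True

    def maybe_complete(self, fetch_offsets: Dict[Voter, int]) -> bool:
        """Return True when the pending change is committed by a majority
        of the new voter set, i.e. when the AddVoter RPC can be answered."""
        if self.pending_offset is None:
            return False
        replicated = sum(1 for v in self.voters
                         if fetch_offsets.get(v, 0) > self.pending_offset)
        if replicated > len(self.voters) // 2:
            self.pending_offset = None  # ready for the next voter change
            return True
        return False
```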

Removing Voters

Voters are removed from the cluster metadata partition by sending a RemoveVoter RPC to the leader. This works similarly to adding a voter: if there are no pending voter change operations, the leader will append the RemoveVoterRecord to the log and immediately update its voter set to the new configuration.

Once the RemoveVoterRecord has been committed by a majority of the new voter set, the leader can respond to the RPC. If the removed voter is the leader, the leader will resign from the quorum when the RemoveVoterRecord has been committed. To allow this record to be committed and the leader to resign, the followers will continue to fetch from the leader even if the leader is not part of the new voter set. In KRaft, leader election is triggered when a voter has not received a successful response within the fetch timeout.
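Commit of a RemoveVoterRecord, and the leader's decision to resign, can be sketched the same way. For brevity, this minimal sketch identifies voters by ID only, whereas the KIP uses ID and UUID. The class and method names are hypothetical, and fetch offsets again mean the next offset each replica will fetch.

```python
from dataclasses import dataclass
from typing import Dict, Set


@dataclass
class PendingRemoval:
    leader_id: int
    new_voters: Set[int]  # voter set after the removal was applied
    removed_id: int
    record_offset: int    # offset of the RemoveVoterRecord

    def committed(self, fetch_offsets: Dict[int, int]) -> bool:
        # Only members of the new voter set count toward the majority.
        replicated = sum(1 for v in self.new_voters
                         if fetch_offsets.get(v, 0) > self.record_offset)
        return replicated > len(self.new_voters) // 2

    def leader_should_resign(self, fetch_offsets: Dict[int, int]) -> bool:
        # Followers keep fetching from a removed leader, so the leader can
        # observe the commit before stepping down.
        return self.removed_id == self.leader_id and self.committed(fetch_offsets)
```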

Bootstrapping

This section describes how the quorum is bootstrapped to support this KIP. Kafka will support two configurations, and each is explained separately. The Kafka cluster can be configured to use either the existing controller.quorum.voters property or the new controller.quorum.bootstrap.servers property. These two configurations are mutually exclusive: the KRaft cluster is expected to use one or the other, but not both.
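A configuration check that enforces the mutual exclusion might look like this. It is a sketch under stated assumptions: the function name and the returned mode labels are hypothetical. Treating a configuration with neither property as an error follows from the requirement that the cluster use one or the other.

```python
from typing import Mapping

VOTERS = "controller.quorum.voters"
BOOTSTRAP = "controller.quorum.bootstrap.servers"


def quorum_bootstrap_mode(props: Mapping[str, str]) -> str:
    """Return 'static' or 'dynamic'; reject configs that set both or neither."""
    voters = props.get(VOTERS, "").strip()
    bootstrap = props.get(BOOTSTRAP, "").strip()
    if voters and bootstrap:
        raise ValueError(f"{VOTERS} and {BOOTSTRAP} are mutually exclusive")
    if voters:
        return "static"
    if bootstrap:
        return "dynamic"
    raise ValueError(f"one of {VOTERS} or {BOOTSTRAP} must be set")
```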

controller.quorum.voters

This is a static quorum configuration where the ID, host and port of every voter are specified. An example value for this configuration is 1@localhost:9092,2@localhost:9093,3@localhost:9094.

...

If the cluster metadata topic partition contains AddVoterRecords for replica IDs that are not enumerated in controller.quorum.voters, the replica will fail to start and will shut down.
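Parsing the static voter list and applying the startup check described above could be sketched as follows. The function names are hypothetical. The sketch checks only voter IDs, as the text describes.

```python
from typing import Dict, Iterable, Tuple


def parse_controller_quorum_voters(value: str) -> Dict[int, Tuple[str, int]]:
    """Parse 'id@host:port,...' into {id: (host, port)}."""
    voters = {}
    for entry in value.split(","):
        entry = entry.strip()
        voter_id, at, address = entry.partition("@")
        host, colon, port = address.rpartition(":")
        if not (at and colon and host):
            raise ValueError(f"invalid voter entry: {entry!r}")
        voters[int(voter_id)] = (host, int(port))
    return voters


def check_add_voter_records(configured: Dict[int, Tuple[str, int]],
                            add_voter_ids: Iterable[int]) -> None:
    """Fail startup if the log names voters missing from the static config."""
    unknown = sorted(set(add_voter_ids) - set(configured))
    if unknown:
        # The replica fails to start and shuts down.
        raise RuntimeError(f"AddVoterRecords for IDs not in "
                           f"controller.quorum.voters: {unknown}")
```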

controller.quorum.bootstrap.servers

This configuration describes the set of hosts and ports that can be queried to discover the cluster metadata partition leader. Observers and to-be-added voters will send Fetch requests to this list of servers until the leader is discovered.

When using this configuration, the quorum will be started with only one voter. This voter is bootstrapped by running the kafka-storage.sh tool, which creates the cluster metadata partition and appends one AddVoterRecord to it. Additional voters can be added by using the AddVoter RPC as described in this KIP.
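Leader discovery through controller.quorum.bootstrap.servers can be sketched as a retry loop. The `send_fetch` callable is a hypothetical stand-in for sending a Fetch request and extracting the leader's endpoint from the response. The round-robin order and the `max_attempts` bound are assumptions, because the text does not specify a retry policy.

```python
import itertools
from typing import Callable, Optional, Sequence, Tuple

Address = Tuple[str, int]


def discover_leader(bootstrap_servers: Sequence[Address],
                    send_fetch: Callable[[Address], Optional[Address]],
                    max_attempts: int = 100) -> Optional[Address]:
    """Send Fetch requests round-robin until a response names the leader.

    send_fetch is a hypothetical stand-in: it returns the leader's address
    if the response identifies one, or None otherwise.
    """
    servers = itertools.cycle(bootstrap_servers)
    for server in itertools.islice(servers, max_attempts):
        leader = send_fetch(server)
        if leader is not None:
            return leader
    return None  # leader still unknown; the caller retries later
```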

Leader Election

It is possible for the leader to write an AddVoterRecord to the log and replicate it to some of the voters in the new configuration. If the leader fails before this record has been replicated to the new voter, it is possible that a new leader cannot be elected. This is because voters reject vote requests from replicas that are not in the voter set. This check will be removed, and replicas will reply to vote requests even when the candidate is not in the voter set or the voting replica is not in the voter set. Before granting a vote, the voter must still confirm that the candidate's log is at least as up to date as its own, comparing the last epoch and then the end offset.
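With the membership check removed, vote granting reduces to two rules. The first is the log comparison. The second is the standard Raft rule that a voter grants at most one vote per epoch. This minimal sketch assumes logs are compared by (last epoch, end offset) in that order. The names are hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class LogEnd:
    last_epoch: int
    end_offset: int


def should_grant_vote(candidate: LogEnd, voter: LogEnd,
                      voted_for_other_in_epoch: bool) -> bool:
    # No voter-set membership check on either side: the decision depends
    # only on the logs and on whether this epoch's vote was already cast.
    if voted_for_other_in_epoch:
        return False  # at most one vote per epoch
    # Compare the last epoch first, then the end offset.
    return ((candidate.last_epoch, candidate.end_offset)
            >= (voter.last_epoch, voter.end_offset))
```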

High Watermark

As described in KIP-595, the high-watermark will be calculated using the fetch offsets of a majority of the voters. When a replica is removed or added, it is possible for the high-watermark to decrease. The leader will not allow the high-watermark to decrease and will guarantee that it is monotonically increasing for both the state machines and the remote replicas.

With this KIP, it is possible for the leader not to be part of the voter set when the replica being removed is the leader. In this case the leader will continue to handle Fetch and FetchSnapshot requests as normal, but it will not count itself when computing the high-watermark.
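The high-watermark rule, including the case where a removed leader does not count itself, can be sketched as below. The function is hypothetical. It assumes the follower offsets passed in are the fetch offsets from KIP-595, and that the leader contributes its log end offset only while it is a voter.

```python
from typing import Dict, Iterable


def compute_high_watermark(voters: Iterable[int], leader_id: int,
                           leader_end_offset: int,
                           follower_fetch_offsets: Dict[int, int],
                           previous_hwm: int) -> int:
    """Largest offset reached by a majority of voters, never decreasing."""
    offsets = []
    for v in voters:
        if v == leader_id:
            # The leader counts itself only while it is in the voter set.
            offsets.append(leader_end_offset)
        else:
            offsets.append(follower_fetch_offsets.get(v, 0))
    offsets.sort(reverse=True)
    # With n voters sorted in descending order, index n // 2 is the highest
    # offset that n // 2 + 1 voters (a majority) have reached.
    majority_offset = offsets[len(offsets) // 2]
    # Never let the high-watermark move backwards after a voter change.
    return max(previous_hwm, majority_offset)
```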

Snapshots

The snapshot generation code needs to be extended to include the new KRaft-specific control records, AddVoterRecord and RemoveVoterRecord. Before this KIP, snapshots did not include any KRaft-generated control records.

Internal Listener

The KRaft implementation and protocol described in KIP-595 and KIP-630 never read from the log or snapshot. This KIP requires the KRaft implementation to read uncommitted data from the log and snapshot to discover the voter set. This also means that the KRaft implementation needs to handle this uncommitted state being truncated and reverted.
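The voter set is derived from uncommitted records, so a replica needs to remember which voter set each record produced. This lets it roll back when the log is truncated. The following is a minimal sketch that keys voter sets by the offset of the record that produced them. The `VoterSetHistory` class is hypothetical.

```python
import bisect
from typing import FrozenSet, List


class VoterSetHistory:
    """Voter sets keyed by the log offset of the record that produced them."""

    def __init__(self, initial: FrozenSet[int]):
        self._offsets: List[int] = [-1]  # -1: voter set from the snapshot
        self._sets: List[FrozenSet[int]] = [initial]

    def add(self, offset: int, voters: FrozenSet[int]) -> None:
        # Called as soon as a voter record is read, committed or not.
        if offset <= self._offsets[-1]:
            raise ValueError("offsets must increase")
        self._offsets.append(offset)
        self._sets.append(voters)

    def latest(self) -> FrozenSet[int]:
        return self._sets[-1]

    def truncate_to(self, end_offset: int) -> None:
        """Forget voter sets from records at or after end_offset."""
        keep = bisect.bisect_left(self._offsets, end_offset)
        keep = max(keep, 1)  # never drop the snapshot voter set
        del self._offsets[keep:]
        del self._sets[keep:]
```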

Common Scenarios

To better illustrate this feature, this section describes two common scenarios that the Kafka administrator may perform.

Disk Failure Recovery

If one of the replicas encounters a disk failure, the operator can replace the disk with a new one and start the replica.

...

This state can be discovered by a client by using the DescribeQuorum RPC, the Admin client or the kafka-metadata-quorum CLI. The client can now decide to add replica (3, UUID3') to the set of voters using the AddVoter RPC, the Admin client or the kafka-metadata-quorum CLI.

When the AddVoter RPC succeeds, the voters will be (1, UUID1), (2, UUID2), (3, UUID3) and (3, UUID3'). The client can then remove the replica with the failed disk, (3, UUID3), from the voter set by using the RemoveVoter RPC, the Admin client or the kafka-metadata-quorum CLI.

When the RemoveVoter RPC succeeds, the voters will be (1, UUID1), (2, UUID2) and (3, UUID3'). At this point the Kafka cluster has successfully recovered from a disk failure in a controller node.
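The recovery steps above amount to two voter-set transitions. This sketch replays them with placeholder UUID strings, and the helper names are hypothetical. It also shows the intermediate four-voter set, where a majority is three voters. Committing the AddVoterRecord therefore requires (1, UUID1), (2, UUID2) and (3, UUID3') to replicate it while (3, UUID3) is down.

```python
from typing import Set, Tuple

Voter = Tuple[int, str]  # (replica ID, replica UUID)


def add_voter(voters: Set[Voter], voter: Voter) -> Set[Voter]:
    return voters | {voter}


def remove_voter(voters: Set[Voter], voter: Voter) -> Set[Voter]:
    if voter not in voters:
        raise ValueError(f"{voter} is not a voter")
    return voters - {voter}


def majority(n: int) -> int:
    return n // 2 + 1


initial = {(1, "UUID1"), (2, "UUID2"), (3, "UUID3")}
# Step 1: add the replica on the new disk, (3, UUID3').
after_add = add_voter(initial, (3, "UUID3'"))
# Step 2: remove the replica on the failed disk, (3, UUID3).
after_remove = remove_voter(after_add, (3, "UUID3"))
```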

Node Failure Recovery

To support this scenario the Kafka cluster must be configured to use controller.quorum.bootstrap.servers. Let's assume that the voter set is (1, UUID1), (2, UUID2) and (3, UUID3).

...

Code Block
{
  "type": "data",
  "name": "AddVoterRecord",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Version", "type": "int16", "versions": "0+",
      "about": "The version of the add voter record"},
    { "name": "VoterId", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "about": "The ID of the voter getting added to the topic partition"},
    { "name": "VoterUuid", "type": "uuid", "versions": "0+",
      "about": "The replica generated UUID of the replica getting added as a voter to the topic partition"},
    { "name": "EndPoints", "type": "[]Endpoint", "versions": "0+",
      "about": "The endpoints that can be used to communicate with the voter", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
        "about": "The name of the endpoint." },
      { "name": "Host", "type": "string", "versions": "0+",
        "about": "The hostname." },
      { "name": "Port", "type": "uint16", "versions": "0+",
        "about": "The port." },
      { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
        "about": "The security protocol." }
    ]},
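    { "name": "KRaftFeature", "type": "KRaftFeature",
      "about": "The range of versions of the protocol that the replica supports", "versions": "0+", "fields": [
      { "name": "MinSupportedVersion", "type": "int16", "versions": "0+",
        "about": "The minimum supported KRaft protocol version." },
      { "name": "MaxSupportedVersion", "type": "int16", "versions": "0+",
        "about": "The maximum supported KRaft protocol version." }
    ]}
  ]
}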

Handling

KRaft replicas will read all of the control records in the snapshot and the log, irrespective of the commit state and HWM. When a replica encounters an AddVoterRecord, it will add the replica ID and UUID to its voter set. If the replica being added is itself, it will allow the transition to candidate when the fetch timer expires. The fetch timer is reset whenever the replica receives a successful Fetch or FetchSnapshot response.
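The replica-side handling of AddVoterRecord and the fetch timer can be sketched as follows. The `Replica` class is hypothetical. Time is passed in explicitly as milliseconds to keep the sketch deterministic, and the 2000 ms default fetch timeout is an assumption for illustration.

```python
from dataclasses import dataclass, field
from typing import Set, Tuple


@dataclass
class Replica:
    replica_id: int
    replica_uuid: str
    voters: Set[Tuple[int, str]] = field(default_factory=set)
    fetch_timeout_ms: int = 2000  # assumed value for illustration
    fetch_deadline_ms: int = 0

    def on_add_voter_record(self, voter_id: int, voter_uuid: str) -> None:
        # Applied as soon as the record is read, regardless of the HWM.
        self.voters.add((voter_id, voter_uuid))

    def on_successful_fetch(self, now_ms: int) -> None:
        # A successful Fetch or FetchSnapshot response resets the timer.
        self.fetch_deadline_ms = now_ms + self.fetch_timeout_ms

    def should_become_candidate(self, now_ms: int) -> bool:
        # Only a replica that finds itself in the voter set may become a
        # candidate, and only after the fetch timer has expired.
        in_voter_set = (self.replica_id, self.replica_uuid) in self.voters
        return in_voter_set and now_ms >= self.fetch_deadline_ms
```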

...

Response

Version 1 renames LeaderIdAndEpoch to CurrentLeader and adds Endpoint to CurrentLeader.

...