...

This KIP describes a protocol for extending KIP-595 so that operators can programmatically update the voters set in a way that is consistent and available. There are two important use cases that this KIP supports. One use case is that the operator wants to change the number of controllers by adding or removing a controller. The other use case is that the operator wants to replace a controller because of a disk or hardware failure.

...

Voters: A voter is any replica that can transition to the candidate state and to the leader state. Voters for the KRaft cluster metadata partition are also called controllers. Voters are required to have an ID and UUID.

Voters set: A voters set is the group of voters for a partition. Each replica keeps its own set of voters that are part of the topic partition. For a replica, the voters are the replicas whose ID and UUID are in its own voters set. A candidate needs to get votes from the majority of its own voters set before it can become the leader of an epoch. When a voter becomes a leader it will use its voters set to determine when an offset has been committed.

Observers: An observer is any replica that is not in the voters set. This is because either it has an ID and UUID which is not in the voters set or it doesn't have an ID or UUID.

...

Common scenarios

To better illustrate this feature, this section describes a few common scenarios that the Kafka administrator may perform.

Automatic joining controllers

Imagine that the user is trying to create a KRaft controller quorum with 3 voters (1, 2, 3).

For all of the controller properties files, assume that controller.quorum.auto.join.enable is set to true and that controller.quorum.bootstrap.servers contains the endpoint of controller 1.

On controller 1 the user runs:

Code Block
kafka-storage format --cluster-id <cluster-id> --release-version 3.8 --standalone --config controller.properties

and starts the controller. When controller 1 starts, it sees that it is already in the voters set, so it eventually becomes the leader.

On controller 2 and 3 the user runs:

Code Block
kafka-storage format --cluster-id <cluster-id> --release-version 3.8 --config controller.properties

and starts the controllers. Notice that neither --standalone nor --controller-quorum-voters is used for controllers 2 and 3, so the controllers will start as observers. These controllers will discover the leader using controller.quorum.bootstrap.servers and will use the AddVoter RPC to join the voters set.
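
For example, once controllers 2 and 3 have joined the voters set, the operator could confirm the quorum state with the kafka-metadata-quorum CLI. This is only a sketch: the endpoint is a placeholder and the exact flags (such as --bootstrap-controller) may differ.

Code Block
kafka-metadata-quorum --bootstrap-controller localhost:9093 describe --status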

Disk failure recovery

If one of the replicas encounters a disk failure, the operator can replace the failed disk with a new disk and start the replica.

...

At this point the disk for replica 3 is replaced and formatted. This means that when replica 3 starts it will have a new replica uuid (UUID3') and an empty set of voters. Replica 3 will discover the partition leader either using controller.quorum.bootstrap.servers or by the leader sending a BeginQuorumEpoch request to replica 3. Once the new replica, (3, UUID3'), discovers the leader, it sends Fetch and FetchSnapshot requests to the leader. At this point the leader's set of voters will be (1, UUID1), (2, UUID2) and (3, UUID3) and the set of observers will include (3, UUID3').

The operator can discover this state by using the DescribeQuorum RPC, the Admin client or the kafka-metadata-quorum CLI. The operator can now decide to add replica (3, UUID3') to the set of voters using the AddVoter RPC, the Admin client or the kafka-metadata-quorum CLI.
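
For example, the voters and observers known to the leader could be inspected with something like the following sketch, where the endpoint is a placeholder and the exact flags are assumed; the output should show (3, UUID3') among the observers:

Code Block
kafka-metadata-quorum --bootstrap-server localhost:9092 describe --replication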

When the AddVoter RPC succeeds, the voters set will be (1, UUID1), (2, UUID2), (3, UUID3) and (3, UUID3'). The operator can remove the failed disk from the voters set by using the RemoveVoter RPC, the Admin client or the kafka-metadata-quorum CLI.
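
For example, removing the voter for the failed disk could look like the following sketch, where the failed disk is identified by its old directory id (UUID3); the endpoint, command name and flags are assumptions and may differ from the final tooling:

Code Block
kafka-metadata-quorum --bootstrap-server localhost:9092 remove-controller --controller-id 3 --controller-directory-id UUID3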

When the RemoveVoter RPC succeeds, the voters will be (1, UUID1), (2, UUID2), and (3, UUID3'). At this point the Kafka cluster has successfully recovered from a disk failure in a controller node in a consistent way.

Node failure recovery

Let's assume that the voters set is (1, UUID1), (2, UUID2) and (3, UUID3).

At this point replica 3 has failed and the Kafka operator would like to replace it. The operator would start a new controller with replica id 4. Node 4's metadata log dir would get formatted, generating and persisting the directory id UUID4. Replica 4 will discover the leader by sending Fetch and FetchSnapshot requests to the servers enumerated in controller.quorum.bootstrap.servers. After a successful Fetch RPC, the leader's set of voters will be (1, UUID1), (2, UUID2), (3, UUID3) and the set of observers will include (4, UUID4).

The operator can now decide to add replica (4, UUID4) to the set of voters using the AddVoter RPC. When this operation succeeds the set of voters will be  (1, UUID1), (2, UUID2), (3, UUID3) and (4, UUID4).
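
For example, adding controller 4 could look like the following sketch, assuming an add-controller command that reads the replica id, directory id and advertised endpoints from the controller's own properties file; the endpoint and flags shown are assumptions:

Code Block
kafka-metadata-quorum --command-config controller.properties --bootstrap-controller localhost:9093 add-controller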

The operator can now decide to remove replica (3, UUID3) from the set of voters using the RemoveVoter RPC.

Reference explanation

...

The fetching (following) voters will discover the set of voters and their endpoints by fetching the latest log from the leader. How do new voters discover the leader's endpoint? The leader will push this information to new voters (or voters that were offline for a long time) using the BeginQuorumEpoch request. The active leader sends BeginQuorumEpoch to all of the voters in the voters set when it becomes leader for an epoch.

The leader doesn't send BeginQuorumEpoch to observers since these are dynamic and are not included in the KRaft partition log. Observers will instead discover the leader using controller.quorum.bootstrap.servers. It is important that this property includes at least one of the available voters, otherwise brokers (observers) will not be able to discover the leader of the KRaft cluster metadata partition.

To make changes to the voters set consistent, it is required that the majority of the competing voters sets commit the voter changes. In this design the competing voters sets are the current voters set and the new voters set. Since this design only allows one voter change at a time, the majority of the new configuration always overlaps (intersects) the majority of the old configuration. This is done by the leader committing the current epoch when it becomes leader and committing single voter changes with the new voters set before accepting another voter change.

...

There will be a new SupportedFeature added to the ApiVersions response of the Kafka nodes. The name of this new supported feature will be kraft.version. The default value will be 0 and represents that only KIP-595: A Raft Protocol for the Metadata Quorum, KIP-630: Kafka Raft Snapshot and KIP-996: Pre-Vote are supported. Version 1 means that this KIP is supported.
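
For example, the supported and finalized kraft.version could be inspected with the kafka-features CLI; the endpoint is a placeholder and the exact flags are assumptions:

Code Block
kafka-features --bootstrap-server localhost:9092 describe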

When a client sends an UpdateFeatures RPC to the active controller, if the FeatureUpdates.Feature property is kraft.version, the associated information will be passed to the KRaft client. The KRaft client will implement two different algorithms to check if the upgrade is supported by voters and observers. For voters, the KRaft client leader will compare the upgraded version against all of the voters' supported kraft.version. The KRaft client leader cannot do this for observers (brokers) since their supported versions are not directly known by the KRaft leader. The controller state machine will instead push the brokers' kraft.version information to the KRaft client.
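
For example, once all voters and registered brokers support version 1, the operator could finalize the upgrade with something like the following sketch; the endpoint is a placeholder and the exact flags are assumptions:

Code Block
kafka-features --bootstrap-server localhost:9092 upgrade --feature kraft.version=1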

...

If there are no pending voter change operations, the leader sends an ApiVersions request to the new voter's endpoint to discover its supported kraft.version. If the new voter supports the current kraft.version, the leader will write a VotersRecord to the log, with the current voters set plus the voter getting added, and immediately update its in-memory quorum state to include this voter as part of the quorum. Any replica that replicates and reads this VotersRecord will update its in-memory voters set to include this new voter. Voters will not wait for these records to get committed before updating their voters set.

Once the VotersRecord operation has been committed by the majority of the new voters set, the leader can respond to the AddVoter RPC and process new voter change operations.

...

Voters are removed from the cluster metadata partition by sending a RemoveVoter RPC to the leader. This works similarly to adding a voter. If there are no pending voter change operations, the leader will append the VotersRecord to the log, with the current voters set minus the voter getting removed, and immediately update its voters set to the new configuration.

Once the VotersRecord operation has been committed by the majority of the new voters set, the leader can respond to the RPC. If the removed voter is the leader, the leader will resign from the quorum when the VotersRecord has been committed. To allow this operation to be committed and for the leader to resign, the followers will continue to fetch from the leader even if the leader is not part of the new voters set. In KRaft, leader election is triggered when the voter hasn't received a successful response within the fetch timeout.

...

It is possible for the leader to add a new voter to the voters set, write the VotersRecord to the log and only replicate it to some of the voters in the new configuration. If the leader fails before this record has been replicated to the new voter, it is possible that a new leader cannot be elected. This is because voters reject vote requests from replicas that are not in the voters set. This check will be removed and replicas will reply to vote requests even when the candidate is not in the voters set or the voting replica is not in the voters set. The candidate must still have a longer log offset and epoch before the voter will grant a vote to it.

...

When a KRaft voter becomes leader it will write a KRaftVersionRecord and VotersRecord to the log if the log or the latest snapshot doesn't contain any VotersRecord. This is done to make sure that the voters set in the bootstrap snapshot gets replicated to all of the voters and to not rely on all of the voters being configured with the same bootstrapped voters set.

Automatic endpoint and directory id discovery

...

With this KIP, it is possible for the leader to not be part of the voters set when the replica removed is the leader. In this case the leader will continue to handle Fetch and FetchSnapshot requests as normal but it will not count itself when computing the high watermark.

...

The KRaft implementation and protocol described in KIP-595 and KIP-630 never read from the log or snapshot. This KIP requires that the KRaft implementation now read uncommitted data from the log and snapshot to discover the voters set. This also means that the KRaft implementation needs to handle this uncommitted state getting truncated and reverted.

...

To make it easier for users to operate a KRaft controller cluster, they can have KRaft controllers automatically join the cluster's voters set by setting controller.quorum.auto.join.enable to true. In this case, the controller will remove any voter in the voters set that matches its replica id but doesn't match its directory id (replica uuid) by sending the RemoveVoter RPC. Once the controller has removed all duplicate replica uuids, it will add itself to the voters set by sending an AddVoter RPC to the leader, if its replica id and replica uuid are not in the voters set. The TimeoutMs for the AddVoter RPC will be set to 10 minutes.

...

and starts the controllers. Notice that neither --standalone nor --controller-quorum-voters is used for controllers 2 and 3, so the controllers start as observers. These controllers will discover the leader using controller.quorum.bootstrap.servers and will use the RemoveVoter and AddVoter RPCs as described at the beginning of this section.

...

If this configuration is specified, observers will not use the controller.quorum.voters endpoints to discover the leader. 

controller.quorum.auto.join.enable

Controls whether a KRaft controller should automatically join the cluster metadata partition for its cluster id. If the configuration is set to true the controller must be stopped before removing the controller with kafka-metadata-quorum remove-controller. The default value is false.

...

This record will also be written to the log by the leader when handling the AddVoter, RemoveVoter and UpdateVoter RPCs. When handling these RPCs the leader will make sure that the difference in voters is at most one addition or one removal. This is needed to satisfy the invariant that voter changes are committed by the old voters set and the new voters set.

This record will also be written by the leader to the log if it has never been written to the log in the past. This semantic is useful for consistently replicating the bootstrapping snapshot, at 00000000000000000000-0000000000.checkpoint, from the leader to all of the voters.

...

KRaft replicas will read all of the control records in the snapshot and the log irrespective of the commit state and HWM. When a replica encounters a VotersRecord it will replace the current voters set.

If the local replica is the leader and it is getting removed, the replica will stay leader until the VotersRecord gets committed or the epoch advances (which forces it to lose leadership).

If the local replica is getting added to the voters set, it will allow the transition to prospective candidate when the fetch timer expires. The fetch timer is reset whenever it receives a successful Fetch or FetchSnapshot response.

...

At this point if the user decides to add a new voter (4) to the voters set and the leader doesn't wait for voter (4) to catch up to the LEO, the voters set may become:

Voter 1: LEO = 200
Voter 2: LEO = 200
Voter 3: LEO = 100
Voter 4: LEO = 0

...

In 3., the leader will wait for its current epoch to commit by waiting for the LeaderChangeMessage to commit. This is required to guarantee that two competing voters sets, the one from a previous leader and the one from the current leader, only differ by at most one voter. Waiting for the current epoch to commit means that there cannot be some other competing voters set from another leader that can later override this leader's new voters set. See bug in single-server membership changes for more details on this.

In 4., the new replica will be part of the quorum so the leader will start sending BeginQuorumEpoch requests to this replica. It is possible that the new replica has not yet replicated and applied this VotersRecord so it doesn't know that it is a voter for this topic partition. The new replica will accept the BeginQuorumEpoch RPC even if it doesn't believe it is a member of the voters set.

The replica will return the following errors:

  1. NOT_LEADER_FOR_PARTITION - when the request is sent to a replica that is not the leader.
  2. VOTER_ALREADY_ADDED - when the request contains a replica ID and UUID that is already in the committed voters set.
  3. UNSUPPORTED_VERSION - when the kraft.version is less than 1.
  4. INVALID_REQUEST - when the new voter doesn't support the current kraft.version.
  5. REQUEST_TIMED_OUT - when the new voter didn't catch-up to the LEO in the time specified in the request.

...

  1. Wait until there are no uncommitted VotersRecord. Note that the implementation may just return a REQUEST_TIMED_OUT error if there are pending operations.
  2. Wait for the LeaderChangeMessage control record from the current epoch to get committed. Note that the implementation may just return a REQUEST_TIMED_OUT error if there are pending operations.
  3. Append the VotersRecord to the log with the updated voters set.
  4. The KRaft internal listener will read this record from the log and remove the voter from the voters set.
  5. Wait for the VotersRecord to commit using the majority of new configuration.
  6. Send the RemoveVoter response to the client.
  7. Resign by sending EndQuorumEpoch RPCs if the removed replica is the leader.

...

  1. NOT_LEADER_FOR_PARTITION - when the request is sent to a replica that is not the leader.
  2. VOTER_ALREADY_REMOVED - when the request contains a replica ID and UUID that is already not in the committed voters set.
  3. UNSUPPORTED_VERSION - when the kraft.version is less than 1.
  4. REQUEST_TIMED_OUT

...

Since the set of voters can change and not all replicas know the latest voters set, handling of Vote requests needs to be relaxed from what was defined and implemented for KIP-595.

KRaft replicas will accept Vote requests from all replicas. Candidate replicas don't need to be in the voter's voters set to receive a vote. This is needed to be able to elect a leader from the new voters set even though the new voters set hasn't been replicated to all of its voters.

...

When handling the BeginQuorumEpoch request the replica will accept the request if the LeaderEpoch is equal to or greater than its epoch. The receiving replica will not check if the new leader or itself is in the voters set. This change is required because the receiving replica may not have fetched the latest voters set.

The BeginQuorumEpoch RPC will be used by the leader to propagate the leader endpoint to all of the voters. This is needed so that all voters have the necessary information to send Fetch and FetchSnapshot requests.

...

When removing the leader from the voters set, it will remain the leader for that epoch until the VotersRecord gets committed. This means that the leader needs to allow replicas (voters and observers) to fetch from the leader even if it is not part of the voters set. This also means that if the leader is not part of the voters set it should not include itself when computing the committed offset (also known as the high-watermark) and when checking that the quorum is alive.

...

No changes are needed to the handling of this RPC. The only thing to keep in mind is that replicas will accept FetchSnapshot requests as long as they are the latest leader even if they are not in the current voters set.

Request

Add the replica UUID (directory id) to the Fetch request. This is not needed for correctness because leaders only track fetched offsets using the Fetch RPC, but it is a good addition for consistency and debuggability.

...