...
This command will parse the release-version to the matching metadata.version and kraft.version features. It will send an UpdateFeatures request to a node with both features set to the matching versions. KRaft will write the KRaftVersionRecord control record if all of the controllers and brokers support the new version. KRaft will use the information in the controller registration, broker registration and add voter records to determine if the new version is compatible.
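For illustration only, the equivalent upgrade can be driven through the Java Admin client's existing updateFeatures API (KIP-584/KIP-778). The bootstrap address and target feature levels below are placeholders, not values mandated by this KIP:

```java
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.FeatureUpdate;
import org.apache.kafka.clients.admin.UpdateFeaturesOptions;

public class UpgradeKRaftVersion {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        try (Admin admin = Admin.create(props)) {
            // Send a single UpdateFeatures request with both features set,
            // mirroring what the CLI command described above does.
            // The feature levels are placeholders for the matching versions.
            Map<String, FeatureUpdate> updates = Map.of(
                "metadata.version", new FeatureUpdate((short) 19, FeatureUpdate.UpgradeType.UPGRADE),
                "kraft.version", new FeatureUpdate((short) 1, FeatureUpdate.UpgradeType.UPGRADE));
            admin.updateFeatures(updates, new UpdateFeaturesOptions()).all().get();
        }
    }
}
```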
Add controller
To increase the number of controllers, the user needs to format a controller node, start the controller node and add that node to the KRaft cluster metadata partition. Formatting a controller node can be done with the following CLI command:
...
```
kafka-metadata-quorum --bootstrap-server <endpoints> add-controller --config controller.properties
```
Remove controller
To decrease the number of controllers, the user needs to execute the following CLI command:
```
kafka-metadata-quorum --bootstrap-server <endpoints> remove-controller --controller-id <replica-id> --controller-uuid <replica-uuid>
```
Common scenarios
To better illustrate this feature, this section describes two common scenarios that the Kafka administrator may perform.
Disk failure recovery
If one of the replicas encounters a disk failure, the operator can replace the failed disk with a new disk and start the replica.
...
When the RemoveVoter RPC succeeds, the voters will be (1, UUID1), (2, UUID2), and (3, UUID3'). At this point the Kafka cluster has successfully recovered from a disk failure in a controller node in a consistent way.
Node failure recovery
Let's assume that the voter set is (1, UUID1), (2, UUID2) and (3, UUID3).
...
The operator can now decide to remove replica (3, UUID3) from the set of voters using the RemoveVoter RPC.
Reference explanation
The general goal of this KIP is to allow the user to dynamically change the set of voters (also known as controllers) for the KRaft cluster metadata partition. This is achieved by storing the set of voters and their known endpoints in the log instead of in the controller.quorum.voters property. Because the set of voters is stored in the log, the leader can replicate this information to all of the fetching replicas (voters and observers). Since old log segments (records) can be deleted once a snapshot has been created, the KRaft snapshots will also contain the set of voters up to the included offset.
...
When the client sends an UpdateFeatures RPC to the active controller, if the FeatureUpdates.Feature property is kraft.version, the associated information will be passed to the KRaft client. The KRaft client will implement two different algorithms to check if the upgrade is supported by voters and observers. For voters, the KRaft client will compare the upgraded version against all of the persisted voter sets for the KRaft cluster metadata partition. The KRaft client cannot do this for observers (brokers) since their supported versions are not persisted in the log. The controller will instead push the broker registration information to the KRaft client.
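As a rough sketch of the voter-side check, assuming each persisted voter set carries the kraft.version range that each voter supports (the types and method below are hypothetical, not actual KRaft internals):

```java
import java.util.List;

// Hypothetical model of one voter entry in a persisted voter set, carrying
// the range of kraft.version feature levels that voter supports.
record VoterSupportedVersions(int replicaId, short minSupported, short maxSupported) {}

final class KRaftVersionUpgradeCheck {
    // The upgrade is allowed only if every voter, in every voter set persisted
    // between the latest snapshot and the LEO, supports the target version.
    static boolean votersSupport(short targetVersion,
                                 List<List<VoterSupportedVersions>> persistedVoterSets) {
        return persistedVoterSets.stream()
            .flatMap(List::stream)
            .allMatch(v -> v.minSupported() <= targetVersion
                        && targetVersion <= v.maxSupported());
    }
}
```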
Voter changes
Adding voters
Voters are added to the cluster metadata partition by sending an AddVoter RPC to the leader. For safety, Kafka will only allow one voter change operation at a time. If there are any pending voter change operations, the leader will wait for them to finish.
...
Once the VotersRecord operation has been committed by the majority of the new voter set, the leader can respond to the AddVoter RPC and process new voter change operations.
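The one-change-at-a-time rule can be pictured with the following sketch (all names are hypothetical and the real KRaft state machine differs): the leader refuses to start a second voter change until the VotersRecord for the first one has been committed.

```java
import java.util.Optional;

// Hypothetical leader-side state for voter changes, illustrating the
// single-pending-operation rule described above.
final class VoterChangeState {
    private Optional<Long> pendingVotersRecordOffset = Optional.empty();

    // Called when the leader receives an AddVoter or RemoveVoter RPC.
    synchronized boolean tryBeginChange(long appendedVotersRecordOffset) {
        if (pendingVotersRecordOffset.isPresent()) {
            return false; // a voter change is still uncommitted; caller must wait
        }
        pendingVotersRecordOffset = Optional.of(appendedVotersRecordOffset);
        return true;
    }

    // Called when the high-watermark advances. The change completes once the
    // VotersRecord is committed (by the majority of the *new* voter set).
    synchronized boolean maybeCompleteChange(long highWatermark) {
        if (pendingVotersRecordOffset.filter(offset -> offset < highWatermark).isPresent()) {
            pendingVotersRecordOffset = Optional.empty();
            return true; // safe to respond to the RPC and accept new changes
        }
        return false;
    }
}
```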
Removing voters
Voters are removed from the cluster metadata partition by sending a RemoveVoter RPC to the leader. This works similarly to adding a voter. If there are no pending voter change operations, the leader will append a VotersRecord to the log, containing the current voter set minus the voter getting removed, and immediately update its voter set to the new configuration.
...
When using this configuration for a new cluster, the quorum should be started with only one voter. This voter is bootstrapped by running the storage tool format command. This tool will create the cluster metadata partition and append the VotersRecord to it. Additional voters can be added by using the AddVoter RPC as described in this KIP.
Leader election
It is possible for the leader to add a new voter to the voter set, write the VotersRecord to the log and only replicate it to some of the voters in the new configuration. If the leader fails before this record has been replicated to the new voter, it is possible that a new leader cannot be elected. This is because voters reject vote requests from replicas that are not in the voter set. This check will be removed and replicas will reply to vote requests even when the candidate is not in the voter set or the voting replica is not in the voter set. The candidate must still have a longer log offset and epoch before the voter will grant it a vote.
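A minimal sketch of the relaxed vote-granting rule (the predicate and its parameters are hypothetical):

```java
// Hypothetical predicate showing the relaxed rule: neither the candidate nor
// the voting replica needs to be in the voter set; only the log comparison
// and the one-vote-per-epoch rule remain.
final class VoteGranting {
    static boolean shouldGrantVote(
            int candidateLastEpoch, long candidateEndOffset,
            int localLastEpoch, long localEndOffset,
            boolean alreadyVotedInEpoch) {
        if (alreadyVotedInEpoch) {
            return false; // a replica grants at most one vote per epoch
        }
        if (candidateLastEpoch != localLastEpoch) {
            // A log ending in a higher epoch is more up to date.
            return candidateLastEpoch > localLastEpoch;
        }
        // Same epoch: the candidate's log must be at least as long as ours.
        return candidateEndOffset >= localEndOffset;
    }
}
```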
First leader
When a KRaft voter becomes leader, it will write a KRaftVersionRecord and VotersRecord to the log if the log or the latest snapshot doesn't contain any VotersRecord. This is done to make sure that the voter set in the bootstrap snapshot gets replicated to all of the voters and to not rely on all of the voters being configured with the same bootstrapped voter set.
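Sketched as pseudo-Java (the interface and names below are hypothetical):

```java
// Hypothetical sketch of the first-leader step: on becoming leader, append
// KRaftVersionRecord and VotersRecord if no VotersRecord exists yet.
interface PartitionLogView {
    boolean containsVotersRecord();                 // scans latest snapshot and log
    void appendKRaftVersionRecord(short kraftVersion);
    void appendVotersRecord();                      // the bootstrapped voter set
}

final class FirstLeaderBootstrap {
    static void onBecomeLeader(PartitionLogView log, short kraftVersion) {
        if (!log.containsVotersRecord()) {
            log.appendKRaftVersionRecord(kraftVersion);
            log.appendVotersRecord();
        }
    }
}
```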
...
The directory id, or replica uuid, will behave differently. The quorum shouldn't automatically update the directory id, since a different value means that the disk was replaced. For the directory id, the leader will only override it if it was not previously set. This behavior is useful for when a cluster gets upgraded to a kraft.version greater than 0.
High watermark
As described in KIP-595, the high-watermark will be calculated using the fetch offset of the majority of the voters. When a replica is removed or added, it is possible for the high-watermark to decrease. The leader will not allow the high-watermark to decrease and will guarantee that it is monotonically increasing for both the state machines and the remote replicas.
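A simplified sketch of the clamping rule, treating the leader's own log end offset as one of the voter fetch offsets:

```java
import java.util.Arrays;

// Simplified sketch of the monotonic high-watermark rule.
final class HighWatermark {
    private long highWatermark = 0L;

    long update(long[] voterFetchOffsets) {
        long[] sorted = voterFetchOffsets.clone();
        Arrays.sort(sorted);
        // Largest offset replicated to a majority of the voters: for N voters
        // this is the element at index (N - 1) / 2 of the ascending sort.
        long majorityOffset = sorted[(sorted.length - 1) / 2];
        // A voter set change can shrink majorityOffset, but the advertised
        // high-watermark is clamped so it never decreases.
        highWatermark = Math.max(highWatermark, majorityOffset);
        return highWatermark;
    }
}
```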
...
To implement this feature efficiently, the KRaft implementation will keep track of all voter sets between the latest snapshot and the LEO. The replicas will update this ordered list of voter sets whenever the latest snapshot id increases, a VotersRecord control record is read from the log, or the log is truncated.
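One way to picture this bookkeeping is an ordered map from offset to voter set (a hypothetical sketch, not the actual implementation):

```java
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;

// Sketch of the ordered voter-set history between the latest snapshot and the
// LEO, keyed by the offset of the VotersRecord. Assumes at least the
// bootstrap voter set has been added.
final class VoterSetHistory {
    private final NavigableMap<Long, Set<Integer>> voterSetsByOffset = new TreeMap<>();

    // A VotersRecord control record was read from the log or snapshot.
    void addAt(long offset, Set<Integer> voters) {
        voterSetsByOffset.put(offset, voters);
    }

    // The latest snapshot id advanced: entries below it are no longer needed.
    void truncateOldEntries(long latestSnapshotOffset) {
        voterSetsByOffset.headMap(latestSnapshotOffset, false).clear();
    }

    // The log was truncated: uncommitted voter sets past endOffset are reverted.
    void truncateNewEntries(long endOffset) {
        voterSetsByOffset.tailMap(endOffset, false).clear();
    }

    // The voter set currently in effect is the most recently appended one.
    Set<Integer> lastVoterSet() {
        return voterSetsByOffset.lastEntry().getValue();
    }
}
```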
Internal listener
The KRaft implementation and protocol described in KIP-595 and KIP-630 never read from the log or snapshot. This KIP requires that the KRaft implementation now read uncommitted data from the log and snapshot to discover the voter set. This also means that the KRaft implementation needs to handle this uncommitted state getting truncated and reverted.
Controller auto joining
To make it easier for users to operate a KRaft controller cluster, they can have KRaft controllers automatically join the cluster's voter set by setting controller.quorum.auto.join.enabled to true. In this case, the controller will remove any voter in the voter set that matches its replica id but doesn't match its directory id (replica uuid) by sending the RemoveVoter RPC. Once the controller has removed all voters with a duplicate replica id, it will add itself to the voter set by sending an AddVoter RPC to the leader, if its replica id and replica uuid are not in the voter set. The TimeoutMs for the AddVoter RPC will be set to 10 minutes.
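The decision an auto-joining controller makes on startup can be sketched as follows (all names are hypothetical); the controller repeats this until the action is NONE:

```java
import java.util.Map;
import java.util.UUID;

// Sketch of the auto-join decision. Given the current voter set as
// replicaId -> directoryId, the controller either removes a stale entry for
// its own id, adds itself, or does nothing.
enum AutoJoinAction { SEND_REMOVE_VOTER, SEND_ADD_VOTER, NONE }

final class ControllerAutoJoin {
    static AutoJoinAction nextAction(int localId, UUID localDirectoryId, Map<Integer, UUID> voters) {
        UUID registered = voters.get(localId);
        if (registered == null) {
            // Not in the voter set at all: join with an AddVoter RPC.
            return AutoJoinAction.SEND_ADD_VOTER;
        }
        if (!registered.equals(localDirectoryId)) {
            // Same replica id but a different directory id (e.g. after a disk
            // replacement): remove the stale voter before adding ourselves.
            return AutoJoinAction.SEND_REMOVE_VOTER;
        }
        return AutoJoinAction.NONE; // already a voter with a matching directory id
    }
}
```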
For example, imagine that the user is trying to create a KRaft controller quorum with 3 voters (1, 2, 3).
On controller 1 the user runs:
```
kafka-storage format --cluster-id ABC --release-version "3.8" --standalone --config ...
```
and starts the controller. When controller 1 starts, it sees that it is already in the voter set, so it doesn't send any AddVoter or RemoveVoter RPCs. Controller 1 eventually becomes leader.
On controller 2 and 3 the user runs:
```
kafka-storage format --cluster-id ABC --release-version "3.8" --config ...
```
and starts the controllers. Notice that neither --standalone nor --controller-quorum-voters is used for controllers 2 and 3, so these controllers start as observers. They will discover the leader using controller.quorum.bootstrap.servers and will use the RemoveVoter and AddVoter RPCs as described at the beginning of this section.
Public Interfaces
Configuration
...
If this configuration is specified, observers will not use the controller.quorum.voters endpoints to discover the leader.
...
controller.quorum.auto.join.enabled
Controls whether a KRaft controller should automatically join the cluster metadata partition for its cluster id. If the configuration is set to true, the controller must be stopped before removing it with kafka-metadata-quorum remove-controller. The default value is false.
Log and snapshot control records
A few new control records will be added to the log and snapshot of a KRaft partition.
...
If the local replica is getting added to the voter set, it will allow the transition to prospective candidate when the fetch timer expires. The fetch timer is reset whenever it receives a successful Fetch or FetchSnapshot response.
Quorum state
Each KRaft topic partition has a quorum state (QuorumStateData) that gets persisted in the quorum-state file in the directory for the topic partition. The following changes will be made to this state:
...
```diff
git diff upstream/trunk clients/src/main/resources/common/message/DescribeQuorumResponse.json
diff --git a/clients/src/main/resources/common/message/DescribeQuorumResponse.json b/clients/src/main/resources/common/message/DescribeQuorumResponse.json
index 0ea6271238..e2481dff04 100644
--- a/clients/src/main/resources/common/message/DescribeQuorumResponse.json
+++ b/clients/src/main/resources/common/message/DescribeQuorumResponse.json
@@ -18,7 +18,7 @@
   "type": "response",
   "name": "DescribeQuorumResponse",
   // Version 1 adds LastFetchTimeStamp and LastCaughtUpTimestamp in ReplicaState (KIP-836).
-  "validVersions": "0-1",
+  "validVersions": "0-2",
   "flexibleVersions": "0+",
   "fields": [
     { "name": "ErrorCode", "type": "int16", "versions": "0+",
@@ -40,10 +40,25 @@
       { "name": "CurrentVoters", "type": "[]ReplicaState", "versions": "0+" },
       { "name": "Observers", "type": "[]ReplicaState", "versions": "0+" }
     ]}
-  ]}],
+  ]},
+  { "name": "Nodes", "type": "[]Node", "versions": "2+", "fields": [
+    { "name": "NodeId", "type": "int32", "versions": "2+",
+      "mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node" },
+    { "name": "Listeners", "type": "[]Listener",
+      "about": "The listeners of this controller", "versions": "0+", "fields": [
+      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
+        "about": "The name of the endpoint" },
+      { "name": "Host", "type": "string", "versions": "0+",
+        "about": "The hostname" },
+      { "name": "Port", "type": "uint16", "versions": "0+",
+        "about": "The port" }
+    ]}
+  ]}
+  ],
   "commonStructs": [
     { "name": "ReplicaState", "versions": "0+", "fields": [
       { "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId" },
+      { "name": "ReplicaUuid", "type": "uuid", "versions": "2+" },
       { "name": "LogEndOffset", "type": "int64", "versions": "0+",
         "about": "The last known log end offset of the follower or -1 if it is unknown"},
       { "name": "LastFetchTimestamp", "type": "int64", "versions": "1+", "ignorable": true, "default": -1,
```
Admin client
The Java Admin client will be extended to support the new fields in the DescribeQuorum response and the new AddVoter and RemoveVoter RPCs.
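For example, the extended Admin client could be used as in the sketch below. The method and class names (describeMetadataQuorum, addRaftVoter, removeRaftVoter, RaftVoterEndpoint) mirror the RPCs in this KIP but should be treated as tentative until the API lands:

```java
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.QuorumInfo;
import org.apache.kafka.clients.admin.RaftVoterEndpoint;
import org.apache.kafka.common.Uuid;

public class QuorumAdminExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "controller-1:9093"); // placeholder
        try (Admin admin = Admin.create(props)) {
            // Inspect the quorum; version 2 of DescribeQuorum adds the replica
            // directory ids and the voters' endpoints.
            QuorumInfo quorum = admin.describeMetadataQuorum().quorumInfo().get();
            System.out.println("Leader: " + quorum.leaderId());
            quorum.voters().forEach(v ->
                System.out.println("Voter " + v.replicaId() + " at offset " + v.logEndOffset()));

            // Add controller 4 as a voter (mirrors the AddVoter RPC). In practice
            // the directory id comes from the new controller's metadata log dir;
            // here it is a placeholder.
            Uuid directoryId = Uuid.randomUuid();
            admin.addRaftVoter(4, directoryId,
                Set.of(new RaftVoterEndpoint("CONTROLLER", "controller-4", 9093))).all().get();

            // Remove the same voter again (mirrors the RemoveVoter RPC).
            admin.removeRaftVoter(4, directoryId).all().get();
        }
    }
}
```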
...
NAME | TAGS | TYPE | NOTE |
---|---|---|---|
number-of-voters | type=raft-metrics | gauge | Number of voters for the cluster metadata topic partition. |
number-of-observers | type=raft-metrics | gauge | Number of observers that could be promoted to voters. |
pending-add-voter | type=raft-metrics | gauge | 1 if there is a pending add voter operation, 0 otherwise. |
pending-remove-voter | type=raft-metrics | gauge | 1 if there is a pending remove voter operation, 0 otherwise. |
TBD | TBD | gauge | 1 if a controller node is not a voter for the KRaft cluster metadata partition, 0 otherwise. |
duplicate-voter-ids | type=raft-metrics | gauge | Counts the number of duplicate replica ids in the set of voters. |
number-of-offline-voters | type=raft-metrics | gauge | Number of voters with a last Fetch timestamp greater than the Fetch timeout. |
ignored-static-voters | TBD | gauge | 1 if controller.quorum.voters is set and the kraft.version is greater than 0, 0 otherwise. |
Command line interface
kafka-metadata-shell
A future KIP will describe how the kafka-metadata-shell tool will be extended to be able to read and display KRaft control records from the quorum, snapshot and log.
...
This tool, as described in KIP-595 and KIP-836, will be improved to support these additional commands and options:
describe --status
This command will be extended to print the new information added to the DescribeQuorum RPC. This includes the directory id for all of the replicas (voters and observers), the known endpoints for all of the voters, and any uncommitted voter changes.
...
This command is used to remove voters from the KRaft cluster metadata partition. The flags --controller-id and --controller-uuid must be specified.
Compatibility, deprecation, and migration plan
The features in this KIP will be supported if the ApiVersions of all of the voters and observers are greater than the versions described here. If the leader has a replica UUID for all of the voters, then this KIP is supported by all of the voters.
...
It is safe for the operator to remove the controller.quorum.voters configuration once the kraft.version has been upgraded to version 1. All of the Kafka nodes will expose the ignored-static-voters metric. If all of the Kafka nodes report 1 for this metric, it is safe to remove controller.quorum.voters from the node configuration and specify controller.quorum.bootstrap.servers instead.
Test plan
This KIP will be tested using unit tests, integration tests, system tests, simulation tests and a TLA+ specification.
Rejected alternatives
KIP-642: Dynamic quorum reassignment. KIP-642 describes how to perform dynamic reassignment with multiple voters added and removed. KIP-642 doesn't support disk failures, and it would be more difficult to implement compared to this KIP-853. In a future KIP we can describe how to add administrative operations that support the addition and removal of multiple voters.
...