...
Version 0 of this data will get written if the kraft.version is 0. Version 1 of this data will get written if the kraft.version is 1. (TODO: there is one small issue. A candidate at kraft.version 1 may ask for a vote from a voter at kraft.version 0. In this case, how will the voter persist the uuid?)
```diff
git diff upstream/trunk raft/src/main/resources/common/message/QuorumStateData.json
diff --git a/raft/src/main/resources/common/message/QuorumStateData.json b/raft/src/main/resources/common/message/QuorumStateData.json
index d71a32c75d..34881f05ff 100644
--- a/raft/src/main/resources/common/message/QuorumStateData.json
+++ b/raft/src/main/resources/common/message/QuorumStateData.json
@@ -16,19 +16,25 @@
 {
   "type": "data",
   "name": "QuorumStateData",
-  "validVersions": "0",
+  "validVersions": "0-1",
   "flexibleVersions": "0+",
   "fields": [
-    {"name": "ClusterId", "type": "string", "versions": "0+"},
-    {"name": "LeaderId", "type": "int32", "versions": "0+", "default": "-1"},
-    {"name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1"},
-    {"name": "VotedId", "type": "int32", "versions": "0+", "default": "-1"},
-    {"name": "AppliedOffset", "type": "int64", "versions": "0+"},
-    {"name": "CurrentVoters", "type": "[]Voter", "versions": "0+", "nullableVersions": "0+"}
+    { "name": "ClusterId", "type": "string", "versions": "0" },
+    { "name": "LeaderId", "type": "int32", "versions": "0+", "default": "-1" },
+    { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1" },
+    { "name": "VotedId", "type": "int32", "versions": "0+", "default": "-1" },
+    { "name": "VotedUuid", "type": "uuid", "versions": "1+" },
+    { "name": "AppliedOffset", "type": "int64", "versions": "0" },
+    { "name": "AppliedRecord", "type": "AppliedRecord", "versions": "1+", "fields": [
+      { "name": "EndOffset", "type": "int64", "versions": "1+" },
+      { "name": "Epoch", "type": "int32", "versions": "1+" }
+    ]},
+    { "name": "CurrentVoters", "type": "[]Voter", "versions": "0+", "nullableVersions": "0+" }
   ],
   "commonStructs": [
     { "name": "Voter", "versions": "0+", "fields": [
-      {"name": "VoterId", "type": "int32", "versions": "0+"}
+      { "name": "VoterId", "type": "int32", "versions": "0+" },
+      { "name": "VoterUuid", "type": "uuid", "versions": "1+" }
     ]}
   ]
 }
```
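The schema's per-field version ranges determine which fields get persisted at each kraft.version. A small sketch of that filtering (the version ranges are transcribed from the JSON above; the function and table names are illustrative, not part of Kafka's generated message code):

```python
# Version ranges from the QuorumStateData schema above: (min, max), with
# max=None meaning the field is valid in all later versions ("+").
FIELD_VERSIONS = {
    "ClusterId": (0, 0),
    "LeaderId": (0, None),
    "LeaderEpoch": (0, None),
    "VotedId": (0, None),
    "VotedUuid": (1, None),
    "AppliedOffset": (0, 0),
    "AppliedRecord": (1, None),
    "CurrentVoters": (0, None),
}

def fields_for_version(version: int) -> set:
    """Names of the QuorumStateData fields persisted at the given data version."""
    return {
        name for name, (lo, hi) in FIELD_VERSIONS.items()
        if lo <= version and (hi is None or version <= hi)
    }
```

Note that at version 1, ClusterId and the plain AppliedOffset are dropped, while VotedUuid and AppliedRecord appear.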
...
1. Wait for the fetch offset of the replica (ID, UUID) to catch up to the log end offset of the leader.
2. Wait until there are no uncommitted add or remove voter records.
3. Wait for the LeaderChangeMessage control record from the current epoch to get committed.
4. Send an ApiVersions RPC to the first listener to discover the supported kraft.version of the new voter.
5. Check that the new voter supports the current kraft.version.
6. Append the AddVoterRecord to the log.
7. The KRaft internal listener will read this record from the log and add the voter to the voter set.
8. Wait for the AddVoterRecord to commit using the majority of the new voter set.
9. Send the AddVoter response to the client.
In 1., the leader needs to wait for the replica to catch up because when the AddVoterRecord is appended to the log, the set of voters changes. If the new voter is too far behind, it can take some time for it to reach the HWM. During this time the leader cannot commit data and the quorum will be unavailable from the perspective of the state machine. We mitigate this by waiting for the new replica to catch up before adding it to the set of voters.
In 3., the leader will wait for its current epoch to commit by waiting for the LeaderChangeMessage to commit. This is required to guarantee that two competing voter sets, the one from a previous leader and the one from the current leader, differ by at most one voter. Waiting for the current epoch to commit means that there cannot be some other competing voter set from another leader that can later override this leader's new voter set. See the bug in single-server membership changes for more details on this.
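The reason the "differ by at most one voter" guarantee matters is that any two such voter sets have intersecting majorities, so two competing leaders can never be elected by disjoint majorities. A brute-force check of this property (illustrative only, not Kafka code):

```python
from itertools import combinations

def majorities(voters: frozenset) -> list:
    """All minimal majority subsets of a voter set."""
    need = len(voters) // 2 + 1
    return [frozenset(c) for c in combinations(sorted(voters), need)]

def majorities_overlap(a: frozenset, b: frozenset) -> bool:
    """True if every majority of a intersects every majority of b."""
    return all(ma & mb for ma in majorities(a) for mb in majorities(b))

# Adding a single voter preserves overlapping majorities...
assert majorities_overlap(frozenset({1, 2, 3}), frozenset({1, 2, 3, 4}))
# ...but adding two voters at once does not: {1,2} is a majority of {1,2,3}
# while {3,4,5} is a majority of {1,2,3,4,5}, and they are disjoint.
assert not majorities_overlap(frozenset({1, 2, 3}), frozenset({1, 2, 3, 4, 5}))
```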
Once the AddVoterRecord has been appended, the new replica will be part of the quorum, so the leader will start sending BeginQuorumEpoch requests to this replica. It is possible that the new replica has not yet replicated and applied this AddVoterRecord, so it doesn't know that it is a voter for this topic partition. The new replica will accept the BeginQuorumEpoch RPC even if it doesn't believe it is a member of the voter set.
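The checks the leader performs before appending the AddVoterRecord can be sketched as follows (all names are hypothetical; the real logic lives in Kafka's KRaft implementation):

```python
# Illustrative sketch of the leader-side guards before appending an AddVoterRecord.

def can_add_voter(fetch_offset, log_end_offset, uncommitted_voter_change,
                  leader_change_committed, voter_kraft_version, current_kraft_version):
    """Return (ok, reason) for the guards applied before appending the record."""
    if fetch_offset < log_end_offset:
        return False, "voter not caught up"
    if uncommitted_voter_change:
        return False, "uncommitted voter change pending"
    if not leader_change_committed:
        return False, "LeaderChangeMessage for current epoch not committed"
    if voter_kraft_version < current_kraft_version:
        return False, "voter does not support current kraft.version"
    return True, "append AddVoterRecord"
```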
...
DescribeQuorum
TODO: Fix this RPC
TODO: Include the listeners/endpoints for all of the voters known by the leader.
The version of the request is increased and the fields remain unchanged.
...
NAME | TAGS | TYPE | NOTE |
---|---|---|---|
number-of-voters | type=raft-metrics | gauge | Number of voters for the cluster metadata topic partition. |
number-of-observers | type=raft-metrics | gauge | Number of observers that could be promoted to voters. |
pending-add-voter | type=raft-metrics | gauge | 1 if there is a pending add voter operation, 0 otherwise. |
pending-remove-voter | type=raft-metrics | gauge | 1 if there is a pending remove voter operation, 0 otherwise. |
TBD | TBD | gauge | 1 if a controller node is not a voter for the KRaft cluster metadata partition, 0 otherwise. |
duplicate-voter-ids | type=raft-metrics | gauge | Number of duplicate replica ids in the set of voters. |
number-of-offline-voters | type=raft-metrics | gauge | Number of voters whose time since their last Fetch request exceeds the Fetch timeout. |
ignored-static-voters | TBD | gauge | 1 if controller.quorum.voters is set and the kraft.version is greater than 0, 0 otherwise. |
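As a sketch of how two of these gauges could be computed (the data structures and timeout semantics are assumptions derived from the table above, not the actual Kafka metrics code):

```python
from collections import Counter

def number_of_offline_voters(last_fetch_ms: dict, now_ms: int, fetch_timeout_ms: int) -> int:
    """Voters whose last Fetch request is older than the fetch timeout."""
    return sum(1 for t in last_fetch_ms.values() if now_ms - t > fetch_timeout_ms)

def duplicate_voter_ids(voter_ids: list) -> int:
    """Count of replica ids that appear more than once in the voter set."""
    return sum(count - 1 for count in Counter(voter_ids).values() if count > 1)
```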
Command Line Interface
kafka-metadata-shell
...
A future KIP will describe how the kafka-metadata-shell tool will be extended to be able to read and display KRaft control records from the quorum, snapshot and log.
kafka-storage
...
The format command will get extended as follows.
--standalone
This command will 1) create a meta.properties file in metadata.log.dir with a randomly generated directory.id, and 2) create a snapshot at 00000000000000000000-0000000000.checkpoint with the necessary control records (KRaftVersionRecord and AddVoterRecord) to make this Kafka node the only voter for the quorum.
This option is unsafe because it doesn't use the quorum to establish the new quorum. This will be documented in the --standalone option help description.
--metadata-quorum
The value of this option will have the following schema: <replica-id>[-<replica-uuid>]@<host>:<port>. Logically, this functions very similarly to controller.quorum.voters. The important difference is that it optionally supports the user directly specifying the replica's directory id.
Similar to the --standalone option, this command will 1) create a meta.properties file in metadata.log.dir with the specified directory id (replica-uuid); if the optional directory id is not specified, the command will generate a random one; and 2) create a snapshot at 00000000000000000000-0000000000.checkpoint with the necessary control records (KRaftVersionRecord and AddVoterRecord).
This option is unsafe, and it is important that the operator uses the same value across all of the voters specified.
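A minimal sketch of parsing this value format, assuming the optional directory id never contains a `-` (the real tool may tokenize differently):

```python
def parse_voter(spec: str):
    """Parse <replica-id>[-<replica-uuid>]@<host>:<port> into its components."""
    replica, endpoint = spec.split("@", 1)
    host, port = endpoint.rsplit(":", 1)  # rsplit tolerates ':' in host names
    if "-" in replica:
        replica_id, replica_uuid = replica.split("-", 1)
    else:
        replica_id, replica_uuid = replica, None
    return int(replica_id), replica_uuid, host, int(port)
```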
kafka-features
The upgrade and downgrade commands will support a new configuration flag. A downgrade that results in a decrease of the kraft.version will be rejected by the KRaft leader.
--release-software
The value specified in this flag will be used to find the corresponding metadata.version and kraft.version. The --metadata version flag will get deprecated and will become a synonym for --release-software.
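A sketch of the lookup and the downgrade rule. The release-to-feature values here are entirely hypothetical; the real mapping is defined by Kafka's feature framework:

```python
# Hypothetical release -> feature-version mapping for illustration only.
RELEASE_FEATURES = {
    "3.7": {"metadata.version": 19, "kraft.version": 0},
    "3.8": {"metadata.version": 20, "kraft.version": 1},
}

def features_for_release(release: str) -> dict:
    """Resolve --release-software into the corresponding feature versions."""
    return RELEASE_FEATURES[release]

def kraft_downgrade_allowed(current_kraft: int, target_kraft: int) -> bool:
    """The KRaft leader rejects any downgrade that decreases kraft.version."""
    return target_kraft >= current_kraft
```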
kafka-metadata-quorum
The kafka-metadata-quorum tool described in KIP-595 and KIP-836 will be improved to support these additional commands:
describe
This command will be extended to print the new information added to the DescribeQuorum RPC. This includes the directory id for all of the replicas (voters and observers), the known endpoints for all of the voters, and any uncommitted voter changes.
```
kafka-metadata-quorum describe --status
```
...
TODO: Describe these changes
TODO: This command should print all of the voter endpoints that the leader knows about. It should also display if there are any uncommitted add or remove voter changes.
This command will print both the ReplicaId and ReplicaUuid for CurrentVoters. A new row called CouldBeVoters will be added, which prints the Replica ID and UUID of any replica that could be added to the voter set. E.g.
```
> bin/kafka-metadata-quorum.sh describe
ClusterId:               SomeClusterId
LeaderId:                0
LeaderEpoch:             15
HighWatermark:           234130
MaxFollowerLag:          34
MaxFollowerLagTimeMs:    15
CurrentVoters:           [{"id": 0, "uuid": "UUID1", "endpoints": ["host:port"]},
                          {"id": 1, "uuid": "UUID2", "endpoints": ["host:port"]},
                          {"id": 2, "uuid": "UUID3", "endpoints": ["host:port"]}]
Observers:               [{"id": 3, "uuid": "UUID4"}]
UncommittedAddedVoter:   {"id": 2, "uuid": "UUID3", "endpoints": ["host:port"]}
UncommittedRemovedVoter: {"id": 3, "uuid": "UUID4", "endpoints": ["host:port"]}
```
describe --replication
This command will print one additional column for the replica uuid after the replica id. E.g.
```
> bin/kafka-metadata-quorum.sh describe --replication
ReplicaId   ReplicaUuid   LogEndOffset   ...
0           uuid1         234134         ...
...
```
add-controller --config <server.properties>
This command is used to add new controllers to the KRaft cluster metadata partition. It must be executed using the server configuration of the new controller. The command will read the server properties file for the replica id and the endpoints, and the meta.properties file for the directory id.
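A minimal sketch of the property-file reads this command performs, using a simplified java-properties parser (the property names node.id and listeners, and the exact file layout, are assumptions; directory.id is the meta.properties key described earlier in this KIP):

```python
def parse_properties(text: str) -> dict:
    """Minimal java-properties style parser: comments and key=value lines."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props

# Hypothetical file contents for a new controller being added.
server = parse_properties("node.id=3\nlisteners=CONTROLLER://host:9093\n")
meta = parse_properties("# generated by kafka-storage\ndirectory.id=UUID3\n")
# The AddVoter request would be built from server["node.id"],
# server["listeners"], and meta["directory.id"].
```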
remove-controller --controller-id <controller-id> --controller-uuid <controller-uuid>
This command is used to remove voters from the KRaft cluster metadata partition. The flags --controller-id and --controller-uuid must be specified.
Compatibility, Deprecation, and Migration Plan
...