Status
Current state: Under Discussion
Discussion thread: https://lists.apache.org/thread/zb5l1fsqw9vj25zkmtnrk6xm7q3dkm1v
JIRA:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
KIP-595 introduced KRaft topic partitions. These are partitions with replicas that can achieve consensus on the Kafka log without relying on the Controller or ZK. The KRaft Controllers in KIP-631 use one of these topic partitions (called the cluster metadata topic partition) to order operations on the cluster, commit them to disk and replicate them to other controllers and brokers.
Consensus on the cluster metadata partition is achieved by the voters (controllers). If the operator of a KRaft cluster wanted to change the set of voters, they would have to shut down all of the controller nodes and manually change the on-disk state of the old and new controllers. If the operator wanted to replace an existing voter because of a disk failure or general hardware failure, they would have to make sure that the new voter node has a superset of the previous voter's on-disk state. Both of these procedures are manual and error prone.
This KIP describes a protocol for extending KIP-595 and KIP-630 so that the operator can programmatically update the voter set in a way that is safe and available. This KIP supports two important use cases. In the first, the operator wants to change the number of controllers by adding or removing a controller. In the second, the operator wants to replace a controller because of a disk or hardware failure.
Key terms
These are the definitions of important terms used throughout the document.
Voters: A voter is any replica that can transition to the candidate state and to the leader state. Voters are required to have an ID and UUID. Each replica keeps its own set of voters that are part of the topic partition. A replica is a voter if its replica ID and UUID are in its own voter set. A candidate needs to get votes from the majority of its own voter set before it can become the leader of an epoch. When a voter becomes a leader it will use its voter set to determine when an offset has been committed.
Observers: An observer is any replica that is not in the voter set. This is either because its ID and UUID are not in the voter set or because it doesn't have an ID or UUID.
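The voter/observer distinction can be illustrated with a small Python sketch. It assumes a replica is identified by its (replica ID, replica UUID) pair; ReplicaKey, is_voter and votes_needed are hypothetical names for illustration, not Kafka APIs.

```python
from dataclasses import dataclass
from typing import FrozenSet, Optional


@dataclass(frozen=True)
class ReplicaKey:
    """Hypothetical replica identity: replica ID plus replica UUID (directory id)."""

    replica_id: int
    replica_uuid: Optional[str]  # None when the replica has no UUID


def is_voter(replica: ReplicaKey, voter_set: FrozenSet[ReplicaKey]) -> bool:
    # Both the ID and the UUID must match an entry; otherwise the replica is an observer.
    return replica in voter_set


def votes_needed(voter_set: FrozenSet[ReplicaKey]) -> int:
    # A candidate needs a majority of its own voter set to become leader of an epoch.
    return len(voter_set) // 2 + 1


voters = frozenset({ReplicaKey(1, "UUID1"), ReplicaKey(2, "UUID2"), ReplicaKey(3, "UUID3")})
print(is_voter(ReplicaKey(3, "UUID3'"), voters))  # False: same ID, different UUID
```

Because the UUID is part of the identity, a replica that keeps its ID but gets a new disk (and therefore a new UUID) is an observer until it is explicitly added to the voter set.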
Proposed changes
This KIP is inspired by Chapter 4 of Consensus: Bridging Theory and Practice [2]. The description of this KIP makes the assumption that the reader is familiar with the references enumerated at the bottom of this page. The user-level explanation section explains the feature as the user would interact with it. The reference-level explanation section goes into the implementation details of this feature.
User explanation
There will be two mechanisms for bootstrapping the KRaft cluster metadata partition. For context, the set of voters in the KRaft cluster metadata partition is currently bootstrapped and configured using the controller.quorum.voters server property. This property is also used to configure the brokers with the set of endpoints that know the location of the leader, if it exists.
This feature will instead persist the set of voters in the cluster metadata partition log, snapshots and quorum state. The new controller.quorum.bootstrap.servers property will be added to configure replicas with a set of endpoints that could be used to discover the set of voters and the leader.
Bootstrapping with one voter
The recommended method for creating a new KRaft cluster metadata partition is to bootstrap it with one voter. This can be done with the following CLI command:
kafka-storage format --cluster-id <cluster-id> --release-version 3.8 --standalone --config controller.properties
This command will 1) create a meta.properties file in metadata.log.dir with a randomly generated directory.id, and 2) create a snapshot at 00000000000000000000-0000000000.checkpoint with the necessary control records (the kraft.version record and the AddVoterRecord) to make this Kafka node the only voter for the quorum.
Bootstrapping with multiple voters
In some deployment scenarios, and to support a configuration similar to Apache ZooKeeper, the KRaft cluster metadata partition can also be bootstrapped with more than one voter. This can be done with the following CLI command:
kafka-storage format --cluster-id <cluster-id> --release-version 3.8 --controller-quorum-voters <replica-id>-<replica-uuid>@<host>:<port>,... --config controller.properties
This command is similar to the standalone version, but the snapshot at 00000000000000000000-0000000000.checkpoint will instead contain an AddVoterRecord for each of the voters specified in --controller-quorum-voters. As with the controller.quorum.voters property, it is important that the set of voters is identical on all of the controllers with the same cluster id.
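The format of the --controller-quorum-voters value can be illustrated with a small Python parser. This is a sketch, not the storage tool's code; parse_controller_quorum_voters and VoterSpec are hypothetical names. It assumes the replica uuid is written as Kafka's URL-safe base64 text, which may itself contain '-', so only the first '-' separates the replica id from the uuid.

```python
from typing import List, NamedTuple


class VoterSpec(NamedTuple):
    replica_id: int
    replica_uuid: str
    host: str
    port: int


def parse_controller_quorum_voters(value: str) -> List[VoterSpec]:
    """Parse '<replica-id>-<replica-uuid>@<host>:<port>,...' into voter entries."""
    voters = []
    for entry in (e.strip() for e in value.split(",")):
        id_and_uuid, at, endpoint = entry.partition("@")
        # Split on the first '-' only: the base64 UUID text may itself contain '-'.
        replica_id, dash, replica_uuid = id_and_uuid.partition("-")
        host, colon, port = endpoint.rpartition(":")
        if not (at and dash and colon and replica_id.isdigit() and replica_uuid
                and host and port.isdigit()):
            raise ValueError(f"invalid voter entry: {entry!r}")
        voters.append(VoterSpec(int(replica_id), replica_uuid, host, int(port)))
    return voters
```

An entry in the older `<id>@<host>:<port>` form has no uuid and is rejected by this sketch, which reflects that every bootstrapped voter needs both an ID and a UUID.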
Upgrade KRaft protocol
The changes to the protocol and replicated state to implement this feature are not compatible with the existing KRaft implementation. This feature cannot be enabled unless all of the replicas for the KRaft cluster metadata partition support this feature. The user can upgrade the KRaft version with the following command:
kafka-feature upgrade --release-version 3.8 --bootstrap-server <endpoints>
This command will map the release version to the matching MetadataVersion and kraft.version. It will send an UpdateFeatures request to a node with both features set to the matching version. KRaft will write the kraft.version control record if all of the controllers and brokers support the new version. KRaft will use the information in the controller registrations, broker registrations and AddVoter records to determine if the new version is compatible.
Add Controller
To increase the number of controllers, the user needs to format a controller node, start it, and add it to the KRaft cluster metadata partition. Formatting a controller node can be done with the following CLI command:
kafka-storage format --cluster-id <cluster-id> --release-version 3.8 --config controller.properties
Notice that neither the --standalone nor the --controller-quorum-voters flag is used. After the new controller node has been started, the node can be added to the KRaft cluster metadata partition with the following CLI command:
kafka-metadata-quorum --bootstrap-server <endpoints> add-controller --config controller.properties
Remove Controller
To decrease the number of controllers, the user needs to execute the following CLI command:
kafka-metadata-quorum --bootstrap-server <endpoints> remove-controller --replica-id <replica-id> --replica-uuid <replica-uuid>
Common Scenarios
To better illustrate this feature this section describes two common scenarios that the Kafka administrator may perform.
Disk Failure Recovery
If one of the replicas encounters a disk failure, the operator can replace the failed disk with a new disk and start the replica.
Let's assume that the cluster has the following voters for the KRaft cluster metadata partition (1, UUID1), (2, UUID2) and (3, UUID3). The first element of the tuples is the replica id. The second element of the tuples is the replica uuid. The replica uuid, or directory id, is generated using the storage tool when the node is formatted.
At this point the disk for replica 3 is replaced and formatted. This means that when replica 3 starts it will have a new replica uuid (UUID3') and an empty set of voters. Replica 3 will discover the partition leader either by using controller.quorum.bootstrap.servers or by the leader sending a BeginQuorumEpoch request to replica 3. Once the new replica, (3, UUID3'), discovers the leader, it sends Fetch and FetchSnapshot requests to the leader. At this point the leader's set of voters will be (1, UUID1), (2, UUID2) and (3, UUID3) and the set of observers will include (3, UUID3').
This state can be discovered by the operator by using the DescribeQuorum RPC, the Admin client or the kafka-metadata-quorum CLI. The operator can now decide to add replica (3, UUID3') to the set of voters using the AddVoter RPC, the Admin client or the kafka-metadata-quorum CLI.
When the AddVoter RPC succeeds the voters will be (1, UUID1), (2, UUID2), (3, UUID3) and (3, UUID3'). The operator can then remove the replica with the failed disk, (3, UUID3), from the voter set by using the RemoveVoter RPC, the Admin client or the kafka-metadata-quorum CLI.
When the RemoveVoter RPC succeeds the voters will be (1, UUID1), (2, UUID2), and (3, UUID3'). At this point the Kafka cluster has successfully recovered from a disk failure in a controller node in a consistent way.
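The quorum arithmetic in this scenario can be checked with a small Python sketch. It assumes a strict-majority quorum over (replica id, replica uuid) pairs and that the old disk of replica 3 never comes back; majority and has_quorum are hypothetical helpers.

```python
from typing import Set, Tuple

ReplicaKey = Tuple[int, str]


def majority(voters: Set[ReplicaKey]) -> int:
    return len(voters) // 2 + 1


def has_quorum(voters: Set[ReplicaKey], live: Set[ReplicaKey]) -> bool:
    # A voter set can elect a leader and commit only if a majority of it is live.
    return len(voters & live) >= majority(voters)


initial = {(1, "UUID1"), (2, "UUID2"), (3, "UUID3")}
after_add = initial | {(3, "UUID3'")}        # AddVoter succeeded
after_remove = after_add - {(3, "UUID3")}    # RemoveVoter succeeded
live = {(1, "UUID1"), (2, "UUID2"), (3, "UUID3'")}  # the old disk of replica 3 is gone

for name, voters in [("initial", initial), ("after AddVoter", after_add),
                     ("after RemoveVoter", after_remove)]:
    print(f"{name}: {len(voters)} voters, majority {majority(voters)}")
```

Under these assumptions, while both (3, UUID3) and (3, UUID3') are voters the set has four members and needs three, so all three live voters must stay available until the RemoveVoter step completes.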
Node Failure Recovery
To support this scenario the Kafka cluster must be configured to use controller.quorum.bootstrap.servers. Let's assume that the voter set is (1, UUID1), (2, UUID2) and (3, UUID3).
At this point replica 3 has failed and the Kafka operator would like to replace it. The operator would start a new controller with replica id 4. Node 4's metadata log dir would be formatted, which generates and persists the directory id UUID4. Replica 4 will discover the leader by sending Fetch and FetchSnapshot requests to the servers enumerated in controller.quorum.bootstrap.servers. After a successful Fetch RPC, the leader's set of voters will be (1, UUID1), (2, UUID2), (3, UUID3) and the set of observers will include (4, UUID4).
The operator can now decide to add replica (4, UUID4) to the set of voters using the AddVoter RPC. When this operation succeeds the set of voters will be (1, UUID1), (2, UUID2), (3, UUID3) and (4, UUID4).
The operator can now decide to remove replica (3, UUID3) from the set of voters using the RemoveVoter RPC.
Reference Explanation
The general goal of this KIP is to allow the user to dynamically change the set of voters (also known as controllers) for the KRaft cluster metadata partition. This is achieved by storing the set of voters and their known endpoints in the log instead of the controller.quorum.voters property. Because the set of voters is stored in the log, the leader can replicate this information to all of the fetching replicas (voters and observers). Since old records can be deleted once a snapshot has been replicated, the KRaft snapshots will also contain the set of voters up to the included offset.
Following voters will discover the set of voters and their endpoints by fetching the latest log from the leader, but how do new voters discover the leader's endpoint? The leader will push this information to new voters (or voters that were offline for a long time) using the BeginQuorumEpoch request. The active leader sends BeginQuorumEpoch to all of the voters when it becomes leader for an epoch.
The leader doesn't send BeginQuorumEpoch to observers since observers are dynamic and are not included in the KRaft partition log. Observers will instead discover the leader using controller.quorum.bootstrap.servers. It is important that this property includes at least one of the available voters, otherwise brokers (observers) will not be able to discover the leader of the KRaft cluster metadata partition.
The rest of this section and its subsections go into the detailed changes required to implement this new functionality.
Directory id or replica UUID
In addition to a replica ID, each replica assigned to a KRaft topic partition will have a replica UUID. This UUID will be generated once and persisted in the meta.properties file for the metadata.log.dir. Replica UUID and directory UUID were first introduced in KIP-858: Handle JBOD broker disk failure in KRaft.
There are two cases when a directory id will be generated and persisted:
- kafka-storage format command will generate a directory id for all of the log directories including the metadata log dir.
- meta.properties exists but it doesn't include a directory.id property. This case will generate and persist a directory id to support upgrade from Kafka versions that don't support this feature to versions that support this feature.
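The second case above can be sketched in Python. This is an illustration under assumptions: meta.properties is treated as simple key=value lines, ensure_directory_id and random_directory_id are hypothetical helpers, and the id is rendered the way Kafka prints a Uuid (URL-safe base64 without padding, 22 characters).

```python
import base64
import uuid


def random_directory_id() -> str:
    # Kafka prints a Uuid as URL-safe base64 without padding (22 characters).
    while True:
        text = base64.urlsafe_b64encode(uuid.uuid4().bytes).decode("ascii").rstrip("=")
        if not text.startswith("-"):  # assumption: avoid ids that look like CLI flags
            return text


def ensure_directory_id(meta_properties: str) -> str:
    """Return meta.properties content that is guaranteed to contain directory.id."""
    for line in meta_properties.splitlines():
        key, sep, _ = line.partition("=")
        if sep and key.strip() == "directory.id":
            return meta_properties  # generated once: never replace an existing id
    if meta_properties and not meta_properties.endswith("\n"):
        meta_properties += "\n"
    return meta_properties + f"directory.id={random_directory_id()}\n"
```

The early return is the important design point: because the directory id identifies the disk, an existing value must never be regenerated.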
Supported features
This feature can only be enabled if all of the voters (controllers) and observers (brokers) support this feature. This is required mainly because this feature needs to write two new control records (AddVoter and RemoveVoter) to the KRaft cluster metadata partition. All replicas need to be able to read and decode these new records.
There will be a new SupportedFeature added to the ApiVersions response of the Kafka nodes. The name of this new supported feature will be kraft.version. The default value will be 0, which means that only KIP-595: A Raft Protocol for the Metadata Quorum, KIP-630: Kafka Raft Snapshot and KIP-996: Pre-Vote are supported. Version 1 means that this KIP is supported.
When a client sends an UpdateFeatures RPC to the active controller and the FeatureUpdates.Feature property is kraft.version, the associated information will be passed to the KRaft client. The KRaft client will implement two different algorithms to check whether the upgrade is supported by the voters and by the observers. For voters, the KRaft client will compare the upgraded version against all of the persisted AddVoter and RemoveVoter records for the KRaft cluster metadata partition. The KRaft client cannot do this for observers (brokers) since their supported versions are not persisted in the log. The controller will instead push the broker registration information to the KRaft client.
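The two checks can be sketched in Python. This is a simplified model, assuming the voters' supported ranges come from the KRaftVersionFeature field of the AddVoter records and the brokers' ranges come from the registrations pushed by the controller; AddVoter, RemoveVoter, voter_support and can_upgrade are hypothetical names.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, Tuple, Union

VersionRange = Tuple[int, int]  # (MinSupportedVersion, MaxSupportedVersion)


@dataclass(frozen=True)
class AddVoter:
    voter_id: int
    voter_uuid: str
    supported: VersionRange  # from the record's KRaftVersionFeature field


@dataclass(frozen=True)
class RemoveVoter:
    voter_id: int
    voter_uuid: str


def voter_support(records: Iterable[Union[AddVoter, RemoveVoter]]) -> Dict[Tuple[int, str], VersionRange]:
    # Replay the persisted voter records to find the current voters' supported ranges.
    support: Dict[Tuple[int, str], VersionRange] = {}
    for record in records:
        key = (record.voter_id, record.voter_uuid)
        if isinstance(record, AddVoter):
            support[key] = record.supported
        else:
            support.pop(key, None)
    return support


def can_upgrade(target: int, voter_records: Iterable[Union[AddVoter, RemoveVoter]],
                broker_registrations: Dict[int, VersionRange]) -> bool:
    # Voters are checked against the log; brokers against their registrations.
    ranges = list(voter_support(voter_records).values()) + list(broker_registrations.values())
    return all(low <= target <= high for low, high in ranges)
```

A voter that was removed no longer blocks the upgrade, which is why the records are replayed in order rather than scanned independently.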
Voter Changes
Adding Voters
Voters are added to the cluster metadata partition by sending an AddVoter RPC to the leader. For safety Kafka will only allow one voter change operation at a time. If there are any pending voter change operations the leader will wait for them to finish.
If there are no pending voter change operations, the leader sends an ApiVersions request to the new voter's endpoint to discover its supported kraft.version range. If the new voter supports the current kraft.version, the leader will write an AddVoterRecord to the log and immediately update its in-memory quorum state to include this voter as part of the quorum. Any replica that replicates and reads this AddVoterRecord will update its in-memory voter set to include this new voter. Voters will not wait for these records to get committed before updating their voter set.
Once the AddVoterRecord operation has been committed by the majority of the new voter set, the leader can respond to the AddVoter RPC and process new voter change operations.
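The add-voter rules above (one change at a time, the voter set updated on append, completion judged by a majority of the new voter set) can be sketched as follows. LeaderVoterState and its methods are hypothetical and skip the other AddVoter handling steps. The commit check assumes that a record at offset N has been replicated to a voter once that voter's fetch offset is greater than N, and that the caller reports the leader's own log end offset as its fetch offset.

```python
from typing import Dict, Optional, Set, Tuple

ReplicaKey = Tuple[int, str]  # (replica id, replica uuid)


class VoterChangeInProgress(Exception):
    """Raised when a second voter change is attempted before the first commits."""


class LeaderVoterState:
    """Hypothetical leader bookkeeping for adding voters one at a time."""

    def __init__(self, voters: Set[ReplicaKey]) -> None:
        self.voters = set(voters)
        self.pending_offset: Optional[int] = None  # offset of the uncommitted AddVoterRecord

    def append_add_voter(self, new_voter: ReplicaKey, record_offset: int) -> None:
        if self.pending_offset is not None:
            raise VoterChangeInProgress("wait for the pending voter change to commit")
        self.voters.add(new_voter)  # takes effect on append, not on commit
        self.pending_offset = record_offset

    def maybe_complete(self, fetch_offsets: Dict[ReplicaKey, int]) -> bool:
        """fetch_offsets: next offset each voter will fetch (the leader reports its end offset)."""
        if self.pending_offset is None:
            return True
        replicated = sum(1 for v in self.voters if fetch_offsets.get(v, 0) > self.pending_offset)
        if replicated >= len(self.voters) // 2 + 1:  # majority of the NEW voter set
            self.pending_offset = None  # respond to AddVoter; the next change may start
            return True
        return False
```

Going from three to four voters raises the required majority from two to three, so in this sketch the record is only committed once three of the four voters in the new set have it.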
Removing Voters
Voters are removed from the cluster metadata partition by sending a RemoveVoter RPC to the leader. This works similarly to adding a voter. If there are no pending voter change operations, the leader will append the RemoveVoterRecord to the log and immediately update its voter set to the new configuration.
Once the RemoveVoterRecord operation has been committed by the majority of the new voter set, the leader can respond to the RPC. If the removed voter is the leader, the leader will resign from the quorum when the RemoveVoterRecord has been committed. To allow this operation to be committed and for the leader to resign, the followers will continue to fetch from the leader even if the leader is not part of the new voter set. In KRaft, leader election is triggered when a voter hasn't received a successful response within the fetch timeout.
Bootstrapping
This section describes how the quorum will be bootstrapped to support this KIP. There are two configurations that Kafka will support, and each will be explained separately. The Kafka cluster can be configured to use the existing controller.quorum.voters property and the new property called controller.quorum.bootstrap.servers.
controller.quorum.voters
This is a static quorum configuration where all of the voters' ID, host and port are specified. An example value for this configuration is 1@localhost:9092,2@localhost:9093,3@localhost:9094.
If the kraft.version is 0, this property will be used to configure the set of voters for the KRaft cluster metadata partition.
When the kraft.version is upgraded to a version greater than 0 and the version is supported by the voters, the leader will write a control record batch that will include the kraft.version record and the AddVoter records for all of the voters. This will allow KRaft to ignore the controller.quorum.voters property and instead rely solely on the log state when it is upgraded to kraft.version 1.
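The upgrade batch can be sketched in Python. This is an illustration of the record contents only, not of the wire format; parse_static_voters and upgrade_control_batch are hypothetical, and the sketch assumes the leader has no directory ids for the statically configured voters, since the controller.quorum.voters property does not carry them.

```python
from typing import Dict, List, Tuple


def parse_static_voters(value: str) -> Dict[int, Tuple[str, int]]:
    """Parse controller.quorum.voters, e.g. '1@localhost:9092,2@localhost:9093'."""
    voters: Dict[int, Tuple[str, int]] = {}
    for entry in value.split(","):
        replica_id, _, endpoint = entry.strip().partition("@")
        host, _, port = endpoint.rpartition(":")
        voters[int(replica_id)] = (host, int(port))
    return voters


def upgrade_control_batch(static_voters: Dict[int, Tuple[str, int]],
                          kraft_version: int) -> List[dict]:
    """Hypothetical record contents the leader writes when kraft.version moves above 0."""
    batch: List[dict] = [{"record": "kraft.version", "version": kraft_version}]
    for replica_id, (host, port) in sorted(static_voters.items()):
        batch.append({
            "record": "AddVoterRecord",
            "VoterId": replica_id,
            "VoterUuid": None,  # assumption: the static property carries no directory id
            "EndPoints": [{"Host": host, "Port": port}],
        })
    return batch
```

Writing all of these records in one control batch means a replica sees either the old static configuration or the complete log-based voter set, never a partial one.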
controller.quorum.bootstrap.servers
This configuration describes the set of hosts and ports that can be queried to discover the cluster metadata partition leader. Observers and to-be-added voters will send Fetch requests to this list of servers until the leader is discovered.
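The discovery loop can be sketched in Python. It is a simplified model: send_fetch is a hypothetical stand-in for the network call, the servers are tried round robin, and a real client would add backoff between attempts instead of stopping after max_attempts.

```python
import itertools
from typing import Callable, Iterable, Optional, Tuple

Endpoint = Tuple[str, int]


def discover_leader(
    bootstrap_servers: Iterable[Endpoint],
    send_fetch: Callable[[Endpoint], Optional[Endpoint]],
    max_attempts: int = 100,
) -> Optional[Endpoint]:
    """Cycle through the bootstrap servers until a Fetch response names the leader.

    send_fetch returns the leader endpoint reported by that server, or None on a
    timeout, an error, or when that server does not know the leader.
    """
    servers = list(bootstrap_servers)
    for server in itertools.islice(itertools.cycle(servers), max_attempts):
        leader = send_fetch(server)
        if leader is not None:
            return leader
    return None  # a real client would back off and keep retrying
```

The bootstrap servers only need to lead the replica to the leader; the voter set itself is then learned by fetching the log and snapshot.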
When using this configuration for a new cluster, the quorum should be started with only one voter. This voter is bootstrapped by running the storage tool format command. This tool will create the cluster metadata partition and append one AddVoterRecord to it. Additional voters can get added by using the AddVoter RPC as described in this KIP.
Leader Election
It is possible for the leader to write an AddVoterRecord to the log and replicate it to some of the voters in the new configuration. If the leader fails before this record has been replicated to the new voter, it is possible that a new leader cannot be elected. This is because voters reject vote requests from replicas that are not in the voter set. This check will be removed, and replicas will reply to vote requests even when the candidate is not in the voter set or the voting replica is not in the voter set. The candidate must still have a log that is at least as up to date (by last epoch, then end offset) as the voter's before the voter will grant it a vote.
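The relaxed vote check can be sketched in Python. grant_vote is a hypothetical, simplified helper: it assumes the KIP-595 rule that a vote is granted only to a candidate whose log is at least as up to date as the voter's (compared by last epoch, then end offset), keeps the one-vote-per-epoch rule, and omits other checks such as an already known leader for the epoch. Note that voter-set membership is deliberately not consulted.

```python
from typing import Optional, Tuple

LogPosition = Tuple[int, int]  # (epoch of last record, log end offset)
ReplicaKey = Tuple[int, str]


def grant_vote(candidate: ReplicaKey, candidate_epoch: int, candidate_log: LogPosition,
               local_epoch: int, local_log: LogPosition,
               voted_for: Optional[ReplicaKey]) -> bool:
    """Simplified vote check: voter-set membership is deliberately not consulted."""
    if candidate_epoch < local_epoch:
        return False  # stale candidate
    if candidate_epoch == local_epoch and voted_for not in (None, candidate):
        return False  # already voted for a different candidate in this epoch
    # Tuple comparison: larger last epoch wins; on a tie, the longer log wins.
    return candidate_log >= local_log
```

Dropping the membership check is safe in this model because the up-to-date comparison still prevents a replica with a shorter log from becoming leader.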
Automatic endpoint and directory id discovery
To improve the usability of this feature, it would be beneficial for the leader of the KRaft cluster metadata partition to automatically rediscover the voters' endpoints. This makes it possible for the operator to update the endpoint of a voter without having to use the kafka-metadata-quorum tool. When a voter becomes a follower and discovers a new leader, it will always send an UpdateVoter RPC to the leader. This request instructs the leader to update the endpoints of the matching replica id and replica uuid. When a voter becomes a leader, it will also write an AddVoter record for itself if its endpoints have changed.
The directory id, or replica uuid, will behave differently. The quorum shouldn't automatically update the directory id, since a different value means that the disk was replaced. For the directory id, the leader will only override it if it was not previously set. This behavior is useful when a cluster gets upgraded to a kraft.version greater than 0.
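The different treatment of endpoints and directory ids can be sketched in Python. VoterEntry and apply_update_voter are hypothetical; the sketch assumes an UpdateVoter request whose directory id differs from an already set value matches no voter and is rejected.

```python
from dataclasses import dataclass, replace
from typing import Dict, Optional


@dataclass(frozen=True)
class VoterEntry:
    replica_id: int
    directory_id: Optional[str]  # None if the voter was recorded without a directory id
    endpoints: Dict[str, str]  # listener name -> "host:port"


def apply_update_voter(current: VoterEntry, directory_id: str,
                       endpoints: Dict[str, str]) -> VoterEntry:
    """Hypothetical leader-side handling of an UpdateVoter request."""
    if current.directory_id is None:
        # The only case where the directory id is written: it was never set.
        return replace(current, directory_id=directory_id, endpoints=dict(endpoints))
    if current.directory_id != directory_id:
        # A different directory id means a replaced disk, not an endpoint change.
        raise ValueError("no voter matches this replica id and directory id")
    return replace(current, endpoints=dict(endpoints))  # endpoints are always refreshed
```

Replacing a disk therefore still goes through the explicit AddVoter and RemoveVoter steps, while a changed host or port is picked up automatically.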
High Watermark
As described in KIP-595, the high-watermark will be calculated using the fetch offsets of the majority of the voters. When a replica is removed or added, it is possible for the high-watermark to decrease. The leader will not allow the high-watermark to decrease and will guarantee that it is monotonically increasing for both the state machines and the remote replicas.
With this KIP, it is possible for the leader to not be part of the voter set when the replica removed is the leader. In this case the leader will continue to handle Fetch and FetchSnapshot requests as normal, but it will not count itself when computing the high watermark.
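The two high-watermark rules (never decrease, and do not count a leader that is no longer a voter) can be sketched in Python. HighWatermarkTracker is hypothetical; it assumes a strict-majority rule over the voters' fetch offsets and a non-empty voter set, and the leader counts with its log end offset only when it is in the voter set passed in.

```python
from typing import Dict, Hashable, Set


class HighWatermarkTracker:
    """Hypothetical leader-side high-watermark tracking that never moves backwards."""

    def __init__(self) -> None:
        self.high_watermark = 0

    def update(self, leader: Hashable, leader_end_offset: int,
               voters: Set[Hashable], fetch_offsets: Dict[Hashable, int]) -> int:
        # Only voters count. If the leader was removed from the voter set it is
        # simply absent from `voters` and therefore not counted.
        offsets = sorted(
            (leader_end_offset if v == leader else fetch_offsets.get(v, 0) for v in voters),
            reverse=True,
        )
        majority = len(offsets) // 2 + 1
        candidate = offsets[majority - 1]  # largest offset reached by a majority of voters
        # Adding a lagging voter can lower `candidate`; the published value never decreases.
        self.high_watermark = max(self.high_watermark, candidate)
        return self.high_watermark
```

For example, with voters 1 (the leader, at offset 100), 2 (at 90) and 3 (at 80) the high watermark is 90; adding a fourth voter at offset 0 would lower the majority offset to 80, but the tracker keeps reporting 90.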
Snapshots
The snapshot generation code needs to be extended to include the new KRaft-specific AddVoter control records. Before this KIP, snapshots didn't include any KRaft-generated control records.
Internal Listener
The KRaft implementation and protocol described in KIP-595 and KIP-630 never read from the log or snapshot. This KIP requires the KRaft implementation to read uncommitted data from the log and snapshot to discover the voter set. This also means that the KRaft implementation needs to handle this uncommitted state getting truncated and reverted.
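One way to handle truncation of uncommitted voter changes is to remember the voter set introduced at each offset, as in this Python sketch. VoterSetHistory and its methods are hypothetical; it assumes control records are read in offset order and that truncating the log to an end offset discards every record at or after that offset.

```python
import bisect
from typing import FrozenSet, List, Tuple

ReplicaKey = Tuple[int, str]


class VoterSetHistory:
    """Hypothetical record of voter sets read from the log, committed or not."""

    def __init__(self, bootstrap: FrozenSet[ReplicaKey]) -> None:
        self._offsets: List[int] = [-1]  # offset at which each voter set took effect
        self._voter_sets: List[FrozenSet[ReplicaKey]] = [bootstrap]  # e.g. from the snapshot

    def latest(self) -> FrozenSet[ReplicaKey]:
        return self._voter_sets[-1]

    def on_add_voter(self, offset: int, voter: ReplicaKey) -> None:
        self._append(offset, self.latest() | {voter})

    def on_remove_voter(self, offset: int, voter: ReplicaKey) -> None:
        self._append(offset, self.latest() - {voter})

    def _append(self, offset: int, voters: FrozenSet[ReplicaKey]) -> None:
        if offset <= self._offsets[-1]:
            raise ValueError("control records must be read in offset order")
        self._offsets.append(offset)
        self._voter_sets.append(voters)

    def truncate_to(self, end_offset: int) -> None:
        """Forget voter sets introduced at offsets >= end_offset after log truncation."""
        keep = max(1, bisect.bisect_left(self._offsets, end_offset))
        del self._offsets[keep:]
        del self._voter_sets[keep:]
```

Keeping the history, rather than only the latest set, lets a replica revert to the correct voter set when a divergent, uncommitted suffix of its log is truncated.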
Public Interfaces
Configuration
There are only two configurations for this feature.
controller.quorum.voters
This is an existing configuration. This configuration describes the state of the quorum and will only be used if the kraft.version feature is 0.
controller.quorum.bootstrap.servers
This is a list of nodes that brokers and new controllers can use to discover the quorum leader. Brokers and new controllers (observers) will send Fetch requests to all of the nodes in this configuration until they discover the quorum leader and the Fetch request succeeds. The quorum voters and their configuration will be learned by fetching and reading the records from the log and snapshot. This includes committed and uncommitted records.
If this configuration is specified, observers will not use the controller.quorum.voters endpoints to discover the leader.
Log and Snapshot Control Records
Two new control records will be added to the log and snapshot of a KRaft partition, and the existing LeaderChangeMessage control record will be extended.
LeaderChangeMessage
Add an optional VoterUuid to Voter. This change is not needed for correctness, but it is nice to have for tracing and debugging. The leader will write version 0 if the kraft.version is 0. The leader will write version 1 if the kraft.version is 1.
git diff upstream/trunk clients/src/main/resources/common/message/LeaderChangeMessage.json
diff --git a/clients/src/main/resources/common/message/LeaderChangeMessage.json b/clients/src/main/resources/common/message/LeaderChangeMessage.json
index fdd7733388..2b019a2a80 100644
--- a/clients/src/main/resources/common/message/LeaderChangeMessage.json
+++ b/clients/src/main/resources/common/message/LeaderChangeMessage.json
@@ -16,7 +16,7 @@
 {
   "type": "data",
   "name": "LeaderChangeMessage",
-  "validVersions": "0",
+  "validVersions": "0-1",
   "flexibleVersions": "0+",
   "fields": [
     {"name": "Version", "type": "int16", "versions": "0+",
@@ -30,7 +30,8 @@
   ],
   "commonStructs": [
     { "name": "Voter", "versions": "0+", "fields": [
-      {"name": "VoterId", "type": "int32", "versions": "0+"}
+      { "name": "VoterId", "type": "int32", "versions": "0+" },
+      { "name": "VoterUuid", "type": "uuid", "versions": "1+" }
     ]}
   ]
 }
AddVoterRecord
A control record for instructing the voters to add a new voter to the topic partition. This record can exist in both the log and the snapshot of a topic partition.
The ControlRecordType is TBD and will be updated when the code is committed to Kafka.
{
  "type": "data",
  "name": "AddVoterRecord",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Version", "type": "int16", "versions": "0+",
      "about": "The version of the add voter record" },
    { "name": "VoterId", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "about": "The replica id of the voter getting added to the topic partition" },
    { "name": "VoterUuid", "type": "uuid", "versions": "0+",
      "about": "The directory id of the voter getting added to the topic partition" },
    { "name": "EndPoints", "type": "[]Endpoint", "versions": "0+",
      "about": "The endpoint that can be used to communicate with the voter", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
        "about": "The name of the endpoint" },
      { "name": "Host", "type": "string", "versions": "0+",
        "about": "The hostname" },
      { "name": "Port", "type": "uint16", "versions": "0+",
        "about": "The port" },
      { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
        "about": "The security protocol" }
    ]},
    { "name": "KRaftVersionFeature", "type": "KRaftVersionFeature", "versions": "0+",
      "about": "The range of versions of the protocol that the replica supports", "fields": [
      { "name": "MinSupportedVersion", "type": "int16", "versions": "0+",
        "about": "The minimum supported KRaft protocol version" },
      { "name": "MaxSupportedVersion", "type": "int16", "versions": "0+",
        "about": "The maximum supported KRaft protocol version" }
    ]}
  ]
}
Handling
KRaft replicas will read all of the control records in the snapshot and the log irrespective of the commit state and HWM. When a replica encounters an AddVoterRecord, it will add the replica ID and UUID to its voter set. If the replica getting added is the local replica, it will allow the transition to prospective candidate when the fetch timer expires. The fetch timer is reset whenever the replica receives a successful Fetch or FetchSnapshot response.
RemoveVoterRecord
A control record for instructing the voters to remove a voter from the topic partition. This record can exist in the log but not the snapshot of a topic partition.
The ControlRecordType is TBD and will be updated when the code is committed to Kafka.
{
  "type": "data",
  "name": "RemoveVoterRecord",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Version", "type": "int16", "versions": "0+",
      "about": "The version of the remove voter record" },
    { "name": "VoterId", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "about": "The replica id of the voter getting removed from the topic partition" },
    { "name": "VoterUuid", "type": "uuid", "versions": "0+",
      "about": "The directory id of the voter getting removed from the topic partition" }
  ]
}
Handling
KRaft replicas will read all of the control records in the snapshot and the log irrespective of the commit state and HWM. When a replica encounters a RemoveVoterRecord, it will remove the replica ID and UUID from its voter set. If the replica getting removed is the leader and is the local replica, then the replica will remain leader until the RemoveVoterRecord gets committed or the epoch advances and it loses leadership of the latest epoch.
Quorum State
Each KRaft topic partition has a quorum state (QuorumStateData) that gets persisted in the quorum-state file in the directory for the topic partition. The following changes will be made to this state:
- ClusterId will get removed in version 1. This field is not used because the cluster id is persisted in <metadata.log.dir>/meta.properties.
- AppliedOffset will get removed in version 1. This field is not used in version 0. The field will get removed because it implied an inclusive offset, which is not correct because the default value is 0.
- VotedUuid will get added in version 1. The voting replica will persist both the ID and UUID of the candidate for which it voted.
- AppliedRecord will get added in version 1. This is the newest record that the KRaft client has read and applied to update the current set of voters. EndOffset is exclusive and represents the next offset that the KRaft client should read if available.
- CurrentVoters will get extended in version 1 to include the voter's directory id or UUID.
Version 0 of this data will get written if the kraft.version is 0. Version 1 of this data will get written if the kraft.version is 1.
git diff upstream/trunk raft/src/main/resources/common/message/QuorumStateData.json
diff --git a/raft/src/main/resources/common/message/QuorumStateData.json b/raft/src/main/resources/common/message/QuorumStateData.json
index d71a32c75d..34881f05ff 100644
--- a/raft/src/main/resources/common/message/QuorumStateData.json
+++ b/raft/src/main/resources/common/message/QuorumStateData.json
@@ -16,19 +16,25 @@
 {
   "type": "data",
   "name": "QuorumStateData",
-  "validVersions": "0",
+  "validVersions": "0-1",
   "flexibleVersions": "0+",
   "fields": [
-    {"name": "ClusterId", "type": "string", "versions": "0+"},
-    {"name": "LeaderId", "type": "int32", "versions": "0+", "default": "-1"},
-    {"name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1"},
-    {"name": "VotedId", "type": "int32", "versions": "0+", "default": "-1"},
-    {"name": "AppliedOffset", "type": "int64", "versions": "0+"},
-    {"name": "CurrentVoters", "type": "[]Voter", "versions": "0+", "nullableVersions": "0+"}
+    { "name": "ClusterId", "type": "string", "versions": "0" },
+    { "name": "LeaderId", "type": "int32", "versions": "0+", "default": "-1" },
+    { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1" },
+    { "name": "VotedId", "type": "int32", "versions": "0+", "default": "-1" },
+    { "name": "VotedUuid", "type": "uuid", "versions": "1+" },
+    { "name": "AppliedOffset", "type": "int64", "versions": "0" },
+    { "name": "AppliedRecord", "type": "AppliedRecord", "versions": "1+", "fields": [
+      { "name": "EndOffset", "type": "int64", "versions": "1+" },
+      { "name": "Epoch", "type": "int32", "versions": "1+" }
+    ]},
+    { "name": "CurrentVoters", "type": "[]Voter", "versions": "0+", "nullableVersions": "0+" }
   ],
   "commonStructs": [
     { "name": "Voter", "versions": "0+", "fields": [
-      {"name": "VoterId", "type": "int32", "versions": "0+"}
+      { "name": "VoterId", "type": "int32", "versions": "0+" },
+      { "name": "VoterUuid", "type": "uuid", "versions": "1+" }
     ]}
   ]
 }
RPCs
AddVoter
This RPC can be sent by an administrative client to add a voter to the set of voters. This RPC can be sent to a broker or a controller. When it is sent to a broker, the broker will forward the request to the controller.
Handling
When the leader receives an AddVoter request it will do the following:
1. Wait for the fetch offset of the replica (ID, UUID) to catch up to the log end offset of the leader.
2. Wait until there are no uncommitted add or remove voter records.
3. Wait for the LeaderChangeMessage control record from the current epoch to get committed.
4. Send an ApiVersions RPC to the first listener to discover the supported kraft.version of the new voter.
5. Check that the new voter supports the current kraft.version.
6. Append the AddVoterRecord to the log.
7. The KRaft internal listener will read this record from the log and add the voter to the voter set.
8. Wait for the AddVoterRecord to commit using the majority of the new voter set.
9. Send the AddVoter response to the client.
In 1., the leader needs to wait for the replica to catch up because the set of voters changes when the AddVoterRecord is appended to the log. If the new voter is too far behind, it can take some time for it to reach the HWM. During this time the leader cannot commit data and the quorum will be unavailable from the perspective of the state machine. We mitigate this by waiting for the new replica to catch up before adding it to the set of voters.
In 6. and 7., the new replica becomes part of the quorum, so the leader will start sending BeginQuorumEpoch requests to this replica. It is possible that the new replica has not yet replicated and applied this AddVoterRecord, so it doesn't know that it is a voter for this topic partition. The new replica will accept the BeginQuorumEpoch RPC even if it doesn't believe it is a member of the voter set.
The replica will return the following errors:
- NOT_LEADER_FOR_PARTITION - when the request is sent to a replica that is not the leader.
- VOTER_ALREADY_ADDED - when the request contains a replica ID and UUID that is already in the committed voter set.
- UNSUPPORTED_VERSION - when the kraft.version is less than 1.
- INVALID_REQUEST - when the new voter doesn't support the current kraft.version.
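The mapping from request state to the error codes above can be sketched in Python. validate_add_voter is a hypothetical helper and the order of the checks is an assumption; it treats kraft.version 0 as the only version without this feature, since the Supported features section defines version 1 as the version that adds it. The waiting steps of the handling (catch-up, pending changes, commit) are not modeled.

```python
from typing import Optional, Set, Tuple

ReplicaKey = Tuple[int, str]  # (replica id, replica uuid)


def validate_add_voter(is_leader: bool, kraft_version: int,
                       committed_voters: Set[ReplicaKey], new_voter: ReplicaKey,
                       new_voter_supported: Tuple[int, int]) -> Optional[str]:
    """Return the error code for an AddVoter request, or None to proceed."""
    if not is_leader:
        return "NOT_LEADER_FOR_PARTITION"
    if kraft_version < 1:
        return "UNSUPPORTED_VERSION"
    if new_voter in committed_voters:
        return "VOTER_ALREADY_ADDED"
    low, high = new_voter_supported  # learned from the new voter's ApiVersions response
    if not low <= kraft_version <= high:
        return "INVALID_REQUEST"
    return None  # continue with the remaining handling steps
```

Checking leadership first lets a client that reached the wrong replica retry against the leader without the other checks masking that condition.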
Request
{
  "apiKey": 75,
  "type": "request",
  "listeners": ["controller", "broker"],
  "name": "AddVoterRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "0+" },
    { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
      "about": "The name of the topic" },
    { "name": "TopicId", "type": "uuid", "versions": "0+",
      "about": "The unique topic ID" },
    { "name": "Partition", "type": "int32", "versions": "0+",
      "about": "The partition index" },
    { "name": "VoterId", "type": "int32", "versions": "0+",
      "about": "The replica id of the voter getting added to the topic partition" },
    { "name": "VoterUuid", "type": "uuid", "versions": "0+",
      "about": "The directory id of the voter getting added to the topic partition" },
    { "name": "Listeners", "type": "[]Listener", "versions": "0+",
      "about": "The endpoints that can be used to communicate with the voter", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
        "about": "The name of the endpoint" },
      { "name": "Host", "type": "string", "versions": "0+",
        "about": "The hostname" },
      { "name": "Port", "type": "uint16", "versions": "0+",
        "about": "The port" },
      { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
        "about": "The security protocol" }
    ]}
  ]
}
Response
{ "apiKey": 75, "type": "response", "name": "AddVoterResponse", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The error code, or 0 if there was no error" }, { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "0+", "taggedVersions": "0+", "tag": 0, "fields": [ { "name": "LeaderId", "type": "int32", "versions": "0+", "default": "-1", "entityType" : "brokerId", "about": "The replica id of the current leader or -1 if the leader is unknown" }, { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1", "about": "The latest known leader epoch" } ]}, { "name": "NodeEndpoint", "type": "NodeEndpoint", "versions": "0+", "taggedVersions": "0+", "tag": 1, "about": "Endpoint for current leader of the topic partition", "fields": [ { "name": "Host", "type": "string", "versions": "0+", "about": "The node's hostname" }, { "name": "Port", "type": "int32", "versions": "0+", "about": "The node's port" } ]} ] }
RemoveVoter
This RPC can be sent by an administrative client to remove a voter from the set of voters. This RPC can be sent to a broker or controller. The broker will forward the request to the controller.
Handling
When the leader receives a RemoveVoter request it will do the following:
1. Wait until there are no uncommitted add or remove voter records.
2. Wait for the LeaderChangeMessage control record from the current epoch to get committed.
3. Append the RemoveVoterRecord to the log.
4. The KRaft internal listener will read this record from the log and remove the voter from the voter set.
5. Wait for the RemoveVoterRecord to commit using a majority of the new voter set.
6. Send the RemoveVoter response to the client.
7. Resign by sending EndQuorumEpoch RPCs if the removed replica is the leader.
In steps 3 and 4, the RemoveVoterRecord may remove the current leader from the voter set. In this case the leader needs to keep allowing Fetch and FetchSnapshot requests from replicas, and it should not count itself when determining the majority or when determining whether records have been committed.
The replica will return the following errors:
- NOT_LEADER_FOR_PARTITION - when the request is sent to a replica that is not the leader.
- VOTER_ALREADY_REMOVED - when the request contains a replica ID and UUID that is not in the committed voter set.
- UNSUPPORTED_VERSION - when the kraft.version is less than 1.
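The RemoveVoter handling order can be sketched as a small state holder on the leader. This is a hypothetical illustration under stated assumptions, not Kafka code: `VoterKey`, `RemoveVoterOutcome` and `RemoveVoterSketch` are invented names, `WAIT` stands in for the leader deferring the request until steps 1 and 2 are satisfied, and it assumes dynamic voter changes require kraft.version 1 or higher. It shows that the commit quorum is computed from the new voter set, so a leader that removes itself no longer counts toward it.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

// Hypothetical types for illustration only; they are not Kafka classes.
record VoterKey(int id, UUID uuid) {}

enum RemoveVoterOutcome {
    NOT_LEADER_FOR_PARTITION, UNSUPPORTED_VERSION, VOTER_ALREADY_REMOVED, WAIT, APPENDED
}

final class RemoveVoterSketch {
    final VoterKey localKey;
    boolean isLeader = true;
    Set<VoterKey> voters;                   // latest voter set, committed or not
    boolean pendingVoterChange = false;     // an add/remove voter record is uncommitted
    boolean leaderChangeCommitted = false;  // this epoch's LeaderChangeMessage is committed
    boolean resignAfterCommit = false;      // the leader removed itself

    RemoveVoterSketch(VoterKey localKey, Set<VoterKey> voters) {
        this.localKey = localKey;
        this.voters = new HashSet<>(voters);
    }

    RemoveVoterOutcome handle(VoterKey toRemove, int kraftVersion) {
        if (!isLeader) return RemoveVoterOutcome.NOT_LEADER_FOR_PARTITION;
        if (kraftVersion < 1) return RemoveVoterOutcome.UNSUPPORTED_VERSION;
        // Steps 1 and 2: defer until earlier changes and the LeaderChangeMessage commit.
        if (pendingVoterChange || !leaderChangeCommitted) return RemoveVoterOutcome.WAIT;
        // With no pending change, the latest voter set equals the committed voter set.
        if (!voters.contains(toRemove)) return RemoveVoterOutcome.VOTER_ALREADY_REMOVED;
        // Steps 3 and 4: append the RemoveVoterRecord and apply it to the voter set.
        Set<VoterKey> next = new HashSet<>(voters);
        next.remove(toRemove);
        voters = next;
        pendingVoterChange = true;
        // Step 7: a leader that removed itself resigns once the record commits.
        resignAfterCommit = toRemove.equals(localKey);
        return RemoveVoterOutcome.APPENDED;
    }

    // Step 5: the record commits once a majority of the new voter set has it.
    // A removed leader is not in this set, so it does not count toward the majority.
    int commitQuorumSize() {
        return voters.size() / 2 + 1;
    }
}
```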
Request
{ "apiKey": 76, "type": "request", "listeners": ["controller", "broker"], "name": "RemoveVoterRequest", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName", "about": "The name of the topic" }, { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The unique topic ID" }, { "name": "Partition", "type": "int32", "versions": "0+", "about": "The partition index" }, { "name": "VoterId", "type": "int32", "versions": "0+", "about": "The replica id of the voter getting removed from the topic partition" }, { "name": "VoterUuid", "type": "uuid", "versions": "0+", "about": "The directory id of the voter getting removed from the topic partition" } ] }
Response
{ "apiKey": 76, "type": "response", "name": "RemoveVoterResponse", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The error code, or 0 if there was no error" }, { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "0+", "taggedVersions": "0+", "tag": 0, "fields": [ { "name": "LeaderId", "type": "int32", "versions": "0+", "default": "-1", "entityType" : "brokerId", "about": "The replica id of the current leader or -1 if the leader is unknown" }, { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1", "about": "The latest known leader epoch" } ]}, { "name": "NodeEndpoint", "type": "NodeEndpoint", "versions": "0+", "taggedVersions": "0+", "tag": 1, "about": "Endpoint for current leader of the topic partition", "fields": [ { "name": "Host", "type": "string", "versions": "0+", "about": "The node's hostname" }, { "name": "Port", "type": "int32", "versions": "0+", "about": "The node's port" } ]} ] }
UpdateVoter
This RPC differs from AddVoter in two ways: it is sent only by voters, to the latest known leader, and it includes both the voter's listener endpoints and its supported kraft.version range. This lets a voter update that information automatically, without additional operator intervention.
Handling
When the leader receives an UpdateVoter request it will do the following:
1. Wait until there are no uncommitted add or remove voter records.
2. Wait for the LeaderChangeMessage control record from the current epoch to get committed.
3. Check that the updated voter supports the current kraft.version.
4. Append the updated AddVoterRecord to the log.
5. The KRaft internal listener will read this record from the log and update the voter's information. This includes updating the endpoint used by the KRaft NetworkClient.
6. Wait for the AddVoterRecord to commit using a majority of the new voter set.
7. Send the UpdateVoter response to the client.
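The kraft.version check and the listener update from the UpdateVoter handling can be sketched as follows. This is a hypothetical, simplified illustration: `VoterKey`, `Endpoint`, `KRaftVersionRange` and `UpdateVoterSketch` are invented names, the "CONTROLLER" listener name in the usage is illustrative, it assumes only replicas already in the voter set may send UpdateVoter, and it applies the update immediately instead of appending and committing an AddVoterRecord.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical types for illustration only; they are not Kafka classes.
record VoterKey(int id, UUID uuid) {}

record Endpoint(String host, int port) {}

record KRaftVersionRange(int minSupported, int maxSupported) {
    boolean supports(int version) {
        return minSupported <= version && version <= maxSupported;
    }
}

final class UpdateVoterSketch {
    final int currentKraftVersion;
    // Listener name -> endpoint, per voter, as known from the voter set.
    final Map<VoterKey, Map<String, Endpoint>> listeners = new HashMap<>();

    UpdateVoterSketch(int currentKraftVersion) {
        this.currentKraftVersion = currentKraftVersion;
    }

    /** Returns true if an updated AddVoterRecord would be appended. */
    boolean tryUpdate(VoterKey voter, Map<String, Endpoint> newListeners, KRaftVersionRange range) {
        // Assumption: only replicas already in the voter set may send UpdateVoter.
        if (!listeners.containsKey(voter)) {
            return false;
        }
        // Reject a voter whose supported range excludes the current kraft.version.
        if (!range.supports(currentKraftVersion)) {
            return false;
        }
        listeners.put(voter, Map.copyOf(newListeners));
        return true;
    }
}
```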
Request
{ "apiKey": 77, "type": "request", "listeners": ["controller"], "name": "UpdateVoterRequest", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "ClusterId", "type": "string", "versions": "0+" }, { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName", "about": "The name of the topic" }, { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The unique topic ID" }, { "name": "Partition", "type": "int32", "versions": "0+", "about": "The partition index" }, { "name": "VoterId", "type": "int32", "versions": "0+", "about": "The replica id of the voter getting updated in the topic partition" }, { "name": "VoterUuid", "type": "uuid", "versions": "0+", "about": "The directory id of the voter getting updated in the topic partition" }, { "name": "Listeners", "type": "[]Listener", "versions": "0+", "about": "The endpoints that can be used to communicate with the voter", "fields": [ { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "about": "The name of the endpoint" }, { "name": "Host", "type": "string", "versions": "0+", "about": "The hostname" }, { "name": "Port", "type": "uint16", "versions": "0+", "about": "The port" }, { "name": "SecurityProtocol", "type": "int16", "versions": "0+", "about": "The security protocol" } ]}, { "name": "KRaftVersionFeature", "type": "KRaftVersionFeature", "versions": "0+", "about": "The range of versions of the protocol that the replica supports", "fields": [ { "name": "MinSupportedVersion", "type": "int16", "versions": "0+", "about": "The minimum supported KRaft protocol version" }, { "name": "MaxSupportedVersion", "type": "int16", "versions": "0+", "about": "The maximum supported KRaft protocol version" } ]} ] }
Response
{ "apiKey": 77, "type": "response", "name": "UpdateVoterResponse", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The error code, or 0 if there was no error" }, { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "0+", "taggedVersions": "0+", "tag": 0, "fields": [ { "name": "LeaderId", "type": "int32", "versions": "0+", "default": "-1", "entityType" : "brokerId", "about": "The replica id of the current leader or -1 if the leader is unknown" }, { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1", "about": "The latest known leader epoch" } ]}, { "name": "NodeEndpoint", "type": "NodeEndpoint", "versions": "0+", "taggedVersions": "0+", "tag": 1, "about": "Endpoint for current leader of the topic partition", "fields": [ { "name": "Host", "type": "string", "versions": "0+", "about": "The node's hostname" }, { "name": "Port", "type": "int32", "versions": "0+", "about": "The node's port" } ]} ] }
Vote
Handling
Since the set of voters can change and not all replicas know the latest voter set, the handling of the Vote request needs to be relaxed from what was defined and implemented in KIP-595.
KRaft replicas will accept Vote requests from all replicas. Candidate replicas don't need to be in the voters' voter set to receive a vote. This is needed to be able to elect a leader from the new voter set even though the new voter set hasn't been replicated to all of its voters.
The voter will persist both the candidate ID and UUID in the quorum state so that it only votes for at most one candidate for a given epoch.
The replica will return the following new errors:
- INVALID_REQUEST - when the voter ID and UUID don't match the local ID and UUID.
- UNSUPPORTED_VERSION - when a non-empty candidate UUID is specified but the voter doesn't support kraft.version 1.
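The relaxed Vote handling can be sketched as below: any replica may be a candidate, and the voter remembers the (ID, UUID) it voted for so that it grants at most one candidate per epoch. This is a hypothetical sketch, not Kafka's implementation: `Candidate`, `VoteResult` and `VoterState` are invented names, the all-zero UUID is assumed to represent the "empty" UUID, an empty VoterUuid is assumed to skip the UUID comparison, the order of the error checks is an assumption, and the KIP-595 log up-to-date comparison and the persistence of the vote are left out.

```java
import java.util.UUID;

// Hypothetical types for illustration only; they are not Kafka classes.
record Candidate(int id, UUID uuid) {}

enum VoteResult { GRANTED, REJECTED, INVALID_REQUEST, UNSUPPORTED_VERSION }

final class VoterState {
    // Assumption: the all-zero UUID represents the "empty" UUID on the wire.
    static final UUID EMPTY = new UUID(0L, 0L);

    final int localId;
    final UUID localUuid;
    final boolean supportsKraftVersion1;
    int epoch = 0;
    Candidate votedFor = null; // a real voter persists this in its quorum state

    VoterState(int localId, UUID localUuid, boolean supportsKraftVersion1) {
        this.localId = localId;
        this.localUuid = localUuid;
        this.supportsKraftVersion1 = supportsKraftVersion1;
    }

    VoteResult handleVote(int voterId, UUID voterUuid, int candidateEpoch, Candidate candidate) {
        if (!EMPTY.equals(candidate.uuid()) && !supportsKraftVersion1) {
            return VoteResult.UNSUPPORTED_VERSION;
        }
        // Assumption: an empty VoterUuid means the candidate doesn't know this voter's UUID.
        boolean uuidMismatch = !EMPTY.equals(voterUuid) && !voterUuid.equals(localUuid);
        if (voterId != localId || uuidMismatch) {
            return VoteResult.INVALID_REQUEST;
        }
        if (candidateEpoch < epoch) {
            return VoteResult.REJECTED;
        }
        if (candidateEpoch > epoch) {
            epoch = candidateEpoch;
            votedFor = null;
        }
        // No voter-set membership check on the candidate: votes are accepted from all replicas.
        if (votedFor == null) {
            votedFor = candidate;
            return VoteResult.GRANTED;
        }
        // At most one (ID, UUID) per epoch; a retry from the same candidate is granted again.
        return votedFor.equals(candidate) ? VoteResult.GRANTED : VoteResult.REJECTED;
    }
}
```

Comparing the whole (ID, UUID) pair means a candidate that reuses an ID with a new directory UUID in the same epoch is treated as a different candidate and is refused.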
Request
git diff upstream/trunk clients/src/main/resources/common/message/VoteRequest.json
diff --git a/clients/src/main/resources/common/message/VoteRequest.json b/clients/src/main/resources/common/message/VoteRequest.json
index 35583a790b..ff808cbafe 100644
--- a/clients/src/main/resources/common/message/VoteRequest.json
+++ b/clients/src/main/resources/common/message/VoteRequest.json
@@ -18,11 +18,13 @@
   "type": "request",
   "listeners": ["controller"],
   "name": "VoteRequest",
-  "validVersions": "0",
+  "validVersions": "0-1",
   "flexibleVersions": "0+",
   "fields": [
     { "name": "ClusterId", "type": "string", "versions": "0+",
       "nullableVersions": "0+", "default": "null"},
+    { "name": "VoterId", "type": "int32", "versions": "1+", "ignorable": true, "default": "-1", "entityType": "brokerId",
+      "about": "The replica id of the voter receiving the request" },
     { "name": "Topics",
       "type": "[]TopicData", "versions": "0+", "fields": [
       { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
@@ -34,14 +36,16 @@
           { "name": "CandidateEpoch", "type": "int32", "versions": "0+",
             "about": "The bumped epoch of the candidate sending the request"},
           { "name": "CandidateId", "type": "int32", "versions": "0+", "entityType": "brokerId",
-            "about": "The ID of the voter sending the request"},
+            "about": "The replica id of the voter sending the request"},
+          { "name": "CandidateUuid", "type": "uuid", "versions": "1+",
+            "about": "The directory id of the voter sending the request" },
+          { "name": "VoterUuid", "type": "uuid", "versions": "1+",
+            "about": "The directory id of the voter receiving the request to vote, empty uuid if unknown" },
           { "name": "LastOffsetEpoch", "type": "int32", "versions": "0+",
             "about": "The epoch of the last record written to the metadata log"},
           { "name": "LastOffset", "type": "int64", "versions": "0+",
             "about": "The offset of the last record written to the metadata log"}
         ]}
     ]}
   ]
 }
Response
git diff upstream/trunk clients/src/main/resources/common/message/VoteResponse.json
diff --git a/clients/src/main/resources/common/message/VoteResponse.json b/clients/src/main/resources/common/message/VoteResponse.json
index b92d0070c1..21d0aa5312 100644
--- a/clients/src/main/resources/common/message/VoteResponse.json
+++ b/clients/src/main/resources/common/message/VoteResponse.json
@@ -17,7 +17,7 @@
   "apiKey": 52,
   "type": "response",
   "name": "VoteResponse",
-  "validVersions": "0",
+  "validVersions": "0-1",
   "flexibleVersions": "0+",
   "fields": [
     { "name": "ErrorCode", "type": "int16", "versions": "0+",
@@ -37,9 +37,14 @@
             "about": "The latest known leader epoch"},
           { "name": "VoteGranted", "type": "bool", "versions": "0+",
             "about": "True if the vote was granted and false otherwise"}
         ]}
     ]},
+    { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "1+", "taggedVersions": "1+", "tag": 0,
+      "about": "Endpoints for all current-leaders enumerated in PartitionData", "fields": [
+      { "name": "NodeId", "type": "int32", "versions": "1+",
+        "mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node"},
+      { "name": "Host", "type": "string", "versions": "1+", "about": "The node's hostname" },
+      { "name": "Port", "type": "int32", "versions": "1+", "about": "The node's port" }
+    ]}
   ]
 }
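Because NodeEndpoints is declared with NodeId as its map key, a receiver can index it by node ID and resolve the endpoint of each leader named in PartitionData. The sketch below is hypothetical: `NodeEndpoint`, `PartitionData` and `VoteResponseEndpoints` are plain records and helpers invented for illustration, not Kafka's generated message classes, and it treats a negative LeaderId as "leader unknown".

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical types mirroring VoteResponse v1 fields; not Kafka's generated classes.
record NodeEndpoint(int nodeId, String host, int port) {}

record PartitionData(int partitionIndex, int leaderId, int leaderEpoch, boolean voteGranted) {}

final class VoteResponseEndpoints {
    // NodeEndpoints uses NodeId as its map key, so index the list by node ID.
    static Map<Integer, NodeEndpoint> byNodeId(List<NodeEndpoint> endpoints) {
        Map<Integer, NodeEndpoint> index = new HashMap<>();
        for (NodeEndpoint e : endpoints) {
            index.put(e.nodeId(), e);
        }
        return index;
    }

    // Resolve the endpoint of the leader named in one PartitionData entry.
    static Optional<NodeEndpoint> leaderEndpoint(PartitionData partition, Map<Integer, NodeEndpoint> index) {
        if (partition.leaderId() < 0) {
            return Optional.empty(); // -1 means the leader is unknown
        }
        // A version 0 response has no NodeEndpoints, so the lookup may find nothing.
        return Optional.ofNullable(index.get(partition.leaderId()));
    }
}
```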
UpdateFeatures
TODO: Fill this out
Handling
Request
Response
Fetch
Request
Version 14 adds the field ReplicaUuid to the FetchPartition. This field is populated with the replica generated UUID. If the ReplicaUuid and the ReplicaId fields are populated, the topic partition leader can assume that the replica supports becoming a voter.
{ "apiKey": 1, "type": "request", "listeners": ["zkBroker", "broker", "controller"], "name": "FetchRequest", "validVersions": "0-14", "flexibleVersions": "12+", "fields": [ { "name": "ClusterId", "type": "string", "versions": "12+", "nullableVersions": "12+", "default": "null", "taggedVersions": "12+", "tag": 0, "ignorable": true, "about": "The clusterId if known. This is used to validate metadata fetches prior to broker registration." }, { "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId", "about": "The replica ID of the follower, or -1 if this request is from a consumer." }, ... { "name": "Topics", "type": "[]FetchTopic", "versions": "0+", "about": "The topics to fetch.", "fields": [ { "name": "Topic", "type": "string", "versions": "0-12", "entityType": "topicName", "ignorable": true, "about": "The name of the topic to fetch." }, { "name": "TopicId", "type": "uuid", "versions": "13+", "ignorable": true, "about": "The unique topic ID"}, { "name": "Partitions", "type": "[]FetchPartition", "versions": "0+", "about": "The partitions to fetch.", "fields": [ { "name": "Partition", "type": "int32", "versions": "0+", "about": "The partition index." }, { "name": "ReplicaUuid", "type": "uuid", "versions": "14+", "nullableVersions": "14+", "default": "null", "about": "The replica generated UUID. null otherwise." }, ... ]} ]} ] }
Response
Version 14 renames LeaderIdAndEpoch to CurrentLeader and adds Endpoint to CurrentLeader.
{ "apiKey": 1, "type": "response", "name": "FetchResponse", "validVersions": "0-14", "flexibleVersions": "12+", "fields": [ ... { "name": "Responses", "type": "[]FetchableTopicResponse", "versions": "0+", "about": "The response topics.", "fields": [ { "name": "Topic", "type": "string", "versions": "0-12", "ignorable": true, "entityType": "topicName", "about": "The topic name." }, { "name": "TopicId", "type": "uuid", "versions": "13+", "ignorable": true, "about": "The unique topic ID"}, { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "about": "The topic partitions.", "fields": [ ... { "name": "CurrentLeader", "type": "CurrentLeader", "versions": "12+", "taggedVersions": "12+", "tag": 1, "fields": [ { "name": "LeaderId", "type": "int32", "versions": "12+", "default": "-1", "entityType": "brokerId", "about": "The ID of the current leader or -1 if the leader is unknown."}, { "name": "LeaderEpoch", "type": "int32", "versions": "12+", "default": "-1", "about": "The latest known leader epoch"}, { "name": "EndPoint", "type": "Endpoint", "versions": "14+", "about": "The endpoint that can be used to communicate with the leader", "fields": [ { "name": "Host", "type": "string", "versions": "14+", "about": "The hostname." }, { "name": "Port", "type": "uint16", "versions": "14+", "about": "The port." } ]} ]}, ... ]} ]} ] }
Handling
Replicas that support becoming voters will send both the replica ID and UUID in the Fetch request. The leader will assume that replicas that report both fields are voters or are able to become voters.
There are a few changes to the leader request handling described in KIP-595. The leader will track the fetch offset for the replica tuple (ID, UUID). Replicas are uniquely identified by their ID and UUID, so their state is tracked using that pair.
When removing the leader from the voter set, it will remain the leader for that epoch until the RemoveVoterRecord gets committed. This means that the leader needs to allow replicas (voters and observers) to fetch from the leader even if it is not part of the voter set. This also means that if the leader is not part of the voter set it should not include itself when computing the committed offset (also known as the high-watermark).
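The Fetch handling changes above, tracking fetch offsets per (ID, UUID) and excluding a removed leader from the high-watermark computation, can be sketched as follows. This is a hypothetical, simplified illustration: `ReplicaKey` and `LeaderFetchState` are invented names, a replica that has not fetched yet is assumed to be at offset 0, and the other conditions KIP-595 places on advancing the high-watermark are left out.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hypothetical types for illustration only; they are not Kafka classes.
record ReplicaKey(int id, UUID uuid) {
    // A Fetch reporting both a replica ID and a ReplicaUuid (v14+) comes from
    // a replica that can become a voter.
    boolean canBecomeVoter() {
        return id >= 0 && uuid != null;
    }
}

final class LeaderFetchState {
    final ReplicaKey leader;
    // Fetch offsets are tracked per (ID, UUID), not per ID.
    final Map<ReplicaKey, Long> fetchOffsets = new HashMap<>();

    LeaderFetchState(ReplicaKey leader) {
        this.leader = leader;
    }

    // Accepted from voters and observers, even after the leader left the voter set.
    void onFetch(ReplicaKey replica, long fetchOffset) {
        fetchOffsets.put(replica, fetchOffset);
    }

    // Largest offset replicated by a majority of `voters`. The leader contributes its
    // log end offset only if it is itself a member of `voters`.
    long highWatermark(Set<ReplicaKey> voters, long leaderLogEndOffset) {
        if (voters.isEmpty()) {
            throw new IllegalArgumentException("voter set must not be empty");
        }
        List<Long> offsets = new ArrayList<>();
        for (ReplicaKey voter : voters) {
            offsets.add(voter.equals(leader) ? leaderLogEndOffset : fetchOffsets.getOrDefault(voter, 0L));
        }
        offsets.sort(Comparator.reverseOrder());
        // Index (majority - 1) in descending order, where majority = size / 2 + 1.
        return offsets.get(voters.size() / 2);
    }
}
```

In the usage below, removing the leader from the voter set makes the high-watermark depend only on the two remaining voters, and a fetch from a replica that reuses voter ID 1 with a different UUID does not advance it.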
FetchSnapshot
Request
Version 1 adds the field ReplicaUuid to PartitionSnapshot. If the ReplicaUuid and the ReplicaId fields are populated, the topic partition leader can assume that the replica supports becoming a voter.
{ "apiKey": 59, "type": "request", "listeners": ["controller"], "name": "FetchSnapshotRequest", "validVersions": "0-1", "flexibleVersions": "0+", "fields": [ { "name": "ClusterId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "taggedVersions": "0+", "tag": 0, "about": "The clusterId if known, this is used to validate metadata fetches prior to broker registration" }, { "name": "ReplicaId", "type": "int32", "versions": "0+", "default": "-1", "entityType": "brokerId", "about": "The replica ID of the follower" }, { "name": "MaxBytes", "type": "int32", "versions": "0+", "default": "0x7fffffff", "about": "The maximum bytes to fetch from all of the snapshots" }, { "name": "Topics", "type": "[]TopicSnapshot", "versions": "0+", "about": "The topics to fetch", "fields": [ { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", "about": "The name of the topic to fetch" }, { "name": "Partitions", "type": "[]PartitionSnapshot", "versions": "0+", "about": "The partitions to fetch", "fields": [ { "name": "Partition", "type": "int32", "versions": "0+", "about": "The partition index" }, { "name": "ReplicaUuid", "type": "uuid", "versions": "1+", "default": "null", "about": "The replica UUID of the follower" }, { "name": "CurrentLeaderEpoch", "type": "int32", "versions": "0+", "about": "The current leader epoch of the partition, -1 for unknown leader epoch" }, { "name": "SnapshotId", "type": "SnapshotId", "versions": "0+", "about": "The snapshot endOffset and epoch to fetch", "fields": [ { "name": "EndOffset", "type": "int64", "versions": "0+" }, { "name": "Epoch", "type": "int32", "versions": "0+" } ]}, { "name": "Position", "type": "int64", "versions": "0+", "about": "The byte position within the snapshot to start fetching from" } ]} ]} ] }
Response
Version 1 renames LeaderIdAndEpoch to CurrentLeader and adds Endpoint to CurrentLeader.
{ "apiKey": 59, "type": "response", "name": "FetchSnapshotResponse", "validVersions": "0-1", "flexibleVersions": "0+", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true, "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, { "name": "ErrorCode", "type": "int16", "versions": "0+", "ignorable": false, "about": "The top level response error code." }, { "name": "Topics", "type": "[]TopicSnapshot", "versions": "0+", "about": "The topics to fetch.", "fields": [ { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", "about": "The name of the topic to fetch." }, { "name": "Partitions", "type": "[]PartitionSnapshot", "versions": "0+", "about": "The partitions to fetch.", "fields": [ { "name": "Index", "type": "int32", "versions": "0+", "about": "The partition index." }, { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The error code, or 0 if there was no fetch error." }, { "name": "SnapshotId", "type": "SnapshotId", "versions": "0+", "about": "The snapshot endOffset and epoch fetched", "fields": [ { "name": "EndOffset", "type": "int64", "versions": "0+" }, { "name": "Epoch", "type": "int32", "versions": "0+" } ]}, { "name": "CurrentLeader", "type": "CurrentLeader", "versions": "0+", "taggedVersions": "0+", "tag": 0, "fields": [ { "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId", "about": "The ID of the current leader or -1 if the leader is unknown."}, { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "about": "The latest known leader epoch"}, { "name": "EndPoint", "type": "Endpoint", "versions": "1+", "about": "The endpoint that can be used to communicate with the leader", "fields": [ { "name": "Host", "type": "string", "versions": "1+", "about": "The hostname." }, { "name": "Port", "type": "uint16", "versions": "1+", "about": "The port." } ]} ]}, { "name": "Size", "type": "int64", "versions": "0+", "about": "The total size of the snapshot." }, { "name": "Position", "type": "int64", "versions": "0+", "about": "The starting byte position within the snapshot included in the Bytes field." }, { "name": "UnalignedRecords", "type": "records", "versions": "0+", "about": "Snapshot data in records format which may not be aligned on an offset boundary" } ]} ]} ] }
Handling
Similar to Fetch, replicas that support becoming voters will send both the replica ID and UUID in the FetchSnapshot request. The leader will assume that replicas that report both fields are voters or are able to become voters.
There are a few changes to the leader request handling described in KIP-630. The leader will track the fetch offset for the replica tuple (ID, UUID). Replicas are uniquely identified by their ID and UUID, so their state is tracked using that pair.
When removing the leader from the voter set, it will remain the leader for that epoch until the RemoveVoterRecord gets committed. This means that the leader needs to allow replicas (voters and observers) to fetch snapshots from the leader even if it is not part of the voter set.
BeginQuorumEpoch
The version of the response is increased and the fields remain unchanged.
TODO: Talk about the leader sending this request as part of the quorum check. This is needed because the voter could come back with a new replica UUID. In that case the voter can only discover the leader using Fetch unless the leader sends the BeginQuorumEpoch request.
TODO: The receiver of the RPC always applies the request even if it is not a member of the voter set.
Request
- LeaderId was moved out of the topic partition maps
- VoterId was added to the top level
- VoterUuid was added to PartitionData
- Allow tagged fields for version greater than or equal to 1.
{ "apiKey": 53, "type": "request", "listeners": ["controller"], "name": "BeginQuorumEpochRequest", "validVersions": "0-1", "flexibleVersions": "1+", "fields": [ { "name": "ClusterId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null"}, { "name": "LeaderId", "type": "int32", "versions": "1+", "entityType": "brokerId", "about": "The ID of the newly elected leader"}, { "name": "VoterId", "type": "int32", "versions": "1+", "entityType": "brokerId", "about": "The voter ID of the receiving replica." }, { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [ { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName", "about": "The topic name." }, { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [ { "name": "PartitionIndex", "type": "int32", "versions": "0+", "about": "The partition index." }, { "name": "VoterUuid", "type": "uuid", "versions": "1+", "about": "The replica UUID of the receiving replica." }, { "name": "LeaderId", "type": "int32", "versions": "0", "entityType": "brokerId", "about": "The ID of the newly elected leader"}, { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "about": "The epoch of the newly elected leader"} ]} ]} ] }
Response
Version 1 is a flexible version and adds the tagged field LeaderEndpoint to PartitionData.
{ "apiKey": 53, "type": "response", "name": "BeginQuorumEpochResponse", "validVersions": "0-1", "flexibleVersions": "1+", "fields": [ { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The top level error code."}, { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [ { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName", "about": "The topic name." }, { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [ { "name": "PartitionIndex", "type": "int32", "versions": "0+", "about": "The partition index." }, { "name": "ErrorCode", "type": "int16", "versions": "0+"}, { "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId", "about": "The ID of the current leader or -1 if the leader is unknown."}, { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "about": "The latest known leader epoch"}, { "name": "LeaderEndPoint", "type": "Endpoint", "versions": "1+", "taggedVersions": "1+", "tag": 0, "about": "The endpoint that can be used to communicate with the leader", "fields": [ { "name": "Host", "type": "string", "versions": "1+", "about": "The hostname." }, { "name": "Port", "type": "uint16", "versions": "1+", "about": "The port." } ]} ]} ]} ] }
Handling
This request will be handled as described in KIP-595 with the following additional errors:
- INVALID_REQUEST - when the voter ID and UUID don't match.
- NOT_VOTER - when the replica is not a voter in the topic partition. This error should be retried by the leader of the topic partition.
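Two version-dependent details of BeginQuorumEpoch handling can be sketched on the receiver side: where LeaderId is read from, and the identity check behind INVALID_REQUEST. This is a hypothetical helper with an invented name (`BeginQuorumEpochReceiver`); it is not Kafka code and does not model the NOT_VOTER case.

```java
import java.util.UUID;

// Hypothetical helper for illustration only; not a Kafka class.
final class BeginQuorumEpochReceiver {
    final int localId;
    final UUID localUuid;

    BeginQuorumEpochReceiver(int localId, UUID localUuid) {
        this.localId = localId;
        this.localUuid = localUuid;
    }

    // Version 0 carries LeaderId inside each PartitionData entry; version 1 moves it
    // to the top level of the request.
    static int leaderId(int version, int topLevelLeaderId, int partitionLeaderId) {
        return version == 0 ? partitionLeaderId : topLevelLeaderId;
    }

    // Version 1 adds VoterId (top level) and VoterUuid (per partition). A mismatch with
    // this replica's own ID and UUID maps to INVALID_REQUEST; version 0 has neither field.
    boolean identityMatches(int version, int voterId, UUID voterUuid) {
        if (version == 0) {
            return true;
        }
        return voterId == localId && localUuid.equals(voterUuid);
    }
}
```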
EndQuorumEpoch
Request
- LeaderId was moved out of the topic partition maps
- VoterId was added to the request
- VoterUuid was added to the Partitions
- ReplicaUuid was added to PreferredSuccessors
- Allow tagged fields for versions greater than or equal to 1.
{ "apiKey": 54, "type": "request", "listeners": ["controller"], "name": "EndQuorumEpochRequest", "validVersions": "0-1", "flexibleVersions": "1+", "fields": [ { "name": "ClusterId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null"}, { "name": "LeaderId", "type": "int32", "versions": "1+", "entityType": "brokerId", "about": "The current leader ID that is resigning." }, { "name": "VoterId", "type": "int32", "versions": "1+", "entityType": "brokerId", "about": "The voter ID of the receiving replica." }, { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [ { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName", "about": "The topic name." }, { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [ { "name": "PartitionIndex", "type": "int32", "versions": "0+", "about": "The partition index." }, { "name": "VoterUuid", "type": "uuid", "versions": "1+", "about": "The replica UUID of the receiving replica." }, { "name": "LeaderId", "type": "int32", "versions": "0", "entityType": "brokerId", "about": "The current leader ID that is resigning"}, { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "about": "The current epoch"}, { "name": "PreferredSuccessors", "type": "[]ReplicaInfo", "versions": "0+", "about": "A sorted list of preferred successors to start the election", "fields": [ { "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId" }, { "name": "ReplicaUuid", "type": "uuid", "versions": "1+" } ]} ]} ]} ] }
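With ReplicaUuid added to PreferredSuccessors, a receiver can locate itself in the sorted list by the (ReplicaId, ReplicaUuid) pair instead of by ID alone. The sketch below uses invented names (`ReplicaInfo`, `PreferredSuccessors`) and is only an illustration; what the receiver then does with its position is left to the KIP-595 handling and is not modeled here.

```java
import java.util.List;
import java.util.UUID;

// Hypothetical type mirroring a PreferredSuccessors entry; not a Kafka class.
record ReplicaInfo(int replicaId, UUID replicaUuid) {}

final class PreferredSuccessors {
    // Returns the 0-based position of the local replica in the sorted list, or -1 if absent.
    // Matching on (ReplicaId, ReplicaUuid) means a replica re-provisioned with the same ID
    // but a new directory UUID does not claim an entry meant for its predecessor.
    static int position(List<ReplicaInfo> successors, int localId, UUID localUuid) {
        for (int i = 0; i < successors.size(); i++) {
            ReplicaInfo s = successors.get(i);
            if (s.replicaId() == localId && localUuid.equals(s.replicaUuid())) {
                return i;
            }
        }
        return -1;
    }
}
```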
Response
Version 1 is a flexible version and adds the tagged field LeaderEndpoint to PartitionData.
{ "apiKey": 54, "type": "response", "name": "EndQuorumEpochResponse", "validVersions": "0-1", "flexibleVersions": "1+", "fields": [ { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The top level error code."}, { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [ { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName", "about": "The topic name." }, { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [ { "name": "PartitionIndex", "type": "int32", "versions": "0+", "about": "The partition index." }, { "name": "ErrorCode", "type": "int16", "versions": "0+"}, { "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId", "about": "The ID of the current leader or -1 if the leader is unknown."}, { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "about": "The latest known leader epoch"}, { "name": "LeaderEndPoint", "type": "Endpoint", "versions": "1+", "taggedVersions": "1+", "tag": 0, "about": "The endpoint that can be used to communicate with the leader", "fields": [ { "name": "Host", "type": "string", "versions": "1+", "about": "The hostname." }, { "name": "Port", "type": "uint16", "versions": "1+", "about": "The port." } ]} ]} ]} ] }
Handling
This request will be handled as described in KIP-595 with the following additional errors:
- INVALID_REQUEST - when the voter ID and UUID don't match.
- NOT_VOTER - when the replica is not a voter in the topic partition. This error could be retried by the leader of the topic partition.
DescribeQuorum
The version of the request is increased and the fields remain unchanged.
Response
- Add ReplicaUuid to ReplicaState
{ "apiKey": 55, "type": "response", "name": "DescribeQuorumResponse", "validVersions": "0-2", "flexibleVersions": "0+", "fields": [ { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The top level error code."}, { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [ { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName", "about": "The topic name." }, { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [ { "name": "PartitionIndex", "type": "int32", "versions": "0+", "about": "The partition index." }, { "name": "ErrorCode", "type": "int16", "versions": "0+"}, { "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId", "about": "The ID of the current leader or -1 if the leader is unknown."}, { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "about": "The latest known leader epoch"}, { "name": "HighWatermark", "type": "int64", "versions": "0+"}, { "name": "CurrentVoters", "type": "[]ReplicaState", "versions": "0+" }, { "name": "Observers", "type": "[]ReplicaState", "versions": "0+" } ]} ]} ], "commonStructs": [ { "name": "ReplicaState", "versions": "0+", "fields": [ { "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId" }, { "name": "ReplicaUuid", "type": "uuid", "versions": "2+" }, { "name": "LogEndOffset", "type": "int64", "versions": "0+", "about": "The last known log end offset of the follower or -1 if it is unknown"}, { "name": "LastFetchTimestamp", "type": "int64", "versions": "1+", "ignorable": true, "default": -1, "about": "The last known leader wall clock time when a follower fetched from the leader. This is reported as -1 both for the current leader or if it is unknown for a voter" }, { "name": "LastCaughtUpTimestamp", "type": "int64", "versions": "1+", "ignorable": true, "default": -1, "about": "The leader wall clock append time of the offset for which the follower made the most recent fetch request. This is reported as the current time for the leader and -1 if unknown for a voter" } ]} ] }
Handling
The handling of the request is the same as described in KIP-595, with just the additional fields. Clients handling the response can assume that a replica listed in the Observers field whose ReplicaState includes both the ID and the UUID can become a voter.
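On the client side, the rule above amounts to filtering the Observers list for entries that carry both an ID and a UUID. The sketch below is hypothetical: `ReplicaState` here is a plain record invented for illustration rather than Kafka's generated class, `QuorumView.couldBeVoters` is an invented helper, and it assumes a missing ReplicaUuid (for example in a version 0 or 1 response) shows up as null or as the all-zero UUID.

```java
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;

// Hypothetical record mirroring DescribeQuorumResponse's ReplicaState; not Kafka's class.
record ReplicaState(int replicaId, UUID replicaUuid, long logEndOffset) {}

final class QuorumView {
    // Assumption: a missing ReplicaUuid appears as null or as the all-zero UUID.
    static final UUID ZERO = new UUID(0L, 0L);

    // Observers that report both an ID and a UUID could be added to the voter set.
    static List<ReplicaState> couldBeVoters(List<ReplicaState> observers) {
        return observers.stream()
            .filter(r -> r.replicaId() >= 0)
            .filter(r -> r.replicaUuid() != null && !ZERO.equals(r.replicaUuid()))
            .collect(Collectors.toList());
    }
}
```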
Admin Client
The Java Admin client will be extended to support the new field in the DescribeQuorum response and the new AddVoter and RemoveVoter RPCs.
Monitoring
NAME | TAGS | TYPE | NOTE |
---|---|---|---|
number-of-voters | type=raft-metrics | gauge | number of voters for the cluster metadata topic partition. |
number-of-observers | type=raft-metrics | gauge | number of observers that could be promoted to voters. |
pending-add-voter | type=raft-metrics | gauge | 1 if there is a pending add voter operation, 0 otherwise. |
pending-remove-voter | type=raft-metrics | gauge | 1 if there is a pending remove voter operation, 0 otherwise. |
TBD | TBD | gauge | 1 if a controller node is not a voter for the KRaft cluster metadata partition, 0 otherwise. |
duplicate-voter-ids | type=raft-metrics | gauge | Counts the number of duplicate replica IDs in the set of voters. |
number-of-offline-voters | type=raft-metrics | gauge | Number of voters whose last Fetch request is older than the Fetch timeout. |
ignored-static-voters | TBD | gauge | 1 if controller.quorum.voters is set and the kraft.version is greater than 0, 0 otherwise. |
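Two of the gauges above are derived from the voter set rather than from a single event, so a small sketch of how a leader might compute them may help. This is illustrative only: the `VoterState` record and `RaftVoterMetrics` class are hypothetical, the offline check assumes the caller passes the voters other than the leader (the leader reports -1 for its own last Fetch), voters with an unknown (-1) timestamp are treated as offline, and "duplicate replica IDs" is interpreted as every voter beyond the first that reuses an ID.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical leader-side view of a voter: identity plus the leader's wall clock
// time of the voter's last Fetch request (-1 if unknown).
record VoterState(int replicaId, UUID replicaUuid, long lastFetchTimestampMs) {}

final class RaftVoterMetrics {
    private RaftVoterMetrics() {}

    // number-of-offline-voters: followers whose last Fetch is older than the Fetch timeout.
    // Assumption: the leader is excluded by the caller; unknown timestamps count as offline.
    static int numberOfOfflineVoters(List<VoterState> followers, long nowMs, long fetchTimeoutMs) {
        int offline = 0;
        for (VoterState voter : followers) {
            long last = voter.lastFetchTimestampMs();
            if (last < 0 || nowMs - last > fetchTimeoutMs) {
                offline++;
            }
        }
        return offline;
    }

    // duplicate-voter-ids: number of voters that reuse a replica ID already seen.
    static int duplicateVoterIds(List<VoterState> voters) {
        Map<Integer, Integer> counts = new HashMap<>();
        for (VoterState voter : voters) {
            counts.merge(voter.replicaId(), 1, Integer::sum);
        }
        int duplicates = 0;
        for (int count : counts.values()) {
            duplicates += count - 1;
        }
        return duplicates;
    }
}
```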
Command Line Interface
kafka-metadata-shell.sh
A future KIP will describe how the kafka-metadata-shell tool will be extended to be able to read and display KRaft control records from the quorum, snapshot and log.
kafka-storage.sh
A future KIP will describe how the kafka-storage tool will be extended to bootstrap the first quorum node by writing an AddVoterRecord to the cluster metadata log when the controller.quorum.bootstrap.servers configuration is used.
kafka-metadata-quorum.sh
The kafka-metadata-quorum tool described in KIP-595 and KIP-836 will be improved to support these additional commands:
describe
TODO: This command should print all of the voter endpoints that the leader knows about. It should also display if there are any uncommitted add or remove voter changes.
This command will print both the ReplicaId and ReplicaUuid for CurrentVoters. A new row called CouldBeVoters will be added, which prints the replica ID and UUID of any replica that could be added to the voter set. E.g.
> bin/kafka-metadata-quorum.sh --describe
ClusterId: SomeClusterId
LeaderId: 0
LeaderEpoch: 15
HighWatermark: 234130
MaxFollowerLag: 34
MaxFollowerLagTimeMs: 15
CurrentVoters: [{"id": 0, "uuid": "UUID1"}, {"id": 1, "uuid": "UUID2"}, {"id": 2, "uuid": "UUID2"}]
Observers: [{"id": 3, "uuid": "UUID3"}]
describe --replication
This command will print an additional column for the replica UUID after the replica ID. E.g.
> bin/kafka-metadata-quorum.sh --describe replication
ReplicaId   ReplicaUuid   LogEndOffset   ...
0           uuid1         234134         ...
...
add-controller
This command is used to add new voters to the topic partition. The flags --replica-id and --replica-uuid must be specified. A future KIP will describe how the user can specify endpoint information for the replica.
remove-controller
This command is used to remove voters from the topic partition. The flags --replica-id and --replica-uuid must be specified.
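Requiring both --replica-id and --replica-uuid matters for the disk-failure use case: a replacement controller may reuse the old replica ID with a new UUID. The sketch below illustrates that by keying the voter set on the (ID, UUID) pair; the `VoterKey` record and `VoterSetChanges` helpers are hypothetical, and the add-then-remove order in the usage is only one possible sequence, not a procedure prescribed by this KIP.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.UUID;

// Hypothetical voter identity: a voter is identified by both its replica ID and UUID.
record VoterKey(int replicaId, UUID replicaUuid) {}

final class VoterSetChanges {
    private VoterSetChanges() {}

    // Returns a new voter set with the replica added; rejects an exact (ID, UUID) duplicate.
    static Set<VoterKey> addVoter(Set<VoterKey> voters, VoterKey replica) {
        if (voters.contains(replica)) {
            throw new IllegalArgumentException("Already a voter: " + replica);
        }
        Set<VoterKey> next = new LinkedHashSet<>(voters);
        next.add(replica);
        return next;
    }

    // Returns a new voter set without the replica. Matching on both ID and UUID means
    // a replacement replica that reuses the same ID is not removed by mistake.
    static Set<VoterKey> removeVoter(Set<VoterKey> voters, VoterKey replica) {
        if (!voters.contains(replica)) {
            throw new IllegalArgumentException("Not a voter: " + replica);
        }
        Set<VoterKey> next = new LinkedHashSet<>(voters);
        next.remove(replica);
        return next;
    }
}
```

For example, replacing voter 1 after a disk failure could add `(1, newUuid)` and then remove `(1, oldUuid)`; removing by ID alone would be ambiguous while both entries exist.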
Compatibility, Deprecation, and Migration Plan
The features in this KIP will be supported if the ApiVersions of all of the voters and observers are greater than or equal to the versions described here. If the leader has a replica UUID for all of the voters, then this KIP is supported by all of the voters.
Upgrading to controller.quorum.bootstrap.servers
TODO: figure out a way to do this. The requirement is the AddVoterRecord for all of the voters in the static configuration are committed. How do we guarantee this? Is seeing a committed AddVoterRecord enough?
Test Plan
This KIP will be tested using unit tests, integration tests, system tests, simulation tests and a TLA+ specification.
Rejected Alternatives
KIP-642: Dynamic quorum reassignment. KIP-642 describes how to perform dynamic reassignment with multiple voters added and removed. KIP-642 doesn't support disk failures, and it would be more difficult to implement than this KIP (KIP-853). In a future KIP we can describe how to add administrative operations that support the addition and removal of multiple voters.
References
- Ongaro, Diego, and John Ousterhout. "In search of an understandable consensus algorithm." 2014 USENIX Annual Technical Conference (USENIX ATC 14). 2014.
- Ongaro, Diego. Consensus: Bridging theory and practice. Diss. Stanford University, 2014.
- Bug in single-server membership changes
- KIP-595: A Raft Protocol for the Metadata Quorum
- KIP-630: Kafka Raft Snapshot
- KIP-631: The Quorum-based Kafka Controller