...

Code Block
kafka-metadata-quorum --bootstrap-server <endpoints> remove --voter-id <voter-id> --voter-uuid <voter-uuid>

Reference Explanation

...

The general goal of this KIP is to allow the user to dynamically change the set of voters (also known as controllers) for the KRaft cluster metadata partition. This is achieved by storing the set of voters and their known endpoints in the log instead of in the controller.quorum.voters property. Because the set of voters is stored in the log, the leader can replicate this information to all of the fetching replicas (voters and observers). Since old records can be deleted once a snapshot has been replicated, the KRaft snapshots will also contain the set of voters up to the included offset.

Voters that are already following the leader will discover the set of voters and their endpoints by fetching the latest log from the leader, but how do new voters discover the leader's endpoint? The leader will push this information to new voters (or voters that have been offline for a long time) using the BeginQuorumEpoch request. The active leader sends BeginQuorumEpoch to all of the voters when it becomes leader for an epoch.

The leader doesn't send BeginQuorumEpoch to observers since these are dynamic and are not included in the KRaft partition log. Observers will instead discover the leader using the controller.quorum.bootstrap.servers property. It is important that this property includes at least one of the available voters; otherwise, brokers (observers) will not be able to discover the leader of the KRaft cluster metadata partition.
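
Below is a minimal sketch, in Java, of how an observer might walk the controller.quorum.bootstrap.servers list until some voter reports the current leader. The LeaderInfo record and the injected sendFetch function are illustrative placeholders, not the actual KafkaRaftClient API.

Code Block
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

public final class LeaderDiscoverySketch {
    // Illustrative stand-in for the leader information carried in a Fetch response.
    public record LeaderInfo(int leaderId, String host, int port) {}

    // Try each bootstrap endpoint until one of them (any voter, leader or not)
    // reports who the current leader is and where it can be reached.
    public static Optional<LeaderInfo> discoverLeader(
            List<String> bootstrapServers,
            Function<String, Optional<LeaderInfo>> sendFetch) { // hypothetical transport hook
        for (String endpoint : bootstrapServers) {
            Optional<LeaderInfo> leader = sendFetch.apply(endpoint);
            if (leader.isPresent()) {
                return leader;
            }
        }
        return Optional.empty(); // caller retries after a backoff
    }
}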

The rest of this section and its subsections go into the detailed changes required to implement this new functionality.

Directory id or replica UUID

In addition to a replica ID, each replica assigned to a KRaft topic partition will have a replica UUID, also known as a directory id. This UUID will be generated once and persisted in the meta.properties file for the metadata.log.dir and as part of the quorum state for the KRaft topic partition. A replica (controller or broker) may host multiple topic partitions in multiple log directories. Replica UUIDs and directory UUIDs were first introduced in KIP-858: Handle JBOD broker disk failure in KRaft.

There are two cases in which a replica UUID (directory id) will be generated for the quorum state:

  1. The topic partition is a new topic partition and doesn't contain a quorum state file persisted on-disk.
  2. The topic partition has a version 0 of the quorum state file. Version 0 of the quorum state file doesn't contain a replica UUID field. The quorum state file will be upgraded to version 1 with the replica UUID persisted.

There are also two cases in which a directory id will be generated and persisted in meta.properties (see the sketch after this list):

  1. The kafka-storage format command will generate a directory id for all of the log directories, including the metadata log dir.
  2. meta.properties exists but doesn't include a directory.id property. In this case a directory id will be generated and persisted, to support upgrading from Kafka versions that don't support this feature to versions that do.
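
The following is a rough sketch of case 2 above, assuming a plain java.util.Properties view of meta.properties; Kafka's actual implementation uses its own Uuid type and metadata properties classes, so the file handling and names here are only illustrative.

Code Block
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;
import java.util.UUID;

public final class DirectoryIdSketch {
    // If meta.properties exists but has no directory.id, generate one and persist it.
    // Returns the directory id that is now on disk.
    public static String ensureDirectoryId(Path metaPropertiesFile) throws IOException {
        Properties props = new Properties();
        try (InputStream in = Files.newInputStream(metaPropertiesFile)) {
            props.load(in);
        }
        String directoryId = props.getProperty("directory.id");
        if (directoryId == null) {
            // Upgrade path: older Kafka versions did not write this property.
            directoryId = UUID.randomUUID().toString();
            props.setProperty("directory.id", directoryId);
            try (OutputStream out = Files.newOutputStream(metaPropertiesFile)) {
                props.store(out, "directory.id added during upgrade");
            }
        }
        return directoryId;
    }
}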

Supported features

TODO: Talk about the ApiVersions and UpdateFeatures RPCs.

Replicas in a KRaft topic partition that are voters or can become voters will be identified by their ID and UUID.

Voter Changes

Adding Voters

Voters are added to the cluster metadata partition by sending an AddVoter RPC to the leader. For safety, Kafka will only allow one voter change operation at a time. If there are any pending voter change operations, the leader will wait for them to finish.

If there are no pending voter change operations, the leader will send an ApiVersions request to the new voter's endpoint to discover its supported KRaftVersion. If the new voter supports the current KRaftVersion, the leader will write an AddVoterRecord to the log and immediately update its in-memory quorum state to include this voter as part of the quorum. Any replica that replicates and reads this AddVoterRecord will update its in-memory voter set to include this new voter. Voters will not wait for these records to get committed before updating their voter set.
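
The sketch below summarizes the leader-side handling described above. The types, the Appender hook, and the pre-computed ApiVersions result are illustrative placeholders for the real RPC plumbing, not Kafka's actual classes.

Code Block
import java.util.HashSet;
import java.util.Set;

public final class AddVoterSketch {
    public record Voter(int id, String uuid) {}

    public interface Appender {
        void append(Voter voter); // hypothetical hook that writes the AddVoterRecord
    }

    private final Set<Voter> voters = new HashSet<>();
    private boolean voterChangeInFlight = false; // cleared once the pending record commits (not shown)

    // Returns true if the AddVoter request was accepted and the record appended.
    public synchronized boolean handleAddVoter(
            Voter newVoter,
            boolean newVoterSupportsCurrentKRaftVersion, // result of the ApiVersions probe
            Appender appendAddVoterRecord) {
        if (voterChangeInFlight) {
            return false; // only one voter change operation is allowed at a time
        }
        if (!newVoterSupportsCurrentKRaftVersion) {
            return false; // the new voter cannot participate at the current KRaftVersion
        }
        voterChangeInFlight = true;
        appendAddVoterRecord.append(newVoter);
        // The in-memory voter set is updated immediately, without waiting for the commit.
        voters.add(newVoter);
        return true;
    }
}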

...

Once the RemoveVoterRecord operation has been committed by the majority of the new voter set, the leader can respond to the RPC. If the removed voter is the leader, the leader will resign from the quorum when the RemoveVoterRecord has been committed. To allow this operation to be committed and for the leader to resign, the followers will continue to fetch from the leader even if the leader is not part of the new voter set. In KRaft, leader election is triggered when a voter hasn't received a successful response within the fetch timeout.
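
As a hedged sketch of the commit rule above: the RemoveVoterRecord counts as committed once a majority of the new voter set (the set without the removed voter) has replicated it, and a leader that removed itself resigns only after that point. The names and the replicatedOffsets map below are illustrative.

Code Block
import java.util.Map;
import java.util.Set;

public final class RemoveVoterCommitSketch {
    // True once a majority of the NEW voter set has replicated past the RemoveVoterRecord.
    public static boolean isRemoveVoterCommitted(
            long removeVoterRecordOffset,
            Set<Integer> newVoterSet,
            Map<Integer, Long> replicatedOffsets) {
        long caughtUp = newVoterSet.stream()
            .filter(id -> replicatedOffsets.getOrDefault(id, -1L) >= removeVoterRecordOffset)
            .count();
        return caughtUp >= newVoterSet.size() / 2 + 1;
    }

    // A leader that is no longer in the voter set resigns once the removal is committed.
    public static boolean leaderShouldResign(int leaderId, Set<Integer> newVoterSet, boolean committed) {
        return committed && !newVoterSet.contains(leaderId);
    }
}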

...

This section describes how the quorum will get bootstrapped to support this KIP. There are two configurations that Kafka will support, and each will be explained separately. The Kafka cluster can be configured to use either the existing controller.quorum.voters property or the new property called controller.quorum.bootstrap.servers. These two configurations are mutually exclusive; the KRaft cluster is expected to use one or the other but not both.

controller.quorum.voters

This is a static quorum configuration where all of the voters' ID, host and port are specified. An example value for this configuration is 1@localhost:9092,2@localhost:9093,3@localhost:9094.

When this option is used, the leader of the KRaft topic partition will not allow the AddVoter RPC to add replica IDs that are not described in the configuration, and it will not allow the RemoveVoter RPC to completely remove replica IDs that are described in the configuration. This means that the replica UUID can change by using a sequence of AddVoter and RemoveVoter calls, but the client cannot add a new replica id to, or remove an existing replica id from, the set of voters.
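
A small sketch of this restriction, assuming the leader has the set of replica ids parsed from controller.quorum.voters; the method names are illustrative.

Code Block
import java.util.Set;

public final class StaticQuorumValidationSketch {
    // AddVoter may only (re)add a replica id that is listed in controller.quorum.voters.
    public static boolean isAddVoterAllowed(int replicaId, Set<Integer> configuredVoterIds) {
        return configuredVoterIds.contains(replicaId);
    }

    // RemoveVoter may not leave a configured replica id completely out of the voter set;
    // removing one (id, uuid) pair is fine only if the id remains (e.g. with a new uuid).
    public static boolean isRemoveVoterAllowed(
            int replicaId,
            Set<Integer> configuredVoterIds,
            Set<Integer> voterIdsAfterRemoval) {
        return !configuredVoterIds.contains(replicaId) || voterIdsAfterRemoval.contains(replicaId);
    }
}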

When the leader of the topic partition first discovers the UUID of all the voters through the Fetch and FetchSnapshot RPCs, it will write an AddVoterRecord to the log for each of the voters described in the configuration.

The KRaft leader will not perform writes from the state machine (active controller) or clients until it has written an AddVoterRecord to the log for every replica id in the controller.quorum.voters configuration.

If the KRaftVersion is 0, this property will be used to configure the set of voters for the KRaft cluster metadata partition.

When the KRaftVersion is upgraded to a version greater than 0 and the version is supported by the voters, the leader will write a control record batch that includes the KRaftVersion record and an AddVoter record for each of the voters. This will allow KRaft to ignore the controller.quorum.voters property and instead rely solely on the log state once it has been upgraded to KRaftVersion 1.

If the cluster metadata topic partition contains AddVoterRecords for replica ids that are not enumerated in controller.quorum.voters, the replica will fail to start and shut down.

controller.quorum.bootstrap.servers

...

When using this configuration for a new cluster, the quorum should be started with only one voter. This voter is bootstrapped by running the kafka-storage format command. This tool will create the cluster metadata partition and append one AddVoterRecord to it. Additional voters can be added by using the AddVoter RPC as described in this KIP.

...

It is possible for the leader to write an AddVoterRecord to the log and replicate it to only some of the voters in the new configuration. If the leader fails before this record has been replicated to the new voter, it is possible that a new leader cannot be elected. This is because voters reject vote requests from replicas that are not in the voter set. This check will be removed, and replicas will reply to vote requests even when the candidate is not in the voter set or the voting replica is not in the voter set. The candidate must still have a longer log offset and epoch before the voter will grant it a vote.
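
The sketch below shows the relaxed vote check: voter-set membership no longer gates the reply, only the candidate's log position does. The record type and method are illustrative, not Kafka's actual epoch state classes.

Code Block
public final class VoteCheckSketch {
    public record LogEnd(int epoch, long offset) {}

    // Grant the vote based only on log position: the candidate's last epoch and offset
    // must not be behind ours. Whether the candidate or this replica is currently in
    // the voter set is intentionally no longer checked.
    public static boolean shouldGrantVote(LogEnd candidate, LogEnd local) {
        if (candidate.epoch() != local.epoch()) {
            return candidate.epoch() > local.epoch();
        }
        return candidate.offset() >= local.offset();
    }
}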

Automatic endpoint and directory id discovery

TODO: Talk about when voters will send AddVoter request to the active leader.


High Watermark

As described in KIP-595, the high-watermark will be calculated using the fetch offset of the majority of the voters. When a replica is removed or added, it is possible for the high-watermark to decrease. The leader will not allow the high-watermark to decrease and will guarantee that it is monotonically increasing for both the state machines and the remote replicas.

With this KIP, it is possible for the leader to not be part of the voter set when the removed replica is the leader. In this case the leader will continue to handle Fetch and FetchSnapshot requests as normal, but it will not count itself when computing the high watermark.
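
The following sketch illustrates the high-watermark rule from the last two paragraphs: take the offset replicated by a majority of the current voter set, include the leader's own log end offset only if the leader is still a voter, and never let the published value go backwards. The names are illustrative.

Code Block
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

public final class HighWatermarkSketch {
    // fetchOffsets maps each voter id to the highest offset known to be replicated by it;
    // the leader's own log end offset is included only if the leader is still in voterIds.
    public static long updateHighWatermark(
            long currentHighWatermark,
            Set<Integer> voterIds,
            Map<Integer, Long> fetchOffsets) {
        if (voterIds.isEmpty()) {
            return currentHighWatermark;
        }
        List<Long> offsets = new ArrayList<>();
        for (int voterId : voterIds) {
            offsets.add(fetchOffsets.getOrDefault(voterId, 0L));
        }
        offsets.sort(null);
        // With N voters a majority is N/2 + 1; the (N - majority)-th smallest offset is the
        // highest offset known to be replicated by at least a majority.
        int majority = voterIds.size() / 2 + 1;
        long majorityOffset = offsets.get(offsets.size() - majority);
        // The high-watermark is monotonically increasing even if the voter set just shrank.
        return Math.max(currentHighWatermark, majorityOffset);
    }
}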

...

The snapshot generation code needs to be extended to include these new KRaft-specific control records, AddVoterRecord and RemoveVoterRecord. Before this KIP the snapshot didn't include any KRaft-generated control records.

...

The KRaft implementation and protocol described in KIP-595 and KIP-630 never read from the log or snapshot. This KIP requires that the KRaft implementation now read uncommitted data from the log and snapshot to discover the voter set. This also means that the KRaft implementation needs to handle this uncommitted state getting truncated and reverted.
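
One way to picture this is a history of voter sets keyed by the offset of the (possibly uncommitted) record that introduced them, which can be rolled back on truncation. This mirrors the idea only; the class and method names below are illustrative, not Kafka's actual implementation.

Code Block
import java.util.Set;
import java.util.TreeMap;

public final class VoterSetHistorySketch {
    // Voter set in effect starting at a given log offset, in ascending offset order.
    private final TreeMap<Long, Set<Integer>> history = new TreeMap<>();

    public VoterSetHistorySketch(Set<Integer> bootstrapVoters) {
        history.put(-1L, bootstrapVoters); // voter set before any record has been read
    }

    // Called when an AddVoterRecord or RemoveVoterRecord is read, committed or not.
    public void addAt(long offset, Set<Integer> newVoterSet) {
        history.put(offset, newVoterSet);
    }

    // The voter set the replica should use right now (from the latest, possibly uncommitted, record).
    public Set<Integer> latest() {
        return history.lastEntry().getValue();
    }

    // Called when the log is truncated: drop voter sets recorded at or beyond the truncation offset.
    public void truncateTo(long truncationOffset) {
        history.tailMap(truncationOffset, true).clear();
    }
}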

Common Scenarios

TODO: Think about moving this to the User explanation section.

To better illustrate this feature, this section describes two common scenarios that the Kafka administrator may perform.

...

Compatibility, Deprecation, and Migration Plan

The features in this KIP will be supported if the ApiVersions of all of the voters and observers are greater than the versions described here. If the leader has a replica UUID for all of the voters, then this KIP is supported by all of the voters.

Upgrading to controller.quorum.bootstrap.servers

TODO: figure out a way to do this. The requirement is the AddVoterRecord for all of the voters in the static configuration are committed. How do we guarantee this? Is seeing a committed AddVoterRecord enough?

...