...

This command will parse the release-version into the matching metadata.version and kraft.version features. It will send an UpdateFeatures request to a node with both features set to the matching versions. KRaft will write the KRaftVersionRecord control record if all of the controllers and brokers support the new version. KRaft will use the information in the controller registration, broker registration and add voter records to determine if the new version is compatible.
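
As a rough illustration of what the tool does under the covers, the sketch below issues the same kraft.version upgrade through the existing Java Admin client updateFeatures API. The feature name and version level come from this KIP; the bootstrap address is a placeholder and this is not the actual kafka-features implementation.

Code Block
languagejava
import java.util.Map;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.FeatureUpdate;
import org.apache.kafka.clients.admin.UpdateFeaturesOptions;

public class UpgradeKRaftVersion {
    public static void main(String[] args) throws Exception {
        try (Admin admin = Admin.create(Map.of(
                AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"))) {
            // Upgrade kraft.version to 1, the feature level introduced by this KIP.
            // UpgradeType.UPGRADE only allows moving the feature level forward.
            FeatureUpdate update =
                new FeatureUpdate((short) 1, FeatureUpdate.UpgradeType.UPGRADE);
            admin.updateFeatures(Map.of("kraft.version", update), new UpdateFeaturesOptions())
                 .all()
                 .get();
        }
    }
}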

Add controller

To increase the number of controllers the user needs to format a controller node, start the controller node, and add that node to the KRaft cluster metadata partition. Formatting a controller node can be done with the following CLI command:

...

Code Block
kafka-metadata-quorum --bootstrap-server <endpoints> add-controller --config controller.properties

Remove controller

To decrease the number of controllers the user needs to execute the following CLI command:

Code Block
kafka-metadata-quorum --bootstrap-server <endpoints> remove-controller --controller-id <replica-id> --controller-uuid <replica-uuid>

Common scenarios

To better illustrate this feature, this section describes two common scenarios that the Kafka administrator may perform.

Disk failure recovery

If one of the replicas encounters a disk failure, the operator can replace the failed disk with a new disk and start the replica.

...

When the RemoveVoter RPC succeeds, the voters will be (1, UUID1), (2, UUID2), and (3, UUID3'). At this point the Kafka cluster has successfully recovered from a disk failure in a controller node in a consistent way.
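
To make the sequence concrete, here is a small self-contained Java sketch (not Kafka code) that models voters as (replica id, directory id) pairs and walks through the replacement. It assumes, as the final state above implies, that the new directory id UUID3' is added before the stale (3, UUID3) entry is removed.

Code Block
languagejava
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;

// Minimal model of a voter as a (replica id, directory id) pair.
record Voter(int id, UUID directoryId) {}

public class DiskFailureRecovery {
    public static void main(String[] args) {
        UUID uuid1 = UUID.randomUUID(), uuid2 = UUID.randomUUID(), uuid3 = UUID.randomUUID();
        Set<Voter> voters = new LinkedHashSet<>(List.of(
            new Voter(1, uuid1), new Voter(2, uuid2), new Voter(3, uuid3)));

        // Replica 3 comes back with a brand new disk, so it generates a new
        // directory id (UUID3') and rejoins the cluster as an observer.
        UUID uuid3Prime = UUID.randomUUID();

        // AddVoter(3, UUID3'): the quorum temporarily contains four voters.
        voters.add(new Voter(3, uuid3Prime));

        // RemoveVoter(3, UUID3): drop the entry that points at the failed disk.
        voters.remove(new Voter(3, uuid3));

        // Final voter set: (1, UUID1), (2, UUID2), (3, UUID3').
        System.out.println(voters);
    }
}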

Node failure recovery

Let's assume that the voter set is (1, UUID1), (2, UUID2) and (3, UUID3).

...

The operator can now decide to remove replica (3, UUID3) from the set of voters using the RemoveVoter RPC.

Reference explanation

The general goal of this KIP is to allow the user to dynamically change the set of voters (also known as controllers) for the KRaft cluster metadata partition. This is achieved by storing the set of voters and their known endpoints in the log instead of in the controller.quorum.voters property. Because the set of voters is stored in the log, the leader can replicate this information to all of the fetching replicas (voters and observers). Since old log segments (records) can be deleted once a snapshot has been created, the KRaft snapshots will also contain the set of voters up to the included offset.

...

When a client sends an UpdateFeatures RPC to the active controller and the FeatureUpdates.Feature property is kraft.version, the associated information will be passed to the KRaft client. The KRaft client will implement two different algorithms to check if the upgrade is supported by voters and observers. For voters, the KRaft client will compare the upgraded version against all of the voter sets persisted for the KRaft cluster metadata partition. The KRaft client cannot do this for observers (brokers) since their supported versions are not persisted in the log. The controller will instead push the broker registration information to the KRaft client.
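
The following is a minimal sketch of that compatibility check. The type and method names are hypothetical; they only illustrate that the requested kraft.version must fall inside the supported range reported by every voter (taken from the persisted voter set) and every broker (taken from the pushed registrations).

Code Block
languagejava
import java.util.Collection;

// Hypothetical view of a replica's supported kraft.version range, taken either
// from the persisted voter set (voters) or from pushed broker registrations (observers).
record SupportedVersionRange(short min, short max) {
    boolean contains(short version) {
        return version >= min && version <= max;
    }
}

final class KRaftVersionUpgrade {
    // The upgrade is allowed only if every voter and every registered broker
    // supports the requested kraft.version.
    static boolean isUpgradeSupported(
            short newKraftVersion,
            Collection<SupportedVersionRange> voterRanges,     // from the persisted voter set
            Collection<SupportedVersionRange> brokerRanges) {  // from broker registrations
        return voterRanges.stream().allMatch(r -> r.contains(newKraftVersion))
            && brokerRanges.stream().allMatch(r -> r.contains(newKraftVersion));
    }
}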

Voter changes

Adding voters

Voters are added to the cluster metadata partition by sending an AddVoter RPC to the leader. For safety, Kafka will only allow one voter change operation at a time. If there are any pending voter change operations, the leader will wait for them to finish.

...

Once the VotersRecord operation has been committed by the majority of the new voter set, the leader can respond to the AddVoter RPC and process new voter change operations.
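
The sketch below illustrates, with hypothetical names, how a leader could serialize these operations: a new AddVoter request is queued behind any pending change, the leader switches to the new voter set as soon as the VotersRecord is appended, and the RPC is answered only once that record is committed by a majority of the new voter set. The RemoveVoter flow described next is handled symmetrically, with the voter dropped instead of added.

Code Block
languagejava
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Simplified sketch of how a leader could serialize voter changes; the names and
// structure are illustrative, not the actual KafkaRaftClient implementation.
final class VoterChangeSerializer<VOTER> {
    private Set<VOTER> voters = new HashSet<>();
    // Completed when the most recently requested voter change has been committed.
    private CompletableFuture<Void> lastChange = CompletableFuture.completedFuture(null);

    synchronized CompletableFuture<Void> handleAddVoter(VOTER newVoter) {
        // Only one voter change at a time: queue behind any pending change.
        lastChange = lastChange.thenCompose(ignored -> {
            Set<VOTER> newVoterSet = new HashSet<>(voters);
            newVoterSet.add(newVoter);

            // 1. Append a VotersRecord with the new voter set to the log.
            // 2. The leader starts using the new voter set immediately, even though
            //    the record is still uncommitted.
            voters = newVoterSet;

            // 3. The AddVoter RPC is only answered once the record is committed by
            //    a majority of the *new* voter set.
            return appendVotersRecordAndAwaitCommit(newVoterSet);
        });
        return lastChange;
    }

    private CompletableFuture<Void> appendVotersRecordAndAwaitCommit(Set<VOTER> newVoterSet) {
        // Placeholder: append the control record and wait for the high-watermark to
        // advance past it, as acknowledged by a majority of newVoterSet.
        return CompletableFuture.completedFuture(null);
    }
}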

Removing voters

Voters are removed from the cluster metadata partition by sending a RemoveVoter RPC to the leader. This works similarly to adding a voter. If there are no pending voter change operations, the leader will append a VotersRecord to the log containing the current voter set minus the voter getting removed, and immediately update its voter set to the new configuration.

...

When using this configuration for a new cluster, the quorum should be started with only one voter. This voter is bootstrapped by running the storage tool format command. This tool will create the cluster metadata partition and append the VotersRecord to it. Additional voters can be added by using the AddVoter RPC as described in this KIP.

Leader election

It is possible for the leader to add a new voter to the voter set, write the VotersRecord to the log, and only replicate it to some of the voters in the new configuration. If the leader fails before this record has been replicated to the new voter, it is possible that a new leader cannot be elected. This is because voters reject vote requests from replicas that are not in the voter set. This check will be removed, and replicas will reply to vote requests even when the candidate is not in the voter set or the voting replica is not in the voter set. The candidate must still have a longer log offset and epoch before the voter will grant it a vote.
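
A minimal sketch of the relaxed vote check is shown below. The exact comparison lives in the KRaft implementation; the key point is that voter-set membership is no longer part of the decision, only the one-vote-per-epoch rule and the log comparison, assuming the usual KRaft rule that the candidate's log must be at least as up to date as the voter's.

Code Block
languagejava
// Sketch of the relaxed vote check; membership in the voter set is deliberately
// NOT part of the decision any more.
final class VoteGrantCheck {
    static boolean grantVote(
            int candidateLastEpoch, long candidateEndOffset,
            int localLastEpoch, long localEndOffset,
            boolean alreadyVotedInThisEpoch) {
        if (alreadyVotedInThisEpoch) {
            return false; // At most one vote per epoch.
        }
        // A higher last epoch wins; on a tie the log end offsets are compared.
        if (candidateLastEpoch != localLastEpoch) {
            return candidateLastEpoch > localLastEpoch;
        }
        return candidateEndOffset >= localEndOffset;
    }
}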

First leader

When a KRaft voter becomes leader, it will write a KRaftVersionRecord and VotersRecord to the log if neither the log nor the latest snapshot contains a VotersRecord. This is done to make sure that the voter set in the bootstrap snapshot gets replicated to all of the voters, and to not rely on all of the voters being configured with the same bootstrapped voter set.

...

The directory id, or replica uuid, will behave differently. The quorum shouldn't automatically update the directory id, since a different value means that the disk was replaced. For the directory id, the leader will only override it if it was not previously set. This behavior is useful for when a cluster gets upgraded to a kraft.version greater than 0.

High watermark

As described in KIP-595, the high-watermark will be calculated using the fetch offset of the majority of the voters. When a replica is removed or added, it is possible for the high-watermark to decrease. The leader will not allow the high-watermark to decrease and will guarantee that it is monotonically increasing for both the state machines and the remote replicas.
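
The following self-contained sketch shows one way to compute that value: take the fetch offset replicated to a majority of the current voters (the leader's own log end offset counts as its fetch offset) and clamp it so the advertised high-watermark never moves backwards. It is illustrative only, not the Kafka implementation.

Code Block
languagejava
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Illustrative only: computes the offset replicated to a majority of the voter
// set and never lets the advertised high-watermark go backwards, even if the
// voter set just changed.
final class HighWatermark {
    private long highWatermark = 0L;

    // voterFetchOffsets includes the leader's own log end offset.
    long update(List<Long> voterFetchOffsets) {
        List<Long> sorted = new ArrayList<>(voterFetchOffsets);
        sorted.sort(Collections.reverseOrder());

        // With N voters the majority is N/2 + 1, so the candidate high-watermark
        // is the (N/2 + 1)-th largest fetch offset.
        int majorityIndex = sorted.size() / 2; // 0-based index of that element
        long candidate = sorted.get(majorityIndex);

        // Guarantee monotonicity: adding or removing a voter may make the majority
        // offset smaller, but the exposed high-watermark never decreases.
        highWatermark = Math.max(highWatermark, candidate);
        return highWatermark;
    }
}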

...

To efficiently implement this feature, the KRaft implementation will keep track of all voter sets between the latest snapshot and the LEO. The replicas will update this ordered list of voter sets whenever the latest snapshot id increases, a VotersRecord control record is read from the log, or the log is truncated.
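
A sketch of that bookkeeping is shown below, using an ordered map from the offset at which a voter set took effect to the voter set itself; the class and method names are hypothetical and do not match the internal classes of the Kafka implementation. Entries older than the latest snapshot are pruned, and entries at or past the new end offset are dropped on truncation.

Code Block
languagejava
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;

// Illustrative sketch of tracking every voter set between the latest snapshot
// and the log end offset.
final class VoterSetHistory<VOTER> {
    // Offset at which each voter set took effect -> the voter set itself.
    private final NavigableMap<Long, Set<VOTER>> history = new TreeMap<>();

    // Called when a VotersRecord control record is read from the log or snapshot.
    void addAt(long offset, Set<VOTER> voters) {
        history.put(offset, Set.copyOf(voters));
    }

    // The voter set in effect at the LEO (the latest, possibly uncommitted, set).
    Set<VOTER> lastVoterSet() {
        return history.isEmpty() ? Set.of() : history.lastEntry().getValue();
    }

    // Called when the latest snapshot id increases: older entries are no longer needed.
    void truncateOldEntries(long snapshotOffset) {
        // Keep the newest entry at or before the snapshot offset; drop everything older.
        Long keep = history.floorKey(snapshotOffset);
        if (keep != null) {
            history.headMap(keep, false).clear();
        }
    }

    // Called when the log is truncated: uncommitted voter sets recorded at or after
    // the new end offset must be reverted.
    void truncateNewEntries(long newEndOffset) {
        history.tailMap(newEndOffset, true).clear();
    }
}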

Internal listener

The KRaft implementation and protocol described in KIP-595 and KIP-630 never read from the log or snapshot. This KIP requires that the KRaft implementation now read uncommitted data from the log and snapshot to discover the voter set. This also means that the KRaft implementation needs to handle this uncommitted state getting truncated and reverted.

Controller auto joining

To make it easier for users to operate a KRaft controller cluster, they can have KRaft controllers automatically join the cluster's voter set by setting controller.quorum.auto.join.enabled to true. In this case, the controller will remove any voter in the voter set that matches its replica id but doesn't match its directory id (replica uuid) by sending the RemoveVoter RPC. Once the controller has removed all such duplicate entries, it will add itself to the voter set by sending an AddVoter RPC to the leader, if its replica id and replica uuid are not in the voter set. The TimeoutMs for the AddVoter RPC will be set to 10 minutes.
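
The sketch below captures that decision logic with hypothetical types and a hypothetical RPC handle: remove any voter entry that shares our replica id but not our directory id, then add ourselves if we are not already in the voter set.

Code Block
languagejava
import java.util.Set;
import java.util.UUID;

// Sketch of the auto-join decision; types and names are illustrative.
record VoterKey(int replicaId, UUID directoryId) {}

final class ControllerAutoJoin {
    // Called by an observer controller once it has discovered the leader and the
    // current voter set, with controller.quorum.auto.join.enabled=true.
    static void maybeJoin(VoterKey self, Set<VoterKey> voters, RaftRpcs rpcs) {
        for (VoterKey voter : voters) {
            // Remove any stale voter entry that has our replica id but a different
            // directory id (for example, left over from a replaced disk).
            if (voter.replicaId() == self.replicaId()
                    && !voter.directoryId().equals(self.directoryId())) {
                rpcs.removeVoter(voter);
            }
        }
        if (!voters.contains(self)) {
            // Add ourselves once no conflicting entries remain. Per the KIP, the
            // AddVoter request uses a TimeoutMs of 10 minutes.
            rpcs.addVoter(self);
        }
    }

    // Hypothetical handle for sending the RemoveVoter / AddVoter RPCs to the leader.
    interface RaftRpcs {
        void removeVoter(VoterKey voter);
        void addVoter(VoterKey voter);
    }
}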

For example, imagine that the user is trying to create a KRaft controller quorum with 3 voters (1, 2, 3).

On controller 1 the user runs:

Code Block
kafka-storage format --cluster-id ABC --release-version "3.8" --standalone --config ...

and starts the controller. When controller 1 starts, it sees that it is already in the voter set, so it doesn't perform any AddVoter or RemoveVoter RPCs. Controller 1 eventually becomes leader.

On controller 2 and 3 the user runs:

Code Block
kafka-storage format --cluster-id ABC --release-version "3.8" --config ...

and starts the controllers. Notice that neither --standalone nor --controller-quorum-voters is used for controllers 2 and 3, so the controllers start as observers. These controllers will discover the leader using controller.quorum.bootstrap.servers and will use the RemoveVoter and AddVoter RPCs as described at the beginning of this section.

Public Interfaces

Configuration

...

If this configuration is specified, observers will not use the controller.quorum.voters endpoints to discover the leader. 

...

controller.quorum.auto.join.enabled

Controls whether a KRaft controller should automatically join the cluster metadata partition for its cluster id. If the configuration is set to true, the controller must be stopped before removing the controller with kafka-metadata-quorum remove-controller. The default value is false.

Log and snapshot control records

A few new control records will be added to the log and snapshot of a KRaft partition.

...

If the local replica is getting added to the voter set, it will allow the transition to prospective candidate when the fetch timer expires. The fetch timer is reset whenever it receives a successful Fetch or FetchSnapshot response.

Quorum state

Each KRaft topic partition has a quorum state (QuorumStateData) that gets persisted in the quorum-state file in the directory for the topic partition. The following changes will be made to this state:

...

Code Block
languagejs
git diff upstream/trunk clients/src/main/resources/common/message/DescribeQuorumResponse.json
diff --git a/clients/src/main/resources/common/message/DescribeQuorumResponse.json b/clients/src/main/resources/common/message/DescribeQuorumResponse.json
index 0ea6271238..e2481dff04 100644
--- a/clients/src/main/resources/common/message/DescribeQuorumResponse.json
+++ b/clients/src/main/resources/common/message/DescribeQuorumResponse.json
@@ -18,7 +18,7 @@
   "type": "response",
   "name": "DescribeQuorumResponse",
   // Version 1 adds LastFetchTimeStamp and LastCaughtUpTimestamp in ReplicaState (KIP-836).
-  "validVersions": "0-1",
+  "validVersions": "0-2",
   "flexibleVersions": "0+",
   "fields": [
     { "name": "ErrorCode", "type": "int16", "versions": "0+",
@@ -40,10 +40,25 @@
         { "name": "CurrentVoters", "type": "[]ReplicaState", "versions": "0+" },
         { "name": "Observers", "type": "[]ReplicaState", "versions": "0+" }
       ]}
-    ]}],
+    ]},
+    { "name": "Nodes", "type": "[]Node", "versions": "2+", "fields": [
+      { "name": "NodeId", "type": "int32", "versions": "2+",
+        "mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node" },
+      { "name": "Listeners", "type": "[]Listener",
+        "about": "The listeners of this controller", "versions": "0+", "fields": [
+        { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
+          "about": "The name of the endpoint" },
+        { "name": "Host", "type": "string", "versions": "0+",
+          "about": "The hostname" },
+        { "name": "Port", "type": "uint16", "versions": "0+",
+          "about": "The port" }
+      ]}
+    ]}
+  ],
   "commonStructs": [
     { "name": "ReplicaState", "versions": "0+", "fields": [
       { "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId" },
+      { "name": "ReplicaUuid", "type": "uuid", "versions": "2+" },
       { "name": "LogEndOffset", "type": "int64", "versions": "0+",
         "about": "The last known log end offset of the follower or -1 if it is unknown"},
       { "name": "LastFetchTimestamp", "type": "int64", "versions": "1+", "ignorable": true, "default": -1,

Admin client

The Java Admin client will be extended to support the new fields in the DescribeQuorum response and the new AddVoter and RemoveVoter RPCs.
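
As a usage illustration, the snippet below uses the existing Admin#describeMetadataQuorum call, which with this KIP is expected to also expose the replica directory ids and the voters' listeners. The new AddVoter and RemoveVoter Admin methods are not shown, since their exact Java signatures are defined by the API changes in this KIP; the bootstrap address is a placeholder.

Code Block
languagejava
import java.util.Map;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.QuorumInfo;

public class DescribeQuorumExample {
    public static void main(String[] args) throws Exception {
        try (Admin admin = Admin.create(Map.of(
                AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"))) {
            // Existing describeMetadataQuorum call; with this KIP the returned
            // QuorumInfo is expected to also carry the replica directory ids and
            // the listeners (endpoints) of the voter nodes.
            QuorumInfo quorum = admin.describeMetadataQuorum().quorumInfo().get();
            System.out.println("Leader id: " + quorum.leaderId());
            quorum.voters().forEach(v ->
                System.out.println("Voter " + v.replicaId()
                    + " logEndOffset=" + v.logEndOffset()));
        }
    }
}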

...

NAME | TAGS | TYPE | NOTE
number-of-voters | type=raft-metrics | gauge | Number of voters for the cluster metadata topic partition.
number-of-observers | type=raft-metrics | gauge | Number of observers that could be promoted to voters.
pending-add-voter | type=raft-metrics | gauge | 1 if there is a pending add voter operation, 0 otherwise.
pending-remove-voter | type=raft-metrics | gauge | 1 if there is a pending remove voter operation, 0 otherwise.
TBD | TBD | gauge | 1 if a controller node is not a voter for the KRaft cluster metadata partition, 0 otherwise.
duplicate-voter-ids | type=raft-metrics | gauge | Counts the number of duplicate replica ids in the set of voters.
number-of-offline-voters | type=raft-metrics | gauge | Number of voters with a last Fetch timestamp greater than the Fetch timeout.
ignored-static-voters | TBD | gauge | 1 if controller.quorum.voters is set and the kraft.version is greater than 0, 0 otherwise.

Command line interface

kafka-metadata-shell

A future KIP will describe how the kafka-metadata-shell tool will be extended to be able to read and display KRaft control records from the quorum, snapshot and log.

...

This tool, as described in KIP-595 and KIP-836, will be improved to support these additional commands and options:

describe --status

This command will be extended to print the new information added to the DescribeQuorum RPC. This includes the directory id for all of the replicas (voters and observers), the known endpoints for all of the voters, and any uncommitted voter changes.

...

This command is used to remove voters from the KRaft cluster metadata partition. The flags --controller-id and --controller-uuid must be specified.

Compatibility, deprecation, and migration plan

The features in this KIP will be supported if the ApiVersions of all of the voters and observers are greater than the versions described here. If the leader has a replica UUID for all of the voters, then this KIP is supported by all of the voters.

...

It is safe for the operator to remove the configuration for controller.quorum.voters once the kraft.version has been upgraded to version 1. All of the Kafka nodes will expose the ignored-static-voters metric. If all of the Kafka nodes expose a 1 for this metric, it is safe to remove controller.quorum.voters from the node configuration and specify controller.quorum.bootstrap.servers instead.

Test plan

This KIP will be tested using unit tests, integration tests, system tests, simulation tests, and a TLA+ specification.

Rejected alternatives

KIP-642: Dynamic quorum reassignment. KIP-642 describes how to perform dynamic reassignment with multiple voters added and removed. KIP-642 doesn't support disk failures, and it would be more difficult to implement compared to this KIP (KIP-853). In a future KIP we can describe how to add administrative operations that support the addition and removal of multiple voters.

...