...

Version 0 of this data will get written if the kraft.version is 0. Version 1 of this data will get written if the kraft.version is 1. (TODO: there is one small issue. A candidate at kraft.version 1 may ask for a vote from a voter at kraft.version 0. In this case how will the voter persist the uuid?) 

Code Block
git diff upstream/trunk raft/src/main/resources/common/message/QuorumStateData.json
diff --git a/raft/src/main/resources/common/message/QuorumStateData.json b/raft/src/main/resources/common/message/QuorumStateData.json
index d71a32c75d..34881f05ff 100644
--- a/raft/src/main/resources/common/message/QuorumStateData.json
+++ b/raft/src/main/resources/common/message/QuorumStateData.json
@@ -16,19 +16,25 @@
 {
   "type": "data",
   "name": "QuorumStateData",
-  "validVersions": "0",
+  "validVersions": "0-1",
   "flexibleVersions": "0+",
   "fields": [
-    {"name": "ClusterId", "type": "string", "versions": "0+"},
-    {"name": "LeaderId", "type": "int32", "versions": "0+", "default": "-1"},
-    {"name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1"},
-    {"name": "VotedId", "type": "int32", "versions": "0+", "default": "-1"},
-    {"name": "AppliedOffset", "type": "int64", "versions": "0+"},
-    {"name": "CurrentVoters", "type": "[]Voter", "versions": "0+", "nullableVersions": "0+"}
+    { "name": "ClusterId", "type": "string", "versions": "0" },
+    { "name": "LeaderId", "type": "int32", "versions": "0+", "default": "-1" },
+    { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1" },
+    { "name": "VotedId", "type": "int32", "versions": "0+", "default": "-1" },
+    { "name": "VotedUuid", "type": "uuid", "versions": "1+" },
+    { "name": "AppliedOffset", "type": "int64", "versions": "0" },
+    { "name": "AppliedRecord", "type": "AppliedRecord", "versions": "1+", "fields": [
+      { "name": "EndOffset", "type": "int64", "versions": "1+" },
+      { "name": "Epoch", "type": "int32", "versions": "1+" }
+    ]},
+    { "name": "CurrentVoters", "type": "[]Voter", "versions": "0+", "nullableVersions": "0+" }
   ],
   "commonStructs": [
     { "name": "Voter", "versions": "0+", "fields": [
-      {"name": "VoterId", "type": "int32", "versions": "0+"}
+      { "name": "VoterId", "type": "int32", "versions": "0+" },
+      { "name": "VoterUuid", "type": "uuid", "versions": "1+" }
     ]}
   ]
 }
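
To make the version split concrete, here is a minimal Python sketch of which fields would be persisted at each version, derived only from the schema diff above. The helper name `quorum_state_fields` and the plain-dict representation are illustrative assumptions; this is not Kafka's generated `QuorumStateData` class.

```python
from typing import Any, Dict


def quorum_state_fields(kraft_version: int, state: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: pick the fields written for a given kraft.version."""
    if kraft_version not in (0, 1):
        raise ValueError(f"unsupported kraft.version: {kraft_version}")

    # Fields declared with "versions": "0+" are written in both versions.
    data: Dict[str, Any] = {
        "LeaderId": state.get("LeaderId", -1),
        "LeaderEpoch": state.get("LeaderEpoch", -1),
        "VotedId": state.get("VotedId", -1),
    }
    voters = state.get("CurrentVoters", [])

    if kraft_version == 0:
        # Fields declared with "versions": "0" exist only in version 0.
        data["ClusterId"] = state.get("ClusterId", "")
        data["AppliedOffset"] = state.get("AppliedOffset", 0)
        data["CurrentVoters"] = [{"VoterId": v["VoterId"]} for v in voters]
    else:
        # Fields declared with "versions": "1+" exist only from version 1.
        data["VotedUuid"] = state.get("VotedUuid")
        data["AppliedRecord"] = state.get("AppliedRecord", {"EndOffset": 0, "Epoch": 0})
        data["CurrentVoters"] = [
            {"VoterId": v["VoterId"], "VoterUuid": v.get("VoterUuid")} for v in voters
        ]
    return data


state = {
    "LeaderId": 1,
    "LeaderEpoch": 15,
    "VotedId": 1,
    "VotedUuid": "UUID1",
    "CurrentVoters": [{"VoterId": 1, "VoterUuid": "UUID1"}],
}
print(sorted(quorum_state_fields(0, state)))
print(sorted(quorum_state_fields(1, state)))
```

The sketch also shows the problem raised in the TODO above: a voter writing version 0 has no field in which to store the voted UUID.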

...

  1. Wait for the fetch offset of the replica (ID, UUID) to catch up to the log end offset of the leader.
  2. Wait until there are no uncommitted add or remove voter records.
  3. Wait for the LeaderChangeMessage control record from the current epoch to get committed.
  4. Send an ApiVersions RPC to the first listener to discover the supported kraft.version of the new voter.
  5. Check that the new voter supports the current kraft.version.
  6. Append the AddVoterRecord to the log.
  7. The KRaft internal listener will read this record from the log and add the voter to the voter set.
  8. Wait for the AddVoterRecord to commit using the majority of the new voter set.
  9. Send the AddVoter response to the client.

In 1., the leader needs to wait for the replica to catch up because the set of voters changes as soon as the AddVoterRecord is appended to the log. If the new voter is too far behind, it can take some time for it to reach the HWM. During this time the leader cannot commit data and the quorum will be unavailable from the perspective of the state machine. We mitigate this by waiting for the new replica to catch up before adding it to the set of voters.

In 3., the leader will wait for its current epoch to commit by waiting for the LeaderChangeMessage to commit. This is required to guarantee that two competing voter sets, the one from a previous leader and the one from the current leader, differ by at most one voter. Waiting for the current epoch to commit means that there cannot be some other competing voter set from another leader that can later override this leader's new voter set. See the bug in single-server membership changes for more details on this.
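
As a small illustration of why the "at most one voter" limit matters, the following Python sketch enumerates the smallest majorities of two voter sets and checks whether every pair of them shares a voter. The function names are made up for this example. The sketch only demonstrates the majority-overlap property; it does not model the membership change bug referenced above.

```python
from itertools import combinations
from typing import List, Set


def minimal_majorities(voters: Set[int]) -> List[Set[int]]:
    """All smallest majorities of a voter set (larger ones are supersets)."""
    size = len(voters) // 2 + 1
    return [set(c) for c in combinations(sorted(voters), size)]


def majorities_always_overlap(a: Set[int], b: Set[int]) -> bool:
    """True if every majority of `a` shares a voter with every majority of `b`."""
    return all(x & y for x in minimal_majorities(a) for y in minimal_majorities(b))


old = {1, 2, 3}
print(majorities_always_overlap(old, {1, 2, 3, 4}))     # one voter added
print(majorities_always_overlap(old, {1, 2}))           # one voter removed
print(majorities_always_overlap(old, {1, 2, 3, 4, 5}))  # two voters added at once
```

The first two checks succeed, while the third fails because {2, 3} is a majority of the old set and {1, 4, 5} is a majority of the new one, and they have no voter in common.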

In 7., the new replica will be part of the quorum so the leader will start sending BeginQuorumEpoch requests to this replica. It is possible that the new replica has not yet replicated and applied this AddVoterRecord, so it doesn't know that it is a voter for this topic partition. The new replica will accept the BeginQuorumEpoch RPC even if it doesn't believe it is a member of the voter set.
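
The checks that the leader performs before appending the AddVoterRecord (steps 1, 2, 3 and 5 above) can be sketched as one function that reports the first unmet condition. This is a simplified illustration: the `LeaderView` class, its field names, and the exact "caught up" comparison are assumptions made for the example, not the KRaft implementation.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class LeaderView:
    """Hypothetical snapshot of what the leader knows when handling AddVoter."""
    log_end_offset: int
    new_voter_fetch_offset: int
    uncommitted_voter_changes: int
    leader_change_committed: bool
    kraft_version: int
    new_voter_max_kraft_version: int  # assumed to come from the ApiVersions response


def blocking_reason(view: LeaderView) -> Optional[str]:
    """Return why AddVoterRecord cannot be appended yet, or None if it can."""
    # Step 1: the new replica must have caught up to the leader's log end offset.
    if view.new_voter_fetch_offset < view.log_end_offset:
        return "new voter has not caught up"
    # Step 2: only one voter change may be in flight at a time.
    if view.uncommitted_voter_changes > 0:
        return "uncommitted voter change pending"
    # Step 3: the current epoch must be committed.
    if not view.leader_change_committed:
        return "LeaderChangeMessage for the current epoch is not committed"
    # Step 5: the new voter must support the current kraft.version.
    if view.new_voter_max_kraft_version < view.kraft_version:
        return "new voter does not support the current kraft.version"
    return None


view = LeaderView(
    log_end_offset=100,
    new_voter_fetch_offset=100,
    uncommitted_voter_changes=0,
    leader_change_committed=True,
    kraft_version=1,
    new_voter_max_kraft_version=1,
)
print(blocking_reason(view))
```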

...

DescribeQuorum

TODO: Fix this RPC

TODO: Include the listeners/endpoints for all of the voters known by the leader.

The version of the request is increased and the fields remain unchanged.

...

| NAME | TAGS | TYPE | NOTE |
| --- | --- | --- | --- |
| number-of-voters | type=raft-metrics | gauge | Number of voters for the cluster metadata topic partition. |
| number-of-observers | type=raft-metrics | gauge | Number of observers that could be promoted to voters. |
| pending-add-voter | type=raft-metrics | gauge | 1 if there is a pending add voter operation, 0 otherwise. |
| pending-remove-voter | type=raft-metrics | gauge | 1 if there is a pending remove voter operation, 0 otherwise. |
| TBD | TBD | gauge | 1 if a controller node is not a voter for the KRaft cluster metadata partition, 0 otherwise. |
| duplicate-voter-ids | type=raft-metrics | gauge | Counts the number of duplicate replica ids in the set of voters. |
| number-of-offline-voters | type=raft-metrics | gauge | Number of voters with a last Fetch timestamp greater than the Fetch timeout. |
| ignored-static-voters | TBD | gauge | 1 if controller.quorum.voters is set and the kraft.version is greater than 0, 0 otherwise. |
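
As a rough illustration of how two of these gauges could be derived from the leader's view of the voter set, consider the Python sketch below. The `VoterInfo` class and both function names are hypothetical. The sketch assumes one possible reading of the descriptions: duplicate-voter-ids counts replica ids that appear more than once, and a voter is offline when the time since its last Fetch exceeds the Fetch timeout.

```python
from collections import Counter
from dataclasses import dataclass
from typing import Sequence


@dataclass(frozen=True)
class VoterInfo:
    """Hypothetical per-voter state tracked by the leader."""
    replica_id: int
    replica_uuid: str
    last_fetch_ms: int


def duplicate_voter_ids(voters: Sequence[VoterInfo]) -> int:
    """Number of replica ids that appear more than once in the voter set."""
    counts = Counter(v.replica_id for v in voters)
    return sum(1 for n in counts.values() if n > 1)


def number_of_offline_voters(
    voters: Sequence[VoterInfo], now_ms: int, fetch_timeout_ms: int
) -> int:
    """Number of voters whose last Fetch is older than the Fetch timeout."""
    return sum(1 for v in voters if now_ms - v.last_fetch_ms > fetch_timeout_ms)


voters = [
    VoterInfo(0, "UUID1", last_fetch_ms=9_900),
    VoterInfo(1, "UUID2", last_fetch_ms=9_950),
    VoterInfo(1, "UUID3", last_fetch_ms=5_000),  # same id, different directory
]
print(duplicate_voter_ids(voters))
print(number_of_offline_voters(voters, now_ms=10_000, fetch_timeout_ms=2_000))
```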

Command Line Interface

kafka-metadata-shell

...

A future KIP will describe how the kafka-metadata-shell tool will be extended to be able to read and display KRaft control records from the quorum, snapshot and log.

kafka-storage

...

The format command will get extended as follows.

--standalone

This command will 1) create a meta.properties file in metadata.log.dir with a randomly generated directory.id, 2) create a snapshot at 00000000000000000000-0000000000.checkpoint with the necessary control records (KRaftVersionRecord and AddVoterRecord) to make this Kafka node the only voter for the quorum.

This option is unsafe because it doesn’t use the quorum to establish the new quorum. This will be documented in the --standalone option help description.
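
The checkpoint file name mentioned above appears to encode a snapshot id as a zero-padded end offset followed by a zero-padded epoch. The following Python sketch reproduces that layout. The field widths (20 and 10 digits) are inferred from the file name in the text, and `checkpoint_file_name` is a hypothetical helper rather than a Kafka API.

```python
def checkpoint_file_name(end_offset: int, epoch: int) -> str:
    """Format a snapshot file name as <20-digit offset>-<10-digit epoch>.checkpoint."""
    if end_offset < 0 or epoch < 0:
        raise ValueError("end_offset and epoch must be non-negative")
    return f"{end_offset:020d}-{epoch:010d}.checkpoint"


# The bootstrap snapshot written by the format command uses offset 0 and epoch 0.
print(checkpoint_file_name(0, 0))
```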

--metadata-quorum

The value of this option will have the following schema: <replica-id>[-<replica-uuid>]@<host>:<port>. Logically, this functions very similarly to controller.quorum.voters. The important difference is that it optionally supports the user directly specifying the replica's directory id.
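
To show how a value with this schema breaks down, here is a minimal Python parser for a single entry. The `VoterSpec` type, the regular expression, and the sample UUID are illustrative assumptions. In particular, the sketch assumes the replica id is a non-negative integer and that everything between the first `-` and the `@` is the directory id.

```python
import re
from typing import NamedTuple, Optional


class VoterSpec(NamedTuple):
    replica_id: int
    replica_uuid: Optional[str]  # None when the directory id is omitted
    host: str
    port: int


# <replica-id>[-<replica-uuid>]@<host>:<port>
_VOTER_RE = re.compile(
    r"^(?P<id>\d+)(?:-(?P<uuid>[^@]+))?@(?P<host>.+):(?P<port>\d+)$"
)


def parse_voter(spec: str) -> VoterSpec:
    """Parse one voter entry; raise ValueError if it does not match the schema."""
    match = _VOTER_RE.match(spec)
    if match is None:
        raise ValueError(f"invalid voter specification: {spec!r}")
    return VoterSpec(
        replica_id=int(match["id"]),
        replica_uuid=match["uuid"],
        host=match["host"],
        port=int(match["port"]),
    )


print(parse_voter("1@controller-1:9093"))
print(parse_voter("2-JEXY6aqzQY-32P5TStzaFg@controller-2:9093"))
```

Splitting on the first `-` after the numeric id matters because a directory id, like a host name, may itself contain `-`.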

Similar to the --standalone option, this command will 1) create a meta.properties file in metadata.log.dir with the specified directory id (replica-uuid), or with a randomly generated directory id if none is specified, and 2) create a snapshot at 00000000000000000000-0000000000.checkpoint with the necessary control records (KRaftVersionRecord and AddVoterRecord).

This option is very unsafe and it is important that the operator uses the same value across all of the voters specified.

kafka-features

The upgrade and downgrade commands will support a new configuration flag. A downgrade that results in a decrease of the kraft.version will be rejected by the KRaft leader.

--release-software

The value specified in this flag will be used to find the corresponding metadata.version and kraft.version. The --metadata version flag will get deprecated and will be a synonym for --release-software.

kafka-metadata-quorum

The kafka-metadata-quorum tool described in KIP-595 and KIP-836 will be improved to support these additional commands:

describe

This command will be extended to print the new information added to the DescribeQuorum RPC. This includes the directory id for all of the replicas (voters and observers), the known endpoints for all of the voters, and any uncommitted voter changes.

Code Block
kafka-metadata-quorum describe --status

...

TODO: Describe these changes

kafka-metadata-quorum.sh

The kafka-metadata-quorum tool described in KIP-595 and KIP-836 will be improved to support these additional commands:

describe

TODO: This command should print all of the voter endpoints that the leader knows about. It should also display if there are any uncommitted add or remove voter changes.

This command will print both the ReplicaId and ReplicaUuid for CurrentVoters. A new row called CouldBeVoters will be added which prints the replica ID and UUID of any replica that could be added to the voter set. For example:

Code Block
> bin/kafka-metadata-quorum.sh describe
ClusterId:                SomeClusterId
LeaderId:                 0
LeaderEpoch:              15
HighWatermark:            234130
MaxFollowerLag:           34
MaxFollowerLagTimeMs:     15
CurrentVoters:            [{"id": 0, "uuid": "UUID1", "endpoints": ["host:port"]}, {"id": 1, "uuid": "UUID2", "endpoints": ["host:port"]}, {"id": 2, "uuid": "UUID2", "endpoints": ["host:port"]}]
Observers:                [{"id": 3, "uuid": "UUID3"}]
UncommittedAddedVoter:    {"id": 1, "uuid": "UUID2", "endpoints": ["host:port"]}
UncommittedRemovedVoter:  {"id": 2, "uuid": "UUID2", "endpoints": ["host:port"]}

describe --replication

This command will print one additional column for the replica uuid after the replica id. For example:

Code Block
> bin/kafka-metadata-quorum.sh describe --replication
ReplicaId   ReplicaUuid   LogEndOffset   ...
0           uuid1         234134         ...
...

add-controller --config <server.properties>

This command is used to add new controllers to the KRaft cluster metadata partition. This command must be executed using the server configuration of the new controller. The command will read the server properties file to obtain the replica id and the endpoints, and the meta.properties file to obtain the directory id.
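
The inputs that add-controller gathers can be sketched in Python as follows. This is only an illustration of the data flow: the text above does not say which property keys the command reads, so using `node.id` for the replica id and `listeners` for the endpoints is an assumption, as are the function names. The `directory.id` key is the one written to meta.properties by the format command.

```python
from typing import Any, Dict


def parse_properties(text: str) -> Dict[str, str]:
    """Parse simple key=value lines, skipping blank lines and comments."""
    props: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def add_controller_inputs(server_properties: str, meta_properties: str) -> Dict[str, Any]:
    """Collect the replica id, endpoints and directory id for the new controller."""
    server = parse_properties(server_properties)
    meta = parse_properties(meta_properties)
    return {
        "replica_id": int(server["node.id"]),  # assumed key
        "endpoints": [e.strip() for e in server["listeners"].split(",") if e.strip()],  # assumed key
        "directory_id": meta["directory.id"],
    }


server_text = """
# hypothetical controller configuration
node.id=3
listeners=CONTROLLER://controller-3:9093
"""
meta_text = "directory.id=UUID3\n"
print(add_controller_inputs(server_text, meta_text))
```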

remove-controller --controller-id <controller-id> --controller-uuid <controller-uuid>

This command is used to remove voters from the KRaft cluster metadata partition. The flags --controller-id and --controller-uuid must be specified.

Compatibility, Deprecation, and Migration Plan

...