...

This command will 1) create a meta.properties file in metadata.log.dir with a randomly generated directory.id, and 2) create a snapshot at 00000000000000000000-0000000000.checkpoint with the necessary control records (KRaftVersionRecord and VotersRecord) to make this Kafka node the only voter for the quorum.
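The following is a minimal sketch of what the standalone format step produces, modelled as plain Python data. The snapshot name follows the layout of the file name above: a 20-digit zero-padded end offset, a dash, and a 10-digit zero-padded epoch. Several parts are assumptions. The directory id is rendered the way Kafka prints a `Uuid` (URL-safe base64 without padding), minus Kafka's reserved-value checks. `bootstrap_standalone` is a hypothetical helper, and the record dictionaries do not reflect the real on-disk encoding.

```python
import base64
import uuid


def snapshot_file_name(end_offset: int, epoch: int) -> str:
    """20-digit zero-padded end offset, a dash, then a 10-digit zero-padded epoch."""
    return f"{end_offset:020d}-{epoch:010d}.checkpoint"


def random_directory_id() -> str:
    """Random 128-bit id as URL-safe base64 without padding (22 characters)."""
    return base64.urlsafe_b64encode(uuid.uuid4().bytes).decode("ascii").rstrip("=")


def bootstrap_standalone(node_id: int, host: str, port: int) -> dict:
    """Hypothetical helper: what the standalone format step writes, as plain data."""
    directory_id = random_directory_id()
    return {
        # Only the new key is shown; the real meta.properties has other entries.
        "meta.properties": {"directory.id": directory_id},
        # Illustrative record contents, not the on-disk binary encoding.
        snapshot_file_name(0, 0): [
            {"type": "KRaftVersionRecord", "kraft.version": 1},  # assumed version
            {"type": "VotersRecord",
             "voters": [{"id": node_id, "uuid": directory_id,
                         "endpoint": (host, port)}]},
        ],
    }
```

The single voter in the VotersRecord reuses the directory id that was just written to meta.properties. That reuse is what ties this node's identity to the quorum it bootstraps.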

...

This command is similar to the standalone version, but the snapshot at 00000000000000000000-0000000000.checkpoint will instead contain a VotersRecord that includes information for all of the voters specified in --controller-quorum-voters. Just like with the controller.quorum.voters property, it is important that the set of voters is the same in all of the controllers with the same cluster id.
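Every controller formatted this way must end up with an identical voter set. One way to check that requirement is to put each controller's voter list into a canonical form and compare the results. The sketch below assumes each voter is described by an (id, directory id, host, port) tuple. The `Voter` tuple and both function names are hypothetical and do not reflect how the storage tool parses its flag.

```python
from typing import Dict, List, NamedTuple, Tuple


class Voter(NamedTuple):
    voter_id: int
    directory_id: str
    host: str
    port: int


def canonical_voters(voters: List[Voter]) -> Tuple[Voter, ...]:
    """Order-independent form of the voter set written to the VotersRecord."""
    ids = [v.voter_id for v in voters]
    if len(ids) != len(set(ids)):
        raise ValueError("each voter id may appear only once")
    return tuple(sorted(voters))  # sorts by voter_id first


def consistent_across_controllers(per_controller: Dict[int, List[Voter]]) -> bool:
    """True when every controller was formatted with the same voter set."""
    return len({canonical_voters(v) for v in per_controller.values()}) == 1
```

Listing the voters in a different order is harmless. A different directory id for the same voter id is not, and the check reports it.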

...

Voters are added to the cluster metadata partition by sending an AddVoter RPC to the leader. For safety, Kafka will only allow one voter change operation at a time. If there are any pending voter change operations, the leader will wait for them to finish.

If there are no pending voter change operations, the leader sends an ApiVersions request to the new voter's endpoint to discover which kraft.version feature versions it supports. If the new voter supports the current kraft.version, the leader will write an AddVoterRecord to the log and immediately update its in-memory quorum state to include this voter as part of the quorum. Any replica that replicates and reads this AddVoterRecord will update its in-memory voter set to include the new voter. Voters do not wait for these records to be committed before updating their voter set.

Once the AddVoterRecord has been committed by a majority of the new voter set, the leader can respond to the AddVoter RPC and process new voter change operations.
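The leader-side AddVoter flow can be summarized in a small model. This is a sketch, not KafkaRaftClient code. The `supported` tuple stands in for the kraft.version range learned through ApiVersions. The log is a plain list. Replication progress is simulated with an `acknowledged` set of voter ids that hold the record. The `"WAIT"`, `"UNSUPPORTED_VERSION"` and `"APPENDED"` results are hypothetical labels.

```python
from typing import Optional, Set, Tuple


class Leader:
    """Toy model of a leader handling AddVoter; not KafkaRaftClient code."""

    def __init__(self, voters: Set[int], kraft_version: int):
        self.voters = set(voters)       # in-memory voter set, updated eagerly
        self.kraft_version = kraft_version
        self.log = []                   # appended control records
        self.pending: Optional[int] = None  # voter id of the in-flight change

    def add_voter(self, voter_id: int, supported: Tuple[int, int]) -> str:
        """`supported` stands in for the (min, max) kraft.version from ApiVersions."""
        if self.pending is not None:
            return "WAIT"               # only one voter change at a time
        low, high = supported
        if not low <= self.kraft_version <= high:
            return "UNSUPPORTED_VERSION"
        self.log.append(("AddVoterRecord", voter_id))
        self.voters.add(voter_id)       # do not wait for the commit
        self.pending = voter_id
        return "APPENDED"

    def maybe_complete(self, acknowledged: Set[int]) -> bool:
        """Finish the RPC once a majority of the NEW voter set has the record."""
        if self.pending is None:
            return False
        if len(acknowledged & self.voters) > len(self.voters) // 2:
            self.pending = None
            return True
        return False
```

The majority is computed over the new voter set. A leader that starts alone therefore needs the added voter to acknowledge the record before it can respond.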

...

Voters are removed from the cluster metadata partition by sending a RemoveVoter RPC to the leader. This works similarly to adding a voter: if there are no pending voter change operations, the leader will append a RemoveVoterRecord to the log and immediately update its voter set to the new configuration.

Once the RemoveVoterRecord has been committed by a majority of the new voter set, the leader can respond to the RPC. If the removed voter is the leader, the leader will resign from the quorum when the RemoveVoterRecord has been committed. To allow this operation to be committed and the leader to resign, the followers will continue to fetch from the leader even if the leader is not part of the new voter set. In KRaft, leader election is triggered when a voter hasn't received a successful response within the fetch timeout.
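The following sketch models a leader processing RemoveVoter, including removing itself. It is a toy model under stated assumptions. Commit is simulated with an `acknowledged` set of voter ids. The removed leader's own acknowledgement does not count toward the majority because it is no longer in the voter set. All names are hypothetical.

```python
from typing import Optional, Set


class RemovingLeader:
    """Toy model of a leader handling RemoveVoter, possibly for itself."""

    def __init__(self, leader_id: int, voters: Set[int]):
        self.leader_id = leader_id
        self.voters = set(voters)
        self.pending: Optional[int] = None
        self.is_leader = True

    def remove_voter(self, voter_id: int) -> str:
        if self.pending is not None:
            return "WAIT"               # one voter change at a time
        self.voters.discard(voter_id)   # switch to the new configuration now
        self.pending = voter_id
        return "APPENDED"

    def on_replicated(self, acknowledged: Set[int]) -> bool:
        """Commit needs a majority of the new voter set; followers keep
        fetching from the leader even if it was removed."""
        if self.pending is None:
            return False
        if len(acknowledged & self.voters) <= len(self.voters) // 2:
            return False
        if self.pending == self.leader_id:
            self.is_leader = False      # resign once the removal is committed
        self.pending = None
        return True
```

The remaining voters then stop receiving successful fetch responses, and their fetch timeout starts a new election.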

...

When using this configuration for a new cluster, the quorum should be started with only one voter. This voter is bootstrapped by running the storage tool format command. This tool will create the cluster metadata partition and append the VotersRecord to it. Additional voters can be added using the AddVoter RPC as described in this KIP.
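Simple arithmetic shows why growing from a single voter one change at a time is safe. The sketch below computes majority sizes and checks the standard Raft single-server-change property. When the new voter set contains the old one and differs by a single voter, any majority of the old set shares at least one voter with any majority of the new set. The old and new configurations therefore cannot form disjoint majorities. The function names are hypothetical.

```python
from typing import List, Tuple


def majority(voter_count: int) -> int:
    """Smallest number of voters that forms a strict majority."""
    return voter_count // 2 + 1


def growth_plan(target: int) -> List[Tuple[int, int]]:
    """(voter count, majority size) after each single-voter addition, from 1."""
    return [(n, majority(n)) for n in range(1, target + 1)]


def majorities_overlap(old_count: int, new_count: int) -> bool:
    """True if every majority of the old set intersects every majority of the
    new set, assuming one set contains the other (pigeonhole argument)."""
    return majority(old_count) + majority(new_count) > max(old_count, new_count)
```

Adding two voters at once does not have this property. For example, going from one voter to three allows a majority of {2, 3} that excludes the original voter.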

...

It is possible for the leader to write an AddVoterRecord to the log and replicate it to some of the voters in the new configuration. If the leader fails before this record has been replicated to the new voter, it is possible that a new leader cannot be elected. This is because voters reject vote requests from replicas that are not in the voter set. This check will be removed: replicas will reply to vote requests even when the candidate is not in the voter set or the voting replica is not in the voter set. The candidate must still have a longer log, by offset and epoch, before the voter will grant it a vote.
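The following is a minimal sketch of the relaxed vote-granting rule. Voter-set membership is no longer consulted. The log comparison uses the usual Raft "at least as up to date" ordering on (last epoch, end offset), which is an assumption about how "longer log" is evaluated. The one-vote-per-epoch check is also standard Raft rather than something stated in this section. All names are hypothetical.

```python
from typing import NamedTuple, Optional


class LogEnd(NamedTuple):
    last_epoch: int   # epoch of the last record in the log
    end_offset: int   # log end offset


def should_grant_vote(candidate_log: LogEnd, local_log: LogEnd,
                      voted_for: Optional[int], candidate_id: int) -> bool:
    # Membership of the candidate or of the local replica is deliberately
    # NOT checked, so a newly added voter can still be elected.
    if voted_for is not None and voted_for != candidate_id:
        return False  # already voted for another candidate in this epoch
    # NamedTuple comparison is lexicographic: epoch first, then offset.
    return candidate_log >= local_log
```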

First Leader

TODO: Describe how the leader will write a VotersRecord if the latest voter set came from the bootstrapping snapshot.

Automatic endpoint and directory id discovery

...

KRaft replicas will read all of the control records in the snapshot and the log irrespective of the commit state and HWM. When a replica encounters an AddVoterRecord, it will add the replica ID and UUID to its voter set. If the replica being added is the local replica, it will be allowed to transition to prospective candidate when the fetch timer expires. The fetch timer is reset whenever the replica receives a successful Fetch or FetchSnapshot response.
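The following toy model shows how reading an AddVoterRecord and the fetch timer together determine when a replica may become a prospective candidate. It is a sketch with a hypothetical `Replica` class. Voters are keyed by (replica id, directory id), and time is passed in explicitly in milliseconds.

```python
from typing import Set, Tuple

VoterKey = Tuple[int, str]  # (replica id, directory id / UUID)


class Replica:
    """Toy model of how AddVoterRecord and the fetch timer gate candidacy."""

    def __init__(self, local: VoterKey, fetch_timeout_ms: int):
        self.local = local
        self.voters: Set[VoterKey] = set()
        self.fetch_timeout_ms = fetch_timeout_ms
        self.last_success_ms = 0

    def on_control_record(self, record_type: str, voter: VoterKey) -> None:
        # Applied when read, regardless of commit state or high-watermark.
        if record_type == "AddVoterRecord":
            self.voters.add(voter)

    def on_fetch_response(self, now_ms: int, success: bool) -> None:
        # Called for both Fetch and FetchSnapshot responses.
        if success:
            self.last_success_ms = now_ms  # reset the fetch timer

    def may_become_prospective(self, now_ms: int) -> bool:
        timer_expired = now_ms - self.last_success_ms >= self.fetch_timeout_ms
        return timer_expired and self.local in self.voters
```

An expired timer alone is not enough. A replica that is still only an observer keeps fetching instead of starting an election.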

RemoveVoterRecord

...

{
  "type": "data",
  "name": "RemoveVoterRecord",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Version", "type": "int16", "versions": "0+",
      "about": "The version of the remove voter record"},
    { "name": "VoterId", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "about": "The replica id of the voter getting removed from the topic partition"},
    { "name": "VoterUuid", "type": "uuid", "versions": "0+",
      "about": "The directory id of the voter getting removed from the topic partition"}
  ]
}

Handling

KRaft replicas will read all of the control records in the snapshot and the log irrespective of the commit state and HWM. When a replica encounters a RemoveVoterRecord, it will remove the replica ID and UUID from its voter set. If the replica being removed is the leader and is the local replica, then the replica will stay leader until the RemoveVoterRecord is committed or the epoch advances and it loses leadership of the latest epoch.
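The following sketch replays add and remove control records in log order, whether committed or not, and applies the rule for a leader that reads its own removal. It is a sketch with hypothetical function names. The starting voter set would come from the snapshot and is passed in as `initial`.

```python
from typing import Iterable, Set, Tuple

VoterKey = Tuple[int, str]  # (replica id, directory id / UUID)


def replay_voter_set(records: Iterable[Tuple[str, VoterKey]],
                     initial: Iterable[VoterKey] = ()) -> Set[VoterKey]:
    """Apply add/remove control records in log order, committed or not."""
    voters: Set[VoterKey] = set(initial)
    for record_type, voter in records:
        if record_type == "AddVoterRecord":
            voters.add(voter)
        elif record_type == "RemoveVoterRecord":
            voters.discard(voter)
    return voters


def still_leader(local: VoterKey, voters: Set[VoterKey],
                 removal_committed: bool, epoch_advanced: bool) -> bool:
    """Whether the local leader keeps leadership after reading a RemoveVoterRecord."""
    if epoch_advanced:
        return False        # it no longer leads the latest epoch
    if local in voters:
        return True         # it was not the voter being removed
    return not removal_committed  # removed: lead only until the removal commits
```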

VotersRecord

A control record for specifying the entire voter set. This record will be written to the generated snapshot. It will also be written to the log if it has never been written to the log in the past. This ensures that the leader's bootstrapping snapshot, at 00000000000000000000-0000000000.checkpoint, is consistently replicated to all of the voters.
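The "written to the log if it has never been written" rule can be sketched as a single check. This sketch assumes the check runs when a replica becomes leader, and it models the log as a list of record type names. The function name is hypothetical.

```python
from typing import FrozenSet, List, Sequence, Tuple

VoterKey = Tuple[int, str]  # (replica id, directory id / UUID)


def records_to_append_as_new_leader(
    log_record_types: Sequence[str],
    snapshot_voters: FrozenSet[VoterKey],
) -> List[Tuple[str, FrozenSet[VoterKey]]]:
    """Hypothetical check: copy the bootstrap voter set into the log once."""
    if "VotersRecord" in log_record_types:
        return []  # already in the log; normal replication handles it
    # The voter set so far only exists in the bootstrapping snapshot, so write
    # it to the log where replication delivers it to every voter.
    return [("VotersRecord", snapshot_voters)]
```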

The ControlRecordType is TBD and will be updated when the code is committed to Kafka.

{
  "type": "data",
  "name": "VotersRecord",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Version", "type": "int16", "versions": "0+",
      "about": "The version of the voters record" },
    { "name": "Voters", "type": "[]Voter", "versions": "0+", "fields": [
      { "name": "VoterId", "type": "int32", "versions": "0+", "entityType": "brokerId",
        "about": "The replica id of the voter getting added to the topic partition" },
      { "name": "VoterUuid", "type": "uuid", "versions": "0+",
        "about": "The directory id of the voter getting added to the topic partition" },
      { "name": "EndPoints", "type": "[]Endpoint", "versions": "0+",
        "about": "The endpoints that can be used to communicate with the voter", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
          "about": "The name of the endpoint" },
        { "name": "Host", "type": "string", "versions": "0+",
          "about": "The hostname" },
        { "name": "Port", "type": "uint16", "versions": "0+",
          "about": "The port" },
        { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
          "about": "The security protocol" }
      ]},
      { "name": "KRaftVersionFeature", "type": "KRaftVersionFeature", "versions": "0+",
        "about": "The range of versions of the protocol that the replica supports", "fields": [
        { "name": "MinSupportedVersion", "type": "int16", "versions": "0+",
          "about": "The minimum supported KRaft protocol version" },
        { "name": "MaxSupportedVersion", "type": "int16", "versions": "0+",
          "about": "The maximum supported KRaft protocol version" }
      ]}
    ]}
  ]
}
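The VotersRecord schema can be mirrored in plain Python to show which constraints it implies. Port must fit in a uint16. Endpoint Name is the map key, so it must be unique per voter. Each voter advertises a supported kraft.version range. The sketch below uses dataclasses. The class and function names are hypothetical and are not generated Kafka message classes.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass(frozen=True)
class Endpoint:
    name: str               # map key: unique per voter
    host: str
    port: int               # uint16 in the schema
    security_protocol: int  # int16 protocol id

    def __post_init__(self):
        if not 0 <= self.port <= 0xFFFF:
            raise ValueError("Port must fit in a uint16")


@dataclass(frozen=True)
class KRaftVersionFeature:
    min_supported_version: int
    max_supported_version: int

    def supports(self, version: int) -> bool:
        return self.min_supported_version <= version <= self.max_supported_version


@dataclass(frozen=True)
class Voter:
    voter_id: int
    voter_uuid: str
    endpoints: Tuple[Endpoint, ...]
    kraft_version_feature: KRaftVersionFeature

    def __post_init__(self):
        names = [e.name for e in self.endpoints]
        if len(names) != len(set(names)):
            raise ValueError("Endpoint names must be unique (Name is the map key)")


def voters_supporting(voters: List[Voter], kraft_version: int) -> List[int]:
    """Ids of voters whose advertised range includes kraft_version."""
    return [v.voter_id for v in voters if v.kraft_version_feature.supports(kraft_version)]
```

A helper like `voters_supporting` shows how the per-voter range could be used to check whether every voter supports a given kraft.version.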

Handling

...

Quorum State

Each KRaft topic partition has a quorum state (QuorumStateData) that gets persisted in the quorum-state file in the directory for the topic partition. The following changes will be made to this state:

...

Version 0 of this data will get written if the kraft.version is 0. Version 1 of this data will get written if the kraft.version is 1.

TODO: There is one small issue. A candidate at kraft.version 1 may ask for a vote from a voter at kraft.version 0. In this case, how will the voter persist the UUID?

TODO: Add endpoint and kraft.version information to the quorum state file.

git diff upstream/trunk raft/src/main/resources/common/message/QuorumStateData.json
diff --git a/raft/src/main/resources/common/message/QuorumStateData.json b/raft/src/main/resources/common/message/QuorumStateData.json
index d71a32c75d..34881f05ff 100644
--- a/raft/src/main/resources/common/message/QuorumStateData.json
+++ b/raft/src/main/resources/common/message/QuorumStateData.json
@@ -16,19 +16,25 @@
 {
   "type": "data",
   "name": "QuorumStateData",
-  "validVersions": "0",
+  "validVersions": "0-1",
   "flexibleVersions": "0+",
   "fields": [
-    {"name": "ClusterId", "type": "string", "versions": "0+"},
-    {"name": "LeaderId", "type": "int32", "versions": "0+", "default": "-1"},
-    {"name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1"},
-    {"name": "VotedId", "type": "int32", "versions": "0+", "default": "-1"},
-    {"name": "AppliedOffset", "type": "int64", "versions": "0+"},
-    {"name": "CurrentVoters", "type": "[]Voter", "versions": "0+", "nullableVersions": "0+"}
+    { "name": "ClusterId", "type": "string", "versions": "0" },
+    { "name": "LeaderId", "type": "int32", "versions": "0+", "default": "-1" },
+    { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1" },
+    { "name": "VotedId", "type": "int32", "versions": "0+", "default": "-1" },
+    { "name": "VotedUuid", "type": "uuid", "versions": "1+" },
+    { "name": "AppliedOffset", "type": "int64", "versions": "0" },
+    { "name": "AppliedRecord", "type": "AppliedRecord", "versions": "1+", "fields": [
+      { "name": "EndOffset", "type": "int64", "versions": "1+" },
+      { "name": "Epoch", "type": "int32", "versions": "1+" }
+    ]},
+    { "name": "CurrentVoters", "type": "[]Voter", "versions": "0+", "nullableVersions": "0+" }
   ],
   "commonStructs": [
     { "name": "Voter", "versions": "0+", "fields": [
-      {"name": "VoterId", "type": "int32", "versions": "0+"}
+      { "name": "VoterId", "type": "int32", "versions": "0+" },
+      { "name": "VoterUuid", "type": "uuid", "versions": "1+" }
     ]}
   ]
 }
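The QuorumStateData diff above decides which fields are written for each data version, and the data version follows kraft.version. The sketch below builds the persisted fields as a dictionary for each case. The `state` keys and the function name are hypothetical. JSON serialization and nullable CurrentVoters are left out.

```python
def quorum_state_data(kraft_version: int, state: dict) -> dict:
    """Fields written to the quorum-state file: version 0 for kraft.version 0,
    version 1 for kraft.version 1."""
    if kraft_version not in (0, 1):
        raise ValueError(f"unknown kraft.version {kraft_version}")
    data = {
        "data_version": kraft_version,
        "LeaderId": state.get("leader_id", -1),
        "LeaderEpoch": state.get("leader_epoch", -1),
        "VotedId": state.get("voted_id", -1),
    }
    if kraft_version == 0:
        # Version 0 only: ClusterId and AppliedOffset; voters carry only an id.
        data["ClusterId"] = state["cluster_id"]
        data["AppliedOffset"] = state["applied_offset"]
        data["CurrentVoters"] = [{"VoterId": i} for i, _ in state["voters"]]
    else:
        # Version 1+: VotedUuid, AppliedRecord, and a VoterUuid per voter.
        data["VotedUuid"] = state.get("voted_uuid")
        data["AppliedRecord"] = {"EndOffset": state["applied_offset"],
                                 "Epoch": state["applied_epoch"]}
        data["CurrentVoters"] = [{"VoterId": i, "VoterUuid": u}
                                 for i, u in state["voters"]]
    return data
```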

...

This command will 1) create a meta.properties file in metadata.log.dir with a randomly generated directory.id, and 2) create a snapshot at 00000000000000000000-0000000000.checkpoint with the necessary control records (KRaftVersionRecord and VotersRecord) to make this Kafka node the only voter for the quorum.

...

Similar to the --standalone option, this command will 1) create a meta.properties file in metadata.log.dir with the specified directory id (replica-uuid), generating a random directory id if the optional directory id is not specified, and 2) create a snapshot at 00000000000000000000-0000000000.checkpoint with the necessary control records (KRaftVersionRecord and VotersRecord).

This option is very unsafe, and it is important that the operator uses the same value across all of the voters specified.

...

The value specified in this flag will be used to find the corresponding metadata.version and kraft.version. The --metadata version flag will be deprecated and will become a synonym for --release-software.

kafka-metadata-quorum

The kafka-metadata-quorum tool, as described in KIP-595 and KIP-836, will be improved to support these additional commands and options:

describe

This command will be extended to print the new information added to the DescribeQuorum RPC. This includes the directory id for all of the replicas (voters and observers), the known endpoints for all of the voters, and any uncommitted voter changes.

...

It is safe for the operator to remove the configuration for controller.quorum.voters once the kraft.version has been upgraded to version 1. All of the Kafka nodes will expose the ignored-static-voter metric. If all of the Kafka nodes report 1 for this metric, it is safe to remove controller.quorum.voters from the node configuration and specify controller.quorum.bootstrap.servers instead.
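The operator's pre-check for dropping controller.quorum.voters can be written as a single predicate. This sketch assumes the ignored-static-voter values have already been collected per node id, for example from each node's metrics. How they are collected is out of scope here. The input shape and function name are hypothetical.

```python
from typing import Dict, Set


def safe_to_drop_static_voters(kraft_version: int,
                               ignored_static_voter: Dict[int, int],
                               all_node_ids: Set[int]) -> bool:
    """True only if kraft.version >= 1 and every node reports the metric as 1."""
    if kraft_version < 1:
        return False
    if set(ignored_static_voter) != all_node_ids:
        return False  # a node has not reported; do not assume it is ready
    return all(value == 1 for value in ignored_static_voter.values())
```

Treating a missing report as "not ready" keeps the check conservative. A node that was unreachable when the metrics were gathered blocks the change instead of being silently skipped.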

Test Plan

This KIP will be tested using unit tests, integration tests, system tests, simulation tests and a TLA+ specification.

...