...

This feature can only be enabled if all of the voters (controllers) and observers (brokers) support this feature. This is required mainly because this feature needs to write new control records (KRaftVersionRecord and VotersRecord) to the KRaft cluster metadata partition. All replicas need to be able to read and decode these new control records.

There will be a new SupportedFeature added to the ApiVersions response of the Kafka nodes. The name of this new supported feature will be kraft.version. The default value will be 0 and represents that only KIP-595: A Raft Protocol for the Metadata Quorum, KIP-630: Kafka Raft Snapshot and KIP-996: Pre-Vote are supported. Version 1 means that this KIP is supported.
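
As an illustration only, a client can already inspect a node's advertised feature ranges through the Admin API from KIP-584. The helper below is a hypothetical sketch; the class and method names are not part of this KIP:

Code Block
languagejava
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.FeatureMetadata;
import org.apache.kafka.clients.admin.SupportedVersionRange;

// Hypothetical helper: returns true if the cluster reports support for
// kraft.version 1, the version introduced by this KIP.
public final class KRaftVersionCheck {
    public static boolean supportsReconfiguration(Admin admin)
            throws ExecutionException, InterruptedException {
        FeatureMetadata metadata = admin.describeFeatures().featureMetadata().get();
        SupportedVersionRange range = metadata.supportedFeatures().get("kraft.version");
        // A missing entry or a max version of 0 means only the KIP-595, KIP-630
        // and KIP-996 semantics are supported.
        return range != null && range.maxVersion() >= 1;
    }
}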

...

If there are no pending voter change operations, the leader will send an ApiVersions request to the new voter's endpoint to discover its kraft.version supported features. If the new voter supports the current kraft.version, the leader will write a VotersRecord to the log, with the current voters set plus the voter getting added, and immediately update its in-memory quorum state to include this voter as part of the quorum. Any replica that replicates and reads this VotersRecord will update its in-memory voters set to include this new voter. Voters will not wait for these records to get committed before updating their voters set.

Once the VotersRecord operation has been committed by the majority of the new voters set, the leader can respond to the AddVoter RPC and process new voter change operations.
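
For illustration, a minimal sketch of this commit rule, with hypothetical types (the real logic lives in the leader's internal state): the record only commits once a majority of the new voters set has replicated it.

Code Block
languagejava
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: a voters change commits when a majority of the *new*
// voters set has an end offset past the offset of the VotersRecord.
final class VotersChangeCommit {
    static boolean isCommitted(long recordOffset,
                               Set<Integer> newVoters,
                               Map<Integer, Long> fetchedEndOffsets) {
        long replicated = newVoters.stream()
            .filter(id -> fetchedEndOffsets.getOrDefault(id, 0L) > recordOffset)
            .count();
        return replicated >= newVoters.size() / 2 + 1; // majority of the new set
    }
}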

...

Voters are removed from the cluster metadata partition by sending a RemoveVoter RPC to the leader. This works similarly to adding a voter. If there are no pending voter change operations, the leader will append a VotersRecord to the log, with the current voters set minus the voter getting removed, and immediately update its voters set to the new configuration.

Once the VotersRecord operation has been committed by the majority of the new voters set, the leader can respond to the RPC. If the removed voter is the leader, the leader will resign from the quorum when the VotersRecord has been committed. To allow this operation to be committed and for the leader to resign, the followers will continue to fetch from the leader even if the leader is not part of the new voters set. In KRaft, leader election is triggered when the voter hasn't received a successful response within the fetch timeout.

...

When the kraft.version is upgraded to a version greater than 0 and the version is supported by the voters, the leader will write a control record batch that will include the KRaftVersionRecord and a VotersRecord with all of the voters specified. This will allow KRaft to ignore the controller.quorum.voters property and instead rely solely on the log state when it is upgraded to kraft.version 1.

...

It is possible for the leader to add a new voter to the voters set, write the VotersRecord to the log, and only replicate it to some of the voters in the new configuration. If the leader fails before this record has been replicated to the new voter, it is possible that a new leader cannot be elected. This is because voters reject vote requests from replicas that are not in the voters set. This check will be removed, and replicas will reply to vote requests even when the candidate is not in the voters set or the voting replica is not in the voters set. The candidate must still have a longer log offset and epoch before the voter will grant it a vote.
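
A minimal sketch of the relaxed vote-granting rule, with a hypothetical signature (the full check in KIP-595 also accounts for votes already granted in the current epoch):

Code Block
languagejava
// Hypothetical sketch: membership in the voters set is no longer required to
// grant a vote; only the up-to-date-ness of the candidate's log is checked.
final class VoteGrant {
    static boolean grantVote(int candidateLastEpoch, long candidateEndOffset,
                             int localLastEpoch, long localEndOffset) {
        // The candidate's log must end in a newer epoch, or in the same epoch
        // with an end offset at least as large as the voter's.
        if (candidateLastEpoch != localLastEpoch) {
            return candidateLastEpoch > localLastEpoch;
        }
        return candidateEndOffset >= localEndOffset;
    }
}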

...

When a KRaft voter becomes leader it will write a KRaftVersionRecord and VotersRecord to the log if the log or the latest snapshot doesn't contain any VotersRecord. This is done to make sure that the voters set in the bootstrap snapshot gets replicated to all of the voters and to not rely on all of the voters being configured with the same bootstrapped voters set.

...

To improve the usability of this feature, it would be beneficial for the leader of the KRaft cluster metadata partition to automatically rediscover the voters' endpoints. This makes it possible for the operator to update the endpoint of a voter without having to use the kafka-metadata-quorum tool. When a voter becomes a follower and discovers a new leader, it will always send an UpdateVoter RPC to the leader. This request instructs the leader to update the endpoints of the matching replica id and replica uuid. When a voter becomes leader it will also write a VotersRecord control record with the updated endpoints and kraft.version feature.

The directory id, or replica uuid, will behave differently. The quorum shouldn't automatically update the directory id, since a different value means that the disk was replaced. For the directory id, the leader will only override it if it was not previously set. This behavior is useful for when a cluster gets upgraded to a kraft.version greater than 0.
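
A minimal sketch of this override-if-unset rule, with hypothetical names:

Code Block
languagejava
import java.util.Optional;
import org.apache.kafka.common.Uuid;

// Hypothetical sketch: the leader only fills in a directory id that was never
// set; a different, already-set directory id indicates a disk replacement and
// is never overwritten automatically.
final class DirectoryIdResolver {
    static Optional<Uuid> resolve(Optional<Uuid> current, Uuid reported) {
        return current.isPresent() ? current : Optional.of(reported);
    }
}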

High Watermark

As described in KIP-595, the high-watermark will be calculated using the fetch offset of the majority of the voters. When a replica is removed or added, it is possible for the high-watermark to decrease. The leader will not allow the high-watermark to decrease and will guarantee that it is monotonically increasing for both the state machines and the remote replicas.
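
For illustration, a minimal sketch of a monotonic high-watermark computation, with hypothetical names (the real implementation also tracks epochs and per-replica state):

Code Block
languagejava
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch: the candidate HWM is the largest offset replicated by a
// majority of the voters; the exposed HWM is clamped so it never decreases,
// even when the voters set changes.
final class MonotonicHighWatermark {
    private long highWatermark = 0;

    long update(List<Long> voterEndOffsets) {
        List<Long> sorted = voterEndOffsets.stream().sorted().collect(Collectors.toList());
        // With N voters, the offset at index (N - 1) / 2 is replicated by a majority.
        long candidate = sorted.get((sorted.size() - 1) / 2);
        highWatermark = Math.max(highWatermark, candidate);
        return highWatermark;
    }
}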

...

The snapshot generation code needs to be extended to include the new KRaft-specific control record, VotersRecord. Before this KIP the snapshot didn't include any KRaft generated control records.

When the state machine (controller or broker) calls RaftClient.createSnapshot, the KRaft client will write the SnapshotHeaderRecord, KRaftVersionRecord and VotersRecord controller records at the beginning of the snapshot.

Note that the current KRaft implementation already writes the SnapshotHeaderRecord. The new KRaft implementation will write the KRaftVersionRecord and VotersRecord, if the kraft.version is greater than 0.

To efficiently implement this feature, the KRaft implementation will keep track of all voters sets between the latest snapshot and the LEO. The replicas will update this ordered list of voters sets whenever the latest snapshot id increases, a VotersRecord control record is read from the log, or the log is truncated.
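
A minimal sketch of this bookkeeping, assuming a simple ordered map from the offset of each VotersRecord to the voters set it introduced (all names are hypothetical):

Code Block
languagejava
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical sketch of tracking every voters set between the latest
// snapshot and the LEO.
final class VotersHistory {
    private final NavigableMap<Long, Set<Integer>> votersByOffset = new TreeMap<>();

    void onVotersRecord(long offset, Set<Integer> voters) {
        votersByOffset.put(offset, voters); // read from the log or a snapshot
    }

    void onTruncation(long endOffset) {
        // The log was truncated: uncommitted voters sets are reverted.
        votersByOffset.tailMap(endOffset, true).clear();
    }

    void onSnapshotAdvance(long snapshotEndOffset) {
        // Older entries are superseded by the VotersRecord in the new snapshot,
        // which is re-added through onVotersRecord.
        votersByOffset.headMap(snapshotEndOffset, false).clear();
    }

    Set<Integer> latestVoters() {
        return votersByOffset.lastEntry().getValue();
    }
}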

Internal Listener

The KRaft implementation and protocol described in KIP-595 and KIP-630 never read from the log or snapshot. This KIP requires that the KRaft implementation now read uncommitted data from the log and snapshot to discover the voters set. This also means that the KRaft implementation needs to handle this uncommitted state getting truncated and reverted.

...

Control record for recording the latest KRaft protocol version. The KRaft protocol version is used to determine which version of LeaderChangeMessage to write. It is also used to determine if the VotersRecord control record can be written to the topic partition.

...

Code Block
{
  "type": "data",
  "name": "KRaftVersionRecord",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Version", "type": "int16", "versions": "0+",
      "about": "The version of the kraft version record" },
    { "name": "KRaftVersion", "type": "int16", "versions": "0+",
      "about": "The kraft protocol version" }
  ]
}

Handling

When KRaft replicas read this record from the log they will update their finalized kraft.version once the HWM shows this record as committed.
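
For illustration, a minimal sketch of deferring the finalized version until the record commits, treating the HWM as exclusive (names are hypothetical):

Code Block
languagejava
// Hypothetical sketch: the finalized kraft.version only advances once the HWM
// shows the KRaftVersionRecord as committed.
final class FinalizedKRaftVersion {
    private short finalized = 0;
    private long pendingOffset = -1;
    private short pendingVersion = -1;

    void onKRaftVersionRecord(long offset, short version) {
        pendingOffset = offset;
        pendingVersion = version;
    }

    void onHighWatermarkUpdate(long highWatermark) {
        if (pendingOffset >= 0 && highWatermark > pendingOffset) {
            finalized = pendingVersion; // the record is now committed
            pendingOffset = -1;
        }
    }

    short finalizedVersion() {
        return finalized;
    }
}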

LeaderChangeMessage

Add an optional VoterUuid to Voter. This change is not needed for correctness but it is nice to have for tracing and debugging. The leader will write version 0 if the kraft.version is 0. The leader will write version 1 if the kraft.version is 1.

Code Block
languagejs
git diff upstream/trunk clients/src/main/resources/common/message/LeaderChangeMessage.json
diff --git a/clients/src/main/resources/common/message/LeaderChangeMessage.json b/clients/src/main/resources/common/message/LeaderChangeMessage.json
index fdd7733388..2b019a2a80 100644
--- a/clients/src/main/resources/common/message/LeaderChangeMessage.json
+++ b/clients/src/main/resources/common/message/LeaderChangeMessage.json
@@ -16,7 +16,7 @@
 {
   "type": "data",
   "name": "LeaderChangeMessage",
-  "validVersions": "0",
+  "validVersions": "0-1",
   "flexibleVersions": "0+",
   "fields": [
     {"name": "Version", "type": "int16", "versions": "0+",
@@ -30,7 +30,8 @@
   ],
   "commonStructs": [
     { "name": "Voter", "versions": "0+", "fields": [
-      {"name": "VoterId", "type": "int32", "versions": "0+"}
+      { "name": "VoterId", "type": "int32", "versions": "0+" },
+      { "name": "VoterUuid", "type": "int32uuid", "versions": "1+" }
     ]}
   ]
 } 

...

VotersRecord

A control record for specifying the latest and entire voters set. This record can exist in both the log and the snapshot of a topic partition.

This record will be written to the generated snapshot and will specify the voters set at the snapshot id's end offset.

This record will also be written to the log by the leader when handling the AddVoter, RemoveVoter and UpdateVoter RPCs. When handling these RPCs the leader will make sure that the difference in voters is at most one addition or one removal. This is needed to satisfy the invariant that voter changes are committed by both the old voters set and the new voters set.

This record will also be written by the leader to the log if it has never been written to the log in the past. This semantic is nice to have to consistently replicate the bootstrapping snapshot, at 00000000000000000000-0000000000.checkpoint, of the leader to all of the voters.

The ControlRecordType is TBD and will be updated when the code is committed to Kafka.

Code Block
languagejs
{
  "type": "data",
  "name": "VotersRecord",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Version", "type": "int16", "versions": "0+",
      "about": "The version of the voters record" },
    { "name": "Voters", "type": "[]Voter", "versions": "0+", "fields": [
      { "name": "VoterId", "type": "int32", "versions": "0+", "entityType": "brokerId",
        "about": "The replica id of the voter in the topic partition" },
      { "name": "VoterUuid", "type": "uuid", "versions": "0+",
        "about": "The directory id of the voter in the topic partition" },
      { "name": "EndPoints", "type": "[]Endpoint", "versions": "0+",
        "about": "The endpoint that can be used to communicate with the voter", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
          "about": "The name of the endpoint" },
        { "name": "Host", "type": "string", "versions": "0+",
          "about": "The hostname" },
        { "name": "Port", "type": "uint16", "versions": "0+",
          "about": "The port" },
        { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
          "about": "The security protocol" }
      ]},
      { "name": "KRaftVersionFeature", "type": "KRaftVersionFeature", "versions": "0+",
        "about": "The range of versions of the protocol that the replica supports", "fields": [
        { "name": "MinSupportedVersion", "type": "int16", "versions": "0+",
          "about": "The minimum supported KRaft protocol version" },
        { "name": "MaxSupportedVersion", "type": "int16", "versions": "0+",
          "about": "The maximum supported KRaft protocol version" }
      ]}
    ]}
  ]
}

Handling

KRaft replicas will read all of the control records in the snapshot and the log irrespective of the commit state and HWM. When a replica encounters a VotersRecord it will replace its current voters set with the voters specified in the record.

If the local replica is the leader and it is getting removed, the replica will stay leader until the VotersRecord gets committed or the epoch advances (which forces it to lose leadership).

If the local replica is getting added to the voters set, it will allow the transition to prospective candidate when the fetch timer expires. The fetch timer is reset whenever it receives a successful Fetch or FetchSnapshot response.

Quorum State

Each KRaft topic partition has a quorum state (QuorumStateData) that gets persisted in the quorum-state file in the directory for the topic partition. The following changes will be made to this state:

  1. ClusterId will get removed in version 1. This field is not used because the cluster id is persisted in <metadata.log.dir>/meta.properties.
  2. AppliedOffset will get removed in version 1. This field is not used in version 0. The field will get removed because it implied an inclusive offset. This is not correct because the default value is 0.
  3. VotedUuid will get added in version 1. The voting replica will persist both the voter ID and UUID of the candidate for which it voted.
  4. AppliedRecord will get added in version 1. This is the newest record that the KRaft client has read and applied to update the current set of voters. EndOffset is exclusive and represents the next offset that the KRaft client should read if available.
  5. CurrentVoters will get extended in version 1 to include the voter's directory id or UUID.

...

Code Block
languagejs
git diff upstream/trunk raft/src/main/resources/common/message/QuorumStateData.json
diff --git a/raft/src/main/resources/common/message/QuorumStateData.json b/raft/src/main/resources/common/message/QuorumStateData.json
index d71a32c75d..70ff2d6f42 100644
--- a/raft/src/main/resources/common/message/QuorumStateData.json
+++ b/raft/src/main/resources/common/message/QuorumStateData.json
@@ -16,19 +16,43 @@
 {
   "type": "data",
   "name": "QuorumStateData",
-  "validVersions": "0",
+  "validVersions": "0-1",
   "flexibleVersions": "0+",
   "fields": [
-    {"name": "ClusterId", "type": "string", "versions": "0+"},
-    {"name": "LeaderId", "type": "int32", "versions": "0+", "default": "-1"},
-    {"name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1"},
-    {"name": "VotedId", "type": "int32", "versions": "0+", "default": "-1"},
-    {"name": "AppliedOffset", "type": "int64", "versions": "0+"},
-    {"name": "CurrentVoters", "type": "[]Voter", "versions": "0+", "nullableVersions": "0+"}
+    { "name": "ClusterId", "type": "string", "versions": "0" },
+    { "name": "LeaderId", "type": "int32", "versions": "0+", "default": "-1" },
+    { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1" },
+    { "name": "VotedId", "type": "int32", "versions": "0+", "default": "-1" },
+    { "name": "VotedUuid", "type": "uuid", "versions": "1+" },
+    { "name": "AppliedOffset", "type": "int64", "versions": "0" },
+    { "name": "AppliedRecord", "type": "AppliedRecord", "versions": "1+", "fields": [
+      { "name": "EndOffset", "type": "int64", "versions": "1+" },
+      { "name": "Epoch", "type": "int32", "versions": "1+" }
+    ]},
+    { "name": "CurrentVoters", "type": "[]Voter", "versions": "0+", "nullableVersions": "0+" }
   ],
   "commonStructs": [
     { "name": "Voter", "versions": "0+", "fields": [
-      {"name": "VoterId", "type": "int32", "versions": "0+"}
+      { "name": "VoterId", "type": "int32", "versions": "0+" },
+      { "name": "VoterUuid", "type": "uuid", "versions": "1+" },
+      { "name": "EndPoints", "type": "[]Endpoint", "versions": "1+",
+        "about": "The endpoint that can be used to communicate with the voter", "fields": [
+        { "name": "Name", "type": "string", "versions": "1+", "mapKey": true,
+          "about": "The name of the endpoint" },
+        { "name": "Host", "type": "string", "versions": "1+",
+          "about": "The hostname" },
+        { "name": "Port", "type": "uint16", "versions": "1+",
+          "about": "The port" },
+        { "name": "SecurityProtocol", "type": "int16", "versions": "1+",
+          "about": "The security protocol" }
+      ]},
+      { "name": "KRaftVersionFeature", "type": "KRaftVersionFeature", "versions": "1+",
+        "about": "The range of versions of the protocol that the replica supports", "fields": [
+        { "name": "MinSupportedVersion", "type": "int16", "versions": "1+",
+          "about": "The minimum supported KRaft protocol version" },
+        { "name": "MaxSupportedVersion", "type": "int16", "versions": "1+",
+          "about": "The maximum supported KRaft protocol version" }
+      ]}
     ]}
   ]
 }

RPCs

UpdateFeatures

The schema and version for this API will not change but the handling of the RPC will be updated to handle the kraft.version feature.

Handling

The controller node will forward any changes to the kraft.version to the RaftClient.

When upgrading the version, the RaftClient will:

  1. Verify that the current set of voters support the new version. The leader will learn the range of supported versions from the UpdateVoter RPC. See that section for more details.
  2. Verify that all of the observers (brokers) support the new version. The leader will learn the range of supported versions from the QuorumController. The quorum controller will update the RaftClient whenever the broker registration changes.
  3. If supported, the RaftClient will atomically write a control record batch to the log with the KRaftVersion and VotersRecord control records. The VotersRecord control record only needs to get written when updating from version 0 to version 1. A sketch of this validation is shown after this list.
  4. Reply to the RPC when the control record batch is committed.
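
A minimal sketch of the validation in steps 1 and 2, with hypothetical types:

Code Block
languagejava
import java.util.Collection;

// Hypothetical sketch: before finalizing a new kraft.version, verify that
// every voter and every observer supports it.
final class UpgradeCheck {
    record VersionRange(short min, short max) {
        boolean supports(short version) {
            return min <= version && version <= max;
        }
    }

    static boolean canUpgrade(short newVersion,
                              Collection<VersionRange> voterRanges,
                              Collection<VersionRange> observerRanges) {
        return voterRanges.stream().allMatch(r -> r.supports(newVersion))
            && observerRanges.stream().allMatch(r -> r.supports(newVersion));
    }
}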

AddVoter

This RPC can be sent by an administrative client to add a voter to the set of voters. This RPC can be sent to a broker or controller. When sent to a broker, the broker will forward the request to the active controller.

Handling

When the leader receives an AddVoter request it will do the following:

  1. Wait for the fetch offset of the replica (ID, UUID) to catch up to the log end offset of the leader.
  2. Wait until there are no uncommitted VotersRecord. Note that the implementation may just return a REQUEST_TIMED_OUT error if there are pending operations.
  3. Wait for the LeaderChangeMessage control record from the current epoch to get committed. Note that the implementation may just return a REQUEST_TIMED_OUT error if there are pending operations.
  4. Send an ApiVersions RPC to the first listener to discover the supported kraft.version of the new voter.
  5. Check that the new voter supports the current kraft.version.
  6. Append the updated VotersRecord to the log.
  7. The KRaft internal listener will read this record from the log and add the voter to the voters set.
  8. Wait for the VotersRecord to commit using the majority of the new voters set.
  9. Send the AddVoter response to the client.

In 1., the leader needs to wait for the replica to catch up because when the VotersRecord is appended to the log, the set of voters changes. If the new voter is too far behind, it can take some time for it to reach the HWM. During this time the leader cannot commit data and the quorum will be unavailable from the perspective of the state machine. We mitigate this by waiting for the new replica to catch up before adding it to the set of voters.

Here is an example, where waiting for the new voter to catch up to the LEO is helpful. Let's assume that there are 3 voters with only two of them at the LEO or HWM.

Voter 1: LEO = 200
Voter 2: LEO = 200
Voter 3: LEO = 100

In this example the HWM is 200. To advance the HWM past 200, only voters 1 and 2 need to Fetch the new records. For voter 3 to be counted towards advancing the HWM it needs to Fetch to or past offset 200 (100 records in this example).

At this point, if the user decides to add a new voter (4) to the voters set and the leader doesn't wait for voter 4 to catch up to the LEO, the voters set may become:

Voter 1: LEO = 200
Voter 2: LEO = 200
Voter 3: LEO = 100
Voter 4: LEO = 0

In this state the majority must include either voter 3, voter 4 or both voters 3 and 4. This means that the leader cannot advance the HWM until at least either voter 3 or voter 4 has caught up to the HWM. In other words, the KRaft topic partition won't be able to commit new records until at least one of those two voters (3 or 4) has caught up.

In 3., the leader will wait for its current epoch to commit by waiting for the LeaderChangeMessage to commit. This is required to guarantee that two competing voters sets, the one from a previous leader and the one from the current leader, only differ by at most one voter. Waiting for the current epoch to commit means that there cannot be some other competing voters set from another leader that can later override this leader's new voters set. See the bug in single-server membership changes for more details on this.

In 7., the new replica will be part of the quorum so the leader will start sending BeginQuorumEpoch requests to this replica. It is possible that the new replica has not yet replicated and applied this VotersRecord, so it doesn't know that it is a voter for this topic partition. The new replica will accept the BeginQuorumEpoch RPC even if it doesn't believe it is a member of the voters set.

...

  1. NOT_LEADER_FOR_PARTITION - when the request is sent to a replica that is not the leader.
  2. VOTER_ALREADY_ADDED - when the request contains a replica ID and UUID that is already in the committed voter set.
  3. UNSUPPORTED_VERSION - when the kraft.version is less than 1.
  4. INVALID_REQUEST - when the new voter doesn't support the current kraft.version.
  5. REQUEST_TIMED_OUT - when the new voter didn't catch up to the LEO in the time specified in the request.

Request

Code Block
languagejs
{
  "apiKey": 75,
  "type": "request",
  "listeners": ["controller", "broker"],
  "name": "AddVoterRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [": [
    { "name": "ClusterId", "type": "string", "versions": "0+" },
    { "name": "ClusterIdTimeoutMs", "type": "stringint32", "versions": "0+" },
    { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
      "about": "The name of the topic" },
    { "name": "TopicId", "type": "uuid", "versions": "0+",
      "about": "The unique topic ID" },
    { "name": "Partition", "type": "int32", "versions": "0+",
      "about": "The partition index" },
    { "name": "VoterId", "type": "int32", "versions": "0+",
      "about": "The replica id of the voter getting added to the topic partition" },
    { "name": "VoterUuid", "type": "uuid", "versions": "0+",
      "about": "The directory id of the voter getting added to the topic partition" },
    { "name": "Listeners", "type": "[]Listener", "versions": "0+",
      "about": "The endpoints that can be used to communicate with the voter", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
        "about": "The name of the endpoint" },
      { "name": "Host", "type": "string", "versions": "0+",
        "about": "The hostname" },
      { "name": "Port", "type": "uint16", "versions": "0+",
        "about": "The port" },
      { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
        "about": "The security protocol" }
    ]}
  ]
}

...

  1. Wait until there are no uncommitted VotersRecord. Note that the implementation may just return a REQUEST_TIMED_OUT error if there are pending operations.
  2. Wait for the LeaderChangeMessage control record from the current epoch to get committed. Note that the implementation may just return a REQUEST_TIMED_OUT error if there are pending operations.
  3. Append the VotersRecord to the log with the updated voters set.
  4. The KRaft internal listener will read this record from the log and remove the voter from the voters set.
  5. Wait for the VotersRecord to commit using the majority of the new voters set.
  6. Send the RemoveVoter response to the client.
  7. Resign by sending EndQuorumEpoch RPCs if the removed replica is the leader.

In 3. and 4. it is possible that the VotersRecord removes the current leader from the voters set. In this case the leader needs to continue allowing Fetch and FetchSnapshot requests from replicas. The leader should not count itself when determining the majority and determining if records have been committed. A sketch of this is shown below.
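
A minimal sketch of this behavior, with hypothetical names:

Code Block
languagejava
import java.util.Set;

// Hypothetical sketch: a removed leader keeps serving Fetch and FetchSnapshot
// requests, never counts itself towards the majority, and resigns once the
// VotersRecord that removed it commits.
final class RemovedLeader {
    private final int localId;

    RemovedLeader(int localId) {
        this.localId = localId;
    }

    boolean countsTowardsMajority(Set<Integer> voters) {
        return voters.contains(localId);
    }

    boolean shouldResign(Set<Integer> voters, long highWatermark, long removalOffset) {
        // The HWM is exclusive: the record is committed once the HWM passes it.
        return !voters.contains(localId) && highWatermark > removalOffset;
    }
}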

...

  1. NOT_LEADER_FOR_PARTITION - when the request is sent to a replica that is not the leader.
  2. VOTER_ALREADY_REMOVED - when the request contains a replica ID and UUID that is already not in the committed voter set.
  3. UNSUPPORTED_VERSION - when the kraft.version is less than 1.
  4. REQUEST_TIMED_OUT

Request

Code Block
languagejs
{
  "apiKey": 76,
  "type": "request",
  "listeners": ["controller", "broker"],
  "name": "RemoveVoterRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "0+" },
    { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
      "about": "The name of the topic" },
    { "name": "TopicId", "type": "uuid", "versions": "0+",
      "about": "The unique topic ID" },
    { "name": "Partition", "type": "int32", "versions": "0+",
      "about": "The partition index" },
    { "name": "VoterId", "type": "int32", "versions": "0+",
      "about": "The replica id of the voter getting removed from the topic partition" },
    { "name": "VoterUuid", "type": "uuid", "versions": "0+",
      "about": "The directory id of the voter getting removed from the topic partition" }
  ]
}

...

This RPC is different from AddVoter in two ways: 1. it will be sent only by voters to the latest known leader and 2. it includes both listener endpoints and kraft.version information. This RPC is useful to automatically update a voter's endpoints and kraft.version information without additional operator intervention.

...

  1. Wait until there are no uncommitted VotersRecord. Note that the implementation may just return a REQUEST_TIMED_OUT error if there are pending operations.
  2. Wait for the LeaderChangeMessage control record from the current epoch to get committed. Note that the implementation may just return a REQUEST_TIMED_OUT error if there are pending operations.
  3. Check that the updated voter supports the current kraft.version.
  4. Append the updated VotersRecord to the log.
  5. The KRaft internal listener will read this record from the log and update the voter's information. This includes updating the endpoint used by the KRaft NetworkClient.
  6. Wait for the VotersRecord to commit using the majority of the new voters set.
  7. Send the UpdateVoter response to the client.

The replica will return the following errors:

  1. NOT_LEADER_FOR_PARTITION - when the request is sent to a replica that is not the leader.
  2. VOTER_NOT_FOUND - when the request contains a replica ID and UUID that is not in the committed voters set.
  3. UNSUPPORTED_VERSION - when the kraft.version is less than 1.
  4. REQUEST_TIMED_OUT

Request

Code Block
{
  "apiKey": 77,
  "type": "request",
  "listeners": ["controller"],
  "name": "UpdateVoterRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "0+" },
    { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
      "about": "The name of the topic" },
    { "name": "TopicId", "type": "uuid", "versions": "0+",
      "about": "The unique topic ID" },
    { "name": "Partition", "type": "int32", "versions": "0+",
      "about": "The partition index" },
    { "name": "VoterId", "type": "int32", "versions": "0+",
      "about": "The replica id of the voter getting updated in the topic partition" },
    { "name": "VoterUuid", "type": "uuid", "versions": "0+",
      "about": "The directory id of the voter getting updated in the topic partition" },
    { "name": "Listeners", "type": "[]Listener", "versions": "0+",
      "about": "The endpoint that can be used to communicate with the leader", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
        "about": "The name of the endpoint" },
      { "name": "Host", "type": "string", "versions": "0+",
        "about": "The hostname" },
      { "name": "Port", "type": "uint16", "versions": "0+",
        "about": "The port" },
      { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
        "about": "The security protocol" }
    ]},
    { "name": "KRaftVersionFeature", "type": "KRaftVersionFeature", "versions": "0+",
      "about": "The range of versions of the protocol that the replica supports", "fields": [
      { "name": "MinSupportedVersion", "type": "int16", "versions": "0+",
        "about": "The minimum supported KRaft protocol version" },
      { "name": "MaxSupportedVersion", "type": "int16", "versions": "0+",
        "about": "The maximum supported KRaft protocol version" }
    ]}
  ]
}

...

In addition to the current cases where the leader resigns and sends an EndQuorumEpoch RPC, the leader will also resign and send this RPC once a VotersRecord has committed and the leader is not part of the latest voters set.

Handling

This request will be handled similarly to how it is described in KIP-595. The receiving replica will take the directory id (candidate uuid) into account when computing the fetch timeout exponential backoff.

...

The leader will track the fetched offset for the replica tuple (ID, UUID). Replicas are uniquely identified by their ID and UUID, so their state will be tracked using both.
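
For illustration, a minimal sketch of keying follower state by the (ID, UUID) tuple, with hypothetical names (org.apache.kafka.common.Uuid is the existing public Uuid type):

Code Block
languagejava
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.Uuid;

// Hypothetical sketch: fetch state is keyed by the (id, directory id) tuple,
// so a replica that comes back with a new disk (a new Uuid) gets fresh state.
final class FollowerStates {
    record ReplicaKey(int id, Uuid directoryId) {}

    private final Map<ReplicaKey, Long> fetchedEndOffsets = new HashMap<>();

    void onFetch(int id, Uuid directoryId, long logEndOffset) {
        fetchedEndOffsets.put(new ReplicaKey(id, directoryId), logEndOffset);
    }
}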

When removing the leader from the voters set, it will remain the leader for that epoch until the VotersRecord gets committed. This means that the leader needs to allow replicas (voters and observers) to fetch from the leader even if it is not part of the voters set. This also means that if the leader is not part of the voters set it should not include itself when computing the committed offset (also known as the high-watermark) and when checking that the quorum is alive.

Request

Add the replica UUID (directory id) to the Fetch request. This is needed so that the leader can correctly track the persisted offsets of all of the voters.

Code Block
languagejs
git diff upstream/trunk clients/src/main/resources/common/message/FetchRequest.json
diff --git a/clients/src/main/resources/common/message/FetchRequest.json b/clients/src/main/resources/common/message/FetchRequest.json
index 235357d004..ff86469831 100644
--- a/clients/src/main/resources/common/message/FetchRequest.json
+++ b/clients/src/main/resources/common/message/FetchRequest.json
@@ -55,7 +55,9 @@
   // deprecate the old ReplicaId field and set its default value to -1. (KIP-903)
   //
   // Version 16 is the same as version 15 (KIP-951).
-  "validVersions": "0-16",
+  //
+  // Version 17 adds directory id support from KIP-853
+  "validVersions": "0-17",
   "deprecatedVersions": "0-3",
   "flexibleVersions": "12+",
   "fields": [
@@ -100,7 +102,9 @@
         { "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true,
           "about": "The earliest available offset of the follower replica.  The field is only used when the request is sent by the follower."},
         { "name": "PartitionMaxBytes", "type": "int32", "versions": "0+",
-          "about": "The maximum bytes to fetch from this partition.  See KIP-74 for cases where this limit may not be honored." }
+          "about": "The maximum bytes to fetch from this partition.  See KIP-74 for cases where this limit may not be honored." },
+        { "name": "ReplicaUuid", "type": "uuid", "versions": "17+", "taggedVersions": "17+", "tag": 0,
+          "about": "The directory id of the follower fetching" }
       ]}
     ]},
     { "name": "ForgottenTopicsData", "type": "[]ForgottenTopic", "versions": "7+", "ignorable": false,

...

This command will 1) create a meta.properties file in metadata.log.dir with a randomly generated directory.id, 2) create a snapshot at 00000000000000000000-0000000000.checkpoint with the necessary control records (KRaftVersionRecord and VotersRecord) to make this Kafka node the only voter for the quorum.

This option is unsafe because it doesn't use the quorum to establish the new quorum. This will be documented in the --standalone option help description.

...

This option is unsafe because it doesn't use the quorum to establish the new quorum. It should only be executed once on one of the voters. The rest of the voters should join the quorum by using the kafka-metadata-quorum add-controller command.

--controller-quorum-voters

The value of this option will have the following schema: <replica-id>[-<replica-uuid>]@<host>:<port>. Logically, this option is very similar to controller.quorum.voters. The important difference is that it optionally supports the user directly specifying the replica's directory id.

When the format command is executed with this option, it will read the node.id configured in the properties file specified by the --config option and compare it against the <replica-id> specified in --controller-quorum-voters. If there is a match, it will write the <replica-uuid> specified to the directory.id property in the meta.properties for the metadata.log.dir directory.
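
For illustration only, a hypothetical parser for this value format (IPv6 hosts and error handling are omitted):

Code Block
languagejava
import java.util.Optional;
import org.apache.kafka.common.Uuid;

// Hypothetical sketch: parse <replica-id>[-<replica-uuid>]@<host>:<port>.
final class VoterSpec {
    final int replicaId;
    final Optional<Uuid> directoryId;
    final String host;
    final int port;

    private VoterSpec(int replicaId, Optional<Uuid> directoryId, String host, int port) {
        this.replicaId = replicaId;
        this.directoryId = directoryId;
        this.host = host;
        this.port = port;
    }

    static VoterSpec parse(String value) {
        String[] idAndEndpoint = value.split("@", 2);
        String[] idAndUuid = idAndEndpoint[0].split("-", 2); // split at the first '-'
        String[] hostAndPort = idAndEndpoint[1].split(":", 2);
        Optional<Uuid> uuid = idAndUuid.length == 2
            ? Optional.of(Uuid.fromString(idAndUuid[1]))
            : Optional.empty();
        return new VoterSpec(Integer.parseInt(idAndUuid[0]), uuid,
            hostAndPort[0], Integer.parseInt(hostAndPort[1]));
    }
}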

Similar to the --standalone option, this command will 1) create a meta.properties file in metadata.log.dir with the specified directory id (replica-uuid). If the optional directory id is not specified, the command will generate a random directory id. 2) Create a snapshot at 00000000000000000000-0000000000.checkpoint with the necessary control records (KRaftVersionRecord and VotersRecord).

This option is very unsafe. It is important that the operator uses the same value across all of the voters specified and only executes this command once per voter.

kafka-features

The upgrade and downgrade commands will support a new configuration flag. A downgrade that results in a decrease of the kraft.version will be rejected by the KRaft leader.

...