...

git diff upstream/trunk raft/src/main/resources/common/message/QuorumStateData.json
diff --git a/raft/src/main/resources/common/message/QuorumStateData.json b/raft/src/main/resources/common/message/QuorumStateData.json
index d71a32c75d..34881f05ff 100644
--- a/raft/src/main/resources/common/message/QuorumStateData.json
+++ b/raft/src/main/resources/common/message/QuorumStateData.json
@@ -16,19 +16,25 @@
 {
   "type": "data",
   "name": "QuorumStateData",
-  "validVersions": "0",
+  "validVersions": "0-1",
   "flexibleVersions": "0+",
   "fields": [
-    {"name": "ClusterId", "type": "string", "versions": "0+"},
-    {"name": "LeaderId", "type": "int32", "versions": "0+", "default": "-1"},
-    {"name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1"},
-    {"name": "VotedId", "type": "int32", "versions": "0+", "default": "-1"},
-    {"name": "AppliedOffset", "type": "int64", "versions": "0+"},
-    {"name": "CurrentVoters", "type": "[]Voter", "versions": "0+", "nullableVersions": "0+"}
+    { "name": "ClusterId", "type": "string", "versions": "0" },
+    { "name": "LeaderId", "type": "int32", "versions": "0+", "default": "-1" },
+    { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1" },
+    { "name": "VotedId", "type": "int32", "versions": "0+", "default": "-1" },
+    { "name": "VotedUuid", "type": "uuid", "versions": "1+" },
+    { "name": "AppliedOffset", "type": "int64", "versions": "0" },
+    { "name": "AppliedRecord", "type": "AppliedRecord", "versions": "1+", "fields": [
+      { "name": "EndOffset", "type": "int64", "versions": "1+" },
+      { "name": "Epoch", "type": "int32", "versions": "1+" }
+    ]},
+    { "name": "CurrentVoters", "type": "[]Voter", "versions": "0+", "nullableVersions": "0+" }
   ],
   "commonStructs": [
     { "name": "Voter", "versions": "0+", "fields": [
-      {"name": "VoterId", "type": "int32", "versions": "0+"}
+      { "name": "VoterId", "type": "int32", "versions": "0+" },
+      { "name": "VoterUuid", "type": "uuid", "versions": "1+" }
     ]}
   ]
 }
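The diff changes which fields QuorumStateData carries in each version. Version 0 keeps ClusterId and AppliedOffset, and version 1 drops them and adds VotedUuid and AppliedRecord (the Voter struct also gains VoterUuid in version 1). The Python sketch below evaluates the schema's `versions` strings to list the top-level fields present in a given version. It is only an illustration of the range syntax used in the diff, not Kafka's generated message code, and the helper names (`parse_versions`, `fields_for_version`) are hypothetical.

```python
from typing import List, Optional, Tuple


def parse_versions(spec: str) -> Tuple[int, Optional[int]]:
    """Parse a schema version range such as "0", "1+" or "0-1".

    Returns (lowest, highest); highest is None for an open-ended "N+" range.
    """
    if spec.endswith("+"):
        return int(spec[:-1]), None
    if "-" in spec:
        low, high = spec.split("-", 1)
        return int(low), int(high)
    return int(spec), int(spec)


# Top-level QuorumStateData fields and their "versions" ranges, copied from the diff.
QUORUM_STATE_FIELDS = [
    ("ClusterId", "0"),
    ("LeaderId", "0+"),
    ("LeaderEpoch", "0+"),
    ("VotedId", "0+"),
    ("VotedUuid", "1+"),
    ("AppliedOffset", "0"),
    ("AppliedRecord", "1+"),
    ("CurrentVoters", "0+"),
]


def in_range(version: int, spec: str) -> bool:
    """True if `version` falls inside the range described by `spec`."""
    low, high = parse_versions(spec)
    return version >= low and (high is None or version <= high)


def fields_for_version(version: int) -> List[str]:
    """Names of the top-level fields serialized at the given version."""
    return [name for name, spec in QUORUM_STATE_FIELDS if in_range(version, spec)]
```

For example, `fields_for_version(1)` omits ClusterId and AppliedOffset because their ranges are the single version "0".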

RPCs

UpdateFeatures

Request

The schema and version for this API will not change, but the handling of the RPC will be updated to account for the kraft.version feature.

Handling

The controller node will forward any changes to the kraft.version to the RaftClient.

When upgrading the version, the RaftClient will:

  1. Verify that the current set of voters supports the new version. The leader will learn the range of supported versions from the UpdateVoter RPC. See that section for more details.
  2. Verify that all of the observers (brokers) support the new version. The leader will learn the range of supported versions from the QuorumController, which will update the RaftClient whenever a broker registration changes.
  3. If supported, the RaftClient will atomically write a control record batch to the log with all of the KRaftVersion and AddVoter control records. The AddVoter control records only need to be written when upgrading from version 0 to version 1.
  4. Reply to the RPC when the control record batch is committed.
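Steps 1 to 3 of the upgrade can be sketched as a pure function that validates the supported version ranges and returns the control records for the single atomic batch. This is a hedged sketch, not RaftClient code: the function name, the (min, max) tuples and the string stand-ins for records are hypothetical, and emitting one AddVoter record per current voter on the 0 to 1 upgrade is our reading of step 3.

```python
from typing import Dict, List, Tuple

ReplicaKey = Tuple[int, str]     # (replica ID, directory UUID); UUID kept as a string
VersionRange = Tuple[int, int]   # (min supported, max supported), inclusive


def supports(supported: VersionRange, version: int) -> bool:
    return supported[0] <= version <= supported[1]


def plan_kraft_version_upgrade(current: int,
                               target: int,
                               voters: Dict[ReplicaKey, VersionRange],
                               observers: Dict[int, VersionRange]) -> List[str]:
    """Return the control records to append in one batch, or raise ValueError."""
    # Step 1: every voter must support the target (ranges learned via UpdateVoter).
    for voter, supported in voters.items():
        if not supports(supported, target):
            raise ValueError(f"voter {voter} supports {supported}, not {target}")
    # Step 2: every observer must support it (ranges learned from the QuorumController).
    for broker_id, supported in observers.items():
        if not supports(supported, target):
            raise ValueError(f"broker {broker_id} supports {supported}, not {target}")
    # Step 3: the KRaftVersion record, plus AddVoter records only for 0 -> 1.
    batch = [f"KRaftVersion({target})"]
    if current == 0 and target == 1:
        batch += [f"AddVoter(id={vid}, uuid={vuuid})" for vid, vuuid in sorted(voters)]
    return batch
```

Returning the whole batch at once mirrors the requirement that the KRaftVersion and AddVoter records are written atomically; a caller would then wait for that batch to commit before replying (step 4).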

AddVoter

This RPC can be sent by an administrative client to add a voter to the set of voters. It can be sent to either a broker or a controller; when it is sent to a broker, the broker forwards the request to the controller.

Handling

When the leader receives an AddVoter request it will do the following:

  1. Wait for the fetch offset of the replica (ID, UUID) to catch up to the log end offset of the leader.
  2. Wait until there are no uncommitted add or remove voter records.
  3. Wait for the LeaderChangeMessage control record from the current epoch to get committed.
  4. Send an ApiVersions RPC to the first listener to discover the supported kraft.version of the new voter.
  5. Check that the new voter supports the current kraft.version.
  6. Append the AddVoterRecord to the log.
  7. The KRaft internal listener will read this record from the log and add the voter to the voter set.
  8. Wait for the AddVoterRecord to commit using a majority of the new voter set.
  9. Send the AddVoter response to the client.
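The waiting conditions in steps 1 to 3 can be pictured as a check the leader re-evaluates until nothing blocks the request. The sketch below returns the first unmet step, or None once the leader may continue with step 4. `LeaderView` and `add_voter_blocked_on` are hypothetical names that model only what these steps mention; they are not Kafka classes.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional, Tuple

ReplicaKey = Tuple[int, str]  # (replica ID, directory UUID)


@dataclass
class LeaderView:
    """Hypothetical snapshot of the leader state relevant to AddVoter."""
    log_end_offset: int
    fetch_offsets: Dict[ReplicaKey, int] = field(default_factory=dict)
    has_uncommitted_voter_change: bool = False
    leader_change_committed: bool = False


def add_voter_blocked_on(view: LeaderView, replica: ReplicaKey) -> Optional[int]:
    """Return the first of steps 1-3 that is not yet satisfied, or None."""
    # Step 1: the replica, keyed by (ID, UUID), must have caught up to the log end offset.
    if view.fetch_offsets.get(replica, -1) < view.log_end_offset:
        return 1
    # Step 2: no other add or remove voter record may be uncommitted.
    if view.has_uncommitted_voter_change:
        return 2
    # Step 3: the LeaderChangeMessage from the current epoch must be committed.
    if not view.leader_change_committed:
        return 3
    return None
```

Because offsets are keyed by (ID, UUID), a replica that reuses an ID with a new directory id starts from scratch and has to catch up again.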

In 1., the leader needs to wait for the replica to catch up because the set of voters changes as soon as the AddVoterRecord is appended to the log. If the new voter is too far behind, it can take some time for it to reach the HWM. During this time the leader cannot commit data, and the quorum will be unavailable from the perspective of the state machine. We mitigate this by waiting for the new replica to catch up before adding it to the set of voters.

In 3., the leader will wait for its current epoch to commit by waiting for the LeaderChangeMessage to commit. This is required to guarantee that two competing voter sets, the one from a previous leader and the one from the current leader, differ by at most one voter. Waiting for the current epoch to commit means that no competing voter set from another leader can later override this leader's new voter set. See bug in single-server membership changes for more details on this.
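The "at most one voter" guarantee matters because, when two voter sets differ by a single voter, every majority of one set overlaps every majority of the other, so two leaders cannot commit conflicting decisions. The brute-force Python check below illustrates this general quorum-intersection property; it is an illustration of the argument, not code from Kafka. It only enumerates minimal majorities, since any larger majority contains a minimal one.

```python
from itertools import combinations
from typing import FrozenSet, Iterator, Set


def minimal_majorities(voters: Set[int]) -> Iterator[FrozenSet[int]]:
    """Yield every subset of `voters` of size len(voters) // 2 + 1."""
    size = len(voters) // 2 + 1
    for combo in combinations(sorted(voters), size):
        yield frozenset(combo)


def majorities_always_overlap(old: Set[int], new: Set[int]) -> bool:
    """True if every majority of `old` shares a voter with every majority of `new`."""
    return all(a & b for a in minimal_majorities(old) for b in minimal_majorities(new))
```

Adding one voter to {1, 2, 3} keeps the property, while adding two at once does not: {1, 2} is a majority of {1, 2, 3} and {3, 4, 5} is a disjoint majority of {1, 2, 3, 4, 5}.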

In 7., the new replica becomes part of the quorum, so the leader will start sending BeginQuorumEpoch requests to it. The new replica may not have replicated and applied the AddVoterRecord yet, in which case it does not know that it is a voter for this topic partition. The new replica will accept the BeginQuorumEpoch RPC even if it doesn't believe it is a member of the voter set.

The replica will return the following errors:

  1. NOT_LEADER_FOR_PARTITION - when the request is sent to a replica that is not the leader.
  2. VOTER_ALREADY_ADDED - when the request contains a replica ID and UUID that is already in the committed voter set.
  3. UNSUPPORTED_VERSION - when the kraft.version is less than 1.
  4. INVALID_REQUEST - when the new voter doesn't support the current kraft.version.
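The error cases above can be summarized as a validation function. This sketch assumes that dynamic voter changes require kraft.version 1 or later and that the new voter's supported range comes from the ApiVersions call in step 4; the order in which the checks run, and the function name, are our own choices rather than anything the KIP specifies.

```python
from typing import Optional, Set, Tuple

ReplicaKey = Tuple[int, str]  # (replica ID, directory UUID)


def add_voter_error(is_leader: bool,
                    kraft_version: int,
                    committed_voters: Set[ReplicaKey],
                    new_voter: ReplicaKey,
                    new_voter_range: Tuple[int, int]) -> Optional[str]:
    """Return the error name for an AddVoter request, or None if it may proceed."""
    if not is_leader:
        return "NOT_LEADER_FOR_PARTITION"
    if kraft_version < 1:
        # Assumption: kraft.version 0 only supports a static voter set.
        return "UNSUPPORTED_VERSION"
    if new_voter in committed_voters:
        return "VOTER_ALREADY_ADDED"
    low, high = new_voter_range  # learned from the ApiVersions response
    if not low <= kraft_version <= high:
        return "INVALID_REQUEST"
    return None
```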

Request

{
  "apiKey": 75,
  "type": "request",
  "listeners": ["controller", "broker"],
  "name": "AddVoterRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "0+" },
    { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
      "about": "The name of the topic" },
    { "name": "TopicId", "type": "uuid", "versions": "0+",
      "about": "The unique topic ID" },
    { "name": "Partition", "type": "int32", "versions": "0+",
      "about": "The partition index" },
    { "name": "VoterId", "type": "int32", "versions": "0+",
      "about": "The replica id of the voter getting added to the topic partition" },
    { "name": "VoterUuid", "type": "uuid", "versions": "0+",
      "about": "The directory id of the voter getting added to the topic partition" },
    { "name": "Listeners", "type": "[]Listener", "versions": "0+",
      "about": "The endpoints that can be used to communicate with the voter", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
        "about": "The name of the endpoint" },
      { "name": "Host", "type": "string", "versions": "0+",
        "about": "The hostname" },
      { "name": "Port", "type": "uint16", "versions": "0+",
        "about": "The port" },
      { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
        "about": "The security protocol" }
    ]}
  ]
}

Response

{
  "apiKey": 75,
  "type": "response",
  "name": "AddVoterResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error" },
    { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "0+", "taggedVersions": "0+", "tag": 0, "fields": [
      { "name": "LeaderId", "type": "int32", "versions": "0+", "default": "-1", "entityType" : "brokerId",
        "about": "The replica id of the current leader or -1 if the leader is unknown" },
      { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1",
        "about": "The latest known leader epoch" }
    ]},
    { "name": "NodeEndpoint", "type": "NodeEndpoint", "versions": "0+", "taggedVersions": "0+", "tag": 1,
      "about": "Endpoint for current leader of the topic partition", "fields": [
      { "name": "Host", "type": "string", "versions": "0+", "about": "The node's hostname" },
      { "name": "Port", "type": "int32", "versions": "0+", "about": "The node's port" }
    ]}
  ]
}

...

  1. NOT_LEADER_FOR_PARTITION - when the request is sent to a replica that is not the leader.
  2. VOTER_ALREADY_REMOVED - when the request contains a replica ID and UUID that is not in the committed voter set.
  3. UNSUPPORTED_VERSION - when the kraft.version is less than 1.

Request

{
  "apiKey": 76,
  "type": "request",
  "listeners": ["controller", "broker"],
  "name": "RemoveVoterRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
      "about": "The name of the topic" },
    { "name": "TopicId", "type": "uuid", "versions": "0+",
      "about": "The unique topic ID" },
    { "name": "Partition", "type": "int32", "versions": "0+",
      "about": "The partition index" },
    { "name": "VoterId", "type": "int32", "versions": "0+",
      "about": "The replica id of the voter getting removed from the topic partition" },
    { "name": "VoterUuid", "type": "uuid", "versions": "0+",
      "about": "The directory id of the voter getting removed from the topic partition" }
  ]
}

Response

{
  "apiKey": 76,
  "type": "response",
  "name": "RemoveVoterResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error" },
    { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "0+", "taggedVersions": "0+", "tag": 0, "fields": [
      { "name": "LeaderId", "type": "int32", "versions": "0+", "default": "-1", "entityType" : "brokerId",
        "about": "The replica id of the current leader or -1 if the leader is unknown" },
      { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1",
        "about": "The latest known leader epoch" }
    ]},
    { "name": "NodeEndpoint", "type": "NodeEndpoint", "versions": "0+", "taggedVersions": "0+", "tag": 1,
      "about": "Endpoint for current leader of the topic partition", "fields": [
      { "name": "Host", "type": "string", "versions": "0+", "about": "The node's hostname" },
      { "name": "Port", "type": "int32", "versions": "0+", "about": "The node's port" }
    ]}
  ]
}

...

This RPC is different from AddVoter in two ways. It will be sent only by voters, to the latest known leader, and it includes both listener endpoints and kraft.version information. This lets voters keep that information up to date automatically, without additional operator intervention.

The voter will send the UpdateVoter RPC whenever it starts and whenever the leader changes. It will keep sending the UpdateVoter RPC until the current leader has acknowledged it.
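The voter-side retry rule can be sketched as a small state machine: a leader change resets the acknowledgement, and only a successful response from the current leader's epoch stops the resends. The class name is hypothetical, and matching the response to the current epoch (for example via the CurrentLeader field) is an assumption of this sketch.

```python
from typing import Optional


class UpdateVoterTracker:
    """Hypothetical voter-side bookkeeping for the UpdateVoter RPC."""

    def __init__(self) -> None:
        # On startup no leader has acknowledged this voter yet.
        self.leader_epoch: Optional[int] = None
        self.acknowledged = False

    def on_leader_change(self, leader_epoch: int) -> None:
        # A new leader has not yet received this voter's endpoints or kraft.version range.
        self.leader_epoch = leader_epoch
        self.acknowledged = False

    def on_response(self, leader_epoch: int, error_code: int) -> None:
        # Only a successful response from the current leader's epoch counts.
        if leader_epoch == self.leader_epoch and error_code == 0:
            self.acknowledged = True

    def should_send(self) -> bool:
        # Send while a leader is known and it has not acknowledged the update.
        return self.leader_epoch is not None and not self.acknowledged
```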

Handling

When the leader receives an UpdateVoter request it will do the following:

  1. Wait until there are no uncommitted add or remove voter records.
  2. Wait for the LeaderChangeMessage control record from the current epoch to get committed.
  3. Check that the updated voter supports the current kraft.version.
  4. Append the updated AddVoterRecord to the log.
  5. The KRaft internal listener will read this record from the log and update the voter's information. This includes updating the endpoint used by the KRaft NetworkClient.
  6. Wait for the AddVoterRecord to commit using a majority of the new voter set.
  7. Send the UpdateVoter response to the client.

Request

{
  "apiKey": 77,
  "type": "request",
  "listeners": ["controller"],
  "name": "UpdateVoterRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "0+" },
    { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
      "about": "The name of the topic" },
    { "name": "TopicId", "type": "uuid", "versions": "0+",
      "about": "The unique topic ID" },
    { "name": "Partition", "type": "int32", "versions": "0+",
      "about": "The partition index" },
    { "name": "VoterId", "type": "int32", "versions": "0+",
      "about": "The replica id of the voter getting updated in the topic partition" },
    { "name": "VoterUuid", "type": "uuid", "versions": "0+",
      "about": "The directory id of the voter getting updated in the topic partition" },
    { "name": "Listeners", "type": "[]Listener", "versions": "0+",
      "about": "The endpoints that can be used to communicate with the voter", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
        "about": "The name of the endpoint" },
      { "name": "Host", "type": "string", "versions": "0+",
        "about": "The hostname" },
      { "name": "Port", "type": "uint16", "versions": "0+",
        "about": "The port" },
      { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
        "about": "The security protocol" }
    ]},
    { "name": "KRaftVersionFeature", "type": "KRaftVersionFeature", "versions": "0+",
      "about": "The range of versions of the protocol that the replica supports", "fields": [
      { "name": "MinSupportedVersion", "type": "int16", "versions": "0+",
        "about": "The minimum supported KRaft protocol version" },
      { "name": "MaxSupportedVersion", "type": "int16", "versions": "0+",
        "about": "The maximum supported KRaft protocol version" }
    ]}
  ]
}

Response

{
  "apiKey": 77,
  "type": "response",
  "name": "UpdateVoterResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error" },
    { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "0+", "taggedVersions": "0+", "tag": 0, "fields": [
      { "name": "LeaderId", "type": "int32", "versions": "0+", "default": "-1", "entityType" : "brokerId",
        "about": "The replica id of the current leader or -1 if the leader is unknown" },
      { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1",
        "about": "The latest known leader epoch" }
    ]},
    { "name": "NodeEndpoint", "type": "NodeEndpoint", "versions": "0+", "taggedVersions": "0+", "tag": 1,
      "about": "Endpoint for current leader of the topic partition", "fields": [
      { "name": "Host", "type": "string", "versions": "0+", "about": "The node's hostname" },
      { "name": "Port", "type": "int32", "versions": "0+", "about": "The node's port" }
    ]}
  ]
}

...

  1. INCONSISTENT_VOTER_SET - because the voting replica will not validate whether the candidate or the voter is a voter.

Request

git diff upstream/trunk clients/src/main/resources/common/message/VoteRequest.json
diff --git a/clients/src/main/resources/common/message/VoteRequest.json b/clients/src/main/resources/common/message/VoteRequest.json
index 35583a790b..ff808cbafe 100644
--- a/clients/src/main/resources/common/message/VoteRequest.json
+++ b/clients/src/main/resources/common/message/VoteRequest.json
@@ -18,11 +18,13 @@
   "type": "request",
   "listeners": ["controller"],
   "name": "VoteRequest",
-  "validVersions": "0",
+  "validVersions": "0-1",
   "flexibleVersions": "0+",
   "fields": [
     { "name": "ClusterId", "type": "string", "versions": "0+",
       "nullableVersions": "0+", "default": "null"},
+    { "name": "VoterId", "type": "int32", "versions": "1+", "ignorable": true, "default": "-1", "entityType": "brokerId",
+      "about": "The replica id of the voter receiving the request" },
     { "name": "Topics", "type": "[]TopicData",
       "versions": "0+", "fields": [
       { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
@@ -34,14 +36,16 @@
         { "name": "CandidateEpoch", "type": "int32", "versions": "0+",
           "about": "The bumped epoch of the candidate sending the request"},
         { "name": "CandidateId", "type": "int32", "versions": "0+", "entityType": "brokerId",
-          "about": "The ID of the voter sending the request"},
+          "about": "The replica id of the voter sending the request"},
+        { "name": "CandidateUuid", "type": "uuid", "versions": "1+",
+          "about": "The directory id of the voter sending the request" },
+        { "name": "VoterUuid", "type": "uuid", "versions": "1+",
+          "about": "The directory id of the voter receiving the request to vote, empty uuid if unknown" },
         { "name": "LastOffsetEpoch", "type": "int32", "versions": "0+",
           "about": "The epoch of the last record written to the metadata log"},
         { "name": "LastOffset", "type": "int64", "versions": "0+",
           "about": "The offset of the last record written to the metadata log"}
       ]}
     ]}
   ]
 }

Response

git diff upstream/trunk clients/src/main/resources/common/message/VoteResponse.json
diff --git a/clients/src/main/resources/common/message/VoteResponse.json b/clients/src/main/resources/common/message/VoteResponse.json
index b92d0070c1..21d0aa5312 100644
--- a/clients/src/main/resources/common/message/VoteResponse.json
+++ b/clients/src/main/resources/common/message/VoteResponse.json
@@ -17,7 +17,7 @@
   "apiKey": 52,
   "type": "response",
   "name": "VoteResponse",
-  "validVersions": "0",
+  "validVersions": "0-1",
   "flexibleVersions": "0+",
   "fields": [
     { "name": "ErrorCode", "type": "int16", "versions": "0+",
@@ -37,9 +37,14 @@
           "about": "The latest known leader epoch"},
         { "name": "VoteGranted", "type": "bool", "versions": "0+",
           "about": "True if the vote was granted and false otherwise"}
       ]}
     ]},
+    { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "1+", "taggedVersions": "1+", "tag": 0,
+      "about": "Endpoints for all current-leaders enumerated in PartitionData", "fields": [
+      { "name": "NodeId", "type": "int32", "versions": "1+",
+        "mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node"},
+      { "name": "Host", "type": "string", "versions": "1+", "about": "The node's hostname" },
+      { "name": "Port", "type": "int32", "versions": "1+", "about": "The node's port" }
+    ]}
   ]
 }

...

  1. INCONSISTENT_VOTER_SET - because the voting replica will not validate whether the candidate or the voter is a voter.

Request

git diff upstream/trunk clients/src/main/resources/common/message/BeginQuorumEpochRequest.json
diff --git a/clients/src/main/resources/common/message/BeginQuorumEpochRequest.json b/clients/src/main/resources/common/message/BeginQuorumEpochRequest.json
index d9d6d92c88..edd128fb8c 100644
--- a/clients/src/main/resources/common/message/BeginQuorumEpochRequest.json
+++ b/clients/src/main/resources/common/message/BeginQuorumEpochRequest.json
@@ -18,24 +18,35 @@
   "type": "request",
   "listeners": ["controller"],
   "name": "BeginQuorumEpochRequest",
-  "validVersions": "0",
-  "flexibleVersions": "none",
+  "validVersions": "0-1",
+  "flexibleVersions": "1+",
   "fields": [
     { "name": "ClusterId", "type": "string", "versions": "0+",
       "nullableVersions": "0+", "default": "null"},
+    { "name": "VoterId", "type": "int32", "versions": "1+", "entityType": "brokerId",
+      "about": "The voter ID of the receiving replica" },
     { "name": "Topics", "type": "[]TopicData",
       "versions": "0+", "fields": [
       { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
         "about": "The topic name" },
       { "name": "Partitions", "type": "[]PartitionData",
         "versions": "0+", "fields": [
         { "name": "PartitionIndex", "type": "int32", "versions": "0+",
           "about": "The partition index" },
+        { "name": "VoterUuid", "type": "uuid", "versions": "1+",
+          "about": "The directory id of the receiving replica" },
         { "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId",
           "about": "The ID of the newly elected leader"},
         { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
           "about": "The epoch of the newly elected leader"}
       ]}
+    ]},
+    { "name": "NodeEndpoint", "type": "NodeEndpoint", "versions": "1+", "taggedVersions": "1+", "tag": 0,
+      "about": "Endpoint for the leader", "fields": [
+      { "name": "NodeId", "type": "int32", "versions": "1+",
+        "mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node" },
+      { "name": "Host", "type": "string", "versions": "1+", "about": "The node's hostname" },
+      { "name": "Port", "type": "int32", "versions": "1+", "about": "The node's port" }
     ]}
   ]
 }

Response

git diff upstream/trunk clients/src/main/resources/common/message/BeginQuorumEpochResponse.json
diff --git a/clients/src/main/resources/common/message/BeginQuorumEpochResponse.json b/clients/src/main/resources/common/message/BeginQuorumEpochResponse.json
index 4b7d7f5a95..12639bba2f 100644
--- a/clients/src/main/resources/common/message/BeginQuorumEpochResponse.json
+++ b/clients/src/main/resources/common/message/BeginQuorumEpochResponse.json
@@ -17,25 +17,32 @@
   "apiKey": 53,
   "type": "response",
   "name": "BeginQuorumEpochResponse",
-  "validVersions": "0",
-  "flexibleVersions": "none",
+  "validVersions": "0-1",
+  "flexibleVersions": "1+",
   "fields": [
     { "name": "ErrorCode", "type": "int16", "versions": "0+",
       "about": "The top level error code"},
     { "name": "Topics", "type": "[]TopicData",
       "versions": "0+", "fields": [
       { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
         "about": "The topic name" },
       { "name": "Partitions", "type": "[]PartitionData",
         "versions": "0+", "fields": [
         { "name": "PartitionIndex", "type": "int32", "versions": "0+",
           "about": "The partition index" },
         { "name": "ErrorCode", "type": "int16", "versions": "0+"},
         { "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId",
           "about": "The ID of the current leader or -1 if the leader is unknown"},
         { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
           "about": "The latest known leader epoch"}
       ]}
+    ]},
+    { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "1+", "taggedVersions": "1+", "tag": 0,
+      "about": "Endpoints for all leaders enumerated in PartitionData", "fields": [
+      { "name": "NodeId", "type": "int32", "versions": "1+",
+        "mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node" },
+      { "name": "Host", "type": "string", "versions": "1+", "about": "The node's hostname" },
+      { "name": "Port", "type": "int32", "versions": "1+", "about": "The node's port" }
     ]}
   ]
 }

...

This request will be handled similarly to how it is described in KIP-595. The receiving replica will take the directory id (candidate uuid) into account when computing the fetch timeout exponential backoff.
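A minimal sketch of how a receiving replica might use its position in PreferredCandidates: the earlier it appears, the sooner it starts an election, and the lookup matches on both the candidate id and the candidate uuid. The base and maximum delays are placeholder values, and the deterministic doubling is a simplification (KIP-595's backoff also involves randomization); treat the formula and the function name as assumptions.

```python
from typing import List, Tuple

Candidate = Tuple[int, str]  # (CandidateId, CandidateUuid); UUID kept as a string


def election_backoff_ms(me: Candidate,
                        preferred_candidates: List[Candidate],
                        base_ms: int = 50,
                        max_ms: int = 1000) -> int:
    """Hypothetical election backoff after receiving EndQuorumEpoch."""
    try:
        # Match on (id, uuid): the same id with another directory id is a different replica.
        position = preferred_candidates.index(me)
    except ValueError:
        return max_ms  # not a preferred candidate: wait the longest
    return min(base_ms * (2 ** position), max_ms)
```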

Request

PreferredSuccessors, which is an array of replica ids, will be replaced by PreferredCandidates, which is an array of (candidate id, candidate uuid) tuples.

git diff upstream/trunk clients/src/main/resources/common/message/EndQuorumEpochRequest.json
diff --git a/clients/src/main/resources/common/message/EndQuorumEpochRequest.json b/clients/src/main/resources/common/message/EndQuorumEpochRequest.json
index a6e4076412..d9122fa930 100644
--- a/clients/src/main/resources/common/message/EndQuorumEpochRequest.json
+++ b/clients/src/main/resources/common/message/EndQuorumEpochRequest.json
@@ -18,8 +18,8 @@
   "type": "request",
   "listeners": ["controller"],
   "name": "EndQuorumEpochRequest",
-  "validVersions": "0",
-  "flexibleVersions": "none",
+  "validVersions": "0-1",
+  "flexibleVersions": "1+",
   "fields": [
     { "name": "ClusterId", "type": "string", "versions": "0+",
       "nullableVersions": "0+", "default": "null"},
@@ -35,8 +35,13 @@
           "about": "The current leader ID that is resigning"},
         { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
           "about": "The current epoch"},
-        { "name": "PreferredSuccessors", "type": "[]int32", "versions": "0+",
-          "about": "A sorted list of preferred successors to start the election"}
+        { "name": "PreferredSuccessors", "type": "[]int32", "versions": "0",
+          "about": "A sorted list of preferred successors to start the election" },
+        { "name": "PreferredCandidates", "type": "[]ReplicaInfo", "versions": "1+",
+          "about": "A sorted list of preferred successors to start the election", "fields": [
+          { "name": "CandidateId", "type": "int32", "versions": "1+", "entityType": "brokerId" },
+          { "name": "CandidateUuid", "type": "uuid", "versions": "1+" }
+        ]}
       ]}
     ]}
   ]

Response

git diff upstream/trunk clients/src/main/resources/common/message/EndQuorumEpochResponse.json
diff --git a/clients/src/main/resources/common/message/EndQuorumEpochResponse.json b/clients/src/main/resources/common/message/EndQuorumEpochResponse.json
index cd23247045..0d5d61b7e7 100644
--- a/clients/src/main/resources/common/message/EndQuorumEpochResponse.json
+++ b/clients/src/main/resources/common/message/EndQuorumEpochResponse.json
@@ -17,8 +17,8 @@
   "apiKey": 54,
   "type": "response",
   "name": "EndQuorumEpochResponse",
-  "validVersions": "0",
-  "flexibleVersions": "none",
+  "validVersions": "0-1",
+  "flexibleVersions": "1+",
   "fields": [
     { "name": "ErrorCode", "type": "int16", "versions": "0+",
       "about": "The top level error code."},
@@ -36,6 +36,13 @@
         { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
           "about": "The latest known leader epoch"}
       ]}
+    ]},
+    { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "1+", "taggedVersions": "1+", "tag": 0,
+      "about": "Endpoints for all leaders enumerated in PartitionData", "fields": [
+      { "name": "NodeId", "type": "int32", "versions": "1+",
+        "mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node" },
+      { "name": "Host", "type": "string", "versions": "1+", "about": "The node's hostname" },
+      { "name": "Port", "type": "int32", "versions": "1+", "about": "The node's port" }
     ]}
   ]
 }

Fetch

Handling

TODO: Fix this wording.

Replicas that support becoming voters will send both the replica ID and UUID in the Fetch request. The leader will assume that replicas that report both fields are voters or are able to become voters.

There are a few changes to the leader request handling described in KIP-595. The leader will track the fetched offset for the replica tuple (ID, UUID). Replicas are uniquely identified by their ID and UUID, so their state will be tracked using both their ID and UUID.

When removing the leader from the voter set, it will remain the leader for that epoch until the RemoveVoterRecord gets committed. This means that the leader needs to allow replicas (voters and observers) to fetch from the leader even if it is not part of the voter set. This also means that if the leader is not part of the voter set it should not include itself when computing the committed offset (also known as the high-watermark).
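The high-watermark rule can be sketched as follows: offsets are looked up by (ID, UUID), only members of the voter set are counted, and the leader contributes its own log end offset only while it is still a voter. This is a simplified, hypothetical helper (it treats a voter that has not fetched yet as offset 0), not the leader's actual implementation.

```python
from typing import Dict, Set, Tuple

ReplicaKey = Tuple[int, str]  # (replica ID, directory UUID)


def high_watermark(leader: ReplicaKey,
                   leader_log_end_offset: int,
                   voters: Set[ReplicaKey],
                   fetch_offsets: Dict[ReplicaKey, int]) -> int:
    """Largest offset that a majority of `voters` has replicated."""
    offsets = []
    for voter in voters:
        if voter == leader:
            # The leader counts itself only if it is still in the voter set.
            offsets.append(leader_log_end_offset)
        else:
            # Lookups use (ID, UUID); a stale directory id does not count.
            offsets.append(fetch_offsets.get(voter, 0))
    offsets.sort(reverse=True)
    majority = len(voters) // 2 + 1
    # The majority-th largest offset is held by at least `majority` voters.
    return offsets[majority - 1]
```

Observers are ignored simply because they are not in `voters`, even though the leader keeps serving their Fetch requests.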

Request

Version 17 adds the field ReplicaUuid to FetchPartition. This field is populated with the replica's generated UUID (its directory id). If both the ReplicaUuid and ReplicaId fields are populated, the topic partition leader can assume that the replica supports becoming a voter. The leader needs the directory id to correctly track the persisted offsets of all of the voters.

git diff upstream/trunk clients/src/main/resources/common/message/FetchRequest.json
diff --git a/clients/src/main/resources/common/message/FetchRequest.json b/clients/src/main/resources/common/message/FetchRequest.json
index 235357d004..ff86469831 100644
--- a/clients/src/main/resources/common/message/FetchRequest.json
+++ b/clients/src/main/resources/common/message/FetchRequest.json
@@ -55,7 +55,9 @@
   // deprecate the old ReplicaId field and set its default value to -1. (KIP-903)
   //
   // Version 16 is the same as version 15 (KIP-951).
-  "validVersions": "0-16",
+  //
+  // Version 17 adds directory id support from KIP-853
+  "validVersions": "0-17",
   "deprecatedVersions": "0-3",
   "flexibleVersions": "12+",
   "fields": [
@@ -100,7 +102,9 @@
         { "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true,
           "about": "The earliest available offset of the follower replica.  The field is only used when the request is sent by the follower."},
         { "name": "PartitionMaxBytes", "type": "int32", "versions": "0+",
-          "about": "The maximum bytes to fetch from this partition.  See KIP-74 for cases where this limit may not be honored." }
+          "about": "The maximum bytes to fetch from this partition.  See KIP-74 for cases where this limit may not be honored." },
+        { "name": "ReplicaUuid", "type": "uuid", "versions": "17+",
+          "about": "The directory id of the follower fetching" }
       ]}
     ]},
     { "name": "ForgottenTopicsData", "type": "[]ForgottenTopic", "versions": "7+", "ignorable": false,

Response

Code Block
git diff upstream/trunk clients/src/main/resources/common/message/FetchResponse.json
diff --git a/clients/src/main/resources/common/message/FetchResponse.json b/clients/src/main/resources/common/message/FetchResponse.json
index e5f49ba6fd..b432a79719 100644
--- a/clients/src/main/resources/common/message/FetchResponse.json
+++ b/clients/src/main/resources/common/message/FetchResponse.json
@@ -47,7 +47,7 @@
   // Version 15 is the same as version 14 (KIP-903).
   //
   // Version 16 adds the 'NodeEndpoints' field (KIP-951).
-  "validVersions": "0-16",
+  "validVersions": "0-17",
   "flexibleVersions": "12+",
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,

FetchSnapshot

Handling

TODO: Fix this wording.

Similar to Fetch, replicas that support becoming voters will send both the replica ID and UUID in the FetchSnapshot request. The leader will assume that replicas that report both fields are voters or are able to become voters.

There are a few changes to the leader request handling described in KIP-630. The leader will track the fetched offset for the replica tuple (ID, UUID). Replicas are uniquely identified by their ID and UUID, so their state will be tracked using both their ID and UUID.

When removing the leader from the voter set, it will remain the leader for that epoch until the RemoveVoterRecord gets committed. This means that the leader needs to allow replicas (voters and observers) to fetch snapshots from the leader even if it is not part of the voter set.

Request

Version 1 adds the field ReplicaUuid to PartitionSnapshot. If the ReplicaUuid and the ReplicaId fields are populated, the topic partition leader can assume that the replica supports becoming a voter.

No changes are needed to the handling of this RPC. The only thing to keep in mind is that replicas will accept FetchSnapshot requests as long as they are the latest leader, even if they are not in the current voter set.
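A sketch of the acceptance check, assuming KIP-595-style epoch validation: the leader compares the request's leader epoch with its own and never consults the voter set. `QuorumView` and `FetchSnapshotValidator` are hypothetical, and naming the outcomes after Kafka error codes is an assumption about how the checks would map to errors.

```java
import java.util.Set;

// Hypothetical view of the local node's quorum state.
record QuorumView(int localId, int leaderId, int epoch, Set<Integer> voters) {}

class FetchSnapshotValidator {
    // Outcomes named after Kafka error codes; the exact mapping is an assumption.
    enum Result { ACCEPT, FENCED_LEADER_EPOCH, UNKNOWN_LEADER_EPOCH, NOT_LEADER_OR_FOLLOWER }

    static Result validate(QuorumView state, int requestLeaderEpoch) {
        if (requestLeaderEpoch < state.epoch()) {
            return Result.FENCED_LEADER_EPOCH; // the requester is behind
        }
        if (requestLeaderEpoch > state.epoch()) {
            return Result.UNKNOWN_LEADER_EPOCH; // this node is behind
        }
        if (state.leaderId() != state.localId()) {
            return Result.NOT_LEADER_OR_FOLLOWER;
        }
        // state.voters() is intentionally not consulted: a leader that is being removed
        // keeps serving FetchSnapshot until the RemoveVoterRecord is committed.
        return Result.ACCEPT;
    }

    public static void main(String[] args) {
        // Node 1 leads epoch 5 but is no longer in the voter set {2, 3, 4}.
        QuorumView removedLeader = new QuorumView(1, 1, 5, Set.of(2, 3, 4));
        System.out.println(validate(removedLeader, 5)); // prints ACCEPT
    }
}
```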


Add the replica UUID (directory id) to the FetchSnapshot request. This is not needed for correctness, because leaders only track fetched offsets using the Fetch RPC, but it is good to add for consistency and debuggability.

Code Block
languagejs
git diff upstream/trunk clients/src/main/resources/common/message/FetchSnapshotRequest.json
diff --git a/clients/src/main/resources/common/message/FetchSnapshotRequest.json b/clients/src/main/resources/common/message/FetchSnapshotRequest.json
index 358ef2e322..9a577d6289 100644
--- a/clients/src/main/resources/common/message/FetchSnapshotRequest.json
+++ b/clients/src/main/resources/common/message/FetchSnapshotRequest.json
@@ -18,11 +18,11 @@
   "type": "request",
   "listeners": ["controller"],
   "name": "FetchSnapshotRequest",
-  "validVersions": "0",
+  "validVersions": "0-1",
   "flexibleVersions": "0+",
   "fields": [
     { "name": "ClusterId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "taggedVersions": "0+", "tag": 0,
       "about": "The cluster ID if known" },
     { "name": "ReplicaId", "type": "int32", "versions": "0+", "default": "-1", "entityType": "brokerId",
       "about": "The broker ID of the follower" },
     { "name": "MaxBytes", "type": "int32", "versions": "0+", "default": "0x7fffffff",
@@ -44,7 +44,9 @@
           { "name": "Epoch", "type": "int32", "versions": "0+" }
         ]},
         { "name": "Position", "type": "int64", "versions": "0+",
-          "about": "The byte position within the snapshot to start fetching from" }
+          "about": "The byte position within the snapshot to start fetching from" },
+        { "name": "ReplicaUuid", "type": "uuid", "versions": "1+",
+          "about": "The directory id of the follower fetching" }
       ]}
     ]}
   ]

Response

Version 1 adds the tagged field NodeEndpoints, which contains the endpoint (host and port) of every current leader enumerated in PartitionSnapshot.

Code Block
languagejs
git diff upstream/trunk clients/src/main/resources/common/message/FetchSnapshotResponse.json | cat
diff --git a/clients/src/main/resources/common/message/FetchSnapshotResponse.json b/clients/src/main/resources/common/message/FetchSnapshotResponse.json
index 887a5e4401..2d9d269930 100644
--- a/clients/src/main/resources/common/message/FetchSnapshotResponse.json
+++ b/clients/src/main/resources/common/message/FetchSnapshotResponse.json
@@ -17,23 +17,23 @@
   "apiKey": 59,
   "type": "response",
   "name": "FetchSnapshotResponse",
-  "validVersions": "0",
+  "validVersions": "0-1",
   "flexibleVersions": "0+",
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
       "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota" },
     { "name": "ErrorCode", "type": "int16", "versions": "0+", "ignorable": false,
       "about": "The top level response error code" },
     { "name": "Topics", "type": "[]TopicSnapshot", "versions": "0+",
       "about": "The topics to fetch", "fields": [
       { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
         "about": "The name of the topic to fetch" },
       { "name": "Partitions", "type": "[]PartitionSnapshot", "versions": "0+",
         "about": "The partitions to fetch", "fields": [
         { "name": "Index", "type": "int32", "versions": "0+",
           "about": "The partition index" },
         { "name": "ErrorCode", "type": "int16", "versions": "0+",
           "about": "The error code, or 0 if there was no fetch error" },
         { "name": "SnapshotId", "type": "SnapshotId", "versions": "0+",
           "about": "The snapshot endOffset and epoch fetched",
           "fields": [
@@ -43,17 +43,24 @@
         { "name": "CurrentLeader", "type": "LeaderIdAndEpoch",
           "versions": "0+", "taggedVersions": "0+", "tag": 0, "fields": [
           { "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId",
             "about": "The ID of the current leader or -1 if the leader is unknown" },
           { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
             "about": "The latest known leader epoch" }
         ]},
         { "name": "Size", "type": "int64", "versions": "0+",
           "about": "The total size of the snapshot" },
         { "name": "Position", "type": "int64", "versions": "0+",
           "about": "The starting byte position within the snapshot included in the Bytes field" },
         { "name": "UnalignedRecords", "type": "records", "versions": "0+",
           "about": "Snapshot data in records format which may not be aligned on an offset boundary" }
       ]}
+    ]},
+    { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "1+", "taggedVersions": "1+", "tag": 0,
+      "about": "Endpoints for all current-leaders enumerated in PartitionSnapshot", "fields": [
+      { "name": "NodeId", "type": "int32", "versions": "1+",
+        "mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node" },
+      { "name": "Host", "type": "string", "versions": "1+", "about": "The node's hostname" },
+      { "name": "Port", "type": "int32", "versions": "1+", "about": "The node's port" }
     ]}
   ]
 }
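On the receiving side, a replica can combine CurrentLeader with NodeEndpoints to find where to send its next request. The sketch below assumes simplified records in place of the generated message classes; `LeaderEndpointResolver`, `NodeEndpoint`, and `CurrentLeader` as Java types are hypothetical.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical in-memory views of the FetchSnapshotResponse fields used here.
record NodeEndpoint(int nodeId, String host, int port) {}
record CurrentLeader(int leaderId, int leaderEpoch) {}

class LeaderEndpointResolver {
    // Returns "host:port" for the current leader if NodeEndpoints lists it.
    static Optional<String> resolve(CurrentLeader leader, List<NodeEndpoint> nodeEndpoints) {
        if (leader.leaderId() < 0) {
            return Optional.empty(); // -1 means the leader is unknown
        }
        for (NodeEndpoint endpoint : nodeEndpoints) {
            if (endpoint.nodeId() == leader.leaderId()) {
                return Optional.of(endpoint.host() + ":" + endpoint.port());
            }
        }
        // No match, e.g. a version 0 response, which has no NodeEndpoints.
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<NodeEndpoint> endpoints = List.of(
            new NodeEndpoint(1, "controller-1.example", 9093),
            new NodeEndpoint(2, "controller-2.example", 9093));
        System.out.println(resolve(new CurrentLeader(2, 7), endpoints)); // prints Optional[controller-2.example:9093]
    }
}
```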
 

DescribeQuorum

TODO: Fix this RPC

Handling

The DescribeQuorum RPC will get forwarded to the KRaft cluster metadata leader. The response for this RPC will be extended to include the UUID of all of the voters and observers, and the listeners/endpoints for all of the voters known by the leader.

The version of the request is increased and the fields remain unchanged.

TODO: Review this.

The handling of the request is the same as that described in KIP-595, with just the additional fields. Clients handling the response can assume that if a ReplicaState enumerated in the Observers field includes both the ID and the UUID, that replica can become a voter.

Since the node and listener information is indexed by the node id, if there are multiple voters with the same replica id, only the latest entry will be returned and used.
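A client-side sketch of both rules, assuming simplified records instead of the generated response classes and that a later entry in Nodes is the latest one. `DescribeQuorumView`, `ReplicaState`, and `Node` here are hypothetical Java types, and each node's listeners are collapsed to a single host and port.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical client-side views of DescribeQuorumResponse version 2 entries.
// Listeners are collapsed to a single host and port for brevity.
record ReplicaState(int replicaId, UUID replicaUuid, long logEndOffset) {}
record Node(int nodeId, String host, int port) {}

class DescribeQuorumView {
    // Assumed value of an unset ReplicaUuid: the all-zero UUID.
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    // Observers that report both an ID and a non-zero UUID can become voters.
    static List<ReplicaState> voterCandidates(List<ReplicaState> observers) {
        List<ReplicaState> candidates = new ArrayList<>();
        for (ReplicaState observer : observers) {
            boolean hasUuid = observer.replicaUuid() != null
                && !observer.replicaUuid().equals(ZERO_UUID);
            if (observer.replicaId() >= 0 && hasUuid) {
                candidates.add(observer);
            }
        }
        return candidates;
    }

    // Indexes nodes by node ID; a later entry for the same ID replaces an earlier one.
    static Map<Integer, Node> indexNodes(List<Node> nodes) {
        Map<Integer, Node> byId = new LinkedHashMap<>();
        for (Node node : nodes) {
            byId.put(node.nodeId(), node);
        }
        return byId;
    }

    public static void main(String[] args) {
        List<ReplicaState> observers = List.of(
            new ReplicaState(4, new UUID(5L, 6L), 120L),
            new ReplicaState(5, ZERO_UUID, 120L));
        System.out.println(voterCandidates(observers).size()); // prints 1
        Map<Integer, Node> nodes = indexNodes(List.of(
            new Node(1, "old-host", 9093), new Node(1, "new-host", 9093)));
        System.out.println(nodes.get(1).host()); // prints new-host
    }
}
```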

Request

Code Block
git diff upstream/trunk clients/src/main/resources/common/message/DescribeQuorumRequest.json
diff --git a/clients/src/main/resources/common/message/DescribeQuorumRequest.json b/clients/src/main/resources/common/message/DescribeQuorumRequest.json
index cee8fe6982..93ab303aaf 100644
--- a/clients/src/main/resources/common/message/DescribeQuorumRequest.json
+++ b/clients/src/main/resources/common/message/DescribeQuorumRequest.json
@@ -19,7 +19,7 @@
   "listeners": ["broker", "controller"],
   "name": "DescribeQuorumRequest",
   // Version 1 adds additional fields in the response. The request is unchanged (KIP-836).
-  "validVersions": "0-1",
+  "validVersions": "0-2",
   "flexibleVersions": "0+",
   "fields": [
     { "name": "Topics", "type": "[]TopicData",

Response

Code Block
languagejs
git diff upstream/trunk clients/src/main/resources/common/message/DescribeQuorumResponse.json
diff --git a/clients/src/main/resources/common/message/DescribeQuorumResponse.json b/clients/src/main/resources/common/message/DescribeQuorumResponse.json
index 0ea6271238..e2481dff04 100644
--- a/clients/src/main/resources/common/message/DescribeQuorumResponse.json
+++ b/clients/src/main/resources/common/message/DescribeQuorumResponse.json
@@ -18,7 +18,7 @@
   "type": "response",
   "name": "DescribeQuorumResponse",
   // Version 1 adds LastFetchTimeStamp and LastCaughtUpTimestamp in ReplicaState (KIP-836).
-  "validVersions": "0-1",
+  "validVersions": "0-2",
   "flexibleVersions": "0+",
   "fields": [
     { "name": "ErrorCode", "type": "int16", "versions": "0+",
@@ -40,10 +40,25 @@
         { "name": "CurrentVoters", "type": "[]ReplicaState", "versions": "0+" },
         { "name": "Observers", "type": "[]ReplicaState", "versions": "0+" }
       ]}
-    ]}],
+    ]},
+    { "name": "Nodes", "type": "[]Node", "versions": "2+", "fields": [
+      { "name": "NodeId", "type": "int32", "versions": "2+",
+        "mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node" },
+      { "name": "Listeners", "type": "[]Listener",
+        "about": "The listeners of this controller", "versions": "0+", "fields": [
+        { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
+          "about": "The name of the endpoint" },
+        { "name": "Host", "type": "string", "versions": "0+",
+          "about": "The hostname" },
+        { "name": "Port", "type": "uint16", "versions": "0+",
+          "about": "The port" }
+      ]}
+    ]}
+  ],
   "commonStructs": [
     { "name": "ReplicaState", "versions": "0+", "fields": [
       { "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId" },
+      { "name": "ReplicaUuid", "type": "uuid", "versions": "2+" },
       { "name": "LogEndOffset", "type": "int64", "versions": "0+",
         "about": "The last known log end offset of the follower or -1 if it is unknown"},
       { "name": "LastFetchTimestamp", "type": "int64", "versions": "1+", "ignorable": true, "default": -1,

...

The features in this KIP will be supported if the ApiVersions of all of the voters and observers are greater than or equal to the versions described here. If the leader has a replica UUID for all of the voters, then this KIP is supported by all of the voters.
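The two support conditions can be sketched as checks over what each replica advertised. `Kip853Support` and `ReplicaInfo` are hypothetical, and an unset UUID is assumed to be the all-zero UUID. The required versions in the usage are only the Fetch, FetchSnapshot and DescribeQuorum versions from the diffs in this section, so a complete check would include every RPC the KIP changes.

```java
import java.util.List;
import java.util.Map;
import java.util.UUID;

class Kip853Support {
    // Assumed value of an unknown replica UUID.
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    // Hypothetical summary of one replica: identity, role, and the max API versions it advertised.
    record ReplicaInfo(int id, UUID uuid, boolean isVoter, Map<String, Short> maxApiVersions) {}

    // True if every replica advertises at least the required version of every listed API.
    static boolean allReplicasSupport(List<ReplicaInfo> replicas, Map<String, Short> required) {
        for (ReplicaInfo replica : replicas) {
            for (Map.Entry<String, Short> entry : required.entrySet()) {
                Short max = replica.maxApiVersions().get(entry.getKey());
                if (max == null || max < entry.getValue()) {
                    return false;
                }
            }
        }
        return true;
    }

    // True if the leader knows a non-zero replica UUID for every voter.
    static boolean allVotersHaveUuid(List<ReplicaInfo> replicas) {
        for (ReplicaInfo replica : replicas) {
            if (replica.isVoter() && (replica.uuid() == null || replica.uuid().equals(ZERO_UUID))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, Short> required = Map.of(
            "Fetch", (short) 17, "FetchSnapshot", (short) 1, "DescribeQuorum", (short) 2);
        ReplicaInfo upgraded = new ReplicaInfo(1, new UUID(1L, 1L), true,
            Map.of("Fetch", (short) 17, "FetchSnapshot", (short) 1, "DescribeQuorum", (short) 2));
        ReplicaInfo legacy = new ReplicaInfo(2, ZERO_UUID, true,
            Map.of("Fetch", (short) 16, "FetchSnapshot", (short) 0, "DescribeQuorum", (short) 1));
        List<ReplicaInfo> replicas = List.of(upgraded, legacy);
        System.out.println(allReplicasSupport(replicas, required)); // prints false
        System.out.println(allVotersHaveUuid(replicas)); // prints false
    }
}
```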

...

When to remove controller.quorum.voters

It is safe for the operator to remove the configuration for controller.quorum.voters when the kraft.version has been upgraded to version 1. All of the Kafka nodes will expose the ignored-static-voter metric. If all of the Kafka nodes expose a 1 for this metric, it is safe to remove controller.quorum.voters from the node configuration and specify controller.quorum.bootstrap.servers instead.

TODO: figure out a way to do this. The requirement is that the AddVoterRecords for all of the voters in the static configuration are committed. How do we guarantee this? Is seeing a committed AddVoterRecord enough?
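A sketch of the operator-side check for removing controller.quorum.voters, assuming the finalized kraft.version and each node's ignored-static-voter value have already been collected (how they are gathered is outside the sketch). `StaticVotersRemovalCheck` is a hypothetical name. Given the open TODO, passing this check may not by itself prove that every AddVoterRecord is committed.

```java
import java.util.Map;

class StaticVotersRemovalCheck {
    // Returns true once kraft.version is at least 1 and every node reports 1 for the
    // ignored-static-voter metric. An empty map means no data, so the answer is false.
    static boolean safeToRemoveStaticVoters(short kraftVersion, Map<Integer, Double> ignoredStaticVoterByNode) {
        if (kraftVersion < 1 || ignoredStaticVoterByNode.isEmpty()) {
            return false;
        }
        for (double value : ignoredStaticVoterByNode.values()) {
            if (value != 1.0) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Node 3 still reports 0, so controller.quorum.voters must stay for now.
        Map<Integer, Double> metrics = Map.of(1, 1.0, 2, 1.0, 3, 0.0);
        System.out.println(safeToRemoveStaticVoters((short) 1, metrics)); // prints false
    }
}
```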

Test Plan

This KIP will be tested using unit tests, integration tests, system tests, simulation tests and a TLA+ specification.

...