...

Code Block
languagejs
git diff upstream/trunk raft/src/main/resources/common/message/QuorumStateData.json
diff --git a/raft/src/main/resources/common/message/QuorumStateData.json b/raft/src/main/resources/common/message/QuorumStateData.json
index d71a32c75d..34881f05ff 100644
--- a/raft/src/main/resources/common/message/QuorumStateData.json
+++ b/raft/src/main/resources/common/message/QuorumStateData.json
@@ -16,19 +16,25 @@
 {
   "type": "data",
   "name": "QuorumStateData",
-  "validVersions": "0",
+  "validVersions": "0-1",
   "flexibleVersions": "0+",
   "fields": [
-    {"name": "ClusterId", "type": "string", "versions": "0+"},
-    {"name": "LeaderId", "type": "int32", "versions": "0+", "default": "-1"},
-    {"name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1"},
-    {"name": "VotedId", "type": "int32", "versions": "0+", "default": "-1"},
-    {"name": "AppliedOffset", "type": "int64", "versions": "0+"},
-    {"name": "CurrentVoters", "type": "[]Voter", "versions": "0+", "nullableVersions": "0+"}
+    { "name": "ClusterId", "type": "string", "versions": "0" },
+    { "name": "LeaderId", "type": "int32", "versions": "0+", "default": "-1" },
+    { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1" },
+    { "name": "VotedId", "type": "int32", "versions": "0+", "default": "-1" },
+    { "name": "VotedUuid", "type": "uuid", "versions": "1+" },
+    { "name": "AppliedOffset", "type": "int64", "versions": "0" },
+    { "name": "AppliedRecord", "type": "AppliedRecord", "versions": "1+", "fields": [
+      { "name": "EndOffset", "type": "int64", "versions": "1+" },
+      { "name": "Epoch", "type": "int32", "versions": "1+" }
+    ]},
+    { "name": "CurrentVoters", "type": "[]Voter", "versions": "0+", "nullableVersions": "0+" }
   ],
   "commonStructs": [
     { "name": "Voter", "versions": "0+", "fields": [
-      {"name": "VoterId", "type": "int32", "versions": "0+"}
+      { "name": "VoterId", "type": "int32", "versions": "0+" },
+      { "name": "VoterUuid", "type": "uuid", "versions": "1+" }
     ]}
   ]
 }
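Version 1 of the schema changes which fields are written, not just their values. The following minimal Python sketch illustrates that split by serializing a hypothetical in-memory quorum state for either version. Several pieces are illustrative assumptions and not the KRaft serializer:

- the `QuorumState` and `Voter` classes and `to_json_dict`;
- using the all-zero UUID as the "empty" UUID;
- writing the version 1 `EndOffset` value into the version 0 `AppliedOffset` field.

```python
import uuid
from dataclasses import dataclass
from typing import List, Optional

# Hypothetical model; the all-zero UUID stands in for an unset/empty UUID.
ZERO_UUID = uuid.UUID(int=0)


@dataclass
class Voter:
    voter_id: int
    voter_uuid: uuid.UUID = ZERO_UUID  # VoterUuid exists only in version 1+


@dataclass
class QuorumState:
    cluster_id: str = ""
    leader_id: int = -1
    leader_epoch: int = -1
    voted_id: int = -1
    voted_uuid: uuid.UUID = ZERO_UUID
    applied_end_offset: int = 0
    applied_epoch: int = 0
    current_voters: Optional[List[Voter]] = None  # CurrentVoters is nullable

    def to_json_dict(self, version: int) -> dict:
        """Emit only the QuorumStateData fields defined for `version`."""
        if version not in (0, 1):
            raise ValueError(f"unsupported QuorumStateData version: {version}")
        out = {
            "LeaderId": self.leader_id,
            "LeaderEpoch": self.leader_epoch,
            "VotedId": self.voted_id,
        }
        if version == 0:
            # Version 0 only: ClusterId and a bare AppliedOffset (assumed to
            # carry the same value as the version 1 EndOffset).
            out["ClusterId"] = self.cluster_id
            out["AppliedOffset"] = self.applied_end_offset
        else:
            # Version 1+: VotedUuid plus AppliedRecord (EndOffset and Epoch).
            out["VotedUuid"] = str(self.voted_uuid)
            out["AppliedRecord"] = {"EndOffset": self.applied_end_offset,
                                    "Epoch": self.applied_epoch}
        if self.current_voters is None:
            out["CurrentVoters"] = None
        else:
            out["CurrentVoters"] = [
                {"VoterId": v.voter_id} if version == 0
                else {"VoterId": v.voter_id, "VoterUuid": str(v.voter_uuid)}
                for v in self.current_voters
            ]
        return out
```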

RPCs

UpdateFeatures

TODO: Fill this out

Handling

Request

Response

AddVoter

This RPC can be sent by an administrative client to add a voter to the set of voters. It can be sent to a broker or a controller; when it is sent to a broker, the broker forwards the request to the controller.

Handling

When the leader receives an AddVoter request it will do the following:

...

  1. NOT_LEADER_FOR_PARTITION - when the request is sent to a replica that is not the leader.
  2. VOTER_ALREADY_ADDED - when the request contains a replica ID and UUID that is already in the committed voter set.
  3. UNSUPPORTED_VERSION - when the kraft.version is less than 1.
  4. INVALID_REQUEST - when the new voter doesn't support the current kraft.version.
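The error conditions above can be sketched as a single validation function. This minimal Python sketch rests on several assumptions:

- `LeaderView`, `validate_add_voter`, and the parameters that carry the new voter's supported kraft.version range are hypothetical.
- How the leader learns that range is not covered here.
- The order of the checks is assumed.
- kraft.version 1 is treated as the first version that allows voter changes.

```python
from dataclasses import dataclass
from typing import FrozenSet, Optional, Tuple

# (replica ID, directory UUID): voters are matched on both values.
ReplicaKey = Tuple[int, str]


@dataclass(frozen=True)
class LeaderView:
    """Hypothetical snapshot of what the receiving replica knows."""
    is_leader: bool
    kraft_version: int
    committed_voters: FrozenSet[ReplicaKey]


def validate_add_voter(view: LeaderView, voter: ReplicaKey,
                       voter_min_kraft: int, voter_max_kraft: int) -> Optional[str]:
    """Return the error name for an AddVoter request, or None to proceed.

    voter_min_kraft/voter_max_kraft stand for the new voter's supported
    kraft.version range (hypothetical inputs).
    """
    if not view.is_leader:
        return "NOT_LEADER_FOR_PARTITION"
    if view.kraft_version < 1:
        # Assumes voter changes require kraft.version 1 or later.
        return "UNSUPPORTED_VERSION"
    if voter in view.committed_voters:
        return "VOTER_ALREADY_ADDED"
    if not voter_min_kraft <= view.kraft_version <= voter_max_kraft:
        return "INVALID_REQUEST"
    return None
```

Because voters are matched on the (ID, UUID) pair, a replica that reuses a known ID with a new directory UUID is not reported as already added.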

Request

Code Block
languagejs
{
  "apiKey": 75,
  "type": "request",
  "listeners": ["controller", "broker"],
  "name": "AddVoterRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "0+" },
    { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
      "about": "The name of the topic" },
    { "name": "TopicId", "type": "uuid", "versions": "0+",
      "about": "The unique topic ID" },
    { "name": "Partition", "type": "int32", "versions": "0+",
      "about": "The partition index" },
    { "name": "VoterId", "type": "int32", "versions": "0+",
      "about": "The replica id of the voter getting added to the topic partition" },
    { "name": "VoterUuid", "type": "uuid", "versions": "0+",
      "about": "The directory id of the voter getting added to the topic partition" },
    { "name": "Listeners", "type": "[]Listener", "versions": "0+",
      "about": "The endpoints that can be used to communicate with the voter", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
        "about": "The name of the endpoint" },
      { "name": "Host", "type": "string", "versions": "0+",
        "about": "The hostname" },
      { "name": "Port", "type": "uint16", "versions": "0+",
        "about": "The port" },
      { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
        "about": "The security protocol" }
    ]}
  ]
}

Response

Code Block
languagejs
{
  "apiKey": 75,
  "type": "response",
  "name": "AddVoterResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error" },
    { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "0+", "taggedVersions": "0+", "tag": 0, "fields": [
      { "name": "LeaderId", "type": "int32", "versions": "0+", "default": "-1", "entityType" : "brokerId",
        "about": "The replica id of the current leader or -1 if the leader is unknown" },
      { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1",
        "about": "The latest known leader epoch" }
    ]},
    { "name": "NodeEndpoint", "type": "NodeEndpoint", "versions": "0+", "taggedVersions": "0+", "tag": 1,
      "about": "Endpoint for current leader of the topic partition", "fields": [
      { "name": "Host", "type": "string", "versions": "0+", "about": "The node's hostname" },
      { "name": "Port", "type": "int32", "versions": "0+", "about": "The node's port" }
    ]}
  ]
}

...

This RPC can be sent by an administrative client to remove a voter from the set of voters. It can be sent to a broker or a controller; when it is sent to a broker, the broker forwards the request to the controller.

Handling

When the leader receives a RemoveVoter request it will do the following:

...

  1. NOT_LEADER_FOR_PARTITION - when the request is sent to a replica that is not the leader.
  2. VOTER_ALREADY_REMOVED - when the request contains a replica ID and UUID that is not in the committed voter set.
  3. UNSUPPORTED_VERSION - when the kraft.version is less than 1.
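A comparable sketch for RemoveVoter follows. The function `remove_voter` is hypothetical and returns either an error name or the proposed voter set. It shows that membership is matched on the full (ID, UUID) pair, so a request that names a known ID with a different UUID is treated as already removed. The check order and the kraft.version threshold are assumptions.

```python
from typing import FrozenSet, Optional, Tuple

ReplicaKey = Tuple[int, str]  # (replica ID, directory UUID)


def remove_voter(is_leader: bool, kraft_version: int,
                 committed_voters: FrozenSet[ReplicaKey],
                 target: ReplicaKey) -> Tuple[Optional[str], FrozenSet[ReplicaKey]]:
    """Return (error name or None, proposed voter set)."""
    if not is_leader:
        return "NOT_LEADER_FOR_PARTITION", committed_voters
    if kraft_version < 1:
        # Assumes voter changes require kraft.version 1 or later.
        return "UNSUPPORTED_VERSION", committed_voters
    if target not in committed_voters:
        # Matching uses the full (ID, UUID) pair, not the ID alone.
        return "VOTER_ALREADY_REMOVED", committed_voters
    return None, committed_voters - {target}
```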

Request

Code Block
languagejs
{
  "apiKey": 76,
  "type": "request",
  "listeners": ["controller", "broker"],
  "name": "RemoveVoterRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
      "about": "The name of the topic" },
    { "name": "TopicId", "type": "uuid", "versions": "0+",
      "about": "The unique topic ID" },
    { "name": "Partition", "type": "int32", "versions": "0+",
      "about": "The partition index" },
    { "name": "VoterId", "type": "int32", "versions": "0+",
      "about": "The replica id of the voter getting removed from the topic partition" },
    { "name": "VoterUuid", "type": "uuid", "versions": "0+",
      "about": "The directory id of the voter getting removed from the topic partition" }
  ]
}

Response

Code Block
languagejs
{
  "apiKey": 76,
  "type": "response",
  "name": "RemoveVoterResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error" },
    { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "0+", "taggedVersions": "0+", "tag": 0, "fields": [
      { "name": "LeaderId", "type": "int32", "versions": "0+", "default": "-1", "entityType" : "brokerId",
        "about": "The replica id of the current leader or -1 if the leader is unknown" },
      { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1",
        "about": "The latest known leader epoch" }
    ]},
    { "name": "NodeEndpoint", "type": "NodeEndpoint", "versions": "0+", "taggedVersions": "0+", "tag": 1,
      "about": "Endpoint for current leader of the topic partition", "fields": [
      { "name": "Host", "type": "string", "versions": "0+", "about": "The node's hostname" },
      { "name": "Port", "type": "int32", "versions": "0+", "about": "The node's port" }
    ]}
  ]
}

...

This RPC differs from AddVoter in two ways: it is sent only by voters to the latest known leader, and it includes both the listener endpoints and the kraft.version information. This lets voters keep that information up to date automatically, without additional operator intervention.

Handling

When the leader receives an UpdateVoter request it will do the following:

  1. Wait until there are no uncommitted add or remove voter records.
  2. Wait for the LeaderChangeMessage control record from the current epoch to get committed.
  3. Check that the updated voter supports the current kraft.version.
  4. Append the updated AddVoterRecord to the log.
  5. The KRaft internal listener will read this record from the log and update the voter's information. This includes updating the endpoint used by the KRaft NetworkClient.
  6. Wait for the AddVoterRecord to commit using a majority of the new voter set.
  7. Send the UpdateVoter response to the client.
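The numbered steps can be modeled with a toy, single-threaded leader. In this Python sketch:

- The waits in steps 1 and 2 become a hypothetical `RETRY_LATER` status.
- A failed step 3 returns a hypothetical `REJECTED` status, because the section does not name an error.
- Steps 6 and 7 are not modeled.
- `ToyLeader`, `VoterInfo`, and the dict-shaped log records are illustrative only.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple

ReplicaKey = Tuple[int, str]  # (replica ID, directory UUID)


@dataclass
class VoterInfo:
    endpoints: Dict[str, Tuple[str, int]]  # listener name -> (host, port)
    min_kraft: int
    max_kraft: int


@dataclass
class ToyLeader:
    """Single-threaded toy: the real leader waits where this returns RETRY_LATER."""
    kraft_version: int
    voters: Dict[ReplicaKey, VoterInfo]
    log: List[dict] = field(default_factory=list)
    high_watermark: int = 0        # offsets below this are committed
    leader_change_offset: int = 0  # offset of this epoch's LeaderChangeMessage

    def _pending_membership_change(self) -> bool:
        return any(r["type"] in ("AddVoterRecord", "RemoveVoterRecord")
                   for r in self.log[self.high_watermark:])

    def handle_update_voter(self, key: ReplicaKey, info: VoterInfo) -> str:
        # Steps 1-2: no uncommitted voter change, and LeaderChangeMessage committed.
        if self._pending_membership_change() or self.leader_change_offset >= self.high_watermark:
            return "RETRY_LATER"
        # Step 3: the updated voter must support the active kraft.version.
        if not info.min_kraft <= self.kraft_version <= info.max_kraft:
            return "REJECTED"
        # Step 4: append the updated AddVoterRecord.
        self.log.append({"type": "AddVoterRecord", "key": key, "info": info})
        # Step 5: the internal listener applies it (immediately, in this toy).
        self.voters[key] = info
        # Steps 6-7 (commit by the new majority, then respond) are not modeled.
        return "APPENDED"
```

A second update sent before the first AddVoterRecord commits is held back by step 1. Membership changes are therefore serialized.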

Request

Code Block
{
  "apiKey": 77,
  "type": "request",
  "listeners": ["controller"],
  "name": "UpdateVoterRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "0+" },
    { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
      "about": "The name of the topic" },
    { "name": "TopicId", "type": "uuid", "versions": "0+",
      "about": "The unique topic ID" },
    { "name": "Partition", "type": "int32", "versions": "0+",
      "about": "The partition index" },
    { "name": "VoterId", "type": "int32", "versions": "0+",
      "about": "The replica id of the voter getting updated in the topic partition" },
    { "name": "VoterUuid", "type": "uuid", "versions": "0+",
      "about": "The directory id of the voter getting updated in the topic partition" },
    { "name": "Listeners", "type": "[]Listener", "versions": "0+",
      "about": "The endpoints that can be used to communicate with the voter", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
        "about": "The name of the endpoint" },
      { "name": "Host", "type": "string", "versions": "0+",
        "about": "The hostname" },
      { "name": "Port", "type": "uint16", "versions": "0+",
        "about": "The port" },
      { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
        "about": "The security protocol" }
    ]},
    { "name": "KRaftVersionFeature", "type": "KRaftVersionFeature", "versions": "0+",
      "about": "The range of versions of the protocol that the replica supports", "fields": [
      { "name": "MinSupportedVersion", "type": "int16", "versions": "0+",
        "about": "The minimum supported KRaft protocol version" },
      { "name": "MaxSupportedVersion", "type": "int16", "versions": "0+",
        "about": "The maximum supported KRaft protocol version" }
    ]}
  ]
}

Response

Code Block
{
  "apiKey": 77,
  "type": "response",
  "name": "UpdateVoterResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error" },
    { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "0+", "taggedVersions": "0+", "tag": 0, "fields": [
      { "name": "LeaderId", "type": "int32", "versions": "0+", "default": "-1", "entityType" : "brokerId",
        "about": "The replica id of the current leader or -1 if the leader is unknown" },
      { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1",
        "about": "The latest known leader epoch" }
    ]},
    { "name": "NodeEndpoint", "type": "NodeEndpoint", "versions": "0+", "taggedVersions": "0+", "tag": 1,
      "about": "Endpoint for current leader of the topic partition", "fields": [
      { "name": "Host", "type": "string", "versions": "0+", "about": "The node's hostname" },
      { "name": "Port", "type": "int32", "versions": "0+", "about": "The node's port" }
    ]}
  ]
}

Vote

Handling

Since the set of voters can change and not all replicas know the latest voter set, the handling of the Vote request needs to be relaxed from what was defined and implemented for KIP-595.

...

  1. INVALID_REQUEST - when the voter ID and UUID don't match the local ID and UUID.
  2. UNSUPPORTED_VERSION - when a non-empty candidate UUID is specified but the voter doesn't support kraft.version 1.

The replica will not return the following errors anymore:

  1. INCONSISTENT_VOTER_SET - because the receiving replica will not validate that either the candidate or itself is a voter.
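The relaxed checks can be summarized in a small validation function. This Python sketch illustrates the checks under several assumptions and is not the KRaft handler:

- `LocalReplica` and `validate_vote_request` are hypothetical.
- The all-zero UUID stands in for the empty UUID.
- A `VoterId` of -1 or an empty `VoterUuid` is treated as "unknown" and not compared. This is inferred from the field defaults in the request schema.

After these checks the usual KIP-595 vote decision would apply.

```python
import uuid
from dataclasses import dataclass
from typing import Optional

EMPTY_UUID = uuid.UUID(int=0)  # assumed representation of "empty uuid"


@dataclass(frozen=True)
class LocalReplica:
    replica_id: int
    directory_uuid: uuid.UUID
    supports_kraft_v1: bool


def validate_vote_request(local: LocalReplica, voter_id: int, voter_uuid: uuid.UUID,
                          candidate_uuid: uuid.UUID) -> Optional[str]:
    """Return an error name, or None to continue with the KIP-595 vote decision."""
    if candidate_uuid != EMPTY_UUID and not local.supports_kraft_v1:
        return "UNSUPPORTED_VERSION"
    # -1 / empty mean "unknown to the candidate" (assumption), so they are not compared.
    if voter_id != -1 and voter_id != local.replica_id:
        return "INVALID_REQUEST"
    if voter_uuid != EMPTY_UUID and voter_uuid != local.directory_uuid:
        return "INVALID_REQUEST"
    # No INCONSISTENT_VOTER_SET check: neither the candidate nor this replica
    # has to appear in the locally known voter set.
    return None
```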

Request

Code Block
languagejs
git diff upstream/trunk clients/src/main/resources/common/message/VoteRequest.json
diff --git a/clients/src/main/resources/common/message/VoteRequest.json b/clients/src/main/resources/common/message/VoteRequest.json
index 35583a790b..ff808cbafe 100644
--- a/clients/src/main/resources/common/message/VoteRequest.json
+++ b/clients/src/main/resources/common/message/VoteRequest.json
@@ -18,11 +18,13 @@
   "type": "request",
   "listeners": ["controller"],
   "name": "VoteRequest",
-  "validVersions": "0",
+  "validVersions": "0-1",
   "flexibleVersions": "0+",
   "fields": [
     { "name": "ClusterId", "type": "string", "versions": "0+",
       "nullableVersions": "0+", "default": "null"},
+    { "name": "VoterId", "type": "int32", "versions": "1+", "ignorable": true, "default": "-1", "entityType": "brokerId",
+      "about": "The replica id of the voter receiving the request" },
     { "name": "Topics", "type": "[]TopicData",
       "versions": "0+", "fields": [
       { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
@@ -34,14 +36,16 @@
         { "name": "CandidateEpoch", "type": "int32", "versions": "0+",
           "about": "The bumped epoch of the candidate sending the request"},
         { "name": "CandidateId", "type": "int32", "versions": "0+", "entityType": "brokerId",
-          "about": "The ID of the voter sending the request"},
+          "about": "The replica id of the voter sending the request"},
+        { "name": "CandidateUuid", "type": "uuid", "versions": "1+",
+          "about": "The directory id of the voter sending the request" },
+        { "name": "VoterUuid", "type": "uuid", "versions": "1+",
+          "about": "The directory id of the voter receiving the request to vote, empty uuid if unknown" },
         { "name": "LastOffsetEpoch", "type": "int32", "versions": "0+",
           "about": "The epoch of the last record written to the metadata log"},
         { "name": "LastOffset", "type": "int64", "versions": "0+",
           "about": "The offset of the last record written to the metadata log"}
       ]}
     ]}
   ]
 }

Response

Code Block
languagejs
git diff upstream/trunk clients/src/main/resources/common/message/VoteResponse.json
diff --git a/clients/src/main/resources/common/message/VoteResponse.json b/clients/src/main/resources/common/message/VoteResponse.json
index b92d0070c1..21d0aa5312 100644
--- a/clients/src/main/resources/common/message/VoteResponse.json
+++ b/clients/src/main/resources/common/message/VoteResponse.json
@@ -17,7 +17,7 @@
   "apiKey": 52,
   "type": "response",
   "name": "VoteResponse",
-  "validVersions": "0",
+  "validVersions": "0-1",
   "flexibleVersions": "0+",
   "fields": [
     { "name": "ErrorCode", "type": "int16", "versions": "0+",
@@ -37,9 +37,14 @@
           "about": "The latest known leader epoch"},
         { "name": "VoteGranted", "type": "bool", "versions": "0+",
           "about": "True if the vote was granted and false otherwise"}
       ]}
     ]},
+    { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "1+", "taggedVersions": "1+", "tag": 0,
+      "about": "Endpoints for all current-leaders enumerated in PartitionData", "fields": [
+      { "name": "NodeId", "type": "int32", "versions": "1+",
+        "mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node"},
+      { "name": "Host", "type": "string", "versions": "1+", "about": "The node's hostname" },
+      { "name": "Port", "type": "int32", "versions": "1+", "about": "The node's port" }
+    ]}
   ]
 }

BeginQuorumEpoch

Handling

When handling the BeginQuorumEpoch request, the replica will accept the request if the LeaderEpoch is equal to or greater than its own epoch. The receiving replica will not check whether the new leader or itself is in the voter set. This change is required because the receiving replica may not have fetched the latest voter set.

The BeginQuorumEpoch RPC will be used by the leader to propagate the leader endpoint to all of the voters. This is needed so that all voters have the information necessary to send Fetch and FetchSnapshot requests.

The check quorum algorithm implemented by the leader needs to be extended so that BeginQuorumEpoch requests are sent to any voter that is not actively fetching (via Fetch or FetchSnapshot) from the leader.
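The extended check quorum logic works roughly like this: the leader periodically selects every voter, other than itself, that has not fetched recently, and sends it BeginQuorumEpoch. In this Python sketch, `begin_quorum_epoch_targets` and `fetch_timeout_ms` are hypothetical. The real timeout and scheduling are not specified in this section.

```python
from typing import Dict, Set, Tuple

ReplicaKey = Tuple[int, str]  # (replica ID, directory UUID)


def begin_quorum_epoch_targets(voters: Set[ReplicaKey], leader: ReplicaKey,
                               last_fetch_ms: Dict[ReplicaKey, int],
                               now_ms: int, fetch_timeout_ms: int) -> Set[ReplicaKey]:
    """Voters, other than the leader, with no Fetch/FetchSnapshot within the timeout."""
    targets: Set[ReplicaKey] = set()
    for voter in voters:
        if voter == leader:
            continue
        last = last_fetch_ms.get(voter)  # time of the most recent Fetch or FetchSnapshot
        if last is None or now_ms - last > fetch_timeout_ms:
            targets.add(voter)
    return targets
```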

The replica will return the following new errors:

  1. INVALID_REQUEST - when the voter ID and UUID don't match the local ID and UUID.

The replica will not return the following errors anymore:

  1. INCONSISTENT_VOTER_SET - because the receiving replica will not validate that either the leader or itself is a voter.
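The acceptance rule and the new error can be sketched as follows. The sketch makes these assumptions:

- `ReplicaState` and `handle_begin_quorum_epoch` are hypothetical.
- The request is version 1, so it carries `VoterId` and `VoterUuid`.
- A stale epoch is rejected with KIP-595's `FENCED_LEADER_EPOCH`.

```python
import uuid
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class ReplicaState:
    """Hypothetical local state of the replica receiving BeginQuorumEpoch."""
    replica_id: int
    directory_uuid: uuid.UUID
    epoch: int
    leader_id: int = -1


def handle_begin_quorum_epoch(state: ReplicaState, voter_id: int, voter_uuid: uuid.UUID,
                              leader_id: int, leader_epoch: int) -> Tuple[Optional[str], bool]:
    """Return (error name or None, whether the new leader was accepted)."""
    # New error: the request was addressed to a different replica.
    if voter_id != state.replica_id or voter_uuid != state.directory_uuid:
        return "INVALID_REQUEST", False
    if leader_epoch < state.epoch:
        return "FENCED_LEADER_EPOCH", False  # assumed KIP-595 behavior
    # Equal or greater epoch: accept without checking whether the leader or
    # this replica is in the (possibly stale) local voter set.
    state.epoch = leader_epoch
    state.leader_id = leader_id
    return None, True
```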

Request

Code Block
languagejs
git diff upstream/trunk clients/src/main/resources/common/message/BeginQuorumEpochRequest.json
diff --git a/clients/src/main/resources/common/message/BeginQuorumEpochRequest.json b/clients/src/main/resources/common/message/BeginQuorumEpochRequest.json
index d9d6d92c88..edd128fb8c 100644
--- a/clients/src/main/resources/common/message/BeginQuorumEpochRequest.json
+++ b/clients/src/main/resources/common/message/BeginQuorumEpochRequest.json
@@ -18,24 +18,35 @@
   "type": "request",
   "listeners": ["controller"],
   "name": "BeginQuorumEpochRequest",
-  "validVersions": "0",
-  "flexibleVersions": "none",
+  "validVersions": "0-1",
+  "flexibleVersions": "1+",
   "fields": [
     { "name": "ClusterId", "type": "string", "versions": "0+",
       "nullableVersions": "0+", "default": "null"},
+    { "name": "VoterId", "type": "int32", "versions": "1+", "entityType": "brokerId",
+      "about": "The voter ID of the receiving replica" },
     { "name": "Topics", "type": "[]TopicData",
       "versions": "0+", "fields": [
       { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
         "about": "The topic name" },
       { "name": "Partitions", "type": "[]PartitionData",
         "versions": "0+", "fields": [
         { "name": "PartitionIndex", "type": "int32", "versions": "0+",
           "about": "The partition index" },
+        { "name": "VoterUuid", "type": "uuid", "versions": "1+",
+          "about": "The directory id of the receiving replica" },
         { "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId",
           "about": "The ID of the newly elected leader"},
         { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
           "about": "The epoch of the newly elected leader"}
       ]}
-    ]}
+    ]},
+    { "name": "NodeEndpoint", "type": "[]NodeEndpoint", "versions": "1+", "taggedVersions": "1+", "tag": 0,
+      "about": "Endpoint for the leader", "fields": [
+      { "name": "NodeId", "type": "int32", "versions": "1+",
+        "mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node" },
+      { "name": "Host", "type": "string", "versions": "1+", "about": "The node's hostname" },
+      { "name": "Port", "type": "int32", "versions": "1+", "about": "The node's port" }
+    ]}
   ]
 }

Response

Code Block
languagejs
git diff upstream/trunk clients/src/main/resources/common/message/BeginQuorumEpochResponse.json
diff --git a/clients/src/main/resources/common/message/BeginQuorumEpochResponse.json b/clients/src/main/resources/common/message/BeginQuorumEpochResponse.json
index 4b7d7f5a95..12639bba2f 100644
--- a/clients/src/main/resources/common/message/BeginQuorumEpochResponse.json
+++ b/clients/src/main/resources/common/message/BeginQuorumEpochResponse.json
@@ -17,25 +17,32 @@
   "apiKey": 53,
   "type": "response",
   "name": "BeginQuorumEpochResponse",
-  "validVersions": "0",
-  "flexibleVersions": "none",
+  "validVersions": "0-1",
+  "flexibleVersions": "1+",
   "fields": [
     { "name": "ErrorCode", "type": "int16", "versions": "0+",
       "about": "The top level error code"},
     { "name": "Topics", "type": "[]TopicData",
       "versions": "0+", "fields": [
       { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
         "about": "The topic name" },
       { "name": "Partitions", "type": "[]PartitionData",
         "versions": "0+", "fields": [
         { "name": "PartitionIndex", "type": "int32", "versions": "0+",
           "about": "The partition index" },
         { "name": "ErrorCode", "type": "int16", "versions": "0+"},
         { "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId",
           "about": "The ID of the current leader or -1 if the leader is unknown"},
         { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
           "about": "The latest known leader epoch"}
       ]}
-    ]}
+    ]},
+    { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "1+", "taggedVersions": "1+", "tag": 0,
+      "about": "Endpoints for all leaders enumerated in PartitionData", "fields": [
+      { "name": "NodeId", "type": "int32", "versions": "1+",
+        "mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node" },
+      { "name": "Host", "type": "string", "versions": "1+", "about": "The node's hostname" },
+      { "name": "Port", "type": "int32", "versions": "1+", "about": "The node's port" }
+    ]}
   ]
 }

Fetch

Request

Version 14 adds the field ReplicaUuid to FetchPartition. This field is populated with the replica-generated UUID. If both the ReplicaUuid and the ReplicaId fields are populated, the topic partition leader can assume that the replica supports becoming a voter.

Code Block
languagejs
{
  "apiKey": 1,
  "type": "request",
  "listeners": ["zkBroker", "broker", "controller"],
  "name": "FetchRequest",
  "validVersions": "0-14",
  "flexibleVersions": "12+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "12+", "nullableVersions": "12+", "default": "null",
      "taggedVersions": "12+", "tag": 0, "ignorable": true,
      "about": "The clusterId if known. This is used to validate metadata fetches prior to broker registration." },
    { "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "about": "The replica ID of the follower, of -1 if this request is from a consumer." },
    ...
    { "name": "Topics", "type": "[]FetchTopic", "versions": "0+",
      "about": "The topics to fetch.", "fields": [
      { "name": "Topic", "type": "string", "versions": "0-12", "entityType": "topicName", "ignorable": true,
        "about": "The name of the topic to fetch." },
      { "name": "TopicId", "type": "uuid", "versions": "13+", "ignorable": true,
        "about": "The unique topic ID"},
      { "name": "Partitions", "type": "[]FetchPartition", "versions": "0+",
        "about": "The partitions to fetch.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ReplicaUuid", "type": "uuid", "versions": "14+", "nullableVersions": "14+", "default": "null",
          "about": "The replica generated UUID. null otherwise." },
        ...
      ]}
    ]}
  ]
}

Response

Version 14 renames LeaderIdAndEpoch to CurrentLeader and adds Endpoint to CurrentLeader.

Code Block
languagejs
{
  "apiKey": 1,
  "type": "response",
  "name": "FetchResponse",
  "validVersions": "0-14",
  "flexibleVersions": "12+",
  "fields": [
    ...
    { "name": "Responses", "type": "[]FetchableTopicResponse", "versions": "0+",
      "about": "The response topics.", "fields": [
      { "name": "Topic", "type": "string", "versions": "0-12", "ignorable": true, "entityType": "topicName",
        "about": "The topic name." },
      { "name": "TopicId", "type": "uuid", "versions": "13+", "ignorable": true,
        "about": "The unique topic ID"},
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about": "The topic partitions.", "fields": [
        ...
        { "name": "CurrentLeader", "type": "CurrentLeader",
          "versions": "12+", "taggedVersions": "12+", "tag": 1, "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "12+", "default": "-1", "entityType": "brokerId",
            "about": "The ID of the current leader or -1 if the leader is unknown."},
          { "name": "LeaderEpoch", "type": "int32", "versions": "12+", "default": "-1",
            "about": "The latest known leader epoch"},
          { "name": "EndPoint", "type": "Endpoint", "versions": "14+",
            "about": "The endpoint that can be used to communicate with the leader", "fields": [
            { "name": "Host", "type": "string", "versions": "14+",
              "about": "The hostname." },
            { "name": "Port", "type": "uint16", "versions": "14+",
              "about": "The port." }
          ]}
        ]},
        ...
      ]}
    ]}
  ]
}

Handling

Replicas that support becoming voters will send both the replica ID and UUID in the Fetch request. The leader will assume that replicas that report both fields are voters or are able to become voters.

There are a few changes to the leader request handling described in KIP-595. The leader will track the fetch offset for each replica tuple (ID, UUID). This means that replicas are uniquely identified by their ID and UUID, so their state will be tracked using both the ID and UUID.

When removing the leader from the voter set, it will remain the leader for that epoch until the RemoveVoterRecord gets committed. This means that the leader needs to allow replicas (voters and observers) to fetch from the leader even if it is not part of the voter set. This also means that if the leader is not part of the voter set it should not include itself when computing the committed offset (also known as the high-watermark).
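Below is a minimal Python sketch of this leader-side bookkeeping. It uses a simplified model with these assumptions:

- `FetchTracker` is hypothetical.
- Fetch offsets are keyed by the (ID, UUID) pair.
- A fetch that carries both a non-negative replica ID and a UUID marks the replica as voter-capable.
- The high-watermark is the largest offset reached by a majority of the current voter set.
- The leader's own log end offset counts only while the leader is itself in that set.

```python
from typing import Dict, Optional, Set, Tuple

ReplicaKey = Tuple[int, str]  # (replica ID, replica UUID)


class FetchTracker:
    """Hypothetical leader-side fetch bookkeeping keyed by (ID, UUID)."""

    def __init__(self, leader: ReplicaKey, voters: Set[ReplicaKey]) -> None:
        self.leader = leader
        self.voters: Set[ReplicaKey] = set(voters)
        self.fetch_offsets: Dict[ReplicaKey, int] = {}

    def on_fetch(self, replica_id: int, replica_uuid: Optional[str], fetch_offset: int) -> bool:
        """Record a fetch; return True if the replica reported both ID and UUID."""
        if replica_id < 0 or replica_uuid is None:
            return False  # consumer or old-style replica: not voter-capable here
        self.fetch_offsets[(replica_id, replica_uuid)] = fetch_offset
        return True

    def high_watermark(self, leader_end_offset: int) -> int:
        """Largest offset reached by a majority of the current voter set."""
        if not self.voters:
            raise ValueError("empty voter set")
        # The leader contributes its own end offset only if it is still a voter.
        offsets = sorted(
            (leader_end_offset if v == self.leader else self.fetch_offsets.get(v, 0)
             for v in self.voters),
            reverse=True,
        )
        return offsets[len(offsets) // 2]
```

Keying on the pair means that a replica reusing an ID with a new directory UUID is tracked separately from the old one. While a RemoveVoterRecord for the leader is pending, the leader's own offset drops out of the majority calculation.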

FetchSnapshot

Request

Version 1 adds the field ReplicaUuid to PartitionSnapshot. If the ReplicaUuid and the ReplicaId fields are populated, the topic partition leader can assume that the replica supports becoming a voter.

Code Block
languagejs
{
  "apiKey": 59,
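  "type": "request",
  "listeners": ["controller"],
  "name": "FetchSnapshotRequest",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "taggedVersions": "0+", "tag": 0,
      "about": "The clusterId if known, this is used to validate metadata fetches prior to broker registration" },
    { "name": "ReplicaId", "type": "int32", "versions": "0+", "default": "-1", "entityType": "brokerId",
      "about": "The replica ID of the follower" },
    { "name": "MaxBytes", "type": "int32", "versions": "0+", "default": "0x7fffffff",
      "about": "The maximum bytes to fetch from all of the snapshots" },
    { "name": "Topics", "type": "[]TopicSnapshot", "versions": "0+",
      "about": "The topics to fetch", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The name of the topic to fetch" },
      { "name": "Partitions", "type": "[]PartitionSnapshot", "versions": "0+",
        "about": "The partitions to fetch", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index" },
        { "name": "ReplicaUuid", "type": "uuid", "versions": "1+", "default": "null",
          "about": "The replica UUID of the follower" },
        { "name": "CurrentLeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The current leader epoch of the partition, -1 for unknown leader epoch" },
        { "name": "SnapshotId", "type": "SnapshotId", "versions": "0+",
          "about": "The snapshot endOffset and epoch to fetch", "fields": [
          { "name": "EndOffset", "type": "int64", "versions": "0+" },
          { "name": "Epoch", "type": "int32", "versions": "0+" }
        ]},
        { "name": "Position", "type": "int64", "versions": "0+",
          "about": "The byte position within the snapshot to start fetching from" }
      ]}
    ]}
  ]
}

Response

Version 1 renames LeaderIdAndEpoch to CurrentLeader and adds Endpoint to CurrentLeader.

Code Block
languagejs
{
  "apiKey": 59,
  "type": "response",
  "name": "FetchSnapshotResponse",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+", "ignorable": false,
      "about": "The top level response error code." },
    { "name": "Topics", "type": "[]TopicSnapshot", "versions": "0+",
      "about": "The topics to fetch.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The name of the topic to fetch." },
      { "name": "Partitions", "type": "[]PartitionSnapshot", "versions": "0+",
        "about": "The partitions to fetch.", "fields": [
        { "name": "Index", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no fetch error." },
        { "name": "SnapshotId", "type": "SnapshotId", "versions": "0+",
          "about": "The snapshot endOffset and epoch fetched", "fields": [
          { "name": "EndOffset", "type": "int64", "versions": "0+" },
          { "name": "Epoch", "type": "int32", "versions": "0+" }
        ]},
        { "name": "CurrentLeader", "type": "CurrentLeader",
          "versions": "0+", "taggedVersions": "0+", "tag": 0, "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId",
            "about": "The ID of the current leader or -1 if the leader is unknown."},
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The latest known leader epoch"},
          { "name": "EndPoint", "type": "Endpoint", "versions": "1+",
            "about": "The endpoint that can be used to communicate with the leader", "fields": [
            { "name": "Host", "type": "string", "versions": "1+",
              "about": "The hostname." },
            { "name": "Port", "type": "uint16", "versions": "1+",
              "about": "The port." }
          ]}
        ]},
        { "name": "Size", "type": "int64", "versions": "0+",
          "about": "The total size of the snapshot." },
        { "name": "Position", "type": "int64", "versions": "0+",
          "about": "The starting byte position within the snapshot included in the Bytes field." },
        { "name": "UnalignedRecords", "type": "records", "versions": "0+",
          "about": "Snapshot data in records format which may not be aligned on an offset boundary" }
      ]}
    ]}
  ]
}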

Handling

Similar to Fetch, replica that support becoming voters will send both the replica ID and UUID in the FetchSnapshot request. The leader will assume that replicas that report both fields are voters or are able to become voters.

There are a few changes to the leader request handling described in KIP-630. The leaders will track the fetch offset for the replica tuple (ID, UUID). This means that replica are unique identified by their ID and UUID. So their state will be tracking using the ID and UUID.

When removing the leader from the voter set, it will remain the leader for that epoch until the RemoveVoterRecord gets committed. This means that the leader needs to allow replicas (voters and observers) to fetch snapshots from the leader even if it is not part of the voter set.

BeginQuorumEpoch

The version of the response is increase and the fields remain unchanged.

TODO: Talk about the leader sending this request as part of quorum check. This is needed because the voter could come back with a new replica uuid. In that case voter can only discover the leader using Fetch unless the leader sends the begin quorum epoch request.

TODO: The receiver of the RPC always applies the request even if it is not a member of the voter set.

Request

  1. LeaderId was moved out of the topic partition maps
  2. VoterId was added to the top level
  3. VoterUuId was added to PartitionData
  4. Allow tagged fields for version greater than or equal to 1.
index" },
        { "name": "ReplicaUuid", "type": "uuid", "versions": "1+", "default": "null",
           "about": "The replica UUID of the follower" }, 
        { "name": "CurrentLeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The current leader epoch of the partition, -1 for unknown leader epoch" },
        { "name": "SnapshotId", "type": "SnapshotId", "versions": "0+",
          "about": "The snapshot endOffset and epoch to fetch", "fields": [
          { "name": "EndOffset", "type": "int64", "versions": "0+" },
          { "name": "Epoch", "type": "int32", "versions": "0+" }
        ]},
        { "name": "Position", "type": "int64", "versions": "0+",
          "about": "The byte position within the snapshot to start fetching from" }
      ]}
    ]}
  ]
}

Response

Version 1 renames LeaderIdAndEpoch to CurrentLeader and adds Endpoint to CurrentLeader.

Code Block
languagejs
{
  "apiKey": 59,
  "type": "response",
  "name": "FetchSnapshotResponse",
  "validVersions": "0-1",
  "flexibleVersions
Code Block
languagejs
{
  "apiKey": 53,
  "type": "request",
  "listeners": ["controller"],
  "name": "BeginQuorumEpochRequest",
  "validVersions": "0-1",
  "flexibleVersions": "1+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "0+",
      "nullableVersions": "0+",
  "defaultfields": "null"},[
    { "name": "LeaderIdThrottleTimeMs", "type": "int32", "versions": "10+", "entityTypeignorable": "brokerId"true,
      "about": "The ID of the newly elected leader"},  duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "VoterIdErrorCode", "type": "int32int16", "versions": "10+", "entityTypeignorable": "brokerId"false,
      "about": "The votertop IDlevel ofresponse theerror receiving replicacode." },
    { "name": "Topics", "type": "[]TopicDataTopicSnapshot", "versions": "0+",
      "about": "The topics to fetch.", "fields": [
      { "name": "TopicNameName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
of the     { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [topic to fetch." },
        { "name": "PartitionIndexPartitions", "type": "int32[]PartitionSnapshot", "versions": "0+",
          "about": "The partitionpartitions to indexfetch.", "fields": },[
        { "name": "VoterUuidIndex", "type": "uuidint32", "versions": "10+",
          "about": "The replica UUID of the receiving replicapartition index." },
        { "name": "LeaderIdErrorCode", "type": "int32int16", "versions": "0", "entityType": "brokerId"+",
          "about": "The ID of the newly elected leader" error code, or 0 if there was no fetch error." },
        { "name": "LeaderEpochSnapshotId", "type": "int32SnapshotId", "versions": "0+",
          "about": "The epochsnapshot ofendOffset theand newlyepoch elected leaderfetched"},
       ]}
    ]}"fields": [
  ]
}

Response

Version 1 is a flexible version and add the tagged field LeaderEndpoint to PartitionData.

Code Block
languagejs
{
  "apiKey": 53,
  "type": "response",
    { "name": "BeginQuorumEpochResponseEndOffset",
  "validVersionstype": "0-1int64",
  "flexibleVersionsversions": "10+" },
  "fields": [
        { "name": "ErrorCodeEpoch", "type": "int16int32", "versions": "0+", }
      "about": "The top level error code."},
  ]},
        { "name": "TopicsCurrentLeader", "type": "[]TopicDataCurrentLeader",
          "versions": "0+", "taggedVersions": "0+", "tag": 0, "fields": [
          { "name": "TopicNameLeaderId", "type": "stringint32", "versions": "0+", "entityType": "topicNamebrokerId",
        "about": "The topic name." },
      { "nameabout": "Partitions", "type": "[]PartitionData",
        "versions": "0+", "fields": [
The ID of the current leader or -1 if the leader is unknown."},
          { "name": "PartitionIndexLeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The partition index." latest known leader epoch"},
            { "name": "ErrorCodeEndPoint", "type": "int16Endpoint", "versions": "01+"},
         {   "nameabout": "LeaderId", "typeThe endpoint that can be used to communicate with the leader", "fields": [
            { "name": "int32Host", "versionstype": "0+string", "entityTypeversions": "brokerId1+",
              "about": "The ID of the current leader or -1 if the leader is unknown."},
        hostname." },
            { "name": "LeaderEpochPort", "type": "int32uint16", "versions": "01+",
              "about": "The latest known leader epoch" port." }
          ]},
           { "name": "LeaderEndPointSize", "type": "Endpointint64", "versions": "10+", "taggedVersions": "+1", "tag": 0,
          "about": "The endpointtotal that can be used to communicate with the leader", "fields": [
  size of the snapshot." },
        { "name": "HostPosition", "type": "stringint64", "versions": "10+",
            "about": "The hostname starting byte position within the snapshot included in the Bytes field."   },
          { "name": "PortUnalignedRecords", "type": "uint16records", "versions": "10+",
            "about": "The port.Snapshot data in records format which may not be aligned on an offset boundary" }
        ]}
    ]}
  ]}
    ]}
  ]
}

Handling

This request will be handled as described in KIP-595 with the following additional errors:

...
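The BeginQuorumEpoch TODO states that the receiver applies the request even when it is not a member of the voter set. The sketch below shows that rule together with a simplified form of the KIP-595 epoch comparison, in which a request carrying an older epoch is rejected. `BeginEpochReceiver` and `BeginEpochResult` are hypothetical. The list of additional errors is elided in this revision, so the sketch returns an outcome value rather than Kafka error codes.

```java
// Hypothetical outcome type; the real error codes for this RPC are elided in this revision.
enum BeginEpochResult { ACCEPTED, REJECTED_STALE_EPOCH }

// Hypothetical receiver-side state: the latest known epoch and the leader for that epoch.
final class BeginEpochReceiver {
    private int epoch;
    private int leaderId = -1; // -1 means the leader is unknown

    BeginEpochReceiver(int epoch) {
        this.epoch = epoch;
    }

    int epoch() {
        return epoch;
    }

    int leaderId() {
        return leaderId;
    }

    // There is intentionally no voter-set argument: the request is applied
    // whether or not this replica is currently a member of the voter set.
    BeginEpochResult handle(int requestLeaderId, int requestLeaderEpoch) {
        if (requestLeaderEpoch < epoch) {
            return BeginEpochResult.REJECTED_STALE_EPOCH;
        }
        epoch = requestLeaderEpoch;
        leaderId = requestLeaderId;
        return BeginEpochResult.ACCEPTED;
    }
}
```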


EndQuorumEpoch

Request

  1. LeaderId was moved out of the topic partition maps
  2. VoterId was added to the request
  3. VoterUuid was added to the Partitions
  4. ReplicaUuid was added to PreferredSuccessors
  5. Allow tagged fields for versions greater than or equal to 1.
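In KIP-595, a voter that receives EndQuorumEpoch uses its position in PreferredSuccessors to stagger its election backoff. Now that ReplicaUuid is part of each successor, the lookup can match on both the ID and the UUID. The sketch below assumes that behavior, so a replica that comes back with a new UUID does not inherit the rank of its previous incarnation. `Successor`, `SuccessorRank` and the linear backoff are hypothetical simplifications, not Kafka's implementation.

```java
import java.util.List;
import java.util.UUID;

// Hypothetical successor entry: replica ID plus the ReplicaUuid added in version 1.
record Successor(int replicaId, UUID replicaUuid) {}

final class SuccessorRank {
    private SuccessorRank() {}

    // 0-based position of (localId, localUuid) in the preferred successors, or -1 if absent.
    static int rank(List<Successor> preferredSuccessors, int localId, UUID localUuid) {
        for (int i = 0; i < preferredSuccessors.size(); i++) {
            Successor s = preferredSuccessors.get(i);
            if (s.replicaId() == localId && s.replicaUuid().equals(localUuid)) {
                return i;
            }
        }
        return -1;
    }

    // Simplified linear backoff: the first successor starts immediately, later ones wait
    // longer, and replicas not in the list fall back to a default backoff.
    static long backoffMs(int rank, long stepMs, long defaultMs) {
        return rank < 0 ? defaultMs : rank * stepMs;
    }
}
```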

...