...
Version 0 of this data will get written if the kraft.version
is 0. Version 1 of this data will get written if the kraft.version
is 1.
TODO: There is one small issue. A candidate at kraft.version 1 may ask for a vote from a voter at kraft.version 0. In this case how will the voter persist the uuid?
TODO: Add endpoint and kraft.version information to the quorum state fileIf a candidate sends a vote request with a replica UUID (version 2 of the RPC) but the voter's kraft.version is at 0, the voter will not persist the voted UUID and use version 0 of quorum data. The voter will continue to track the voted UUID in memory. This mean that after a restart the voter can vote for a different UUID but this should be rare and can only happing while the protocol is getting upgraded.
Code Block | ||
---|---|---|
| ||
git diff upstream/trunk raft/src/main/resources/common/message/QuorumStateData.json diff --git a/raft/src/main/resources/common/message/QuorumStateData.json b/raft/src/main/resources/common/message/QuorumStateData.json index d71a32c75d..34881f05ff70ff2d6f42 100644 --- a/raft/src/main/resources/common/message/QuorumStateData.json +++ b/raft/src/main/resources/common/message/QuorumStateData.json @@ -16,19 +16,2543 @@ { "type": "data", "name": "QuorumStateData", - "validVersions": "0", + "validVersions": "0-1", "flexibleVersions": "0+", "fields": [ - {"name": "ClusterId", "type": "string", "versions": "0+"}, - {"name": "LeaderId", "type": "int32", "versions": "0+", "default": "-1"}, - {"name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1"}, - {"name": "VotedId", "type": "int32", "versions": "0+", "default": "-1"}, - {"name": "AppliedOffset", "type": "int64", "versions": "0+"}, - {"name": "CurrentVoters", "type": "[]Voter", "versions": "0+", "nullableVersions": "0+"} + { "name": "ClusterId", "type": "string", "versions": "0" }, + { "name": "LeaderId", "type": "int32", "versions": "0+", "default": "-1" }, + { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1" }, + { "name": "VotedId", "type": "int32", "versions": "0+", "default": "-1" }, + { "name": "VotedUuid", "type": "uuid", "versions": "1+" }, + { "name": "AppliedOffset", "type": "int64", "versions": "0" }, + { "name": "AppliedRecord", "type": "AppliedRecord", "versions": "1+", "fields": [ + { "name": "EndOffset", "type": "int64", "versions": "1+" }, + { "name": "Epoch", "type": "int32", "versions": "1+, "versions": "1+" } + ]}, + { "name": "CurrentVoters", "type": "[]Voter", "versions": "0+", "nullableVersions": "0+" } ], "commonStructs": [ { "name": "Voter", "versions": "0+", "fields": [ - {"name": "VoterId", "type": "int32", "versions": "0+"} + { "name": "VoterId", "type": "int32", "versions": "0+" }, + { "name": "VoterUuid", "type": "uuid", "versions": "1+" }, + { "name": "EndPoints", "type": "[]Endpoint", "versions": "1+", + "about": "The endpoint that can be used to communicate with the voter", "fields": [ + { "name": "Name", "type": "string", "versions": "1+", "mapKey": true, + "about": "The name of the endpoint" }, + { "name": "Host", "type": "string", "versions": "1+", + "about": "The hostname" }, + { "name": "Port", "type": "uint16", "versions": "1+", + "about": "The port" }, + { "name": "SecurityProtocol", "type": "int16", "versions": "1+", + "about": "The security protocol" } + ]}, + { "name": "CurrentVotersKRaftVersionFeature", "type": "[]VoterKRaftVersionFeature", "versions": "01+", + "nullableVersions": "0+" } ], "commonStructsabout": "The [ range of versions of the { "name": "Voter", "versions": "0+"protocol that the replica supports", "fields": [ -+ { "name": "VoterIdMinSupportedVersion", "type": "int32int16", "versions": "01+"}, + { "nameabout": "VoterId", "type": "int32", "versions": "0+"The minimum supported KRaft protocol version" }, + { "name": "VoterUuidMaxSupportedVersion", "type": "uuidint16", "versions": "1+", + "about": "The maximum supported KRaft protocol version" } + ]} ]} ] } |
RPCs
UpdateFeatures
...
Code Block | ||
---|---|---|
| ||
git diff upstream/trunk clients/src/main/resources/common/message/FetchSnapshotResponse.json | cat
diff --git a/clients/src/main/resources/common/message/FetchSnapshotResponse.json b/clients/src/main/resources/common/message/FetchSnapshotResponse.json
index 887a5e4401..2d9d269930 100644
--- a/clients/src/main/resources/common/message/FetchSnapshotResponse.json
+++ b/clients/src/main/resources/common/message/FetchSnapshotResponse.json
@@ -17,23 +17,23 @@
"apiKey": 59,
"type": "response",
"name": "FetchSnapshotResponse",
- "validVersions": "0",
+ "validVersions": "0-1",
"flexibleVersions": "0+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota" },
{ "name": "ErrorCode", "type": "int16", "versions": "0+", "ignorable": false,
"about": "The top level response error code" },
{ "name": "Topics", "type": "[]TopicSnapshot", "versions": "0+",
"about": "The topics to fetch", "fields": [
{ "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
"about": "The name of the topic to fetch" },
{ "name": "Partitions", "type": "[]PartitionSnapshot", "versions": "0+",
"about": "The partitions to fetch", "fields": [
{ "name": "Index", "type": "int32", "versions": "0+",
"about": "The partition index" },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code, or 0 if there was no fetch error" },
{ "name": "SnapshotId", "type": "SnapshotId", "versions": "0+",
"about": "The snapshot endOffset and epoch fetched",
"fields": [
@@ -43,17 +43,24 @@
{ "name": "CurrentLeader", "type": "LeaderIdAndEpoch",
"versions": "0+", "taggedVersions": "0+", "tag": 0, "fields": [
{ "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId",
"about": "The ID of the current leader or -1 if the leader is unknown" },
{ "name": "LeaderEpoch", "type": "int32", "versions": "0+",
"about": "The latest known leader epoch" }
]},
{ "name": "Size", "type": "int64", "versions": "0+",
"about": "The total size of the snapshot" },
{ "name": "Position", "type": "int64", "versions": "0+",
"about": "The starting byte position within the snapshot included in the Bytes field" },
{ "name": "UnalignedRecords", "type": "records", "versions": "0+",
"about": "Snapshot data in records format which may not be aligned on an offset boundary" }
]}
+ ]},
+ { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "1+", "taggedVersions": "1+", "tag": 0,
+ "about": "Endpoints for all current-leaders enumerated in PartitionSnapshot", "fields": [
+ { "name": "NodeId", "type": "int32", "versions": "1+",
+ "mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node" },
+ { "name": "Host", "type": "string", "versions": "1+", "about": "The node's hostname" },
+ { "name": "Port", "type": "int32", "versions": "1+", "about": "The node's port" }
]}
]
} |
...