...

Code Block
git diff upstream/trunk clients/src/main/resources/common/message/EndQuorumEpochResponse.json
diff --git a/clients/src/main/resources/common/message/EndQuorumEpochResponse.json b/clients/src/main/resources/common/message/EndQuorumEpochResponse.json
index cd23247045..0d5d61b7e7 100644
--- a/clients/src/main/resources/common/message/EndQuorumEpochResponse.json
+++ b/clients/src/main/resources/common/message/EndQuorumEpochResponse.json
@@ -17,8 +17,8 @@
   "apiKey": 54,
   "type": "response",
   "name": "EndQuorumEpochResponse",
-  "validVersions": "0",
-  "flexibleVersions": "none",
+  "validVersions": "0-1",
+  "flexibleVersions": "1+",
   "fields": [
     { "name": "ErrorCode", "type": "int16", "versions": "0+",
       "about": "The top level error code."},
@@ -36,6 +36,13 @@
         { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
           "about": "The latest known leader epoch"}
       ]}
+    ]},
+    { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "1+", "taggedVersions": "1+", "tag": 0,
+      "about": "Endpoints for all leaders enumerated in PartitionData", "fields": [
+      { "name": "NodeId", "type": "int32", "versions": "1+",
+        "mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node" },
+      { "name": "Host", "type": "string", "versions": "1+", "about": "The node's hostname" },
+      { "name": "Port", "type": "int32", "versions": "1+", "about": "The node's port" }
     ]}
   ]
 }

Fetch

Request

Version 14 adds the field ReplicaUuid to FetchPartition. This field is populated with the replica-generated UUID. If both the ReplicaUuid and the ReplicaId fields are populated, the topic partition leader can assume that the replica supports becoming a voter.

Handling

TODO: Fix this wording.

Replicas that support becoming voters send both the replica ID and the replica UUID in the Fetch request. The leader assumes that replicas reporting both fields are voters or are able to become voters.

The leader request handling described in KIP-595 changes in a few ways. The leader tracks the fetch offset for each replica tuple (ID, UUID). Replicas are therefore uniquely identified by their ID and UUID, and their state is tracked using both.

When the leader is removed from the voter set, it remains the leader for that epoch until the RemoveVoterRecord is committed. The leader must therefore allow replicas (voters and observers) to fetch from it even when it is not part of the voter set. It also means that a leader that is not part of the voter set must not include itself when computing the committed offset (also known as the high-watermark).
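
The leader-side bookkeeping described in this section can be sketched as follows. This is a minimal illustration and not Kafka's implementation: the names ReplicaKey, LeaderState, update_fetch_offset and high_watermark are hypothetical, and the majority rule is the usual Raft-style commit rule, assumed here to match KIP-595.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Set


@dataclass(frozen=True)
class ReplicaKey:
    """Hypothetical replica identity: the (ID, UUID) tuple."""
    replica_id: int
    replica_uuid: str


@dataclass
class LeaderState:
    """Hypothetical leader-side view of the quorum for one epoch."""
    local: ReplicaKey
    voters: Set[ReplicaKey]
    log_end_offset: int = 0
    fetch_offsets: Dict[ReplicaKey, int] = field(default_factory=dict)

    def update_fetch_offset(self, replica: ReplicaKey, offset: int) -> None:
        # Voters and observers may both fetch, even when the leader
        # itself is no longer part of the voter set.
        self.fetch_offsets[replica] = offset

    def high_watermark(self) -> int:
        # Collect one offset per voter. A leader that is not a voter
        # contributes nothing to the computation.
        offsets: List[int] = []
        for voter in self.voters:
            if voter == self.local:
                offsets.append(self.log_end_offset)
            else:
                offsets.append(self.fetch_offsets.get(voter, 0))
        if not offsets:
            return 0
        offsets.sort(reverse=True)
        # The largest offset replicated on a majority of voters is committed.
        majority = len(offsets) // 2 + 1
        return offsets[majority - 1]
```

Because the dictionary key is the whole tuple, two replicas that share an ID but report different UUIDs are tracked as separate replicas, and only entries that match a voter's exact (ID, UUID) pair count toward the high-watermark.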

Request

TODO: Version 14 adds the field ReplicaUuid to FetchPartition. This field is populated with the replica-generated UUID. If both the ReplicaUuid and the ReplicaId fields are populated, the topic partition leader can assume that the replica supports becoming a voter.

Code Block
git diff upstream/trunk clients/src/main/resources/common/message/FetchRequest.json
diff --git a/clients/src/main/resources/common/message/FetchRequest.json b/clients/src/main/resources/common/message/FetchRequest.json
index 235357d004..ff86469831 100644
--- a/clients/src/main/resources/common/message/FetchRequest.json
+++ b/clients/src/main/resources/common/message/FetchRequest.json
@@ -55,7 +55,9 @@
   // deprecate the old ReplicaId field and set its default value to -1. (KIP-903)
   //
   // Version 16 is the same as version 15 (KIP-951).
-  "validVersions": "0-16",
+  //
+  // Version 17 adds directory id support from KIP-853
+  "validVersions": "0-17",
   "deprecatedVersions": "0-3",
   "flexibleVersions": "12+",
   "fields": [
@@ -100,7 +102,9 @@
         { "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true,
           "about": "The earliest available offset of the follower replica.  The field is only used when the request is sent by the follower."},
         { "name": "PartitionMaxBytes", "type": "int32", "versions": "0+",
-          "about": "The maximum bytes to fetch from this partition.  See KIP-74 for cases where this limit may not be honored." }
+          "about": "The maximum bytes to fetch from this partition.  See KIP-74 for cases where this limit may not be honored." },
+        { "name": "ReplicaUuid", "type": "uuid", "versions": "17+",
+          "about": "The directory id of the follower fetching" }
       ]}
     ]},
     { "name": "ForgottenTopicsData", "type": "[]ForgottenTopic", "versions": "7+", "ignorable": false,

Response

Code Block
git diff upstream/trunk clients/src/main/resources/common/message/FetchResponse.json
diff --git a/clients/src/main/resources/common/message/FetchResponse.json b/clients/src/main/resources/common/message/FetchResponse.json
index e5f49ba6fd..b432a79719 100644
--- a/clients/src/main/resources/common/message/FetchResponse.json
+++ b/clients/src/main/resources/common/message/FetchResponse.json
@@ -47,7 +47,7 @@
   // Version 15 is the same as version 14 (KIP-903).
   //
   // Version 16 adds the 'NodeEndpoints' field (KIP-951).
-  "validVersions": "0-16",
+  "validVersions": "0-17",
   "flexibleVersions": "12+",
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,

FetchSnapshot

Handling

TODO: Fix this wording.

Similar to Fetch, replicas that support becoming voters send both the replica ID and the replica UUID in the FetchSnapshot request. The leader assumes that replicas reporting both fields are voters or are able to become voters.

The leader request handling described in KIP-630 changes in a few ways. The leader tracks the fetch offset for each replica tuple (ID, UUID). Replicas are therefore uniquely identified by their ID and UUID, and their state is tracked using both.

When the leader is removed from the voter set, it remains the leader for that epoch until the RemoveVoterRecord is committed. The leader must therefore allow replicas (voters and observers) to fetch snapshots from it even when it is not part of the voter set.

Request

Version 1 adds the field ReplicaUuid to PartitionSnapshot. If both the ReplicaUuid and the ReplicaId fields are populated, the topic partition leader can assume that the replica supports becoming a voter.
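
The check the leader applies to the two identity fields might look like the sketch below. It is illustrative only: supports_becoming_voter is a hypothetical helper, and it assumes that an unset ReplicaId is -1 (the schema default) and that an unset ReplicaUuid is either absent or the all-zero UUID.

```python
import uuid
from typing import Optional

# Assumptions for this sketch: -1 marks an unset ReplicaId, and a missing
# or all-zero UUID marks an unset ReplicaUuid.
UNSET_REPLICA_ID = -1
ZERO_UUID = uuid.UUID(int=0)


def supports_becoming_voter(replica_id: int,
                            replica_uuid: Optional[uuid.UUID]) -> bool:
    """Return True when both identity fields are populated."""
    has_id = replica_id != UNSET_REPLICA_ID
    has_uuid = replica_uuid is not None and replica_uuid != ZERO_UUID
    return has_id and has_uuid
```

A request that carries only one of the two fields is treated as coming from a replica that cannot become a voter.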

Code Block
git diff upstream/trunk clients/src/main/resources/common/message/FetchSnapshotRequest.json
diff --git a/clients/src/main/resources/common/message/FetchSnapshotRequest.json b/clients/src/main/resources/common/message/FetchSnapshotRequest.json
index 358ef2e322..9a577d6289 100644
--- a/clients/src/main/resources/common/message/FetchSnapshotRequest.json
+++ b/clients/src/main/resources/common/message/FetchSnapshotRequest.json
@@ -18,11 +18,11 @@
   "type": "request",
   "listeners": ["controller"],
   "name": "FetchSnapshotRequest",
-  "validVersions": "0",
+  "validVersions": "0-1",
   "flexibleVersions": "0+",
   "fields": [
     { "name": "ClusterId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "taggedVersions": "0+", "tag": 0,
       "about": "The cluster ID if known" },
     { "name": "ReplicaId", "type": "int32", "versions": "0+", "default": "-1", "entityType": "brokerId",
       "about": "The broker ID of the follower" },
     { "name": "MaxBytes", "type": "int32", "versions": "0+", "default": "0x7fffffff",
@@ -44,7 +44,9 @@
           { "name": "Epoch", "type": "int32", "versions": "0+" }
         ]},
         { "name": "Position", "type": "int64", "versions": "0+",
-          "about": "The byte position within the snapshot to start fetching from" }
+          "about": "The byte position within the snapshot to start fetching from" },
+        { "name": "ReplicaUuid", "type": "uuid", "versions": "1+",
+          "about": "The directory id of the follower fetching" }
       ]}
     ]}
   ]

Response

Version 1 renames LeaderIdAndEpoch to CurrentLeader and adds Endpoint to CurrentLeader.

Code Block
git diff upstream/trunk clients/src/main/resources/common/message/FetchSnapshotResponse.json | cat
diff --git a/clients/src/main/resources/common/message/FetchSnapshotResponse.json b/clients/src/main/resources/common/message/FetchSnapshotResponse.json
index 887a5e4401..2d9d269930 100644
--- a/clients/src/main/resources/common/message/FetchSnapshotResponse.json
+++ b/clients/src/main/resources/common/message/FetchSnapshotResponse.json
@@ -17,23 +17,23 @@
   "apiKey": 59,
   "type": "response",
   "name": "FetchSnapshotResponse",
-  "validVersions": "0",
+  "validVersions": "0-1",
   "flexibleVersions": "0+",
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
       "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
     { "name": "ErrorCode", "type": "int16", "versions": "0+", "ignorable": false,
       "about": "The top level response error code." },
     { "name": "Topics", "type": "[]TopicSnapshot", "versions": "0+",
       "about": "The topics to fetch.", "fields": [
       { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
         "about": "The name of the topic to fetch." },
       { "name": "Partitions", "type": "[]PartitionSnapshot", "versions": "0+",
         "about": "The partitions to fetch.", "fields": [
         { "name": "Index", "type": "int32", "versions": "0+",
           "about": "The partition index." },
         { "name": "ErrorCode", "type": "int16", "versions": "0+",
           "about": "The error code, or 0 if there was no fetch error." },
         { "name": "SnapshotId", "type": "SnapshotId", "versions": "0+",
           "about": "The snapshot endOffset and epoch fetched",
           "fields": [
@@ -43,17 +43,24 @@
         { "name": "CurrentLeader", "type": "LeaderIdAndEpoch",
           "versions": "0+", "taggedVersions": "0+", "tag": 0, "fields": [
           { "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId",
             "about": "The ID of the current leader or -1 if the leader is unknown." },
           { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
             "about": "The latest known leader epoch" }
         ]},
         { "name": "Size", "type": "int64", "versions": "0+",
           "about": "The total size of the snapshot" },
         { "name": "Position", "type": "int64", "versions": "0+",
           "about": "The starting byte position within the snapshot included in the Bytes field" },
         { "name": "UnalignedRecords", "type": "records", "versions": "0+",
           "about": "Snapshot data in records format which may not be aligned on an offset boundary" }
       ]}
+    ]},
+    { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "1+", "taggedVersions": "1+", "tag": 0,
+      "about": "Endpoints for all current-leaders enumerated in PartitionSnapshot", "fields": [
+      { "name": "NodeId", "type": "int32", "versions": "1+",
+        "mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node" },
+      { "name": "Host", "type": "string", "versions": "1+", "about": "The node's hostname" },
+      { "name": "Port", "type": "int32", "versions": "1+", "about": "The node's port" }
     ]}
   ]
 }

...

{ "name": "Port", "type": "int32", "versions": "1+", "about": "The node's port" }
     ]}
   ]
 }
 

DescribeQuorum

TODO: Fix this RPC

The version of the request is increased; the fields remain unchanged.

...

A future KIP will describe how the kafka-metadata-shell tool will be extended to be able to read and display KRaft control records from the quorum, snapshot and log.

kafka-storage.sh

A future KIP will describe how the kafka-storage tool will be extended to be able to bootstrap the first quorum node by writing an AddVoterRecord to the cluster metadata log when the controller.quorum.bootstrap.servers configuration is used.

kafka-metadata-quorum.sh

The kafka-metadata-quorum tool described in KIP-595 and KIP-836 will be improved to support these additional commands:

...

Code Block
> bin/kafka-metadata-quorum.sh --describe
ClusterId:              SomeClusterId
LeaderId:               0
LeaderEpoch:            15
HighWatermark:          234130
MaxFollowerLag:         34
MaxFollowerLagTimeMs:   15
CurrentVoters:          [{"id": 0, "uuid": "UUID1"}, {"id": 1, "uuid": "UUID2"}, {"id": 2, "uuid": "UUID2"}]
Observers:              [{"id": 3, "uuid": "UUID3"}]
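
Scripts that consume the --describe output shown here could extract the (ID, UUID) pairs along these lines. This is a sketch that assumes the output keeps exactly the layout shown in this section, with each replica list printed as a JSON array on one line; parse_replica_list is a hypothetical helper and not part of the tool.

```python
import json
from typing import List, Tuple


def parse_replica_list(describe_output: str, key: str) -> List[Tuple[int, str]]:
    """Return the (id, uuid) pairs listed under `key`, e.g. "CurrentVoters"."""
    for line in describe_output.splitlines():
        # Split "Name:   value" at the first colon only, so the colons
        # inside the JSON value are left untouched.
        name, sep, value = line.partition(":")
        if sep and name.strip() == key:
            return [(r["id"], r["uuid"]) for r in json.loads(value)]
    raise KeyError(key)
```

For the sample output, calling parse_replica_list(text, "Observers") yields a single pair for replica 3.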

...