...

  1. Wait for the fetch offset of the replica (ID, UUID) to catch up to the log end offset of the leader.
  2. Wait until there are no uncommitted add or remove voter records.
  3. Wait for the LeaderChangeMessage control record from the current epoch to get committed.
  4. Send an ApiVersions RPC to the first listener to discover the supported kraft.version of the new voter.
  5. Check that the new voter supports the current kraft.version.
  6. Append the AddVoterRecord to the log.
  7. The KRaft internal listener will read this record from the log and add the voter to the voter set.
  8. Wait for the AddVoterRecord to commit using the majority of new voter set.
  9. Send the AddVoter response to the client.
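
The checks in steps 1 to 5 can be read as a single readiness test that the leader evaluates before appending the AddVoterRecord. The following is a minimal sketch of that reading, not the KRaft implementation; `LeaderView`, its field names, and `blocking_step` are hypothetical and exist only for illustration.

```python
from dataclasses import dataclass


@dataclass
class LeaderView:
    """Hypothetical snapshot of the leader state consulted by AddVoter."""
    log_end_offset: int
    replica_fetch_offset: int        # fetch offset of the (ID, UUID) replica
    uncommitted_voter_changes: int   # pending add or remove voter records
    leader_change_committed: bool    # LeaderChangeMessage of this epoch committed
    current_kraft_version: int
    new_voter_min_version: int       # range reported by the ApiVersions response
    new_voter_max_version: int


def blocking_step(view: LeaderView) -> int:
    """Return the first step (1 to 5) that is not yet satisfied, or 0 if none."""
    if view.replica_fetch_offset < view.log_end_offset:
        return 1
    if view.uncommitted_voter_changes > 0:
        return 2
    if not view.leader_change_committed:
        return 3
    # Step 4 (the ApiVersions RPC) is assumed to have filled in the range.
    supported = (view.new_voter_min_version
                 <= view.current_kraft_version
                 <= view.new_voter_max_version)
    if not supported:
        return 5
    return 0
```

A result of 0 means the leader may proceed to step 6; any other value names the step it is still waiting on or failing.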

...

Code Block
languagejs
{
  "apiKey": 75,
  "type": "request",
  "listeners": ["controller", "broker"],
  "name": "AddVoterRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "0+" },
    { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
      "about": "The name of the topic" },
    { "name": "TopicId", "type": "uuid", "versions": "0+",
      "about": "The unique topic ID" },
    { "name": "Partition", "type": "int32", "versions": "0+",
      "about": "The partition index" },
    { "name": "VoterId", "type": "int32", "versions": "0+",
      "about": "The replica id of the voter getting added to the topic partition" },
    { "name": "VoterUuid", "type": "uuid", "versions": "0+",
      "about": "The directory id of the voter getting added to the topic partition" },
    { "name": "Listeners", "type": "[]Listener", "versions": "0+",
      "about": "The endpoints that can be used to communicate with the voter", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
        "about": "The name of the endpoint" },
      { "name": "Host", "type": "string", "versions": "0+",
        "about": "The hostname" },
      { "name": "Port", "type": "uint16", "versions": "0+",
        "about": "The port" },
      { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
        "about": "The security protocol" }
    ]}
  ]
}

...

Code Block
languagejs
{
  "apiKey": 75,
  "type": "response",
  "name": "AddVoterResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error" },
    { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "0+", "taggedVersions": "0+", "tag": 0, "fields": [
      { "name": "LeaderId", "type": "int32", "versions": "0+", "default": "-1", "entityType" : "brokerId",
        "about": "The replica id of the current leader or -1 if the leader is unknown" },
      { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1",
        "about": "The latest known leader epoch" }
    ]},
    { "name": "NodeEndpoint", "type": "NodeEndpoint", "versions": "0+", "taggedVersions": "0+", "tag": 1,
      "about": "Endpoint for current leader of the topic partition", "fields": [
      { "name": "Host", "type": "string", "versions": "0+", "about": "The node's hostname" },
      { "name": "Port", "type": "int32", "versions": "0+", "about": "The node's port" }
    ]}
  ]
}

RemoveVoter

...

This RPC can be sent by an administrative client to remove a voter from the set of voters. This RPC can be sent to a broker or controller. The broker will forward the request to the controller.

Handling

When the leader receives a RemoveVoter request it will do the following:

  1. Wait until there are no uncommitted add or remove voter records.
  2. Wait for the LeaderChangeMessage control record from the current epoch to get committed.
  3. Append the RemoveVoterRecord to the log.
  4. The KRaft internal listener will read this record from the log and remove the voter from the voter set.
  5. Wait for the RemoveVoterRecord to commit using the majority of new configuration.
  6. Send the RemoveVoter response to the client.
  7. Resign by sending EndQuorumEpoch RPCs if the removed replica is the leader.

In steps 3 and 4 the RemoveVoterRecord may remove the current leader from the voter set. In this case the leader needs to keep allowing Fetch and FetchSnapshot requests from replicas. The leader should not count itself when determining the majority and when determining whether records have been committed.

The replica will return the following errors:

  1. NOT_LEADER_FOR_PARTITION - when the request is sent to a replica that is not the leader.
  2. VOTER_ALREADY_REMOVED - when the request contains a replica ID and UUID that is already not in the committed voter set.
  3. UNSUPPORTED_VERSION - when the kraft.version is not greater than 1.
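
The error cases above can be sketched as a small validation function. This is only an illustration: the order in which the checks run is an assumption, the kraft.version condition is passed in as a precomputed boolean rather than encoded here, and `remove_voter_error` and its parameters are hypothetical names.

```python
from typing import Optional, Set, Tuple

Voter = Tuple[int, str]  # (replica ID, directory UUID)


def remove_voter_error(is_leader: bool,
                       version_supported: bool,
                       committed_voters: Set[Voter],
                       target: Voter) -> Optional[str]:
    """Return the error name for a RemoveVoter request, or None if it may proceed."""
    if not is_leader:
        return "NOT_LEADER_FOR_PARTITION"
    if not version_supported:
        # The caller decides whether the current kraft.version allows this RPC.
        return "UNSUPPORTED_VERSION"
    if target not in committed_voters:
        # Both the ID and the UUID must match an entry in the committed voter set.
        return "VOTER_ALREADY_REMOVED"
    return None
```

Note that a request naming a known replica ID with a different UUID is treated as already removed in this sketch, because the voter set is keyed by the (ID, UUID) pair.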

Request

Code Block
languagejs
{
  "apiKey": 76,
  "type": "request",
  "listeners": ["controller", "broker"],
  "name": "RemoveVoterRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
      "about": "The name of the topic" },
    { "name": "TopicId", "type": "uuid", "versions": "0+",
      "about": "The unique topic ID" },
    { "name": "Partition", "type": "int32", "versions": "0+",
      "about": "The partition index" },
    { "name": "VoterId", "type": "int32", "versions": "0+",
      "about": "The replica id of the voter getting removed from the topic partition" },
    { "name": "VoterUuid", "type": "uuid", "versions": "0+",
      "about": "The directory id of the voter getting removed from the topic partition" }
  ]
}

Response

Code Block
languagejs
{
  "apiKey": 76,
  "type": "response",
  "name": "RemoveVoterResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error" },
    { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "0+", "taggedVersions": "0+", "tag": 0, "fields": [
      { "name": "LeaderId", "type": "int32", "versions": "0+", "default": "-1", "entityType" : "brokerId",
        "about": "The replica id of the current leader or -1 if the leader is unknown" },
      { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1",
        "about": "The latest known leader epoch" }
    ]},
    { "name": "NodeEndpoint", "type": "NodeEndpoint", "versions": "0+", "taggedVersions": "0+", "tag": 1,
      "about": "Endpoint for current leader of the topic partition", "fields": [
      { "name": "Host", "type": "string", "versions": "0+", "about": "The node's hostname" },
      { "name": "Port", "type": "int32", "versions": "0+", "about": "The node's port" }
    ]}
  ]
}

UpdateVoter

This RPC differs from AddVoter in two ways. It is sent only by voters, and only to the latest known leader. It includes both listener endpoints and kraft.version information. This allows that information to be updated automatically, without additional operator intervention.

Handling

When the leader receives an UpdateVoter request it will do the following:

  1. Wait until there are no uncommitted add or remove voter records.
  2. Wait for the LeaderChangeMessage control record from the current epoch to get committed.
  3. Check that the updated voter supports the current kraft.version.
  4. Append the updated AddVoterRecord to the log.
  5. The KRaft internal listener will read this record from the log and update the voter's information. This includes updating the endpoint used by the KRaft NetworkClient.
  6. Wait for the AddVoterRecord to commit using the majority of new voter set.
  7. Send the UpdateVoter response to the client.
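
Steps 3 and 5 can be sketched as a version-range check followed by replacing the voter's entry in the voter set. This is a minimal illustration under stated assumptions: `VoterInfo`, `apply_update`, and the in-memory dictionary are hypothetical, and the exceptions raised here stand in for error handling that this section does not specify.

```python
from dataclasses import dataclass
from typing import Dict, Tuple

VoterKey = Tuple[int, str]  # (replica ID, directory UUID)


@dataclass(frozen=True)
class VoterInfo:
    listeners: Tuple[Tuple[str, str, int], ...]  # (name, host, port)
    min_kraft_version: int
    max_kraft_version: int


def apply_update(voters: Dict[VoterKey, VoterInfo], key: VoterKey,
                 update: VoterInfo,
                 current_kraft_version: int) -> Dict[VoterKey, VoterInfo]:
    """Return a new voter set with the entry for `key` replaced by `update`."""
    if key not in voters:
        raise KeyError(f"{key} is not a voter")
    # Step 3: the updated voter must support the current kraft.version.
    if not (update.min_kraft_version
            <= current_kraft_version
            <= update.max_kraft_version):
        raise ValueError("voter does not support the current kraft.version")
    # Step 5: replace the endpoints and supported range; the input is not mutated.
    updated = dict(voters)
    updated[key] = update
    return updated
```

Returning a new dictionary instead of mutating the old one mirrors the idea that the voter set changes only when a record is read from the log.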

Request

Code Block
{
  "apiKey": 77,
  "type": "request",
  "listeners": ["controller"],
  "name": "UpdateVoterRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "0+" },
    { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
      "about": "The name of the topic" },
    { "name": "TopicId", "type": "uuid", "versions": "0+",
      "about": "The unique topic ID" },
    { "name": "Partition", "type": "int32", "versions": "0+",
      "about": "The partition index" },
    { "name": "VoterId", "type": "int32", "versions": "0+",
      "about": "The replica id of the voter getting updated in the topic partition" },
    { "name": "VoterUuid", "type": "uuid", "versions": "0+",
      "about": "The directory id of the voter getting updated in the topic partition" },
    { "name": "Listeners", "type": "[]Listener", "versions": "0+",
      "about": "The endpoint that can be used to communicate with the leader", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
        "about": "The name of the endpoint" },
      { "name": "Host", "type": "string", "versions": "0+",
        "about": "The hostname" },
      { "name": "Port", "type": "uint16", "versions": "0+",
        "about": "The port" },
      { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
        "about": "The security protocol" }
    ]},
    { "name": "KRaftVersionFeature", "type": "KRaftVersionFeature", "versions": "0+",
      "about": "The range of versions of the protocol that the replica supports", "fields": [
      { "name": "MinSupportedVersion", "type": "int16", "versions": "0+",
        "about": "The minimum supported KRaft protocol version" },
      { "name": "MaxSupportedVersion", "type": "int16", "versions": "0+",
        "about": "The maximum supported KRaft protocol version" }
    ]}
  ]
}

Response

Code Block
{
  "apiKey": 77,
  "type": "response",
  "name": "UpdateVoterResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error" },
    { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "0+", "taggedVersions": "0+", "tag": 0, "fields": [
      { "name": "LeaderId", "type": "int32", "versions": "0+", "default": "-1", "entityType" : "brokerId",
        "about": "The replica id of the current leader or -1 if the leader is unknown" },
      { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1",
        "about": "The latest known leader epoch" }
    ]},
    { "name": "NodeEndpoint", "type": "NodeEndpoint", "versions": "0+", "taggedVersions": "0+", "tag": 1,
      "about": "Endpoint for current leader of the topic partition", "fields": [
      { "name": "Host", "type": "string", "versions": "0+", "about": "The node's hostname" },
      { "name": "Port", "type": "int32", "versions": "0+", "about": "The node's port" }
    ]}
  ]
}

UpdateFeatures

TODO: Fill this out

Handling

Request

Response

Fetch

Request

Version 14 adds the field ReplicaUuid to the FetchPartition. This field is populated with the replica generated UUID. If the ReplicaUuid and the ReplicaId fields are populated, the topic partition leader can assume that the replica supports becoming a voter.

Code Block
languagejs
{
  "apiKey": 1,
  "type": "request",
  "listeners": ["zkBroker", "broker", "controller"],
  "name": "FetchRequest",
  "validVersions": "0-14",
  "flexibleVersions": "12+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "12+", "nullableVersions": "12+", "default": "null", "taggedVersions": "12+", "tag": 0, "ignorable": true,
      "about": "The clusterId if known. This is used to validate metadata fetches prior to broker registration." },
    { "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "about": "The replica ID of the follower, or -1 if this request is from a consumer." },
    ...
    { "name": "Topics", "type": "[]FetchTopic", "versions": "0+",
      "about": "The topics to fetch.", "fields": [
      { "name": "Topic", "type": "string", "versions": "0-12", "entityType": "topicName", "ignorable": true,
        "about": "The name of the topic to fetch." },
      { "name": "TopicId", "type": "uuid", "versions": "13+", "ignorable": true,
        "about": "The unique topic ID"},
      { "name": "Partitions", "type": "[]FetchPartition", "versions": "0+",
        "about": "The partitions to fetch.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ReplicaUuid", "type": "uuid", "versions": "14+", "nullableVersions": "14+", "default": "null",
          "about": "The replica generated UUID. null otherwise." },
        ...
      ]}
    ]}
  ]
}

Response

Version 14 renames LeaderIdAndEpoch to CurrentLeader and adds Endpoint to CurrentLeader.

Code Block
{
  "apiKey": 1,
  "type": "response",
  "name": "FetchResponse",
  "validVersions": "0-14",
  "flexibleVersions": "12+",
  "fields": [
    ...
    { "name": "Responses", "type": "[]FetchableTopicResponse", "versions": "0+",
      "about": "The response topics.", "fields": [
      { "name": "Topic", "type": "string", "versions": "0-12", "ignorable": true, "entityType": "topicName",
        "about": "The topic name." },
      { "name": "TopicId", "type": "uuid", "versions": "13+", "ignorable": true, "about": "The unique topic ID"},
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about": "The topic partitions.", "fields": [
        ...
        { "name": "CurrentLeader", "type": "CurrentLeader",
          "versions": "12+", "taggedVersions": "12+", "tag": 1, "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "12+", "default": "-1", "entityType": "brokerId",
            "about": "The ID of the current leader or -1 if the leader is unknown."},
          { "name": "LeaderEpoch", "type": "int32", "versions": "12+", "default": "-1",
            "about": "The latest known leader epoch"},
          { "name": "EndPoint", "type": "Endpoint", "versions": "14+",
            "about": "The endpoint that can be used to communicate with the leader", "fields": [
            { "name": "Host", "type": "string", "versions": "14+",
              "about": "The hostname." },
            { "name": "Port", "type": "uint16", "versions": "14+",
              "about": "The port." }
          ]}
        ]},
        ...
      ]}
    ]}
  ]
}

Handling

Replicas that support becoming voters will send both the replica ID and UUID in the Fetch request. The leader will assume that replicas that report both fields are voters or are able to become voters.
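
The rule above amounts to a simple check on the two fields. The sketch below is illustrative only; `may_become_voter` is a hypothetical helper, and a missing ReplicaUuid is modelled as `None`.

```python
from typing import Optional


def may_become_voter(replica_id: int, replica_uuid: Optional[str]) -> bool:
    """True when a Fetch request carries both a replica ID and a replica UUID."""
    # A ReplicaId of -1 is what the schema uses for requests from a consumer.
    return replica_id >= 0 and replica_uuid is not None
```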

...

When removing the leader from the voter set, it will remain the leader for that epoch until the RemoveVoterRecord gets committed. This means that the leader needs to allow replicas (voters and observers) to fetch from the leader even if it is not part of the voter set. This also means that if the leader is not part of the voter set it should not include itself when computing the committed offset (also known as the high-watermark).
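
The effect on the high-watermark can be sketched as follows. This is a simplified illustration rather than the KRaft implementation: voters are identified by replica ID only, `fetch_offsets` holds the last offset each follower is known to have fetched, and `high_watermark` is a hypothetical name.

```python
from typing import Dict, Set


def high_watermark(voters: Set[int], leader_id: int, leader_log_end: int,
                   fetch_offsets: Dict[int, int]) -> int:
    """Largest offset replicated on a majority of the voter set."""
    if not voters:
        raise ValueError("voter set must not be empty")
    offsets = []
    for voter in voters:
        if voter == leader_id:
            # The leader counts its own log only while it is a voter.
            offsets.append(leader_log_end)
        else:
            offsets.append(fetch_offsets.get(voter, 0))
    offsets.sort(reverse=True)
    majority = len(offsets) // 2 + 1
    return offsets[majority - 1]
```

With voters {1, 2, 3}, leader 1 at offset 10, and followers at 8 and 5, the result is 8. Once the voter set becomes {2, 3} the leader's own offset is ignored, the majority is both remaining voters, and the result is 5.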

FetchSnapshot

Request

Version 1 adds the field ReplicaUuid to PartitionSnapshot. If the ReplicaUuid and the ReplicaId fields are populated, the topic partition leader can assume that the replica supports becoming a voter.

Code Block
languagejs
{
  "apiKey": 59,
  "type": "request",
  "listeners": ["controller"],
  "name": "FetchSnapshotRequest",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "taggedVersions": "0+", "tag": 0,
      "about": "The clusterId if known, this is used to validate metadata fetches prior to broker registration" },
    { "name": "ReplicaId", "type": "int32", "versions": "0+", "default": "-1", "entityType": "brokerId",
      "about": "The replica ID of the follower" },
    { "name": "MaxBytes", "type": "int32", "versions": "0+", "default": "0x7fffffff",
      "about": "The maximum bytes to fetch from all of the snapshots" },
    { "name": "Topics", "type": "[]TopicSnapshot", "versions": "0+",
      "about": "The topics to fetch", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The name of the topic to fetch" },
      { "name": "Partitions", "type": "[]PartitionSnapshot", "versions": "0+",
        "about": "The partitions to fetch", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index" },
        { "name": "ReplicaUuid", "type": "uuid", "versions": "1+", "default": "null",
           "about": "The replica UUID of the follower" }, 
        { "name": "CurrentLeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The current leader epoch of the partition, -1 for unknown leader epoch" },
        { "name": "SnapshotId", "type": "SnapshotId", "versions": "0+",
          "about": "The snapshot endOffset and epoch to fetch", "fields": [
          { "name": "EndOffset", "type": "int64", "versions": "0+" },
          { "name": "Epoch", "type": "int32", "versions": "0+" }
        ]},
        { "name": "Position", "type": "int64", "versions": "0+",
          "about": "The byte position within the snapshot to start fetching from" }
      ]}
    ]}
  ]
}

Response

Version 1 renames LeaderIdAndEpoch to CurrentLeader and adds Endpoint to CurrentLeader.

Code Block
languagejs
{
  "apiKey": 59,
  "type": "response",
  "name": "FetchSnapshotResponse",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+", "ignorable": false,
      "about": "The top level response error code." },
    { "name": "Topics", "type": "[]TopicSnapshot", "versions": "0+",
      "about": "The topics to fetch.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The name of the topic to fetch." },
      { "name": "Partitions", "type": "[]PartitionSnapshot", "versions": "0+",
        "about": "The partitions to fetch.", "fields": [
        { "name": "Index", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no fetch error." },
        { "name": "SnapshotId", "type": "SnapshotId", "versions": "0+",
          "about": "The snapshot endOffset and epoch fetched",
          "fields": [
          { "name": "EndOffset", "type": "int64", "versions": "0+" },
          { "name": "Epoch", "type": "int32", "versions": "0+" }
        ]},
        { "name": "CurrentLeader", "type": "CurrentLeader",
          "versions": "0+", "taggedVersions": "0+", "tag": 0, "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId",
            "about": "The ID of the current leader or -1 if the leader is unknown."},
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The latest known leader epoch"},
          { "name": "EndPoint", "type": "Endpoint", "versions": "1+",
            "about": "The endpoint that can be used to communicate with the leader", "fields": [
            { "name": "Host", "type": "string", "versions": "1+",
              "about": "The hostname." },
            { "name": "Port", "type": "uint16", "versions": "1+",
              "about": "The port." }
          ]}
        ]},
        { "name": "Size", "type": "int64", "versions": "0+",
          "about": "The total size of the snapshot." },
        { "name": "Position", "type": "int64", "versions": "0+",
          "about": "The starting byte position within the snapshot included in the Bytes field."   },
        { "name": "UnalignedRecords", "type": "records", "versions": "0+",
          "about": "Snapshot data in records format which may not be aligned on an offset boundary" }
      ]}
    ]}
  ]
}

Handling

Similar to Fetch, replicas that support becoming voters will send both the replica ID and UUID in the FetchSnapshot request. The leader will assume that replicas that report both fields are voters or are able to become voters.

...

When removing the leader from the voter set, it will remain the leader for that epoch until the RemoveVoterRecord gets committed. This means that the leader needs to allow replicas (voters and observers) to fetch snapshots from the leader even if it is not part of the voter set.

Vote

Request

Changes:

  1. CandidateId was moved out of the topic partition maps
  2. CandidateUuid was added to the PartitionData
  3. VoterId was added to the top level
  4. VoterUuid was added to PartitionData

Code Block
languagejs
{
  "apiKey": 52,
  "type": "request",
  "listeners": ["controller"],
  "name": "VoteRequest",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null" },
    { "name": "CandidateId", "type": "int32", "versions": "1+", "entityType": "brokerId",
      "about": "The ID of the voter sending the request" },
    { "name": "VoterId", "type": "int32", "versions": "1+", "entityType": "brokerId",
      "about": "The ID of the replica receiving the request to vote." },
    { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]PartitionData",
        "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "CandidateEpoch", "type": "int32", "versions": "0+",
          "about": "The bumped epoch of the candidate sending the request"},
        { "name": "CandidateId", "type": "int32", "versions": "0", "entityType": "brokerId",
          "about": "The ID of the voter sending the request"},
        { "name": "CandidateUuid", "type": "uuid", "versions": "1+",
          "about": "The candidate generated UUID, null otherwise" },
        { "name": "VoterUuid", "type": "uuid", "versions": "1+", "nullableVersions": "1+", "default": "null",
          "about": "The replica generated UUID of the replica receiving the request to vote, null otherwise" },
        { "name": "LastOffsetEpoch", "type": "int32", "versions": "0+",
          "about": "The epoch of the last record written to the metadata log"},
        { "name": "LastOffset", "type": "int64", "versions": "0+",
          "about": "The offset of the last record written to the metadata log"}
      ]}
    ]}
  ]
}

Response

Version 1 adds the tagged field LeaderEndpoint to PartitionData.

Code Block
languagejs
{
  "apiKey": 52,
  "type": "response",
  "name": "VoteResponse",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top level error code."},
    { "name": "Topics", "type": "[]TopicData",
      "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]PartitionData",
        "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+"},
        { "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId",
          "about": "The ID of the current leader or -1 if the leader is unknown."},
        { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The latest known leader epoch"},
        { "name": "VoteGranted", "type": "bool", "versions": "0+",
          "about": "True if the vote was granted and false otherwise"},
        { "name": "VoterUuid", "type": "uuid", "versions": "1+",
          "about": "The replica generated uuid for the replica casting a vote." },
        { "name": "LeaderEndPoint", "type": "Endpoint", "versions": "1+", "taggedVersions": "1+", "tag": 0,
          "about": "The endpoint that can be used to communicate with the leader", "fields": [
          { "name": "Host", "type": "string", "versions": "1+",
            "about": "The hostname." },
          { "name": "Port", "type": "uint16", "versions": "1+",
            "about": "The port." }
        ]}
      ]}
    ]}
  ]
}

Handling

The handling of the Vote request will differ from that described in KIP-595 in that all replicas are allowed to vote even if they are not voters for the quorum. The voter will persist both the candidate ID and UUID in the quorum state so that it votes for at most one candidate in a given epoch.
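
The at-most-one-vote rule can be sketched with a small state object. This is an illustration only: `QuorumState` and `handle_vote` are hypothetical names, persistence is modelled as an in-memory field, and the comparison of LastOffsetEpoch and LastOffset that a real handler also performs is deliberately left out.

```python
from dataclasses import dataclass
from typing import Optional, Tuple

Candidate = Tuple[int, str]  # (candidate ID, candidate UUID)


@dataclass
class QuorumState:
    """Hypothetical stand-in for the persisted quorum state."""
    epoch: int = 0
    voted_for: Optional[Candidate] = None


def handle_vote(state: QuorumState, candidate_epoch: int,
                candidate: Candidate) -> bool:
    """Grant the vote unless another candidate already has it for this epoch."""
    if candidate_epoch < state.epoch:
        return False
    if candidate_epoch > state.epoch:
        # A newer epoch clears the vote recorded for the older one.
        state.epoch = candidate_epoch
        state.voted_for = None
    if state.voted_for is None:
        state.voted_for = candidate
        return True
    # Repeated requests from the same (ID, UUID) pair are granted again.
    return state.voted_for == candidate
```

Because the stored value is the (ID, UUID) pair, a candidate that reuses a replica ID with a different UUID is treated as a different candidate.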

...

TODO: The receiver of the RPC always applies the request even if it is not a member of the voter set.

Request

  1. LeaderId was moved out of the topic partition maps
  2. VoterId was added to the top level
  3. VoterUuid was added to PartitionData
  4. Allow tagged fields for versions greater than or equal to 1.

Code Block
languagejs
{
  "apiKey": 53,
  "type": "request",
  "listeners": ["controller"],
  "name": "BeginQuorumEpochRequest",
  "validVersions": "0-1",
  "flexibleVersions": "1+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "0+",
      "nullableVersions": "0+", "default": "null"},
    { "name": "LeaderId", "type": "int32", "versions": "1+", "entityType": "brokerId",
      "about": "The ID of the newly elected leader"}, 
    { "name": "VoterId", "type": "int32", "versions": "1+", "entityType": "brokerId",
      "about": "The voter ID of the receiving replica." },
    { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "VoterUuid", "type": "uuid", "versions": "1+",
          "about": "The replica UUID of the receiving replica." },
        { "name": "LeaderId", "type": "int32", "versions": "0", "entityType": "brokerId",
          "about": "The ID of the newly elected leader"},
        { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The epoch of the newly elected leader"}
      ]}
    ]}
  ]
}

Response

Version 1 is a flexible version and adds the tagged field LeaderEndpoint to PartitionData.

Code Block
languagejs
{
  "apiKey": 53,
  "type": "response",
  "name": "BeginQuorumEpochResponse",
  "validVersions": "0-1",
  "flexibleVersions": "1+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top level error code."},
    { "name": "Topics", "type": "[]TopicData",
      "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]PartitionData",
        "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+"},
        { "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId",
          "about": "The ID of the current leader or -1 if the leader is unknown."},
        { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The latest known leader epoch"},
        { "name": "LeaderEndPoint", "type": "Endpoint", "versions": "1+", "taggedVersions": "1+", "tag": 0,
          "about": "The endpoint that can be used to communicate with the leader", "fields": [
          { "name": "Host", "type": "string", "versions": "1+",
            "about": "The hostname." },
          { "name": "Port", "type": "uint16", "versions": "1+",
            "about": "The port." }
        ]}
      ]}
    ]}
  ]
}

Handling

This request will be handled as described in KIP-595 with the following additional errors:

  1. INVALID_REQUEST - when the voter ID and UUID do not match.
  2. NOT_VOTER - when the replica is not a voter in the topic partition. The leader of the topic partition should retry the request when it receives this error.
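
The two additional checks can be sketched as follows. This is a minimal illustration, not the KRaft implementation: the `Voter` type, the `check_begin_quorum_epoch` function and the string error names are hypothetical. It assumes the receiving replica compares the voter ID and UUID in the request against its own ID and UUID before checking its membership in the voter set.

```python
from dataclasses import dataclass
from typing import Optional, Set
from uuid import UUID


@dataclass(frozen=True)
class Voter:
    """Hypothetical (ID, UUID) pair identifying a replica."""
    replica_id: int
    replica_uuid: UUID


def check_begin_quorum_epoch(local: Voter, requested: Voter, voters: Set[Voter]) -> Optional[str]:
    """Return the name of the additional error, or None when neither check fails."""
    # Assumption: the request targets this replica only if both the ID and the UUID match.
    if requested != local:
        return "INVALID_REQUEST"
    # The receiving replica is not in the voter set of the topic partition.
    if local not in voters:
        return "NOT_VOTER"
    return None
```

Under these assumptions, a leader that receives NOT_VOTER would keep resending the request, since the receiving replica may not yet have read the record that adds it to the voter set.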

EndQuorumEpoch

Request

  1. LeaderId was moved out of the topic partition maps
  2. VoterId was added to the request
  3. VoterUuid was added to the Partitions
  4. ReplicaUuid was added to PreferredSuccessors
  5. Allow tagged fields for versions greater than or equal to 1.
Code Block
languagejs
{
  "apiKey": 54,
  "type": "request",
  "listeners": ["controller"],
  "name": "EndQuorumEpochRequest",
  "validVersions": "0-1",
  "flexibleVersions": "1+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "0+",
      "nullableVersions": "0+", "default": "null"},
    { "name": "LeaderId", "type": "int32", "versions": "1+", "entityType": "brokerId",
      "about": "The current leader ID that is resigning." },
    { "name": "VoterId", "type": "int32", "versions": "1+", "entityType": "brokerId",
      "about": "The voter ID of the receiving replica." }, 
    { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "VoterUuid", "type": "uuid", "versions": "1+",
          "about": "The replica UUID of the receiving replica." }, 
        { "name": "LeaderId", "type": "int32", "versions": "0", "entityType": "brokerId",
          "about": "The current leader ID that is resigning"},
        { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The current epoch"},
        { "name": "PreferredSuccessors", "type": "[]ReplicaInfo", "versions": "0+",
          "about": "A sorted list of preferred successors to start the election", "fields": [
          { "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId" },
          { "name": "ReplicaUuid", "type": "uuid", "versions": "1+" }
        ]} 
      ]}
    ]}
  ]
}
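
To make the effect of the version ranges concrete, the sketch below lists which fields of the schema above are present in version 0 and in version 1 of the request. The helpers `in_versions` and `fields_for` are hypothetical and only handle the three range forms that appear in this schema ("0", "1+", "0-1"); they are not part of Kafka's message generator.

```python
def in_versions(spec: str, version: int) -> bool:
    """Return True when `version` falls inside a range such as "0", "1+" or "0-1"."""
    if spec.endswith("+"):
        return version >= int(spec[:-1])
    if "-" in spec:
        low, high = spec.split("-")
        return int(low) <= version <= int(high)
    return version == int(spec)


# Version ranges copied from the EndQuorumEpochRequest schema above.
TOP_LEVEL = {"ClusterId": "0+", "LeaderId": "1+", "VoterId": "1+", "Topics": "0+"}
PARTITION = {"PartitionIndex": "0+", "VoterUuid": "1+", "LeaderId": "0",
             "LeaderEpoch": "0+", "PreferredSuccessors": "0+"}


def fields_for(schema: dict, version: int) -> list:
    """List the field names of `schema` that exist in the given request version."""
    return [name for name, spec in schema.items() if in_versions(spec, version)]
```

For example, `fields_for(PARTITION, 0)` includes LeaderId while `fields_for(PARTITION, 1)` does not, which reflects the move of LeaderId from the partition data to the top level of the request.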

Response

Version 1 is a flexible version and adds the tagged field LeaderEndPoint to PartitionData.

Code Block
languagejs
{
  "apiKey": 54,
  "type": "response",
  "name": "EndQuorumEpochResponse",
  "validVersions": "0-1",
  "flexibleVersions": "1+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top level error code."},
    { "name": "Topics", "type": "[]TopicData",
      "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]PartitionData",
        "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+"},
        { "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId",
          "about": "The ID of the current leader or -1 if the leader is unknown."},
        { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The latest known leader epoch"},
        { "name": "LeaderEndPoint", "type": "Endpoint", "versions": "1+", "taggedVersions": "1+", "tag": 0,
          "about": "The endpoint that can be used to communicate with the leader", "fields": [
          { "name": "Host", "type": "string", "versions": "1+",
            "about": "The hostname." },
          { "name": "Port", "type": "uint16", "versions": "1+",
            "about": "The port." }
        ]}
      ]}
    ]}
  ]
}

Handling

This request will be handled as described in KIP-595 with the following additional errors:

...

The version of the request is increased and the fields remain unchanged.

Response

  1. Add ReplicaUuid to ReplicaState
Code Block
languagejs
{
  "apiKey": 55,
  "type": "response",
  "name": "DescribeQuorumResponse",
  "validVersions": "0-2",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top level error code."},
    { "name": "Topics", "type": "[]TopicData",
      "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]PartitionData",
        "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+"},
        { "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId",
          "about": "The ID of the current leader or -1 if the leader is unknown."},
        { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The latest known leader epoch"},
        { "name": "HighWatermark", "type": "int64", "versions": "0+"},
        { "name": "CurrentVoters", "type": "[]ReplicaState", "versions": "0+" },
        { "name": "Observers", "type": "[]ReplicaState", "versions": "0+" }
      ]}
    ]}
  ],
  "commonStructs": [
    { "name": "ReplicaState", "versions": "0+", "fields": [
      { "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId" },
      { "name": "ReplicaUuid", "type": "uuid", "versions": "2+" },
      { "name": "LogEndOffset", "type": "int64", "versions": "0+",
        "about": "The last known log end offset of the follower or -1 if it is unknown"},
      { "name": "LastFetchTimestamp", "type": "int64", "versions": "1+", "ignorable": true, "default": -1,
        "about": "The last known leader wall clock time when a follower fetched from the leader. This is reported as -1 both for the current leader and when it is unknown for a voter" },
      { "name": "LastCaughtUpTimestamp", "type": "int64", "versions": "1+", "ignorable": true, "default": -1,
        "about": "The leader wall clock append time of the offset for which the follower made the most recent fetch request. This is reported as the current time for the leader and -1 if unknown for a voter" }
    ]}
  ]
}

Handling

The handling of the request is the same as that described in KIP-595, with just the additional fields. Clients handling the response can assume that if a ReplicaState includes both the ID and the UUID, that replica can become a voter if it is enumerated in the Observers field.
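
A client-side reading of this rule might look like the sketch below. It is an illustration only: the `promotable_observers` function is hypothetical, the response is modeled as plain dictionaries keyed by the schema's field names, and it assumes that a missing UUID shows up either as an absent value or as the all-zero UUID.

```python
from typing import List, Tuple
from uuid import UUID

# Assumption: an unset ReplicaUuid is reported as the all-zero UUID.
ZERO_UUID = UUID(int=0)


def promotable_observers(observers: List[dict]) -> List[Tuple[int, UUID]]:
    """Return the (ID, UUID) pairs of observers that report both an ID and a UUID."""
    result = []
    for state in observers:
        replica_id = state.get("ReplicaId")
        replica_uuid = state.get("ReplicaUuid")
        if replica_id is None:
            continue
        if replica_uuid is None or replica_uuid == ZERO_UUID:
            continue
        result.append((replica_id, replica_uuid))
    return result
```

Observers without a UUID are skipped because, under the rule above, only a replica identified by both values can be assumed eligible to become a voter.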

...