...

Code Block
languagejs
{
  "type": "data",
  "name": "AddVoterRecord",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Version", "type": "int16", "versions": "0+",
      "about": "The version of the add voter record"},
    { "name": "VoterId", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "about": "The ID of the voter getting added to the topic partition"},
    { "name": "VoterUuid", "type": "uuid", "versions": "0+",
      "about": "The replica generated UUID of the replica getting added as a voter to the topic partition"},
    { "name": "EndPoints", "type": "[]Endpoint", "versions": "0+",
      "about": "The endpoint that can be used to communicate with the voter", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
        "about": "The name of the endpoint." },
      { "name": "Host", "type": "string", "versions": "0+",
        "about": "The hostname." },
      { "name": "Port", "type": "uint16", "versions": "0+",
        "about": "The port." },
      { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
        "about": "The security protocol." }
    ]}
  ]
}

...

Code Block
languagejs
{
  "apiKey": "TBD",
  "type": "request",
  "listeners": ["controller", "broker"],
  "name": "AddVoterRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "0+" },
    { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The name of the topic." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
        { "name": "Index", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "VoterId", "type": "int32", "versions": "0+",
          "about": "The ID of the voter getting added to the topic partition." },
        { "name": "VoterUuid", "type": "uuid", "versions": "0+",
          "about": "The replica generated UUID of the replica getting added as a voter to the topic partition." },
        { "name": "EndPoint", "type": "[]Endpoint", "versions": "0+",
          "about": "The endpoint that can be used to communicate with the voter", "fields": [
          { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
            "about": "The name of the endpoint." },
          { "name": "Host", "type": "string", "versions": "0+",
            "about": "The hostname." },
          { "name": "Port", "type": "uint16", "versions": "0+",
            "about": "The port." },
          { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
            "about": "The security protocol." }
        ]}
      ]}
    ]}
  ]
}

...

Code Block
languagejs
{
  "apiKey": "TBD",
  "type": "response",
  "name": "AddVoterResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top level error code." },
    { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The name of the topic." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
        { "name": "Index", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no fetch error." },
        { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "0+", "taggedVersions": "0+", "tag": 0, "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "0+", "default": "-1", "entityType" : "brokerId",
            "about": "The ID of the current leader or -1 if the leader is unknown." },
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1",
            "about": "The latest known leader epoch." },
          { "name": "EndPoint", "type": "Endpoint", "versions": "0+",
            "about": "The endpoint that can be used to communicate with the leader", "fields": [
            { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
              "about": "The name of the endpoint." },
            { "name": "Host", "type": "string", "versions": "0+",
              "about": "The hostname." },
            { "name": "Port", "type": "uint16", "versions": "0+",
              "about": "The port." },
            { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
              "about": "The security protocol." }
          ]}
        ]}
      ]}
    ]}
  ]
}

Handling

When the leader receives an AddVoter request it will do the following:

  1. Wait for the fetch offset of the replica (ID, UUID) to catch up to the log end offset of the leader.
  2. Wait until there are no uncommitted add or remove voter records.
  3. Append the AddVoterRecord to the log.
  4. The KRaft internal listener will read this record from the log and add the voter to the voter set.
  5. Wait for the AddVoterRecord to commit using the majority of the new configuration.
  6. Send the AddVoter response to the client.

In 1., the leader needs to wait for the replica to catch up because the set of voters changes as soon as the AddVoterRecord is appended to the log. If the added voter is far behind, it can take some time for it to reach the HWM. During this time the leader cannot commit data and the quorum will be unavailable from the perspective of the state machine. We mitigate this by waiting for the new replica to catch up before adding it to the set of voters.

In 4., the new replica will be part of the quorum so the leader will start sending BeginQuorumEpoch requests to this replica. It is possible that the new replica has not yet replicated and applied this AddVoterRecord so it doesn't know that it is a voter for this topic partition. The new replica will fail this RPC until it discovers that it is in the voter set. The leader will continue to retry until the RPC succeeds.
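
The steps above can be sketched with a small in-memory model. This is only an illustration under stated assumptions: LeaderState, its fields and its method names are hypothetical and are not Kafka classes, the log is a plain list, and commit tracking is reduced to a counter.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Set, Tuple

ReplicaKey = Tuple[int, str]  # (replica ID, replica UUID)


@dataclass
class LeaderState:
    """Hypothetical model of a KRaft leader, for illustration only."""
    log: List[dict] = field(default_factory=list)
    voters: Set[ReplicaKey] = field(default_factory=set)
    fetch_offsets: Dict[ReplicaKey, int] = field(default_factory=dict)
    uncommitted_voter_changes: int = 0

    @property
    def log_end_offset(self) -> int:
        return len(self.log)

    def can_append_add_voter(self, replica: ReplicaKey) -> bool:
        # Step 1: the new replica must have caught up to the log end offset.
        caught_up = self.fetch_offsets.get(replica, -1) >= self.log_end_offset
        # Step 2: no add or remove voter record may still be uncommitted.
        return caught_up and self.uncommitted_voter_changes == 0

    def append_add_voter(self, replica: ReplicaKey) -> bool:
        if not self.can_append_add_voter(replica):
            return False  # the caller keeps waiting and tries again later
        # Step 3: append the AddVoterRecord to the log.
        self.log.append({"type": "AddVoterRecord", "voter": replica})
        # Step 4: the internal listener applies the record to the voter set.
        self.voters.add(replica)
        self.uncommitted_voter_changes += 1
        return True

    def majority(self) -> int:
        # Step 5: the commit is decided by a majority of the new voter set.
        return len(self.voters) // 2 + 1
```

Because the voter set grows at append time, a three-voter quorum that adds a fourth voter needs three acknowledgements to commit the AddVoterRecord itself, which is why the catch-up wait in step 1 matters.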

The replica will return the following errors:

  1. NOT_LEADER_FOR_PARTITION - when the request is sent to a replica that is not the leader.
  2. VOTER_ALREADY_ADDED - when the request contains a replica ID and UUID that is already in the committed voter set.
  3. INVALID_REQUEST - when the request contains a replica ID and UUID that is not an observer.
  4. INVALID_REQUEST - when the request contains endpoints but the quorum was configured to use controller.quorum.voters.
  5. INVALID_REQUEST - when the request contains a replica ID that is not enumerated in the controller.quorum.voters configuration.
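
As a rough sketch, the error cases above could be checked in one validation function. The function name, its parameters and the order of the checks are assumptions made for this example; the text above does not specify an order.

```python
from typing import Optional, Set, Tuple

ReplicaKey = Tuple[int, str]  # (replica ID, replica UUID)


def validate_add_voter(
    is_leader: bool,
    replica: ReplicaKey,
    committed_voters: Set[ReplicaKey],
    observers: Set[ReplicaKey],
    has_endpoints: bool,
    static_voter_ids: Optional[Set[int]],
) -> Optional[str]:
    """Return an error name, or None if the request can be processed.

    static_voter_ids holds the replica IDs listed in
    controller.quorum.voters, or None when that config is not used.
    """
    if not is_leader:
        return "NOT_LEADER_FOR_PARTITION"
    if replica in committed_voters:
        return "VOTER_ALREADY_ADDED"
    if replica not in observers:
        return "INVALID_REQUEST"
    if static_voter_ids is not None:
        # Endpoints come from the static configuration in this mode.
        if has_endpoints:
            return "INVALID_REQUEST"
        # Only replica IDs enumerated in the configuration may be added.
        if replica[0] not in static_voter_ids:
            return "INVALID_REQUEST"
    return None
```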

The broker will handle this request by forwarding the request to the active controller.

RemoveVoter

Request

This RPC can be sent by an administrative client to remove a voter from the set of voters. This RPC can be sent to a broker or controller. The broker will forward the request to the controller.

Code Block
languagejs
{
  "apiKey": "TBD",
  "type": "request",
  "listeners": ["controller", "broker"],
  "name": "RemoveVoterRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The name of the topic." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
        { "name": "Index", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "VoterId", "type": "int32", "versions": "0+",
          "about": "The ID of the voter getting removed from the topic partition." },
        { "name": "VoterUuid", "type": "uuid", "versions": "0+",
          "about": "The replica generated UUID of the replica getting removed as a voter from the topic partition." }
      ]}
    ]}
  ]
}

...

Code Block
languagejs
{
  "apiKey": "TBD",
  "type": "response",
  "name": "RemoveVoterResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top level error code." },
    { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The name of the topic." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
        { "name": "Index", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no fetch error." },
        { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "0+", "taggedVersions": "0+", "tag": 0, "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "0+", "default": "-1", "entityType" : "brokerId",
            "about": "The ID of the current leader or -1 if the leader is unknown." },
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1",
            "about": "The latest known leader epoch." },
          { "name": "EndPoint", "type": "Endpoint", "versions": "0+",
            "about": "The endpoint that can be used to communicate with the leader", "fields": [
            { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
              "about": "The name of the endpoint." },
            { "name": "Host", "type": "string", "versions": "0+",
              "about": "The hostname." },
            { "name": "Port", "type": "uint16", "versions": "0+",
              "about": "The port." },
            { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
              "about": "The security protocol." }
          ]}
        ]}
      ]}
    ]}
  ]
}

Handling

When the leader receives a RemoveVoter request it will do the following:

  1. Wait until there are no uncommitted add or remove voter records.
  2. Append the RemoveVoterRecord to the log.
  3. The KRaft internal listener will read this record from the log and remove the voter from the voter set.
  4. Wait for the RemoveVoterRecord to commit using the majority of the new configuration.
  5. Send the RemoveVoter response to the client.
  6. Resign by sending EndQuorumEpoch RPCs if the removed replica is the leader.

In 3. and 4., it is possible that the RemoveVoterRecord removes the current leader from the voter set. In this case the leader needs to allow Fetch and FetchSnapshot requests from replicas. The leader should not count itself when determining the majority and determining if records have been committed.

The replica will return the following errors:

  1. NOT_LEADER_FOR_PARTITION - when the request is sent to a replica that is not the leader.
  2. VOTER_ALREADY_REMOVED - when the request contains a replica ID and UUID that is already not in the committed voter set.
  3. INVALID_REQUEST - when the request contains a replica ID and UUID that is not a voter.
  4. INVALID_REQUEST - when controller.quorum.voters is used and the request would cause the replica ID to be removed from the voter set. In other words, when the controller.quorum.voters configuration is used, RemoveVoter can remove replica UUIDs but not replica IDs.
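
The last rule is easier to see in code. The sketch below is a hypothetical helper, not part of Kafka: the name, the parameters and the order of the checks are assumptions. Error 3 is left out because the text does not say how it differs from error 2.

```python
from typing import Optional, Set, Tuple

ReplicaKey = Tuple[int, str]  # (replica ID, replica UUID)


def validate_remove_voter(
    is_leader: bool,
    replica: ReplicaKey,
    committed_voters: Set[ReplicaKey],
    static_voters_configured: bool,
) -> Optional[str]:
    """Return an error name, or None if the removal can proceed."""
    if not is_leader:
        return "NOT_LEADER_FOR_PARTITION"
    if replica not in committed_voters:
        return "VOTER_ALREADY_REMOVED"
    if static_voters_configured:
        # With controller.quorum.voters, a UUID may be removed only if
        # another voter with the same replica ID stays in the voter set.
        remaining_ids = {rid for (rid, uuid) in committed_voters
                         if (rid, uuid) != replica}
        if replica[0] not in remaining_ids:
            return "INVALID_REQUEST"
    return None
```

With the static configuration, removing (2, "b") is rejected while it is the only voter with ID 2, and accepted once a second UUID for ID 2 is also in the voter set.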

The broker will handle this request by forwarding the request to the active controller.

Fetch

Request

Version 14 adds the field ReplicaUuid to the FetchPartition. This field is populated with the replica generated UUID. If the ReplicaUuid and the ReplicaId fields are populated, the topic partition leader can assume that the replica supports becoming a voter.

Code Block
languagejs
{
  "apiKey": 1,
  "type": "request",
  "listeners": ["zkBroker", "broker", "controller"],
  "name": "FetchRequest",
  "validVersions": "0-14",
  "flexibleVersions": "12+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "12+", "nullableVersions": "12+", "default": "null", "taggedVersions": "12+", "tag": 0, "ignorable": true,
      "about": "The clusterId if known. This is used to validate metadata fetches prior to broker registration." },
    { "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "about": "The replica ID of the follower, or -1 if this request is from a consumer." },
    ...
    { "name": "Topics", "type": "[]FetchTopic", "versions": "0+",
      "about": "The topics to fetch.", "fields": [
      { "name": "Topic", "type": "string", "versions": "0-12", "entityType": "topicName", "ignorable": true,
        "about": "The name of the topic to fetch." },
      { "name": "TopicId", "type": "uuid", "versions": "13+", "ignorable": true,
        "about": "The unique topic ID"},
      { "name": "Partitions", "type": "[]FetchPartition", "versions": "0+",
        "about": "The partitions to fetch.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ReplicaUuid", "type": "uuid", "versions": "14+", "nullableVersions": "14+", "default": "null",
          "about": "The replica generated UUID. null otherwise." },
        ...
      ]}
    ]}
  ]
}

Response

Version 14 renames LeaderIdAndEpoch to CurrentLeader and adds EndPoint to CurrentLeader.

Code Block
{
  "apiKey": 1,
  "type": "response",
  "name": "FetchResponse",
  "validVersions": "0-14",
  "flexibleVersions": "12+",
  "fields": [
    ...
    { "name": "Responses", "type": "[]FetchableTopicResponse", "versions": "0+",
      "about": "The response topics.", "fields": [
      { "name": "Topic", "type": "string", "versions": "0-12", "ignorable": true, "entityType": "topicName",
        "about": "The topic name." },
      { "name": "TopicId", "type": "uuid", "versions": "13+", "ignorable": true, "about": "The unique topic ID"},
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about": "The topic partitions.", "fields": [
        ...
        { "name": "CurrentLeader", "type": "CurrentLeader",
          "versions": "12+", "taggedVersions": "12+", "tag": 1, "fields": [
           { "name": "LeaderId", "type": "int32", "versions": "12+", "default": "-1", "entityType": "brokerId",
            "about": "The ID of the current leader or -1 if the leader is unknown."},
          { "name": "LeaderEpoch", "type": "int32", "versions": "12+", "default": "-1",
            "about": "The latest known leader epoch"},
          { "name": "EndPoint", "type": "Endpoint", "versions": "14+",
            "about": "The endpoint that can be used to communicate with the leader", "fields": [
            { "name": "Name", "type": "string", "versions": "14+", "mapKey": true,
              "about": "The name of the endpoint." },
            { "name": "Host", "type": "string", "versions": "14+",
              "about": "The hostname." },
            { "name": "Port", "type": "uint16", "versions": "14+",
              "about": "The port." },
            { "name": "SecurityProtocol", "type": "int16", "versions": "14+",
              "about": "The security protocol." }
          ]}
        ]},
        ...
      ]}
    ]}
  ]
}

Handling

Replicas that support becoming voters will send both the replica ID and UUID in the Fetch request. The leader will assume that replicas that report both fields are voters or are able to become voters.

There are a few changes to the leader request handling described in KIP-595. The leader will track the fetch offset for the replica tuple (ID, UUID). This means that replicas are uniquely identified by their ID and UUID, so their state will be tracked using both the ID and UUID.

When removing the leader from the voter set, it will remain the leader for that epoch until the RemoveVoterRecord gets committed. This means that the leader needs to allow replicas (voters and observers) to fetch from the leader even if it is not part of the voter set. This also means that if the leader is not part of the voter set it should not include itself when computing the committed offset (also known as the high-watermark).
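
A simplified sketch of the high-watermark rule described above follows. The function and parameter names are assumptions for this example, and the model ignores other conditions a real leader applies before advancing the high-watermark (for example, those tied to the current epoch).

```python
from typing import Dict, Set, Tuple

ReplicaKey = Tuple[int, str]  # (replica ID, replica UUID)


def high_watermark(
    leader: ReplicaKey,
    leader_log_end_offset: int,
    voters: Set[ReplicaKey],
    fetch_offsets: Dict[ReplicaKey, int],
) -> int:
    """Largest offset replicated on a majority of the voter set."""
    if not voters:
        raise ValueError("the voter set must not be empty")
    # Only voters contribute an offset. A leader that is no longer in the
    # voter set is skipped, so it does not count toward the majority.
    offsets = sorted(
        (leader_log_end_offset if voter == leader
         else fetch_offsets.get(voter, 0)
         for voter in voters),
        reverse=True,
    )
    majority = len(voters) // 2 + 1
    return offsets[majority - 1]
```

Keying fetch_offsets by the (ID, UUID) tuple means a replica that restarts with a new UUID starts from offset 0 instead of inheriting the progress recorded for the old UUID.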

FetchSnapshot

Request

Version 1 adds the field ReplicaUuid to PartitionSnapshot. If the ReplicaUuid and the ReplicaId fields are populated, the topic partition leader can assume that the replica supports becoming a voter.

Code Block
languagejs
{
  "apiKey": 59,
  "type": "request",
  "listeners": ["controller"],
  "name": "FetchSnapshotRequest",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "taggedVersions": "0+", "tag": 0,
      "about": "The clusterId if known, this is used to validate metadata fetches prior to broker registration" },
    { "name": "ReplicaId", "type": "int32", "versions": "0+", "default": "-1", "entityType": "brokerId",
      "about": "The replica ID of the follower" },
    { "name": "MaxBytes", "type": "int32", "versions": "0+", "default": "0x7fffffff",
      "about": "The maximum bytes to fetch from all of the snapshots" },
    { "name": "Topics", "type": "[]TopicSnapshot", "versions": "0+",
      "about": "The topics to fetch", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The name of the topic to fetch" },
      { "name": "Partitions", "type": "[]PartitionSnapshot", "versions": "0+",
        "about": "The partitions to fetch", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index" },
        { "name": "ReplicaUuid", "type": "uuid", "versions": "1+", "default": "null",
           "about": "The replica UUID of the follower" }, 
        { "name": "CurrentLeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The current leader epoch of the partition, -1 for unknown leader epoch" },
        { "name": "SnapshotId", "type": "SnapshotId", "versions": "0+",
          "about": "The snapshot endOffset and epoch to fetch", "fields": [
          { "name": "EndOffset", "type": "int64", "versions": "0+" },
          { "name": "Epoch", "type": "int32", "versions": "0+" }
        ]},
        { "name": "Position", "type": "int64", "versions": "0+",
          "about": "The byte position within the snapshot to start fetching from" }
      ]}
    ]}
  ]
}

Response

Version 1 renames LeaderIdAndEpoch to CurrentLeader and adds EndPoint to CurrentLeader.

Code Block
languagejs
{
  "apiKey": 59,
  "type": "response",
  "name": "FetchSnapshotResponse",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+", "ignorable": false,
      "about": "The top level response error code." },
    { "name": "Topics", "type": "[]TopicSnapshot", "versions": "0+",
      "about": "The topics to fetch.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The name of the topic to fetch." },
      { "name": "Partitions", "type": "[]PartitionSnapshot", "versions": "0+",
        "about": "The partitions to fetch.", "fields": [
        { "name": "Index", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no fetch error." },
        { "name": "SnapshotId", "type": "SnapshotId", "versions": "0+",
          "about": "The snapshot endOffset and epoch fetched",
          "fields": [
          { "name": "EndOffset", "type": "int64", "versions": "0+" },
          { "name": "Epoch", "type": "int32", "versions": "0+" }
        ]},
        { "name": "CurrentLeader", "type": "CurrentLeader",
          "versions": "0+", "taggedVersions": "0+", "tag": 0, "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId",
            "about": "The ID of the current leader or -1 if the leader is unknown."},
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The latest known leader epoch"},
          { "name": "EndPoint", "type": "Endpoint", "versions": "1+",
            "about": "The endpoint that can be used to communicate with the leader", "fields": [
            { "name": "Name", "type": "string", "versions": "1+", "mapKey": true,
              "about": "The name of the endpoint." },
            { "name": "Host", "type": "string", "versions": "1+",
              "about": "The hostname." },
            { "name": "Port", "type": "uint16", "versions": "1+",
              "about": "The port." },
            { "name": "SecurityProtocol", "type": "int16", "versions": "1+",
              "about": "The security protocol." }
          ]}
        ]},
        { "name": "Size", "type": "int64", "versions": "0+",
          "about": "The total size of the snapshot." },
        { "name": "Position", "type": "int64", "versions": "0+",
          "about": "The starting byte position within the snapshot included in the Bytes field."   },
        { "name": "UnalignedRecords", "type": "records", "versions": "0+",
          "about": "Snapshot data in records format which may not be aligned on an offset boundary" }
      ]}
    ]}
  ]
}

Handling

Similar to Fetch, replicas that support becoming voters will send both the replica ID and UUID in the FetchSnapshot request. The leader will assume that replicas that report both fields are voters or are able to become voters.

There are a few changes to the leader request handling described in KIP-630. The leader will track the fetch offset for the replica tuple (ID, UUID). This means that replicas are uniquely identified by their ID and UUID, so their state will be tracked using both the ID and UUID.

When removing the leader from the voter set, it will remain the leader for that epoch until the RemoveVoterRecord gets committed. This means that the leader needs to allow replicas (voters and observers) to fetch snapshots from the leader even if it is not part of the voter set.

Vote

Request

Changes:

  1. CandidateId was moved out of the topic partition maps to the top level
  2. CandidateUuid was added to PartitionData
  3. VoterId was added to the top level
  4. VoterUuid was added to PartitionData
Code Block
languagejs
{
  "apiKey": 52,
  "type": "request",
  "listeners": ["controller"],
  "name": "VoteRequest",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null" },
    { "name": "CandidateId", "type": "int32", "versions": "1+", "entityType": "brokerId",
      "about": "The ID of the voter sending the request" },
    { "name": "VoterId", "type": "int32", "versions": "1+", "entityType": "brokerId",
      "about": "The ID of the replica receiving the request to vote." },
    { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]PartitionData",
        "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "CandidateEpoch", "type": "int32", "versions": "0+",
          "about": "The bumped epoch of the candidate sending the request"},
        { "name": "CandidateId", "type": "int32", "versions": "0", "entityType": "brokerId",
          "about": "The ID of the voter sending the request"},
        { "name": "CandidateUuid", "type": "uuid", "versions": "1+",
          "about": "The candidate generated UUID, null otherwise" },
        { "name": "VoterUuid", "type": "uuid", "versions": "1+", "nullableVersions": "1+", "default": "null",
          "about": "The replica generated UUID of the replica receiving the request to vote, null otherwise" },
        { "name": "LastOffsetEpoch", "type": "int32", "versions": "0+",
          "about": "The epoch of the last record written to the metadata log"},
        { "name": "LastOffset", "type": "int64", "versions": "0+",
          "about": "The offset of the last record written to the metadata log"}
      ]}
    ]}
  ]
}

Response

Version 1 adds the tagged field LeaderEndPoint to PartitionData.

Code Block
languagejs
{...
  "apiKey": 52,
 { "nametype": "Topicsresponse",
  "typename": "[]FetchTopicVoteResponse",
  "versionsvalidVersions": "0+-1",
      "aboutflexibleVersions": "The topics to fetch.0+",
  "fields": [
      { "name": "TopicErrorCode", "type": "stringint16", "versions": "0-12", "entityType+",
      "about": "The top level error code."},
    { "name": "topicNameTopics", "ignorabletype": true"[]TopicData",
        "aboutversions": "0+"The name of the topic to fetch." },, "fields": [
      { "name": "TopicIdTopicName", "type": "uuidstring", "versions": "130+", "ignorableentityType": true"topicName",
        "about": "The unique topic IDname." },
      { "name": "Partitions", "type": "[]FetchPartitionPartitionData", "versions": "0+",
        "aboutversions": "The partitions to fetch.0+", "fields": [
        { "name": "PartitionPartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
         { "name": "ReplicaUuidErrorCode", "type": "uuidint16", "versions": "140+"},
 "nullableVersions       { "name": "14+LeaderId", "defaulttype": "nullint32",
          "about": "The replica generated UUID. null otherwise." },
  "versions": "0+", "entityType": "brokerId",
      ...
    "about": "The ]}
ID of  the ]}
  ]
}

Handling

Replica that support becoming voters will send both the replica ID and UUID in the Fetch request. The leader will assume that replicas that report both fields are voters or are able to become voters.

There are a few changes to the leader request handling described in KIP-595. The leaders will track the fetch offset for the replica tuple (ID, UUID). This means that replica are unique identified by their ID and UUID. So their state will be tracking using the ID and UUID.

When removing the leader from the voter set, it will remain the leader for that epoch until the RemoveVoterRecord gets committed. This means that the leader needs to allow replicas (voters and observers) to fetch from the leader even if it is not part of the voter set. This also means that if the leader is not part of the voter set it should not include itself when computing the committed offset (also known as the high-watermark).

FetchSnapshot

The FetchSnapshot response version is bumped and the fields remain unchanged.

Request

Version 1 adds the field ReplicaUuid to PartitionSnapshot. If the ReplicaUuid and the ReplicaId fields are populated, the topic partition leader can assume that the replica supports becoming a voter.

Code Block
languagejs
{
  "apiKey": 59,
  "type": "request",
  "listeners": ["controller"],
  "name": "FetchSnapshotRequest",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
current leader or -1 if the leader is unknown."},
        { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The latest known leader epoch"},
        { "name": "VoteGranted", "type": "bool", "versions": "0+",
          "about": "True if the vote was granted and false otherwise"},
        { "name": "LeaderEndPoint", "type": "Endpoint", "versions": "1+", "taggedVersions": "+1", "tag": 0,
          "about": "The endpoint that can be used to communicate with the leader", "fields": [
          { "name": "Name", "type": "string", "versions": "1+", "mapKey": true,
            "about": "The name of the endpoint." },
          { "name": "Host", "type": "string", "versions": "1+",
            "about": "The hostname." },
          { "name": "Port", "type": "uint16", "versions": "1+",
            "about": "The port." },
          { "name": "SecurityProtocol", "type": "int16", "versions": "1+",
            "about": "The security protocol." }
        ]}
      ]}
    ]}
  ]
}

Handling

The handling of the Vote request will differ from that described in KIP-595 in that all replicas are allowed to vote even if they are not voters for the quorum. The voter will persist both the candidate ID and UUID in the quorum state so that it votes for at most one candidate in a given epoch.

The replica will return the following errors:

  1. INVALID_REQUEST - when the voter ID and UUID don't match the local ID and UUID.
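
The Vote handling described above can be sketched as follows. This is a simplified illustration, not the KRaft implementation: `QuorumState`, `handle_vote` and the `ReplicaKey` alias are hypothetical names, persistence is modeled as an in-memory object, and the log comparison from KIP-595 (last offset and epoch) is omitted.

```python
from dataclasses import dataclass
from typing import Optional, Tuple

ReplicaKey = Tuple[int, str]  # hypothetical (replica ID, replica UUID) pair


@dataclass
class QuorumState:
    """Hypothetical stand-in for the persisted quorum state."""
    epoch: int = 0
    voted_for: Optional[ReplicaKey] = None


def handle_vote(state: QuorumState, local: ReplicaKey, voter: ReplicaKey,
                candidate: ReplicaKey, candidate_epoch: int) -> Tuple[str, bool]:
    """Return (error, vote_granted) for one Vote request."""
    # The request must be addressed to this exact replica (ID and UUID).
    if voter != local:
        return ("INVALID_REQUEST", False)
    if candidate_epoch < state.epoch:
        # Stale epoch: rejected; the exact error code is left out of this sketch.
        return ("NONE", False)
    if candidate_epoch > state.epoch:
        # A new epoch clears any vote recorded for an older epoch.
        state.epoch = candidate_epoch
        state.voted_for = None
    if state.voted_for is None:
        # Record both the candidate ID and UUID before granting the vote.
        state.voted_for = candidate
    # Grant only to the candidate already recorded for this epoch.
    return ("NONE", state.voted_for == candidate)
```

Because the recorded vote includes the UUID, a candidate that reuses an ID with a different UUID is treated as a different candidate within the same epoch.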

BeginQuorumEpoch

The version of the response is increased and the fields remain unchanged.

Request

  1. LeaderId was moved out of the topic partition maps
  2. VoterId was added to the top level
  3. VoterUuid was added to PartitionData
  4. Allow tagged fields for versions greater than or equal to 1.
Code Block
languagejs
{
  "apiKey": 53,
  "type": "request",
  "listeners": ["controller"],
  "name": "BeginQuorumEpochRequest",
  "validVersions": "0-1",
  "flexibleVersions": "1+",
  "fields": [
      "about": "The replica ID of the follower" },
    { "name": "MaxBytes", "type": "int32", "versions": "0+", "default": "0x7fffffff",
      "about": "The maximum bytes to fetch from all of the snapshots" },
    { "name": "Topics", "type": "[]TopicSnapshot", "versions": "0+",
      "about": "The topics to fetch", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The name of the topic to fetch" },
      { "name": "PartitionsClusterId", "type": "[]PartitionSnapshotstring", "versions": "0+",
        "about": "The partitions to fetchnullableVersions": "0+", "fieldsdefault": [
    "null"},
    { "name": "PartitionLeaderId", "type": "int32", "versions": "01+",
 "entityType": "brokerId",
        "about": "The partitionID index" },
    of the newly elected leader"}, 
    { "name": "ReplicaUuidVoterId", "type": "uuidint32", "versions": "1+", "defaultentityType": "nullbrokerId",
           "about": "The replicavoter UUIDID of the followerreceiving replica." }, 
        { "name": "CurrentLeaderEpochTopics", "type": "int32[]TopicData", "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "aboutversions": "0+", "entityType": "ThetopicName",
 current leader epoch of the partition, -1 for unknown leader epoch "about": "The topic name." },
        { "name": "SnapshotIdPartitions", "type": "SnapshotId[]PartitionData", "versions": "0+",
          "about": "The snapshot endOffset and epoch to fetch", "fields": [
          { "name": "EndOffsetPartitionIndex", "type": "int64int32", "versions": "0+",
          "about": "The partition index." },
          { "name": "EpochVoterUuid", "type": "int32uuid", "versions": "01+" }
        ],
          "about": "The replica UUID of the receiving replica." },
        { "name": "PositionLeaderId", "type": "int64int32", "versions": "0+", "entityType": "brokerId",
          "about": "The byteID positionof within the snapshotnewly to start fetching from" }elected leader"},
      ]}
  {  ]}
  ]
}

Handling

Similar to Fetch, replicas that support becoming voters will send both the replica ID and UUID in the FetchSnapshot request. The leader will assume that replicas that report both fields are voters or are able to become voters.

There are a few changes to the leader request handling described in KIP-630. The leader will track the fetch offset for the replica tuple (ID, UUID). This means that replicas are uniquely identified by their ID and UUID, so their state will be tracked using both the ID and the UUID.

When removing the leader from the voter set, it will remain the leader for that epoch until the RemoveVoterRecord gets committed. This means that the leader needs to allow replicas (voters and observers) to fetch snapshots from the leader even if it is not part of the voter set.

Vote

The Vote response version is bumped and the fields remain unchanged.

Request

Changes:

...

"name": "LeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The epoch of the newly elected leader"}
      ]}
    ]}
  ]
}

Response

Version 1 is a flexible version and adds the tagged field LeaderEndpoint to PartitionData.

...

Code Block
languagejs
{
  "apiKey": 53,
  "type": "response",
  "name": "BeginQuorumEpochResponse",
  "validVersions": "0-1",
  "flexibleVersions": "1+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top level error code."},
    { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+"},
        { "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId",
          "about": "The ID of the current leader or -1 if the leader is unknown."},
        { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The latest known leader epoch"},
        { "name": "LeaderEndPoint", "type": "Endpoint", "versions": "1+", "taggedVersions": "+1", "tag": 0,
          "about": "The endpoint that can be used to communicate with the leader", "fields": [
          { "name": "Name", "type": "string", "versions": "1+", "mapKey": true,
            "about": "The name of the endpoint." },
          { "name": "Host", "type": "string", "versions": "1+",
            "about": "The hostname." },
          { "name": "Port", "type": "uint16", "versions": "1+",
            "about": "The port." },
          { "name": "SecurityProtocol", "type": "int16", "versions": "1+",
            "about": "The security protocol." }
        ]}
      ]}
    ]}
  ]
}

Handling

This request will be handled as described in KIP-595 with the following additional errors:

  1. INVALID_REQUEST - when the voter ID and UUID don't match the local ID and UUID.
  2. NOT_VOTER - when the replica is not a voter in the topic partition. This error should be retried by the leader of the topic partition.

EndQuorumEpoch

Request

  1. LeaderId was moved out of the topic partition maps
  2. VoterId was added to the request
  3. VoterUuid was added to Partitions
  4. ReplicaUuid was added to PreferredSuccessors
  5. Allow tagged fields for versions greater than or equal to 1.
Code Block
languagejs
{
  "apiKey": 54,
  "type": "request",
  "listeners": ["controller"],
  "name": "EndQuorumEpochRequest",
  "validVersions": "0-1",
  "flexibleVersions": "1+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "0+",
      "nullableVersions": "0+", "default": "null"},
    { "name": "LeaderId", "type": "int32", "versions": "1+", "entityType": "brokerId",
      "about": "The current leader ID that is resigning." },
    { "name": "VoterId", "type": "int32", "versions": "1+", "entityType": "brokerId",
      "about": "The voter ID of the receiving replica." },
    { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "VoterUuid", "type": "uuid", "versions": "1+",
          "about": "The replica UUID of the receiving replica." },
        { "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId",
          "about": "The current leader ID that is resigning"},
        { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The current epoch"},
        { "name": "PreferredSuccessors", "type": "[]ReplicaInfo", "versions": "0+",
          "about": "A sorted list of preferred successors to start the election", "fields": [
          { "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId" },
          { "name": "ReplicaUuid", "type": "uuid", "versions": "1+" }
        ]}
      ]}
    ]}
  ]
}

Handling

This request will be handled as described in KIP-595 with the following additional errors:

  1. INVALID_REQUEST - when the voter ID and UUID don't match.
  2. NOT_VOTER - when the replica is not a voter in the topic partition. This error should be retried by the leader of the topic partition.
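
The two additional errors can be illustrated with a small sketch of the receiver-side check and the leader-side retry decision. The function names and the `ReplicaKey` alias are hypothetical, and the sketch assumes that "match" means the voter ID and UUID in the request equal the receiving replica's own ID and UUID.

```python
from typing import Set, Tuple

ReplicaKey = Tuple[int, str]  # hypothetical (replica ID, replica UUID) pair


def check_begin_quorum_epoch(local: ReplicaKey,
                             request_voter: ReplicaKey,
                             voters: Set[ReplicaKey]) -> str:
    """Return the additional error for a BeginQuorumEpoch request, or NONE."""
    if request_voter != local:
        # The request was addressed to a different replica ID or UUID.
        return "INVALID_REQUEST"
    if local not in voters:
        # The receiving replica is not in its known voter set.
        return "NOT_VOTER"
    return "NONE"


def leader_should_retry(error: str) -> bool:
    """The leader retries the request only when the replica answered NOT_VOTER."""
    return error == "NOT_VOTER"
```

The checks are ordered so that a misaddressed request is rejected before voter membership is consulted; that ordering is a choice made for this sketch rather than something the text above specifies.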

EndQuorumEpoch

The version of the response is increased and the fields remain unchanged.

Request

...

]
}

Response

Version 1 is a flexible version and adds the tagged field LeaderEndpoint to PartitionData.

Code Block
languagejs
{
  "apiKey": 54,
  "type": "response",
  "name": "EndQuorumEpochResponse",
  "validVersions": "0-1",
  "flexibleVersions": "1+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top level error code."},
    { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+"},
        { "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId",
          "about": "The ID of the current leader or -1 if the leader is unknown."},
        { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The latest known leader epoch"},
        { "name": "LeaderEndPoint", "type": "Endpoint", "versions": "1+", "taggedVersions": "+1", "tag": 0,
          "about": "The endpoint that can be used to communicate with the leader", "fields": [
          { "name": "Name", "type": "string", "versions": "1+", "mapKey": true,
            "about": "The name of the endpoint." },
          { "name": "Host", "type": "string", "versions": "1+",
            "about": "The hostname." },
          { "name": "Port", "type": "uint16", "versions": "1+",
            "about": "The port." },
          { "name": "SecurityProtocol", "type": "int16", "versions": "1+",
            "about": "The security protocol." }
        ]}
      ]}
    ]}
  ]
}

Handling

This request will be handled as described in KIP-595 with the following additional errors:

...

The version of the request is increased and the fields remain unchanged.

Response

  1. Add ReplicaUuid to ReplicaState

...