...

This KIP describes a protocol for extending KIP-595 and KIP-630 so that the Kafka cluster can identify that there was a disk failure in the cluster metadata topic partition and so that the operator can programmatically replace the failed voter in a way that is safe and available.

...

This KIP is inspired by Chapter 4 of Consensus: Bridging Theory and Practice [2]. The description of this KIP makes the assumption that the reader is familiar with the references enumerated at the bottom of this page.

...

Log Directory Storage UUID

In addition to a replica ID, each log directory of a Kafka node will have a replica storage UUID. This UUID will be generated once and persisted as part of the log directory's meta.properties. A replica (controller and broker) may host multiple topic partitions in multiple log directories. The replica UUID will be generated per log directory by the replica.

There are two cases when a replica UUID will be generated:

  1. The log directory is new and it doesn't contain a meta.properties file persisted on disk.
  2. The meta.properties file in the log directory doesn't contain a replica UUID. The meta.properties file will be atomically replaced with a new meta.properties file that contains the generated replica UUID (see the sketch below).
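
A minimal sketch, not the actual Kafka implementation, of how a log directory could generate and persist its storage UUID is shown below; the storage.id property name comes from this KIP, while the helper class and the atomic-replace details are illustrative assumptions.

Code Block
languagejava
// Minimal sketch (not the actual Kafka implementation) of how a log directory
// could generate and persist its storage UUID in meta.properties. The property
// name "storage.id" comes from this KIP; the helper below is illustrative only.
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.*;
import java.util.Properties;
import java.util.UUID;

public final class StorageUuidExample {
    static UUID ensureStorageUuid(Path logDir) throws Exception {
        Path metaFile = logDir.resolve("meta.properties");
        Properties props = new Properties();
        if (Files.exists(metaFile)) {
            try (InputStream in = Files.newInputStream(metaFile)) {
                props.load(in);
            }
        }
        String existing = props.getProperty("storage.id");
        if (existing != null) {
            return UUID.fromString(existing); // UUID was already generated and persisted
        }
        // Case 1: new log directory. Case 2: meta.properties without a storage id.
        UUID generated = UUID.randomUUID();
        props.setProperty("storage.id", generated.toString());
        Path tmp = logDir.resolve("meta.properties.tmp");
        try (OutputStream out = Files.newOutputStream(tmp)) {
            props.store(out, "meta.properties");
        }
        // Atomically replace the old file so readers never observe a partial write.
        Files.move(tmp, metaFile, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);
        return generated;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(ensureStorageUuid(Files.createTempDirectory("log-dir")));
    }
}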

Replicas in a KRaft topic partition that are voters or can become voters will be identified by their ID and UUID.

In this KIP the storage UUID is also called ReplicaUuid, CandidateUuid, VoterUuid and VotedUuid depending on the context.

Voter Changes

Adding Voters

...

The replica id and endpoint for all of the voters are described and persisted in the controller.quorum.voters configuration. An example value for this configuration is 1@localhost:9092,2@localhost:9093,3@localhost:9094.

The leader of the KRaft topic partition will not allow the AddVoter RPCs to add replica IDs that are not described in the configuration and it will not allow the RemoveVoter RPCs to completely remove replica IDs that are described in the configuration. This means that the replica UUID can change by using a sequence of AddVoter and RemoveVoter but the client cannot add a new replica ID or remove a replica ID from the set of voters.

When the leader of the topic partition first discovers the UUID of all the voters through the KRaft RPCs, for example Fetch and Vote, it will write an AddVoterRecord to the cluster metadata log for each of the voters described in controller.quorum.voters. All of these AddVoterRecords must be contained in one control record batch.

...

If the cluster metadata topic partition contains an AddVoterRecord for a replica id that is not enumerated in controller.quorum.voters then the replica will fail to start and shut down.

...

It is possible for the leader to write an AddVoterRecord to the log and replicate it to some of the voters in the new configuration. If the leader fails before this record has been replicated to the new voter it is possible that a new leader cannot be elected. This is because voters reject vote requests from replicas that are not in the voter set. This check will be removed and replicas will reply to vote requests when the candidate is not in the voter set or the voting replica is not in the voter set. The candidate must still have a longer log offset and epoch before the voter will grant a vote to it.
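
A small sketch of the log comparison mentioned above, with illustrative names: even a candidate outside the voter set can be granted a vote, but only if its log is at least as up to date as the voter's log.

Code Block
languagejava
// Sketch of the log comparison used when granting votes; names are illustrative,
// not the KRaft implementation.
public final class VoteCheckExample {
    record OffsetAndEpoch(int epoch, long offset) {}

    static boolean candidateLogUpToDate(OffsetAndEpoch candidate, OffsetAndEpoch voter) {
        if (candidate.epoch() != voter.epoch()) {
            return candidate.epoch() > voter.epoch(); // higher epoch always wins
        }
        return candidate.offset() >= voter.offset();  // same epoch: compare offsets
    }

    public static void main(String[] args) {
        System.out.println(candidateLogUpToDate(new OffsetAndEpoch(5, 100), new OffsetAndEpoch(5, 90)));  // true
        System.out.println(candidateLogUpToDate(new OffsetAndEpoch(4, 200), new OffsetAndEpoch(5, 90)));  // false
    }
}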

...

As described in KIP-595, the high-watermark will be calculated using the fetch offset of the majority of the voters. When a replica is removed or added it is possible for the high-watermark to decrease. The leader will not allow the high-watermark to decrease and will guarantee that it is monotonically increasing for both the state machines and the remote replicas.

With this KIP it is possible for the leader to not be part of the voter set when the replica removed is the leader. In this case the leader will continue to handle Fetch and FetchSnapshot requests as normal but it will not count itself when computing the high watermark.
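
The following sketch illustrates the high-watermark rule described above, with hypothetical names: the leader takes the offset replicated by a majority of the current voter set, does not count itself when it is no longer a voter, and never lets the value decrease.

Code Block
languagejava
// Illustrative sketch of the high-watermark rule; field and method names are
// assumptions, not the KRaft implementation.
import java.util.*;

public final class HighWatermarkExample {
    static long updateHighWatermark(Map<String, Long> voterFetchOffsets, // voter -> fetch offset
                                    long currentHighWatermark) {
        List<Long> offsets = new ArrayList<>(voterFetchOffsets.values());
        offsets.sort(Comparator.reverseOrder());
        int majority = voterFetchOffsets.size() / 2 + 1;
        long candidate = offsets.get(majority - 1);            // offset replicated by a majority of voters
        return Math.max(currentHighWatermark, candidate);      // monotonically increasing
    }

    public static void main(String[] args) {
        // Leader (replica 1) was just removed from the voter set, so only voters 2 and 3 count.
        Map<String, Long> voters = Map.of("2", 90L, "3", 80L);
        System.out.println(updateHighWatermark(voters, 85L));  // stays at 85, never decreases
    }
}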

...

To better illustrate this feature, this section describes how Kafka will detect a disk failure and how the administrator can use the features in this KIP to recover from the disk failure.

Let's assume that the cluster is configured to use controller.quorum.voters and the value is 1@host1:9092,2@host2:9092,3@host3:9094. The voter set is defined by the tuples (1, UUID1), (2, UUID2) and (3, UUID3). The first element of the tuples is the replica id. The second element of the tuples is the replica storage uuid. The replica storage uuid is automatically generated once by the replica node when persisting the meta.properties for a log directory.

At this point the operator replaces the failed disk for replica 3. This means that when replica 3 starts it will generate a new replica storage uuid (UUID3') and meta.properties. Replica 3 will discover the partition leader using controller.quorum.voters and send UUID3' in the Fetch and FetchSnapshot requests. The leader state for voters will be (1, UUID1), (2, UUID2) and (3, UUID3) with the observers being (3, UUID3').

This state can be discovered by a client by using the DescribeQuorum RPC, the Admin client or the kafka-metadata-quorum CLI. The client can now decide to add replica (3, UUID3') to the set of voters using the AddVoter RPC, the Admin client or the kafka-metadata-quorum CLI.

When the AddVoter RPC succeeds the voters will be (1, UUID1), (2, UUID2), (3, UUID3) and (3, UUID3'). The client can now remove the failed disk from the voter set by using the RemoveVoter RPC, the Admin client or the kafka-metadata-quorum CLI.

When the RemoveVoter RPC succeeds the voters will be (1, UUID1), (2, UUID2), and (3, UUID3'). At this point the Kafka cluster has successfully recovered from a disk failure in a controller node.
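
The sketch below replays the voter set transitions from this example; the ids and uuids are placeholders. The AddVoter step temporarily produces four voters (majority of three) before RemoveVoter shrinks the set back to three voters (majority of two).

Code Block
languagejava
// Walks through the voter set transitions from the disk replacement example.
// The ids and uuids are placeholders for illustration only.
import java.util.LinkedHashSet;
import java.util.Set;

public final class DiskReplacementExample {
    record Voter(int id, String uuid) {}

    static int majority(Set<Voter> voters) {
        return voters.size() / 2 + 1;
    }

    public static void main(String[] args) {
        Set<Voter> voters = new LinkedHashSet<>(Set.of(
            new Voter(1, "UUID1"), new Voter(2, "UUID2"), new Voter(3, "UUID3")));
        // AddVoter (3, UUID3'): the replica with the replaced disk joins as a new voter.
        voters.add(new Voter(3, "UUID3'"));
        System.out.println("After AddVoter: " + voters.size() + " voters, majority=" + majority(voters));
        // RemoveVoter (3, UUID3): the voter backed by the failed disk is removed.
        voters.remove(new Voter(3, "UUID3"));
        System.out.println("After RemoveVoter: " + voters.size() + " voters, majority=" + majority(voters));
    }
}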

...

A control record for instructing the replicas to add a new voter to the topic partition. This record can exist in both the log and the snapshot of a topic partition.
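
An illustrative sketch of how an internal listener could derive the in-memory voter set by replaying AddVoterRecord and RemoveVoterRecord control records from the snapshot and log; the record types here are placeholders, not the actual generated classes.

Code Block
languagejava
// Illustrative sketch of deriving the voter set by replaying control records.
// The record types are placeholders, not the actual generated message classes.
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public final class VoterSetReplayExample {
    sealed interface ControlRecord permits AddVoterRecord, RemoveVoterRecord {}
    record AddVoterRecord(int voterId, String voterUuid) implements ControlRecord {}
    record RemoveVoterRecord(int voterId, String voterUuid) implements ControlRecord {}
    record Voter(int id, String uuid) {}

    static Set<Voter> replay(List<ControlRecord> records) {
        Set<Voter> voters = new LinkedHashSet<>();
        for (ControlRecord record : records) {
            if (record instanceof AddVoterRecord add) {
                voters.add(new Voter(add.voterId(), add.voterUuid()));
            } else if (record instanceof RemoveVoterRecord remove) {
                voters.remove(new Voter(remove.voterId(), remove.voterUuid()));
            }
        }
        return voters;
    }

    public static void main(String[] args) {
        System.out.println(replay(List.of(
            new AddVoterRecord(1, "UUID1"),
            new AddVoterRecord(2, "UUID2"),
            new AddVoterRecord(3, "UUID3"),
            new AddVoterRecord(3, "UUID3'"),
            new RemoveVoterRecord(3, "UUID3"))));
    }
}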

...

Each KRaft topic partition has a quorum state (QuorumStateData) that gets persisted in the quorum-state.json file in the directory for the topic partition.

A new field called VotedUuid will get added to QuorumStateData. This is the UUID of the replica for which the local replica voted in the LeaderEpoch.

Remove the CurrentVoters field from QuorumStateData. This information will instead be persisted in the log and snapshot using the AddVoterRecord and RemoveVoterRecord.

Code Block
languagejs
{
  "type": "data",
  "name": "QuorumStateData",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "0+"},
    { "name": "ReplicaUuidLeaderId", "type": "uuidint32", "versions": "10+",
      "aboutdefault": "The locally generated UUID for this replica."-1"},
    { "name": "LeaderIdLeaderEpoch", "type": "int32", "versions": "0+", "default": "-1"},
    { "name": "LeaderEpochVotedId", "type": "int32", "versions": "0+", "default": "-1"},
    { "name": "VotedIdVotedUuid", "type": "int32uuid", "versions": "1+", "nullableVersions": "01+", "default": "-1null" },
    { "name": "VotedUuidAppliedOffset", "type": "uuidint64", "versions": "1+", "nullableVersions": "1+", "default": "null" },
    { "name": "AppliedOffset", "type": "int64", "versions": "0+"},
    { "name": "CurrentVoters", "type": "[]Voter", "versions": "0", "nullableVersions": "0"}
  ],
  "commonStructs": [
    { "name": "Voter", "versions": "0", "fields": [
      {"name": "VoterId", "type": "int32", "versions": "0"}
    ]}
  ]
}
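
Below is a hypothetical sketch, not KRaft code, of how a voter could use the VotedId and VotedUuid fields above to guarantee that it votes for at most one candidate (identified by ID and UUID) per leader epoch; the record and method names are illustrative.

Code Block
languagejava
// Hypothetical sketch of using VotedId and VotedUuid to vote at most once per epoch.
import java.util.Optional;
import java.util.UUID;

public final class VotedUuidExample {
    record QuorumState(int leaderEpoch, int votedId, Optional<UUID> votedUuid) {}

    static boolean maybeGrantVote(QuorumState state, int epoch, int candidateId, UUID candidateUuid) {
        if (epoch != state.leaderEpoch()) {
            return false; // stale or future epoch handled elsewhere
        }
        if (state.votedId() == -1) {
            return true;  // no vote persisted for this epoch yet
        }
        // Already voted: only re-grant to the exact same (id, uuid) candidate.
        return state.votedId() == candidateId
            && state.votedUuid().map(candidateUuid::equals).orElse(false);
    }

    public static void main(String[] args) {
        UUID uuid3 = UUID.randomUUID();
        QuorumState voted = new QuorumState(7, 3, Optional.of(uuid3));
        System.out.println(maybeGrantVote(voted, 7, 3, uuid3));             // true, same candidate
        System.out.println(maybeGrantVote(voted, 7, 3, UUID.randomUUID())); // false, different uuid
    }
}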

meta.properties

A new property called storage.id will get added to the meta.properties of a log directory. If the meta.properties file doesn't exist a new one will be generated with this storage id. If the meta.properties file exists but doesn't contain this property, the property will be generated and the meta.properties file will be atomically replaced.

RPCs

AddVoter

Request

This RPC can be sent by an administrative client to add a voter to the set of voters. This RPC can be sent to a broker or controller. The broker will forward the request to the controller.

Code Block
languagejs
{
  "apiKey": "TBD",
  "type": "request",
  "listeners": ["controller", "broker"],
  "name": "AddVoterRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "0+" }
    { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The name of the topic." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
        { "name": "Index", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "VoterId", "type": "int32", "versions": "0+",
          "about": "The ID of the voter getting added to the topic partition." },
        { "name": "VoterUuid", "type": "uuid", "versions": "0+",
          "about": "The replica generated UUID of the replica getting added as a voter to the topic partition." }
      ]}
    ]}
  ]
}

Response

Code Block
languagejs
{
  "apiKey": "TBD",
  "type": "response",
  "name": "AddVoterResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top level error code." },
    { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The name of the topic." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
        { "name": "Index", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no fetch error." },
        { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "0+", "taggedVersions": "0+", "tag": 0, "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "0+", "default": "-1", "entityType" : "brokerId",
            "about": "The ID of the current leader or -1 if the leader is unknown." },
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1",
            "about": "The latest known leader epoch." },
          { "name": "EndPoint", "type": "Endpoint", "versions": "0+",
            "about": "The endpoint that can be used to communicate with the leader", "fields": [
            { "name": "Host", "type": "string", "versions": "0+",
              "about": "The hostname." },
            { "name": "Port", "type": "uint16", "versions": "0+",
              "about": "The port." }
          ]}
        ]}
      ]}
    ]}
  ]
}


Handling

When the leader receives an AddVoter request it will do the following:

  1. Wait for the fetch offset of the replica (ID, UUID) to catch up to the log end offset of the leader.
  2. Wait until there are no uncommitted add or remove voter records.
  3. Wait for the LeaderChangeMessage control record from the current epoch to get committed.
  4. Append the AddVoterRecord to the log.
  5. The KRaft internal listener will read this record from the log and add the voter to the voter set.
  6. Wait for the AddVoterRecord to commit using the majority of the new configuration.
  7. Send the AddVoter response to the client.

In 1., the leader needs to wait for the replica to catch up because when the AddVoterRecord is appended to the log the set of voters changes. If the added voter is far behind then it can take some time for it to reach the HWM. During this time the leader cannot commit data and the quorum will be unavailable from the perspective of the state machine. We mitigate this by waiting for the new replica to catch up before adding it to the set of voters.

In 3., the leader needs to wait for the LeaderChangeMessage of the current leader epoch to get committed to address this known issue.

In 5., the new replica will be part of the quorum so the leader will start sending BeginQuorumEpoch requests to this replica. It is possible that the new replica has not yet replicated and applied this AddVoterRecord so it doesn't know that it is a voter for this topic partition. The new replica will fail this RPC until it discovers that it is in the voter set. The leader will continue to retry until the RPC succeeds.
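
A rough sketch of the catch-up check in step 1 is shown below; the threshold and names are assumptions, not values from the KIP.

Code Block
languagejava
// Sketch of gating the AddVoterRecord append on the new replica being caught up,
// so the enlarged quorum can keep committing without a long stall.
public final class AddVoterCatchUpExample {
    static boolean caughtUp(long replicaFetchOffset, long leaderLogEndOffset, long maxAllowedLag) {
        return leaderLogEndOffset - replicaFetchOffset <= maxAllowedLag;
    }

    public static void main(String[] args) {
        long logEnd = 10_000;
        System.out.println(caughtUp(9_990, logEnd, 100)); // true: safe to append AddVoterRecord
        System.out.println(caughtUp(2_000, logEnd, 100)); // false: keep waiting
    }
}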

The replica will return the following errors:

  1. NOT_LEADER_FOR_PARTITION - when the request is sent to a replica that is not the leader.
  2. VOTER_ALREADY_ADDED - when the request contains a replica ID and UUID that is already in the committed voter set.
  3. INVALID_REQUEST - when the request contains a replica ID and UUID that is not an observer.
  4. INVALID_REQUEST - when the request contains endpoints but the quorum was configured to use controller.quorum.voters.
  5. INVALID_REQUEST - when the request contains a replica ID that is not enumerated in the controller.quorum.voters configuration.

The broker will handle this request by forwarding the request to the active controller.

RemoveVoter

Request

This RPC can be sent by an administrative client to remove a voter from the set of voters. This RPC can be sent to a broker or controller. The broker will forward the request to the controller.

Code Block
languagejs
{
  "apiKey": "TBD",
  "type": "request",
  "listeners": ["controller", "broker"],
  "name": "RemoveVoterRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The name of the topic." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
        { "name": "Index", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "VoterId", "type": "int32", "versions": "0+",
          "about": "The ID of the voter getting removed from the topic partition." },
        { "name": "VoterUuid", "type": "uuid", "versions": "0+",
          "about": "The replica generated UUID of the replica getting removed as a voter from the topic partition." },
      ]}
    ]}
  ]
}

Response

Code Block
languagejs
{
  "apiKey": "TBD",
  "type": "response",
  "name": "RemoveVoterResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top level error code." }
    { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The name of the topic." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
        { "name": "Index", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no fetch error." },
        { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "0+", "taggedVersions": "0+", "tag": 0, "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "12+", "default": "-1", "entityType" : "brokerId",
            "about": "The ID of the current leader or -1 if the leader is unknown." },
          { "name": "LeaderEpoch", "type": "int32", "versions": "12+", "default": "-1",
            "about": "The latest known leader epoch." },
          { "name": "EndPoint", "type": "Endpoint", "versions": "0+",
            "about": "The endpoint that can be used to communicate with the leader", "fields": [
            { "name": "Host", "type": "string", "versions": "0+",
              "about": "The hostname." },
            { "name": "Port", "type": "uint16", "versions": "0+",
              "about": "The port." }
          ]}
        ]}
      ]}
    ]}
  ]
}

Handling

When the leader receives a RemoveVoter request it will do the following:

  1. Wait until there are no uncommitted add or remove voter records.
  2. Wait for the LeaderChangeMessage control record from the current epoch to get committed.
  3. Append the RemoveVoterRecord to the log.
  4. The KRaft internal listener will read this record from the log and remove the voter from the voter set.
  5. Wait for the RemoveVoterRecord to commit using the majority of the new configuration.
  6. Send the RemoveVoter response to the client.
  7. Resign by sending EndQuorumEpoch RPCs if the removed replica is the leader.

In 3. and 5. it is possible that the RemoveVoterRecord removes the current leader from the voter set. In this case the leader needs to allow Fetch and FetchSnapshot requests from replicas. The leader should not count itself when determining the majority and determining if records have been committed.
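
A sketch of steps 5 through 7 for the case where the leader removed itself: the RemoveVoterRecord must be committed by a majority of the new voter set, which no longer includes the leader, before the leader responds and resigns. Method names are placeholders.

Code Block
languagejava
// Sketch of checking that the RemoveVoterRecord is committed by a majority of the
// new voter set (which excludes the removed leader). Names are illustrative.
import java.util.Map;

public final class RemoveLeaderExample {
    static boolean removeRecordCommitted(Map<Integer, Long> newVoterFetchOffsets, long removeRecordOffset) {
        long acks = newVoterFetchOffsets.values().stream()
            .filter(offset -> offset >= removeRecordOffset)
            .count();
        return acks >= newVoterFetchOffsets.size() / 2 + 1; // the leader is not in this map
    }

    public static void main(String[] args) {
        // Voters 2 and 3 remain after removing leader 1; the record is at offset 500.
        System.out.println(removeRecordCommitted(Map.of(2, 510L, 3, 480L), 500L)); // false: keep leading
        System.out.println(removeRecordCommitted(Map.of(2, 510L, 3, 505L), 500L)); // true: reply, then resign via EndQuorumEpoch
    }
}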

The replica will return the following errors:

  1. NOT_LEADER_FOR_PARTITION - when the request is sent to a replica that is not the leader.
  2. VOTER_ALREADY_REMOVED - when the request contains a replica ID and UUID that is already not in the committed voter set.
  3. INVALID_REQUEST - when the request contains a replica ID and UUID that is not a voter.
  4. INVALID_REQUEST - when the controller.quorum.voters configuration is used and the request contains a replica ID that would cause that replica ID to be removed entirely from the voter set. In other words, when the controller.quorum.voters configuration is used, RemoveVoter can only be used to remove replica UUIDs, not replica IDs.

The broker will handle this request by forwarding the request to the active controller.

Fetch

Request

Version 14 adds the field ReplicaUuid to the FetchPartition. This field is populated with the replica generated UUID. If the ReplicaUuid and the ReplicaId fields are populated, the topic partition leader can assume that the replica supports becoming a voter.

Code Block
languagejs
{
  "apiKey": 1,
  "type": "request",
  "listeners": ["zkBroker", "broker", "controller"],
  "name": "FetchRequest",
  "validVersions": "0-14",
  "flexibleVersions": "12+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "12+", "nullableVersions": "12+", "default": "null", "taggedVersions": "12+", "tag": 0, "ignorable": true,
      "about": "The clusterId if known. This is used to validate metadata fetches prior to broker registration." },
    { "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "about": "The replica ID of the follower, of -1 if this request is from a consumer." },
    ...
    { "name": "Topics", "type": "[]FetchTopic", "versions": "0+",
      "about": "The topics to fetch.", "fields": [
      { "name": "Topic", "type": "string", "versions": "0-12", "entityType": "topicName", "ignorable": true,
        "about": "The name of the topic to fetch." },
      { "name": "TopicId", "type": "uuid", "versions": "13+", "ignorable": true,
        "about": "The unique topic ID"},
      { "name": "Partitions", "type": "[]FetchPartition", "versions": "0+",
        "about": "The partitions to fetch.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ReplicaUuid", "type": "uuid", "versions": "14+", "nullableVersions": "14+", "default": "null",
          "about": "The replica generated UUID. null otherwise." },
        ...
      ]}
    ]}
  ]
}

Response

Version 14 renames LeaderIdAndEpoch to CurrentLeader and adds Endpoint to CurrentLeader.

Code Block
{
  "apiKey": 1,
  "type": "response",
  "name": "FetchResponse",
  "validVersions": "0-14",
  "flexibleVersions": "12+",
  "fields": [
    ...
    { "name": "Responses", "type": "[]FetchableTopicResponse", "versions": "0+",
      "about": "The response topics.", "fields": [
      { "name": "Topic", "type": "string", "versions": "0-12", "ignorable": true, "entityType": "topicName",
        "about": "The topic name." },
      { "name": "TopicId", "type": "uuid", "versions": "13+", "ignorable": true, "about": "The unique topic ID"},
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about": "The topic partitions.", "fields": [
        ...
        { "name": "CurrentLeader", "type": "CurrentLeader",
          "versions": "12+", "taggedVersions": "12+", "tag": 1, "fields": [
           { "name": "LeaderId", "type": "int32", "versions": "12+", "default": "-1", "entityType": "brokerId",
            "about": "The ID of the current leader or -1 if the leader is unknown."},
          { "name": "LeaderEpoch", "type": "int32", "versions": "12+", "default": "-1",
            "about": "The latest known leader epoch"},
          { "name": "EndPoint", "type": "Endpoint", "versions": "14+",
            "about": "The endpoint that can be used to communicate with the leader", "fields": [
            { "name": "Host", "type": "string", "versions": "14+",
              "about": "The hostname." },
            { "name": "Port", "type": "uint16", "versions": "14+",
              "about": "The port." }
          ]}
        ]},
        ...
      ]}
    ]}
  ]
}

Handling

Replicas that support becoming voters will send both the replica ID and UUID in the Fetch request. The leader will assume that replicas that report both fields are voters or are able to become voters.

There are a few changes to the leader request handling described in KIP-595. The leader will track the fetch offset for the replica tuple (ID, UUID). This means that replicas are uniquely identified by their ID and UUID, so their state will be tracked using both the ID and UUID.

When removing the leader from the voter set, it will remain the leader for that epoch until the RemoveVoterRecord gets committed. This means that the leader needs to allow replicas (voters and observers) to fetch from the leader even if it is not part of the voter set. This also means that if the leader is not part of the voter set it should not include itself when computing the committed offset (also known as the high-watermark).
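
The following is an illustrative sketch, with placeholder types, of the bookkeeping change described above: follower state is keyed by the (ID, UUID) tuple instead of the ID alone, so a replica that comes back with a new storage UUID is tracked as a distinct replica.

Code Block
languagejava
// Sketch of tracking follower state keyed by (id, uuid); types are illustrative.
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public final class ReplicaKeyExample {
    record ReplicaKey(int id, UUID uuid) {}
    record ReplicaState(long fetchOffset, long lastFetchTimeMs) {}

    public static void main(String[] args) {
        Map<ReplicaKey, ReplicaState> followers = new HashMap<>();
        UUID oldDisk = UUID.randomUUID();
        UUID newDisk = UUID.randomUUID();
        followers.put(new ReplicaKey(3, oldDisk), new ReplicaState(1_000, 1L));
        // Replica 3 returns with a replaced disk: same id, different uuid, separate state.
        followers.put(new ReplicaKey(3, newDisk), new ReplicaState(0, 2L));
        System.out.println(followers.size()); // 2 entries for replica id 3
    }
}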

FetchSnapshot

Request

Version 1 adds the field ReplicaUuid to PartitionSnapshot. If the ReplicaUuid and the ReplicaId fields are populated, the topic partition leader can assume that the replica supports becoming a voter.

Code Block
languagejs
{
  "apiKey": 59,
  "type": "request",
  "listeners": ["controller"],
  "name": "FetchSnapshotRequest",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "taggedVersions": "0+", "tag": 0,
      "about": "The clusterId if known, this is used to validate metadata fetches prior to broker registration" },
    { "name": "ReplicaId", "type": "int32", "versions": "0+", "default": "-1", "entityType": "brokerId",
      "about": "The replica ID of the follower" },
    { "name": "MaxBytes", "type": "int32", "versions": "0+", "default": "0x7fffffff",
      "about": "The maximum bytes to fetch from all of the snapshots" },
    { "name": "Topics", "type": "[]TopicSnapshot", "versions": "0+",
      "about": "The topics to fetch", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The name of the topic to fetch" },
      { "name": "Partitions", "type": "[]PartitionSnapshot", "versions": "0+",
        "about": "The partitions to fetch", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index" },
        { "name": "ReplicaUuid", "type": "uuid", "versions": "1+", "default": "null",
          "about": "The replica UUID of the follower" },
        { "name": "CurrentLeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The current leader epoch of the partition, -1 for unknown leader epoch" },
        { "name": "SnapshotId", "type": "SnapshotId", "versions": "0+",
          "about": "The snapshot endOffset and epoch to fetch", "fields": [
          { "name": "EndOffset", "type": "int64", "versions": "0+" },
          { "name": "Epoch", "type": "int32", "versions": "0+" }
        ]},
        { "name": "Position", "type": "int64", "versions": "0+",
          "about": "The byte position within the snapshot to start fetching from" }
      ]}
    ]}
  ]
}

Response

Version 1 renames LeaderIdAndEpoch to CurrentLeader and adds Endpoint to CurrentLeader.

Code Block
languagejs
{
  "apiKey": 59,
  "type": "response",
  "name": "FetchSnapshotResponse",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+", "ignorable": false,
      "about": "The top level response error code." },
    { "name": "Topics", "type": "[]TopicSnapshot", "versions": "0+",
      "about": "The topics to fetch.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The name of the topic to fetch." },
      { "name": "Partitions", "type": "[]PartitionSnapshot", "versions": "0+",
        "about": "The partitions to fetch.", "fields": [
        { "name": "Index", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no fetch error." },
        { "name": "SnapshotId", "type": "SnapshotId", "versions": "0+",
          "about": "The snapshot endOffset and epoch fetched", "fields": [
          { "name": "EndOffset", "type": "int64", "versions": "0+" },
          { "name": "Epoch", "type": "int32", "versions": "0+" }
        ]},
        { "name": "CurrentLeader", "type": "CurrentLeader",
          "versions": "0+", "taggedVersions": "0+", "tag": 0, "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId",
            "about": "The ID of the current leader or -1 if the leader is unknown."},
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The latest known leader epoch"},
          { "name": "EndPoint", "type": "Endpoint", "versions": "1+",
            "about": "The endpoint that can be used to communicate with the leader", "fields": [
            { "name": "Host", "type": "string", "versions": "1+",
              "about": "The hostname." },
            { "name": "Port", "type": "uint16", "versions": "1+",
              "about": "The port." }
          ]}
        ]},
        { "name": "Size", "type": "int64", "versions": "0+",
          "about": "The total size of the snapshot." },
        { "name": "Position", "type": "int64", "versions": "0+",
          "about": "The starting byte position within the snapshot included in the Bytes field." },
        { "name": "UnalignedRecords", "type": "records", "versions": "0+",
          "about": "Snapshot data in records format which may not be aligned on an offset boundary" }
      ]}
    ]}
  ]
}


Handling

Similar to Fetch, replicas that support becoming voters will send both the replica ID and UUID in the FetchSnapshot request. The leader will assume that replicas that report both fields are voters or are able to become voters.

There are a few changes to the leader request handling described in KIP-630. The leader will track the fetch offset for the replica tuple (ID, UUID). This means that replicas are uniquely identified by their ID and UUID, so their state will be tracked using both the ID and UUID.

When removing the leader from the voter set, it will remain the leader for that epoch until the RemoveVoterRecord gets committed. This means that the leader needs to allow replicas (voters and observers) to fetch snapshots from the leader even if it is not part of the voter set.

Vote

Request

Changes:

  1. Candidate Id was moved out of the topic partition maps
  2. Candidate Uuid was added to the PartitionData
  3. VoterId was added to the top level
  4. VoterUuid was added to PartitionData
Code Block
languagejs
{
  "apiKey": 52,
  "type": "request",
  "listeners": ["controller"],
  "name": "VoteRequest",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null" },
    { "name": "CandidateId", "type": "int32", "versions": "1+", "entityType": "brokerId",
      "about": "The ID of the voter sending the request" },
    { "name": "VoterId", "type": "int32", "versions": "1+", "entityType": "brokerId",
      "about": "The ID of the replica receiving the request to vote." },
    { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "CandidateEpoch", "type": "int32", "versions": "0+",
          "about": "The bumped epoch of the candidate sending the request"},
        { "name": "CandidateId", "type": "int32", "versions": "0", "entityType": "brokerId",
          "about": "The ID of the voter sending the request"},
        { "name": "CandidateUuid", "type": "uuid", "versions": "1+",
          "about": "The candidate generated UUID, null otherwise" },
        { "name": "VoterUuid", "type": "uuid", "versions": "1+", "nullableVersions": "1+", "default": "null",
          "about": "The replica generated UUID of the replica receiving the request to vote, null if not known by the candidate." },
        { "name": "LastOffsetEpoch", "type": "int32", "versions": "0+",
          "about": "The epoch of the last record written to the metadata log"},
        { "name": "LastOffset", "type": "int64", "versions": "0+",
          "about": "The offset of the last record written to the metadata log"}
      ]}
    ]}
  ]
}


Response

Version 1 adds the field VoterUuid to PartitionData.

Code Block
languagejs
{
  "apiKey": 52,
  "type": "response",
  "name": "VoteResponse",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top level error code."},
    { "name": "Topics", "type": "[]TopicData",
      "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]PartitionData",
        "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+"},
        { "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId",
          "about": "The ID of the current leader or -1 if the leader is unknown."},
        { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The latest known leader epoch"},
        { "name": "VoteGranted", "type": "bool", "versions": "0+",
          "about": "True if the vote was granted and false otherwise"},
        { "name": "VoterUuid", "type": "uuid", "versions": "1+",
          "about": "The replica generated uuid for the replica casting a vote." }
      ]}
    ]}
  ]
}


Handling

The candidate replica will only send Vote requests to replicas it considers voters. The handling of the Vote request will change from that described in KIP-595 in that all replicas are allowed to vote even if they are not voters for the quorum. The voter will persist in the quorum state both the candidate ID and UUID so that it only votes for at most one candidate for a given epoch.

The replica will return the following errors:

  1. INVALID_REQUEST - when the voter ID and UUID in the request doesn't match the local ID and UUID. Note that the UUID is optional and will only be checked if provided.

BeginQuorumEpoch

The version of the response is increased and the fields remain unchanged.

Request

  1. LeaderId was moved out of the topic partition maps
  2. VoterId was added to the top level
  3. VoterUuid was added to PartitionData
  4. Allow tagged fields for versions greater than or equal to 1.
Code Block
languagejs
{
  "apiKey": 53,
  "type": "request",
  "listeners": ["controller"],
  "name": "BeginQuorumEpochRequest",
  "validVersions": "0-1",
  "flexibleVersions": "1+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "0+",
      "nullableVersions": "0+", "default": "null"},
    { "name": "LeaderId", "type": "int32", "versions": "1+", "entityType": "brokerId",
      "about": "The ID of the newly elected leader"},
    { "name": "VoterId", "type": "int32", "versions": "1+", "entityType": "brokerId",
      "about": "The voter ID of the receiving replica." },
    { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "VoterUuid", "type": "uuid", "versions": "1+", "nullableVersions": "1+", "default": "null",
          "about": "The replica generated UUID of the replica receiving the request, null if not known by the leader." },
        { "name": "LeaderId", "type": "int32", "versions": "0", "entityType": "brokerId",
          "about": "The ID of the newly elected leader"},
        { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The epoch of the newly elected leader"}
      ]}
    ]}
  ]
}


Handling

This request will be handled as described in KIP-595 with the following additional errors:

  1. INVALID_REQUEST - when the voter ID and UUID doesn't match the local ID and UUID. Note that the UUID is optional and will only be checked if provided.
  2. NOT_VOTER - when the replica is not a voter in the topic partition. This error should be retried by the leader of the topic partition, as sketched below.
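
Below is a sketch of that retry behaviour; the sender interface and back-off are illustrative assumptions, not the Kafka network client API.

Code Block
languagejava
// Sketch of the leader retrying BeginQuorumEpoch until a newly added voter has
// replicated the AddVoterRecord and stops answering NOT_VOTER. The Sender
// interface is a placeholder, not the Kafka network client API.
public final class BeginQuorumEpochRetryExample {
    enum Error { NONE, NOT_VOTER }

    interface Sender { Error sendBeginQuorumEpoch(int voterId); }

    static void notifyNewVoter(Sender sender, int voterId, int maxAttempts) throws InterruptedException {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (sender.sendBeginQuorumEpoch(voterId) == Error.NONE) {
                return; // the new voter now knows it is part of the voter set
            }
            Thread.sleep(100); // back off before retrying; interval is arbitrary here
        }
        throw new IllegalStateException("voter " + voterId + " never acknowledged the epoch");
    }

    public static void main(String[] args) throws InterruptedException {
        int[] calls = {0};
        // Simulated voter that replies NOT_VOTER twice before applying the AddVoterRecord.
        Sender sender = voterId -> ++calls[0] < 3 ? Error.NOT_VOTER : Error.NONE;
        notifyNewVoter(sender, 4, 10);
        System.out.println("acknowledged after " + calls[0] + " attempts");
    }
}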

EndQuorumEpoch

The version of the response is increased and the fields remain unchanged.

Request

  1. LeaderId was moved out of the topic partition maps
  2. VoterId was added to the request
  3. VoterUuid was added to the Partitions
  4. ReplicaUuid was added to PreferredSuccessors
  5. Allow tagged fields for versions greater than or equal to 1.
Code Block
languagejs
{
  "apiKey": 54,
  "type": "request",
  "listeners": ["controller"],
  "name": "EndQuorumEpochRequest",
  "validVersions": "0-1",
  "flexibleVersions": "1+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "0+",
      "nullableVersions": "0+", "default": "null"},
    { "name": "LeaderId", "type": "int32", "versions": "1+", "entityType": "brokerId",
      "about": "The current leader ID that is resigning." },
    { "name": "VoterId", "type": "int32", "versions": "1+", "entityType": "brokerId",
      "about": "The voter ID of the receiving replica." },
    { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "VoterUuid", "type": "uuid", "versions": "1+", "nullableVersions": "1+", "default": "null",
          "about": "The replica generated UUID of the replica receiving the request, null if not known by the leader." },
        { "name": "LeaderId", "type": "int32", "versions": "0", "entityType": "brokerId",
          "about": "The current leader ID that is resigning"},
        { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The current epoch"},
        { "name": "PreferredSuccessors", "type": "[]ReplicaInfo", "versions": "0+",
          "about": "A sorted list of preferred successors to start the election", "fields": [
          { "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId" },
          { "name": "ReplicaUuid", "type": "uuid", "versions": "1+" }
        ]}
      ]}
    ]}
  ]
}


Handling

This request will be handled as described in KIP-595 with the following additional errors:

...

The version of the request is increased and the fields remain unchanged.

Response

  1. Add ReplicaUuid to ReplicaState

...

A future KIP will describe how the kafka-metadata-shell tool will be extended to be able to read and display KRaft control records from the quorum, snapshot and log.

kafka-storage.sh

A future KIP will describe how the kafka-storage tool will be extended to be able to bootstrap the first quorum node by writing an AddVoterRecord to the cluster metadata log when the controller.quorum.bootstrap.servers configuration is used.

kafka-metadata-quorum.sh

...

This command is used to add new voters to the topic partition. The flags --replicaId and --replicaUuid must be specified. A future KIP will describe how the user can specify endpoint information for the replica.

--remove-voter

This command is used to remove voters from the topic partition. The flags --replicaId and --replicaUuid must be specified.

...

The features in this KIP will be supported if the ApiVersions of all of the voters and observers are greater than or equal to the versions described here. If the leader has a replica UUID for all of the voters then this KIP is supported by all of the voters.
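
The sketch below illustrates this compatibility rule under stated assumptions: the required versions and the per-replica version lookup are modeled as plain maps rather than the actual ApiVersions machinery, and the required version numbers are taken from this KIP's RPC changes.

Code Block
languagejava
// Hedged sketch of checking that every voter and observer supports the versions
// this KIP needs before using reconfiguration. The lookup is modeled as maps.
import java.util.List;
import java.util.Map;

public final class ReconfigSupportCheckExample {
    static boolean reconfigurationSupported(List<Map<String, Integer>> maxVersionsPerReplica) {
        // Required maximum versions per this KIP: Fetch >= 14, FetchSnapshot >= 1, Vote >= 1,
        // plus support for the new AddVoter/RemoveVoter RPCs (modeled as version 0 here).
        Map<String, Integer> required = Map.of(
            "Fetch", 14, "FetchSnapshot", 1, "Vote", 1, "AddVoter", 0, "RemoveVoter", 0);
        return maxVersionsPerReplica.stream().allMatch(replica ->
            required.entrySet().stream().allMatch(entry ->
                replica.getOrDefault(entry.getKey(), -1) >= entry.getValue()));
    }

    public static void main(String[] args) {
        Map<String, Integer> upgraded = Map.of(
            "Fetch", 14, "FetchSnapshot", 1, "Vote", 1, "AddVoter", 0, "RemoveVoter", 0);
        Map<String, Integer> old = Map.of("Fetch", 13, "FetchSnapshot", 0, "Vote", 0);
        System.out.println(reconfigurationSupported(List.of(upgraded, upgraded))); // true
        System.out.println(reconfigurationSupported(List.of(upgraded, old)));      // false
    }
}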

Upgrading to controller.quorum.bootstrap.servers

TODO: figure out a way to do this. The requirement is the AddVoterRecord for all of the voters in the static configuration are committed. How do we guarantee this? Is seeing a committed AddVoterRecord enough?

Test Plan

This KIP will be tested using unit tests, integration tests, system tests, simulation tests and a TLA+ specification.

...

KIP-642: Dynamic quorum reassignment. KIP-642 describes how to perform dynamic reassignment with multiple voters added and removed. KIP-642 doesn't support disk failures and it would be more difficult to implement compared to this KIP. In a future KIP we can describe how we can add administrative operations that support the addition and removal of multiple voters.

...