...

Describe the problems you are trying to solve.

Proposed Changes

Replica UUID

In addition to a replica ID, each replica assigned to a topic partition will have a replica UUID. This UUID will be generated once and persisted as part of the quorum state for the topic partition. Replicas in a quorum that can become voters will be identified by their ID and UUID.

Voter Changes

Adding Voters

Voters are added to the cluster metadata partition by sending an AddVoter RPC to the leader. For safety, Kafka will only allow one voter change operation at a time. If there are any pending voter change operations the leader will wait for them to finish. If there are no pending voter change operations the leader will write an AddVoterRecord to the log and immediately update its in-memory quorum state to include this voter as part of the quorum. Voters will not wait for these records to get committed before updating their voter set.

Once the AddVoterRecord operation has been committed by the majority of the new voter set, the leader can respond to the RPC and process new voter change operations.
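
For illustration, the control record appended by the leader could look like the following instance of the AddVoterRecord schema defined under Public Interfaces; the voter id, UUID, endpoint name, host and port below are placeholders, and SecurityProtocol 0 corresponds to PLAINTEXT.

Code Block
{
  "Version": 0,
  "VoterId": 4,
  "VoterUuid": "UUID4",
  "EndPoints": [
    { "Name": "CONTROLLER", "Host": "controller-4.example.com", "Port": 9093, "SecurityProtocol": 0 }
  ]
}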

Removing Voters

Voters are removed from the cluster metadata partition by sending a RemoveVoter RPC to the leader. This works similarly to adding a voter: the leader will append a RemoveVoterRecord to the log and immediately update its voter set to the new configuration.

Once the RemoveVoterRecord operation has been committed by the majority of the new voter set, the leader can respond to the RPC and resign from the quorum if the removed replica is the leader. To allow this operation to commit, the followers will continue to fetch from the leader even if the leader is not part of the new voter set.
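
Correspondingly, an illustrative RemoveVoterRecord instance (the schema is defined under Public Interfaces; the id and UUID here are placeholders) would only carry the identity of the voter being removed:

Code Block
{
  "Version": 0,
  "VoterId": 3,
  "VoterUuid": "UUID3"
}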

Bootstrapping

This section describes how the quorum will be bootstrapped to support this KIP. There are two configurations that Kafka will support and each will be explained separately. The Kafka cluster can be configured to use the existing controller.quorum.voters property or the new property controller.quorum.bootstrap.servers.

controller.quorum.voters

This is a static quorum configuration where all of the voters' ID, host and port are specified. An example value of this configuration is 1@localhost:9092,2@localhost:9093,3@localhost:9094 .

When this option is used, the leader of the KRaft quorum will not allow the AddVoter RPC to add replica IDs that are not described in the configuration and it will not allow the RemoveVoter RPC to remove replica IDs that are described in the configuration.

When the leader of the topic partition first discovers the UUID of the voters through the Fetch and FetchSnapshot RPCs, it will write an AddVoterRecord to the log for each of the voters described in the configuration.

The leader will not perform writes from the state machine until it has written to the log an AddVoterRecord for every replica id in the controller.quorum.voters configuration.

controller.quorum.bootstrap.servers

This configuration describes the set of hosts and ports that can be queried to discover the cluster metadata partition leader, e.g. host1:9092,host2:9092,host3:9092. Observers and to-be-added voters will send Fetch requests to this list of servers until the leader is discovered.

When using this configuration the quorum will be started with only one voter. This voter is bootstrapped by running the kafka-storage.sh tool. This tool will create the cluster metadata partition and append one AddVoterRecord to it. Additional voters can get added by using the AddVoter RPC as described in this document.

Leader Election

It is possible for the leader to write an AddVoterRecord to the log and replicate it to some of the voters in the new configuration. If the leader fails before this record has been replicated to the new voter, it is possible that a new leader cannot be elected. This is because voters reject vote requests from replicas that are not in the voter set. This check will be removed and replicas will reply to Vote requests when the candidate is not in the voter set or the voting replica is not in the voter set.

Snapshots

The snapshot generation code needs to be extended to include these new KRaft-specific control records for AddVoterRecord and RemoveVoterRecord. Before this KIP the snapshot didn't include any KRaft generated control records.

Internal Listener

The KRaft implementation and protocol described in KIP-595 and KIP-630 never read from the log or snapshot. This KIP requires that the KRaft implementation now read uncommitted data from the log to discover the voter set. This also means that the KRaft implementation needs to handle this uncommitted state getting truncated and reverted.

Common Scenarios

To better illustrate this feature this section describes two common scenarios that the Kafka administrator may perform.

Disk Failure Recovery

If one of the replicas encounters a disk failure, the operator can replace the failed disk with a new disk and start the replica.

Let's assume that the cluster is configured to use  controller.quorum.voters and the value is 1@host1:9092,2@host2:9092,3@host3:9094.  The voter set is defined by the tuples (1, UUID1), (2, UUID2) and (3, UUID3). The first element of the tuples is the replica id. The second element of the tuples is the replica uuid. The replica uuid is automatically generated once by the replica when persisting the quorum state for the first time.

At this point the disk for replica 3 is replaced. This means that when replica 3 starts it will generate a new replica uuid (UUID3') and quorum state. Replica 3 will discover the partition leader using controller.quorum.voters and send UUID3' in the Fetch and FetchSnapshot request. The leader state for voters will be (1, UUID1), (2, UUID2) and (3, UUID3) and for observers will be (3, UUID3').

This state can be discovered by a client by using the DescribeQuorum RPC, the Admin client or the kafka-quorum.sh CLI. The client can now decide to add replica (3, UUID3') to the set of voters using the AddVoter RPC, the Admin client or the kafka-quorum.sh CLI.
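
As a sketch, a trimmed DescribeQuorum response for this state could look like the following; the UUID strings are the placeholders used above, most fields are omitted, and the exact layout is defined by the DescribeQuorumResponse schema later in this document.

Code Block
{
  "Topics": [
    { "TopicName": "__cluster_metadata",
      "Partitions": [
        { "PartitionIndex": 0, "LeaderId": 1,
          "CurrentVoters": [
            { "ReplicaId": 1, "ReplicaUuid": "UUID1" },
            { "ReplicaId": 2, "ReplicaUuid": "UUID2" },
            { "ReplicaId": 3, "ReplicaUuid": "UUID3" }
          ],
          "Observers": [
            { "ReplicaId": 3, "ReplicaUuid": "UUID3'" }
          ]
        }
      ]
    }
  ]
}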

When the AddVoter RPC succeeds the voters will be (1, UUID1), (2, UUID2), (3, UUID3) and (3, UUID3'). The client can remove the failed disk from the voter set by using the RemoveVoter RPC, the Admin client or the kafka-quorum.sh CLI.

When the RemoveVoter RPC succeeds the voters will be (1, UUID1), (2, UUID2), and (3, UUID3'). At this point the Kafka cluster has successfully recovered from a disk failure in a controller node.

Node Failure Recovery

To support this scenario the Kafka cluster must be configured to use controller.quorum.bootstrap.servers. Let's assume that the voter set is (1, UUID1), (2, UUID2) and (3, UUID3).

At this point replica 3 has failed and the Kafka operator would like to replace it. The operator would start a new controller with replica id 4. This replica will generate a UUID4 and persist it to the quorum state. Replica 4 will discover the leader by sending Fetch and FetchSnapshot requests to the servers enumerated in controller.quorum.bootstrap.servers. Once it discovers the leader it is able to fetch from the leader. The leader state for voters will be (1, UUID1), (2, UUID2), (3, UUID3) and for observers will be (4, UUID4).

The client can now decide to add replica (4, UUID4) to the set of voters. When this operation succeeds the set of voters will be  (1, UUID1), (2, UUID2), (3, UUID3) and (4, UUID4).

The client can now decide to remove replica (3, UUID3) from the set of voters.

Public Interfaces

Configuration

controller.quorum.voters

This is an existing configuration. If the cluster uses this configuration to configure the quorum, adding new replica IDs will not be supported. The cluster will only support changing the UUID for an existing replica ID.

controller.quorum.bootstrap.servers

This configuration can be used instead of controller.quorum.voters. This is a list of nodes that brokers and new controllers can use to discover the quorum leader. Brokers and new controllers (observers) will send FETCH requests to all of the nodes in this configuration until they discover the quorum leader and the FETCH request succeeds. The quorum voters and their configuration will be learned by fetching and reading the log and snapshot.

Log and Snapshot Control Records

Two new control records will be added to the log and snapshot of a KRaft partition.

TODO: The LeaderChangeRecord needs to change

AddVoterRecord

A control record for instructing the voters to add a new voter to the topic partition. This record can exist in both the log and the snapshot of a topic partition.

The ControlRecordType is TBD and will be updated when the code is committed to Kafka.

Code Block
{
  "type": "data",
  "name": "AddVoterRecord",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Version", "type": "int16", "versions": "0+",
      "about": "The version of the add voter record"},
    { "name": "VoterId", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "about": "The ID of the voter getting added to the topic partition"},
    { "name": "VoterUuid", "type": "uuid", "versions": "0+",
      "about": "The voter generated UUID of the voter getting added to the topic partition"},
    { "name": "EndPoints", "type": "[]VoterEndpoint", "versions": "0+",
      "about": "The endpoints that can be used to communicate with the voter", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
        "about": "The name of the endpoint." },
      { "name": "Host", "type": "string", "versions": "0+",
        "about": "The hostname." },
      { "name": "Port", "type": "uint16", "versions": "0+",
        "about": "The port." },
      { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
        "about": "The security protocol." }
    ]}
  ]
}

RemoveVoterRecord

A control record for instructing the voters to remove a voter from the topic partition. This record can exist in the log but not the snapshot of a topic partition.

The ControlRecordType is TBD and will be updated when the code is committed to Kafka.

Code Block
{
  "type": "data",
  "name": "RemoveVoterRecord",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Version", "type": "int16", "versions": "0+",
      "about": "The version of the remove voter record"},
    { "name": "VoterId", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "about": "The ID of the voter getting removed from the topic partition"},
    { "name": "VoterUuid", "type": "uuid", "versions": "0+",
      "about": "The voter generated UUID of the voter getting removed from the topic partition"}
  ]
}

Quorum State

A new field called VoterUuid will be added to QuorumStateData. Each KRaft replica will store a locally generated UUID in this field. The replica will generate and persist a UUID when it reads version 0 of the QuorumStateData or if the QuorumStateData hasn't been persisted in the past.

Code Block
{
  "type": "data",
  "name": "QuorumStateData",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "0+"},
    { "name": "VoterUuid", "type": "uuid", "versions": "1+",
      "about": "The locally generated UUID for this voter." },
    { "name": "LeaderId", "type": "int32", "versions": "0+", "default": "-1"},
    { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1"},
    { "name": "VotedId", "type": "int32", "versions": "0+", "default": "-1"},
    { "name": "AppliedOffset", "type": "int64", "versions": "0+"},
    { "name": "CurrentVoters", "type": "[]Voter", "versions": "0+", "nullableVersions": "0+"}
  ],
  "commonStructs": [
    { "name": "Voter", "versions": "0+", "fields": [
      { "name": "VoterId", "type": "int32", "versions": "0+"}
    ]}
  ]
}
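
For example, a version 1 quorum state for replica 3 could look like the following instance of this schema; the cluster id, UUID, epoch and offset values are placeholders.

Code Block
{
  "ClusterId": "example-cluster-id",
  "VoterUuid": "UUID3",
  "LeaderId": 1,
  "LeaderEpoch": 5,
  "VotedId": -1,
  "AppliedOffset": 0,
  "CurrentVoters": [{ "VoterId": 1 }, { "VoterId": 2 }, { "VoterId": 3 }]
}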



RPC

AddVoter

Request

TODO: describe this

Code Block
{
  "apiKey": TBD,
  "type": "request",
  "listeners": ["controller", "broker"],
  "name": "AddVoterRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "0+" },
    { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The name of the topic." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
        { "name": "Index", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "VoterId", "type": "int32", "versions": "0+",
          "about": "The ID of the voter getting added to the topic partition." },
        { "name": "VoterUuid", "type": "uuid", "versions": "0+",
          "about": "The voter generated UUID of the voter getting added to the topic partition." },
        { "name": "EndPoints", "type": "[]VoterEndpoint", "versions": "0+",
          "about": "The endpoints that can be used to communicate with the voter", "fields": [
          { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
            "about": "The name of the endpoint." },
          { "name": "Host", "type": "string", "versions": "0+",
            "about": "The hostname." },
          { "name": "Port", "type": "uint16", "versions": "0+",
            "about": "The port." },
          { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
            "about": "The security protocol." }
        ]}
      ]}
    ]}
  ]
}
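
To make the shape of the request concrete, an AddVoter request adding replica 4 to the cluster metadata partition could look like the following; the cluster id, UUID, endpoint host and port are placeholders.

Code Block
{
  "ClusterId": "example-cluster-id",
  "Topics": [
    { "TopicName": "__cluster_metadata",
      "Partitions": [
        { "Index": 0, "VoterId": 4, "VoterUuid": "UUID4",
          "EndPoints": [
            { "Name": "CONTROLLER", "Host": "controller-4.example.com", "Port": 9093, "SecurityProtocol": 0 }
          ]
        }
      ]
    }
  ]
}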

Response

Code Block
{
  "apiKey": TBD,
  "type": "response",
  "name": "AddVoterResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top level error code." },
    { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The name of the topic." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
        { "name": "Index", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no fetch error." },
        { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "0+", "taggedVersions": "0+", "tag": 0, "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "0+", "default": "-1", "entityType" : "brokerId",
            "about": "The ID of the current leader or -1 if the leader is unknown." },
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1",
            "about": "The latest known leader epoch"}
        ]}
      ]}
    ]}
  ]
}

Handling

  1. Controller
    1. Reject if not leader
    2. Reject if endpoints is specified and the configuration controller.quorum.voters is used.
    3. Wait for replica (id, guid) to catch up
    4. Wait for no pending reassignments (id, guid)
    5. Append to log
    6. Wait for commit using majority of new configuration
    7. Send response
  2. Broker will forward the request to the active controller

RemoveVoter

Request

TODO: describe this

Code Block
{
  "apiKey": TBD,
  "type": "request",
  "listeners": ["controller", "broker"],
  "name": "RemoveVoterRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The name of the topic." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
        { "name": "Index", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "VoterId", "type": "int32", "versions": "0+",
          "about": "The ID of the voter getting removed from the topic partition." },
        { "name": "VoterUuid", "type": "uuid", "versions": "0+",
          "about": "The voter generated UUID of the voter getting removed from the topic partition." }
      ]}
    ]}
  ]
}

Response

Code Block
{
  "apiKey": TBD,
  "type": "response",
  "name": "RemoveVoterResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top level error code." },
    { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The name of the topic." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
        { "name": "Index", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no fetch error." },
        { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "0+", "taggedVersions": "0+", "tag": 0, "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "0+", "default": "-1", "entityType" : "brokerId",
            "about": "The ID of the current leader or -1 if the leader is unknown." },
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1",
            "about": "The latest known leader epoch"}
        ]}
      ]}
    ]}
  ]
}

Handling

  1. Controller
    1. Reject if not leader
    2. Reject if endpoints is specified and the configuration controller.quorum.voters is used.
    3. Wait for no pending reassignments
    4. Append to log
    5. Wait for commit using majority of new configuration
    6. Send response
    7. Resign by sending EndQuorumEpoch RPCs if the removed replica is the leader
  2. Broker will forward the request to the active controller

Fetch

The Fetch response version is bumped and the fields remain unchanged.

Request

Version 14 adds the field ReplicaUuid. This field is populated with the replica generated UUID. If the ReplicaUuid and the ReplicaId fields are populated, the topic partition leader can assume that the replica supports becoming a voter.

Code Block
{
  "apiKey": 1,
  "type": "request",
  "listeners": ["zkBroker", "broker", "controller"],
  "name": "FetchRequest",
  "validVersions": "0-14",
  "flexibleVersions": "12+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "12+", "nullableVersions": "12+", "default": "null", "taggedVersions": "12+", "tag": 0, "ignorable": true,
      "about": "The clusterId if known. This is used to validate metadata fetches prior to broker registration." },
    { "name": "ReplicaId", "type": "int32", "versions": "0+", "default": "-1", "entityType": "brokerId",
      "about": "The replica ID of the follower, of -1 if this request is from a consumer." },
    { "name": "ReplicaUuid", "type": "uuid", "versions": "14+", "nullableVersions": "14+", "default": "null",
      "about": "The replica generated UUID. null otherwise." },
    ...
  ]
}

Handling


TODO: The main thing to discuss is that the voters need to send the replica ID and UUID. The leader needs to track the fetch offset for all replicas that have an ID and UUID, using this tuple as a unique key.

FetchSnapshot

The FetchSnapshot response version is bumped and the fields remain unchanged.

Request

Version 1 adds the field ReplicaUuid. This field is populated with the replica generated UUID. If the ReplicaUuid and the ReplicaId fields are populated, the topic partition leader can assume that the replica supports becoming a leader.

Code Block
{
  "apiKey": 59,
  "type": "request",
  "listeners": ["zkBroker", "broker", "controller"],
  "name": "FetchSnapshotRequest",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "taggedVersions": "0+", "tag": 0, "ignorable": true,
      "about": "The clusterId if known, this is used to validate metadata fetches prior to broker registration." },
    { "name": "ReplicaId", "type": "int32", "versions": "0+", "default": "-1", "entityType": "brokerId",
      "about": "The replica ID of the follower" },
    { "name": "ReplicaUuid", "type": "uuid", "versions": "1+", "nullableVersions": "1+", "default": "null",
      "about": "The replica UUID of the follower" },
    ...
  ]
}

Handling

TODO: The main thing to discuss is that the voters need to send the replica ID and UUID. The leader needs to track the fetch offset for all replicas that have an ID and UUID, using this tuple as a unique key.

Vote

Request

Changes:

  1. CandidateId was moved out of the topic partition maps
  2. CandidateUuid was added to the PartitionData
  3. VoterId was added to the top level
  4. VoterUuid was added to PartitionData
Code Block
{
  "apiKey": 52,
  "type": "request",
  "listeners": ["controller"],
  "name": "VoteRequest",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null" },
    { "name": "CandidateId", "type": "int32", "versions": "1+", "entityType": "brokerId",
      "about": "The ID of the voter sending the request" },
    { "name": "VoterId", "type": "int32", "versions": "1+", "entityType": "brokerId",
      "about": "The ID of the replica receiving the request to vote." },
    { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]PartitionData",
        "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "CandidateEpoch", "type": "int32", "versions": "0+",
          "about": "The bumped epoch of the candidate sending the request"},
        { "name": "CandidateId", "type": "int32", "versions": "0", "entityType": "brokerId",
          "about": "The ID of the voter sending the request"},
        { "name": "CandidateUuid", "type": "uuid", "versions": "1+", "nullableVersions": "1+", "default": "null" }
          "about": "The candidate generated UUID, null otherwise" },
        { "name": "VoterUuid", "type": "uuid", "versions": "1+", "nullableVersions": "1+", "default": "null" }
          "about": "The voter generated UUID of the replica receiving the request to vote, null otherwise" }, 
        { "name": "LastOffsetEpoch", "type": "int32", "versions": "0+",
          "about": "The epoch of the last record written to the metadata log"},
        { "name": "LastOffset", "type": "int64", "versions": "0+",
          "about": "The offset of the last record written to the metadata log"}
      ]}
    ]}
  ]
}

Response

TODO

Handling

  1. Fail the request if the voter ID and UUID don't match the local ID and UUID.
  2. All replicas that can become voters are allowed to vote even if they are not yet voters in the quorum.
  3. If the vote is granted, persist the candidate ID and UUID.

BeginQuorumEpoch

Request

  1. LeaderId was moved out of the topic partition maps
  2. VoterId was added to the top level
  3. VoterUuid was added to PartitionData
  4. Allow tagged fields for versions greater than or equal to 1.
Code Block
{
  "apiKey": 53,
  "type": "request",
  "listeners": ["controller"],
  "name": "BeginQuorumEpochRequest",
  "validVersions": "0-1",
  "flexibleVersions": "1+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "0+",
      "nullableVersions": "0+", "default": "null"},
    { "name": "LeaderId", "type": "int32", "versions": "1+", "entityType": "brokerId",
      "about": "The ID of the newly elected leader"}, 
    { "name": "VoterId", "type": "int32", "versions": "1+", "entityType": "brokerId",
      "about": "The voter ID of the receiving replica." },
    { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "VoterUuid", "type": "uuid", "versions": "1+",
          "about": "The voter UUID of the receiving replica." },
        { "name": "LeaderId", "type": "int32", "versions": "0", "entityType": "brokerId",
          "about": "The ID of the newly elected leader"},
        { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The epoch of the newly elected leader"}
      ]}
    ]}
  ]
}

Handling

  1. Reject if the voter ID and UUID don't match.
  2. Reject if the receiving voter hasn't read the AddVoterRecord with its voter ID and UUID. This may need a new error code. The leader will retry on this error.
  3. Handle as normal

EndQuorumEpoch

Request

  1. LeaderId was moved out of the topic partition maps
  2. VoterId was added to the request
  3. VoterUuid was added to the Partitions
  4. ReplicaUuid was added to PreferredSuccessors
  5. Allow tagged fields for versions greater than or equal to 1.
  6. TODO: Verify that the changes to PreferredSuccessors  are backwards compatible.

...

Code Block
{
  "apiKey": 55,
  "type": "response",
  "name": "DescribeQuorumResponse",
  // Version 1 adds LastFetchTimeStamp and LastCaughtUpTimestamp in ReplicaState (KIP-836).
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top level error code."},
    { "name": "Topics", "type": "[]TopicData",
      "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]PartitionData",
        "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+"},
        { "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId",
          "about": "The ID of the current leader or -1 if the leader is unknown."},
        { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The latest known leader epoch"},
        { "name": "HighWatermark", "type": "int64", "versions": "0+"},
        { "name": "CurrentVoters", "type": "[]ReplicaState", "versions": "0+" },
        { "name": "Observers", "type": "[]ReplicaState", "versions": "0+" }
      ]}
    ]}
  ],
  "commonStructs": [
    { "name": "ReplicaState", "versions": "0+", "fields": [
      { "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId" },
      { "name": "ReplicaUuid", "type": "uuid", "versions": "1+" },
      { "name": "LogEndOffset", "type": "int64", "versions": "0+",
        "about": "The last known log end offset of the follower or -1 if it is unknown"},
      { "name": "LastFetchTimestamp", "type": "int64", "versions": "1+", "ignorable": true, "default": -1,
        "about": "The last known leader wall clock time time when a follower fetched from the leader. This is reported as -1 both for the current leader or if it is unknown for a voter"},
      { "name": "LastCaughtUpTimestamp", "type": "int64", "versions": "1+", "ignorable": true, "default": -1,
        "about": "The leader wall clock append time of the offset for which the follower made the
   most recent fetch request. This is reported as the current time for the leader and -1 if unknown   for a voter" }
    ]}
  ]
}
  1. ReplicaUuid was added to ReplicaState

Handling

  1. The leader needs to include the ID and UUID of all replicas in the quorum
  2. CLI - kafka-quorum.sh
    1. Kafka needs a CLI that can display this information.
    2. The CLI can assume that if a replica has both an ID and a UUID then it can become a voter.

Admin Client

TODO: The Admin client needs to expose operations for describing the quorum, adding a voter and removing a voter.

Monitoring

TODO: What new metrics do we need?

  1. Count for the size of the quorum. This may already exist.
  2. Count for the number of observers that can be promoted to voters.

Command Line Interface

kafka-metadata-shell.sh

TODO: Verify the name of this tool. The Kafka Quorum Shell should be able to read and display KRaft control records from snapshots and logs.

kafka-storage.sh

TODO: Improve the tool so that it can bootstrap the first quorum node by writing an AddVoterRecord to the cluster metadata log.

kafka-metadata.sh

  1. Be able to describe the quorum
    1. Users need to be able to query enough information to perform the add and remove voter operations
  2. Be able to add voters
  3. Be able to remove voters

Compatibility, Deprecation, and Migration Plan

...