Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Current state: Under Discussion

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Describe the problems you are trying to solve.

Proposed Changes

Replica UUID

In addition to a replica ID each replica assigned to a topic partition will have replica UUID. This UUID will be generated once and persisted as part of the quorum state for the topic partition. A replica (controller and broker) may host multiple topic partition in multiple log directories. The replica UUID will be generated per topic partition hosted by the replica.

Replicas in a topic partition that are voter or can become voters will be identified by their ID and UUID.

Voter Changes

Adding Voters

Voters are added to the cluster metadata partition by send an AddVoter RPC to the leader. For safety Kafka will only allow one voter change operation at a time. If there are any pending voter change operations the leader will wait for them to finish. If there are no pending voter change operations the leader will write a AddVoterRecord to the log and immediately update its in-memory quorum state to include this voter as part of the quorum. Any replica that replicates and reads this AddVoterRecord will update their in-voter set to include this new voter. They will not wait for these records to get committed before updating their voter set.

Once the AddVoterRecord operation has been committed by the majority of the new voter set, the leader can respond to the AddVoter RPC and process new voter change operations.

Removing Voters

Voter are removed from the cluster metadata partition by send an RemoveVoter RPC to the leader. This works similar to adding a voter. If there are no pending voter change operations the leader will append the RemoveVoterRecord to the log and immediately update their voter set to the new configuration.

Once the RemoveVoterRecord operation has been committed by the majority of the new voter set, the leader can respond to the RPC and resign from the quorum if the removed replica is the leader. To allow this operation to commit the followers will continue to fetch from the leader even if the leader is not part of the new voter set.

Bootstrapping

The section describe how the quorum will get bootstrap to support this KIP. There are two configurations that Kafka will support and each will be explain separately. The Kafka cluster can be configured to used the existing controller.quorum.voters  or the new property controller.quorum.bootstrap.servers.

controller.quorum.voters

This is a static quorum configuration where all of the voters' ID, host and port are specified. An example value of this configuration is 1@localhost:9092,2@localhost:9093,3@localhost:9094 .

...

The KRaft leader will not perform writes from the state machine (active controller) until is has written to the log an AddVoterRecord for every replica id in the controller.quorum.voters  configuration.

controller.quorum.bootstrap.servers

This configuration describe the set of hosts and ports that can be queried to discover the cluster metadata partition leader. Observers and to-be-added voters will send Fetch requests to this list of servers until the leader is discovered.

When using this configuration the quorum will be started with only one voter. This voter is bootstrapped by running the kafka-storage.sh tool. This tool will create the cluster metadata partition and append one AddVoterRecord to it. Additional voters can get added by using the AddVoter RPC as describe in this KIP.

Leader Election

It is possible for the leader to write an AddVoterRecord to the log and replica it to some of the voters in the new configuration. If the leader fails before this record has been replicated to the new voter it is possible that a new leader cannot be elected. This is because voters reject vote request from replicas that are not in the voter set. This check will be removed and replicas will reply to votes request when the candidate is not in the voter set or the voting replica is not in the voter set. The candidate must still have a longer log offset and epoch before the voter will grant a vote to it.

High Watermark

As describe in KIP-595, the high-watermark will be calculated using the fetch offset of the majority of the voters. When a replica is removed or added it is possible for the high-watermakr to decrease. The leader will not allow the high-watermark to decrease and will guarantee that is is monotonically increasing for both the state machines and the remove replicas.

With this KIP it is possible for the leader to not be part of the voter set when the replica removed is the leader. In this case the leader will continue to handle Fetch and FetchSnapshot request as normal but it will not count itself when computing the high watermark.

Snapshots

The snapshot generation code needs to be extended to include these new KRaft specific control records for AddVoterRecord and RemoveVoterRecord. Before this KIP the snapshot didn't include any KRaft generated control records.

Internal Listener

The KRaft implementation and protocol describe in KIP-595 and KIP-630 never read from the log or snapshot. This KIP requires the KRaft implementation now read uncommitted data from log and snapshot to discover the voter set. This also means that the KRaft implementation needs to handle this uncommitted state getting truncated and reverted.

Common Scenarios

To better illustrate this feature this section describes two common scenarios that the Kafka administrator may perform.

Disk Failure Recovery

If one of the replicas encounter a disk failure the operation can replace this disk with a new disk and start the replica.

...

When the RemoveVoter RPC succeeds the voters will be (1, UUID1), (2, UUID2), and (3, UUID3'). At this point the Kafka cluster has successfully recover from a disk failure in a controller node.

Node Failure Recovery

To support this scenario the Kafka cluster must be configured to use controller.quorum.bootstrap.servers. Let's assume that the voter set is (1, UUID1), (2, UUID2) and (3, UUID3).

...

The client can now decided to remove replica (3, UUID3) from the set of voters.

Public Interfaces

Configuration

controller.quorum.voters

This is an existing configuration. If the cluster uses this configuration to configure the quorum, adding new replica ids will not be supported. The cluster will only support changing the UUID for an existing replica id.

controller.quorum.bootstrap.servers

This configuration can be used instead of controller.quorum.voters. This is a list of nodes that brokers and new controllers can use to discover the quorum leader. Brokers and new controllers (observers) will send Fetch requests to all of the nodes in this configuration until they discover the quorum leader and the Fetch request succeeds. The quorum voters and their configuration will be learned by fetching and reading the records from the log and snapshot. This includes committed and uncommitted records.

Log and Snapshot Control Records

Two new control records will be added to the log and snapshot of a KRaft partition.

LeaderChangeMessage

Add an optional VoterUuid to Voter. This change is not needed for correctness but it is nice to have for tracing and debugging.

Code Block
languagejs
  {
    "type": "data",
    "name": "LeaderChangeMessage",
    "validVersions": "0-1",
    "flexibleVersions": "0+",
    "fields": [
      { "name": "Version", "type": "int16", "versions": "0+",
        "about": "The version of the leader change message" },
      { "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId",
        "about": "The ID of the newly elected leader" },
      { "name": "Voters", "type": "[]Voter", "versions": "0+",
        "about": "The set of voters in the quorum for this epoch" },
      { "name": "GrantingVoters", "type": "[]Voter", "versions": "0+",
        "about": "The voters who voted for the leader at the time of election" }
    ],
    "commonStructs": [
      { "name": "Voter", "versions": "0+", "fields": [
        { "name": "VoterId", "type": "int32", "versions": "0+" },
        { "name": "VoterUuid", "type": "uuid", "versions": "1+" }
      ]}
    ]
  }

AddVoterRecord

A control record for instructing the voters to add a new voter to the topic partition. This record can exist in both the log and the snapshot of a topic partition.

...

Code Block
languagejs
{
  "type": "data",
  "name": "AddVoterRecord",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Version", "type": "int16", "versions": "0+",
      "about": "The version of the add voter record"},
    { "name": "VoterId", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "about": "The ID of the voter getting added to the topic partition"},
    { "name": "VoterUuid", "type": "uuid", "versions": "0+",
      "about": "The voter generated UUID of the voter getting added to the topic partition"},
    { "name": "EndPoints", "type": "[]VoterEndpoint", "versions": "0+",
      "about": "The endpoints that can be used to communicate with the voter", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
        "about": "The name of the endpoint." },
      { "name": "Host", "type": "string", "versions": "0+",
        "about": "The hostname." },
      { "name": "Port", "type": "uint16", "versions": "0+",
        "about": "The port." },
      { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
        "about": "The security protocol." }
    ]}
  ]
}

RemoveVoterRecord

A control record for instructing the voters to remove a new voter to the topic partition. This record can exist in the log but not the snapshot of a topic partition.

...

Code Block
languagejs
{
  "type": "data",
  "name": "RemoveVoterRecord",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Version", "type": "int16", "versions": "0+",
      "about": "The version of the add voter record"},
    { "name": "VoterId", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "about": "The ID of the voter getting removed from the topic partition"},
    { "name": "VoterUuid", "type": "uuid", "versions": "0+",
      "about": "The voter generated UUID of the voter getting removed from the topic partition"}
  ]
}

Quorum State

A new field called VoterUuid  will get added to the QuorumStateData . Each KRaft replica will store a locally generated UUID in this field. The replica will generate and persist an UUID when it reads a version 0 of the QuorumStateData  or if the QuorumStateData  hasn't been persistent in the past.

...

Code Block
languagejs
{
  "type": "data",
  "name": "QuorumStateData",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "0+"},
    { "name": "VoterUuid", "type" "uuid", "versions": "1+",
      "about": "The locally generated UUID for this replica.",
    { "name": "LeaderId", "type": "int32", "versions": "0+", "default": "-1"},
    { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1"},
    { "name": "VotedId", "type": "int32", "versions": "0+", "default": "-1"},
    { "name": "VotedUuid", "type": "uuid", "versions": "1+", "nullableVersions": "1+", "default": "null" },
    { "name": "AppliedOffset", "type": "int64", "versions": "0+"},
    { "name": "CurrentVoters", "type": "[]Voter", "versions": "0", "nullableVersions": "0"}
  ],
  "commonStructs": [
    { "name": "Voter", "versions": "0", "fields": [
      {"name": "VoterId", "type": "int32", "versions": "0"}
    ]}
  ]
}

RPCs

AddVoter

Request

This RPC can be sent by an administrative client to add a voter to the set of voters. This RPC can be sent to a broker or controller. The broker will forward the request to the controller.

Code Block
languagejs
{
  "apiKey": "TBD",
  "type": "request",
  "listeners": ["controller", "broker"],
  "name": "AddVoterRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "0+" }
    { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The name of the topic." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
        { "name": "Index", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "VoterId", "type": "int32", "versions": "0+",
          "about": "The ID of the voter getting added to the topic partition." },
        { "name": "VoterUuid", "type": "uuid", "versions": "0+",
          "about": "The voter generated UUID of the voter getting added to the topic partition." },
        { "name": "EndPoints", "type": "[]VoterEndpoint", "versions": "0+",
          "about": "The endpoints that can be used to communicate with the voter", "fields": [
          { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
            "about": "The name of the endpoint." },
          { "name": "Host", "type": "string", "versions": "0+",
            "about": "The hostname." },
          { "name": "Port", "type": "uint16", "versions": "0+",
            "about": "The port." },
          { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
            "about": "The security protocol." }
        ]}
      ]}
    ]}
  ]
}

Response

Code Block
languagejs
{
  "apiKey": "TBD",
  "type": "response",
  "name": "AddVoterResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top level error code." }
    { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The name of the topic." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
        { "name": "Index", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no fetch error." },
        { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "0+", "taggedVersions": "0+", "tag": 0, "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "12+", "default": "-1", "entityType" : "brokerId",
            "about": "The ID of the current leader or -1 if the leader is unknown." },
          { "name": "LeaderEpoch", "type": "int32", "versions": "12+", "default": "-1",
            "about": "The latest known leader epoch"}
        ]}
      ]}
    ]}
  ]
}

Handling

When the leader receives a AddVoter request it will do the following:

...

The broker will handle this request by forwarding the request to the active controller.

RemoveVoter

Request

This RPC can be sent by an administrative client to remove a voter from the set of voters. This RPC can be sent to a broker or controller. The broker will forward the request to the controller.

Code Block
languagejs
{
  "apiKey": "TBD",
  "type": "request",
  "listeners": ["controller", "broker"],
  "name": "RemoveVoterRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The name of the topic." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
        { "name": "Index", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "VoterId", "type": "int32", "versions": "0+",
          "about": "The ID of the voter getting removed from the topic partition." },
        { "name": "VoterUuid", "type": "uuid", "versions": "0+",
          "about": "The voter generated UUID of the voter getting remove from the topic partition." },
      ]}
    ]}
  ]
}

Response

Code Block
languagejs
{
  "apiKey": "TBD",
  "type": "response",
  "name": "RemoveVoterResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top level error code." }
    { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The name of the topic." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
        { "name": "Index", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no fetch error." },
        { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "0+", "taggedVersions": "0+", "tag": 0, "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "12+", "default": "-1", "entityType" : "brokerId",
            "about": "The ID of the current leader or -1 if the leader is unknown." },
          { "name": "LeaderEpoch", "type": "int32", "versions": "12+", "default": "-1",
            "about": "The latest known leader epoch"}
        ]}
      ]}
    ]}
  ]
}

Handling

When the leader receives a RemoveVoter request it will do the following:

...

The broker will handle this request by forwarding the request to the active controller.

Fetch

The Fetch response version is bump and the fields remain unchanged.

Request

Version 14 adds the field ReplicaUuid to the FetchPartition. This field is populated with the replica generated UUID. If the ReplicaUuid and the ReplicaId fields are populated, the topic partition leader can assume that the replica supports becoming a voter.

Code Block
languagejs
{
  "apiKey": 1,
  "type": "request",
  "listeners": ["zkBroker", "broker", "controller"],
  "name": "FetchRequest",
  "validVersions": "0-14",
  "flexibleVersions": "12+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "12+", "nullableVersions": "12+", "default": "null", "taggedVersions": "12+", "tag": 0, "ignorable": true,
      "about": "The clusterId if known. This is used to validate metadata fetches prior to broker registration." },
    { "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "about": "The replica ID of the follower, of -1 if this request is from a consumer." },
    ...
    { "name": "Topics", "type": "[]FetchTopic", "versions": "0+",
      "about": "The topics to fetch.", "fields": [
      { "name": "Topic", "type": "string", "versions": "0-12", "entityType": "topicName", "ignorable": true,
        "about": "The name of the topic to fetch." },
      { "name": "TopicId", "type": "uuid", "versions": "13+", "ignorable": true,
        "about": "The unique topic ID"},
      { "name": "Partitions", "type": "[]FetchPartition", "versions": "0+",
        "about": "The partitions to fetch.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ReplicaUuid", "type": "uuid", "versions": "14+", "nullableVersions": "14+", "default": "null",
          "about": "The replica generated UUID. null otherwise." },
        ...
      ]}
    ]}
  ]
}

Handling

Replica that support becoming voters will send both the replica ID and UUID in the Fetch request. The leader will assume that replicas that report both fields are voters or are able to become voters.

...

When removing the leader from the voter set, it will remain the leader for that epoch until the RemoveVoterRecord gets committed. This means that the leader needs to allow replicas (voters and observers) to fetch from the leader even if it is not part of the voter set. This also means that if the leader is not part of the voter set it should not include itself when computing the committed offset (also known as the high-watermark).

FetchSnapshot

The FetchSnapshot response version is bump and the fields remain unchanged.

Request

Version 1 adds the field ReplicaUuid to PartitionSnapshot. If the ReplicaUuid and the ReplicaId fields are populated, the topic partition leader can assume that the replica supports becoming a voter.

Code Block
languagejs
{
  "apiKey": 59,
  "type": "request",
  "listeners": ["controller"],
  "name": "FetchSnapshotRequest",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "taggedVersions": "0+", "tag": 0,
      "about": "The clusterId if known, this is used to validate metadata fetches prior to broker
   registration" },
    { "name": "ReplicaId", "type": "int32", "versions": "0+", "default": "-1", "entityType": "brokerId",
      "about": "The replica ID of the follower" },
    { "name": "MaxBytes", "type": "int32", "versions": "0+", "default": "0x7fffffff",
      "about": "The maximum bytes to fetch from all of the snapshots" },
    { "name": "Topics", "type": "[]TopicSnapshot", "versions": "0+",
      "about": "The topics to fetch", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The name of the topic to fetch" },
      { "name": "Partitions", "type": "[]PartitionSnapshot", "versions": "0+",
        "about": "The partitions to fetch", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index" },
        { "name": "ReplicaUuid", "type": "uuid", "versions": "1+", "default": "null",
           "about": "The replica UUID of the follower" }, 
        { "name": "CurrentLeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The current leader epoch of the partition, -1 for unknown leader epoch" },
        { "name": "SnapshotId", "type": "SnapshotId", "versions": "0+",
          "about": "The snapshot endOffset and epoch to fetch", "fields": [
          { "name": "EndOffset", "type": "int64", "versions": "0+" },
          { "name": "Epoch", "type": "int32", "versions": "0+" }
        ]},
        { "name": "Position", "type": "int64", "versions": "0+",
          "about": "The byte position within the snapshot to start fetching from" }
      ]}
    ]}
  ]
}

Handling

Similar to Fetch, replica that support becoming voters will send both the replica ID and UUID in the FetchSnapshot request. The leader will assume that replicas that report both fields are voters or are able to become voters.

...

When removing the leader from the voter set, it will remain the leader for that epoch until the RemoveVoterRecord gets committed. This means that the leader needs to allow replicas (voters and observers) to fetch snapshots from the leader even if it is not part of the voter set.

Vote

The Vote response version is bump and the fields remain unchanged.

Request

Changes:

  1. Candidate Id was moved out of the topic partition maps
  2. Candidate Uuid was added to the PartitionData
  3. VoterId was added to the top level
  4. VoterUuId was added to PartitionData
Code Block
languagejs
{
  "apiKey": 52,
  "type": "request",
  "listeners": ["controller"],
  "name": "VoteRequest",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null" },
    { "name": "CandidateId", "type": "int32", "versions": "1+", "entityType": "brokerId",
      "about": "The ID of the voter sending the request" },
    { "name": "VoterId", "type": "int32", "versions": "1+", "entityType": "brokerId",
      "about": "The ID of the replica receiving the request to vote." },
    { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]PartitionData",
        "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "CandidateEpoch", "type": "int32", "versions": "0+",
          "about": "The bumped epoch of the candidate sending the request"},
        { "name": "CandidateId", "type": "int32", "versions": "0", "entityType": "brokerId",
          "about": "The ID of the voter sending the request"},
        { "name": "CandidateUuid", "type": "uuid", "versions": "1+",
          "about": "The candidate generated UUID, null otherwise" },
        { "name": "VoterUuid", "type": "uuid", "versions": "1+", "nullableVersions": "1+", "default": "null" }
          "about": "The voter generated UUID of the replica receiving the request to vote, null otherwise" }, 
        { "name": "LastOffsetEpoch", "type": "int32", "versions": "0+",
          "about": "The epoch of the last record written to the metadata log"},
        { "name": "LastOffset", "type": "int64", "versions": "0+",
          "about": "The offset of the last record written to the metadata log"}
      ]}
    ]}
  ]
}

Handling

The handling of the Vote request will from that described in KIP-595 in that all replicas are allowed to vote event if they are not voters for the quorum. The voter will persist in the quorum state both the candidate ID and UUID so that it only votes for at most one candidate for a given epoch.

...

  1. INVALID_REQUEST - when the voter ID and UUID doesn't match the local ID and UUID.

BeginQuorumEpoch

Request

  1. LeaderId was moved out of the topic partition maps
  2. VoterId was added to the top level
  3. VoterUuId was added to PartitionData
  4. Allow tagged fields for version greater than or equal to 1.
Code Block
languagejs
{
  "apiKey": 53,
  "type": "request",
  "listeners": ["controller"],
  "name": "BeginQuorumEpochRequest",
  "validVersions": "0-1",
  "flexibleVersions": "1+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "0+",
      "nullableVersions": "0+", "default": "null"},
    { "name": "LeaderId", "type": "int32", "versions": "1+", "entityType": "brokerId",
      "about": "The ID of the newly elected leader"}, 
    { "name": "VoterId", "type": "int32", "versions": "1+", "entityType": "brokerId",
      "about": "The voter ID of the receiving replica." },
    { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "VoterUuid", "type": "uuid", "versions": "1+",
          "about": "The voter UUID of the receiving replica." },
        { "name": "LeaderId", "type": "int32", "versions": "0", "entityType": "brokerId",
          "about": "The ID of the newly elected leader"},
        { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The epoch of the newly elected leader"}
      ]}
    ]}
  ]
}

Handling

This request will be handle as described in KIP-595 with the following additional errors:

  1. INVALID_REQUEST - when the voter ID and UUID doesn't match.
  2. NOT_VOTER - when the the replica is not a voter in the topic partition. This error should be retry by the leader of the topic partition.

EndQuorumEpoch

Request

  1. LeaderId was moved out of the topic partition maps
  2. VoterId was added to the request
  3. VoterUuid was added to the Partitions
  4. ReplicaUuid was added to PreferredSuccessors
  5. Allow tagged fields for versions greater than or equal to 1.
Code Block
languagejs
{
  "apiKey": 54,
  "type": "request",
  "listeners": ["controller"],
  "name": "EndQuorumEpochRequest",
  "validVersions": "0-1",
  "flexibleVersions": "1+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "0+",
      "nullableVersions": "0+", "default": "null"},
    { "name": "LeaderId", "type": "int32", "versions": "1+", "entityType": "brokerId",
      "about": "The current leader ID that is resigning." },
    { "name": "VoterId", "type": "int32", "versions": "1+", "entityType": "brokerId",
      "about": "The voter ID of the receiving replica." }, 
    { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "VoterUuid", "type": "uuid", "versions": "1+",
          "about": "The voter UUID of the receiving replica." }, 
        { "name": "LeaderId", "type": "int32", "versions": "0", "entityType": "brokerId",
          "about": "The current leader ID that is resigning"},
        { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The current epoch"},
        { "name": "PreferredSuccessors", "type": "[]ReplicaInfo", "versions": "0+",
          "about": "A sorted list of preferred successors to start the election", "fields": [
          { "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId" },
          { "name": "ReplicaUuid", "type": "uuid", "versions": "1+" }
        ]} 
      ]}
    ]}
  ]
}

Handling

This request will be handle as described in KIP-595 with the following additional errors:

  1. INVALID_REQUEST - when the voter ID and UUID doesn't match.
  2. NOT_VOTER - when the the replica is not a voter in the topic partition. This error could be retry by the leader of the topic partition.

DescribeQuorum

The version is of the request is increase and the fields remain unchanged.

Response

  1. Add ReplicaUuid to ReplicaState
Code Block
languagejs
{
  "apiKey": 55,
  "type": "response",
  "name": "DescribeQuorumResponse",
  // Version 1 adds LastFetchTimeStamp and LastCaughtUpTimestamp in ReplicaState (KIP-836).
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top level error code."},
    { "name": "Topics", "type": "[]TopicData",
      "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]PartitionData",
        "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+"},
        { "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId",
          "about": "The ID of the current leader or -1 if the leader is unknown."},
        { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The latest known leader epoch"},
        { "name": "HighWatermark", "type": "int64", "versions": "0+"},
        { "name": "CurrentVoters", "type": "[]ReplicaState", "versions": "0+" },
        { "name": "Observers", "type": "[]ReplicaState", "versions": "0+" }
      ]}
    ]}
  ],
  "commonStructs": [
    { "name": "ReplicaState", "versions": "0+", "fields": [
      { "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId" },
      { "name": "ReplicaUuid", "type": "uuid", "versions": "1+" },
      { "name": "LogEndOffset", "type": "int64", "versions": "0+",
        "about": "The last known log end offset of the follower or -1 if it is unknown"},
      { "name": "LastFetchTimestamp", "type": "int64", "versions": "1+", "ignorable": true, "default": -1,
        "about": "The last known leader wall clock time time when a follower fetched from the leader. This is reported as -1 both for the current leader or if it is unknown for a voter"},
      { "name": "LastCaughtUpTimestamp", "type": "int64", "versions": "1+", "ignorable": true, "default": -1,
        "about": "The leader wall clock append time of the offset for which the follower made the
   most recent fetch request. This is reported as the current time for the leader and -1 if unknown   for a voter" }
    ]}
  ]
}

Handling

The handling of the request is the same as that described in KIP-595 with just the additional fields. Clients handling the response can assume that if a ReplicaState include both the ID and the UUID that replica can become a voter if is enumerated in the Observers field.

Admin Client

The Java Admin client will be extended to support the new field in the DescribeQuorum response and the new AddVoter and RemoveVoter RPCs.

Monitoring

NAMETAGSTYPENOTE
number-of-voterstype=raft-metricsgaugenumber of voters for the cluster metadata topic partition.
number-of-possible-voterstype=raft-metricsguagenumber of observer that could be promoted to voters.
pending-add-votertype=raft-metricsguage

1 if there is a pending add voter operation, 0 otherwise.

pending-remove-votertype=raft-metricsguage1 if there is a pending remove voter operation, 0 otherwise.

Command Line Interface

kafka-metadata-shell.sh

A future KIP will describe how the kafka-metadata-shell tool will be extended to be able to read and display KRaft control records from the quorum,  snapshot and log.

kafka-storage.sh

A future KIP describe how the kafka-storage tool will be extended to be able to bootstrap the first quorum node by writing an AddVoterRecord to the cluster metadata log when the controller.quorum.bootstrap.servers configuration is used.

kafka-metadata-quorum.sh

The kafka-metadata-quorum tool described in KIP-595 and KIP-836 will be improved to support these additional commands:

--describe

This command will print both the ReplicaId and ReplicaUuid for CurrentVoters. A new row called CouldBeVoters will be added which print the Replica ID and UUID of any replica that could be added to the voter set. E.g.

Code Block
> bin/kafka-metadata-quorum.sh --describe
ClusterId:              SomeClusterId
LeaderId:               0
LeaderEpoch:            15
HighWatermark:          234130
MaxFollowerLag:         34
MaxFollowerLagTimeMs:   15
CurrentVoters:          [{"id": 0, "uuid": "UUID1"}, {"id": 1, "uuid": "UUID2"}, {"id": 2, "uuid": "UUID2"}]
CouldBeVoters:          [{"id": 3, "uuid": "UUID3"}]

--describe replication

This command will print on additional column for the replica uuid after the replica id. E.g.

Code Block
> bin/kafka-metadata-quorum.sh --describe replication
ReplicaId   ReplicaUuid   LogEndOffset   ...
0           uuid1         234134         ...
...

--add-voter

This command is used to add new voters to the topic partition. The flags --replicaId and --replicaUuid must be specified. A future KIP will describe how the user can specify endpoint information for the replica.

--remove-voter

This command is used to remove voters from the topic partition. The flags --replicaId and --replicaUuid must be specified.

Compatibility, Deprecation, and Migration Plan

The features in this KIP will be supported if the ApiVersions of all of the voters and observers is greater than the versions described here. If the leader has a replica UUID for all of the voters then this KIP is supported by all of the voters.

Test Plan

This KIP will be tested using unittest, integration tests, system test, simulation tests and TLA+ verifications.

Rejected Alternatives

KIP-642: Dynamic quorum reassignment. KIP-642 describe how to perform dynamic reassignment will multiple voters added and removed. KIP-642 doesn't support disk failures and it would be more difficult to implement compared to this KIP-853. In a future KIP we can describe how we can add administrative operations that support the addition and removal of multiple voters.

References

  1. Ongaro, Diego, and John Ousterhout. "In search of an understandable consensus algorithm." 2014 {USENIX} Annual Technical Conference ({USENIX}{ATC} 14). 2014.
  2. Ongaro, Diego. Consensus: Bridging theory and practice. Diss. Stanford University, 2014.

  3. KIP-595: A Raft Protocol for the Metadata Quorum