...

This KIP describes a protocol for extending KIP-595 and KIP-630 so that the operator can programmatically update the voter set in a way that is safe and available. This KIP supports two important use cases. In the first, the operator wants to change the number of controllers by adding or removing a controller. In the second, the operator wants to replace a controller because of a disk or hardware failure.

Key Terms

These are the definitions of some important terms that are used throughout the document.

Voters: A voter is any replica that can transition to the candidate state and to the leader state. Voters are required to have an ID and UUID. Each replica keeps its own set of voters for the topic partition. A replica is a voter if its replica ID and UUID are in its own voter set. A candidate needs to get votes from the majority of its own voter set before it can become the leader of an epoch. When a voter becomes a leader it will use its voter set to determine when an offset has been committed.

Observers: An observer is any replica that is not in the voter set. This is either because its ID and UUID are not in the voter set or because it doesn't have an ID or UUID.
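
The two definitions above can be illustrated with a minimal sketch. The names ReplicaKey, is_voter and has_majority are hypothetical and are not part of this KIP; the sketch only assumes that a voter is identified by the pair (ID, UUID) and that a candidate counts votes from its own voter set.

```python
import uuid
from typing import NamedTuple, Optional, Set


class ReplicaKey(NamedTuple):
    """Hypothetical (ID, UUID) pair identifying a replica."""
    replica_id: Optional[int]
    replica_uuid: Optional[uuid.UUID]


def is_voter(replica: ReplicaKey, voter_set: Set[ReplicaKey]) -> bool:
    # A replica is a voter only if both its ID and its UUID are in the voter
    # set. A replica with a known ID but a different UUID is an observer.
    return replica in voter_set


def has_majority(granted: Set[ReplicaKey], voter_set: Set[ReplicaKey]) -> bool:
    # Only votes granted by members of the candidate's own voter set count.
    votes = len(granted & voter_set)
    return votes > len(voter_set) // 2
```

Under this model, a controller that comes back with the same ID but a new UUID (for example after a disk replacement) is treated as an observer until it is added to the voter set again.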

Proposed Changes

This KIP is inspired by Chapter 4 of Consensus: Bridging Theory and Practice [2]. The description of this KIP makes the assumption that the reader is familiar with the references enumerated at the bottom of this page.

...

Voters are removed from the cluster metadata partition by sending a RemoveVoter RPC to the leader. This works similarly to adding a voter. If there are no pending voter change operations the leader will append the RemoveVoterRecord to the log and immediately update its voter set to the new configuration.

Once the RemoveVoterRecord operation has been committed by the majority of the new voter set, the leader can respond to the RPC. If the removed voter is the leader, the leader will resign from the quorum when the RemoveVoterRecord has been committed. To allow this operation to be committed and for the leader to resign, the followers will continue to fetch from the leader even if the leader is not part of the new voter set. In KRaft, leader election is triggered when a voter hasn't received a successful fetch response within the fetch timeout.

Bootstrapping

This section describes how the quorum will be bootstrapped to support this KIP. There are two configurations that Kafka will support and each will be explained separately. The Kafka cluster can be configured to use the existing controller.quorum.voters or the new property called controller.quorum.bootstrap.servers. These two configurations are mutually exclusive: the KRaft cluster is expected to use one or the other but not both.
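
As a rough illustration of the mutual exclusion, the following sketch validates a configuration map. The function name quorum_bootstrap_mode is hypothetical, and rejecting a configuration that sets neither property is an assumption drawn from "one or the other"; the actual validation in Kafka may differ.

```python
from typing import Mapping

VOTERS = "controller.quorum.voters"
BOOTSTRAP = "controller.quorum.bootstrap.servers"


def quorum_bootstrap_mode(config: Mapping[str, str]) -> str:
    """Return the property in use; reject configs that set both or neither."""
    has_voters = bool(config.get(VOTERS, "").strip())
    has_bootstrap = bool(config.get(BOOTSTRAP, "").strip())
    if has_voters == has_bootstrap:
        # Either both properties are set or neither is set.
        raise ValueError(f"Set exactly one of {VOTERS} or {BOOTSTRAP}")
    return VOTERS if has_voters else BOOTSTRAP
```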

...

Code Block
languagejs
{
  "type": "data",
  "name": "AddVoterRecord",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Version", "type": "int16", "versions": "0+",
      "about": "The version of the add voter record"},
    { "name": "VoterId", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "about": "The ID of the voter getting added to the topic partition"},
    { "name": "VoterUuid", "type": "uuid", "versions": "0+",
      "about": "The voter generated UUID of the voter getting added to the topic partition"},
    { "name": "EndPoints", "type": "[]VoterEndpoint", "versions": "0+",
      "about": "The endpoints that can be used to communicate with the voter", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
        "about": "The name of the endpoint." },
      { "name": "Host", "type": "string", "versions": "0+",
        "about": "The hostname." },
      { "name": "Port", "type": "uint16", "versions": "0+",
        "about": "The port." },
      { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
        "about": "The security protocol." }
    ]}
  ]
}

Handling

KRaft replicas will read all of the control records in the snapshot and the log irrespective of the commit state and HWM. When a replica encounters an AddVoterRecord it will add the replica ID and UUID to its voter set. If the replica being added is the local replica, it will allow the transition to candidate when the fetch timer expires. The fetch timer is reset whenever the replica receives a successful Fetch or FetchSnapshot response.
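
A minimal sketch of this handling, assuming control records are available as dictionaries keyed by the field names of the schema above. The function replay_add_voter_records and the VoterKey alias are hypothetical names used only for illustration.

```python
from typing import Iterable, Set, Tuple

VoterKey = Tuple[int, str]  # (replica ID, replica UUID as text)


def replay_add_voter_records(
    voter_set: Set[VoterKey],
    records: Iterable[dict],
    local: VoterKey,
) -> Tuple[Set[VoterKey], bool]:
    """Apply AddVoterRecord control records to a copy of the voter set.

    Returns the updated set and whether the local replica may transition to
    candidate when its fetch timer expires.
    """
    updated = set(voter_set)
    for record in records:
        # Records are applied whether or not they are below the HWM.
        if record.get("name") == "AddVoterRecord":
            updated.add((record["VoterId"], record["VoterUuid"]))
    return updated, local in updated
```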

RemoveVoterRecord

A control record for instructing the voters to remove a voter from the topic partition. This record can exist in the log but not the snapshot of a topic partition.

...

Code Block
languagejs
{
  "type": "data",
  "name": "RemoveVoterRecord",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Version", "type": "int16", "versions": "0+",
      "about": "The version of the remove voter record"},
    { "name": "VoterId", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "about": "The ID of the voter getting removed from the topic partition"},
    { "name": "VoterUuid", "type": "uuid", "versions": "0+",
      "about": "The voter generated UUID of the voter getting removed from the topic partition"}
  ]
}

Handling

KRaft replicas will read all of the control records in the snapshot and the log irrespective of the commit state and HWM. When a replica encounters a RemoveVoterRecord it will remove the replica ID and UUID from its voter set. If the replica being removed is the leader and is the local replica, then the replica will stay leader until the RemoveVoterRecord gets committed or until the epoch advances and it loses leadership.
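
The rule for a leader that removes itself can be sketched as follows. Both function names are hypothetical, and the sketch assumes that a record is committed once its offset is strictly below the HWM.

```python
from typing import Set, Tuple

VoterKey = Tuple[int, str]  # (replica ID, replica UUID as text)


def apply_remove_voter(voter_set: Set[VoterKey], removed: VoterKey) -> Set[VoterKey]:
    # Both the ID and the UUID must match for the voter to be removed.
    return voter_set - {removed}


def removed_leader_keeps_leading(
    record_offset: int,
    high_watermark: int,
    leader_epoch: int,
    current_epoch: int,
) -> bool:
    """True while a leader that removed itself should remain leader."""
    committed = record_offset < high_watermark  # assumption: HWM is exclusive
    epoch_advanced = current_epoch > leader_epoch
    return not committed and not epoch_advanced
```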

Quorum State

A new field called VoterUuid will be added to QuorumStateData. Each KRaft replica will store a locally generated UUID in this field. The replica will generate and persist a UUID when it reads version 0 of the QuorumStateData or if the QuorumStateData has never been persisted.
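
The upgrade rule can be sketched like this. The dictionary layout, the "version" key and the new version number 1 are assumptions made for illustration; only the VoterUuid field name comes from this KIP. Writing the returned state to disk is left to the caller.

```python
import uuid
from typing import Optional


def ensure_voter_uuid(state: Optional[dict]) -> dict:
    """Return quorum state that carries a VoterUuid, generating one if needed."""
    if state is None:
        # The quorum state has never been persisted.
        return {"version": 1, "VoterUuid": str(uuid.uuid4())}
    if state.get("version", 0) == 0:
        # Version 0 has no VoterUuid: upgrade and generate one.
        upgraded = dict(state)
        upgraded["version"] = 1
        upgraded["VoterUuid"] = str(uuid.uuid4())
        return upgraded
    # Already upgraded: the existing UUID must be kept as-is.
    return state
```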

...

Code Block
languagejs
{
  "apiKey": "TBD",
  "type": "response",
  "name": "AddVoterResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top level error code." },
    { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The name of the topic." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
        { "name": "Index", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "0+", "taggedVersions": "0+", "tag": 0, "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "0+", "default": "-1", "entityType" : "brokerId",
            "about": "The ID of the current leader or -1 if the leader is unknown." },
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1",
            "about": "The latest known leader epoch"}
        ]}
      ]}
    ]}
  ]
}

Handling

When the leader receives an AddVoter request it will do the following:

  1. Wait for the fetch offset of the replica (ID, UUID) to catch up to the log end offset of the leader.
  2. Wait until there are no uncommitted add or remove voter records.
  3. Append the AddVoterRecord to the log.
  4. The KRaft internal listener will read this record from the log and add the voter to the voter set.
  5. Wait for the AddVoterRecord to commit using the majority of the new configuration.
  6. Send the AddVoter response to the client.

In 1., the leader needs to wait for the replica to catch up because when the AddVoterRecord is appended to the log the set of voters changes. If the added voter is far behind then it can take some time for it to reach the HWM. During this time the leader cannot commit data and the quorum will be unavailable from the perspective of the state machine. We mitigate this by waiting for the new replica to catch up before adding it to the set of voters.

In 4., the new replica will be part of the quorum so the leader will start sending BeginQuorumEpoch requests to this replica. It is possible that the new replica has not yet replicated and applied this AddVoterRecord so it doesn't know that it is a voter for this topic partition. The new replica will fail this RPC until it discovers that it is in the voter set. The leader will continue to retry until the RPC succeeds.
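
The numbered steps above can be sketched as a small state model. LeaderState, try_add_voter and try_complete are hypothetical names, and the model makes two simplifying assumptions: a follower holds the record at a given offset once its fetch offset has moved past that offset, and the leader always holds every record in its own log.

```python
from dataclasses import dataclass, field
from typing import Dict, Set, Tuple

VoterKey = Tuple[int, str]  # (replica ID, replica UUID as text)


@dataclass
class LeaderState:
    local: VoterKey
    log_end_offset: int
    voters: Set[VoterKey]
    fetch_offsets: Dict[VoterKey, int] = field(default_factory=dict)
    pending_change_offset: int = -1  # offset of an uncommitted voter change


def is_committed(leader: LeaderState, offset: int) -> bool:
    # Count the leader plus every voter whose fetch offset is past the record.
    acks = sum(
        1 for v in leader.voters
        if v == leader.local or leader.fetch_offsets.get(v, -1) > offset
    )
    return acks > len(leader.voters) // 2


def try_add_voter(leader: LeaderState, new_voter: VoterKey) -> str:
    # Step 1: the new replica must have caught up to the log end offset.
    if leader.fetch_offsets.get(new_voter, -1) < leader.log_end_offset:
        return "WAIT_FOR_CATCH_UP"
    # Step 2: only one uncommitted voter change at a time.
    if leader.pending_change_offset >= 0:
        return "WAIT_FOR_PENDING_CHANGE"
    # Steps 3 and 4: append the record and update the voter set immediately.
    leader.pending_change_offset = leader.log_end_offset
    leader.log_end_offset += 1
    leader.voters.add(new_voter)
    return "APPENDED"


def try_complete(leader: LeaderState) -> bool:
    # Steps 5 and 6: respond once a majority of the NEW voter set has the record.
    if leader.pending_change_offset >= 0 and is_committed(
        leader, leader.pending_change_offset
    ):
        leader.pending_change_offset = -1
        return True
    return False
```

Note that the majority in try_complete is computed over the voter set that already includes the new voter, which is why a lagging new voter would delay commits if step 1 were skipped.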

...

Code Block
languagejs
{
  "apiKey": "TBD",
  "type": "response",
  "name": "RemoveVoterResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top level error code." },
    { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The name of the topic." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
        { "name": "Index", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "0+", "taggedVersions": "0+", "tag": 0, "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "0+", "default": "-1", "entityType" : "brokerId",
            "about": "The ID of the current leader or -1 if the leader is unknown." },
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1",
            "about": "The latest known leader epoch"}
        ]}
      ]}
    ]}
  ]
}

Handling

When the leader receives a RemoveVoter request it will do the following:

...

Code Block
languagejs
{
  "apiKey": 1,
  "type": "request",
  "listeners": ["zkBroker", "broker", "controller"],
  "name": "FetchRequest",
  "validVersions": "0-14",
  "flexibleVersions": "12+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "12+", "nullableVersions": "12+", "default": "null", "taggedVersions": "12+", "tag": 0, "ignorable": true,
      "about": "The clusterId if known. This is used to validate metadata fetches prior to broker registration." },
    { "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "about": "The replica ID of the follower, or -1 if this request is from a consumer." },
    ...
    { "name": "Topics", "type": "[]FetchTopic", "versions": "0+",
      "about": "The topics to fetch.", "fields": [
      { "name": "Topic", "type": "string", "versions": "0-12", "entityType": "topicName", "ignorable": true,
        "about": "The name of the topic to fetch." },
      { "name": "TopicId", "type": "uuid", "versions": "13+", "ignorable": true,
        "about": "The unique topic ID"},
      { "name": "Partitions", "type": "[]FetchPartition", "versions": "0+",
        "about": "The partitions to fetch.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ReplicaUuid", "type": "uuid", "versions": "14+", "nullableVersions": "14+", "default": "null",
          "about": "The replica generated UUID. null otherwise." },
        ...
      ]}
    ]}
  ]
}

Handling

Replicas that support becoming voters will send both the replica ID and UUID in the Fetch request. The leader will assume that replicas that report both fields are voters or are able to become voters.

...

Code Block
languagejs
{
  "apiKey": 59,
  "type": "request",
  "listeners": ["controller"],
  "name": "FetchSnapshotRequest",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "taggedVersions": "0+", "tag": 0,
      "about": "The clusterId if known, this is used to validate metadata fetches prior to broker registration" },
    { "name": "ReplicaId", "type": "int32", "versions": "0+", "default": "-1", "entityType": "brokerId",
      "about": "The replica ID of the follower" },
    { "name": "MaxBytes", "type": "int32", "versions": "0+", "default": "0x7fffffff",
      "about": "The maximum bytes to fetch from all of the snapshots" },
    { "name": "Topics", "type": "[]TopicSnapshot", "versions": "0+",
      "about": "The topics to fetch", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The name of the topic to fetch" },
      { "name": "Partitions", "type": "[]PartitionSnapshot", "versions": "0+",
        "about": "The partitions to fetch", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index" },
        { "name": "ReplicaUuid", "type": "uuid", "versions": "1+", "default": "null",
           "about": "The replica UUID of the follower" }, 
        { "name": "CurrentLeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The current leader epoch of the partition, -1 for unknown leader epoch" },
        { "name": "SnapshotId", "type": "SnapshotId", "versions": "0+",
          "about": "The snapshot endOffset and epoch to fetch", "fields": [
          { "name": "EndOffset", "type": "int64", "versions": "0+" },
          { "name": "Epoch", "type": "int32", "versions": "0+" }
        ]},
        { "name": "Position", "type": "int64", "versions": "0+",
          "about": "The byte position within the snapshot to start fetching from" }
      ]}
    ]}
  ]
}

Handling

Similar to Fetch, replicas that support becoming voters will send both the replica ID and UUID in the FetchSnapshot request. The leader will assume that replicas that report both fields are voters or are able to become voters.

...

Code Block
languagejs
{
  "apiKey": 52,
  "type": "request",
  "listeners": ["controller"],
  "name": "VoteRequest",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null" },
    { "name": "CandidateId", "type": "int32", "versions": "1+", "entityType": "brokerId",
      "about": "The ID of the voter sending the request" },
    { "name": "VoterId", "type": "int32", "versions": "1+", "entityType": "brokerId",
      "about": "The ID of the replica receiving the request to vote." },
    { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]PartitionData",
        "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "CandidateEpoch", "type": "int32", "versions": "0+",
          "about": "The bumped epoch of the candidate sending the request"},
        { "name": "CandidateId", "type": "int32", "versions": "0", "entityType": "brokerId",
          "about": "The ID of the voter sending the request"},
        { "name": "CandidateUuid", "type": "uuid", "versions": "1+",
          "about": "The candidate generated UUID, null otherwise" },
        { "name": "VoterUuid", "type": "uuid", "versions": "1+", "nullableVersions": "1+", "default": "null",
          "about": "The voter generated UUID of the replica receiving the request to vote, null otherwise" },
        { "name": "LastOffsetEpoch", "type": "int32", "versions": "0+",
          "about": "The epoch of the last record written to the metadata log"},
        { "name": "LastOffset", "type": "int64", "versions": "0+",
          "about": "The offset of the last record written to the metadata log"}
      ]}
    ]}
  ]
}

Handling

The handling of the Vote request will differ from that described in KIP-595 in that all replicas are allowed to vote even if they are not voters for the quorum. The voter will persist both the candidate ID and UUID in the quorum state so that it votes for at most one candidate in a given epoch.
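
A minimal sketch of the at-most-one-vote rule, keyed by candidate ID and UUID. QuorumState and handle_vote are hypothetical names. The sketch keeps state in memory, whereas a real replica would persist it before replying, and it deliberately omits the log comparison based on LastOffsetEpoch and LastOffset.

```python
from dataclasses import dataclass
from typing import Optional, Tuple

CandidateKey = Tuple[int, str]  # (candidate ID, candidate UUID as text)


@dataclass
class QuorumState:
    epoch: int = 0
    voted_for: Optional[CandidateKey] = None


def handle_vote(state: QuorumState, candidate: CandidateKey, candidate_epoch: int) -> bool:
    """Grant the vote unless a different candidate already has it in this epoch."""
    if candidate_epoch < state.epoch:
        return False  # stale epoch
    if candidate_epoch > state.epoch:
        # A new epoch starts with no vote recorded.
        state.epoch = candidate_epoch
        state.voted_for = None
    if state.voted_for is None:
        state.voted_for = candidate  # persist before replying in a real replica
        return True
    # Repeated requests from the same (ID, UUID) are granted again.
    return state.voted_for == candidate
```

Because the UUID is part of the key, a candidate with a reused ID but a different UUID cannot collect a vote already granted in the same epoch.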

...

Code Block
languagejs
{
  "apiKey": 53,
  "type": "request",
  "listeners": ["controller"],
  "name": "BeginQuorumEpochRequest",
  "validVersions": "0-1",
  "flexibleVersions": "1+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "0+",
      "nullableVersions": "0+", "default": "null"},
    { "name": "LeaderId", "type": "int32", "versions": "1+", "entityType": "brokerId",
      "about": "The ID of the newly elected leader"}, 
    { "name": "VoterId", "type": "int32", "versions": "1+", "entityType": "brokerId",
      "about": "The voter ID of the receiving replica." },
    { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "VoterUuid", "type": "uuid", "versions": "1+",
          "about": "The voter UUID of the receiving replica." },
        { "name": "LeaderId", "type": "int32", "versions": "0", "entityType": "brokerId",
          "about": "The ID of the newly elected leader"},
        { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The epoch of the newly elected leader"}
      ]}
    ]}
  ]
}

Handling

This request will be handled as described in KIP-595 with the following additional errors:

...

Code Block
languagejs
{
  "apiKey": 54,
  "type": "request",
  "listeners": ["controller"],
  "name": "EndQuorumEpochRequest",
  "validVersions": "0-1",
  "flexibleVersions": "1+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "0+",
      "nullableVersions": "0+", "default": "null"},
    { "name": "LeaderId", "type": "int32", "versions": "1+", "entityType": "brokerId",
      "about": "The current leader ID that is resigning." },
    { "name": "VoterId", "type": "int32", "versions": "1+", "entityType": "brokerId",
      "about": "The voter ID of the receiving replica." }, 
    { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "VoterUuid", "type": "uuid", "versions": "1+",
          "about": "The voter UUID of the receiving replica." }, 
        { "name": "LeaderId", "type": "int32", "versions": "0", "entityType": "brokerId",
          "about": "The current leader ID that is resigning"},
        { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The current epoch"},
        { "name": "PreferredSuccessors", "type": "[]ReplicaInfo", "versions": "0+",
          "about": "A sorted list of preferred successors to start the election", "fields": [
          { "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId" },
          { "name": "ReplicaUuid", "type": "uuid", "versions": "1+" }
        ]} 
      ]}
    ]}
  ]
}

Handling

This request will be handled as described in KIP-595 with the following additional errors:

...

Code Block
languagejs
{
  "apiKey": 55,
  "type": "response",
  "name": "DescribeQuorumResponse",
  // Version 1 adds LastFetchTimeStamp and LastCaughtUpTimestamp in ReplicaState (KIP-836).
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top level error code."},
    { "name": "Topics", "type": "[]TopicData",
      "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]PartitionData",
        "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+"},
        { "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId",
          "about": "The ID of the current leader or -1 if the leader is unknown."},
        { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The latest known leader epoch"},
        { "name": "HighWatermark", "type": "int64", "versions": "0+"},
        { "name": "CurrentVoters", "type": "[]ReplicaState", "versions": "0+" },
        { "name": "Observers", "type": "[]ReplicaState", "versions": "0+" }
      ]}
    ]}
  ],
  "commonStructs": [
    { "name": "ReplicaState", "versions": "0+", "fields": [
      { "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId" },
      { "name": "ReplicaUuid", "type": "uuid", "versions": "1+" },
      { "name": "LogEndOffset", "type": "int64", "versions": "0+",
        "about": "The last known log end offset of the follower or -1 if it is unknown"},
      { "name": "LastFetchTimestamp", "type": "int64", "versions": "1+", "ignorable": true, "default": -1,
        "about": "The last known leader wall clock time when a follower fetched from the leader. This is reported as -1 both for the current leader or if it is unknown for a voter" },
      { "name": "LastCaughtUpTimestamp", "type": "int64", "versions": "1+", "ignorable": true, "default": -1,
        "about": "The leader wall clock append time of the offset for which the follower made the most recent fetch request. This is reported as the current time for the leader and -1 if unknown for a voter" }
    ]}
  ]
}

Handling

The handling of the request is the same as that described in KIP-595 with just the additional fields. Clients handling the response can assume that if a ReplicaState includes both the ID and the UUID, that replica can become a voter if it is enumerated in the Observers field.
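
On the client side, this rule could look like the following sketch. It assumes the response has been decoded into dictionaries keyed by the schema's field names and that a missing UUID is represented as None; the function name promotable_observers is hypothetical.

```python
from typing import List, Tuple


def promotable_observers(partition: dict) -> List[Tuple[int, str]]:
    """List observers that report both an ID and a UUID."""
    result = []
    for replica in partition.get("Observers", []):
        replica_id = replica.get("ReplicaId")
        replica_uuid = replica.get("ReplicaUuid")
        # Both fields must be present for the replica to be able to become a voter.
        if replica_id is not None and replica_id >= 0 and replica_uuid is not None:
            result.append((replica_id, replica_uuid))
    return result
```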

...

This KIP will be tested using unit tests, integration tests, system tests, simulation tests and a TLA+ specification.

Rejected Alternatives

KIP-642: Dynamic quorum reassignment. KIP-642 describes how to perform dynamic reassignment with multiple voters added and removed. KIP-642 doesn't support disk failures and it would be more difficult to implement than KIP-853. A future KIP can describe administrative operations that support the addition and removal of multiple voters.

...