...

  • bootstrap.servers:  Defines the set of servers to contact to get quorum information. This does not have to address quorum members directly. For example, it could be a VIP.
  • quorum.voters: Defines the ids of the expected voters. This is only required when bootstrapping the cluster for the first time. Once the cluster (and hence the quorum) has started, new brokers rely on FindQuorum (described below) to discover the current voters of the quorum.
  • quorum.progress.timeout.ms: Maximum time without receiving a fetch records request from a majority of the quorum before the leader asks whether there is a new epoch leader via FindQuorum.
  • quorum.fetch.timeout.ms: Maximum time without a successful fetch from the current leader before a new election is started.
  • quorum.election.timeout.ms: Maximum time spent in the candidate state without collecting a majority of votes before a new election is retried.
  • quorum.election.jitter.max.ms: Maximum random jitter after an election timeout before a new election is triggered.
  • quorum.request.timeout.ms: Maximum time before a pending request is considered failed and the connection is dropped.
  • quorum.retry.backoff.ms: Initial delay between request retries.
  • quorum.retry.backoff.max.ms: Max delay between requests. Backoff will increase exponentially beginning from quorum.retry.backoff.ms (the same as in KIP-580).
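For illustration, these settings might be supplied as broker properties along the following lines; this is a hypothetical sketch and every value below is an example, not a recommendation or default.

Code Block
languagejava
import java.util.Properties;

// Hypothetical example only: the values are illustrative, not recommended defaults.
class QuorumConfigExample {
    static Properties quorumProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092");
        props.put("quorum.voters", "1,2,3");                // only required when bootstrapping the cluster
        props.put("quorum.progress.timeout.ms", "5000");
        props.put("quorum.fetch.timeout.ms", "2000");
        props.put("quorum.election.timeout.ms", "1000");
        props.put("quorum.election.jitter.max.ms", "100");
        props.put("quorum.request.timeout.ms", "30000");
        props.put("quorum.retry.backoff.ms", "100");
        props.put("quorum.retry.backoff.max.ms", "1000");
        return props;
    }
}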

Persistent State

...

  • BrokerId: we store this in order to detect conflicts if broker.id in the main configuration is changed unexpectedly. 
  • LeaderId: this is the last known leader of the quorum. A value of -1 indicates that there was no leader.
  • LeaderEpoch: this is the last known leader epoch. This is initialized to 0 when the quorum is bootstrapped and should never be negative.
  • VotedId: indicates the id of the broker that this replica voted for in the current epoch. A value of -1 indicates that the replica has not voted (or cannot vote).
  • AppliedOffset: Reflects the maximum offset that has been applied to this quorum state. This is used for log recovery. The broker must scan from this point on initialization to detect updates to this file.
  • CurrentVoters: the latest known set of voters for this quorum
  • TargetVoters: the latest known target voters if the quorum is being reassigned

The use of this file will be described in more detail below as we describe the protocol. Note that one key difference between this internal topic and other topics is that we must always enforce fsync upon appending to the local log in order to guarantee the correctness of the Raft algorithm.
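As a minimal sketch, the persistent fields above could be represented as follows; this is illustrative only and does not prescribe the actual on-disk serialization of the quorum-state file.

Code Block
languagejava
import java.util.List;

// Minimal sketch of the persistent quorum state fields listed above.
// The real quorum-state file format is not specified here.
class QuorumStateData {
    int brokerId;                 // detects conflicts if broker.id changes unexpectedly
    int leaderId;                 // -1 if there was no leader
    int leaderEpoch;              // initialized to 0, never negative
    int votedId;                  // -1 if this replica has not (or cannot) vote
    long appliedOffset;           // maximum offset applied to this quorum state
    List<Integer> currentVoters;  // latest known set of voters
    List<Integer> targetVoters;   // latest known target voters, if a reassignment is in progress
}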

...

Note: This protocol is only concerned with leader election and log replication. It does not specify how new log entries are appended to the leader's log. Typically this would be through specific metadata APIs. For example, KIP-497 adds an AlterIsr API. When the leader of the metadata quorum (i.e. the controller) receives an AlterIsr request, it will append an entry to its log.

...

When a voter handles a Vote request:

  1. First it checks whether an epoch larger than the candidate epoch from the request is known. If so, the vote is rejected.
  2. It checks whether it has already voted in that candidate epoch. If it has, it only grants the vote if the candidate id matches the id it already voted for. Otherwise, the vote is rejected.
  3. If the candidate epoch is larger than its currently known epoch, it checks that the candidate's log is at least as up-to-date as its own (see above for the comparison rules). If so, it grants the vote by first updating the quorum-state file and then returning a response with voteGranted set to true; otherwise it rejects the request in the response. (A sketch of this handling follows the list.)
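A minimal sketch of this voter-side handling is shown below; the types and helper fields are simplified assumptions for illustration and do not match the KIP's actual request/response schemas.

Code Block
languagejava
// Hypothetical, simplified sketch of Vote request handling on a voter.
record VoteRequest(int candidateId, int candidateEpoch, int lastEpoch, long lastEpochEndOffset) {}
record VoteResponse(int leaderEpoch, boolean voteGranted) {}

class VoterState {
    int currentEpoch = 0;
    int votedId = -1;        // -1 means no vote cast in the current epoch
    int lastEpoch = 0;       // epoch of the last record in the local log
    long logEndOffset = 0L;

    VoteResponse handleVote(VoteRequest req) {
        // 1. Reject if an epoch larger than the candidate epoch is already known.
        if (currentEpoch > req.candidateEpoch())
            return new VoteResponse(currentEpoch, false);

        // 2. If a vote was already cast in this epoch, only re-grant to the same candidate.
        if (currentEpoch == req.candidateEpoch() && votedId != -1)
            return new VoteResponse(currentEpoch, votedId == req.candidateId());

        // 3. Grant only if the candidate's log is at least as up-to-date as ours.
        boolean candidateUpToDate = req.lastEpoch() > lastEpoch
            || (req.lastEpoch() == lastEpoch && req.lastEpochEndOffset() >= logEndOffset);
        if (candidateUpToDate) {
            currentEpoch = req.candidateEpoch();
            votedId = req.candidateId();
            // The quorum-state file must be updated before the response is sent.
            return new VoteResponse(currentEpoch, true);
        }
        return new VoteResponse(currentEpoch, false);
    }
}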

...

Upon receiving an EndQuorumEpoch request, voters will begin a new election after a random time bounded by quorum.election.jitter.max.ms.
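For illustration, the randomized delay could be chosen as in the following sketch; the helper is hypothetical and only shows how quorum.election.jitter.max.ms bounds the backoff.

Code Block
languagejava
import java.util.concurrent.ThreadLocalRandom;

// Illustrative only: choose a random delay in [0, quorum.election.jitter.max.ms]
// before starting a new election after receiving EndQuorumEpoch.
class ElectionJitter {
    static long electionBackoffMs(long electionJitterMaxMs) {
        return ThreadLocalRandom.current().nextLong(electionJitterMaxMs + 1);
    }
}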

Request Schema

Code Block
{
  "apiKey": N,
  "type": "request",
  "name": "EndQuorumEpochRequest",
  "validVersions": "0",
  "fields": [
    {"name": "ClusterId", "type": "string", "versions": "0+"},
    {"name": "ReplicaId", "type": "int32", "versions": "0+",
     "about": "The ID of the replica sending this request"},
    {"name": "LeaderId", "type": "int32", "versions": "0+",
     "about": The current leader ID or -1 if there is a vote in progress"},
    {"name": "LeaderEpoch", "type": "int32", "versions": "0+",
     "about": The current epoch"}
  ]
}

...

To resolve this issue, we introduce a "progress.timeout.ms" config, such that if the leader does not receive FetchQuorumRecords requests from a majority of the quorum for that amount of time, it starts sending FindQuorum requests to random nodes in the cluster. If the returned response includes a leader with a newer epoch, this zombie leader steps down and becomes an observer; if it finds that it is still within the current quorum's voter list, it starts fetching from that leader. Note that the node remains a leader until it finds that it has been supplanted by another voter.
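The following is a rough sketch of that leader-side check, assuming a hypothetical map of last fetch times per voter; none of these names are part of the KIP.

Code Block
languagejava
import java.util.Collection;
import java.util.Map;

// Hypothetical sketch: decide whether the leader should probe with FindQuorum
// because a majority of the quorum has not fetched within progress.timeout.ms.
class ProgressCheck {
    static boolean shouldProbeQuorum(Map<Integer, Long> lastFetchTimeMs,
                                     Collection<Integer> otherVoters,
                                     long nowMs,
                                     long progressTimeoutMs) {
        long activeFollowers = otherVoters.stream()
            .filter(v -> nowMs - lastFetchTimeMs.getOrDefault(v, 0L) <= progressTimeoutMs)
            .count();
        int majority = (otherVoters.size() + 1) / 2 + 1;  // +1 counts the leader itself
        // Probe with FindQuorum if the leader plus recently-fetching followers
        // no longer form a majority of the full voter set.
        return activeFollowers + 1 < majority;
    }
}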

...

  1. First check whether the node is the leader. If not, then return an error to let the client retry with FindQuorum. If the current leader is known to the receiving node, then include the LeaderId and LeaderEpoch in the response.
  2. Build the response using cached information about replication progress.

...

  1. If the response indicates that the intended node is not the current leader, then check the response to see if the LeaderId has been set. If so, then attempt to retry the request with the new leader.
  2. If the current leader is not defined in the response (which could be the case if there is an election in progress), then backoff and retry with FindQuorum.
  3. Otherwise, the response can be returned to the application, or the request eventually times out. (A sketch of this retry loop follows the list.)
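The retry loop might look roughly like the following; the types, error constant, and helpers are simplified assumptions and not the actual AdminClient implementation.

Code Block
languagejava
// Hypothetical, simplified sketch of the admin-side retry loop for DescribeQuorum.
record QuorumResponse(short errorCode, int leaderId) {}

class DescribeQuorumRetry {
    static final short NOT_QUORUM_LEADER = 205;

    interface Sender { QuorumResponse send(int targetId); }

    static QuorumResponse describe(Sender sender, int initialTarget, int maxRetries)
            throws InterruptedException {
        int target = initialTarget;
        for (int attempt = 0; attempt < maxRetries; attempt++) {
            QuorumResponse response = sender.send(target);
            if (response.errorCode() != NOT_QUORUM_LEADER)
                return response;               // return the response to the application
            if (response.leaderId() >= 0) {
                target = response.leaderId();  // retry against the hinted leader
            } else {
                Thread.sleep(100);             // election in progress: back off, then rediscover
                target = findQuorumLeader(sender);
            }
        }
        throw new IllegalStateException("DescribeQuorum request timed out");
    }

    // Placeholder for leader rediscovery via FindQuorum.
    static int findQuorumLeader(Sender sender) { return 0; }
}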

New Error Codes

Code Block
languagejava
NOT_QUORUM_LEADER(205, "Target broker is not the leader of this quorum", LargerKnownEpochException::new);

AlterQuorum

The AlterQuorum API is used by the admin client to reassign the voters of the quorum or cancel an ongoing reassignment. It requires ALTER on CLUSTER permission.

...

Code Block
{
  // Possible top level error codes:
  //
  // NOT_QUORUM_LEADER
  // CLUSTER_AUTHORIZATION_FAILED
  //
  // Possible node level error codes:
  //
  // UNKNOWN_VOTER_ID
  //
  "apiKey": N,
  "type": "response",
  "name": "AlterQuorumReassignmentResponseAlterQuorumResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
      {"name": "ErrorCode", "type": "int16", "versions": "0+"},
	  { "name": "TargetVoters", "type": ]
}

AlterQuorum Request Handling

Upon receiving the AlterQuorum request, the node will verify a couple of things:

"[]VoterResult", "versions": "0+",
        "about": "The responses for each target voters in the new quorum.", "fields": [
        { "name": "VoterId", "type": "int32", "versions": "0+"},
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." }
  ]
}

AlterQuorum Request Handling

Upon receiving the AlterQuorum request, the node will verify a couple of things:

  1. If the target node is not the leader of the quorum, return the NOT_QUORUM_LEADER code to the admin client.
  2. If the included voter is unknown to the quorum leader, return UNKNOWN_VOTER_ID code to the admin client.
  3. Otherwise, if the TargetVoters in the request does not match the current TargetVoters, then append a new AlterQuorumMessage to the log and wait for it to be committed. 
  4. Return successfully only when the desired TargetVoters has been safely committed to the log.

After returning the result of the request, the leader will begin the process to modify the quorum. This is described in more detail below, but basically the leader will begin tracking the fetch state of the target voters. It will then make a sequence of single-movement alterations by appending new AlterQuorumMessage records to the log.
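A rough sketch of the validation above, using hypothetical error constants and helpers that are not part of the KIP:

Code Block
languagejava
import java.util.List;
import java.util.Objects;
import java.util.Set;

// Hypothetical sketch of leader-side AlterQuorum validation.
class AlterQuorumHandler {
    static final short NONE = 0;
    static final short NOT_QUORUM_LEADER = 205;
    static final short UNKNOWN_VOTER_ID = 206;

    boolean isLeader;
    Set<Integer> knownReplicas;        // replicas known to the quorum leader
    List<Integer> currentTargetVoters; // target voters from the latest AlterQuorumMessage

    short handleAlterQuorum(List<Integer> requestedTargetVoters) {
        if (!isLeader)
            return NOT_QUORUM_LEADER;                           // step 1
        if (!knownReplicas.containsAll(requestedTargetVoters))
            return UNKNOWN_VOTER_ID;                            // step 2
        if (!Objects.equals(requestedTargetVoters, currentTargetVoters))
            appendAndAwaitCommit(requestedTargetVoters);        // step 3
        return NONE;                                            // step 4: committed successfully
    }

    void appendAndAwaitCommit(List<Integer> targetVoters) {
        // Placeholder: append an AlterQuorumMessage and block until it is committed.
    }
}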

...

AlterQuorum Response Handling

The response handling is similar to DescribeQuorum.  The admin client would do the following:

...

Once the reassignment has been accepted by the leader, a user can monitor the status of the reassignment through the DescribeQuorum API.

New Error Codes

Code Block
languagejava
UNKNOWN_VOTER_ID(206, "The specified target voter is unknown to the leader", UnknownVoterIdException::new);

Quorum Change Protocol

This protocol allows arbitrary quorum changes through the AlterQuorum API. Internally, we arrive at the target quorum by making a sequence of single-member changes.

...

  1. Append an AlterQuorumMessage to the log with the current voters as CurrentVoters and the TargetVoters from the AlterQuorum request.
  2. Leader will compute 3 sets based on CurrentVoters and TargetVoters:
    1. RemovingVoters: voters to be removed
    2. RetainedVoters: voters shared between current and target
    3. NewVoters: voters to be added
  3. Based on comparison between size(NewVoters) and size(RemovingVoters),
    1. If size(NewVoters) >= size(RemovingVoters), pick one of NewVoters as NV by writing a record with CurrentVoters=CurrentVoters + NV, and TargetVoters=TargetVoters.
    2. else pick one of RemovingVoters as RV, preferably a non-leader voter, by writing a record with CurrentVoters=CurrentVoters - RV, and TargetVoters=TargetVoters.
  4. Once the record is committed, the membership change is safe to be applied. Note that followers will begin acting with the new voter information as soon as the log entry has been appended. They do not wait for it to be committed.
  5. As there is a potential delay for propagating the removal message to the removing voter, we piggy-back on the `FetchQuorumRecords` to inform the voter to downgrade immediately after the new membership gets committed. See the error code NOT_FOLLOWER.
  6. The leader will continue this process until one of the following scenarios happens:
    1. If TargetVoters = CurrentVoters, then the reassignment is done. The leader will append a new entry with TargetVoters=null to the log.
    2. If the leader is the last remaining node in RemovingVoters, then it will step down by sending EndQuorumEpoch to the current voters. It will continue as a voter until the next leader removes it from the quorum.
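The single-member change loop described above can be sketched as follows; the helpers for appending and awaiting commit are placeholders, not the KIP's actual implementation.

Code Block
languagejava
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the leader's single-member quorum change loop.
class QuorumChangeLoop {
    static void reassign(Set<Integer> currentVoters, Set<Integer> targetVoters, int leaderId) {
        while (!currentVoters.equals(targetVoters)) {
            Set<Integer> newVoters = new HashSet<>(targetVoters);
            newVoters.removeAll(currentVoters);            // voters to be added
            Set<Integer> removingVoters = new HashSet<>(currentVoters);
            removingVoters.removeAll(targetVoters);        // voters to be removed

            currentVoters = new HashSet<>(currentVoters);
            if (newVoters.size() >= removingVoters.size()) {
                // Add one new voter (NV) in this round.
                currentVoters.add(newVoters.iterator().next());
            } else {
                // Remove one voter (RV) in this round, preferring a non-leader voter.
                Integer rv = removingVoters.stream()
                    .filter(id -> id != leaderId)
                    .findFirst()
                    .orElse(null);
                if (rv == null) {
                    // Only the leader remains to be removed: it steps down by sending
                    // EndQuorumEpoch (not shown) and the next leader completes the removal.
                    return;
                }
                currentVoters.remove(rv);
            }
            appendAlterQuorumMessage(currentVoters, targetVoters);
            awaitCommit();  // the membership change applies once the record is committed
        }
        // TargetVoters = CurrentVoters: the reassignment is done.
        appendAlterQuorumMessage(currentVoters, null);
    }

    static void appendAlterQuorumMessage(Set<Integer> currentVoters, Set<Integer> targetVoters) { }
    static void awaitCommit() { }
}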

...

  • Leader based approach: the leader is responsible for tracking observer progress by monitoring the replication speed of new entries, and promotes the observer once a replication round completes within the election timeout, which suggests the gap is sufficiently small. This adds complexity to the leader logic and overhead to leadership transfer, but keeps progress management centralized.

  • Observer based approach: a self-nomination approach in which the observer initiates readIndex calls to make sure it has sufficiently caught up with the leader. If it sees consecutive rounds of readIndex success within the election timeout, the observer triggers a config change to add itself as a follower in the next round. This is a decentralized design which removes the dependency on the elected leader to decide the role, and is thus easier to reason about.

To collect information in one place and keep the membership change logic centralized, the leader based approach is more favorable. However, tracking the progress of an observer should only happen during reassignment, which is also why we may see incomplete log status from the DescribeQuorum API when the cluster is stable.

...

The recommended usage is to first use the describeQuorum API to get the existing cluster status and learn whether there is any ongoing reassignment. The QuorumInfo struct is derived from the DescribeQuorumResponse:

Code Block
languagejava
titleKafkaAdminClient.java
public class QuorumInfo {

    Map<Integer, VoterDescription> currentVoters();

    Map<Integer, VoterDescription> targetVoters();

    Map<Integer, VoterDescription> observers();

    Optional<Integer> leaderId();

    int leaderEpoch();

    long highWatermark();

    public static class VoterDescription {
        Optional<Long> logEndOffset();
        Optional<Long> lastCaughtUpTimeMs();
    }
}

The epoch and offsets are set as optional because the leader may not actively maintain all the observers' status.

...

When the admin calls alterQuorum, the underlying thread will first send a FindQuorumRequest to find the stable leader. If that call times out or the group is in an election, the call fails and informs the user to retry. Once the leader is found, the AdminClient sends an AlterQuorumRequest to it. If the returned error is retriable, like NOT_QUORUM_LEADER, the tool performs a rediscovery of the quorum leader. For fatal errors such as authorization errors, the call fails and reports the result to the user.
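A hedged usage sketch follows; QuorumAdmin, describeQuorum(), and alterQuorum() are assumed names modeled on the QuorumInfo listing above, not a finalized AdminClient API.

Code Block
languagejava
import java.util.Optional;
import java.util.Set;

// Hypothetical usage of the proposed admin calls; names and signatures are
// assumptions for illustration, not the finalized API.
interface QuorumAdmin {
    QuorumInfoSketch describeQuorum();
    void alterQuorum(Set<Integer> targetVoters);

    // Simplified stand-in for the QuorumInfo struct shown above.
    record QuorumInfoSketch(Optional<Set<Integer>> targetVoters, Optional<Integer> leaderId) {}
}

class ReassignQuorumExample {
    static void reassign(QuorumAdmin admin, Set<Integer> newVoters) {
        // 1. Check whether a reassignment is already in progress.
        QuorumAdmin.QuorumInfoSketch info = admin.describeQuorum();
        if (info.targetVoters().isPresent())
            throw new IllegalStateException("A quorum reassignment is already in progress");

        // 2. Submit the new target voters. Retriable errors such as NOT_QUORUM_LEADER
        //    would be handled by rediscovering the leader and retrying (not shown).
        admin.alterQuorum(newVoters);

        // 3. Monitor progress with further describeQuorum() calls.
    }
}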

...

kafka-reassign-quorum.sh will be implemented to create, inspect, or cancel quorum reassignments.

The --describe flag

...

For some operations that require updating multiple metadata entries (previously performed as multiple ZK writes updating more than one ZK path), the updates would be interpreted as a batch of record appends.

...

Note that it is possible for a request to time out before the leader has successfully committed the records, in which case the client or the broker itself would retry, resulting in duplicated updates to the quorum. In Kafka's usage, however, all updates are overwrites, which are idempotent (since configuration is by nature a key-value mapping). Therefore, we do not need to implement sequence numbers or request caching to achieve exactly-once semantics.
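For intuition, a retried update modeled as a key-value overwrite leaves the state unchanged, as in this trivial illustration:

Code Block
languagejava
import java.util.HashMap;
import java.util.Map;

// Trivial illustration: configuration updates are key-value overwrites, so a
// duplicated (retried) update leaves the resulting state unchanged.
class IdempotentOverwrite {
    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("min.insync.replicas", "2");  // original update
        config.put("min.insync.replicas", "2");  // duplicated retry of the same update
        System.out.println(config);              // prints {min.insync.replicas=2}
    }
}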

...