...

  • BrokerId: we store this in order to detect conflicts if broker.id in the main configuration is changed unexpectedly. 
  • LeaderId: this is the last known leader of the quorum. A value of -1 indicates that there was no leader.
  • LeaderEpoch: this is the last known leader epoch. This is initialized to 0 when the quorum is bootstrapped and should never be negative.
  • VotedId: indicates the id of the broker that this replica voted for in the current epoch. A value of -1 indicates that the replica has not (or cannot) vote.
  • AppliedOffset: Reflects the maximum offset that has been applied to this quorum state. This is used for log recovery. The broker must scan from this point on initialization to detect updates to this file.
  • CurrentVoters: the latest known set of voters for this quorum.
  • TargetVoters: the latest known target voters if the quorum is being reassigned.
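The fields above could be modeled as a simple value class; the following is a minimal sketch in which the field names follow the list, but the class itself and its validation are illustrative and not part of the KIP:

```java
import java.util.List;

// Hypothetical model of the quorum-state file contents described above.
// Serialization format is out of scope for this sketch.
class QuorumStateData {
    final int brokerId;        // detects unexpected broker.id changes
    final int leaderId;        // -1 means no known leader
    final int leaderEpoch;     // initialized to 0, never negative
    final int votedId;         // -1 means no vote cast in the current epoch
    final long appliedOffset;  // log recovery scans from this offset onward
    final List<Integer> currentVoters;
    final List<Integer> targetVoters; // empty unless a reassignment is in progress

    QuorumStateData(int brokerId, int leaderId, int leaderEpoch, int votedId,
                    long appliedOffset, List<Integer> currentVoters,
                    List<Integer> targetVoters) {
        if (leaderEpoch < 0)
            throw new IllegalArgumentException("leaderEpoch must never be negative");
        this.brokerId = brokerId;
        this.leaderId = leaderId;
        this.leaderEpoch = leaderEpoch;
        this.votedId = votedId;
        this.appliedOffset = appliedOffset;
        this.currentVoters = currentVoters;
        this.targetVoters = targetVoters;
    }

    boolean hasLeader() { return leaderId != -1; }
    boolean hasVoted()  { return votedId != -1; }
}
```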

The use of this file will be described in more detail below as we describe the protocol. Note that one key difference between this internal topic and other topics is that we must always enforce fsync upon appending to the local log in order to guarantee the correctness of the Raft algorithm.
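A minimal sketch of the fsync-on-append rule, assuming a plain FileChannel-backed log (the class and method names here are hypothetical, not from the KIP):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical append path: the batch must be durable on disk before the
// Raft layer may treat the append as complete.
class FsyncAppend {
    static void appendAndFlush(FileChannel log, ByteBuffer batch) throws IOException {
        while (batch.hasRemaining())
            log.write(batch);
        // force(true) flushes both data and file metadata before returning.
        log.force(true);
    }
}
```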

...

  • AlterQuorum: Administrative API to change the members of the quorum.
  • DescribeQuorum: Administrative API to list the replication state (i.e. lag) of the voters.

Before getting into the details of these APIs, there are a few common attributes worth mentioning upfront:

...

  1. If it fails to receive a FetchQuorumRecordsResponse from the current leader before expiration of quorum.fetch.timeout.ms.
  2. If it receives an EndQuorumEpoch request from the current leader.
  3. If it fails to receive a majority of votes before expiration of quorum.election.timeout.ms after declaring itself a candidate.
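The three conditions above can be sketched as a single predicate; the method and parameter names below are illustrative, not from the KIP:

```java
// Hypothetical check a voter runs on each tick to decide whether to start
// a new election, covering the three conditions listed above.
class ElectionTrigger {
    static boolean shouldStartElection(boolean isCandidate,
                                       long millisSinceLastFetchResponse,
                                       long fetchTimeoutMs,
                                       boolean receivedEndQuorumEpoch,
                                       long millisSinceCandidacy,
                                       long electionTimeoutMs) {
        if (receivedEndQuorumEpoch)                                          // condition 2
            return true;
        if (!isCandidate && millisSinceLastFetchResponse >= fetchTimeoutMs)  // condition 1
            return true;
        return isCandidate && millisSinceCandidacy >= electionTimeoutMs;     // condition 3
    }
}
```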

...

Note that new elections are always delayed by a random time which is bounded by quorum.election.jitter.max.ms. This is part of the Raft protocol and is meant to prevent gridlocked elections. For example, with a quorum size of three, if only two voters are online, then we have to prevent repeated elections in which each voter declares itself a candidate and refuses to vote for the other.

...

Note on Gridlocked Elections:  It is possible that an election fails because each voter votes for itself. Or, with an even number of voters active, it is possible that a vote ends up split. Generally if a voter fails to get a majority of votes before quorum.election.timeout.ms, then the vote is deemed to have failed, which will cause the candidate to bump the epoch, step down, and backoff according to quorum.election.jitter.max.ms before retrying. Under some situations, a candidate can immediately detect when a vote has failed. For example, if there are only two voters and a candidate fails to get a vote from the other voter (i.e. VoteGranted is returned as false in the VoteResponse), then there is no need to wait for the election timeout. The candidate in this case can immediately step down and backoff.
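A minimal sketch of the randomized backoff, assuming a uniform draw bounded by quorum.election.jitter.max.ms (the class and method names are hypothetical):

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical backoff computation: a uniformly random delay in
// [0, jitterMaxMs] makes it unlikely that two stalled candidates
// retry their elections at the same instant.
class ElectionBackoff {
    static long nextBackoffMs(long jitterMaxMs) {
        return ThreadLocalRandom.current().nextLong(jitterMaxMs + 1);
    }
}
```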

...

Upon receiving the EndQuorumEpoch, the voter checks if the epoch from the request is greater than or equal to its last known epoch. If so, it can transition to candidate state after waiting a random time up to the maximum jitter defined by quorum.election.jitter.max.ms. Before beginning to collect votes, the voter must update the quorum-state file. If the epoch is smaller than the last known epoch, or the leader id is not known for this epoch, the request is rejected.
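The epoch check above can be sketched as follows; the helper and its arguments are hypothetical, and the real handler also persists the quorum-state file and schedules the jittered transition to candidate state:

```java
// Hypothetical validation a voter applies to an EndQuorumEpoch request.
class EndQuorumEpochCheck {
    static boolean accept(int requestEpoch, int lastKnownEpoch,
                          boolean leaderKnownForEpoch) {
        // Reject stale epochs and requests whose claimed leader is unknown.
        return requestEpoch >= lastKnownEpoch && leaderKnownForEpoch;
    }
}
```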

...

  1. Verify that the leader epoch is the same. If not, reject this request with a MISMATCH_EPOCH error.
    1. If the leader epoch is smaller, then eventually this leader's BeginQuorumEpoch would reach the voter and that voter would update the epoch.
    2. If the leader epoch is larger, then the leader itself would eventually learn about the new epoch anyway.
  2. Check that FetchOffset and FetchEpoch are consistent with the leader's log. Specifically, we check that FetchOffset is less than or equal to the end offset of FetchEpoch. If not, return OUT_OF_RANGE and encode the next FetchOffset as the last offset of the largest epoch which is less than or equal to the fetcher's epoch. This truncation heuristic lets the voter truncate as much as possible and reach the point of divergence with fewer FetchQuorumRecords round trips: if the fetcher's epoch X does not match the epoch of the fetched offset, then all records of epoch X on that voter may have diverged and can be truncated, so returning the end offset of the largest epoch Y (< X) is reasonable.
  3. If the request is from a voter and not an observer, the leader can possibly advance the high watermark. As stated above, we only advance the high watermark if the current leader has replicated at least one entry from its current epoch to a majority of the quorum. Once that condition holds, the high watermark is set to the maximum offset which has been replicated to a majority of the voters.
  4. If the `IsFollower` flag is set to true by the sender, but the sender is not in the leader's cached quorum, reply with a NOT_FOLLOWER error to let the sender downgrade to an observer. This could be caused by a reassignment, or the old follower could be in a zombie state. Note that the fetched data is still valid.
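Step 2 above can be sketched with an epoch cache mapping each epoch to its end offset; the class and method names are illustrative, not from the KIP:

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical fetch validation on the leader: returns -1 if the fetch
// position is consistent with the leader's log, otherwise the suggested
// next FetchOffset (the end offset of the largest epoch <= FetchEpoch).
class FetchValidation {
    static long validate(NavigableMap<Integer, Long> epochEndOffsets,
                         int fetchEpoch, long fetchOffset) {
        Long endOffset = epochEndOffsets.get(fetchEpoch);
        if (endOffset != null && fetchOffset <= endOffset)
            return -1L; // consistent: serve records from fetchOffset
        // Divergence: tell the voter how far back to truncate.
        Map.Entry<Integer, Long> floor = epochEndOffsets.floorEntry(fetchEpoch);
        return floor == null ? 0L : floor.getValue();
    }
}
```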

...

  1. First check the leader epoch in the response against the last known epoch. It is possible that the node receives the latest epoch from another source before the FindQuorum returns, so we always have to check. If the leader epoch is less than the last known epoch, we ignore the response.
  2. The node will then update its local cache of connection information with the values from the response. As when handling requests, the node will only update cached connection information corresponding to the largest observed BootTimestamp.
  3. Next check if there is a leader defined in the response. If so, then the node will update quorum-state and become a follower.
  4. If there is no leader and the sender is one of the voters, then the node will become a candidate and attempt to find the other nodes in the quorum (if they are not known) in order to send VoteRequests to them. If the sender is not a voter, then it will continue trying to discover the leader by sending FindQuorum to other known voters or to bootstrap.servers.
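The four steps above can be sketched as a state decision; the enum and parameter names are hypothetical, and the connection-cache update of step 2 is omitted:

```java
// Hypothetical handler for a FindQuorum response, covering steps 1, 3, and 4.
class FindQuorumHandler {
    enum NextState { IGNORE, FOLLOWER, CANDIDATE, KEEP_SEARCHING }

    static NextState handle(int responseLeaderEpoch, int lastKnownEpoch,
                            boolean leaderDefined, boolean senderIsVoter) {
        if (responseLeaderEpoch < lastKnownEpoch)
            return NextState.IGNORE;            // step 1: stale response
        // step 2 (updating cached connection info) omitted in this sketch
        if (leaderDefined)
            return NextState.FOLLOWER;          // step 3: follow the leader
        return senderIsVoter ? NextState.CANDIDATE        // step 4: voter
                             : NextState.KEEP_SEARCHING;  // step 4: observer
    }
}
```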

Note on Connection Discovery with Gossip: The reason this API is specified as a gossip protocol is that the cluster must be able to propagate connection information even when there are insufficient voters available to establish a quorum. There must be some base layer which is able to spread enough initial metadata so that voters can connect to each other and conduct elections. An alternative would be to require connection information to always be defined statically in configuration; however, this makes operations cumbersome in modern cloud environments where hosts change frequently. We have opted instead to embrace dynamic discovery.

...