Comment: Factor out error codes and try to make usage consistent

...

  • bootstrap.servers:  Defines the set of servers to contact to get quorum information. This does not have to address quorum members directly. For example, it could be a VIP.
  • quorum.voters: Defines the ids of the expected voters. This is only required when bootstrapping the cluster for the first time. Once the cluster (and hence the quorum) has started, new brokers rely on FindQuorum (described below) to discover the current voters of the quorum.
  • quorum.progress.timeout.ms: Maximum time without receiving a fetch records request from a majority of the quorum before asking whether there is a new epoch leader via FindQuorum.
  • quorum.fetch.timeout.ms: Maximum time without a successful fetch from the current leader before a new election is started.
  • quorum.election.timeout.ms: Maximum time without collecting a majority of votes during the candidate state before a new election is retried.
  • quorum.election.jitter.max.ms: Maximum random jitter after an election timeout before a new election is triggered.
  • quorum.request.timeout.ms: Maximum time before a pending request is considered failed and the connection is dropped.
  • quorum.retry.backoff.ms: Initial delay between request retries.
  • quorum.retry.backoff.max.ms: Max delay between requests. Backoff will increase exponentially beginning from quorum.retry.backoff.ms (the same as in KIP-580).
  • broker.id: The existing broker id config shall be used as the voter id in the Raft quorum.
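
The exponential backoff described for quorum.retry.backoff.ms and quorum.retry.backoff.max.ms can be sketched as follows. This is only an illustration under stated assumptions: the class RetryBackoff and its method delayMs are hypothetical names, the delay is assumed to double on every retry, and no jitter is applied.

```java
// Hypothetical helper for illustration; not part of the proposal or of Kafka's code base.
final class RetryBackoff {
    private final long initialMs; // plays the role of quorum.retry.backoff.ms
    private final long maxMs;     // plays the role of quorum.retry.backoff.max.ms

    RetryBackoff(long initialMs, long maxMs) {
        if (initialMs <= 0 || maxMs < initialMs) {
            throw new IllegalArgumentException("require 0 < initialMs <= maxMs");
        }
        this.initialMs = initialMs;
        this.maxMs = maxMs;
    }

    /** Delay before retry number 'attempt' (0 for the first retry). */
    long delayMs(int attempt) {
        if (attempt < 0) {
            throw new IllegalArgumentException("attempt must be >= 0");
        }
        long delay = initialMs;
        for (int i = 0; i < attempt; i++) {
            // Stop once another doubling would exceed the cap (this also avoids overflow).
            if (delay > maxMs / 2) {
                return maxMs;
            }
            delay *= 2;
        }
        return delay;
    }
}
```

With an initial delay of 100 ms and a maximum of 1000 ms, successive retries would wait 100, 200, 400, 800 and then 1000 ms.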

...

Below we describe the schema and behavior of each of these APIs. 

Error Codes

Below we identify the error codes that will be used in this protocol:

  • INVALID_CLUSTER_ID: The request either included a clusterId which does not match the one expected by the leader or failed to include a clusterId when one was expected.
  • FENCED_LEADER_EPOCH: The leader epoch in the request is smaller than the latest known to the recipient of the request.
  • UNKNOWN_LEADER_EPOCH: The leader epoch in the request is larger than expected. Note that this is an unexpected error. Unlike normal Kafka log replication, it cannot happen that the follower receives the newer epoch before the leader.
  • OFFSET_OUT_OF_RANGE: Used in the FetchQuorumRecords API to indicate that the follower has fetched from an invalid offset and should truncate to the offset/epoch indicated in the response.
  • NOT_QUORUM_LEADER: Used in DescribeQuorum and AlterQuorum to indicate that the recipient of the request is not the current leader.

Vote

The Vote API is used by voters to hold an election. As mentioned above, the main difference from Raft is that this protocol is pull-based. Voters send fetch requests to the leaders in order to replicate from the log. These fetches also serve as a liveness check for the leader. If a voter perceives a leader as down, it will hold a new election and declare itself a candidate. A voter will begin a new election under three conditions:

  1. If it fails to receive a FetchQuorumRecordsResponse from the current leader before expiration of quorum.fetch.timeout.ms
  2. If it receives an EndQuorumEpoch request from the current leader
  3. If it fails to receive a majority of votes before expiration of quorum.election.timeout.ms after declaring itself a candidate.
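
The conditions for starting a new election can be expressed as a single predicate. The sketch below is a simplification with hypothetical names (ElectionTrigger, shouldStartElection); it assumes that lastProgressMs is the time of the last successful fetch response for a follower, or the time the current election started for a candidate.

```java
// Hypothetical sketch of the election trigger conditions; all names are illustrative.
final class ElectionTrigger {
    enum Role { FOLLOWER, CANDIDATE }

    static boolean shouldStartElection(Role role,
                                       long nowMs,
                                       long lastProgressMs,
                                       long fetchTimeoutMs,
                                       long electionTimeoutMs,
                                       boolean endQuorumEpochReceived,
                                       boolean hasMajorityOfVotes) {
        long elapsedMs = nowMs - lastProgressMs;
        switch (role) {
            case FOLLOWER:
                // Leader stepped down gracefully, or quorum.fetch.timeout.ms expired.
                return endQuorumEpochReceived || elapsedMs >= fetchTimeoutMs;
            case CANDIDATE:
                // quorum.election.timeout.ms expired without a majority of votes.
                return !hasMajorityOfVotes && elapsedMs >= electionTimeoutMs;
            default:
                return false;
        }
    }
}
```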

...

Also note that a candidate always votes for itself at the current candidate epoch. That means it will also need to update the quorum-state file as "voting for myself" before sending out the vote requests. On the other hand, if it receives a Vote request with a larger candidate epoch, it can still grant that vote while at the same time transitioning back to voter state, because a newer leader may have been elected for a newer epoch.

New Error Codes

Code Block
LARGER_KNOWN_EPOCH(201, "The voter has seen larger epoch during election", LargerKnownEpochException::new),
LARGER_END_OFFSET(202, "The voter has seen larger end offset with the given epoch during election", LargerEndOffsetException::new);

Vote Response Handling

When receiving a Vote response:

...

If the error code indicates that the voter's known leader epoch is larger (i.e. if the error is FENCED_LEADER_EPOCH), then the voter will update quorum-state, become a follower of that leader, and begin sending FetchQuorumRecords requests.

EndQuorumEpoch

The EndQuorumEpoch API is used by a leader to gracefully step down so that an election can be held immediately without waiting for the election timeout. It is sent to all voters in the quorum. The primary use case for this is to enable graceful shutdown. The request is sent if the voter that is shutting down is either the current leader or, when an election is in progress, a candidate. It is also used when the leader needs to be removed from the quorum following an AlterQuorum request.

...

If there's no error code, then do nothing. Otherwise, if the error code indicates that there is already a leader with a higher epoch ("hey you're old news now so I don't care if you're stepping down or what"), then the sender updates its quorum-state file and transitions to follower state. The leader treats this as a best-effort graceful shutdown. If voters cannot be reached to send EndQuorumEpoch, the leader will shut down without retrying. In the worst case, if none of the EndQuorumEpoch requests are received, the election timeout will eventually trigger a new election.

We shall reuse the same FENCED_LEADER_EPOCH error code to indicate that there is a larger leader epoch. Depending on whether the sender is shutting down or not, it would either ignore the response error or become a follower of the leader indicated in the response.

FetchQuorumRecords

The fetch request is sent by both voters and observers to the current leader in order to replicate log changes. For voters this also serves as a liveness check of the leader.

...

  1. Verify that the leader epoch is the same. If not, reject this request with either the FENCED_LEADER_EPOCH or UNKNOWN_LEADER_EPOCH error.
    1. If the leader epoch is smaller, then eventually this leader's BeginQuorumEpoch would reach the voter and that voter would update the epoch.
    2. If the leader epoch is larger, then eventually the leader itself would learn about the new epoch anyway.
  2. Check that the FetchOffset and FetchEpoch are consistent with the leader's log. Specifically we check that FetchOffset is less than or equal to the end offset of FetchEpoch. If not, return OFFSET_OUT_OF_RANGE and encode the next FetchOffset as the last offset of the largest epoch which is less than or equal to the fetcher's epoch. This is a heuristic that lets the voter truncate as much as possible to reach the point of divergence with fewer FetchQuorumRecords round-trips: if the fetcher's epoch is X, which does not match the epoch of that fetching offset, then all records of epoch X on that voter may have diverged and hence could be truncated, so returning the next offset of the largest epoch Y (< X) is reasonable.
  3. If the request is from a voter rather than an observer, the leader can possibly advance the high-watermark. As stated above, we only advance the high-watermark if the current leader has replicated at least one entry from its current epoch to a majority of the quorum. Otherwise, the high watermark is set to the maximum offset which has been replicated to a majority of the voters.
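
The first two checks can be sketched in a few lines. This is a simplified illustration, not the proposed implementation: FetchValidator, Result and validate are hypothetical names, the leader's log is reduced to a map from epoch to the (assumed exclusive) end offset of that epoch, and the error names follow the Error Codes section. High-watermark advancement is not modelled.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch of the leader-side epoch and offset validation.
final class FetchValidator {
    enum Error { NONE, FENCED_LEADER_EPOCH, UNKNOWN_LEADER_EPOCH, OFFSET_OUT_OF_RANGE }

    static final class Result {
        final Error error;
        final long nextFetchOffset; // only meaningful for NONE and OFFSET_OUT_OF_RANGE

        Result(Error error, long nextFetchOffset) {
            this.error = error;
            this.nextFetchOffset = nextFetchOffset;
        }
    }

    private final int leaderEpoch;
    // Assumption: epoch -> end offset of that epoch in the leader's log.
    private final NavigableMap<Integer, Long> epochEndOffsets;

    FetchValidator(int leaderEpoch, Map<Integer, Long> epochEndOffsets) {
        this.leaderEpoch = leaderEpoch;
        this.epochEndOffsets = new TreeMap<>(epochEndOffsets);
    }

    Result validate(int requestLeaderEpoch, int fetchEpoch, long fetchOffset) {
        // Step 1: the leader epoch in the request must match the leader's own epoch.
        if (requestLeaderEpoch < leaderEpoch) {
            return new Result(Error.FENCED_LEADER_EPOCH, -1L);
        }
        if (requestLeaderEpoch > leaderEpoch) {
            return new Result(Error.UNKNOWN_LEADER_EPOCH, -1L);
        }
        // Step 2: FetchOffset must not exceed the end offset of FetchEpoch.
        Long endOffset = epochEndOffsets.get(fetchEpoch);
        if (endOffset != null && fetchOffset <= endOffset) {
            return new Result(Error.NONE, fetchOffset);
        }
        // Diverged: point the fetcher at the end of the largest epoch <= FetchEpoch.
        Map.Entry<Integer, Long> floor = epochEndOffsets.floorEntry(fetchEpoch);
        long truncateTo = (floor == null) ? 0L : floor.getValue();
        return new Result(Error.OFFSET_OUT_OF_RANGE, truncateTo);
    }
}
```

For example, if the leader's log holds epochs 1, 2 and 4 ending at offsets 10, 20 and 30, a fetch at offset 25 with FetchEpoch 3 would be answered with OFFSET_OUT_OF_RANGE and a next fetch offset of 20.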

...

When handling the response, a follower/observer will do the following:

  1. If the response contains the FENCED_LEADER_EPOCH error code, check the leaderId from the response. If it is defined, then update the quorum-state file and become a follower. Otherwise, try to learn about the new leader via FindQuorum requests; in the meantime it may receive a BeginQuorumEpoch request, which would also update the new epoch / leader.
  2. If the response contains the OFFSET_OUT_OF_RANGE error code, truncate its local log to the encoded nextFetchOffset, and then resend the FetchQuorumRecords request.
  3. If a follower has been kicked out of the quorum, it will receive a non-fatal NOT_FOLLOWER error code, and do a self-downgrade.

New Error Codes

Code Block
MISMATCH_EPOCH(203, "The fetched follower has an inconsistent epoch with leader", LargerKnownEpochException::new),
NOT_FOLLOWER(204, "The fetched replica is no longer inside the quorum as a follower", NotFollowerException::new);

Note that followers will have to check any records received for the presence of control records. Specifically a follower/observer must check for voter assignment messages which could change its role.

Discussion: Replication Progress Timeout for Zombie Leader

There's one caveat of the pull-based model: say a new leader has been elected with a new epoch and everyone has learned about it except the old leader (e.g. that leader was not in the voters anymore and hence did not receive the BeginQuorumEpoch either), then that old leader would not be notified by anyone about the new leader / epoch and would become a pure "zombie leader".

...

The protocol for changing the active voters is well-described in the Raft literature. The high-level idea is to use a new control record type to write quorum changes to the log. Once a quorum change has been written and committed to the log, then the quorum change can take effect.

Quorum Change Message Schema

This will be a new control record type with Type=2 in the key schema. Below we describe the message value schema used for the quorum change messages written to the log. 

...

  1. First check whether the node is the leader. If not, then return an error to let the client retry with FindQuorum. If the current leader is known to the receiving node, then include the LeaderId and LeaderEpoch in the response.
  2. Build the response using current assignment information and cached state about replication progress.

...

  1. If the response indicates that the intended node is not the current leader, then check the response to see if the LeaderId has been set. If so, then attempt to retry the request with the new leader.
  2. If the current leader is not defined in the response (which could be the case if there is an election in progress), then back off and retry with FindQuorum.
  3. Otherwise the response can be returned to the application, or the request eventually times out.
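
The client-side handling above amounts to a three-way decision. The sketch below uses hypothetical names (QuorumClientRetry, nextAction) and assumes that an undefined LeaderId is encoded as -1; the actual encoding is not specified here.

```java
// Hypothetical sketch of the client's decision after receiving a response.
final class QuorumClientRetry {
    enum Action { RETURN_TO_APPLICATION, RETRY_WITH_LEADER, BACKOFF_AND_FIND_QUORUM }

    // Assumption: -1 stands for "no leader defined in the response".
    static final int NO_LEADER = -1;

    static Action nextAction(boolean notLeaderError, int leaderId) {
        if (!notLeaderError) {
            // The contacted node was the leader: hand the response to the application.
            return Action.RETURN_TO_APPLICATION;
        }
        // Not the leader: retry directly if the leader is known, else rediscover it.
        return leaderId != NO_LEADER
                ? Action.RETRY_WITH_LEADER
                : Action.BACKOFF_AND_FIND_QUORUM;
    }
}
```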

New Error Codes

Code Block
NOT_QUORUM_LEADER(205, "Target broker is not the leader of this quorum", LargerKnownEpochException::new);

AlterQuorum

The AlterQuorum API is used by the admin client to reassign the voters of the quorum or cancel an ongoing reassignment. It requires ALTER on CLUSTER permission.

...

Cancellation: If TargetVoters is set to null in the request, then effectively we will cancel an ongoing reassignment and leave the quorum with the current voters. The preferable option is to always set the intended TargetVoters. Note that it is always possible to send a new AlterQuorum request even if the pending reassignment has not finished. So if we are in the middle of a reassignment from (1, 2, 3) to (4, 5, 6), then the user can cancel the reassignment by resubmitting (1, 2, 3) as the TargetVoters.

Request Schema

Code Block
{
  "apiKey": N,
  "type": "request",
  "name": "AlterQuorum",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
      {"name": "ClusterId", "type": "string", "versions": "0+"},
      {"name": "TargetVoters", "type": "[]Voter", "nullableVersions": "0+", "default": "null", 
	   "about": "The target quorum, or null if this is a cancellation request",
	   "versions": "0+", "fields": [
        {"name": "VoterId", "type": "int32", "versions": "0+"}
      ]}
  ]
}

Response Schema

Code Block
{
  "apiKey": N,
  "type": "response",
  "name": "AlterQuorumResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
       {"name": "ErrorCode", "type": "int16", "versions": "0+",
        "about": "The error code, or 0 if there was no error."}
  ]
}

AlterQuorum Request Handling

...

  1. If the target node is not the leader of the quorum, return NOT_QUORUM_LEADER code to the admin client. 
  2. If the included voter is unknown to the quorum leader, return UNKNOWN_VOTER_ID code to the admin client.
  3. Check whether TargetVoters in the request matches that from the latest assignment. If so, check whether it has been committed. If it has, then return immediately. Otherwise go to step 4.
  4. Otherwise, if the TargetVoters in the request does not match the current TargetVoters, then append a new VoterAssignmentMessage to the log and wait for it to be committed.
  5. Return successfully only when the desired TargetVoters has been safely committed to the log.
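
The leader-side steps can be summarized as a decision function. The sketch below is an illustration with hypothetical names (AlterQuorumHandler, Outcome, handle); it assumes the leader has a set of node ids it considers known, and it ignores the cancellation case where TargetVoters is null.

```java
import java.util.Set;

// Hypothetical sketch of how a node might decide what to do with an AlterQuorum request.
final class AlterQuorumHandler {
    enum Outcome {
        NOT_QUORUM_LEADER,          // step 1
        UNKNOWN_VOTER_ID,           // step 2
        RETURN_SUCCESS,             // step 3: same assignment, already committed
        WAIT_FOR_COMMIT,            // step 3 -> 5: same assignment, not yet committed
        APPEND_AND_WAIT_FOR_COMMIT  // step 4 -> 5: new VoterAssignmentMessage needed
    }

    static Outcome handle(boolean isLeader,
                          Set<Integer> knownNodes,       // assumption: ids known to the leader
                          Set<Integer> targetVoters,
                          Set<Integer> latestAssignment,
                          boolean latestAssignmentCommitted) {
        if (!isLeader) {
            return Outcome.NOT_QUORUM_LEADER;
        }
        if (!knownNodes.containsAll(targetVoters)) {
            return Outcome.UNKNOWN_VOTER_ID;
        }
        if (targetVoters.equals(latestAssignment)) {
            return latestAssignmentCommitted ? Outcome.RETURN_SUCCESS : Outcome.WAIT_FOR_COMMIT;
        }
        return Outcome.APPEND_AND_WAIT_FOR_COMMIT;
    }
}
```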

...

  1. If the response indicates that the intended node is not the current leader, then check the response to see if the LeaderId has been set. If so, then attempt to retry the request with the new leader.
  2. If the current leader is not defined in the response (which could be the case if there is an election in progress), then back off and retry with FindQuorum.
  3. Otherwise return the successful response to the application.

Note that the AdminClient should retry the AlterQuorum request if it times out before the reassignment has been committed to the log. If the caller does not continue retrying the operation, then there is no guarantee about whether or not the reassignment was successfully received by the cluster.

Once the reassignment has been accepted by the leader, a user can monitor the status of the reassignment through the DescribeQuorum API.

New Error Codes

...

Quorum Change Protocol

This protocol allows arbitrary quorum changes through the AlterQuorum API. Internally, we arrive at the target quorum by making a sequence of single-member changes.

...