...
bootstrap.servers
: Defines the set of servers to contact to get quorum information. This does not have to address quorum members directly. For example, it could be a VIP.

quorum.voters
: Defines the ids of the expected voters. This is only required when bootstrapping the cluster for the first time. Once the cluster (and hence the quorum) has started, new brokers rely on FindQuorum (described below) to discover the current voters of the quorum.

quorum.progress.timeout.ms
: Maximum time without receiving a FetchQuorumRecords request from a majority of the quorum before asking, via FindQuorum, whether there is a new epoch leader.

quorum.fetch.timeout.ms
: Maximum time without a successful fetch from the current leader before a new election is started.

quorum.election.timeout.ms
: Maximum time without collecting a majority of votes during the candidate state before a new election is retried.

quorum.election.jitter.max.ms
: Maximum random jitter after an election timeout before a new election is triggered.

quorum.request.timeout.ms
: Maximum time before a pending request is considered failed and the connection is dropped.

quorum.retry.backoff.ms
: Initial delay between request retries.

quorum.retry.backoff.max.ms
: Maximum delay between requests. The backoff increases exponentially, beginning from quorum.retry.backoff.ms (the same as in KIP-580).

broker.id
: The existing broker id config shall be used as the voter id in the Raft quorum.
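As a rough illustration of how the timing configs above could interact, the sketch below computes an exponentially growing retry backoff capped at quorum.retry.backoff.max.ms, and an election timeout with random jitter. The class and method names are hypothetical; only the config names come from this KIP. KIP-580 also applies a random jitter factor to the retry backoff, which this sketch leaves out.

```java
import java.util.Random;

// Hypothetical helper; only the config names in the comments come from the KIP.
final class QuorumTimers {
    private final long retryBackoffMs;      // quorum.retry.backoff.ms
    private final long retryBackoffMaxMs;   // quorum.retry.backoff.max.ms
    private final long electionTimeoutMs;   // quorum.election.timeout.ms
    private final int electionJitterMaxMs;  // quorum.election.jitter.max.ms
    private final Random random;

    QuorumTimers(long retryBackoffMs, long retryBackoffMaxMs,
                 long electionTimeoutMs, int electionJitterMaxMs, Random random) {
        this.retryBackoffMs = retryBackoffMs;
        this.retryBackoffMaxMs = retryBackoffMaxMs;
        this.electionTimeoutMs = electionTimeoutMs;
        this.electionJitterMaxMs = electionJitterMaxMs;
        this.random = random;
    }

    // Delay before retry number `attempt` (0-based): doubles on every attempt,
    // starting from quorum.retry.backoff.ms, capped at quorum.retry.backoff.max.ms.
    long backoffForAttempt(int attempt) {
        int exponent = Math.min(attempt, 30); // bound the shift so the product cannot overflow for sane configs
        long backoff = retryBackoffMs * (1L << exponent);
        return Math.min(backoff, retryBackoffMaxMs);
    }

    // Election timeout plus a uniformly random jitter in [0, quorum.election.jitter.max.ms].
    long nextElectionTimeoutMs() {
        return electionTimeoutMs + random.nextInt(electionJitterMaxMs + 1);
    }
}
```

The jitter spreads out candidates whose timers expire together, which reduces the chance of repeated split votes.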
...
Below we describe the schema and behavior of each of these APIs.
Error Codes
Below we identify the error codes that will be used in this protocol:
- INVALID_CLUSTER_ID: The request either included a clusterId which does not match the one expected by the leader or failed to include a clusterId when one was expected.
- FENCED_LEADER_EPOCH: The leader epoch in the request is smaller than the latest known to the recipient of the request.
- UNKNOWN_LEADER_EPOCH: The leader epoch in the request is larger than expected. Note that this is an unexpected error. Unlike normal Kafka log replication, it cannot happen that the follower receives the newer epoch before the leader.
- OFFSET_OUT_OF_RANGE: Used in the FetchQuorumRecords API to indicate that the follower has fetched from an invalid offset and should truncate to the offset/epoch indicated in the response.
- NOT_QUORUM_LEADER: Used in DescribeQuorum and AlterQuorum to indicate that the recipient of the request is not the current leader.
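The two epoch-related error codes above can be read as the outcome of a single comparison between the epoch in a request and the recipient's latest known epoch. The following is a minimal sketch under that reading; the class and enum names are hypothetical and are not Kafka's `Errors` enum.

```java
// Hypothetical helper showing which epoch error a recipient would return.
final class EpochValidation {
    enum QuorumError { NONE, FENCED_LEADER_EPOCH, UNKNOWN_LEADER_EPOCH }

    static QuorumError validateLeaderEpoch(int requestEpoch, int localEpoch) {
        if (requestEpoch < localEpoch) {
            return QuorumError.FENCED_LEADER_EPOCH;  // the sender is behind the recipient
        } else if (requestEpoch > localEpoch) {
            return QuorumError.UNKNOWN_LEADER_EPOCH; // the recipient is behind (unexpected in this protocol)
        }
        return QuorumError.NONE;
    }
}
```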
Vote
The Vote API is used by voters to hold an election. As mentioned above, the main difference from Raft is that this protocol is pull-based. Voters send fetch requests to the leader in order to replicate from the log. These fetches also serve as a liveness check for the leader. If a voter perceives the leader as down, it will hold a new election and declare itself a candidate. A voter will begin a new election under three conditions:
- If it fails to receive a FetchQuorumRecordsResponse from the current leader before expiration of quorum.fetch.timeout.ms
- If it receives an EndQuorumEpoch request from the current leader
- If it fails to receive a majority of votes before expiration of quorum.election.timeout.ms after declaring itself a candidate.
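The three election triggers can be expressed as one predicate evaluated on each timer tick. This is a sketch under simplifying assumptions: the role enum, the parameter names, and the idea of polling with a current timestamp are all hypothetical, and the election jitter is left out.

```java
// Hypothetical sketch; only the config names come from the KIP.
final class ElectionTriggers {
    enum Role { FOLLOWER, CANDIDATE, LEADER }

    private final long fetchTimeoutMs;     // quorum.fetch.timeout.ms
    private final long electionTimeoutMs;  // quorum.election.timeout.ms

    ElectionTriggers(long fetchTimeoutMs, long electionTimeoutMs) {
        this.fetchTimeoutMs = fetchTimeoutMs;
        this.electionTimeoutMs = electionTimeoutMs;
    }

    // lastFetchResponseMs: last FetchQuorumRecordsResponse from the leader (used by followers).
    // candidateSinceMs: when this node declared itself a candidate (used by candidates).
    // endQuorumEpochReceived: whether the current leader sent an EndQuorumEpoch request.
    boolean shouldStartElection(Role role, long nowMs, long lastFetchResponseMs,
                                long candidateSinceMs, boolean endQuorumEpochReceived) {
        switch (role) {
            case FOLLOWER:
                return endQuorumEpochReceived || nowMs - lastFetchResponseMs >= fetchTimeoutMs;
            case CANDIDATE:
                // A node still in the candidate role has not yet collected a majority of votes.
                return nowMs - candidateSinceMs >= electionTimeoutMs;
            default:
                return false; // the leader does not time itself out here
        }
    }
}
```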
...
Also note that a candidate always votes for itself at the current candidate epoch. That means it must also update the quorum-state file to record "voting for myself" before sending out the Vote requests. On the other hand, if it receives a Vote request with a larger candidate epoch, it can still grant that vote while transitioning back to voter state, because a newer leader may have been elected for the newer epoch.
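The ordering rule above (persist the self-vote before any Vote request goes out) and the handling of a larger candidate epoch can be sketched as a small state holder. This is a minimal sketch: the class, its fields, and the in-memory list standing in for the quorum-state file are hypothetical, and the check that the candidate's log is at least as up to date as the voter's is omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalInt;

// Hypothetical sketch; persistedStates stands in for writes to the quorum-state file.
final class VoteState {
    enum Role { VOTER, CANDIDATE }

    final int localId;
    int epoch;
    Role role = Role.VOTER;
    OptionalInt votedId = OptionalInt.empty();
    final List<String> persistedStates = new ArrayList<>();

    VoteState(int localId, int epoch) {
        this.localId = localId;
        this.epoch = epoch;
    }

    private void persist() {
        persistedStates.add("epoch=" + epoch + ",votedId=" + (votedId.isPresent() ? votedId.getAsInt() : -1));
    }

    // Bump the epoch, vote for ourselves and persist, all before any Vote request is sent.
    void becomeCandidate() {
        epoch += 1;
        role = Role.CANDIDATE;
        votedId = OptionalInt.of(localId);
        persist();
    }

    // Returns true if the vote is granted (log up-to-date check omitted).
    boolean handleVoteRequest(int candidateEpoch, int candidateId) {
        if (candidateEpoch < epoch) {
            return false; // stale candidate
        }
        if (candidateEpoch > epoch) {
            // Newer epoch: step back to voter and forget any vote cast in the old epoch.
            epoch = candidateEpoch;
            role = Role.VOTER;
            votedId = OptionalInt.empty();
        }
        if (votedId.isPresent() && votedId.getAsInt() != candidateId) {
            return false; // already voted for someone else in this epoch
        }
        votedId = OptionalInt.of(candidateId);
        persist();
        return true;
    }
}
```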
New Error Codes
```java
LARGER_KNOWN_EPOCH(201, "The voter has seen a larger epoch during election", LargerKnownEpochException::new),
LARGER_END_OFFSET(202, "The voter has seen a larger end offset with the given epoch during election", LargerEndOffsetException::new);
```
Vote Response Handling
When receiving a Vote response:
...
If the error code indicates that the responding voter knows of a larger leader epoch (i.e., the error is FENCED_LEADER_EPOCH), then the voter that sent the request will update its quorum-state, become a follower of the leader indicated in the response, and begin sending FetchQuorumRecords requests. We shall reuse the existing FENCED_LEADER_EPOCH error code for this case.
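A candidate's side of the exchange can be sketched as a vote tally: it starts with its own vote, wins once more than half of the voters have granted, and abandons the election as soon as any response carries FENCED_LEADER_EPOCH. The class and outcome names are hypothetical, and the follow-up actions (updating quorum-state, starting to fetch) are left to the caller.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of a candidate processing Vote responses.
final class VoteTally {
    enum Outcome { PENDING, ELECTED, BECOME_FOLLOWER }

    private final int voterCount;
    private final Set<Integer> granted = new HashSet<>();

    VoteTally(int localId, int voterCount) {
        this.voterCount = voterCount;
        granted.add(localId); // a candidate always votes for itself
    }

    Outcome onResponse(int voterId, boolean voteGranted, boolean fencedLeaderEpoch) {
        if (fencedLeaderEpoch) {
            // The responder knows a larger leader epoch: update quorum-state and follow that leader.
            return Outcome.BECOME_FOLLOWER;
        }
        if (voteGranted) {
            granted.add(voterId);
        }
        return granted.size() > voterCount / 2 ? Outcome.ELECTED : Outcome.PENDING;
    }
}
```

Using a set rather than a counter makes duplicate responses from the same voter harmless.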
EndQuorumEpoch
The EndQuorumEpoch API is used by a leader to gracefully step down so that an election can be held immediately without waiting for the election timeout. It is sent to all voters in the quorum. The primary use case for this is to enable graceful shutdown: if the voter that is shutting down is either the current leader or a candidate in an ongoing election, then this request will be sent. It is also used when the leader needs to be removed from the quorum following an AlterQuorum request.
...
If there is no error code, then do nothing. Otherwise, if the error code indicates that there is already a leader with a higher epoch ("you're old news now, so I don't care whether you're stepping down"), then update the quorum-state file and transition to follower state. The leader treats this as a best-effort graceful shutdown: if voters cannot be reached to send EndQuorumEpoch, the leader will shut down without retrying. In the worst case, if none of the EndQuorumEpoch requests are received, the election timeout will eventually trigger a new election.
We shall reuse the same FENCED_LEADER_EPOCH error code to indicate that there is a larger leader epoch. Depending on whether the sender is shutting down or not, it would either ignore the response error or become a follower of the leader indicated in the response.
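Both halves of the best-effort behavior described above fit in a few lines: the sender tries each voter once without retrying, and reacts to FENCED_LEADER_EPOCH differently depending on whether it is shutting down. The class, method, and enum names are hypothetical, and the transport is abstracted as a predicate that reports whether a send succeeded.

```java
import java.util.List;
import java.util.function.IntPredicate;

// Hypothetical sketch of the EndQuorumEpoch sender.
final class EndQuorumEpochSketch {
    enum Action { NONE, IGNORE, BECOME_FOLLOWER }

    // Try each voter exactly once; failures are not retried because the election
    // timeout is the fallback if no EndQuorumEpoch request gets through.
    static int sendBestEffort(List<Integer> voters, IntPredicate trySend) {
        int delivered = 0;
        for (int voterId : voters) {
            if (trySend.test(voterId)) {
                delivered++;
            }
        }
        return delivered;
    }

    // Only FENCED_LEADER_EPOCH matters: a sender that is shutting down ignores it,
    // otherwise it follows the leader indicated in the response.
    static Action onResponse(boolean fencedLeaderEpoch, boolean shuttingDown) {
        if (!fencedLeaderEpoch) {
            return Action.NONE;
        }
        return shuttingDown ? Action.IGNORE : Action.BECOME_FOLLOWER;
    }
}
```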
FetchQuorumRecords
The fetch request is sent by both voters and observers to the current leader in order to replicate log changes. For voters, this also serves as a liveness check of the leader.
...
- Verify that the leader epoch is the same. If not, reject this request with either the FENCED_LEADER_EPOCH or the UNKNOWN_LEADER_EPOCH error.
  - If the leader epoch is smaller, then eventually this leader's BeginQuorumEpoch would reach the voter and that voter would update the epoch.
  - If the leader epoch is larger, then the receiving leader itself would eventually learn about the new epoch anyway.
- Check that the FetchOffset and its FetchEpoch are consistent with the leader's log. Specifically, we check that FetchOffset is less than or equal to the end offset of FetchEpoch. If not, return OFFSET_OUT_OF_RANGE and encode the nextFetchOffset as the last offset of the largest epoch which is less than or equal to the fetcher's epoch. This heuristic lets the voter truncate as much as possible, so that it reaches the point where the logs start to diverge in fewer FetchQuorumRecords round-trips: if the fetcher's epoch is X, which does not match the epoch of the fetch offset, then all records of epoch X on that voter may have diverged and could be truncated, so returning the next offset of the largest epoch Y (< X) is reasonable.
- If the request is from a voter rather than an observer, the leader can possibly advance the high watermark. As stated above, we only advance the high watermark once the current leader has replicated at least one entry from its current epoch to a majority of the quorum. When that holds, the high watermark is set to the maximum offset which has been replicated to a majority of the voters.
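The two leader-side checks above can be sketched as pure functions. This is a minimal sketch with several assumptions: the leader's epoch history is a `TreeMap` from epoch to that epoch's exclusive end offset, `DivergingEpoch` is a hypothetical response shape, a fetcher with no common epoch is told to truncate to offset 0, and voter progress is given as log end offsets (exclusive), so "at least one entry of the current epoch" means an offset strictly greater than the epoch's start offset.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.TreeMap;

// Hypothetical sketch of leader-side FetchQuorumRecords checks.
final class FetchChecks {
    // Epoch/offset pair the fetcher should truncate to.
    static final class DivergingEpoch {
        final int epoch;
        final long endOffset;
        DivergingEpoch(int epoch, long endOffset) { this.epoch = epoch; this.endOffset = endOffset; }
    }

    // Empty when FetchOffset <= end offset of FetchEpoch; otherwise the largest leader
    // epoch <= FetchEpoch together with that epoch's end offset.
    static Optional<DivergingEpoch> validateFetchPosition(TreeMap<Integer, Long> epochEndOffsets,
                                                          long fetchOffset, int fetchEpoch) {
        Map.Entry<Integer, Long> floor = epochEndOffsets.floorEntry(fetchEpoch);
        if (floor == null) {
            // Assumption: no common epoch at all, so ask the fetcher to truncate to the start.
            return Optional.of(new DivergingEpoch(-1, 0L));
        }
        if (floor.getKey() == fetchEpoch && fetchOffset <= floor.getValue()) {
            return Optional.empty(); // consistent with the leader's log
        }
        return Optional.of(new DivergingEpoch(floor.getKey(), floor.getValue()));
    }

    // voterEndOffsets: log end offsets of every voter, leader included.
    // Empty while no entry of the current epoch is on a majority yet.
    static OptionalLong majorityHighWatermark(long[] voterEndOffsets, long currentEpochStartOffset) {
        long[] sorted = voterEndOffsets.clone();
        Arrays.sort(sorted);
        int n = sorted.length;
        // sorted[n - majority] is the largest offset reached by at least a majority (n / 2 + 1) of voters.
        long majorityOffset = sorted[n - (n / 2 + 1)];
        if (majorityOffset > currentEpochStartOffset) {
            return OptionalLong.of(majorityOffset);
        }
        return OptionalLong.empty();
    }
}
```

For example, with end offsets {30, 25, 5} and a current epoch starting at offset 20, two of three voters have reached 25, so the high watermark can move to 25.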
...
When handling the response, a follower/observer will do the following:
- If the response contains the FENCED_LEADER_EPOCH error code, check the leaderId from the response. If it is defined, then update the quorum-state file and become a follower. Otherwise, try to learn about the new leader via FindQuorum requests; in the meantime, it may receive a BeginQuorumEpoch request, which would also update the epoch and leader.
- If the response contains the OFFSET_OUT_OF_RANGE error code, truncate the local log to the encoded nextFetchOffset, and then resend the FetchQuorumRecords request.
- If a follower has been removed from the quorum, it will receive a non-fatal NOT_FOLLOWER error code and downgrade itself.
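The response handling above is essentially a dispatch on the error code. The sketch below maps each case to an action that a fetcher would then carry out. The enums are hypothetical stand-ins, not Kafka's `Errors` class, and the actions themselves (truncation, FindQuorum, downgrade) are not implemented here.

```java
import java.util.OptionalInt;

// Hypothetical sketch of a follower/observer dispatching on FetchQuorumRecords errors.
final class FetchResponseHandling {
    enum FetchError { NONE, FENCED_LEADER_EPOCH, OFFSET_OUT_OF_RANGE, NOT_FOLLOWER }
    enum Action { APPEND_RECORDS, BECOME_FOLLOWER, SEND_FIND_QUORUM, TRUNCATE_AND_REFETCH, DOWNGRADE }

    static Action handle(FetchError error, OptionalInt leaderId) {
        switch (error) {
            case FENCED_LEADER_EPOCH:
                // Follow the new leader if known; otherwise discover it via FindQuorum
                // (a BeginQuorumEpoch request may also arrive in the meantime).
                return leaderId.isPresent() ? Action.BECOME_FOLLOWER : Action.SEND_FIND_QUORUM;
            case OFFSET_OUT_OF_RANGE:
                return Action.TRUNCATE_AND_REFETCH; // truncate to nextFetchOffset, then fetch again
            case NOT_FOLLOWER:
                return Action.DOWNGRADE;            // removed from the quorum: non-fatal self-downgrade
            default:
                return Action.APPEND_RECORDS;       // no error: append the fetched records
        }
    }
}
```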
New Error Codes
```java
MISMATCH_EPOCH(203, "The fetching follower has an epoch inconsistent with the leader", LargerKnownEpochException::new),
NOT_FOLLOWER(204, "The fetching replica is no longer inside the quorum as a follower", NotFollowerException::new);
```
Note that followers will have to check any records received for the presence of control records. Specifically, a follower/observer must check for voter assignment messages, which could change its role.
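A fetcher could scan each fetched batch for voter assignment control records and remember the most recent voter set. In this sketch, `FetchedRecord` is a hypothetical, already-decoded view of a record, and the type value 2 is taken from the quorum change message schema described later in this document. When the new voter set takes effect (on append or on commit) is deliberately left to the caller.

```java
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical sketch of scanning fetched records for voter assignment messages.
final class ControlRecordScan {
    // Control record type used for quorum change messages ("Type=2 in the key schema").
    static final short VOTER_ASSIGNMENT_TYPE = 2;

    static final class FetchedRecord {
        final boolean isControl;
        final short controlType;    // meaningful only when isControl is true
        final Set<Integer> voters;  // decoded voter set for voter assignment records

        private FetchedRecord(boolean isControl, short controlType, Set<Integer> voters) {
            this.isControl = isControl;
            this.controlType = controlType;
            this.voters = voters;
        }

        static FetchedRecord data() {
            return new FetchedRecord(false, (short) -1, Collections.emptySet());
        }

        static FetchedRecord voterAssignment(Set<Integer> voters) {
            return new FetchedRecord(true, VOTER_ASSIGNMENT_TYPE, voters);
        }
    }

    // Voter set of the last voter assignment record in the batch, if any.
    static Optional<Set<Integer>> latestVoterAssignment(List<FetchedRecord> batch) {
        Optional<Set<Integer>> latest = Optional.empty();
        for (FetchedRecord rec : batch) {
            if (rec.isControl && rec.controlType == VOTER_ASSIGNMENT_TYPE) {
                latest = Optional.of(rec.voters);
            }
        }
        return latest;
    }
}
```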
Discussion: Replication Progress Timeout for Zombie Leader
There is one caveat of the pull-based model: say a new leader has been elected with a new epoch and everyone has learned about it except the old leader (e.g., that leader was no longer among the voters and hence did not receive the BeginQuorumEpoch either). Then that old leader would not be notified by anyone about the new leader / epoch and would become a pure "zombie leader".
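One way a leader could notice that it might be a zombie is the quorum.progress.timeout.ms check: track when each voter last sent a FetchQuorumRecords request and, if fewer than a majority are recent, ask FindQuorum whether a newer epoch exists. This is a sketch under stated assumptions: the class is hypothetical, the leader counts itself only while it is still a voter, and every voter gets a full timeout after the election before being considered silent.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the leader-side progress check.
final class LeaderProgressCheck {
    private final int localId;
    private final Set<Integer> voters;
    private final long progressTimeoutMs; // quorum.progress.timeout.ms
    private final Map<Integer, Long> lastFetchMs = new HashMap<>();

    LeaderProgressCheck(int localId, Set<Integer> voters, long progressTimeoutMs, long electedAtMs) {
        this.localId = localId;
        this.voters = voters;
        this.progressTimeoutMs = progressTimeoutMs;
        for (int voter : voters) {
            lastFetchMs.put(voter, electedAtMs); // grace period starting at the election
        }
    }

    void onFetchRequest(int voterId, long nowMs) {
        if (voters.contains(voterId)) {
            lastFetchMs.put(voterId, nowMs);
        }
    }

    // True when fewer than a majority of voters have fetched within the timeout.
    boolean shouldSendFindQuorum(long nowMs) {
        int live = 0;
        for (int voter : voters) {
            if (voter == localId || nowMs - lastFetchMs.get(voter) < progressTimeoutMs) {
                live++;
            }
        }
        return live <= voters.size() / 2;
    }
}
```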
...
The protocol for changing the active voters is well-described in the Raft literature. The high-level idea is to use a new control record type to write quorum changes to the log. Once a quorum change has been written and committed to the log, then the quorum change can take effect.
Quorum Change Message Schema
This will be a new control record type with Type=2 in the key schema. Below we describe the message value schema used for the quorum change messages written to the log.
...
- First check whether the node is the leader. If not, then return an error to let the client retry with FindQuorum. If the current leader is known to the receiving node, then include the LeaderId and LeaderEpoch in the response.
- Build the response using the current assignment information and cached state about replication progress.
...
- If the response indicates that the intended node is not the current leader, then check the response to see if the LeaderId has been set. If so, then attempt to retry the request with the new leader.
- If the current leader is not defined in the response (which could be the case if there is an election in progress), then back off and retry with FindQuorum.
- Otherwise, the response can be returned to the application, or the request eventually times out.
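The client-side redirect logic above can be sketched as a bounded loop. This is a minimal sketch: `Response`, the `send` function, and the `findQuorumLeader` supplier are hypothetical stand-ins for a real network client, and the backoff before FindQuorum is only marked by a comment rather than implemented.

```java
import java.util.OptionalInt;
import java.util.function.IntFunction;
import java.util.function.IntSupplier;

// Hypothetical sketch of a client following leader redirects.
final class LeaderRedirect {
    static final class Response<T> {
        final T value;                  // payload when the node was the leader
        final boolean notQuorumLeader;  // NOT_QUORUM_LEADER was returned
        final OptionalInt leaderId;     // set if the responding node knows the current leader

        Response(T value, boolean notQuorumLeader, OptionalInt leaderId) {
            this.value = value;
            this.notQuorumLeader = notQuorumLeader;
            this.leaderId = leaderId;
        }
    }

    static <T> T callLeader(int nodeId, IntFunction<Response<T>> send,
                            IntSupplier findQuorumLeader, int maxAttempts) {
        int target = nodeId;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            Response<T> response = send.apply(target);
            if (!response.notQuorumLeader) {
                return response.value;
            }
            if (response.leaderId.isPresent()) {
                target = response.leaderId.getAsInt(); // retry directly with the named leader
            } else {
                // Election may be in progress: a real client would back off here first.
                target = findQuorumLeader.getAsInt();
            }
        }
        throw new IllegalStateException("Gave up after " + maxAttempts + " attempts");
    }
}
```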
New Error Codes
```java
NOT_QUORUM_LEADER(205, "Target broker is not the leader of this quorum", LargerKnownEpochException::new);
```
AlterQuorum
The AlterQuorum API is used by the admin client to reassign the voters of the quorum or cancel an ongoing reassignment. It requires ALTER on CLUSTER permission.
...
Cancellation: If TargetVoters is set to null in the request, then we effectively cancel an ongoing reassignment and leave the quorum with the current voters. The preferable option is to always set the intended TargetVoters explicitly. Note that it is always possible to send a new AlterQuorum request even if the pending reassignment has not finished. So if we are in the middle of a reassignment from (1, 2, 3) to (4, 5, 6), then the user can cancel the reassignment by resubmitting (1, 2, 3) as the TargetVoters.
Request Schema
```json
{
  "apiKey": N,
  "type": "request",
  "name": "AlterQuorum",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    {"name": "ClusterId", "type": "string", "versions": "0+"},
    {"name": "TargetVoters", "type": "[]Voter", "nullableVersions": "0+", "default": "null",
      "about": "The target quorum, or null if this is a cancellation request",
      "versions": "0+", "fields": [
        {"name": "VoterId", "type": "int32", "versions": "0+"}
    ]}
  ]
}
```
Response Schema
```json
{
  // Possible top level error codes:
  //
  // NOT_QUORUM_LEADER
  // CLUSTER_AUTHORIZATION_FAILED
  // UNKNOWN_VOTER_ID
  //
  "apiKey": N,
  "type": "response",
  "name": "AlterQuorumResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    {"name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error."}
  ]
}
```
AlterQuorum Request Handling
...
- If the target node is not the leader of the quorum, return NOT_QUORUM_LEADER code to the admin client.
- If the included voter is unknown to the quorum leader, return UNKNOWN_VOTER_ID code to the admin client.
- Check whether the TargetVoters in the request matches that from the latest assignment. If so, check whether it has been committed. If it has, then return immediately. Otherwise, do not append it again; wait for the pending assignment to be committed and return as described in the last step.
- Otherwise, if the TargetVoters in the request does not match the current TargetVoters, then append a new VoterAssignmentMessage to the log and wait for it to be committed.
- Return successfully only when the desired TargetVoters has been safely committed to the log.
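The handling steps above, together with the null-means-cancel rule, can be sketched as a decision function that tells the leader what to do with a request. The parameters and outcome names are hypothetical; in particular, `knownNodeIds` stands in for whatever the leader uses to decide that a voter id is known, and `currentVoters` is the set a cancellation falls back to.

```java
import java.util.Set;

// Hypothetical sketch of the leader-side AlterQuorum checks.
final class AlterQuorumHandling {
    enum Outcome { NOT_QUORUM_LEADER, UNKNOWN_VOTER_ID, ALREADY_COMMITTED, WAIT_FOR_PENDING, APPEND_AND_WAIT }

    // requestedVoters: TargetVoters from the request, or null for a cancellation.
    // latestAssignment / latestCommitted: the most recent VoterAssignmentMessage and its commit state.
    static Outcome handle(boolean isLeader, Set<Integer> requestedVoters, Set<Integer> currentVoters,
                          Set<Integer> knownNodeIds, Set<Integer> latestAssignment,
                          boolean latestCommitted) {
        if (!isLeader) {
            return Outcome.NOT_QUORUM_LEADER;
        }
        // Cancellation: a null TargetVoters leaves the quorum with the current voters.
        Set<Integer> target = requestedVoters == null ? currentVoters : requestedVoters;
        if (!knownNodeIds.containsAll(target)) {
            return Outcome.UNKNOWN_VOTER_ID;
        }
        if (target.equals(latestAssignment)) {
            // Same target as the latest assignment: never append it twice.
            return latestCommitted ? Outcome.ALREADY_COMMITTED : Outcome.WAIT_FOR_PENDING;
        }
        // Append a new VoterAssignmentMessage and reply only once it is committed.
        return Outcome.APPEND_AND_WAIT;
    }
}
```

Comparing against the latest assignment makes retries of the same request idempotent, which matters because the AdminClient is expected to retry on timeout.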
...
- If the response indicates that the intended node is not the current leader, then check the response to see if the LeaderId has been set. If so, then attempt to retry the request with the new leader.
- If the current leader is not defined in the response (which could be the case if there is an election in progress), then back off and retry with FindQuorum.
- Otherwise, return the successful response to the application.
Note that the AdminClient should retry the AlterQuorum request if it times out before the reassignment has been committed to the log. If the caller does not continue retrying the operation, then there is no guarantee about whether or not the reassignment has been successfully received by the cluster.
Once the reassignment has been accepted by the leader, a user can monitor the status of the reassignment through the DescribeQuorum API.
New Error Codes
...
...
Quorum Change Protocol
This protocol allows arbitrary quorum changes through the AlterQuorum API. Internally, we arrive at the target quorum by making a sequence of single-member changes.
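To illustrate what "a sequence of single-member changes" could look like, the sketch below expands one reassignment into intermediate voter sets that differ by exactly one member at each step. The ordering is an assumption, not something specified here: all additions come before any removal, so the voter set never becomes smaller than the smaller of the current and target sets.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical planner: turns one reassignment into single-member steps.
final class SingleMemberChanges {
    // Returns the voter set after each step (add-before-remove is an assumption).
    static List<Set<Integer>> plan(Set<Integer> current, Set<Integer> target) {
        List<Set<Integer>> steps = new ArrayList<>();
        TreeSet<Integer> voters = new TreeSet<>(current);
        for (int id : new TreeSet<>(target)) {   // add missing members in id order
            if (voters.add(id)) {
                steps.add(new TreeSet<>(voters));
            }
        }
        for (int id : new TreeSet<>(current)) {  // then drop members not in the target
            if (!target.contains(id)) {
                voters.remove(id);
                steps.add(new TreeSet<>(voters));
            }
        }
        return steps;
    }
}
```

For a reassignment from (1, 2, 3) to (4, 5, 6), this yields six steps, from (1, 2, 3, 4) through (1, 2, 3, 4, 5, 6) down to (4, 5, 6). Each step would correspond to one committed VoterAssignmentMessage.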
...