...
- Quorum commit semantics: messages in Kafka are considered committed after replication to the current ISR, which is maintained in Zookeeper.
- Leader election: in Kafka, leader elections are done by the controller, which writes the updated state to Zookeeper.
...
Voter: A voter is a replica which is eligible to cast votes during an election and potentially become a leader.
Candidate: When an election starts, a voter bumps the leader epoch and casts a vote for itself; we refer to such a voter as a candidate.
...
```
{
  "apiKey": N,
  "type": "request",
  "name": "FetchQuorumRecordsRequest",
  "validVersions": "0",
  "fields": [
    {"name": "ClusterId", "type": "int32", "versions": "0+"},
    {"name": "ReplicaId", "type": "int32", "versions": "0+",
     "about": "The ID of the replica sending the request"},
    {"name": "LeaderEpoch", "type": "int32", "versions": "0+",
     "about": "The current leader epoch"},
    {"name": "FetchOffset", "type": "int64", "versions": "0+",
     "about": "The next expected offset to be replicated"},
    {"name": "FetchEpoch", "type": "int32", "versions": "0+",
     "about": "The epoch of the last replicated record"},
    {"name": "IsFollower", "type": "bool", "versions": "0+",
     "about": "True if this is a follower fetch from the sender's perspective"}
  ]
}
```
Response Schema

```
{
  "apiKey": N,
  "type": "response",
  "name": "FetchQuorumRecordsResponse",
  "validVersions": "0",
  "fields": [
    {"name": "ErrorCode", "type": "int16", "versions": "0+"},
    {"name": "LeaderId", "type": "int32", "versions": "0+",
     "about": "The ID of the current leader or -1 if the leader is unknown"},
    {"name": "LeaderEpoch", "type": "int32", "versions": "0+",
     "about": "The latest known leader epoch"},
    {"name": "NextFetchOffset", "type": "int64", "versions": "0+",
     "about": "If set, this is the offset that the follower should truncate to"},
    {"name": "NextFetchOffsetEpoch", "type": "int32", "versions": "0+",
     "about": "The epoch of the next offset in case the follower needs to truncate"},
    {"name": "Records", "type": "bytes", "versions": "0+",
     "about": "The fetched record data"},
    {"name": "HighWatermark", "type": "int64", "versions": "0+",
     "about": "The current high watermark"},
    {"name": "FirstDirtyOffset", "type": "int64", "versions": "0+",
     "about": "First dirty offset, which allows followers to determine consistent snapshots"},
    {"name": "LastCaughtUpTimeMs", "type": "int64", "versions": "0+",
     "about": "The last time the follower was caught up with a majority of the voters"}
  ]
}
```
...
- Verify that the leader epoch in the request matches the leader's current epoch. If not, reject the request with a MISMATCH_EPOCH error.
- If the request's epoch is smaller, then this leader's BeginQuorumEpoch will eventually reach the voter, and the voter will update its epoch.
- If the request's epoch is larger, then the leader itself will eventually learn about the new epoch anyway.
- Check that `FetchOffset` and `FetchEpoch` are consistent with the leader's log. Specifically, we check that `FetchOffset` is less than or equal to the end offset of `FetchEpoch`. If not, return OUT_OF_RANGE and encode the next `FetchOffset` as the last offset of the largest epoch which is less than or equal to the fetcher's epoch. This heuristic lets the voter truncate as much as possible in one step, so that it reaches the point of divergence with fewer `FetchQuorumRecords` round trips: if the fetcher's epoch X does not match the epoch of the fetched offset, then all records of epoch X on that voter may have diverged and can be truncated, so returning the end offset of the largest epoch Y (< X) is reasonable.
- If the request is from a voter rather than an observer, the leader can possibly advance the high watermark. As stated above, we only advance the high watermark once the current leader has replicated at least one entry from its current epoch to a majority of the quorum; the high watermark is then set to the maximum offset which has been replicated to a majority of the voters.
- If the `IsFollower` flag is set but the sender is not in the leader's cached quorum, reply to the request with a NOT_FOLLOWER error to let the sender downgrade to an observer. Note that the fetched data in the response is still valid.
The check in step 2 is similar to the logic that followers use today in Kafka through the `OffsetsForLeaderEpoch` API. To make this check efficient, Kafka maintains a `leader-epoch-checkpoint` file on disk, the contents of which are cached in memory. After every observed epoch change, we write the epoch and its start offset to this file and update the cache. This allows us to efficiently check whether the leader's log has diverged from the follower's, and where the point of divergence is. Note that in the worst case it may take multiple rounds of `FetchQuorumRecords` to find the first divergent offset.
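As an illustration, the step-2 check can be sketched with an in-memory epoch cache mirroring the `leader-epoch-checkpoint` contents. This is a hypothetical sketch, not the actual Kafka implementation: the class and method names are illustrative, and the numeric value chosen for OUT_OF_RANGE is an assumption (the KIP only assigns numbers to MISMATCH_EPOCH and NOT_FOLLOWER).

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch of the leader-side validation of a FetchQuorumRecords
// request. The real implementation persists the epoch cache to the
// leader-epoch-checkpoint file; here we keep it purely in memory.
class QuorumFetchValidator {
    // NONE and MISMATCH_EPOCH follow the KIP; 205 for OUT_OF_RANGE is assumed.
    static final int NONE = 0, MISMATCH_EPOCH = 203, OUT_OF_RANGE = 205;

    final int leaderEpoch;
    // epoch -> start offset of that epoch, ordered by epoch
    final NavigableMap<Integer, Long> epochStartOffset = new TreeMap<>();
    final long logEndOffset;

    QuorumFetchValidator(int leaderEpoch, Map<Integer, Long> epochs, long logEndOffset) {
        this.leaderEpoch = leaderEpoch;
        this.epochStartOffset.putAll(epochs);
        this.logEndOffset = logEndOffset;
    }

    // End offset of an epoch = start offset of the next epoch, or the log end.
    long endOffsetFor(int epoch) {
        Map.Entry<Integer, Long> next = epochStartOffset.higherEntry(epoch);
        return next == null ? logEndOffset : next.getValue();
    }

    /** Returns an error code; on OUT_OF_RANGE, nextFetchOffsetOut[0] holds the hint. */
    int validate(int requestEpoch, long fetchOffset, int fetchEpoch, long[] nextFetchOffsetOut) {
        // Step 1: the request must carry the leader's current epoch.
        if (requestEpoch != leaderEpoch)
            return MISMATCH_EPOCH;
        // Step 2: FetchOffset must not exceed the end offset of FetchEpoch.
        if (epochStartOffset.containsKey(fetchEpoch) && fetchOffset <= endOffsetFor(fetchEpoch))
            return NONE;
        // Otherwise hint the end offset of the largest epoch <= the fetcher's epoch,
        // letting the voter truncate as much as possible in one round trip.
        Map.Entry<Integer, Long> floor = epochStartOffset.floorEntry(fetchEpoch);
        nextFetchOffsetOut[0] = floor == null ? 0L : endOffsetFor(floor.getKey());
        return OUT_OF_RANGE;
    }
}
```

For example, with epochs starting at offsets {1: 0, 3: 10, 5: 20} and a log end offset of 30, a fetch at offset 25 with epoch 2 is rejected with a truncation hint of 10, the end offset of epoch 1.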
...
- If the response contains a MISMATCH_EPOCH error code, check the `leaderId` from the response. If it is defined, update the `quorum-state` file and become a follower. Otherwise, try to learn about the new leader via `FindQuorum` requests; in the meantime the voter may receive a BeginQuorumEpoch request, which would also convey the new epoch and leader.
- If the response contains an OUT_OF_RANGE error code, truncate the local log to the encoded `nextFetchOffset`, and then resend the `FetchQuorumRecords` request.
- If a follower has been kicked out of the quorum, it will receive a non-fatal NOT_FOLLOWER error code and downgrade itself to an observer.
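The dispatch on these error codes can be sketched as below. This is a minimal, hypothetical sketch: the handler class, its action strings, and the numeric OUT_OF_RANGE value are illustrative, not the actual Kafka code.

```java
// Hypothetical sketch of how a voter reacts to a FetchQuorumRecords response.
class FetchResponseHandler {
    // NONE, MISMATCH_EPOCH, and NOT_FOLLOWER follow the KIP; 205 is assumed.
    static final int NONE = 0, MISMATCH_EPOCH = 203, NOT_FOLLOWER = 204, OUT_OF_RANGE = 205;

    /** Returns the action the voter should take, as a descriptive string. */
    String handle(int errorCode, int leaderId, long nextFetchOffset) {
        switch (errorCode) {
            case MISMATCH_EPOCH:
                // If the leader is known, update quorum-state and follow it;
                // otherwise discover the new leader via FindQuorum.
                return leaderId >= 0 ? "become-follower-of-" + leaderId : "send-FindQuorum";
            case OUT_OF_RANGE:
                // Truncate the local log to the hinted offset, then retry the fetch.
                return "truncate-to-" + nextFetchOffset + "-and-refetch";
            case NOT_FOLLOWER:
                // Removed from the quorum: non-fatal, downgrade to observer.
                return "downgrade-to-observer";
            default:
                // No error: append the fetched records and continue.
                return "append-records";
        }
    }
}
```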
New Error Codes
```
MISMATCH_EPOCH(203, "The fetched follower has an inconsistent epoch with leader", LargerKnownEpochException::new);
NOT_FOLLOWER(204, "The fetched replica is no longer inside the quorum as a follower", NotFollowerException::new);
```
Discussion: Replication Progress Timeout for Zombie Leader
...
- Append an `AlterQuorumMessage` to the log with the current voters as `CurrentVoters` and the `TargetVoters` from the `AlterQuorum` request.
- The leader will compute 3 sets based on `CurrentVoters` and `TargetVoters`:
  - `RemovingVoters`: voters to be removed
  - `RetainedVoters`: voters shared between current and target
  - `NewVoters`: voters to be added
- Based on the comparison between `size(NewVoters)` and `size(RemovingVoters)`:
  - If `size(NewVoters) >= size(RemovingVoters)`, pick one of `NewVoters` as `NV` by writing a record with `CurrentVoters=CurrentVoters + NV` and `TargetVoters=TargetVoters`.
  - Else pick one of `RemovingVoters` as `RV`, preferably a non-leader voter, by writing a record with `CurrentVoters=CurrentVoters - RV` and `TargetVoters=TargetVoters`.
- Once the record is committed, the membership change is safe to apply. Note that followers will begin acting on the new voter information as soon as the log entry has been appended; they do not wait for it to be committed.
- As there is a potential delay in propagating the removal to the voter being removed, we piggy-back on the `FetchQuorumRecords` response to tell that voter to downgrade immediately after the new membership is committed. See the NOT_FOLLOWER error code.
- The leader will continue this process until one of the following happens:
  - If `TargetVoters = CurrentVoters`, then the reassignment is done. The leader will append a new entry with `TargetVoters=null` to the log.
  - If the leader is the last remaining node in `RemovingVoters`, then it will step down by sending `EndQuorumEpoch` to the current voters. It will continue as a voter until the next leader removes it from the quorum.
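The one-voter-at-a-time rule above can be sketched as follows. The class and method names are hypothetical, and in the real protocol each step is carried by an `AlterQuorumMessage` record appended to the log rather than a value returned in memory.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of one step of the quorum reassignment loop.
class QuorumReassignment {
    static Set<Integer> minus(Set<Integer> a, Set<Integer> b) {
        Set<Integer> r = new HashSet<>(a);
        r.removeAll(b);
        return r;
    }

    /** Returns the CurrentVoters to write in the next record, growing or
     *  shrinking the quorum by exactly one voter per committed change. */
    static Set<Integer> nextCurrentVoters(Set<Integer> current, Set<Integer> target) {
        Set<Integer> newVoters = minus(target, current);      // voters to be added
        Set<Integer> removingVoters = minus(current, target); // voters to be removed
        // RetainedVoters would be the intersection of current and target;
        // it is not needed for the size comparison below.
        Set<Integer> next = new HashSet<>(current);
        if (newVoters.isEmpty() && removingVoters.isEmpty())
            return next; // TargetVoters == CurrentVoters: reassignment is done
        if (newVoters.size() >= removingVoters.size())
            next.add(newVoters.iterator().next());        // CurrentVoters + NV
        else
            // The real leader prefers a non-leader voter as RV here.
            next.remove(removingVoters.iterator().next()); // CurrentVoters - RV
        return next;
    }
}
```

For instance, moving from voters {1, 2, 3} to {1, 2, 4} first adds voter 4 (yielding {1, 2, 3, 4}), then removes voter 3, so the quorum never shrinks below a usable majority mid-change.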
...