Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: add error code specification for follower removal

...

  1. Quorum commit semantics: messages in Kafka are considered committed after replication to the current ISR, which is maintained in Zookeeper.
  2. Leader Electionelection: in Kafka, leader elections are done by the controller which writes the updated state to Zookeeper.

...

Voter: A voter is a replica which is eligible to cast votes during an election and potentially become a leader.

Candidate: When an election is started a voter will bump the leader epoch and cast a vote for itself. We refer to these as candidates.

...

Code Block
{
  "apiKey": N,
  "type": "request",
  "name": "FetchQuorumRecordsRequest",
  "validVersions": "0",
  "fields": [
      {"name": "ClusterId", "type": "int32", "versions": "0+"},
      {"name": "ReplicaId", "type": "int32", "versions": "0+",
       "about": "The ID of the replica sending the request"},
      {"name": "LeaderEpoch", "type": "int32", "versions": "0+",
       "about": "The current leader epoch"},
      {"name": "FetchOffset", "type": "int64", "versions": "0+",
       "about": "The next expected offset to be replicated"},
      {"name": "FetchEpoch", "type": "int32", "versions": "0+",
       "about": "The epoch of the last replicated record"},
	  {"name": "IsFollower", "type": "bool", "versions": "0+"
       "about": "True if this is a follower fetch in sender's perspective"}
  ]
}

Response Schema

Code Block
{
  "apiKey": N,
  "type": "response",
  "name": "FetchQuorumRecordsResponse",
  "validVersions": "0",
  "fields": [
      {"name": "ErrorCode", "type": "int16", "versions": "0+"},
      {"name": "LeaderId", "type": "int32", "versions": "0+",
       "about": "The ID of the current leader or -1 if the leader is unknown."},
      {"name": "LeaderEpoch", "type": "int32", "versions": "0+",
       "about": "The latest known leader epoch"}
      {"name": "NextFetchOffset", "type": "int64", "versions": "0+",
       "about": "If set, this is the offset that the follower should truncate to"},
      {"name": "NextFetchOffsetEpoch", "type": "int32", "versions": "0+",
       "about": "The epoch of the next offset in case the follower needs to truncate"},
      {"name": "Records", "type": "bytes", "versions": "0+",
       "about" "The fetched record data"},
      {"name": "HighWatermark", "type": "int64", "versions": "0+",
       "about": "The current high watermark"},
	  {"name": "FirstDirtyOffset", "type": "int64", "versions": "0+",
       "about": "First dirty offset which allows followers to determine consistent snapshots"},
      {"name": "LastCaughtUpTimeMs", "type": "int64", "versions": "0+",
       "about" "The last time the follower was caught up with a majority of the voters"}
  ]
}

...

  1. Verify that the leader epoch is the same. If not, reject this request with MISMATCH_EPOCH error.
    1. If the leader epoch is smaller, then eventually this leader's BeginQuorumEpoch would reach the voter and that voter would update the epoch.
    2. If the leader epoch is larger, then eventually itself would learn about the new epoch anyways.
  2. Check that the epoch on the FetchOffset's  FetchEpoch are consistent with the leader's log. Specifically we check that FetchOffset is less than or equal to the end offset of FetchEpoch. If not, return OUT_OF_RANGE and encode the next FetchOffset as the last offset of the largest epoch which is less than or equal to the fetcher's epoch. This is a heuristic of truncating to let the voter truncate as much as possible to get to the starting-divergence point with fewer FetchQuorumRecords round-trips: if the fetcher's epoch is X which does not match the epoch of that fetching offset, then it means all records of epoch X on that voter may have diverged and hence could be truncated, then returning the next offset of largest epoch Y (< X) is reasonable.
  3. If the request is from a voter not an observer, the leader can possibly advance the high-watermark. As stated above, we only advance the high-watermark if the current leader has replicated at least one entry to majority of quorum to its current epoch. Otherwise, the high watermark is set to the maximum offset which has been replicated to a majority of the voters.
  4. If the `IsFollower` flag is set true but the sender is not inside leader's cached quorum, reply the request with NOT_FOLLOWER error to let the observer downgrade. Note the fetched data shall still be valid. 

The check in step 2 is similar to the logic that followers use today in Kafka through the OffsetsForLeaderEpoch API. In order to make this check efficient, Kafka maintains a leader-epoch-checkpoint file on disk, the contents of which is cached in memory. After every epoch change is observed, we write the epoch and its start offset to this file and update the cache. This allows us to efficiently check whether the leader's log has diverged from the follower and where the point of divergence is. Note that it may take multiple rounds of FetchQuorumRecords in order to find the first offset that diverges in the worst case.

...

  1. If the response contains MISMATCH_EPOCH error code, check the leaderId from the response. If it is defined, then update the quorum-state file and become a follower. Otherwise, try to learn about the new leader via FindQuorum requests; in the mean time it may receive BeginQuorumEpoch request which would also update the new epoch / leader as well.
  2. If the response contains OUT_OF_RANGE error code, truncate its local log to the encoded nextFetchOffset, and then resend the FetchRecord request.
  3. If a follower has been kicked out off the quorum, it will receive a non-fatal NOT_FOLLOWER error code, and do a self-downgrade.

New Error Codes

Code Block
languagejava
MISMATCH_EPOCH(203, "The fetched follower has an inconsistent epoch with leader", LargerKnownEpochException::new);
NOT_FOLLOWER(204, "The fetched replica is no longer inside the quorum as a follower", NotFollowerException::new);

Discussion: Replication Progress Timeout for Zombie Leader

...

  1. Append an AlterQuorumMessage to the log with the current voters as CurrentVoters and the TargetVoters from the AlterQuorum request.
  2. Leader will compute 3 sets based on CurrentVoters and TargetVoters:
    1. RemovingVoters: voters to be removed
    2. RetainedVoters: voters shared between current and target
    3. NewVoters: voters to be added
  3. Based on comparison between size(NewVoters) and size(RemovingVoters),
    1. If size(NewVoters) >= size(RemovingVoters), pick one of NewVoters as NV by writing a record with CurrentVoters=CurrentVoters + NV, and TargetVoters=TargetVoters.
    2. else pick one of RemovingVoters as RV, preferably a non-leader voter, by writing a record with CurrentVoters=CurrentVoters - RV, and TargetVoters=TargetVoters.
  4. Once the record is committed, the membership change is safe to be applied. Note that followers will begin acting with the new voter information as soon as the log entry has been appended. They do not wait for it to be committed.
  5. As there is a potential delay for propagating the removal message to the removing voter, we piggy-back on the `FetchQuorumRecords` to inform the voter to downgrade immediately after the new membership gets committed. See the error code NOT_FOLLOWER.
  6. The leader will continue this process until one of the following happens:
    1. If TargetVoters = CurrentVoters, then the reassignment is done. The leader will append a new entry with TargetVoters=null to the log.
    2. If the leader is the last remaining node in RemovingVoters, then it will step down by sending EndQuorumEpoch to the current voters. It will continue as a voter until the next leader removes it from the quorum.

...