...

Code Block
{
  "apiKey": N,
  "type": "request",
  "name": "FetchQuorumRecordsRequest",
  "validVersions": "0",
  "fields": [
      {"name": "ClusterId", "type": "int32", "versions": "0+"},
      {"name": "ReplicaId", "type": "int32", "versions": "0+",
       "about": "The ID of the replica sending the request"},
      {"name": "LeaderEpoch", "type": "int32", "versions": "0+",
       "about": "The current leader epoch"},
      {"name": "FetchOffset", "type": "int64", "versions": "0+",
       "about": "The next expected offset to be replicated"},
      {"name": "FetchEpoch", "type": "int32", "versions": "0+",
       "about": "The epoch of the last replicated record"},
	  {"name": "IsFollower", "type": "bool", "versions": "0+"
       "about": "True if this is a follower fetch in sender's perspective"}
  ]
}
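
For illustration only, a short Java sketch of how a follower might populate these fields from its local log state; the ReplicatedLog interface and the builder are hypothetical stand-ins, not part of the proposed API.

Code Block
// Hypothetical sketch (not part of the proposal) of how a follower fills in
// the request fields from its local log. ReplicatedLog is an illustrative stand-in.
public final class FetchQuorumRecordsRequestBuilder {

    public record FetchQuorumRecordsRequest(int clusterId, int replicaId,
                                            int leaderEpoch, long fetchOffset,
                                            int fetchEpoch, boolean isFollower) {}

    public interface ReplicatedLog {
        long endOffset();      // next offset to be written locally
        int lastEntryEpoch();  // epoch of the last locally replicated record
    }

    public static FetchQuorumRecordsRequest build(int clusterId, int replicaId,
                                                  int currentLeaderEpoch,
                                                  ReplicatedLog log,
                                                  boolean isVoter) {
        return new FetchQuorumRecordsRequest(
            clusterId,
            replicaId,
            currentLeaderEpoch,    // the leader epoch this replica currently believes in
            log.endOffset(),       // FetchOffset: next expected offset to replicate
            log.lastEntryEpoch(),  // FetchEpoch: epoch of the last replicated record
            isVoter);              // IsFollower: true only for voter (follower) fetches
    }
}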

Response Schema

Code Block
{
  "apiKey": N,
  "type": "response",
  "name": "FetchQuorumRecordsResponse",
  "validVersions": "0",
  "fields": [
      {"name": "ErrorCode", "type": "int16", "versions": "0+"},
      {"name": "LeaderId", "type": "int32", "versions": "0+",
       "about": "The ID of the current leader or -1 if the leader is unknown."},
      {"name": "LeaderEpoch", "type": "int32", "versions": "0+",
       "about": "The latest known leader epoch"}
      {"name": "NextFetchOffset", "type": "int64", "versions": "0+",
       "about": "If set, this is the offset that the follower should truncate to"},
      {"name": "NextFetchOffsetEpoch", "type": "int32", "versions": "0+",
       "about": "The epoch of the next offset in case the follower needs to truncate"},
      {"name": "Records", "type": "bytes", "versions": "0+",
       "about" "The fetched record data"},
      {"name": "HighWatermark", "type": "int64", "versions": "0+",
       "about": "The current high watermark"},
	  {"name": "FirstDirtyOffset", "type": "int64", "versions": "0+",
       "about": "First dirty offset which allows followers to determine consistent snapshots"},
      {"name": "LastCaughtUpTimeMs", "type": "int64", "versions": "0+",
       "about" "The last time the follower was caught up with a majority of the voters"}
  ]
}
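
On the follower side, the NextFetchOffset hint drives truncation. Below is a hedged sketch of how a follower might react to this response, with illustrative error codes and a hypothetical ReplicatedLog interface; the KIP itself only defines the wire schema.

Code Block
// Hypothetical sketch of follower-side response handling. Error codes and the
// ReplicatedLog interface are illustrative stand-ins, not proposed APIs.
public final class FetchResponseHandler {

    public static final short NONE = 0;
    public static final short OUT_OF_RANGE = 1;  // placeholder numeric values

    public interface ReplicatedLog {
        void truncateTo(long offset);
        void append(byte[] records);
        void updateHighWatermark(long highWatermark);
    }

    public static void onResponse(short errorCode, long nextFetchOffset,
                                  byte[] records, long highWatermark,
                                  ReplicatedLog log) {
        if (errorCode == OUT_OF_RANGE) {
            // The leader detected divergence: truncate to the hinted offset,
            // then retry the fetch from the new log end offset.
            log.truncateTo(nextFetchOffset);
        } else if (errorCode == NONE) {
            log.append(records);
            log.updateHighWatermark(highWatermark);
        }
        // Other errors (e.g. a stale leader epoch) would trigger leader rediscovery.
    }
}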

...

  1. Verify that the leader epoch is the same. If not, reject this request with a MISMATCH_EPOCH error.
    1. If the leader epoch is smaller, then this leader's BeginQuorumEpoch request would eventually reach the voter, and that voter would update its epoch.
    2. If the leader epoch is larger, then the leader itself would eventually learn about the new epoch anyway.
  2. Check that the FetchEpoch is consistent with the FetchOffset in the leader's log. Specifically, we check that FetchOffset is less than or equal to the end offset of FetchEpoch. If not, return OUT_OF_RANGE and encode the next FetchOffset as the last offset of the largest epoch which is less than or equal to the fetcher's epoch (see the sketch after this list). This truncation heuristic lets the voter truncate as much as possible and reach the point of divergence with fewer FetchQuorumRecords round trips: if the fetcher's epoch is X and it does not match the epoch of the fetched offset, then all records of epoch X on that voter may have diverged and can be truncated, so returning the end offset of the largest epoch Y (< X) is reasonable.
  3. If the request is from a voter rather than an observer, the leader can possibly advance the high watermark. As stated above, we only advance the high watermark if the current leader has replicated at least one entry from its current epoch to a majority of the quorum; otherwise, the high watermark is set to the maximum offset that has been replicated to a majority of the voters. If the `IsFollower` flag is set to true by the sender but the sender is not inside the leader's cached quorum, reply to the request with a NOT_FOLLOWER error to let the sender downgrade. This could be caused by a reassignment, or the old follower could be in a zombie state. Note that the fetched data is still valid.
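
To make the control flow concrete, here is a minimal Java sketch of the epoch checks in steps 1 and 2, assuming a hypothetical EpochCache lookup backed by the leader-epoch cache described below; the error constants and method names are illustrative, not part of the proposal.

Code Block
// Illustrative sketch of the leader's validation in steps 1-2 above.
// EpochCache and the error constants are hypothetical stand-ins.
public final class FetchValidator {

    public interface EpochCache {
        // End offset of the given epoch, i.e. the start offset of the next epoch.
        long endOffsetForEpoch(int epoch);
        // Largest epoch in the log that is <= the given epoch.
        int largestEpochAtMost(int epoch);
    }

    public static final short NONE = 0, MISMATCH_EPOCH = 1, OUT_OF_RANGE = 2;

    public static short validate(int requestLeaderEpoch, long fetchOffset, int fetchEpoch,
                                 int currentLeaderEpoch, EpochCache cache) {
        // Step 1: the leader epochs must match exactly; BeginQuorumEpoch or the
        // error itself will eventually reconcile a stale sender or receiver.
        if (requestLeaderEpoch != currentLeaderEpoch)
            return MISMATCH_EPOCH;

        // Step 2: FetchOffset must not exceed the end offset of FetchEpoch in
        // the leader's log; otherwise the fetcher's log has diverged.
        if (fetchOffset > cache.endOffsetForEpoch(fetchEpoch))
            return OUT_OF_RANGE;

        return NONE;
    }

    // On OUT_OF_RANGE, the hinted next fetch offset is the end offset of the
    // largest epoch that is <= the fetcher's epoch.
    public static long nextFetchOffsetHint(int fetchEpoch, EpochCache cache) {
        return cache.endOffsetForEpoch(cache.largestEpochAtMost(fetchEpoch));
    }
}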

The check in step 2 is similar to the logic that followers use today in Kafka through the OffsetsForLeaderEpoch API. In order to make this check efficient, Kafka maintains a leader-epoch-checkpoint file on disk, the contents of which are cached in memory. Whenever an epoch change is observed, we write the epoch and its start offset to this file and update the cache. This allows us to efficiently check whether the leader's log has diverged from the follower's and, if so, where the point of divergence is. Note that in the worst case it may take multiple rounds of FetchQuorumRecords to find the first divergent offset.
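
A minimal sketch of such a cache, assuming an in-memory sorted map of (epoch, start offset) entries mirroring the checkpoint file; this is illustrative and not the actual Kafka implementation.

Code Block
import java.util.TreeMap;

// Illustrative in-memory mirror of the leader-epoch-checkpoint file:
// a sorted map from epoch to the start offset of that epoch.
public final class LeaderEpochCache {
    private final TreeMap<Integer, Long> epochStartOffsets = new TreeMap<>();
    private long logEndOffset;

    // Called whenever a new epoch is observed in the log.
    public void assign(int epoch, long startOffset) {
        epochStartOffsets.put(epoch, startOffset);
    }

    public void updateLogEndOffset(long endOffset) {
        logEndOffset = endOffset;
    }

    // The end offset of an epoch is the start offset of the next higher epoch,
    // or the log end offset if the epoch is the latest one.
    public long endOffsetForEpoch(int epoch) {
        Integer nextEpoch = epochStartOffsets.higherKey(epoch);
        return nextEpoch == null ? logEndOffset : epochStartOffsets.get(nextEpoch);
    }

    // The largest epoch in the log that is <= the given epoch, or -1 if none.
    public int largestEpochAtMost(int epoch) {
        Integer floorEpoch = epochStartOffsets.floorKey(epoch);
        return floorEpoch == null ? -1 : floorEpoch;
    }
}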

...