...

  • INVALID_CLUSTER_ID: The request either included a clusterId which does not match the one expected by the leader or failed to include a clusterId when one was expected.
  • FENCED_LEADER_EPOCH: The leader epoch in the request is smaller than the latest epoch known to the recipient of the request.
  • UNKNOWN_LEADER_EPOCH: The leader epoch in the request is larger than expected. Note that this is an unexpected error: unlike normal Kafka log replication, a follower cannot observe a newer epoch before the leader does.
  • OFFSET_OUT_OF_RANGE: Used in the Fetch API to indicate that the follower has fetched from an invalid offset and should truncate to the offset/epoch indicated in the response.
  • NOT_LEADER_FOR_PARTITION: Used in DescribeQuorum and AlterPartitionReassignments to indicate that the recipient of the request is not the current leader.
  • INVALID_QUORUM_STATE: This error code is reserved for cases when a request conflicts with the locally known state. For example, if two separate nodes try to become leader in the same epoch, this error indicates an illegal state change.
  • INCONSISTENT_VOTER_SET: Used when the request contains inconsistent membership.
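For illustration, the error codes above could be modeled as a simple Java enum. This is a hypothetical sketch: the numeric values shown are placeholders, not the codes assigned in the Kafka protocol.

```java
// Hypothetical sketch of the error codes above as a Java enum.
// The numeric values are placeholders, not the official protocol codes.
public enum RaftError {
    INVALID_CLUSTER_ID(1),       // clusterId missing or mismatched
    FENCED_LEADER_EPOCH(2),      // request epoch older than the recipient's
    UNKNOWN_LEADER_EPOCH(3),     // request epoch newer than expected
    OFFSET_OUT_OF_RANGE(4),      // follower fetched from an invalid offset
    NOT_LEADER_FOR_PARTITION(5), // recipient is not the current leader
    INVALID_QUORUM_STATE(6),     // request conflicts with locally known state
    INCONSISTENT_VOTER_SET(7);   // request contains inconsistent membership

    private final int code;

    RaftError(int code) {
        this.code = code;
    }

    public int code() {
        return code;
    }
}
```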

...

  1. If it fails to receive a FetchResponse from the current leader before the expiration of quorum.fetch.timeout.ms
  2. If it receives an EndQuorumEpoch request from the current leader
  3. If it fails to receive a majority of votes before expiration of quorum.election.timeout.ms after declaring itself a candidate.
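As a rough illustration, the first two transitions could be tracked as in the sketch below. The class and field names are hypothetical; quorumFetchTimeoutMs stands in for quorum.fetch.timeout.ms, and the election timeout of an existing candidate (condition 3) would be evaluated analogously.

```java
// Hypothetical sketch of a follower deciding to become a candidate.
public final class FollowerState {
    private final long quorumFetchTimeoutMs; // quorum.fetch.timeout.ms
    private long lastFetchResponseTimeMs;
    private boolean endQuorumEpochReceived;

    public FollowerState(long quorumFetchTimeoutMs, long nowMs) {
        this.quorumFetchTimeoutMs = quorumFetchTimeoutMs;
        this.lastFetchResponseTimeMs = nowMs;
    }

    public void onFetchResponse(long nowMs) {
        lastFetchResponseTimeMs = nowMs;
    }

    public void onEndQuorumEpoch() {
        endQuorumEpochReceived = true;
    }

    // Conditions 1 and 2 above; condition 3 (election timeout) applies
    // once the node is already a candidate and is handled the same way.
    public boolean shouldBecomeCandidate(long nowMs) {
        boolean fetchTimedOut =
            nowMs - lastFetchResponseTimeMs >= quorumFetchTimeoutMs;
        return fetchTimedOut || endQuorumEpochReceived;
    }
}
```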

...

Note that only voters receive the BeginQuorumEpoch request: observers will discover the new leader through either the DiscoverBrokers or Fetch APIs. For example, the old leader would return an error code in the Fetch response indicating that it is no longer the leader, along with the current known leader id / epoch, and the observers can then start fetching from the new leader. In case the old leader does not know who the new leader is, observers can still fall back to the DiscoverBrokers request to discover the new leader.
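A sketch of this redirection logic follows. The class, fields, and placeholder error code are all hypothetical; the actual Fetch response schema is defined elsewhere in this KIP.

```java
// Hypothetical sketch of an observer redirecting its fetches after the
// old leader reports that it is no longer the leader.
public final class ObserverFetcher {
    private static final int NOT_LEADER_FOR_PARTITION = 5; // placeholder code

    private int knownLeaderId = -1;
    private int knownLeaderEpoch = -1;

    public void onFetchError(int errorCode, int leaderId, int leaderEpoch) {
        if (errorCode != NOT_LEADER_FOR_PARTITION)
            return;
        if (leaderId >= 0) {
            // The old leader encoded the new leader id / epoch in the
            // response, so we can start fetching from the new leader.
            knownLeaderId = leaderId;
            knownLeaderEpoch = leaderEpoch;
        } else {
            // The old leader does not know the new leader either; fall
            // back to DiscoverBrokers.
            sendDiscoverBrokers();
        }
    }

    private void sendDiscoverBrokers() {
        // Network plumbing elided in this sketch.
    }
}
```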

...

As soon as a broker accepts a BeginQuorumEpoch request, it will transition to a follower state and begin sending Fetch requests to the new leader.

...

If the response contains no errors, then the leader will record the follower in memory as having endorsed the election. The leader will continue sending BeginQuorumEpoch to each known voter until it has received an endorsement from each of them. This ensures that a voter that is partitioned from the network will be able to discover the leader quickly after the partition is restored. An endorsement from an existing voter may also be inferred through a received Fetch request bearing the new leader's epoch, even if the BeginQuorumEpoch request was never received.

If the error code indicates that the voter's known leader epoch is larger (i.e. if the error is FENCED_LEADER_EPOCH), then the voter will update its quorum-state, become a follower of that leader, and begin sending Fetch requests.
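The endorsement bookkeeping might look like the following sketch, with hypothetical names. It records endorsements from BeginQuorumEpoch responses and also infers them from Fetch requests that carry the new leader's epoch.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of how a new leader might track endorsements.
public final class EndorsementTracker {
    private final int leaderEpoch;
    private final Set<Integer> pendingVoters; // voters not yet endorsed
    private final Set<Integer> endorsedVoters = new HashSet<>();

    public EndorsementTracker(int leaderEpoch, Set<Integer> voters) {
        this.leaderEpoch = leaderEpoch;
        this.pendingVoters = new HashSet<>(voters);
    }

    // Called on a successful BeginQuorumEpoch response.
    public void onBeginQuorumEpochAck(int voterId) {
        endorse(voterId);
    }

    // An endorsement may also be inferred from a Fetch request that
    // carries the new leader's epoch.
    public void onFetchRequest(int voterId, int fetchEpoch) {
        if (fetchEpoch == leaderEpoch)
            endorse(voterId);
    }

    private void endorse(int voterId) {
        if (pendingVoters.remove(voterId))
            endorsedVoters.add(voterId);
    }

    // The leader keeps retrying BeginQuorumEpoch for these voters.
    public Set<Integer> votersStillToNotify() {
        return pendingVoters;
    }
}
```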

EndQuorumEpoch

The EndQuorumEpoch API is used by a leader to gracefully step down so that an election can be held immediately, without waiting for the election timeout. The primary use case for this is to enable graceful shutdown: the request is sent if the shutting-down voter is either the active leader or a candidate in an in-progress election. It is also used when the leader needs to be removed from the quorum following an AlterPartitionReassignments request.
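A minimal sketch of this graceful-shutdown rule, assuming a simple role enum (the names are illustrative and the request plumbing is elided):

```java
// Hypothetical sketch: only an active leader or a candidate with an
// election in progress sends EndQuorumEpoch on shutdown.
public final class ShutdownHandler {
    enum Role { LEADER, CANDIDATE, FOLLOWER, OBSERVER }

    public void onShutdown(Role role) {
        switch (role) {
            case LEADER:
            case CANDIDATE:
                // Step down so an election can begin immediately
                // instead of waiting for the election timeout.
                sendEndQuorumEpochToVoters();
                break;
            default:
                // Followers and observers can shut down silently.
                break;
        }
    }

    private void sendEndQuorumEpochToVoters() {
        // Request construction and network plumbing elided.
    }
}
```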

...

  1. First ensure that the leader epoch is the same. If not, reject this request with either the FENCED_LEADER_EPOCH or UNKNOWN_LEADER_EPOCH error.
    1. If the leader epoch is smaller, then eventually this leader's BeginQuorumEpoch would reach the voter and that voter would update the epoch.
    2. If the leader epoch is larger, then eventually the receiver would learn about the new epoch anyway. In fact, this case should not happen since, unlike the normal partition replication protocol, leaders are always the first to discover that they have been elected.
  2. Check that the FetchOffset and FetchEpoch are consistent with the leader's log. Specifically, we check that FetchOffset is less than or equal to the end offset of FetchEpoch. If not, return OFFSET_OUT_OF_RANGE and encode the next FetchOffset as the last offset of the largest epoch which is less than or equal to the fetcher's epoch. This heuristic lets the voter truncate as much as possible toward the point of divergence with fewer Fetch round-trips: if the fetcher's epoch is X and it does not match the epoch of the fetched offset, then all records of epoch X on that voter may have diverged and can be truncated, so returning the next offset of the largest epoch Y (< X) is reasonable.
  3. If the request is from a voter and not an observer, the leader can possibly advance the high watermark. As stated above, we only advance the high watermark once the current leader has replicated at least one entry from its current epoch to a majority of the quorum. Subject to that, the high watermark is set to the maximum offset which has been replicated to a majority of the voters.

The check in step 2 is similar to the logic that followers use today in Kafka through the OffsetsForLeaderEpoch API. In order to make this check efficient, Kafka maintains a leader-epoch-checkpoint file on disk, the contents of which are cached in memory. After every observed epoch change, we write the epoch and its start offset to this file and update the cache. This allows us to efficiently check whether the leader's log has diverged from the follower's and where the point of divergence is. Note that, in the worst case, it may take multiple rounds of Fetch to find the first offset that diverges.
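A sketch of the step-2 validation over the cached checkpoint contents might look as follows. The class and method names are hypothetical (the real epoch cache lives in Kafka's log layer); the checkpoint is modeled as a sorted map from epoch to start offset.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of validating (FetchOffset, FetchEpoch) against
// the leader's log using the cached leader-epoch-checkpoint contents.
public final class LeaderEpochCache {
    private final TreeMap<Integer, Long> epochStartOffsets = new TreeMap<>();
    private long logEndOffset;

    // Called when a new epoch begins, mirroring a checkpoint-file append.
    public void assignEpochStartOffset(int epoch, long startOffset) {
        epochStartOffsets.put(epoch, startOffset);
    }

    public void updateLogEndOffset(long logEndOffset) {
        this.logEndOffset = logEndOffset;
    }

    // The end offset of an epoch is the start offset of the next epoch,
    // or the log end offset if it is the latest epoch on the leader.
    private long endOffsetOf(int epoch) {
        Map.Entry<Integer, Long> next = epochStartOffsets.higherEntry(epoch);
        return next != null ? next.getValue() : logEndOffset;
    }

    // Returns -1 if (fetchOffset, fetchEpoch) is consistent with the log.
    // Otherwise returns the next fetch offset to encode in the
    // OFFSET_OUT_OF_RANGE response: the end offset of the largest epoch
    // less than or equal to the fetcher's epoch.
    public long validateFetch(long fetchOffset, int fetchEpoch) {
        Map.Entry<Integer, Long> floor = epochStartOffsets.floorEntry(fetchEpoch);
        if (floor != null
                && floor.getKey() == fetchEpoch
                && fetchOffset <= endOffsetOf(fetchEpoch)) {
            return -1L; // consistent; serve the fetch normally
        }
        return floor != null ? endOffsetOf(floor.getKey()) : 0L;
    }
}
```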

...

To resolve this issue, we will piggy-back on the "quorum.fetch.timeout.ms" config: if the leader does not receive Fetch requests from a majority of the quorum for that amount of time, it will start sending Metadata requests to random nodes in the cluster to learn the latest quorum. If it cannot connect to any known voter, the old leader shall reset its connection information and send out DiscoverBrokers. If the returned response includes a newer epoch leader, this zombie leader will step down and become an observer; if it realizes that it is still within the current quorum's voter list, it will start fetching from that leader. Note that the node will remain a leader until it finds that it has been supplanted by another voter.
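The trigger condition could be tracked as in the sketch below. All names are hypothetical; the leader counts itself toward the majority, and the probing logic (Metadata / DiscoverBrokers) is elided.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the zombie-leader check: if the leader has not
// seen Fetch requests from a majority of voters within
// quorum.fetch.timeout.ms, it should probe the cluster for a newer quorum.
public final class LeaderLivenessCheck {
    private final long quorumFetchTimeoutMs;
    private final int voterCount; // includes the leader itself
    private final Map<Integer, Long> lastFetchTimeMs = new HashMap<>();

    public LeaderLivenessCheck(long quorumFetchTimeoutMs, int voterCount) {
        this.quorumFetchTimeoutMs = quorumFetchTimeoutMs;
        this.voterCount = voterCount;
    }

    public void onFetchRequest(int voterId, long nowMs) {
        lastFetchTimeMs.put(voterId, nowMs);
    }

    // True when fewer than a majority of voters (counting the leader
    // itself) have fetched recently.
    public boolean shouldProbeQuorum(long nowMs) {
        long recentFollowers = lastFetchTimeMs.values().stream()
            .filter(t -> nowMs - t < quorumFetchTimeoutMs)
            .count();
        return recentFollowers + 1 < voterCount / 2 + 1;
    }
}
```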

...

On Performance: There are also tradeoffs from a performance perspective between these two models. In order to commit a record, it must be replicated to a majority of nodes. This includes both propagating the record data from the leader to the follower and propagating the successful write of that record from the follower back to the leader. The push model potentially has an edge in latency because it allows for pipelining: the leader can continue sending new records to followers while it is awaiting the commit of previous records. In the proposal here, we do not pipeline because the leader relies on getting the next offset from the Fetch request. However, this is not a fundamental limitation. If the leader keeps track of the last sent offset, then we could instead let the Fetch request be pipelined so that it indicates the last acked offset and allows the leader to choose the next offset to send. Basically, rather than the leader continually sending append requests to the follower as new data arrives, the follower would keep sending fetch requests as long as the acked offset is changing. Our experience with Kafka replication suggests that this is unlikely to be necessary, so we prefer the simplicity of the current approach, which also allows us to reuse more of the existing log layer in Kafka. However, we will evaluate the performance characteristics and make improvements as necessary. Note that although we are specifying the initial versions of the protocols in this KIP, there will almost certainly be additional revisions before this reaches production.

...

  1. Upon starting up, a broker always tries to bootstrap its knowledge of the quorum by first reading the quorum-state file and then scanning forward from AppliedOffset to the end of the log to see if there are any changes to the quorum state. For a newly started broker, the log and the file would both be empty, so no previous knowledge can be restored.
  2. If, after step 1), there is some known quorum state along with a leader / epoch, the broker will:
    1. Promote itself from observer to voter if it finds out that it is a voter for the epoch.
    2. Start sending Fetch requests to the leader it currently knows (which may not actually be the latest epoch's leader).
  3. Otherwise, it will try to learn the quorum state by sending DiscoverBrokers to any other brokers inside the cluster via bootstrap.servers as the second option of quorum state discovery.
    1. As long as a broker does not know all the current quorum voters' connections, it should continue to periodically ask other brokers via DiscoverBrokers.
  4. Send out a MetadataRequest to the discovered brokers to find the current metadata partition leader.
    1. As long as a broker does not know the current quorum (including the leader and the voters), it should continue to periodically ask other brokers via Metadata.
  5. If even steps 3) and 4) cannot find any quorum information – e.g. when there are no other brokers in the cluster, or a network partition is preventing this broker from talking to others in the cluster – fall back to the third option of quorum state discovery by checking whether it is among the brokers listed in quorum.voters.
    1. If so, it will promote itself to voter state and add its own connection information to the cached quorum state, returning that in the DiscoverBrokers responses it sends to other brokers; otherwise it stays in observer state.
    2. In either case, it continues to try to send DiscoverBrokers to all other brokers in the cluster via bootstrap.servers.
  6. For any voter, after it has learned a majority of the expected quorum's voters from DiscoverBrokers responses, it will begin a vote. (A sketch of this startup sequence follows the list.)
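The sketch below walks through the decision sequence above. All names are illustrative, and the file, log, and network plumbing is stubbed out; it is not the actual broker startup code.

```java
import java.util.List;

// Hypothetical sketch of the quorum bootstrap sequence on startup.
public final class QuorumBootstrap {

    interface RestoredState { // from the quorum-state file + log scan
        boolean hasLeader();
        int leaderId();
        boolean isVoterForEpoch(int brokerId);
    }

    private final int localBrokerId;
    private final List<Integer> configuredVoters; // quorum.voters

    public QuorumBootstrap(int localBrokerId, List<Integer> configuredVoters) {
        this.localBrokerId = localBrokerId;
        this.configuredVoters = configuredVoters;
    }

    public void start(RestoredState restored) {
        if (restored.hasLeader()) {
            // Step 2: a leader / epoch was restored from local state.
            if (restored.isVoterForEpoch(localBrokerId))
                promoteToVoter();
            startFetchingFrom(restored.leaderId());
        } else if (discoverQuorumFromOtherBrokers()) {
            // Steps 3 and 4: DiscoverBrokers + Metadata round-trips succeeded.
        } else if (configuredVoters.contains(localBrokerId)) {
            // Step 5: no one else reachable; fall back to quorum.voters.
            promoteToVoter();
        }
        // Step 6 (begin a vote once a majority of voters is known) is
        // driven by later DiscoverBrokers responses and elided here.
    }

    private void promoteToVoter() { /* elided */ }
    private void startFetchingFrom(int leaderId) { /* elided */ }
    private boolean discoverQuorumFromOtherBrokers() { return false; /* elided */ }
}
```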

...

ClusterId generation: Note that the first leader is also responsible for providing the ClusterId field, which is part of the VoterAssignmentMessage. If the cluster is being migrated from Zookeeper, then we expect to reuse the existing clusterId. If the cluster is starting for the first time, then a new one will be generated; in practice it is the user's responsibility to guarantee that the newly generated clusterId is unique. Once this message has been committed to the log, the leader and all future leaders will strictly validate that this value matches the ClusterId provided in Vote, BeginEpoch, EndEpoch, and Fetch requests.
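A small sketch of this validation, with hypothetical names:

```java
// Hypothetical sketch of clusterId validation once the
// VoterAssignmentMessage has been committed to the log.
public final class ClusterIdValidator {
    private String committedClusterId; // null until the message is committed

    public void onVoterAssignmentCommitted(String clusterId) {
        this.committedClusterId = clusterId;
    }

    // Applied to Vote, BeginEpoch, EndEpoch, and Fetch requests. A null
    // requestClusterId fails validation once a value has been committed,
    // matching the INVALID_CLUSTER_ID semantics above.
    public boolean isValid(String requestClusterId) {
        if (committedClusterId == null)
            return true; // nothing committed yet, nothing to validate
        return committedClusterId.equals(requestClusterId);
    }
}
```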

The leader will also append a LeaderChangeMessage as described in the VoteResponse handling above. This is not needed for correctness. It is just useful for debugging and to ensure that the high watermark always has an opportunity to advance after an election.

...

  1. Append a VoterAssignmentMessage to the log with the current voters as CurrentVoters and the Replicas from the AlterPartitionReassignments request as the TargetVoters.
  2. The leader will compute 3 sets based on CurrentVoters and TargetVoters (see the sketch after this list):
    1. RemovingVoters: voters to be removed
    2. RetainedVoters: voters shared between current and target
    3. NewVoters: voters to be added
  3. Based on a comparison of size(NewVoters) and size(RemovingVoters):
    1. If size(NewVoters) >= size(RemovingVoters), pick one of NewVoters as NV by writing a record with CurrentVoters=CurrentVoters + NV, and TargetVoters=TargetVoters.
    2. else pick one of RemovingVoters as RV, preferably a non-leader voter, by writing a record with CurrentVoters=CurrentVoters - RV, and TargetVoters=TargetVoters.
  4. Once the record is committed, the membership change is safe to be applied. Note that followers will begin acting with the new voter information as soon as the log entry has been appended. They do not wait for it to be committed.
  5. As there is a potential delay in propagating the removal message to the removing voter, we piggy-back on the `Fetch` response to inform the voter to downgrade immediately after the new membership gets committed. See the error code NOT_FOLLOWER.
  6. The leader will continue this process until one of the following scenarios happens:
    1. If TargetVoters = CurrentVoters, then the reassignment is done. The leader will append a new entry with TargetVoters=null to the log.
    2. If the leader is the last remaining node in RemovingVoters, then it will step down by sending EndQuorumEpoch to the current voters. It will continue as a voter until the next leader removes it from the quorum.
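The set computation in steps 2 and 3 could look like the sketch below. The class and method names are hypothetical; it shows only the pure set logic, not the record appends.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of steps 2 and 3: compute the three sets and
// pick the next single-voter change.
public final class ReassignmentPlanner {

    public static Set<Integer> newVoters(Set<Integer> current, Set<Integer> target) {
        Set<Integer> result = new HashSet<>(target);
        result.removeAll(current);   // in target but not in current
        return result;
    }

    public static Set<Integer> removingVoters(Set<Integer> current, Set<Integer> target) {
        Set<Integer> result = new HashSet<>(current);
        result.removeAll(target);    // in current but not in target
        return result;
    }

    public static Set<Integer> retainedVoters(Set<Integer> current, Set<Integer> target) {
        Set<Integer> result = new HashSet<>(current);
        result.retainAll(target);    // shared between current and target
        return result;
    }

    // Step 3: grow first when there are at least as many additions as
    // removals; otherwise shrink, preferring a non-leader voter.
    public static Set<Integer> nextCurrentVoters(Set<Integer> current,
                                                 Set<Integer> target,
                                                 int leaderId) {
        Set<Integer> adds = newVoters(current, target);
        Set<Integer> removes = removingVoters(current, target);
        Set<Integer> next = new HashSet<>(current);
        if (adds.size() >= removes.size() && !adds.isEmpty()) {
            next.add(adds.iterator().next());          // NV
        } else if (!removes.isEmpty()) {
            int rv = removes.stream()
                .filter(id -> id != leaderId)
                .findFirst()
                .orElse(leaderId); // the leader goes last if unavoidable
            next.remove(rv);                           // RV
        }
        return next; // unchanged when the reassignment is complete
    }
}
```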

...

Our solution to address this problem is simple. We require the leader to indicate its current dirty offset in each Fetch response. A follower/observer will know that its current snapshot represents a consistent version if and only if its local log end offset, after appending the records from the response, is greater than or equal to the dirty offset received from the leader.
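The condition reduces to a single comparison, as in this hypothetical sketch:

```java
// Hypothetical sketch of the consistency check above: a snapshot is
// consistent once the local log has caught up to the dirty offset
// returned by the leader in the Fetch response.
public final class SnapshotConsistency {
    private long localLogEndOffset;

    public void onRecordsAppended(long newLogEndOffset) {
        this.localLogEndOffset = newLogEndOffset;
    }

    public boolean isConsistent(long leaderDirtyOffset) {
        return localLogEndOffset >= leaderDirtyOffset;
    }
}
```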

...

Additionally, we provide a separate lastCaughtUpTimeMs field in the Fetch response, which can be useful for an observer to detect how well it is keeping up with replication. If lastCaughtUpTimeMs begins to grow, then an observer may decide to stop serving metadata requests from clients.
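One way an observer might act on this is sketched below, reading lastCaughtUpTimeMs as the time the observer has spent behind the leader. The threshold name and value are assumptions for illustration, not part of the KIP.

```java
// Hypothetical sketch of an observer using lastCaughtUpTimeMs from the
// Fetch response to decide whether it is fresh enough to serve metadata.
public final class ObserverFreshness {
    private static final long MAX_LAG_MS = 30_000; // assumed threshold

    private long lagMs; // lastCaughtUpTimeMs from the latest Fetch response

    public void onFetchResponse(long lastCaughtUpTimeMs) {
        this.lagMs = lastCaughtUpTimeMs;
    }

    // Stop serving metadata once the observer has fallen too far behind.
    public boolean shouldServeMetadata() {
        return lagMs <= MAX_LAG_MS;
    }
}
```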

...