Status
Current state: [One of "Under Discussion", "Accepted", "Rejected"]
Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]
JIRA: here [Change the link from KAFKA-1 to your own ticket]
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
This KIP will go over scenarios where we might expect disruptive servers and discuss how Pre-Vote (as originally detailed in the Raft paper and in KIP-650) along with Rejecting VoteRequests received within fetch timeout can ensure correctness when it comes to network partitions (as well as quorum reconfiguration and failed disk scenarios).
Pre-Vote is the idea of “canvassing” the cluster: before starting a real election, a server checks whether it would receive a majority of votes. If so, it increases its epoch and sends a disruptive vote request. If not, it neither increases its epoch nor sends a vote request.
Rejecting VoteRequests received within fetch timeout entails servers rejecting any vote requests received prior to their own fetch timeout expiring. The idea here is if we've recently heard from a leader, we should not attempt to elect a new one just yet.
Disruptive server scenarios
Network Partition
Let's consider two scenarios -
- When a follower becomes partitioned from the rest of the quorum, it will continuously increase its epoch to start elections until it is able to regain connection to the leader/rest of the quorum. When the follower regains connectivity, it will disturb the rest of the quorum as they will be forced to participate in an unnecessary election. While this situation only results in one leader stepping down, as we start supporting larger quorums these events may occur more frequently per quorum.
- When a leader becomes partitioned from the rest of the quorum, one of the following occurs:
  - It becomes aware that it should step down as a leader/initiates a new election. This is now the follower case covered above.
  - It continues to believe it is the leader until rejoining the rest of the quorum. On rejoining the rest of the quorum, it will start receiving messages w/ a different epoch. If higher than its own, the node will … if lower than its own, the node will … (todo: finish and confirm)
While network partitions may be the main issue we expect to encounter/mitigate impact for, it’s possible that bugs and user error create similar effects to network partitions that we need to guard against. For instance, a bug which causes fetch requests to periodically time out, or setting controller.quorum.fetch.timeout.ms and other related configs too low.
Quorum reconfiguration
The scenarios here should be covered by KIP-853: KRaft Controller Membership Changes, which may choose to make use of Pre-Vote. While KIP-853 may take another approach to handle the following scenarios, it's worth discussing how Pre-Vote can be applied as well.
Servers in old configuration
When reconfiguring a quorum, servers in the old configuration which are not also in new configuration will stop receiving heartbeats from the leader which can lead to them starting new elections and forcing the current leader to step down. This is disruptive as the servers in the old configuration are not eligible to be elected and could cause leadership to bounce prior to their complete removal.
One way to handle this is proposed by KIP-595: leaders reject fetch requests from servers in old configurations, which tells those servers that they should shut down.
Another way is to reject election requests sent by servers in old configurations due for removal - with Pre-Vote implemented this would not result in any epoch bumps. This could increase the chance of unavailability if the old server is the only one eligible for leadership. To safeguard against this we could have servers only reject election requests received before their fetch timeout hits zero.
Servers in new configuration
What happens when a new node joins the quorum? If it were allowed to participate in elections without having fully replicated the leader’s log, could a node w/ a subset of the committed data be elected leader? Quorum reconfiguration only allows one addition/deletion at a time, and we are most vulnerable when the original configuration is small (3 node minimum). Given this, if a majority of the cluster is not caught up with the leader when we add a new node, we may lose data.
Ex: Majority is not caught up with leader prior to addition of new node, leading to data loss if a lagging follower receives a majority of votes
Time | Node 1 | Node 2 | Node 3 | Node 4 |
---|---|---|---|---|
T0 | Leader | Lagging follower | Lagging follower | Added to quorum |
T1 | Leader → Unattached state | Follower → Unattached state | Follower → Unattached state | Starts an election before catching up on replication |
T2 | | | | All nodes reject this node's vote request |
T3 | | Election ms times out and this node starts an election | | |
T4 | Won’t vote for node 2 since node 2’s log is not as up-to-date as its own | Votes for itself | May vote for node 2 if node 2’s log is more up-to-date than theirs | Votes for node 2 |
One way to handle this is to add logic preventing nodes with new disks (determined via a unique storage id) from voting prior to sufficiently catching up on the log.
Another way is to reject pre-vote requests from these nodes.
Disk Loss Scenario
This scenario shares similarities with adding new nodes to the quorum. If a node loses its disk and fails to fully catch up to the leader prior to another node starting an election, it may vote for any node which is at least as caught up as itself (which might be less than the last leader). The two solutions above can be applied here as well.
Time | Node 1 | Node 2 | Node 3 |
---|---|---|---|
T0 | Leader with majority of quorum (node 1, node 3) caught up with its committed data | Lagging follower | Follower |
T1 | | | Disk failure |
T2 | Leader → Unattached state | Follower → Unattached state | Comes back up w/ new disk, triggers an election before catching up on replication |
T3 | | | Will not be elected |
T4 | | Election ms times out and starts an election | |
T5 | | Votes for Node 2 | Votes for Node 2 |
T6 | | Elected as leader leading to data loss | |
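Both tables hinge on the rule a voter uses to decide whether a candidate's log is "at least as up-to-date" as its own. The following is a minimal sketch of that comparison, assuming the usual Raft ordering on the last epoch and then the last offset. The class and method names are hypothetical and are not taken from Kafka's raft module.

```java
/** Hypothetical helper illustrating the Raft log comparison; not Kafka's actual code. */
final class LogComparison {
    private LogComparison() {}

    /**
     * Returns true if the candidate's log is at least as up-to-date as the voter's.
     * Assumption: logs are ordered by the epoch of the last record first,
     * and by the offset of the last record only when the epochs are equal.
     */
    static boolean candidateIsAtLeastAsUpToDate(int candidateLastEpoch,
                                                long candidateLastOffset,
                                                int voterLastEpoch,
                                                long voterLastOffset) {
        if (candidateLastEpoch != voterLastEpoch) {
            return candidateLastEpoch > voterLastEpoch;
        }
        return candidateLastOffset >= voterLastOffset;
    }
}
```

Under this rule, a voter that has lost its disk (last epoch 0, last offset 0) considers every candidate sufficiently up-to-date, which is why it can help elect a lagging follower.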
Public Interfaces
Pre-Vote
We will add a new field PreVote to VoteRequests to signal whether the request is a PreVote or not. The candidate does not increase its epoch prior to sending the request out. The VoteResponse schema does not need any additional fields (it still needs a version bump to match the version bump for VoteRequest).
{
  "apiKey": 52,
  "type": "request",
  "listeners": ["controller"],
  "name": "VoteRequest",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "0+",
      "nullableVersions": "0+", "default": "null"},
    { "name": "Topics", "type": "[]TopicData",
      "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]PartitionData",
        "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "CandidateEpoch", "type": "int32", "versions": "0+",
          "about": "The bumped epoch of the candidate sending the request"},
        { "name": "CandidateId", "type": "int32", "versions": "0+", "entityType": "brokerId",
          "about": "The ID of the voter sending the request"},
        { "name": "LastOffsetEpoch", "type": "int32", "versions": "0+",
          "about": "The epoch of the last record written to the metadata log"},
        { "name": "LastOffset", "type": "int64", "versions": "0+",
          "about": "The offset of the last record written to the metadata log"},
        { "name": "PreVote", "type": "boolean", "versions": "1+",
          "about": "Whether the request is a PreVote request (no epoch increase) or not."}
      ]
  ...
}
Proposed Changes
Pre-Vote
A candidate will now send a VoteRequest with the PreVote field set to true when its election timeout expires. If at least (majority - 1) of the VoteResponses grant the vote, the candidate will then bump its epoch and send a VoteRequest with PreVote set to false, which behaves the same way as before.
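The (majority - 1) threshold can be sketched as follows. This is an illustration only: it assumes the candidate is itself a voter and implicitly counts its own vote, and that the response map holds only the other voters' answers. PreVoteTally and its methods are hypothetical names, not part of Kafka.

```java
import java.util.Map;

/** Hypothetical candidate-side tally of Pre-Vote responses; not Kafka's actual code. */
final class PreVoteTally {
    private PreVoteTally() {}

    /** Smallest number of voters that forms a majority of the quorum. */
    static int majority(int voterCount) {
        return voterCount / 2 + 1;
    }

    /**
     * Decides whether the candidate may bump its epoch and send a real VoteRequest.
     *
     * @param voterCount       total number of voters, including the candidate (assumption)
     * @param preVoteResponses remote voter id mapped to the VoteGranted value it returned
     */
    static boolean shouldStartRealElection(int voterCount, Map<Integer, Boolean> preVoteResponses) {
        long granted = preVoteResponses.values().stream()
            .filter(Boolean::booleanValue)
            .count();
        // The candidate's own vote covers the remaining one.
        return granted >= majority(voterCount) - 1;
    }
}
```

With three voters, a single granted response is enough, since the candidate's own vote completes the majority of two. With five voters, two grants are needed.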
When servers receive VoteRequests with the PreVote field set to true, they will respond with VoteGranted set to
- true if they haven't heard from a leader in fetch.timeout.ms and all conditions that normally need to be met for VoteRequests are satisfied
- false if they have heard from a leader in fetch.timeout.ms (could help cover 'Servers in old configuration' scenario) or conditions that normally need to be met for VoteRequests are not satisfied
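The voter-side rule above can be sketched as a single predicate. This is a minimal sketch under stated assumptions: the server tracks the timestamp of its last successful contact with a leader, and the existing VoteRequest checks (epoch, log comparison and so on) are collapsed into one boolean. All names here are hypothetical.

```java
/** Hypothetical voter-side Pre-Vote check; not Kafka's actual code. */
final class PreVoteDecision {
    private PreVoteDecision() {}

    /**
     * @param nowMs                  current time in milliseconds
     * @param lastLeaderContactMs    when this server last heard from a leader (assumed to be tracked)
     * @param fetchTimeoutMs         the fetch timeout, i.e. fetch.timeout.ms in the text
     * @param standardVoteChecksPass result of the conditions that VoteRequests normally must satisfy
     * @return the value to place in VoteGranted for a request with PreVote set to true
     */
    static boolean grantPreVote(long nowMs,
                                long lastLeaderContactMs,
                                long fetchTimeoutMs,
                                boolean standardVoteChecksPass) {
        boolean heardFromLeaderRecently = nowMs - lastLeaderContactMs < fetchTimeoutMs;
        return !heardFromLeaderRecently && standardVoteChecksPass;
    }
}
```

A server that heard from the leader 500 ms ago with a 2000 ms fetch timeout rejects the Pre-Vote even if the candidate's log is up-to-date, which is what keeps a rejoining node from disturbing a healthy quorum.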
(Not in scope) To address the disk loss and 'Servers in new configuration' scenarios, one option would be to have servers also respond false to vote requests from servers that have a new disk and haven't caught up on replication.
How does this prevent unnecessary elections when it comes to network partitions?
When a partitioned node rejoins and forces the cluster to participate in an election, all nodes reject the pre-vote request from the disruptive follower since they've recently heard from the active leader. The disruptive node continuously kicks off elections but is unable to be elected. It should rejoin the quorum when it discovers the higher epoch on the next valid election by another node (todo: check this for accuracy, there should be other ways for the node to become follower earlier)
Can this prevent necessary elections?
Yes. If a leader is unable to receive fetch responses from a majority of nodes, it can impede followers that are able to communicate with it from voting in another eligible leader that can communicate with a majority of the cluster. This is the reason why an additional "Check Quorum" safeguard is needed which is what KAFKA-15489 implements. Check Quorum ensures a leader steps down if it is unable to receive fetch responses from a majority of nodes.
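The Check Quorum idea can be sketched as below. This is not the KAFKA-15489 implementation. It assumes the leader records the last time it heard from each remote voter and counts itself as reachable; the class name, method name and timeout parameter are hypothetical.

```java
import java.util.Map;

/** Hypothetical sketch of a Check Quorum test on the leader; not Kafka's actual code. */
final class CheckQuorum {
    private CheckQuorum() {}

    /**
     * @param voterCount           total number of voters, including the leader (assumption)
     * @param lastContactMs        remote voter id mapped to the time the leader last heard from it
     * @param nowMs                current time in milliseconds
     * @param checkQuorumTimeoutMs how long a voter may be silent before it counts as unreachable
     * @return true if the leader can no longer reach a majority and should step down
     */
    static boolean leaderShouldResign(int voterCount,
                                      Map<Integer, Long> lastContactMs,
                                      long nowMs,
                                      long checkQuorumTimeoutMs) {
        long reachable = lastContactMs.values().stream()
            .filter(t -> nowMs - t < checkQuorumTimeoutMs)
            .count();
        int majority = voterCount / 2 + 1;
        // The leader counts itself toward the majority.
        return reachable + 1 < majority;
    }
}
```

In a three-voter quorum the leader stays in place as long as one follower has been in touch within the timeout, and resigns once both have gone silent.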
Do we still need to reject VoteRequests received within fetch timeout if we have implemented Pre-Vote and Check Quorum?
Yes. Specifically, we would be rejecting Pre-Vote requests received within fetch timeout. We need to avoid bumping epochs without a new leader being elected; otherwise the node requesting the election(s) will be unable to rejoin the quorum because its epoch is greater than everyone else's while its log continues to fall behind.
The following are two scenarios where just having Pre-Vote is not enough.
- A node in an old configuration (e.g. S1 in the below diagram pg. 41) starts a “pre-vote” when the leader is temporarily unavailable, and is elected because it is as up-to-date as the majority of the quorum. The Raft paper argues we cannot rely on the original leader replicating fast enough to get past this scenario, however unlikely that is. We can imagine some bug/limitation with quorum reconfiguration causing S1 to continuously try to reconnect with the quorum (i.e. start elections) while the leader is trying to remove it from the quorum.
- We can also imagine a non-reconfiguration scenario where two nodes, one of which is the leader, are simply unable to communicate with each other. Since the non-leader node is unable to find a leader, it will start an election and may get elected. Since the prior leader is now unable to find the new leader, it will start an election and may get elected. This could continue in a cycle.
Compatibility, Deprecation, and Migration Plan
- What impact (if any) will there be on existing users?
- If we are changing behavior how will we phase out the older behavior?
- If we need special migration tools, describe them here.
- When will we remove the existing behavior?
Test Plan
Describe in a few sentences how the KIP will be tested. We are mostly interested in system tests (since unit-tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?
Rejected Alternatives
If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.