Status

Current state: Under Discussion

Discussion thread: here

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This KIP will go over scenarios where we might expect disruptive servers and discuss how Pre-Vote (as originally detailed in the Raft paper and in KIP-650) along with Rejecting VoteRequests received within fetch timeout can ensure correctness when it comes to network partitions (as well as quorum reconfiguration and failed disk scenarios).  

...

Rejecting VoteRequests received within fetch timeout entails servers rejecting any vote requests received prior to their own fetch timeout expiring. The idea here is if we've recently heard from a leader, we should not attempt to elect a new one just yet.

Disruptive server scenarios

Network Partition

Let's consider two scenarios -

  1. When a follower becomes partitioned from the rest of the quorum, it will continuously increase its epoch to start elections until it is able to regain connection to the leader/rest of the quorum. When the follower regains connectivity, it will disturb the rest of the quorum as they will be forced to participate in an unnecessary election. While this situation only results in one leader stepping down, as we start supporting larger quorums these events may occur more frequently per quorum.
  2. When a leader becomes partitioned from the rest of the quorum, one of the following occurs
    1. It becomes aware that it should step down as a leader/initiates a new election.
      This is now the follower case covered above.
    2. It continues to believe it is the leader until rejoining the rest of the quorum.
      On rejoining the rest of the quorum, it will start receiving messages w/ a different epoch. If higher than its own, the node will eventually become a follower of the node with the higher epoch. If lower than its own, the node may still believe it is leader and send fetch requests to the rest of the quorum. Nodes will transition to Unattached state and election(s) will ensue (todo)

While network partitions may be the main issue we expect to encounter and need to mitigate, it’s possible that bugs and user error create effects similar to network partitions that we also need to guard against. Examples include a bug which causes fetch requests to periodically time out, or setting controller.quorum.fetch.timeout.ms and other related configs too low.

Quorum reconfiguration

The scenarios here should be covered by KIP-853: KRaft Controller Membership Changes, which may choose to make use of Pre-Vote. While KIP-853 may take another approach to handle the following scenarios, it's worth discussing how Pre-Vote can be applied as well.

Servers in old configuration

When reconfiguring a quorum, servers in the old configuration which are not also in new configuration will stop receiving heartbeats from the leader which can lead to them starting new elections and forcing the current leader to step down. This is disruptive as the servers in the old configuration are not eligible to be elected and could cause leadership to bounce prior to their complete removal.

...

Another way is to reject election requests sent by servers in old configurations due for removal - with Pre-Vote implemented this would not result in any epoch bumps. This could increase the chance of unavailability if the old server is the only one eligible for leadership. To safeguard against this we could have servers only reject election requests received before their fetch timeout hits zero.

Servers in new configuration

What happens when a new node joins the quorum? If it were allowed to participate in elections without having fully replicated the leader’s log, could a node w/ a subset of the committed data be elected leader? Quorum reconfiguration only allows one addition/deletion at a time, and we are most vulnerable when the original configuration is small (3 node minimum). Given this, if a majority of the cluster is not caught up with the leader when we add a new node, we may lose data. 

...

Another way is to reject pre-vote requests from these nodes.

Disk Loss Scenario

This scenario shares similarities with adding new nodes to the quorum. If a node loses its disk and fails to fully catch up to the leader prior to another node starting an election, it may vote for any node which is at least as caught up as itself (which might be less caught up than the last leader). The two solutions above can be applied here as well.

| Time | Node 1 | Node 2 | Node 3 |
| T0 | Leader with majority of quorum (node 1, node 3) caught up with its committed data | Lagging follower | Follower |
| T1 | | | Disk failure |
| T2 | Leader → Unattached state | Follower → Unattached state | Comes back up w/ new disk, triggers an election before catching up on replication |
| T3 | | | Will not be elected |
| T4 | | Election ms times out and starts an election | |
| T5 | | Votes for Node 2 | Votes for Node 2 |
| T6 | | Elected as leader leading to data loss | |
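The timeline above follows from the rule stated earlier: a node may vote for any candidate that is at least as caught up as itself. A minimal Python sketch of that comparison is given here for illustration only. LogEnd and grants_vote are hypothetical names, not KRaft classes, and the epochs and offsets are made-up values.

```python
from dataclasses import dataclass


@dataclass(frozen=True, order=True)
class LogEnd:
    """End of a replica's log, compared by (last_offset_epoch, last_offset)."""
    last_offset_epoch: int
    last_offset: int


def grants_vote(voter: LogEnd, candidate: LogEnd) -> bool:
    """A voter accepts a candidate whose log is at least as caught up as its own."""
    return candidate >= voter


# Illustrative values only: node 1 holds all committed data, node 2 lags,
# and node 3 has come back with an empty disk.
node1 = LogEnd(last_offset_epoch=5, last_offset=100)
node2 = LogEnd(last_offset_epoch=5, last_offset=60)
node3_after_disk_loss = LogEnd(last_offset_epoch=0, last_offset=0)

# Votes for candidate node 2 in a 3-node quorum: its own vote plus any grants.
votes_for_node2 = 1 + sum(
    grants_vote(voter, node2) for voter in (node1, node3_after_disk_loss)
)
node2_elected = votes_for_node2 >= 2  # majority of 3
```

With these values node 1 refuses the lagging candidate, but the wiped node 3 accepts it, so node 2 still reaches a majority without holding all committed data.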



Public Interfaces

...

Pre-Vote

We will add a new field PreVote to VoteRequests to signal whether the request is a PreVote or not. The candidate does not increase its epoch prior to sending the request out. The VoteResponse schema does not need any additional fields (still needs a version bump to match version bump for VoteRequest).

Code Block
{
  "apiKey": 52,
  "type": "request",
  "listeners": ["controller"],
  "name": "VoteRequest",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "0+",
      "nullableVersions": "0+", "default": "null"},
    { "name": "Topics", "type": "[]TopicData",
      "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]PartitionData",
        "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "CandidateEpoch", "type": "int32", "versions": "0+",
          "about": "The bumped epoch of the candidate sending the request"},
        { "name": "CandidateId", "type": "int32", "versions": "0+", "entityType": "brokerId",
          "about": "The ID of the voter sending the request"},
        { "name": "LastOffsetEpoch", "type": "int32", "versions": "0+",
          "about": "The epoch of the last record written to the metadata log"},
        { "name": "LastOffset", "type": "int64", "versions": "0+",
          "about": "The offset of the last record written to the metadata log"},
        { "name": "PreVote", "type": "boolean", "versions": "1+",
          "about": "Whether the request is a PreVote request (no epoch increase) or not."}
      ]
 ...
}

Proposed Changes

Pre-Vote

A candidate will now send a VoteRequest with the PreVote field set to true when its election timeout expires. If [majority - 1] of the VoteResponses grant the vote, the candidate will then bump its epoch and send a VoteRequest with PreVote set to false, which behaves the same way as before.

When servers receive VoteRequests with the PreVote field set to true, they will respond with VoteGranted set to

  • true if all conditions that normally need to be met for VoteRequests are satisfied
  • false if conditions that normally need to be met for VoteRequests are not satisfied
  • (Not in scope) To address the disk loss and 'Servers in new configuration' scenario, one option would be to have servers respond false to vote requests from servers that have a new disk and haven't caught up on replication 

When a server receives VoteResponses, it will follow it up with a VoteRequest with PreVote set to 

  • false if the server has received [majority - 1] VoteResponses with VoteGranted set to true within [election.timeout.ms + a little randomness]
  • true (another Pre-Vote request) if the server receives [majority] VoteResponse with VoteGranted set to false within [election.timeout.ms + a little randomness]
  • true if the server receives fewer than [majority] VoteResponses with VoteGranted set to false within [election.timeout.ms + a little randomness] and the first bullet point does not apply
    • A counterargument might be that it's safer to start sending non-Pre-Vote requests when we encounter non-ideal situations (e.g. unable to communicate with 1 server in a 3 node quorum) so we know we fall back to the original protocol. We can do this, but it should be safe for a server to continually send Pre-Vote requests if it does not receive a majority of VoteGranted responses: either the leader is able to communicate with the entire cluster or another server can start an election. If the cluster is truly partitioned and no server is able to communicate with a majority of the cluster, there is no point in servers sending out non-Pre-Vote requests.
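The three bullets above reduce to a single decision once the randomized election timeout has elapsed. The following Python sketch shows one way to express it. NextStep, majority and next_step are hypothetical names used only for illustration, and reading [majority - 1] as "the candidate implicitly counts itself" is an assumption rather than something stated above.

```python
from enum import Enum


class NextStep(Enum):
    SEND_STANDARD_VOTE = "PreVote=false"
    SEND_PRE_VOTE_AGAIN = "PreVote=true"


def majority(quorum_size: int) -> int:
    """Smallest number of voters that forms a majority of the quorum."""
    return quorum_size // 2 + 1


def next_step(quorum_size: int, granted: int) -> NextStep:
    """Pick the follow-up VoteRequest after the randomized timeout has elapsed.

    `granted` counts VoteResponses with VoteGranted=true from other voters.
    Assumption: the threshold is majority - 1 because the candidate counts itself.
    """
    if granted >= majority(quorum_size) - 1:
        return NextStep.SEND_STANDARD_VOTE
    # A majority of rejections and "too few responses" are handled the same way:
    # stay in the Pre-Vote phase and try again without bumping the epoch.
    return NextStep.SEND_PRE_VOTE_AGAIN
```

In a 3 node quorum a single granted response is enough to move on to a standard vote, while a 5 node quorum needs two.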

If a server happens to receive multiple VoteResponses from another server for a particular VoteRequest, it can take the first and ignore the rest. We could also choose to take the last, but taking the first is simpler. A server does not need to worry about persisting its election state for a Pre-Vote response like we currently do for VoteResponses because the direct result of the Pre-Vote phase does not elect leaders. 

How does this prevent unnecessary elections when it comes to network partitions?

When a partitioned node rejoins and forces the cluster to participate in an election, all nodes reject the pre-vote request from the disruptive follower since they've recently heard from the active leader. The disruptive node continuously kicks off elections but is unable to be elected. It should rejoin the quorum when it discovers the higher epoch on the next valid election by another node (todo: check this for accuracy, there should be other ways for the node to become follower earlier)

Can this prevent necessary elections?

Yes. If a leader is unable to receive fetch responses from a majority of nodes, it can impede followers that are able to communicate with it from voting in an eligible leader that can communicate with a majority of the cluster. This is the reason why an additional "Check Quorum" safeguard is needed which is what KAFKA-15489 implements. Check Quorum ensures a leader steps down if it is unable to receive fetch responses from a majority of nodes.
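As a rough illustration of the Check Quorum idea, the Python sketch below has the leader step down when it has not been in recent contact with a majority of voters. This is not the KAFKA-15489 implementation. The function name, the per-voter timestamp map, and the use of the fetch timeout as the contact window are all assumptions made for the example.

```python
from typing import Dict


def should_step_down(
    now_ms: int,
    last_contact_ms_by_voter: Dict[int, int],
    quorum_size: int,
    fetch_timeout_ms: int,
) -> bool:
    """Leader-side check: True if fewer than a majority are in recent contact.

    `last_contact_ms_by_voter` maps each other voter's id to the last time the
    leader exchanged a fetch with it (hypothetical bookkeeping).
    """
    in_contact = 1  # the leader counts itself
    for last_contact_ms in last_contact_ms_by_voter.values():
        if now_ms - last_contact_ms < fetch_timeout_ms:
            in_contact += 1
    return in_contact < quorum_size // 2 + 1
```

For example, a leader of a 3 node quorum that still hears from one follower keeps leadership, while one that hears from neither steps down and lets the followers elect a reachable leader.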

Do we still need to reject VoteRequests received within fetch timeout if we have implemented Pre-Vote and Check Quorum?

Yes. Specifically, we would be rejecting Pre-Vote requests received within fetch timeout. We need to avoid bumping epochs without a new leader being elected; otherwise the node requesting the election(s) will be unable to rejoin the quorum because its epoch is greater than everyone else's while its log continues to fall behind.

The following are two scenarios where having just Pre-Vote and Check Quorum is not enough.

  • Scenario A: A node in an old configuration (e.g. S1 in the below diagram pg. 41) starts a “pre-vote” when the leader is temporarily unavailable, and is elected because it is as up-to-date as the majority of the quorum. The Raft paper argues we cannot rely on the original leader replicating fast enough to get past this scenario, however unlikely it is. We can imagine some bug/limitation with quorum reconfiguration causing S1 to continuously try to reconnect with the quorum (i.e. start elections) when the leader is trying to remove it from the quorum.

[Image: diagram referenced above (pg. 41)]

  • Scenario B: We can also imagine a non-reconfiguration scenario where two nodes, one of which is the leader, are simply unable to communicate with each other. Since the non-leader node is unable to find a leader, it will start an election and may get elected. Since the prior leader is now unable to find the new leader, it will start an election and may get elected. This could continue in a cycle.

Rejecting Pre-Vote requests received within fetch timeout

As mentioned in the prior section, a server should reject Pre-Vote requests received from other servers if its own fetch timeout has not expired yet. The logic now looks like the following.

When servers receive VoteRequests with the PreVote field set to true, they will respond with VoteGranted set to

  • true if they haven't heard from a leader in fetch.timeout.ms and all conditions that normally need to be met for VoteRequests are satisfied
  • false if they have heard from a leader in fetch.timeout.ms (could help cover Scenario A & B from above) or conditions that normally need to be met for VoteRequests are not satisfied
  • (Not in scope) To address the disk loss and 'Servers in new configuration' scenario, one option would be to have servers respond false to vote requests from servers that have a new disk and haven't caught up on replication
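The first two bullets can be sketched as a single predicate on the receiving server. The Python below is a minimal illustration under assumptions: pre_vote_granted and its parameters are hypothetical names, and the existing VoteRequest conditions (epoch and log comparison) are collapsed into one boolean rather than reimplemented.

```python
from typing import Optional


def pre_vote_granted(
    ms_since_leader_contact: Optional[int],
    fetch_timeout_ms: int,
    standard_checks_pass: bool,
) -> bool:
    """Value of VoteGranted for a VoteRequest with PreVote=true.

    `ms_since_leader_contact` is None when the server has never heard from a
    leader; `standard_checks_pass` stands in for the conditions that normally
    need to be met for VoteRequests.
    """
    heard_from_leader_recently = (
        ms_since_leader_contact is not None
        and ms_since_leader_contact < fetch_timeout_ms
    )
    if heard_from_leader_recently:
        # The leader is still reachable, so the request is treated as disruptive.
        return False
    return standard_checks_pass
```

The out-of-scope third bullet (new disk, not caught up) is deliberately left out of the sketch.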

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Test Plan

This will be tested with unit tests, integration tests, system tests, and TLA+. (todo)

Rejected Alternatives

Rejecting VoteRequests received within fetch timeout (w/o Pre-Vote)

This was originally proposed in the Raft paper as a necessary safeguard to prevent Scenario A from occurring, but we can see how this could extend to cover all the other disruptive scenarios mentioned.

  • When a partitioned node rejoins and forces the cluster to participate in an election …

    • if a majority of the cluster is not receiving fetch responses from the current leader, they consider the vote request and make the appropriate state transitions. An election would be needed in this case anyways.

    • if the rest of the cluster is still receiving fetch responses from the current leader, they reject the vote request from the disruptive follower. No one transitions to a new state (e.g. Unattached) as a result of the vote request, and the current leader is not disrupted.

  • For a node in an old configuration (that’s not in the new configuration) …

    • if the current leader is still responding to fetch requests in a reasonable amount of time, the node is prevented from starting and winning elections, which would delay reconfiguration.

    • if the current leader is not responding to fetch requests, then the node could still win an election (this scenario calls for an election anyways). KIP-853 should cover preventing this case if necessary.

  • For a node w/ new disk/data loss …

    • if the current leader is still responding to fetch requests in a reasonable amount of time, the node is prevented from starting and winning elections, which could lead to loss of committed data.

    • if the current leader is not responding to fetch requests, we can reject VoteRequests from the node w/ new disk/data loss if it isn't sufficiently caught up on replication. KIP-853 should cover this case w/ storage ids if necessary.

However, this would not be a good standalone alternative to Pre-Vote because once a server starts a disruptive election (disruptive in the sense that the current leader still has majority), its epoch may increase while none of the other servers' epochs do. The most likely way for the server to rejoin the quorum now with its inflated epoch would be to win an election. Since epochs are not increased with Pre-Vote requests, it is easier for a disruptive server to rejoin the quorum once it finds any of the servers in the cluster have been elected. (todo: can it join once it finds the current leader)
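The epoch argument above can be made concrete with a small Python sketch. epoch_after_failed_attempts is a hypothetical helper written for this example, and the starting epoch and attempt count are arbitrary.

```python
def epoch_after_failed_attempts(start_epoch: int, attempts: int, pre_vote: bool) -> int:
    """Epoch of a cut-off server after `attempts` elections that nobody grants."""
    epoch = start_epoch
    for _ in range(attempts):
        if not pre_vote:
            # A standard VoteRequest bumps the epoch before the request is sent.
            epoch += 1
        # With Pre-Vote the epoch is only bumped after the pre-vote is granted,
        # which does not happen while every attempt is rejected or unanswered.
    return epoch


without_pre_vote = epoch_after_failed_attempts(start_epoch=5, attempts=10, pre_vote=False)
with_pre_vote = epoch_after_failed_attempts(start_epoch=5, attempts=10, pre_vote=True)
```

Without Pre-Vote the server ends up ten epochs ahead of a quorum that never moved, while with Pre-Vote it is still at the epoch it started from.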

Separate RPC for Pre-Vote

This would be added toil with no real added benefits. Since a Pre-Vote and a standard Vote are similar in concept, it makes sense to cover both with the same RPC. We can add clear logging and metrics to easily differentiate between Pre-Vote and standard Vote requests.

...
