...

  1. The core requests have a field for the clusterId. This is validated when a request is received to ensure that misconfigured brokers will not cause any harm to the cluster.
  2. All requests save Vote have a field for the leader epoch. Voters and leaders are always expected to ensure that the request epoch is consistent with their own, and to return an error if it is not.
  3. We piggyback current leader and epoch information on all responses. This reduces latency to discover a leader change.
  4. Although this KIP only plans for a single-raft quorum implementation, we see it as beneficial to keep the door open for a multi-raft architecture in the long term, for use cases such as a sharded controller or general quorum-based topic replication. So all the core RPCs will be designed as "batch" APIs to accommodate this need from the beginning, but they will only be used with the single Raft quorum of the metadata topic.

Note that this protocol is only concerned with leader election and log replication. It does not specify what log entries are appended to the leader's log nor how they will be received by the leader. Typically this would be through specific management APIs. For example, KIP-497 adds an AlterISR API. When the leader of the metadata quorum (i.e. the controller) receives an AlterISR request, it will append an entry to its log.

...

Code Block
{
  "apiKey": N,
  "type": "request",
  "name": "VoteRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
      {"name": "ClusterId", "type": "string", "versions": "0+"},
      { "name": "Topics", "type": "[]VoteTopicRequest", 
        "versions": "0+", "fields": [
          { "name": "TopicName", "type": "string", "versions": "2+", "entityType": "topicName",
            "about": "The topic name." },
          { "name": "Partitions", "type": "[]VotePartitionRequest", 
            "versions": "0+", "fields": [
              { "name": "PartitionIndex", "type": "int32", "versions": "0+",
                "about": "The partition index." },
              {"name": "CandidateEpoch", "type": "int32", "versions": "0+",
               "about": "The bumped epoch of the candidate sending the request"},
              {"name": "CandidateId", "type": "int32", "versions": "0+",
               "about": "The ID of the voter sending the request"},
              {"name": "LastEpoch", "type": "int32", "versions": "0+",
               "about": "The epoch of the last record written to the metadata log"},
              {"name": "LastEpochEndOffset", "type": "int64", "versions": "0+",
               "about": "The offset of the last record written to the metadata log"}
            ]
          }
      }
  ]
}

Response Schema

Code Block
{
  "apiKey": N,
  "type": "response",
  "name": "VoteResponse",
  "validVersions": "0",
  "fields": [
     {"name": "ErrorCode", "type": "int16", "versions": "0+"},
      { "name": "Topics", "type": "[]VoteTopicResponse", 
        "versions": "0+", "fields": [
          { "name": "TopicName", "type": "string", "versions": "2+", "entityType": "topicName",
            "about": "The topic name." },
          { "name": "Partitions", "type": "[]VotePartitionResponse", 
            "versions": "0+", "fields": [
              { "name": "PartitionIndex", "type": "int32", "versions": "0+",
              "about": "The partition index." },
              {"name": "ErrorCode", "type": "int16", "versions": "0+"},
              {"name": "LeaderId", "type": "int32", "versions": "0+",
              "about": "The ID of the current leader or -1 if the leader is unknown."},
              {"name": "LeaderEpoch", "type": "int32", "versions": "0+",
              "about": "The latest known leader epoch"},
              {"name": "VoteGranted", "type": "bool", "versions": "0+",
              "about": "True if the vote was granted and false otherwise"}
            ]
          }
      }
  ]
}

Vote Request Handling

Note we need to respect the extra condition on leadership: the candidate’s log is at least as up-to-date as any other log in the majority who vote for it. Raft determines which of two logs is more "up-to-date" by comparing the offset and epoch of the last entries in the logs. If the logs' last entries have different epochs, then the log with the later epoch is more up-to-date. If the logs end with the same epoch, then whichever log is longer is more up-to-date. The Vote request includes information about the candidate’s log, and the voter denies its vote if its own log is more up-to-date than that of the candidate.
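
A minimal sketch of this comparison (written here in Java purely for illustration, not taken from the Kafka code base), using the LastEpoch and LastEpochEndOffset fields carried in the Vote request and the voter's own values from its local log:

Code Block
// Sketch only: returns true if the candidate's log is at least as
// up-to-date as the voter's own log, per the rule described above.
static boolean candidateLogUpToDate(int candidateLastEpoch, long candidateLastEndOffset,
                                    int voterLastEpoch, long voterLastEndOffset) {
    if (candidateLastEpoch != voterLastEpoch) {
        // The log whose last entry has the later epoch is more up-to-date.
        return candidateLastEpoch > voterLastEpoch;
    }
    // Same last epoch: the longer log (greater end offset) wins; equal is acceptable.
    return candidateLastEndOffset >= voterLastEndOffset;
}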

When a voter decides to become a candidate and ask others to vote for it, it will increment its current leader epoch and use it as the CandidateEpoch.

When a voter handles a Vote request:

  1. First it checks whether an epoch larger than the candidate epoch from the request is known. If so, the vote is rejected.
  2. It checks whether it has already voted for that candidate epoch. If it has, then it only grants the vote if the candidate id matches the id it already voted for. Otherwise, the vote is rejected.
  3. If the candidate epoch is larger than the currently known epoch:
    1. Check whether CandidateId is one of the expected voters. If not, reject the vote. The candidate in this case may have been part of an incomplete AlterQuorum change, so the voter should still accept the epoch bump and itself begin a new election.
    2. Check that the candidate's log is at least as up-to-date as its own (see above for the comparison rules). If so, grant the vote by first updating the quorum-state file and then returning the response with VoteGranted set to true; otherwise reject the request in the response. (A rough sketch of this handling follows the list.)
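
The following sketch mirrors the steps above; the types and helpers (VoteRequestData, QuorumState, isLogUpToDate, persistVote) are illustrative placeholders, not actual Kafka classes:

Code Block
// Sketch: decide whether to grant a vote, following steps 1-3 above.
boolean maybeGrantVote(VoteRequestData request, QuorumState state) {
    // 1. A known epoch larger than the candidate epoch rejects the vote.
    if (state.epoch() > request.candidateEpoch())
        return false;

    // 2. At most one vote per candidate epoch: re-grant only to the same candidate.
    if (state.hasVotedIn(request.candidateEpoch()))
        return state.votedId(request.candidateEpoch()) == request.candidateId();

    // 3.1 The candidate must be one of the expected voters (rejected otherwise,
    //     though the epoch bump may still be accepted -- not shown here).
    if (!state.voters().contains(request.candidateId()))
        return false;

    // 3.2 Grant only if the candidate's log is at least as up-to-date as ours,
    //     persisting the decision to the quorum-state file before responding.
    if (isLogUpToDate(request.lastEpoch(), request.lastEpochEndOffset(), state)) {
        state.persistVote(request.candidateEpoch(), request.candidateId());
        return true;
    }
    return false;
}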

Also note that a candidate always votes for itself at the current candidate epoch. That means it also needs to update the quorum-state file to record that it is "voting for myself" before sending out the vote requests. On the other hand, if it receives a Vote request with a larger candidate epoch, it can still grant that vote while transitioning back to voter state, because a newer leader may have been elected for a newer epoch.

Vote Response Handling

When receiving a Vote response:

  1. First we need to check if the voter is still a candidate, because it may already have observed an election with a larger epoch; if it is no longer in "candidate" state, just ignore the response.
  2. Otherwise, check whether it has accumulated a majority of votes for this epoch – this information does not need to be persisted, since upon failover it can simply resend the vote request and the voters will grant it again as long as there is no candidate with a newer epoch – and if so, it can transition to the leader state by updating the quorum-state file to indicate itself as the leader for the new epoch. Otherwise, just record this vote in memory and do nothing (see the tally sketch after this list).
  3. Upon becoming the leader, it will also start by writing the current assignment state to its log so that its LastEpoch and LastEpochEndOffset are updated, and only after that will it start sending out BeginQuorumEpoch requests. 
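
A minimal sketch of the in-memory tally from step 2 (the names are illustrative only):

Code Block
// Sketch: granted votes for the current candidate epoch are tracked in memory.
// The candidate becomes leader once a strict majority of the voter set has granted.
private final Set<Integer> grantedVotes = new HashSet<>(); // seeded with the candidate's own id

boolean recordGrantedVote(int voterId, Set<Integer> voterSet) {
    grantedVotes.add(voterId);
    return grantedVotes.size() > voterSet.size() / 2;
}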

Writing the dummy entry with the new epoch is not necessary for correctness, but it serves as an optimization to reduce client request latency. Basically, it allows for faster advancement of the high watermark following a leader election (the need for this is discussed in more detail below in the handling of FetchQuorumRecords). The dummy record will be a control record with Type=3. Below we define the schema: 

Code Block
{
  "type": "data",
  "name": "LeaderChangeMessage",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
      {"name": "LeaderId", "type": "int32", "versions": "0+",
       "about": "The ID of the newly elected leader"},
      {"name": "VotedIds", "type": "[]int32", "versions": "0+",
       "about": "The IDs of the voters who voted for the current leader"}
  ]
}

Also note that unlike other Kafka topic partition data, whose log appends are persisted asynchronously, for this special quorum topic all log appends must be fsynced to the file system before returning.

Note on Gridlocked Elections:  It is possible that an election fails because each voter votes for itself. Or, with an even number of voters active, it is possible that a vote ends up split. Generally, if a candidate fails to get a majority of votes before quorum.election.timeout.ms, then the vote is deemed to have failed, which will cause the candidate to bump the epoch, step down, and back off according to quorum.election.jitter.max.ms before retrying. Under some situations a candidate can immediately detect that a vote has failed. For example, if there are only two voters and a candidate fails to get a vote from the other voter (i.e. VoteGranted is returned as false in the VoteResponse), then there is no need to wait for the election timeout: the candidate can immediately step down and back off.
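
For illustration, a sketch of the retry backoff (the configuration names are those defined in this KIP; the code itself is only an assumption about how an implementation might apply the jitter):

Code Block
import java.util.concurrent.ThreadLocalRandom;

// Sketch: once an election times out without a majority, the candidate bumps its
// epoch, steps down, and waits a random amount of time bounded by
// quorum.election.jitter.max.ms before starting the next election.
final class ElectionBackoff {
    static long nextBackoffMs(long electionJitterMaxMs) {
        if (electionJitterMaxMs <= 0)
            return 0L;
        return ThreadLocalRandom.current().nextLong(electionJitterMaxMs + 1);
    }
}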

BeginQuorumEpoch

In traditional Raft, the leader will send an empty Append request to the other nodes in the quorum in order to assert its term. Because this is a pull-based protocol, we need a separate BeginQuorumEpoch API to do the same, so that election results are discovered quickly. In this protocol, once a leader has received enough votes, it will send the BeginQuorumEpoch request to all voters in the quorum.

Note that only voters receive the BeginQuorumEpoch request: observers will discover the new leader through either the FindQuorum or FetchQuorumRecords APIs. For example, the old leader would return an error code in the FetchQuorumRecords response indicating that it is no longer the leader, and it would also encode the currently known leader id and epoch, so the observers can start fetching from the new leader. In case the old leader does not know who the new leader is, observers can still fall back to the FindQuorum request to discover the new leader.

Request Schema

Code Block
{
  "apiKey": N,
  "type": "request",
  "name": "BeginQuorumEpochRequest",
  "validVersions": "0",
  "fields": [
      {"name": "ClusterId", "type": "string", "versions": "0+"},
      { "name": "Topics", "type": "[]BeginQuorumTopicRequest", 
        "versions": "0+", "fields": [
          { "name": "TopicName", "type": "string", "versions": "2+", "entityType": "topicName",
            "about": "The topic name." },
          { "name": "Partitions", "type": "[]BeginQuorumPartitionRequest", 
            "versions": "0+", "fields": [
              { "name": "PartitionIndex", "type": "int32", "versions": "0+",
                "about": "The partition index." },
              {"name": "LeaderId", "type": "int32", "versions": "0+",
               "about": "The ID of the newly elected leader"},
              {"name": "LeaderEpoch", "type": "int32", "versions": "0+",
               "about": "The epoch of the newly elected leader"}
            ]
          }
      }
  ]
}

Response Schema

Code Block
{
  "apiKey": N,
  "type": "response",
  "name": "BeginQuorumEpochResponse",
  "validVersions": "0",
  "fields": [
      {"name": "ErrorCode", "type": "int16", "versions": "0+"},
      { "name": "Topics", "type": "[]BeginQuorumTopicResponse", 
        "versions": "0+", "fields": [
          { "name": "TopicName", "type": "string", "versions": "2+", "entityType": "topicName",
            "about": "The topic name." },
          { "name": "Partitions", "type": "[]BeginQuorumPartitionResponse", 
            "versions": "0+", "fields": [
            {"name": "ErrorCode", "type": "int16", "versions": "0+"},
            {"name": "LeaderId", "type": "int32", "versions": "0+",
             "about": "The ID of the current leader or -1 if the leader is unknown."},
            {"name": "LeaderEpoch", "type": "int32", "versions": "0+",
             "about": "The latest known leader epoch"}
            ]
          }
      }
  ]
}

BeginQuorumEpoch Request Handling

A voter will accept a BeginQuorumEpoch if its leader epoch is greater than or equal to the current known epoch so long as it doesn't conflict with previous knowledge. For example, if the leaderId for epoch 5 was known to be A, then a voter will reject a BeginQuorumEpoch request from a separate voter B from the same epoch. If the epoch is less than the known epoch, the request is rejected. 

As soon as a broker accepts a BeginQuorumEpoch request, it will transition to a follower state and begin sending FetchQuorumRecords requests to the new leader.
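
A sketch of the acceptance rule and the follower transition described above (QuorumState and its methods are illustrative placeholders, not actual Kafka classes):

Code Block
// Sketch: accept BeginQuorumEpoch only if the epoch is not stale and does not
// conflict with a previously known leader for the same epoch.
boolean acceptBeginQuorumEpoch(int requestLeaderId, int requestEpoch, QuorumState state) {
    if (requestEpoch < state.epoch())
        return false; // stale epoch: reject (e.g. FENCED_LEADER_EPOCH)
    if (requestEpoch == state.epoch()
            && state.leaderId().isPresent()
            && state.leaderId().getAsInt() != requestLeaderId)
        return false; // conflicts with the known leader for this epoch
    // Accept: become a follower of this leader and start sending FetchQuorumRecords.
    state.becomeFollower(requestEpoch, requestLeaderId);
    return true;
}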

BeginQuorumEpoch Response Handling

If the response contains no errors, then the leader will record the follower in memory as having endorsed the election. The leader will continue sending BeginQuorumEpoch to each known voter until it has received its endorsement. This ensures that a voter that is partitioned from the network will be able to discover the leader quickly after the partition is restored. An endorsement from an existing voter may also be inferred through a received FetchQuorumRecords request with the new leader's epoch even if the BeginQuorumEpoch request was never received.

If the error code indicates that the voter's known leader epoch is larger (i.e. if the error is FENCED_LEADER_EPOCH), then the sender will update its quorum-state file, become a follower of that leader, and begin sending FetchQuorumRecords requests.

EndQuorumEpoch

The EndQuorumEpoch API is used by a leader to gracefully step down so that an election can be held immediately, without waiting for the election timeout. The primary use case is to enable graceful shutdown: the request is sent if the voter that is shutting down is either the active leader or a candidate in an in-progress election. It is also used when the leader needs to be removed from the quorum following an AlterQuorum request. 

The EndQuorumEpochRequest will be sent to all voters in the quorum. In each request, the leader includes a list of preferred successors, sorted by each voter's current replicated offset in descending order. Based on its position in the preferred-successor list, each voter chooses a corresponding delayed election time, so that the most up-to-date voter has a higher chance of being elected. If a node's priority is highest, it becomes a candidate immediately instead of waiting for the next poll. For a successor with priority N > 0, the next election timeout is computed as:

Code Block
MIN(retryBackOffMaxMs, retryBackoffMs * 2^(N - 1))

where retryBackOffMaxMs is currently hard-coded to one second.
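
A sketch of the successor back-off computation, assuming priority 0 corresponds to the first entry in the preferred-successor list:

Code Block
// Sketch: a preferred successor at priority N > 0 delays its election by an
// exponentially growing backoff, capped at retryBackOffMaxMs (currently 1000 ms).
static long successorElectionBackoffMs(int priority, long retryBackoffMs, long retryBackOffMaxMs) {
    if (priority <= 0)
        return 0L; // highest priority: become a candidate immediately
    long exponential = retryBackoffMs * (1L << (priority - 1));
    return Math.min(retryBackOffMaxMs, exponential);
}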

Request Schema

Code Block
{
  "apiKey": N,
  "type": "request",
  "name": "EndQuorumEpochRequest",
  "validVersions": "0",
  "fields": [
	{"name": "ClusterId", "type": "string", "versions": "0+"},
    { "name": "Topics", "type": "[]EndQuorumTopicRequest", 
      "versions": "0+", "fields": [
        { "name": "TopicName", "type": "string", "versions": "2+", "entityType": "topicName",
          "about": "The topic name." },
        { "name": "Partitions", "type": "[]EndQuorumPartitionRequest", 
          "versions": "0+", "fields": [
            { "name": "PartitionIndex", "type": "int32", "versions": "0+",
            "about": "The partition index." },    
            {"name": "ReplicaId", "type": "int32", "versions": "0+",
            "about": "The ID of the replica sending this request"},
            {"name": "LeaderId", "type": "int32", "versions": "0+",
            "about": "The current leader ID or -1 if there is a vote in progress"},
            {"name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The current epoch"},
			 {"name": "PreferredSuccessors", "type": "[]int32", "versions": "0+",
		      "about": "A sorted list of preferred successors to start the election"}
          ]
       }
    }      
  ]
}

Note that LeaderId and ReplicaId will be the same if the sender is the elected leader. If the replica is a candidate in an ongoing election, then LeaderId will be -1.

Response Schema

Code Block
{
  "apiKey": N,
  "type": "response",
  "name": "EndQuorumEpochResponse",
  "validVersions": "0",
  "fields": [
      {"name": "ErrorCode", "type": "int16", "versions": "0+"},
      { "name": "Topics", "type": "[]EndQuorumTopicResponse", 
        "versions": "0+", "fields": [
          { "name": "TopicName", "type": "string", "versions": "2+", "entityType": "topicName",
            "about": "The topic name." },
          { "name": "Partitions", "type": "[]EndQuorumPartitionResponse", 
            "versions": "0+", "fields": [
            {"name": "ErrorCode", "type": "int16", "versions": "0+"},
            {"name": "LeaderId", "type": "int32", "versions": "0+",
            "about": "The ID of the current leader or -1 if the leader is unknown."},
            {"name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The latest known leader epoch"}
            ]
          }
      }          
  ]
}

EndQuorumEpoch Request Handling

Upon receiving the EndQuorumEpoch request, the voter checks whether the epoch from the request is greater than or equal to its last known epoch. If the epoch is smaller than the last known epoch, or the leader id is not known for this epoch, the request is rejected. The voter then checks whether it is included in the given preferred successors; if not, it returns INCONSISTENT_VOTER_SET. If both validations pass, the voter transitions to candidate state immediately if it is first in the list; otherwise it waits for the computed back-off timeout before starting an election, as described in the previous section. Before beginning to collect votes, the voter must update the quorum-state file. 

EndQuorumEpoch Response Handling

If there is no error code, then do nothing. Otherwise, if the error code indicates that there is already a leader with a higher epoch ("hey, you're old news now, so I don't care if you're stepping down or what"), the sender updates its quorum-state file and transitions to follower state. The leader treats this as a best-effort graceful shutdown: if voters cannot be reached with EndQuorumEpoch, the leader will shut down without retrying. In the worst case, if none of the EndQuorumEpoch requests are received, the election timeout will eventually trigger a new election.

We shall reuse the same FENCED_LEADER_EPOCH error code to indicate that there is a larger leader epoch. Depending on whether the sender is shutting down or not, it will either ignore the response error or become a follower of the leader indicated in the response.

FetchQuorumRecords (Piggy-back Fetch Protocol)

The fetch request is sent by both voters and observers to the current leader in order to replicate log changes. For voters this also serves as a liveness check of the leader. 

Log reconciliation: The Raft protocol does not guarantee that the replica with the largest offset is always elected. This means that following a leader election, a follower may need to truncate some uncommitted entries from its log. The Kafka log replication protocol has a similar problem following leader election, and it is resolved with a separate truncating state which is entered after every leader change. In the truncating state, followers use the OffsetsForLeaderEpoch API to find the diverging offset between their log and the leader's. Once that is found, the log is truncated and replication continues.

Rather than following the Kafka approach, this protocol piggybacks log reconciliation on the FetchQuorumRecords API, which is more akin to Raft replication. When voters or observers send a FetchQuorumRecords request, in addition to including an offset to fetch from, they also indicate the epoch of the last offset that they have in their local log (we refer to this as the "fetch epoch"). The leader always checks whether the fetch offset and fetch epoch are consistent with its own log. If they do not match, the fetch response will indicate the largest epoch and its end offset before the requested epoch. It is the responsibility of followers to detect leader changes and use FetchQuorumRecords responses to truncate the log.

The advantage of this approach is that it simplifies the follower state machine: it just needs to send fetches to the current expected leader with the fetch offset. The response may indicate either a new leader to fetch from or a new fetch offset. It is also safer, because the leader is able to validate the fetch offset on every request rather than relying on the follower to detect epoch changes.

Extra condition on commitment: The Raft protocol requires the leader to only commit entries from any previous epoch if the same leader has already successfully replicated an entry from the current epoch. Kafka's ISR replication protocol suffers from a similar problem and handles it by not advancing the high watermark until the leader is able to write a message with its own epoch. The diagram below taken from the Raft dissertation illustrates the scenario:

[Figure from the Raft dissertation illustrating the commitment scenario described below]


The problem concerns the conditions for commitment and leader election. In this diagram, S1 is the initial leader and writes "2," but fails to commit it to all replicas. The leadership moves to S5 which writes "3," but also fails to commit it. Leadership then returns to S1 which proceeds to attempt to commit "2." Although "2" is successfully written to a majority of the nodes, the risk is that S5 could still become leader, which would lead to the truncation of "2" even though it is present on a majority of nodes.

In Kafka's replication protocol, we address this issue by not advancing the high watermark after an election until an entry has been written by the new leader with its epoch. So we would allow the election of S5, which had written "3", even though a majority of nodes had written "2". This is allowable because "2" was not considered committed. Therefore, as long as we do not respond to a client's request for the entries it appended until the high watermark is advanced beyond them, this extra condition is satisfied.

This proposal follows Kafka's replication protocol. The high watermark for the quorum is not advanced until a record from the current epoch has been written. This is the purpose of the "dummy" leader change message mentioned in the section on Vote handling. Without this new message, it could be some time before a new entry is written, which could prevent recent appends from being committed.
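
To make the commitment rule concrete, here is a sketch of the leader-side check, assuming the leader tracks the log end offset of each voter (all names are illustrative, not the actual Kafka implementation):

Code Block
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Sketch: the high watermark only advances to an offset that is replicated on a
// majority of voters AND lies at or beyond the first offset written in the current
// epoch (e.g. the LeaderChangeMessage control record).
static long maybeAdvanceHighWatermark(long currentHighWatermark,
                                      long firstOffsetInCurrentEpoch,
                                      List<Long> voterEndOffsets) {
    List<Long> sorted = new ArrayList<>(voterEndOffsets);
    Collections.sort(sorted);
    // The largest offset replicated by a strict majority of the voters.
    long majorityReplicated = sorted.get((sorted.size() - 1) / 2);
    if (majorityReplicated >= firstOffsetInCurrentEpoch)
        return Math.max(currentHighWatermark, majorityReplicated);
    return currentHighWatermark; // cannot advance until a current-epoch record is replicated
}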

As stated, we shall reuse the existing Fetch API, with an additional FetchEpoch field to allow the leader to do the divergence check as necessary. In the long term, we would consider deprecating the OffsetsForLeaderEpoch API and consolidating offset validation into the Fetch protocol. We would also add two new optional structs to the FetchResponse: NextFetchOffsetAndEpoch and CurrentLeader. The first is provided if the FetchRequest does not specify a valid offset and epoch; a follower handles this by attempting to truncate to the next offset specified in the response. The second is the current leader, which will always be provided for fetches from the metadata quorum but is ignored in normal partition replication.
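
For illustration, a sketch of how a follower might handle these two optional structs when processing a fetch response (the surrounding types and accessors are assumptions, not the actual Kafka classes; the field names follow the response schema below):

Code Block
// Sketch: follower-side handling of a metadata-quorum fetch response.
void handleFetchedPartition(FetchablePartitionResponse partition, ReplicaState state) {
    // Leader changes are piggybacked on every response from the metadata quorum.
    if (partition.currentLeader() != null)
        state.maybeUpdateLeader(partition.currentLeader().leaderId(),
                                partition.currentLeader().leaderEpoch());

    if (partition.nextOffsetAndEpoch() != null) {
        // The fetch offset/epoch diverged from the leader's log: truncate and refetch.
        state.log().truncateTo(partition.nextOffsetAndEpoch().nextFetchOffset());
    } else {
        // Normal path: append the fetched records and continue from the new end offset.
        state.log().appendAsFollower(partition.records());
    }
}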

Request Schema

Code Block
{
  "apiKey": 1,
  "type": "request",
  "name": "FetchRequest",
  "validVersions": "0-12",
  "flexibleVersions": "12+",
  "fields": [
    { "name": "ReplicaId", "type": "int32", "versions": "0+",
      "about": "The broker ID of the follower, of -1 if this request is from a consumer." },
    { "name": "MaxWaitTimeMs", "type": "int32", "versions": "0+",
      "about": "The maximum time in milliseconds to wait for the response." },
    { "name": "MinBytes", "type": "int32", "versions": "0+",
      "about": "The minimum bytes to accumulate in the response." },
    { "name": "MaxBytes", "type": "int32", "versions": "3+", "default": "0x7fffffff", "ignorable": true,
      "about": "The maximum bytes to fetch.  See KIP-74 for cases where this limit may not be honored." },
    { "name": "IsolationLevel", "type": "int8", "versions": "4+", "default": "0", "ignorable": false,
      "about": "This setting controls the visibility of transactional records. Using READ_UNCOMMITTED (isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1), non-transactional and COMMITTED transactional records are visible. To be more concrete, READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), and enables the inclusion of the list of aborted transactions in the result, which allows consumers to discard ABORTED transactional records" },
    { "name": "SessionId", "type": "int32", "versions": "7+", "default": "0", "ignorable": false,
      "about": "The fetch session ID." },
    { "name": "SessionEpoch", "type": "int32", "versions": "7+", "default": "-1", "ignorable": false,
      "about": "The fetch session epoch, which is used for ordering requests in a session" },
    { "name": "Topics", "type": "[]FetchableTopic", "versions": "0+",
      "about": "The topics to fetch.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The name of the topic to fetch." },
      { "name": "FetchPartitions", "type": "[]FetchPartition", "versions": "0+",
        "about": "The partitions to fetch.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "CurrentLeaderEpoch", "type": "int32", "versions": "9+", "default": "-1", "ignorable": true,
          "about": "The current leader epoch of the partition." },
        { "name": "FetchOffset", "type": "int64", "versions": "0+",
          "about": "The message offset." },
		// ---------- Start new field ----------
        { "name": "FetchEpoch", "type": "int32", "versions": "12+",  
          "about": "The epoch of the last replicated record"},
		// ---------- End new field ----------
        { "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": false,
          "about": "The earliest available offset of the follower replica.  The field is only used when the request is sent by the follower."},
        { "name": "MaxBytes", "type": "int32", "versions": "0+",
          "about": "The maximum bytes to fetch from this partition.  See KIP-74 for cases where this limit may not be honored." }
      ]}
    ]},
    { "name": "Forgotten", "type": "[]ForgottenTopic", "versions": "7+", "ignorable": false,
      "about": "In an incremental fetch request, the partitions to remove.", "fields": [
      { "name": "Name", "type": "string", "versions": "7+", "entityType": "topicName",
        "about": "The partition name." },
      { "name": "ForgottenPartitionIndexes", "type": "[]int32", "versions": "7+",
        "about": "The partitions indexes to forget." }
    ]},
    { "name": "RackId", "type":  "string", "versions": "11+", "default": "", "ignorable": true,
      "about": "Rack ID of the consumer making this request"}
  ]
}

Response Schema

Code Block
{
  "apiKey": 1,
  "type": "response",
  "name": "FetchResponse",
  "validVersions": "0-12",
  "flexibleVersions": "12+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "7+", "ignorable": false,
      "about": "The top level response error code." },
    { "name": "SessionId", "type": "int32", "versions": "7+", "default": "0", "ignorable": false,
      "about": "The fetch session ID, or 0 if this is not part of a fetch session." },
    { "name": "Topics", "type": "[]FetchableTopicResponse", "versions": "0+",
      "about": "The response topics.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]FetchablePartitionResponse", "versions": "0+",
        "about": "The topic partitions.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partiiton index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no fetch error." },
        { "name": "HighWatermark", "type": "int64", "versions": "0+",
          "about": "The current high water mark." },
        { "name": "LastStableOffset", "type": "int64", "versions": "4+", "default": "-1", "ignorable": true,
          "about": "The last stable offset (or LSO) of the partition. This is the last offset such that the state of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)" },
        { "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true,
   
Code Block
{
  "apiKey": N,
  "type": "request",
  "name": "FetchQuorumRecordsRequest",
  "validVersions": "0",
  "fields": [
      {"name": "ClusterId", "type": "int32", "versions": "0+"},
      {"name": "ReplicaId", "type": "int32", "versions": "0+",
       "about": "The ID of the replica sending the request"},current log start offset." },
	    // ---------- Start new field ----------
        { "name": "LeaderEpochNextOffsetAndEpoch", "type": "int32OffsetAndEpoch", 
          "versions": "012+",
 "taggedVersions": "12+",     "about": "The current leader epoch"},
"tag": 0, "fields": [
          { "name": "FetchOffsetNextFetchOffset", "type": "int64", "versions": "0+",
            "about": "The next expected offset to be replicatedIf set, this is the offset that the follower should truncate to"},
          { "name": "FetchEpochNextFetchOffsetEpoch", "type": "int32", "versions": "0+",
            "about": "The epoch of the lastnext replicated record"},
      {"name":  "MaxWaitTimeMs", "type":  "int32", "versions": "0+",
       "about": "The maximum time to wait for new data to arrive" }
  ]
}

Response Schema

Code Block
{
  "apiKey": N,
  "type": "response",
 offset in case the follower needs to truncate"},        
        ]},
        { "name": "FetchQuorumRecordsResponseCurrentLeader",
  "validVersionstype": "0LeaderIdAndEpoch", 
  "fields": [
        {"nameversions": "ErrorCode12+", "typetaggedVersions": "int1612+", "versionstag": "0+"},
1, fields": [
          { "name": "LeaderId", "type": "int32", "versions": "0+",
            "about": "The ID of the current leader or -1 if the leader is unknown."},
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
       "about": "The latest known leader epoch"}
      {"name": "NextFetchOffset", "type": "int64", "versions": "0+",
        "about": "If set, this is the offset that the follower should truncate to"},
 The latest known leader epoch"}
        ]},
		// ---------- End new field ----------
        { "name": "NextFetchOffsetEpochAborted", "type": "int32[]AbortedTransaction", "versions": "4+", "nullableVersions": "04+", "ignorable": false,
          "about": "The epoch of the next offset in case the follower needs to truncate"},
      {aborted transactions.",  "fields": [
          { "name": "RecordsProducerId", "type": "bytesint64", "versions": "0+4+", "entityType": "producerId",
            "about": "The fetched record data"},
 "The producer id associated with the aborted transaction." },
          { "name": "HighWatermarkFirstOffset", "type": "int64", "versions": "04+",
            "about": "The current high watermark"},
	  { first offset in the aborted transaction." }
        ]},
        { "name": "FirstDirtyOffsetPreferredReadReplica", "type": "int64int32", "versions": "011+", "ignorable": true,
          "about": "FirstThe preferred dirtyread offsetreplica whichfor allowsthe followersconsumer to determine consistent snapshots use on its next fetch request"},
        { "name": "LastCaughtUpTimeMsRecords", "type": "int64bytes", "versions": "0+", "nullableVersions": "0+",
          "about": "The lastrecord time the follower was caught up with a majority of the voters"data." }
      ]}
    ]}
  ]
}

...

FetchRequest Handling

When a leader receives a FetchRequest, it must check the following:

...

The check in step 2 is similar to the logic that followers use today in Kafka through the OffsetsForLeaderEpoch API. In order to make this check efficient, Kafka maintains a leader-epoch-checkpoint file on disk, the contents of which are cached in memory. After every observed epoch change, we write the epoch and its start offset to this file and update the cache. This allows us to efficiently check whether the leader's log has diverged from the follower's and where the point of divergence is. Note that in the worst case it may take multiple rounds of FetchQuorumRecords to find the first offset that diverges.
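
For illustration, here is a sketch of that check against an in-memory epoch cache (a map from epoch to start offset mirroring the leader-epoch-checkpoint contents); OffsetAndEpoch is a simple placeholder pair type, and none of this is the actual Kafka implementation:

Code Block
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;

// Sketch: validate the follower's (fetchEpoch, fetchOffset). Returns empty if the
// fetch is consistent with the leader's log; otherwise returns the largest epoch
// <= fetchEpoch together with its end offset, which the follower truncates to.
static Optional<OffsetAndEpoch> validateFetchOffset(int fetchEpoch, long fetchOffset,
                                                    NavigableMap<Integer, Long> epochStartOffsets,
                                                    long logEndOffset) {
    Map.Entry<Integer, Long> entry = epochStartOffsets.floorEntry(fetchEpoch);
    if (entry == null)
        return Optional.of(new OffsetAndEpoch(0L, 0)); // nothing known: truncate to the beginning
    int largestEpochAtMost = entry.getKey();
    // The end offset of that epoch is the start offset of the next epoch, or the log end offset.
    Map.Entry<Integer, Long> next = epochStartOffsets.higherEntry(largestEpochAtMost);
    long epochEndOffset = next != null ? next.getValue() : logEndOffset;
    if (largestEpochAtMost == fetchEpoch && fetchOffset <= epochEndOffset)
        return Optional.empty(); // consistent: the leader can serve the fetch
    return Optional.of(new OffsetAndEpoch(epochEndOffset, largestEpochAtMost));
}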

...

FetchResponse Handling

When handling the response, a follower/observer will do the following:

...