...

Note that this protocol assumes something like the Kafka v2 log message format. It is not compatible with older formats because records do not include the leader epoch information that is needed for log reconciliation. We have intentionally made no assumption about the representation of the physical log and only minimal assumptions about its logical structure. This makes it usable both for internal metadata replication and (eventually) partition data replication.

...

Records are uniquely defined by their offset in the log and the epoch of the leader that appended the record. The key and value schemas will be defined by the controller in a separate KIP; here we treat them as arbitrary byte arrays. However, we do require the ability to append "control records" to the log which are reserved only for use within the Raft quorum (e.g. this enables quorum reassignment).
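To make this concrete, below is a minimal sketch, with illustrative names rather than the actual Kafka classes, of the per-record information the protocol relies on: the (offset, leader epoch) pair that uniquely identifies a record, opaque key/value bytes, and a flag marking control records reserved for use within the Raft quorum.

    // Illustrative sketch only; not an actual Kafka class.
    final class RaftRecord {
        final long offset;        // position in the log
        final int leaderEpoch;    // epoch of the leader that appended the record
        final byte[] key;         // opaque to the Raft layer
        final byte[] value;       // opaque to the Raft layer
        final boolean isControl;  // true for records reserved for the quorum itself

        RaftRecord(long offset, int leaderEpoch, byte[] key, byte[] value, boolean isControl) {
            this.offset = offset;
            this.leaderEpoch = leaderEpoch;
            this.key = key;
            this.value = value;
            this.isControl = isControl;
        }
    }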

Kafka's v2 message format already has all the required properties, so we will assume it is used.

Quorum State 

We use a separate file to store the current state of the quorum. This is both for convenience and correctness. It helps us to initialize the quorum state after a restart, but we also need it in order to know which broker we have voted for in a given election. The Raft protocol does not allow voters to change their votes, so we have to preserve this state across restarts. Below is the schema for this quorum-state file.

...

Note that one key difference between this internal topic and other topics is that we always enforce fsync upon appending to the local log in order to guarantee the correctness of the Raft algorithm, i.e. we drop group flushing for this topic. In practice, we could still optimize the fsync latency in the following way: 1) client requests to the leader are expected to contain multiple entries, 2) the fetch response from the leader can contain multiple entries, and 3) the leader can defer fsync until it knows that "majority - 1" voters have reached a certain entry offset. We leave this potential optimization as future work, out of the scope of this KIP design.
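As an illustration of the required append behaviour, here is a minimal sketch assuming a FileChannel-backed log file; the class and method names are hypothetical and not part of the actual Kafka log layer.

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;

    // Illustrative sketch only.
    class MetadataLogAppender {
        private final FileChannel channel;

        MetadataLogAppender(FileChannel channel) {
            this.channel = channel;
        }

        // Required behaviour for this internal topic: every append is followed by an
        // fsync (no group flushing), so that acknowledged Raft entries are durable.
        void appendAndFsync(ByteBuffer records) throws IOException {
            channel.write(records);
            channel.force(true);
        }
    }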

Other log-related metadata such as log start offset, recovery point, HWM, etc are still stored in the existing checkpoint file just like any other Kafka topic partitions.

...

The key functionalities of any consensus protocol are leader election and data replication. The protocol for these two functionalities consists of four core RPCs (summarized in the sketch after this list):

  • Vote: Sent by a voter to initiate an election.
  • BeginQuorumEpoch: Used by a new leader to inform the voters of its status.
  • EndQuorumEpoch: Used by a leader to gracefully step down and allow a new election.
  • Fetch: Sent by voters and observers to the leader in order to replicate the log.
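
The following is a minimal sketch, an illustrative enum rather than an actual Kafka API, summarizing the four core RPCs and which role initiates each of them.

    // Illustrative sketch only.
    enum RaftRpc {
        VOTE,               // sent by a voter to initiate an election
        BEGIN_QUORUM_EPOCH, // sent by a new leader to inform the voters of its status
        END_QUORUM_EPOCH,   // sent by a leader to gracefully step down and allow a new election
        FETCH               // sent by voters and observers to the leader to replicate the log
    }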

...

Leader Progress Timeout

In the traditional push-based model, when a leader is disconnected from the quorum due to a network partition, it will immediately start a new election to learn about the active quorum or form a new one. In the pull-based model, however, suppose a new leader has been elected with a new epoch and everyone has learned about it except the old leader (e.g. because that leader is no longer among the voters and hence does not receive the BeginQuorumEpoch either). That old leader would not be notified by anyone about the new leader / epoch and would become a pure "zombie leader", since there are no regular heartbeats pushed from the leader to the followers. This could lead to stale information being served to observers and clients inside the cluster.

To resolve this issue, we will piggy-back on the "quorum.fetch.timeout.ms" config: if the leader does not receive Fetch requests from a majority of the quorum within that amount of time, it will begin a new election and start sending VoteRequests to the voter nodes in the cluster in order to learn about the latest quorum. If it cannot connect to any known voter, the old leader will keep starting new elections and bumping the epoch. If a returned response includes a leader with a newer epoch, this zombie leader will step down and become a follower. Note that the node will remain a candidate until it finds that it has been supplanted by another voter, or eventually wins the election.
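
Below is a minimal sketch, not the actual Kafka implementation, of how a leader could detect that it has lost contact with the quorum under the pull-based model. Names such as lastFetchTimeMs and shouldStartNewElection are hypothetical.

    import java.util.Map;

    // Illustrative sketch only.
    class LeaderProgressTimeout {
        private final Map<Integer, Long> lastFetchTimeMs; // voterId -> time of last Fetch received
        private final int majority;                       // majority of the voter set
        private final long fetchTimeoutMs;                // quorum.fetch.timeout.ms

        LeaderProgressTimeout(Map<Integer, Long> lastFetchTimeMs, int majority, long fetchTimeoutMs) {
            this.lastFetchTimeMs = lastFetchTimeMs;
            this.majority = majority;
            this.fetchTimeoutMs = fetchTimeoutMs;
        }

        // Called when a Fetch request arrives from a voter.
        void onFetchFromVoter(int voterId, long nowMs) {
            lastFetchTimeMs.put(voterId, nowMs);
        }

        // Called periodically from the leader's poll loop. If fewer than a majority of
        // voters (counting the leader itself) have fetched within the timeout, the
        // leader should start a new election to discover the latest quorum.
        boolean shouldStartNewElection(long nowMs) {
            long recentlyFetched = lastFetchTimeMs.values().stream()
                .filter(t -> nowMs - t <= fetchTimeoutMs)
                .count();
            return recentlyFetched + 1 < majority; // +1 accounts for the leader itself
        }
    }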

As we know from the Raft literature, this approach could generate disruptive voters when network partitions happen on the leader. The partitioned leader will keep increasing its epoch, and when it eventually reconnects to the quorum, it could win the election with a very large epoch number, thus reducing the quorum availability due to extra restoration time. Considering this scenario is rare, we would like to address it in a follow-up KIP.

...

The EndQuorumEpochRequest will be sent to all voters in the quorum. Inside each request, the leader will include a list of preferred successors, sorted by each voter's current replicated offset in descending order. Based on its position in the list of preferred successors, each voter will choose a corresponding delayed election time so that the most up-to-date voter has a higher chance of being elected. If the node's priority is highest, it will become a candidate immediately and not wait for the election timeout. For a successor with priority N > 0, the next election timeout will be computed as:

...

NAME | TAGS | TYPE | NOTE
current-leader | type=raft-manager | dynamic gauge | -1 means UNKNOWN
current-epoch | type=raft-manager | dynamic gauge | 0 means UNKNOWN
current-vote | type=raft-manager | dynamic gauge | -1 means not voted for anyone
log-end-offset | type=raft-manager | dynamic gauge |
log-end-epoch | type=raft-manager | dynamic gauge |
high-watermark | type=raft-manager | dynamic gauge |
boot-timestamp | type=raft-manager | static gauge |
current-state | type=raft-manager | dynamic enum | possible values: "leader", "follower", "candidate", "observer"
number-unknown-voter-connections | type=raft-manager | dynamic gauge | number of unknown voters whose connection information is not cached; will never be larger than quorum-size
election-latency-max/avg | type=raft-manager | dynamic gauge | measured on each voter as a windowed sum / avg, starting when it becomes a candidate and ending when it learns of, or becomes, the new leader
commit-latency-max/avg | type=raft-manager | dynamic gauge | measured on the leader as a windowed sum / avg, starting when the record is appended and ending when the high watermark advances beyond it
fetch-records-rate | type=raft-manager | windowed rate | applies to followers and observers only
append-records-rate | type=raft-manager | windowed rate | applies to the leader only
poll-idle-ratio-avg | type=raft-manager | windowed average |

...