...

  • quorum.voters: This is a connection map which contains the IDs of the voters and their respective endpoints. Each voter in the list uses the format `{broker-id}@{broker-host}:{broker-port}`. For example, `quorum.voters=1@kafka-1:9092, 2@kafka-2:9092, 3@kafka-3:9092` (see the parsing sketch after this list).
  • quorum.fetch.timeout.ms: Maximum time without a successful fetch from the current leader before a new election is started.
  • quorum.election.timeout.ms: Maximum time spent without collecting a majority of votes during the candidate state before a new election is retried.
  • quorum.election.backoff.max.ms: Maximum exponential backoff time (based on the number of retries) after an election timeout, before a new election is triggered.
  • quorum.request.timeout.ms: Maximum time before a pending request is considered failed and the connection is dropped.
  • quorum.retry.backoff.ms: Initial delay between request retries. This config and the one below are used for retriable request errors or lost connectivity and are different from the election backoff configs above.
  • quorum.retry.backoff.max.ms: Max delay between requests. Backoff will increase exponentially beginning from quorum.retry.backoff.ms (the same as in KIP-580).
  • broker.id: The existing broker id config shall be used as the voter id in the Raft quorum.
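
To make the connection map format concrete, below is a minimal sketch of parsing a `quorum.voters` value into an id-to-endpoint map. It is illustrative only (the class and method names are not part of the proposal) and simply assumes the `{broker-id}@{broker-host}:{broker-port}` entries described above.

```java
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;

// Illustrative parser for the {broker-id}@{broker-host}:{broker-port} entries
// of quorum.voters; not the actual Kafka implementation.
public class QuorumVotersParser {

    public static Map<Integer, InetSocketAddress> parse(String votersConfig) {
        Map<Integer, InetSocketAddress> voters = new HashMap<>();
        for (String entry : votersConfig.split(",")) {
            String voter = entry.trim();
            int at = voter.indexOf('@');
            int colon = voter.lastIndexOf(':');
            if (at < 0 || colon < at) {
                throw new IllegalArgumentException("Invalid voter entry: " + voter);
            }
            int brokerId = Integer.parseInt(voter.substring(0, at));
            String host = voter.substring(at + 1, colon);
            int port = Integer.parseInt(voter.substring(colon + 1));
            voters.put(brokerId, InetSocketAddress.createUnresolved(host, port));
        }
        return voters;
    }

    public static void main(String[] args) {
        // Example value from the config description above.
        System.out.println(parse("1@kafka-1:9092, 2@kafka-2:9092, 3@kafka-3:9092"));
    }
}
```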

...

Note that one key difference of this internal topic compared with other topics is that we should always enforce fsync upon appending to the local log to guarantee Raft algorithm correctness. In practice, we could optimize the fsync latency in the following way: 1) the client requests to the leader are expected to have multiple entries, 2) the fetch response from the leader can contain multiple entries, and 3) the leader can actually defer fsync until it knows "quorum.size - 1" voters have reached a certain entry offset. We will discuss these a bit more in the following sections.
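
As a rough illustration of point 3) above, the following hypothetical sketch defers fsync on the leader until "quorum.size - 1" voters are known to have reached an offset; all class and method names are illustrative and not the proposed log-layer implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of deferring fsync on the leader: the log is only forced
// to disk once "quorum.size - 1" voters are known to have reached an offset
// beyond the last flushed one. The real log layer is more involved.
public class DeferredFlushTracker {
    private final int quorumSize;
    private final Map<Integer, Long> voterFetchOffsets = new HashMap<>();
    private long flushedEndOffset = 0L;

    public DeferredFlushTracker(int quorumSize) {
        this.quorumSize = quorumSize;
    }

    // Called when a Fetch request tells us how far a voter has replicated.
    public synchronized void onVoterFetch(int voterId, long fetchOffset) {
        voterFetchOffsets.merge(voterId, fetchOffset, Math::max);
        long majorityOffset = offsetReachedByMajority();
        if (majorityOffset > flushedEndOffset) {
            flushLogUpTo(majorityOffset); // placeholder for the actual fsync call
            flushedEndOffset = majorityOffset;
        }
    }

    // Highest offset known to have been reached by at least quorum.size - 1 voters.
    private long offsetReachedByMajority() {
        int required = quorumSize - 1;
        if (voterFetchOffsets.size() < required) {
            return 0L;
        }
        return voterFetchOffsets.values().stream()
            .sorted()
            .skip(voterFetchOffsets.size() - required)
            .findFirst()
            .orElse(0L);
    }

    private void flushLogUpTo(long offset) {
        // e.g. FileChannel.force(true) on the affected log segments
    }
}
```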

Other log-related metadata such as the log start offset, recovery point, HWM, etc. are still stored in the existing checkpoint file, just like for any other Kafka topic partition.

Additionally, we make use of the current meta.properties file, which caches the value of broker.id from the configuration as well as the discovered clusterId. As is the case today, if the configured broker.id does not match the cached value, then the broker will refuse to start. If we connect to a cluster with a different clusterId, then the broker will receive a fatal error and shut down.
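
For illustration, the two checks described above could be expressed roughly as in the sketch below; this is not the actual broker startup code, and the helper class is hypothetical.

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

// Hypothetical illustration of the meta.properties checks described above;
// not the actual broker startup code.
public class MetaPropertiesCheck {

    public static void verify(String metaPropertiesPath,
                              int configuredBrokerId,
                              String discoveredClusterId) throws IOException {
        Properties props = new Properties();
        try (FileInputStream in = new FileInputStream(metaPropertiesPath)) {
            props.load(in);
        }

        String cachedBrokerId = props.getProperty("broker.id");
        if (cachedBrokerId != null && Integer.parseInt(cachedBrokerId) != configuredBrokerId) {
            // As today: a broker.id mismatch means the broker refuses to start.
            throw new IllegalStateException("Configured broker.id " + configuredBrokerId
                + " does not match cached value " + cachedBrokerId);
        }

        String cachedClusterId = props.getProperty("cluster.id");
        if (cachedClusterId != null && !cachedClusterId.equals(discoveredClusterId)) {
            // Connecting to a cluster with a different clusterId is a fatal error.
            throw new IllegalStateException("Cluster id mismatch: cached " + cachedClusterId
                + ", discovered " + discoveredClusterId);
        }
    }
}
```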

...

Leader Progress Timeout

In the traditional push-based model, when a leader is disconnected from the quorum due to a network partition, it will immediately start a new election to learn the active quorum or form a new one. In the pull-based model, however, say a new leader has been elected with a new epoch and everyone has learned about it except the old leader (e.g. that leader was no longer in the voter set and hence did not receive the BeginQuorumEpoch request either); then that old leader would not be notified by anyone about the new leader / epoch and would become a pure "zombie leader", since there are no regular heartbeats pushed from the leader to the followers. This could lead to stale information being served to the observers and clients inside the cluster.

To resolve this issue, we will piggy-back on the "quorum.fetch.timeout.ms" config, such that if the leader did not receive Fetch requests from a majority of the quorum for that amount of time, it would begin a new election and start sending VoteRequests to the voter nodes in the cluster to understand the latest quorum. If it cannot connect to any known voter, the old leader shall keep starting new elections and bumping the epoch. If a returned response includes a newer epoch leader, this zombie leader would step down and become a follower. Note that the node will remain a candidate until it finds that it has been supplanted by another voter, or until it eventually wins the election.
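
The sketch below shows one possible shape of this leader-side progress check built on "quorum.fetch.timeout.ms"; the class, its fields, and the majority computation are illustrative only and not part of the proposal.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the leader-side progress check: if fewer than a
// majority of the voters have fetched within quorum.fetch.timeout.ms, the
// leader should bump its epoch and start a new election.
public class LeaderProgressTimeout {
    private final long fetchTimeoutMs;                       // quorum.fetch.timeout.ms
    private final int quorumSize;
    private final Map<Integer, Long> lastFetchTimeMs = new HashMap<>();

    public LeaderProgressTimeout(long fetchTimeoutMs, int quorumSize) {
        this.fetchTimeoutMs = fetchTimeoutMs;
        this.quorumSize = quorumSize;
    }

    // Called whenever a voter's Fetch request is received by the leader.
    public synchronized void onFetchFromVoter(int voterId, long nowMs) {
        lastFetchTimeMs.put(voterId, nowMs);
    }

    // True if the leader (counting itself) no longer sees a majority of the
    // quorum fetching within the timeout and should become a candidate.
    public synchronized boolean shouldStartElection(long nowMs) {
        long activeVoters = lastFetchTimeMs.values().stream()
            .filter(lastFetch -> nowMs - lastFetch < fetchTimeoutMs)
            .count();
        int majority = quorumSize / 2 + 1;
        return activeVoters + 1 < majority;
    }
}
```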

As we know from the Raft literature, this approach could generate disruptive voters when a network partition isolates the leader. The partitioned leader will keep increasing its epoch, and when it eventually reconnects to the quorum, it could win the election with a very large epoch number, thus reducing quorum availability due to the extra restoration time. Since this scenario is rare, we would like to address it in a follow-up KIP.

...

Here’s a list of proposed metrics for this new protocol:

NAME | TAGS | TYPE | NOTE
current-leader | type=raft-manager | dynamic gauge | -1 means UNKNOWN
current-epoch | type=raft-manager | dynamic gauge | 0 means UNKNOWN
current-vote | type=raft-manager | dynamic gauge | -1 means not voted for anyone
log-end-offset | type=raft-manager | dynamic gauge |
log-end-epoch | type=raft-manager | dynamic gauge |
high-watermark | type=raft-manager | dynamic gauge |
boot-timestamp | type=raft-manager | static gauge |
current-state | type=raft-manager | dynamic enum | possible values: "leader", "follower", "candidate", "observer"
number-unknown-voter-connections | type=raft-manager | dynamic gauge | number of unknown voters whose connection information is not cached; would never be larger than quorum-size
election-latency-max/avg | type=raft-manager | dynamic gauge | measured on each voter; starts when becoming a candidate and ends when a new leader is learned or the voter itself becomes the leader
commit-latency-max/avg | type=raft-manager | dynamic gauge | measured on the leader; starts when a record is appended and ends when the high watermark advances beyond it
fetch-records-rate | type=raft-manager | windowed rate | applies to followers and observers only
append-records-rate | type=raft-manager | windowed rate | applies to the leader only
poll-idle-ratio-avg | type=raft-manager | windowed average |
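
For illustration, a dynamic gauge such as current-leader could be registered through Kafka's common metrics library roughly as follows; this is only a sketch, and the surrounding class and fields are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.Metrics;

// Sketch of registering the current-leader dynamic gauge under the
// raft-manager group; the surrounding class and fields are illustrative only.
public class RaftManagerMetrics {
    private final AtomicInteger currentLeaderId = new AtomicInteger(-1); // -1 means UNKNOWN

    public RaftManagerMetrics(Metrics metrics) {
        MetricName name = metrics.metricName(
            "current-leader", "raft-manager", "The broker id of the current quorum leader");
        metrics.addMetric(name, (Measurable) (config, now) -> currentLeaderId.get());
    }

    public void onLeaderChange(int leaderId) {
        currentLeaderId.set(leaderId);
    }
}
```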


...