...
`quorum.voters`: This is a connection map which contains the IDs of the voters and their respective endpoints. We use the following format for each voter in the list: `{broker-id}@{broker-host}:{broker-port}`. For example, `quorum.voters=1@kafka-1:9092, 2@kafka-2:9092, 3@kafka-3:9092`.

`quorum.fetch.timeout.ms`: Maximum time without a successful fetch from the current leader before a new election is started.

`quorum.election.timeout.ms`: Maximum time without collecting a majority of votes during the candidate state before a new election is retried.

`quorum.election.backoff.max.ms`: Maximum exponential backoff time (based on the number of retries) after an election timeout, before a new election is triggered.

`quorum.request.timeout.ms`: Maximum time before a pending request is considered failed and the connection is dropped.

`quorum.retry.backoff.ms`: Initial delay between request retries. This config and the one below are used for retriable request errors or lost connectivity and are different from the election.backoff configs above.

`quorum.retry.backoff.max.ms`: Maximum delay between requests. Backoff will increase exponentially beginning from `quorum.retry.backoff.ms` (the same as in KIP-580).

`broker.id`: The existing broker id config shall be used as the voter id in the Raft quorum.
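To make the `quorum.voters` format above concrete, here is a minimal sketch of how the connection map could be parsed; `VoterConfigParser` and `parseVoters` are illustrative names for this document only, not part of the KIP or the Kafka codebase:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class VoterConfigParser {
    // Parses a quorum.voters string such as "1@kafka-1:9092, 2@kafka-2:9092"
    // into a map from voter id to its "host:port" endpoint.
    static Map<Integer, String> parseVoters(String config) {
        Map<Integer, String> voters = new LinkedHashMap<>();
        for (String entry : config.split(",")) {
            String[] parts = entry.trim().split("@", 2);
            if (parts.length != 2 || parts[1].lastIndexOf(':') <= 0) {
                throw new IllegalArgumentException("Invalid voter entry: " + entry);
            }
            voters.put(Integer.parseInt(parts[0]), parts[1]);
        }
        return voters;
    }

    public static void main(String[] args) {
        System.out.println(parseVoters("1@kafka-1:9092, 2@kafka-2:9092, 3@kafka-3:9092"));
        // prints {1=kafka-1:9092, 2=kafka-2:9092, 3=kafka-3:9092}
    }
}
```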
...
Note one key difference of this internal topic compared with other topics is that we should always enforce fsync upon appending to the local log to guarantee Raft algorithm correctness. In practice, we could optimize the fsync latency in the following ways: 1) client requests to the leader are expected to carry multiple entries, 2) the fetch response from the leader can contain multiple entries, and 3) the leader can actually defer fsync until it knows that "quorum.size - 1" replicas have fetched up to a certain entry offset. We will discuss these a bit more in the following sections.
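Point (3) above can be sketched as follows. This is a hypothetical helper, not KIP or Kafka code: the leader may defer its own fsync for any offset that all of the other "quorum.size - 1" voters are already known to have fetched, since the data is then durable on the rest of the quorum even if the leader itself crashes:

```java
import java.util.Arrays;

public class DeferredFsync {
    // Illustrative only: given the log offsets that each of the other voters
    // has fetched (as observed from their Fetch requests), return the highest
    // offset up to which the leader could safely defer fsync, i.e. the
    // minimum offset reached by all other voters. Returns -1 if unknown.
    static long deferredFsyncUpTo(long[] otherVoterFetchOffsets) {
        return Arrays.stream(otherVoterFetchOffsets).min().orElse(-1L);
    }

    public static void main(String[] args) {
        // With a 3-node quorum, the two other voters have fetched to 10 and 7:
        System.out.println(deferredFsyncUpTo(new long[]{10L, 7L})); // prints 7
    }
}
```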
Other log-related metadata, such as the log start offset, recovery point, HWM, etc., are still stored in the existing checkpoint file, just like for any other Kafka topic partition.
Additionally, we make use of the current `meta.properties` file, which caches the value of `broker.id` from the configuration as well as the discovered clusterId. As is the case today, if the configured `broker.id` does not match the cached value, then the broker will refuse to start. If we connect to a cluster with a different clusterId, then the broker will receive a fatal error and shut down.
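For reference, a `meta.properties` file contains entries along these lines (the values shown here are purely illustrative):

```properties
# meta.properties (illustrative example)
version=0
broker.id=1
cluster.id=some-generated-cluster-id
```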
...
Leader Progress Timeout
In the traditional push-based model, when a leader is disconnected from the quorum due to a network partition, it will start a new election to learn the active quorum or form a new one immediately. In the pull-based model, however, say a new leader has been elected with a new epoch and everyone has learned about it except the old leader (e.g. that leader is no longer among the voters and hence does not receive the BeginQuorumEpoch either); that old leader would not be notified by anyone about the new leader / epoch and would become a pure "zombie leader", as there are no regular heartbeats being pushed from leader to followers. This could lead to stale information being served to the observers and clients inside the cluster.
To resolve this issue, we will piggy-back on the `quorum.fetch.timeout.ms` config: if the leader does not receive Fetch requests from a majority of the quorum for that amount of time, it will begin a new election and start sending `VoteRequest` to the voter nodes in the cluster to learn the latest quorum. If it cannot connect to any known voter, the old leader shall keep starting new elections and bumping the epoch. If a returned response includes a newer epoch leader, this zombie leader will step down and become a follower. Note that the node will remain a candidate until it finds that it has been supplanted by another voter, or it eventually wins the election.
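The leader-side check described above could be sketched roughly as follows; `LeaderProgressCheck`, `shouldStartElection`, and the majority accounting are illustrative assumptions of this document, not the KIP's actual implementation:

```java
import java.util.Map;

public class LeaderProgressCheck {
    // Illustrative check run periodically by the leader: if, counting the
    // leader itself, fewer than a majority of the quorum have made contact
    // within quorum.fetch.timeout.ms, the leader starts a new election to
    // probe for the latest quorum.
    static boolean shouldStartElection(Map<Integer, Long> lastFetchTimeMs,
                                       int quorumSize,
                                       long fetchTimeoutMs,
                                       long nowMs) {
        long recentFetchers = lastFetchTimeMs.values().stream()
            .filter(t -> nowMs - t <= fetchTimeoutMs)
            .count();
        int majority = quorumSize / 2 + 1;
        // The leader counts toward the majority, so it needs (majority - 1)
        // recent fetches from other voters to consider itself connected.
        return recentFetchers < majority - 1;
    }
}
```

For example, with a 3-node quorum and a 2-second timeout, a single voter fetching recently is enough to keep the leader in place, while silence from both other voters triggers a new election.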
As we know from the Raft literature, this approach could generate disruptive voters when network partitions happen on the leader. The partitioned leader will keep increasing its epoch, and when it eventually reconnects to the quorum, it could win the election with a very large epoch number, thus reducing the quorum availability due to extra restoration time. Considering this scenario is rare, we would like to address it in a follow-up KIP.
...
Here’s a list of proposed metrics for this new protocol:
| NAME | TAGS | TYPE | NOTE |
| --- | --- | --- | --- |
| current-leader | type=raft-manager | dynamic gauge | -1 means UNKNOWN |
| current-epoch | type=raft-manager | dynamic gauge | 0 means UNKNOWN |
| current-vote | type=raft-manager | dynamic gauge | -1 means not voted for anyone |
| log-end-offset | type=raft-manager | dynamic gauge | |
| log-end-epoch | type=raft-manager | dynamic gauge | |
| high-watermark | type=raft-manager | dynamic gauge | |
| boot-timestamp | type=raft-manager | static gauge | |
| current-state | type=raft-manager | dynamic enum | possible values: "leader", "follower", "candidate", "observer" |
| number-unknown-voter-connections | type=raft-manager | dynamic gauge | number of unknown voters whose connection information is not cached; would never be larger than quorum-size |
| election-latency-max/avg | type=raft-manager | dynamic gauge | measured on each voter, starting when it becomes a candidate and ending when it learns of or becomes the new leader |
| commit-latency-max/avg | type=raft-manager | dynamic gauge | measured on the leader, starting when the record is appended and ending when the high watermark advances beyond it |
| fetch-records-rate | type=raft-manager | windowed rate | apply to follower and observer only |
| append-records-rate | type=raft-manager | windowed rate | apply to leader only |
| poll-idle-ratio-avg | type=raft-manager | windowed average | |
...