...
KIP-500: Replace ZooKeeper with a Self-Managed Metadata Quorum (Accepted)
Status
Current state: Accepted
Discussion thread: here
JIRA:
...
Leader Progress Timeout
In the traditional push-based model, when a leader is disconnected from the quorum due to a network partition, it will immediately start a new election to learn the active quorum or form a new one. In the pull-based model, however, suppose a new leader has been elected with a new epoch and everyone has learned about it except the old leader (e.g. because that leader is no longer among the voters and hence does not receive the BeginQuorumEpoch either). That old leader would not be notified by anyone about the new leader / epoch and would become a pure "zombie leader", since there are no regular heartbeats pushed from leader to followers. This could lead to stale information being served to the observers and clients inside the cluster.
To resolve this issue, we will piggy-back on the quorum.fetch.timeout.ms config: if the leader does not receive Fetch requests from a majority of the quorum for that amount of time, it will begin a new election and start sending VoteRequest to the voter nodes in the cluster to discover the latest quorum. If it cannot connect to any known voter, the old leader keeps starting new elections and bumping the epoch. If a returned response includes a leader with a newer epoch, this zombie leader steps down and becomes a follower. Note that the node will remain a candidate until it finds that it has been supplanted by another voter, or it eventually wins the election.
As we know from the Raft literature, this approach can generate disruptive voters when a network partition isolates the leader. The partitioned leader will keep increasing its epoch, and when it eventually reconnects to the quorum, it could win the election with a very large epoch number, reducing quorum availability due to the extra restoration time. Since this scenario is rare, we would like to address it in a follow-up KIP.
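To illustrate the mechanism, here is a minimal sketch of the leader-side progress timeout, assuming hypothetical names (this is not the actual KafkaRaftClient code): the leader tracks the last time a majority of voters fetched from it and resigns into a candidate once quorum.fetch.timeout.ms elapses without such progress.
Code Block |
---|
// Hypothetical sketch; class and method names are illustrative, not Kafka's internals.
class LeaderProgressTimer {
    private final long fetchTimeoutMs;     // value of quorum.fetch.timeout.ms
    private long lastMajorityFetchTimeMs;  // last time Fetch requests from a majority of voters were seen

    LeaderProgressTimer(long fetchTimeoutMs, long nowMs) {
        this.fetchTimeoutMs = fetchTimeoutMs;
        this.lastMajorityFetchTimeMs = nowMs;
    }

    // Called whenever the leader observes that a majority of the voters have fetched recently.
    void recordMajorityFetch(long nowMs) {
        this.lastMajorityFetchTimeMs = nowMs;
    }

    // Checked from the leader's poll loop: when true, the leader bumps its epoch,
    // becomes a candidate, and sends VoteRequests to discover the latest quorum.
    boolean shouldResign(long nowMs) {
        return nowMs - lastMajorityFetchTimeMs >= fetchTimeoutMs;
    }
} |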
...
Code Block |
---|
{ "apiKey": 1, "type": "response", "name": "FetchResponse", "validVersions": "0-12", "flexibleVersions": "12+", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true, "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, { "name": "ErrorCode", "type": "int16", "versions": "7+", "ignorable": false, "about": "The top level response error code." }, { "name": "SessionId", "type": "int32", "versions": "7+", "default": "0", "ignorable": false, "about": "The fetch session ID, or 0 if this is not part of a fetch session." }, { "name": "Topics", "type": "[]FetchableTopicResponse", "versions": "0+", "about": "The response topics.", "fields": [ { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", "about": "The topic name." }, { "name": "Partitions", "type": "[]FetchablePartitionResponse", "versions": "0+", "about": "The topic partitions.", "fields": [ { "name": "PartitionIndex", "type": "int32", "versions": "0+", "about": "The partition index." }, { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The error code, or 0 if there was no fetch error." }, { "name": "HighWatermark", "type": "int64", "versions": "0+", "about": "The current high water mark." }, { "name": "LastStableOffset", "type": "int64", "versions": "4+", "default": "-1", "ignorable": true, "about": "The last stable offset (or LSO) of the partition. This is the last offset such that the state of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)" }, { "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true, "about": "The current log start offset." }, // ---------- Start new field ---------- { "name": "TruncationOffsetDivergingEpoch", "type": "int64EpochEndOffset", "versions": "12+", "taggedVersions": "12+", "tag": 0, "about": "If set, the follower must truncate all offsets that are greater than or equal to this value."}In case divergence is detected based on the `LastFetchedEpoch` and `FetchOffset` in the request, this field indicates the largest epoch and its end offset such that subsequent records are known to diverge", { "namefields": "CurrentLeader", "type": "LeaderIdAndEpoch", [ { "name": "Epoch", "versionstype": "12+int32", "taggedVersionsversions": "12+", "tagdefault": "-1" }, fields": [ { "name": "LeaderIdEndOffset", "type": "int32int64", "versions": "012+", "default": "-1" } ]}, { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "12+", "taggedVersions": "12+", "tag": 1, fields": [ { "name": "LeaderId", "type": "int32", "versions": "0+", "about": "The ID of the current leader or -1 if the leader is unknown."}, { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "about": "The latest known leader epoch"} ]}, // ---------- End new field ---------- { "name": "Aborted", "type": "[]AbortedTransaction", "versions": "4+", "nullableVersions": "4+", "ignorable": false, "about": "The aborted transactions.", "fields": [ { "name": "ProducerId", "type": "int64", "versions": "4+", "entityType": "producerId", "about": "The producer id associated with the aborted transaction." }, { "name": "FirstOffset", "type": "int64", "versions": "4+", "about": "The first offset in the aborted transaction." 
} ]}, { "name": "PreferredReadReplica", "type": "int32", "versions": "11+", "ignorable": true, "about": "The preferred read replica for the consumer to use on its next fetch request"}, { "name": "Records", "type": "bytes", "versions": "0+", "nullableVersions": "0+", "about": "The record data." } ]} ]} ] } |
...
- Check that the clusterId, if not null, matches the cached value in meta.properties.
- Ensure that the leader epoch from the request is the same as the locally cached value. If not, reject the request with either the FENCED_LEADER_EPOCH or UNKNOWN_LEADER_EPOCH error.
  - If the leader epoch is smaller, then eventually this leader's BeginQuorumEpoch would reach the voter and that voter would update the epoch.
  - If the leader epoch is larger, then eventually the receiver would learn about the new epoch anyway. In fact this case should not happen since, unlike the normal partition replication protocol, leaders are always the first to discover that they have been elected.
- Check that LastFetchedEpoch is consistent with the leader's log. The leader assumes that LastFetchedEpoch is the epoch of the offset prior to FetchOffset: FetchOffset is expected to be in the range of offsets with an epoch of LastFetchedEpoch, or to be the start offset of the next epoch in the log. If not, return an empty response (no records included) and set the DivergingEpoch to the largest epoch which is less than or equal to the fetcher's LastFetchedEpoch. This is an optimization to truncation that lets the follower truncate as much as possible toward the divergence point with fewer Fetch round-trips. For example, if the fetcher's last fetched epoch is X and it does not match the epoch of the offset prior to the fetch offset, then all of the follower's records with epoch X may have diverged and can be truncated, so returning epoch Y (where Y < X) and its end offset is reasonable. (A sketch of this check follows the list.)
- If the request is from a voter and not an observer, the leader can possibly advance the high-watermark. As stated above, we only advance the high-watermark if the current leader has replicated at least one entry from its current epoch to a majority of the quorum. Otherwise, the high watermark is set to the maximum offset which has been replicated to a majority of the voters.
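To make the LastFetchedEpoch check concrete, here is a minimal, hypothetical sketch of how a leader could validate the fetch position against its epoch cache and compute the DivergingEpoch to return; the data structures and method names are assumptions for illustration, not the actual KafkaRaftClient implementation.
Code Block |
---|
import java.util.Map;
import java.util.NavigableMap;

// Illustrative only: the leader's epoch cache is modeled as a sorted map from epoch
// to the start offset of that epoch in the leader's log.
class FetchValidation {
    record EpochEndOffset(int epoch, long endOffset) { }

    // Returns null when the fetch position is consistent, otherwise the diverging
    // epoch/end-offset pair to encode in the (empty) FetchResponse.
    static EpochEndOffset validate(NavigableMap<Integer, Long> epochStartOffsets,
                                   long logEndOffset,
                                   long fetchOffset,
                                   int lastFetchedEpoch) {
        Map.Entry<Integer, Long> floor = epochStartOffsets.floorEntry(lastFetchedEpoch);
        if (floor != null && floor.getKey() == lastFetchedEpoch) {
            // End offset of an epoch = start offset of the next epoch, or the log end offset.
            Map.Entry<Integer, Long> next = epochStartOffsets.higherEntry(lastFetchedEpoch);
            long epochEndOffset = next != null ? next.getValue() : logEndOffset;
            if (fetchOffset >= floor.getValue() && fetchOffset <= epochEndOffset) {
                return null; // consistent: serve records starting at fetchOffset
            }
        }
        if (floor == null) {
            return new EpochEndOffset(-1, -1L); // leader has no epoch at or below LastFetchedEpoch
        }
        // Divergence: report the largest epoch <= LastFetchedEpoch and its end offset.
        Map.Entry<Integer, Long> next = epochStartOffsets.higherEntry(floor.getKey());
        long endOffset = next != null ? next.getValue() : logEndOffset;
        return new EpochEndOffset(floor.getKey(), endOffset);
    }
} |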
...
- If the response contains the FENCED_LEADER_EPOCH error code, check the leaderId from the response. If it is defined, then update the quorum-state file and become a "follower" (which may or may not have voting power) of that leader. Otherwise, retry the Fetch request against one of the remaining voters at random; in the meantime it may receive a BeginQuorumEpoch request, which would also convey the new epoch / leader.
- If the response has the field DivergingEpoch set, then truncate from the log all of the offsets that are greater than or equal to DivergingEpoch.EndOffset, truncate any offset with an epoch greater than DivergingEpoch.Epoch, and then continue fetching from the new log end offset. (A sketch follows the note below.)
Note that followers will have to check any records received for the presence of control records. Specifically, a follower/observer must check for voter assignment messages, which could change its role.
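The truncation on DivergingEpoch can be sketched as follows, assuming a hypothetical ReplicatedLog interface (the real log API in Kafka differs); truncating to the minimum of the leader's reported end offset and the follower's own end offset for that epoch removes both the diverged offsets and any records from larger epochs.
Code Block |
---|
// Hypothetical follower-side handling of a FetchResponse with DivergingEpoch set.
class DivergenceHandling {
    interface ReplicatedLog {
        long endOffsetForEpoch(int epoch); // end offset of the given epoch in the local log
        void truncateTo(long offset);      // remove all records with offset >= the given offset
    }

    static void handleDivergingEpoch(ReplicatedLog log, int divergingEpoch, long divergingEndOffset) {
        long localEndOffset = log.endOffsetForEpoch(divergingEpoch);
        long truncationOffset = Math.min(divergingEndOffset, localEndOffset);
        log.truncateTo(truncationOffset);
        // The follower then continues fetching from its new log end offset, using the
        // epoch of its last remaining record as LastFetchedEpoch.
    }
} |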
...
The DescribeQuorum API is used by the admin client to show the status of the quorum. This API must be sent to the leader, which is the only node that would have lag information for all of the voters.
Request Schema
Code Block |
---|
{
"apiKey": N,
"type": "request",
"name": "DescribeQuorumRequest",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "Topics", "type": "[]TopicData",
"versions": "0+", "fields": [
{ "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
"about": "The topic name." },
{ "name": "Partitions", "type": "[]PartitionData",
"versions": "0+", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index." }
]
}]
}
]
} |
Response Schema
Code Block |
---|
{
"apiKey": N,
"type": "response",
"name": "DescribeQuorumResponse",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The top level error code."},
{ "name": "Topics", "type": "[]TopicData",
"versions": "0+", "fields": [
{ "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
"about": "The topic name." },
{ "name": "Partitions", "type": "[]PartitionData",
"versions": "0+", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+"},
{ "name": "LeaderId", "type": "int32", "versions": "0+",
"about": "The ID of the current leader or -1 if the leader is unknown."},
{ "name": "LeaderEpoch", "type": "int32", "versions": "0+",
"about": "The latest known leader epoch"},
{ "name": "HighWatermark", "type": "int64", "versions": "0+"},
{ "name": "CurrentVoters", "type": "[]ReplicaState", "versions": "0+" },
{ "name": "Observers", "type": "[]ReplicaState", "versions": "0+" }
]}
]}],
"commonStructs": [
{ "name": "ReplicaState", "versions": "0+", "fields": [
{ "name": "ReplicaId", "type": "int32", "versions": "0+"},
{ "name": "LogEndOffset", "type": "int64", "versions": "0+",
"about": "The last known log end offset of the follower or -1 if it is unknown"}
]}
]
} |
DescribeQuorum Request Handling
This request is always sent to the leader node. We expect AdminClient to use the Metadata API in order to discover the current leader. Upon receiving the request, a node will do the following:
- First check whether the node is the leader. If not, then return an error to let the client retry with Metadata. If the current leader is known to the receiving node, then include the LeaderId and LeaderEpoch in the response.
- Build the response using current assignment information and cached state about replication progress. (A sketch of this handling is shown below.)
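As a rough illustration of the two steps above, here is a hypothetical leader-side handler; the record types and method shape are assumptions for the sketch, not the actual broker code.
Code Block |
---|
import java.util.List;
import java.util.Map;

// Illustrative only: builds a DescribeQuorum-style result from cached replication state.
class DescribeQuorumHandler {
    record ReplicaState(int replicaId, long logEndOffset) { }
    record Result(boolean notLeader, int leaderId, int leaderEpoch, long highWatermark,
                  List<ReplicaState> currentVoters, List<ReplicaState> observers) { }

    static Result handle(boolean isLeader, int localId, int leaderEpoch, long highWatermark,
                         int knownLeaderId,
                         Map<Integer, Long> voterEndOffsets,
                         Map<Integer, Long> observerEndOffsets) {
        if (!isLeader) {
            // Not the leader: signal an error, but include the known leader (or -1) so the
            // client can retry against the right node.
            return new Result(true, knownLeaderId, leaderEpoch, -1L, List.of(), List.of());
        }
        List<ReplicaState> voters = voterEndOffsets.entrySet().stream()
                .map(e -> new ReplicaState(e.getKey(), e.getValue())).toList();
        List<ReplicaState> observers = observerEndOffsets.entrySet().stream()
                .map(e -> new ReplicaState(e.getKey(), e.getValue())).toList();
        return new Result(false, localId, leaderEpoch, highWatermark, voters, observers);
    }
} |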
DescribeQuorum Response Handling
On handling the response, the admin client would do the following:
- If the response indicates that the intended node is not the current leader, then check the response to see if the LeaderId has been set. If so, then attempt to retry the request with the new leader.
- If the current leader is not defined in the response (which could be the case if there is an election in progress), then back off and retry with Metadata.
- Otherwise the response can be returned to the application, or the request eventually times out. (A sketch of this retry loop follows the list.)
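The client-side retry behavior described above might look roughly like the following; the QuorumClient interface and exception type are hypothetical stand-ins, not the real AdminClient API.
Code Block |
---|
// Illustrative only: retry DescribeQuorum until a leader answers or retries run out.
class DescribeQuorumRetry {
    record QuorumStatus(int leaderId, int leaderEpoch, long highWatermark) { }

    interface QuorumClient {
        int findLeaderViaMetadata() throws Exception;      // discover the current leader
        QuorumStatus describeQuorum(int nodeId) throws Exception;
    }

    static class NotLeaderException extends Exception {
        final int newLeaderId; // -1 when the leader is unknown, e.g. an election is in progress
        NotLeaderException(int newLeaderId) { this.newLeaderId = newLeaderId; }
    }

    static QuorumStatus describeWithRetry(QuorumClient client, int maxAttempts) throws Exception {
        int target = client.findLeaderViaMetadata();
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return client.describeQuorum(target);
            } catch (NotLeaderException e) {
                if (e.newLeaderId >= 0) {
                    target = e.newLeaderId;       // retry directly against the new leader
                } else {
                    Thread.sleep(100L * attempt); // back off, then rediscover via Metadata
                    target = client.findLeaderViaMetadata();
                }
            }
        }
        throw new Exception("DescribeQuorum did not succeed within " + maxAttempts + " attempts");
    }
} |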
Cluster Bootstrapping
When the cluster is initialized for the first time, the voters will find each other through the static quorum.voters configuration. It is the job of the first elected leader (i.e. the first controller) to generate a UUID that will serve as a unique clusterId. We expect this to happen within the controller state machine defined by KIP-631. This ID will be stored in the metadata log as a message and will be propagated to all brokers in the cluster through the replication protocol defined by this proposal. (From an implementation perspective, the Raft library will provide a hook for the initialization of the clusterId.)
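A minimal sketch of what such an initialization hook might look like, assuming a hypothetical appender interface (the real hook shape and record type are left to the implementation):
Code Block |
---|
import java.util.UUID;

// Illustrative only: the first elected controller generates the clusterId once and
// appends it to the metadata log, from which it replicates to every broker.
class ClusterIdBootstrap {
    interface MetadataLogAppender {
        void append(String recordType, String value); // hypothetical hook into the metadata log
    }

    static String initializeClusterId(MetadataLogAppender log) {
        String clusterId = UUID.randomUUID().toString();
        log.append("cluster_id", clusterId);
        return clusterId;
    }
} |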
...
There will be two options available with --describe:
- --describe status: a short summary of the quorum status.
- --describe replication: detailed information about the status of replication.
...
Code Block |
---|
> bin/kafka-metadata-quorum.sh --describe status
ClusterId:              SomeClusterId
LeaderId:               0
LeaderEpoch:            15
HighWatermark:          234130
MaxFollowerLag:         34
MaxFollowerLagTimeMs:   15
CurrentVoters:          [0, 1, 2]

> bin/kafka-metadata-quorum.sh --describe replication
ReplicaId   LogEndOffset   Lag   LagTimeMs   Status
0           234134         0     0           Leader
1           234130         4     10          Follower
2           234100         34    15          Follower
3           234124         10    12          Observer
4           234130         4     15          Observer |
...
NAME | TAGS | TYPE | NOTE
---|---|---|---
current-leader | type=raft-manager | dynamic gauge | -1 means UNKNOWN
current-epoch | type=raft-manager | dynamic gauge | 0 means UNKNOWN
current-vote | type=raft-manager | dynamic gauge | -1 means not voted for anyone
log-end-offset | type=raft-manager | dynamic gauge |
log-end-epoch | type=raft-manager | dynamic gauge |
high-watermark | type=raft-manager | dynamic gauge |
boot-timestamp | type=raft-manager | static gauge |
current-state | type=raft-manager | dynamic enum | possible values: "leader", "follower", "candidate", "observer"
number-unknown-voter-connections | type=raft-manager | dynamic gauge | number of unknown voters whose connection information is not cached; would never be larger than quorum-size
election-latency-max/avg | type=raft-manager | dynamic gauge | measured on each voter as a windowed sum / avg, starting when it becomes a candidate and ending when it learns of or becomes the new leader
commit-latency-max/avg | type=raft-manager | dynamic gauge | measured on the leader as a windowed sum / avg, starting when the record is appended and ending when the high watermark advances beyond it
fetch-records-rate | type=raft-manager | windowed rate | applies to followers and observers only
append-records-rate | type=raft-manager | windowed rate | applies to the leader only
poll-idle-ratio-avg | type=raft-manager | windowed average |
...