...
Leader Progress Timeout
In the traditional push-based model, when a leader is disconnected from the quorum by a network partition, it will immediately start a new election to learn about the active quorum or to form a new one. In the pull-based model, however, suppose a new leader has been elected with a new epoch and every node except the old leader has learned about it (e.g. because the old leader is no longer a voter and therefore did not receive the BeginQuorumEpoch request). The old leader would then not be notified by anyone about the new leader or epoch and would become a pure "zombie leader", since no regular heartbeats are pushed from the leader to the followers. This could lead to stale information being served to the observers and clients inside the cluster.
To resolve this issue, we will piggy-back on the "quorum.fetch.timeout.ms" config: if the leader does not receive Fetch requests from a majority of the quorum within that amount of time, it will begin a new election and start sending VoteRequest to the voter nodes in the cluster to learn the latest quorum. If it cannot connect to any known voter, the old leader will keep starting new elections and bumping the epoch. If a returned response includes a leader with a newer epoch, this zombie leader will step down and become a follower. Note that the node will remain a candidate until it either finds that it has been supplanted by another voter or eventually wins the election.
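The leader-side timeout check described above can be sketched as follows. This is a minimal illustration, not the proposal's implementation: the class `LeaderFetchTracker` and its methods are hypothetical, and the sketch assumes the leader counts itself toward the majority, so in a three-voter quorum a single follower fetching within the timeout keeps the leader in place. Starting the election and sending VoteRequest are left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper (not part of the proposal): tracks the last Fetch time of
// each voter so the leader can tell when quorum.fetch.timeout.ms has expired
// for a majority of the quorum.
final class LeaderFetchTracker {
    private final int localId;
    private final Set<Integer> voters;         // all voter ids, including the leader
    private final long fetchTimeoutMs;         // value of quorum.fetch.timeout.ms
    private final Map<Integer, Long> lastFetchMs = new HashMap<>();

    LeaderFetchTracker(int localId, Set<Integer> voters, long fetchTimeoutMs, long nowMs) {
        if (!voters.contains(localId)) {
            throw new IllegalArgumentException("leader must be a voter");
        }
        this.localId = localId;
        this.voters = Set.copyOf(voters);
        this.fetchTimeoutMs = fetchTimeoutMs;
        // Every voter gets a full timeout window starting when leadership begins.
        for (int id : this.voters) {
            lastFetchMs.put(id, nowMs);
        }
    }

    // Record a Fetch request from a replica; fetches from non-voters are ignored.
    void onFetch(int replicaId, long nowMs) {
        if (voters.contains(replicaId)) {
            lastFetchMs.put(replicaId, nowMs);
        }
    }

    // True when fewer than a majority of voters (assumption: the leader counts
    // itself) have fetched within the timeout, i.e. the leader should stop
    // acting as leader and start a new election.
    boolean shouldStartElection(long nowMs) {
        int fetched = 1; // the leader itself
        for (int id : voters) {
            if (id != localId && nowMs - lastFetchMs.get(id) < fetchTimeoutMs) {
                fetched++;
            }
        }
        int majority = voters.size() / 2 + 1;
        return fetched < majority;
    }
}
```

A leader would poll `shouldStartElection` periodically and call `onFetch` from its Fetch handler; once it returns true, the node would transition to candidate as described above.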
As is known from the Raft literature, this approach can produce disruptive voters when the leader is network-partitioned. The partitioned leader will keep increasing its epoch, and when it eventually reconnects to the quorum, it could win the election with a very large epoch number, reducing quorum availability due to the extra restoration time. Since this scenario is rare, we would like to address it in a follow-up KIP.
...
Note that followers will have to check any records they receive for the presence of control records. Specifically, a follower or observer must check for voter assignment messages, which could change its role.
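To illustrate this check, here is a minimal sketch in which a follower or observer scans each batch of replicated records and updates its role whenever it sees a voter assignment control record. `LogRecord`, the `"VoterAssignment"` type string, `Role`, and `RoleTracker` are hypothetical stand-ins; the actual control record format is not specified in this section.

```java
import java.util.List;
import java.util.Set;

// Hypothetical role of the local replica.
enum Role { VOTER, OBSERVER }

// Hypothetical, simplified record: real batches carry control records in a
// binary format; here a control record may carry a voter set.
record LogRecord(boolean isControl, String controlType, Set<Integer> voters) {}

final class RoleTracker {
    private final int localId;
    private Role role;

    RoleTracker(int localId, Role initialRole) {
        this.localId = localId;
        this.role = initialRole;
    }

    Role role() {
        return role;
    }

    // Scan every record received from the leader. Data records are skipped, and
    // each voter assignment control record re-derives the local role, so the
    // last assignment in the batch wins.
    void onRecords(List<LogRecord> records) {
        for (LogRecord r : records) {
            if (r.isControl() && "VoterAssignment".equals(r.controlType())) {
                role = r.voters().contains(localId) ? Role.VOTER : Role.OBSERVER;
            }
        }
    }
}
```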
...
The DescribeQuorum API is used by the admin client to show the status of the quorum. This API must be sent to the leader, which is the only node that would have lag information for all of the voters.
Request Schema
```json
{
  "apiKey": N,
  "type": "request",
  "name": "DescribeQuorumRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." }
      ]}
    ]}
  ]
}
```
Response Schema
```json
{
  "apiKey": N,
  "type": "response",
  "name": "DescribeQuorumResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top level error code." },
    { "name": "ClusterId", "type": "string", "versions": "0+", "nullableVersions": "0+" },
    { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+" },
        { "name": "LeaderId", "type": "int32", "versions": "0+",
          "about": "The ID of the current leader or -1 if the leader is unknown." },
        { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The latest known leader epoch" },
        { "name": "HighWatermark", "type": "int64", "versions": "0+" },
        { "name": "CurrentVoters", "type": "[]ReplicaState", "versions": "0+" },
        { "name": "TargetVoters", "type": "[]ReplicaState", "versions": "0+" },
        { "name": "Observers", "type": "[]ReplicaState", "versions": "0+" }
      ]}
    ]}
  ],
  "commonStructs": [
    { "name": "ReplicaState", "versions": "0+", "fields": [
      { "name": "ReplicaId", "type": "int32", "versions": "0+" },
      { "name": "LogEndOffset", "type": "int64", "versions": "0+",
        "about": "The last known log end offset of the follower or -1 if it is unknown" }
    ]}
  ]
}
```
DescribeQuorum Request Handling
This request is always sent to the leader node. We expect AdminClient to use the Metadata API in order to discover the current leader. Upon receiving the request, a node will do the following:

- First check whether the node is the leader. If not, return an error to let the client retry with Metadata. If the current leader is known to the receiving node, include the LeaderId and LeaderEpoch in the response.
- Build the response using current assignment information and cached state about replication progress.
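The two steps above can be sketched as a handler on the receiving node. This is a simplified illustration with hypothetical types that cover only part of the DescribeQuorumResponse schema (TargetVoters and the per-topic nesting are omitted). `"NOT_LEADER"` is a placeholder, since this section does not name the error code.

```java
import java.util.List;
import java.util.OptionalInt;

// Hypothetical, simplified mirror of ReplicaState from the response schema.
record ReplicaState(int replicaId, long logEndOffset) {}

// Hypothetical, flattened response; error == null means success.
record DescribeQuorumResult(String error, int leaderId, int leaderEpoch, long highWatermark,
                            List<ReplicaState> currentVoters, List<ReplicaState> observers) {}

// Hypothetical view of the node's cached quorum state.
record QuorumNode(int localId, OptionalInt knownLeaderId, int epoch, long highWatermark,
                  List<ReplicaState> voterStates, List<ReplicaState> observerStates) {

    DescribeQuorumResult handleDescribeQuorum() {
        boolean isLeader = knownLeaderId.isPresent() && knownLeaderId.getAsInt() == localId;
        if (!isLeader) {
            // Step 1: not the leader. Return an error, including the leader id and
            // epoch when known (-1 for an unknown leader, as in the schema).
            return new DescribeQuorumResult("NOT_LEADER", knownLeaderId.orElse(-1), epoch,
                    -1L, List.of(), List.of());
        }
        // Step 2: build the response from cached replication progress.
        return new DescribeQuorumResult(null, localId, epoch, highWatermark,
                voterStates, observerStates);
    }
}
```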
DescribeQuorum Response Handling
On handling the response, the admin client would do the following:

- If the response indicates that the intended node is not the current leader, check the response to see if the LeaderId has been set. If so, retry the request with the new leader.
- If the current leader is not defined in the response (which could be the case if there is an election in progress), back off and retry with Metadata.
- Otherwise, the response is returned to the application. If no successful response is received, the request eventually times out.
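The client-side steps can be sketched as a retry loop. This is a minimal illustration with hypothetical names: `sendTo` stands in for sending DescribeQuorum to a node, `metadataLeader` stands in for leader discovery via the Metadata API, and a fixed attempt budget stands in for the request timeout. Backoff is omitted.

```java
import java.util.OptionalInt;
import java.util.function.IntFunction;
import java.util.function.Supplier;

// Hypothetical response shape: error == null means success; leaderId is the
// leader reported by the node, if it knows one.
record Response(String error, OptionalInt leaderId) {}

record DescribeQuorumClient(IntFunction<Response> sendTo,
                            Supplier<OptionalInt> metadataLeader,
                            int maxAttempts) {

    Response describe() {
        OptionalInt target = metadataLeader.get();
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (target.isEmpty()) {
                // No leader known (e.g. election in progress): a real client would
                // back off here before rediscovering the leader via Metadata.
                target = metadataLeader.get();
                continue;
            }
            Response resp = sendTo.apply(target.getAsInt());
            if (resp.error() == null) {
                return resp; // success: hand the response to the application
            }
            // Not the leader: retry against the reported LeaderId if set; an empty
            // value makes the next iteration fall back to Metadata.
            target = resp.leaderId();
        }
        throw new IllegalStateException("DescribeQuorum request timed out");
    }
}
```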
Cluster Bootstrapping
When the cluster is initialized for the first time, the voters will find each other through the static quorum.voters configuration. It is the job of the first elected leader (i.e. the first controller) to generate a UUID that will serve as a unique clusterId. We expect this to happen within the controller state machine defined by KIP-631. This ID will be stored in the metadata log as a message and will be propagated to all brokers in the cluster through the replication protocol defined by this proposal. (From an implementation perspective, the Raft library will provide a hook for the initialization of the clusterId.)
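A minimal sketch of such a hook, assuming the leader generates a clusterId only when none has been written to the metadata log yet. `ClusterIdInitializer`, `Bootstrap.ensureClusterId`, and the URL-safe Base64 encoding of the UUID's 16 bytes are illustrative choices, not the proposal's API.

```java
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.Optional;
import java.util.UUID;

// Hypothetical hook the Raft library could expose to the first elected leader.
interface ClusterIdInitializer {
    Optional<String> existingClusterId();   // clusterId already in the metadata log, if any
    void appendClusterId(String clusterId); // write the clusterId message to the metadata log
}

final class Bootstrap {
    private Bootstrap() {}

    // Generate a clusterId only if none exists yet, so later leaders reuse it.
    static String ensureClusterId(ClusterIdInitializer log) {
        Optional<String> existing = log.existingClusterId();
        if (existing.isPresent()) {
            return existing.get();
        }
        UUID uuid = UUID.randomUUID();
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.putLong(uuid.getMostSignificantBits()).putLong(uuid.getLeastSignificantBits());
        // 16 bytes encode to 22 URL-safe Base64 characters without padding.
        String clusterId = Base64.getUrlEncoder().withoutPadding().encodeToString(buf.array());
        log.appendClusterId(clusterId);
        return clusterId;
    }
}
```

Checking for an existing clusterId first keeps the ID stable across leader changes, since only the very first leader finds the log without one.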
...
There will be two options available with --describe:

- describe --describe status: a short summary of the quorum status.
- describe --describe replication: detailed information about the status of replication.
...
```
> bin/kafka-metadata-quorum.sh describe --describe status
ClusterId:              SomeClusterId
LeaderId:               0
LeaderEpoch:            15
HighWatermark:          234130
MaxFollowerLag:         34
MaxFollowerLagTimeMs:   15
CurrentVoters:          [0, 1, 2]

> bin/kafka-metadata-quorum.sh describe --describe replication
ReplicaId   LogEndOffset   Lag   LagTimeMs   Status
0           234134         0     0           Leader
1           234130         4     10          Follower
2           234100         34    15          Follower
3           234124         10    12          Observer
4           234130         4     15          Observer
```
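The Lag and HighWatermark values in the sample output are consistent with the following arithmetic, sketched under assumptions this section does not state: Lag is taken as the leader's LogEndOffset minus the replica's, and HighWatermark as the largest offset replicated to a majority of CurrentVoters. With voter log end offsets 234134, 234130, and 234100, two of the three voters have reached 234130, matching the sample HighWatermark, and the largest follower lag is 234134 - 234100 = 34, matching MaxFollowerLag. `QuorumLag` is a hypothetical helper.

```java
import java.util.Arrays;

// Hypothetical helpers reproducing the numbers in the sample output.
final class QuorumLag {
    private QuorumLag() {}

    // Lag of a replica, in offsets, behind the leader's log end offset.
    static long lag(long leaderLeo, long replicaLeo) {
        return leaderLeo - replicaLeo;
    }

    // Largest offset reached by a majority of voters (the leader included).
    // Simplification: ignores Raft's rule that the leader must first commit an
    // entry from its own epoch before advancing the high watermark.
    static long majorityOffset(long[] voterLeos) {
        long[] sorted = voterLeos.clone();
        Arrays.sort(sorted); // ascending
        int majority = sorted.length / 2 + 1;
        // The majority-th largest value has been reached by at least `majority` voters.
        return sorted[sorted.length - majority];
    }

    // Maximum lag among the followers (voters other than the leader).
    static long maxFollowerLag(long leaderLeo, long[] followerLeos) {
        long max = 0;
        for (long leo : followerLeos) {
            max = Math.max(max, lag(leaderLeo, leo));
        }
        return max;
    }
}
```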
...