Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Leader Progress Timeout

In the traditional push-based model, when a leader is disconnected from the quorum due to network partition, it will start a new election to learn the active quorum or form a new one immediately. In the pull-based model, however, say a new leader has been elected with a new epoch and everyone has learned about it except the old leader (e.g. that leader was not in the voters anymore and hence not receiving the BeginQuorumEpoch as well), then that old leader would not be notified by anyone about the new leader / epoch and become a pure "zombie leader", as there is no regular heartbeats being pushed from leader to the follower. This could lead to stale information being served to the observers and clients inside the cluster.

To resolve this issue, we will piggy-back on the "quorum.fetch.timeout.ms" config, such that if the leader did not receive Fetch requests from a majority of the quorum for that amount of time, it would begin a new election and start sending VoteRequest to voter nodes in the cluster to understand the latest quorum. If it couldn't connect to any known voter, the old leader shall keep starting new elections and bump the epoch. And if the returned response includes a newer epoch leader, this zombie leader would step down and becomes a follower. Note that the node will remain a candidate until it finds that it has been supplanted by another voter, or win the election eventually.

As we know from the Raft literature, this approach could generate disruptive voters when network partitions happen on the leader. The partitioned leader will keep increasing its epoch, and when it eventually reconnects to the quorum, it could win the election with a very large epoch number, thus reducing the quorum availability due to extra restoration time. Considering this scenario is rare, we would like to address it in a follow-up KIP.

...

Note that followers will have to check any records received for the presence of control records. Specifically a follower/observer must check for voter assignment messages which could change its role.

...

The DescribeQuorum API is used by the admin client to show the status of the quorum. This API must be sent to the leader, which is the only node that would have lag information for all of the voters.

Request Schema

Code Block
{
  "apiKey": N,
  "type": "request",
  "name": "DescribeQuorumRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Topics", "type": "[]TopicData",
      "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]PartitionData",
        "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." }
      ]
      }]
    }
  ]
}

Response Schema

Code Block
{
  "apiKey": N,
  "type": "response",
  "name": "DescribeQuorumResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top level error code."},
    { "name": "ClusterId", "type": "string", "versions" "0+", "nullableVersions": "12+" },
    { "name": "Topics", "type": "[]TopicData",
      "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]PartitionData",
        "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+"},
        { "name": "LeaderId", "type": "int32", "versions": "0+",
          "about": "The ID of the current leader or -1 if the leader is unknown."},
        { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The latest known leader epoch"},
        { "name": "HighWatermark", "type": "int64", "versions": "0+"},
        { "name": "CurrentVoters", "type": "[]ReplicaState", "versions": "0+" },
        { "name": "TargetVoters", "type": "[]ReplicaState", "versions": "0+" },
        { "name": "Observers", "type": "[]ReplicaState", "versions": "0+" }
      ]}
    ]}],
  "commonStructs": [
    { "name": "ReplicaState", "versions": "0+", "fields": [
      { "name": "ReplicaId", "type": "int32", "versions": "0+"},
      { "name": "LogEndOffset", "type": "int64", "versions": "0+",
        "about": "The last known log end offset of the follower or -1 if it is unknown"}
    ]}
  ]
}


DescribeQuorum Request Handling

This request is always sent to the leader node. We expect AdminClient to use the Metadata API in order to discover the current leader. Upon receiving the request, a node will do the following:

  1. First check whether the node is the leader. If not, then return an error to let the client retry with Metadata. If the current leader is known to the receiving node, then include the LeaderId and LeaderEpoch in the response.
  2. Build the response using current assignment information and cached state about replication progress.

DescribeQuorum Response Handling

On handling the response, the admin client would do the following:

  1. If the response indicates that the intended node is not the current leader, then check the response to see if the LeaderId has been set. If so, then attempt to retry the request with the new leader.
  2. If the current leader is not defined in the response (which could be the case if there is an election in progress), then backoff and retry with Metadata.
  3. Otherwise the response can be returned to the application, or the request eventually times out.

Cluster Bootstrapping

When the cluster is initialized for the first time, the voters will find each other through the static quorum.voters configuration. It is the job of the first elected leader (i.e. the first controller) to generate a UUID that will serve as a unique clusterId. We expect this to happen within the controller state machine that defined by KIP-631. This ID will be stored in the metadata log as a message and will be propagated to all brokers in the cluster through the replication protocol defined by this proposal. (From an implementation perspective, the Raft library will provide a hook for the initialization of the clusterId.)

...

There will be two options available with -- describe 

  • describe --describe status: a short summary of the quorum status and the other provides detailed information about the status of replication.
  • describe --describe replication: provides detailed information about the status of replication

...

Code Block
> bin/kafka-metadata-quorum.sh describe --describestatus
ClusterId:              SomeClusterId
LeaderId:				0
LeaderEpoch: 			15
HighWatermark:			234130
MaxFollowerLag: 		34
MaxFollowerLagTimeMs:	15
CurrentVoters:			[0, 1, 2]

> bin/kafka-metadata-quorum.sh describe --describe replication
ReplicaId	LogEndOffset	Lag		LagTimeMs	Status
0			234134			0		0			Leader
1			234130			4		10			Follower
2			234100			34		15			Follower
3			234124			10		12			Observer
4			234130			4		15			Observer

...