...

KIP-500: Replace ZooKeeper with a Self-Managed Metadata Quorum (Accepted)

Status

Current state: Accepted

Discussion thread: here

JIRA: KAFKA-9876

...

Leader Progress Timeout

In the traditional push-based model, when a leader is disconnected from the quorum due to a network partition, it will start a new election to learn the active quorum or form a new one immediately. In the pull-based model, however, suppose a new leader has been elected with a new epoch and everyone has learned about it except the old leader (e.g. that leader was no longer among the voters and hence did not receive the BeginQuorumEpoch request either). That old leader would not be notified by anyone about the new leader / epoch and would become a pure "zombie leader", as there are no regular heartbeats being pushed from the leader to the followers. This could lead to stale information being served to the observers and clients inside the cluster.

To resolve this issue, we will piggy-back on the "quorum.fetch.timeout.ms" config, such that if the leader does not receive Fetch requests from a majority of the quorum for that amount of time, it will begin a new election and start sending VoteRequest to the voter nodes in the cluster to learn the latest quorum. If it cannot connect to any known voter, the old leader will keep starting new elections and bumping the epoch. If a returned response includes a leader with a newer epoch, this zombie leader will step down and become a follower. Note that the node will remain a candidate until it finds that it has been supplanted by another voter, or until it eventually wins the election.

As we know from the Raft literature, this approach could generate disruptive voters when network partitions happen on the leader. The partitioned leader will keep increasing its epoch, and when it eventually reconnects to the quorum, it could win the election with a very large epoch number, thus reducing the quorum availability due to extra restoration time. Considering this scenario is rare, we would like to address it in a follow-up KIP.
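The timeout check described above can be sketched as follows. This is only an illustration, not the actual implementation: the function name, its parameters, and the choice to count the leader itself towards the majority are assumptions made for this example.

```python
from typing import Dict, Set


def leader_has_quorum_progress(
    now_ms: int,
    last_fetch_ms: Dict[int, int],
    voters: Set[int],
    leader_id: int,
    fetch_timeout_ms: int,
) -> bool:
    """Return True if a majority of the voters has fetched from the leader
    within the last fetch_timeout_ms milliseconds.

    Assumption of this sketch: the leader counts itself towards the
    majority when it is one of the voters.
    """
    majority = len(voters) // 2 + 1
    active = 1 if leader_id in voters else 0
    for voter in voters:
        if voter == leader_id:
            continue
        last = last_fetch_ms.get(voter)  # None if the voter never fetched
        if last is not None and now_ms - last < fetch_timeout_ms:
            active += 1
    return active >= majority


# Three voters, leader 0, timeout of 2000 ms (hypothetical values).
voters = {0, 1, 2}
fetch_times = {1: 9000, 2: 5000}
still_leader = leader_has_quorum_progress(10000, fetch_times, voters, 0, 2000)
must_start_election = not leader_has_quorum_progress(12000, fetch_times, voters, 0, 2000)
```

When the function returns False, the leader in this sketch would become a candidate and start sending VoteRequest to the voters.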

...

Code Block
{
  "apiKey": 1,
  "type": "response",
  "name": "FetchResponse",
  "validVersions": "0-12",
  "flexibleVersions": "12+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "7+", "ignorable": false,
      "about": "The top level response error code." },
    { "name": "SessionId", "type": "int32", "versions": "7+", "default": "0", "ignorable": false,
      "about": "The fetch session ID, or 0 if this is not part of a fetch session." },
    { "name": "Topics", "type": "[]FetchableTopicResponse", "versions": "0+",
      "about": "The response topics.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]FetchablePartitionResponse", "versions": "0+",
        "about": "The topic partitions.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no fetch error." },
        { "name": "HighWatermark", "type": "int64", "versions": "0+",
          "about": "The current high water mark." },
        { "name": "LastStableOffset", "type": "int64", "versions": "4+", "default": "-1", "ignorable": true,
          "about": "The last stable offset (or LSO) of the partition. This is the last offset such that the state of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)" },
        { "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true,
          "about": "The current log start offset." },
        // ---------- Start new field ----------
        { "name": "DivergingEpoch", "type": "EpochEndOffset",
          "versions": "12+", "taggedVersions": "12+", "tag": 0,
          "about": "In case divergence is detected based on the `LastFetchedEpoch` and `FetchOffset` in the request, this field indicates the largest epoch and its end offset such that subsequent records are known to diverge",
          "fields": [
            { "name": "Epoch", "type": "int32", "versions": "12+", "default": "-1" },
            { "name": "EndOffset", "type": "int64", "versions": "12+", "default": "-1" }
        ]},
        { "name": "CurrentLeader", "type": "LeaderIdAndEpoch",
          "versions": "12+", "taggedVersions": "12+", "tag": 1, "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "0+",
            "about": "The ID of the current leader or -1 if the leader is unknown."},
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The latest known leader epoch"}
        ]},
        // ---------- End new field ----------
        { "name": "Aborted", "type": "[]AbortedTransaction", "versions": "4+", "nullableVersions": "4+", "ignorable": false,
          "about": "The aborted transactions.",  "fields": [
          { "name": "ProducerId", "type": "int64", "versions": "4+", "entityType": "producerId",
            "about": "The producer id associated with the aborted transaction." },
          { "name": "FirstOffset", "type": "int64", "versions": "4+",
            "about": "The first offset in the aborted transaction." }
        ]},
        { "name": "PreferredReadReplica", "type": "int32", "versions": "11+", "ignorable": true,
          "about": "The preferred read replica for the consumer to use on its next fetch request"},
        { "name": "Records", "type": "bytes", "versions": "0+", "nullableVersions": "0+",
          "about": "The record data." }
      ]}
    ]}
  ]
}

...

  1. Check that the clusterId, if not null, matches the cached value in meta.properties.
  2. First ensure that the leader epoch from the request is the same as the locally cached value. If not, reject this request with either the FENCED_LEADER_EPOCH or UNKNOWN_LEADER_EPOCH error.
    1. If the leader epoch is smaller, then eventually this leader's BeginQuorumEpoch would reach the voter and that voter would update the epoch.
    2. If the leader epoch is larger, then eventually the receiver would learn about the new epoch anyways. Actually this case should not happen since, unlike the normal partition replication protocol, leaders are always the first to discover that they have been elected.
  3. Check that LastFetchedEpoch is consistent with the leader's log. The leader assumes that the LastFetchedEpoch is the epoch of the offset prior to FetchOffset. FetchOffset is expected to be in the range of offsets with an epoch of LastFetchedEpoch, or to be the start offset of the next epoch in the log. If not, return an empty response (no records included in the response) and set the DivergingEpoch to the largest epoch which is less than the fetcher's LastFetchedEpoch. This is an optimization that lets the follower truncate as much as possible, so that it reaches the divergence point with fewer Fetch round-trips. For example, if the fetcher's last epoch is X, which does not match the epoch of the offset prior to the fetching offset, then all the records with epoch X on the follower may have diverged and hence could be truncated, so returning the next epoch Y (where Y < X) is reasonable.
  4. If the request is from a voter rather than an observer, the leader can possibly advance the high-watermark. As stated above, we only advance the high-watermark if the current leader has replicated at least one entry from its current epoch to a majority of the quorum. Otherwise, the high watermark is set to the maximum offset which has been replicated to a majority of the voters.
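The consistency check in step 3 can be illustrated with a small sketch. It follows the wording of that step literally (the diverging epoch is the largest epoch strictly less than the fetcher's LastFetchedEpoch). The in-memory representation of the leader's epoch boundaries, the helper names, and the behaviour when no earlier epoch exists are assumptions made for this example, not part of the proposal.

```python
from typing import List, NamedTuple, Optional


class EpochStart(NamedTuple):
    epoch: int
    start_offset: int


class DivergingEpoch(NamedTuple):
    epoch: int
    end_offset: int


def find_divergence(
    epoch_starts: List[EpochStart],  # leader's epochs, sorted ascending
    log_end_offset: int,
    last_fetched_epoch: int,
    fetch_offset: int,
) -> Optional[DivergingEpoch]:
    """Return None if the fetch position is consistent with the leader's log,
    otherwise the DivergingEpoch to place in the (empty) Fetch response."""

    def end_offset_of(i: int) -> int:
        # An epoch ends where the next one starts, or at the log end.
        if i + 1 < len(epoch_starts):
            return epoch_starts[i + 1].start_offset
        return log_end_offset

    for i, entry in enumerate(epoch_starts):
        if entry.epoch == last_fetched_epoch:
            if entry.start_offset <= fetch_offset <= end_offset_of(i):
                return None  # consistent, records can be returned
            break

    diverging = None
    for i, entry in enumerate(epoch_starts):
        if entry.epoch < last_fetched_epoch:
            diverging = DivergingEpoch(entry.epoch, end_offset_of(i))
    if diverging is None:
        # Not covered by this sketch: no earlier epoch in the leader's log.
        raise ValueError("no epoch smaller than the fetcher's last fetched epoch")
    return diverging


# Leader log: epoch 1 starts at 0, epoch 2 at 10, epoch 4 at 20, log end 30.
leader_epochs = [EpochStart(1, 0), EpochStart(2, 10), EpochStart(4, 20)]
consistent = find_divergence(leader_epochs, 30, last_fetched_epoch=2, fetch_offset=15)
diverged = find_divergence(leader_epochs, 30, last_fetched_epoch=3, fetch_offset=25)
```

In the second call the follower claims epoch 3, which the leader never saw, so the sketch points it back to epoch 2 and that epoch's end offset.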

...

  1. If the response contains the FENCED_LEADER_EPOCH error code, check the leaderId from the response. If it is defined, then update the quorum-state file and become a "follower" (which may or may not have voting power) of that leader. Otherwise, retry the Fetch request against one of the remaining voters at random; in the meantime it may receive a BeginQuorumEpoch request, which would also update the epoch / leader.
  2. If the response has the field DivergingEpoch set, then truncate from the log all of the offsets greater than or equal to DivergingEpoch.EndOffset and truncate any offset with an epoch greater than DivergingEpoch.Epoch.

Note that followers will have to check any records received for the presence of control records. Specifically a follower/observer must check for voter assignment messages which could change its role.
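The truncation performed by the follower when DivergingEpoch is set can be sketched as below. The log is modelled as a plain list of (offset, epoch) pairs, which is an assumption of this example, as is reading the second condition as a comparison of record epochs against the epoch carried in DivergingEpoch.

```python
from typing import List, Tuple

Record = Tuple[int, int]  # (offset, epoch)


def truncate_to_divergence(
    log: List[Record], diverging_epoch: int, diverging_end_offset: int
) -> List[Record]:
    """Keep only records below the diverging end offset whose epoch does
    not exceed the diverging epoch; everything else is truncated."""
    return [
        (offset, epoch)
        for offset, epoch in log
        if offset < diverging_end_offset and epoch <= diverging_epoch
    ]


follower_log = [(0, 1), (1, 1), (2, 2), (3, 3), (4, 3)]
kept = truncate_to_divergence(follower_log, diverging_epoch=2, diverging_end_offset=3)
```

Because offsets and epochs both increase monotonically along the log, the kept records always form a prefix of the original log. After truncating, the follower would resend its Fetch request from the new log end.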

...

The DescribeQuorum API is used by the admin client to show the status of the quorum. This API must be sent to the leader, which is the only node that would have lag information for all of the voters.

Request Schema

Code Block
{
  "apiKey": N,
  "type": "request",
  "name": "DescribeQuorumRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Topics", "type": "[]TopicData",
      "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]PartitionData",
        "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." }
      ]
      }]
    }
  ]
}

Response Schema

Code Block
{
  "apiKey": N,
  "type": "response",
  "name": "DescribeQuorumResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top level error code."},
    { "name": "Topics", "type": "[]TopicData",
      "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]PartitionData",
        "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+"},
        { "name": "LeaderId", "type": "int32", "versions": "0+",
          "about": "The ID of the current leader or -1 if the leader is unknown."},
        { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The latest known leader epoch"},
        { "name": "HighWatermark", "type": "int64", "versions": "0+"},
        { "name": "CurrentVoters", "type": "[]ReplicaState", "versions": "0+" },
        { "name": "Observers", "type": "[]ReplicaState", "versions": "0+" }
      ]}
    ]}],
  "commonStructs": [
    { "name": "ReplicaState", "versions": "0+", "fields": [
      { "name": "ReplicaId", "type": "int32", "versions": "0+"},
      { "name": "LogEndOffset", "type": "int64", "versions": "0+",
        "about": "The last known log end offset of the follower or -1 if it is unknown"}
    ]}
  ]
}


DescribeQuorum Request Handling

This request is always sent to the leader node. We expect AdminClient to use the Metadata API in order to discover the current leader. Upon receiving the request, a node will do the following:

  1. First check whether the node is the leader. If not, then return an error to let the client retry with Metadata. If the current leader is known to the receiving node, then include the LeaderId and LeaderEpoch in the response.
  2. Build the response using current assignment information and cached state about replication progress.

DescribeQuorum Response Handling

On handling the response, the admin client would do the following:

  1. If the response indicates that the intended node is not the current leader, then check the response to see if the LeaderId has been set. If so, then attempt to retry the request with the new leader.
  2. If the current leader is not defined in the response (which could be the case if there is an election in progress), then back off and retry with Metadata.
  3. Otherwise the response can be returned to the application, or the request eventually times out.
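The three cases above can be summarised in a small decision function. This is a hedged sketch only: the numeric value of NOT_LEADER is a hypothetical placeholder (the text does not name the error code), and the returned strings are labels for illustration rather than real admin client behaviour.

```python
from typing import NamedTuple

NONE = 0
NOT_LEADER = 1  # hypothetical placeholder, not a real Kafka error code value


class PartitionResult(NamedTuple):
    error_code: int
    leader_id: int  # -1 if the leader is unknown


def next_step(result: PartitionResult) -> str:
    """Decide what the admin client does with a DescribeQuorum response."""
    if result.error_code == NOT_LEADER:
        if result.leader_id >= 0:
            # Case 1: the response names the new leader, retry there.
            return f"retry-with-leader:{result.leader_id}"
        # Case 2: no leader known (e.g. election in progress).
        return "backoff-and-retry-with-metadata"
    # Case 3: hand the response to the application.
    return "return-to-application"


step_known_leader = next_step(PartitionResult(NOT_LEADER, 2))
step_unknown_leader = next_step(PartitionResult(NOT_LEADER, -1))
step_ok = next_step(PartitionResult(NONE, 0))
```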

Cluster Bootstrapping

When the cluster is initialized for the first time, the voters will find each other through the static quorum.voters configuration. It is the job of the first elected leader (i.e. the first controller) to generate a UUID that will serve as a unique clusterId. We expect this to happen within the controller state machine that is defined by KIP-631. This ID will be stored in the metadata log as a message and will be propagated to all brokers in the cluster through the replication protocol defined by this proposal. (From an implementation perspective, the Raft library will provide a hook for the initialization of the clusterId.)

...

There will be two options available with --describe:

  • --describe status: a short summary of the quorum status.
  • --describe replication: provides detailed information about the status of replication.

...

Code Block
> bin/kafka-metadata-quorum.sh --describe status
ClusterId:              SomeClusterId
LeaderId:				0
LeaderEpoch: 			15
HighWatermark:			234130
MaxFollowerLag: 		34
MaxFollowerLagTimeMs:	15
CurrentVoters:			[0, 1, 2]

> bin/kafka-metadata-quorum.sh --describe replication
ReplicaId	LogEndOffset	Lag		LagTimeMs	Status
0			234134			0		0			Leader
1			234130			4		10			Follower
2			234100			34		15			Follower
3			234124			10		12			Observer
4			234130			4		15			Observer
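
The numbers in the sample output above are mutually consistent, which the following sketch makes explicit. It assumes that Lag is the leader's log end offset minus the replica's log end offset (inferred from the sample, not stated in the text) and that the high watermark is the largest offset replicated to a majority of the voters. It ignores the additional rule that the leader must first replicate an entry from its current epoch. Function names are hypothetical.

```python
from typing import Dict


def high_watermark(voter_end_offsets: Dict[int, int]) -> int:
    """Largest offset that a majority of the voters has reached."""
    offsets = sorted(voter_end_offsets.values(), reverse=True)
    majority = len(offsets) // 2 + 1
    return offsets[majority - 1]


def replica_lags(leader_id: int, end_offsets: Dict[int, int]) -> Dict[int, int]:
    """Lag of every replica relative to the leader's log end offset."""
    leader_end = end_offsets[leader_id]
    return {rid: leader_end - leo for rid, leo in end_offsets.items()}


# LogEndOffset values taken from the sample output.
voters = {0: 234134, 1: 234130, 2: 234100}
observers = {3: 234124, 4: 234130}

hwm = high_watermark(voters)
lags = replica_lags(0, {**voters, **observers})
max_follower_lag = max(lag for rid, lag in lags.items() if rid in voters and rid != 0)
```

With these inputs the sketch yields a high watermark of 234130 and a maximum follower lag of 34, matching the status output.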

...

| NAME | TAGS | TYPE | NOTE |
|---|---|---|---|
| current-leader | type=raft-manager | dynamic gauge | -1 means UNKNOWN |
| current-epoch | type=raft-manager | dynamic gauge | 0 means UNKNOWN |
| current-vote | type=raft-manager | dynamic gauge | -1 means not voted for anyone |
| log-end-offset | type=raft-manager | dynamic gauge | |
| log-end-epoch | type=raft-manager | dynamic gauge | |
| high-watermark | type=raft-manager | dynamic gauge | |
| boot-timestamp | type=raft-manager | static gauge | |
| current-state | type=raft-manager | dynamic enum | possible values: "leader", "follower", "candidate", "observer" |
| number-unknown-voter-connections | type=raft-manager | dynamic gauge | number of unknown voters whose connection information is not cached; would never be larger than quorum-size |
| election-latency-max/avg | type=raft-manager | dynamic gauge | measured on each voter as windowed sum / avg, start when becoming a candidate and end on learned or become the new leader |
| commit-latency-max/avg | type=raft-manager | dynamic gauge | measured on leader as windowed sum / avg, start when appending the record and end on hwm advanced beyond |
| fetch-records-rate | type=raft-manager | windowed rate | apply to follower and observer only |
| append-records-rate | type=raft-manager | windowed rate | apply to leader only |
| poll-idle-ratio-avg | type=raft-manager | windowed average | |

...