Comment: Add section on clusterId bootstrapping

...

Code Block
{
  "type": "data",
  "name": "QuorumStateMessage",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
      {"name": "ClusterId", "type": "string", "versions": "0+"},
      {"name": "LeaderId", "type": "int32", "versions": "0+"},
      {"name": "LeaderEpoch", "type": "int32", "versions": "0+"},
      {"name": "VotedId", "type": "int32", "versions": "0+"},
      {"name": "AppliedOffset", "type": "int64", "versions": "0+"},
      {"name": "CurrentVoters", "type": "[]Voter", "versions": "0+", "fields": [
        {"name": "VoterId", "type": "int32", "versions": "0+"}
      ]}
  ]
}

...

Below we define the purpose of these fields:

...

  • LeaderId: this is the last known leader of the quorum. A value of -1 indicates that there is no leader.
  • LeaderEpoch: this is the last known leader epoch. This is initialized to 0 when the quorum is bootstrapped and should never be negative.
  • VotedId: indicates the id of the broker that this replica voted for in the current epoch. A value of -1 indicates that the replica has not voted (or cannot vote).
  • AppliedOffset: Reflects the maximum offset that has been applied to this quorum state. This is used for log recovery. The broker must scan from this point on initialization to detect updates to this file.
  • CurrentVoters: the latest known voters for this quorum

...

Additionally, we make use of the current meta.properties file, which caches the value from broker.id in the configuration as well as the discovered clusterId. As is the case today, if the configured broker.id does not match the cached value, then the broker will refuse to start. Similarly, the cached clusterId is included when interacting with the quorum APIs. If we connect to a cluster with a different clusterId, then the broker will receive a fatal error and shut down.

...

Before getting into the details of these APIs, there are a few common attributes worth mentioning upfront:

...

  1. All requests save Vote have a field for the leader epoch. Voters and leaders are always expected to ensure that the request epoch is consistent with their own and return an error if it is not.
  2. We piggyback current leader and epoch information on all responses. This reduces latency to discover leader changes.
  3. Although this KIP only expects a single-raft quorum implementation initially, we see a benefit in keeping the door open for a multi-raft architecture in the long term, for use cases such as a sharded controller or general quorum-based topic replication.

...

  • INVALID_CLUSTER_ID: The request indicates a clusterId which does not match the value cached in meta.properties.
  • FENCED_LEADER_EPOCH: The leader epoch in the request is smaller than the latest known to the recipient of the request.
  • UNKNOWN_LEADER_EPOCH: The leader epoch in the request is larger than expected. Note that this is an unexpected error. Unlike normal Kafka log replication, it cannot happen that the follower receives the newer epoch before the leader.
  • OFFSET_OUT_OF_RANGE: Used in the Fetch API to indicate that the follower has fetched from an invalid offset and should truncate to the offset/epoch indicated in the response.
  • NOT_LEADER_FOR_PARTITION: Used in DescribeQuorum and AlterPartitionReassignments to indicate that the recipient of the request is not the current leader.
  • INVALID_QUORUM_STATE: This error code is reserved for cases when a request conflicts with the local known state. For example, if two separate nodes try to become leader in the same epoch, then it indicates an illegal state change.
  • INCONSISTENT_VOTER_SET: Used when the request contains inconsistent membership.
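
The clusterId and epoch checks behind the first three error codes can be sketched as follows. This is a minimal illustration in Python, not the KIP's implementation: the function name, the "NONE" success value, and the choice to compare clusterIds only when both sides know one are assumptions. It applies to requests where the recipient expects the epoch to match its own exactly.

```python
from typing import Optional


def validate_request(request_cluster_id: Optional[str],
                     cached_cluster_id: Optional[str],
                     request_epoch: int,
                     local_epoch: int) -> str:
    """Return the error code for a request that carries a leader epoch.

    Hypothetical helper; "NONE" stands for "no error".
    """
    # Assumption: the clusterId is only compared when both sides know one.
    if (request_cluster_id is not None
            and cached_cluster_id is not None
            and request_cluster_id != cached_cluster_id):
        return "INVALID_CLUSTER_ID"
    # The sender is behind the recipient.
    if request_epoch < local_epoch:
        return "FENCED_LEADER_EPOCH"
    # The sender is ahead of the recipient, which is unexpected.
    if request_epoch > local_epoch:
        return "UNKNOWN_LEADER_EPOCH"
    return "NONE"
```

For example, `validate_request("a", "a", 4, 5)` yields FENCED_LEADER_EPOCH because the sender's epoch is older than the recipient's.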

...

New elections are always delayed by a random time which is bounded by quorum.election.backoff.max.ms. This is part of the Raft protocol and is meant to prevent gridlocked elections. For example, with a quorum size of three, if only two voters are online, then we have to prevent repeated elections in which each voter declares itself a candidate and refuses to vote for the other.
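
As a rough sketch of the bounded random delay, the snippet below draws a backoff between zero and the configured maximum. The uniform distribution and the function name are assumptions for illustration; the text above only requires that the delay be random and bounded by quorum.election.backoff.max.ms.

```python
import random


def election_backoff_ms(max_backoff_ms: int, rng: random.Random) -> int:
    """Pick a random delay before the next election.

    Assumption: a uniform draw over [0, max_backoff_ms], both ends inclusive.
    """
    return rng.randint(0, max_backoff_ms)


# Each voter draws independently, so two candidates are unlikely to keep
# retrying at exactly the same moment.
voter_a_delay = election_backoff_ms(1000, random.Random())
voter_b_delay = election_backoff_ms(1000, random.Random())
```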

Leader Progress Timeout

In the traditional push-based model, when a leader is disconnected from the quorum due to a network partition, it will start a new election to learn the active quorum or form a new one immediately. In the pull-based model, however, say a new leader has been elected with a new epoch and everyone has learned about it except the old leader (e.g. that leader was no longer among the voters and hence did not receive the BeginQuorumEpoch request either). That old leader would not be notified by anyone about the new leader / epoch and would become a pure "zombie leader", as there are no regular heartbeats being pushed from the leader to the followers. This could lead to stale information being served to the observers and clients inside the cluster.

To resolve this issue, we will piggy-back on the "quorum.fetch.timeout.ms" config, such that if the leader does not receive Fetch requests from a majority of the quorum for that amount of time, it will begin a new election and start sending Vote requests to the voter nodes in the cluster to learn the latest quorum. If it cannot connect to any known voter, the old leader will keep starting new elections and bumping the epoch. If a returned response includes a leader with a newer epoch, this zombie leader will step down and become a follower. Note that the node will remain a candidate until it either finds that it has been supplanted by another voter or eventually wins the election.

As we know from the Raft literature, this approach could generate disruptive voters when network partitions happen on the leader. The partitioned leader will keep increasing its epoch, and when it eventually reconnects to the quorum, it could win the election with a very large epoch number, thus reducing the quorum availability due to extra restoration time. Considering this scenario is rare, we would like to address it in a follow-up KIP.
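
The fetch-timeout check described in this section might look like the sketch below. The function and parameter names are hypothetical, and the sketch assumes that the leader counts itself toward the majority when it is a voter; the text does not say whether it does.

```python
from typing import Dict, Set


def leader_should_start_election(voters: Set[int],
                                 leader_id: int,
                                 last_fetch_ms: Dict[int, int],
                                 now_ms: int,
                                 fetch_timeout_ms: int) -> bool:
    """Return True if the leader has lost contact with a majority of voters.

    last_fetch_ms maps a voter id to the time of its most recent Fetch.
    """
    # Count the other voters that fetched within the timeout window.
    recent = sum(
        1 for voter in voters
        if voter != leader_id
        and voter in last_fetch_ms
        and now_ms - last_fetch_ms[voter] <= fetch_timeout_ms
    )
    # Assumption: a leader that is itself a voter counts toward the majority.
    if leader_id in voters:
        recent += 1
    majority = len(voters) // 2 + 1
    return recent < majority
```

With three voters, a leader that still hears from one other voter keeps its majority of two; once both followers go silent for longer than the timeout, the function returns True and the leader would begin a new election.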

Request Schema

Code Block
{
  "apiKey": N,
  "type": "request",
  "name": "VoteRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
      {"name": "ClusterId", "type": "string", "versions": "0+",
       "about": "Generated UUID for this cluster"},
      { "name": "Topics", "type": "[]VoteTopicRequest", 
        "versions": "0+", "fields": [
          { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
            "about": "The topic name." },
          { "name": "Partitions", "type": "[]VotePartitionRequest", 
            "versions": "0+", "fields": [
              { "name": "PartitionIndex", "type": "int32", "versions": "0+",
              "about": "The partition index." },
              {"name": "CandidateEpoch", "type": "int32", "versions": "0+",
              "about": "The bumped epoch of the candidate sending the request"},
              {"name": "CandidateId", "type": "int32", "versions": "0+",
              "about": "The ID of the voter sending the request"},
              {"name": "LastOffsetEpoch", "type": "int32", "versions": "0+",
              "about": "The epoch of the last record written to the metadata log"},
              {"name": "LastOffset", "type": "int64", "versions": "0+",
              "about": "The offset of the last record written to the metadata log"}
            ]
          }
        ]
      }
  ]
}

...

When a voter handles a Vote request:

  1. Check whether the clusterId in the request matches the cached local value (if present). If not, then reject the vote and return INVALID_CLUSTER_ID.
  2. Next, check whether an epoch larger than the candidate epoch from the request is known. If so, the vote is rejected.
  3. Then check whether the voter has already voted in that candidate epoch. If it has, then grant the vote only if the candidate id matches the id that was already voted for. Otherwise, the vote is rejected.
  4. If the candidate epoch is larger than the currently known epoch:
    1. Check whether CandidateId is one of the expected voters. If not, then reject the vote. The candidate in this case may have been part of an incomplete voter reassignment, so the voter should accept the epoch bump and itself begin a new election.
    2. Check that the candidate's log is at least as up-to-date as the voter's own (see above for the comparison rules). If so, grant the vote by first updating the quorum-state file and then returning the response with voteGranted set to yes; otherwise, reject the request.
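
The vote-handling steps above can be sketched as a single function. This is an illustrative Python sketch under several assumptions: VoterState and handle_vote are hypothetical names, the in-memory state stands in for the quorum-state file, "NONE" means no error, and the log comparison uses the usual Raft rule (compare the last epoch first, then the last offset), since the comparison rules themselves are not reproduced in this excerpt.

```python
from dataclasses import dataclass
from typing import Optional, Set, Tuple


@dataclass
class VoterState:
    cluster_id: Optional[str]  # cached clusterId, None if not yet known
    epoch: int                 # latest known epoch
    voted_id: int              # candidate voted for in `epoch`, -1 if none
    voters: Set[int]           # ids of the expected voters
    last_offset_epoch: int     # epoch of the last record in the local log
    last_offset: int           # offset of the last record in the local log


def handle_vote(state: VoterState, cluster_id: Optional[str],
                candidate_epoch: int, candidate_id: int,
                last_offset_epoch: int, last_offset: int) -> Tuple[str, bool]:
    """Return (error_code, vote_granted) for one Vote request."""
    # Step 1: clusterId mismatch (assumption: checked only if both are known).
    if (state.cluster_id is not None and cluster_id is not None
            and cluster_id != state.cluster_id):
        return ("INVALID_CLUSTER_ID", False)
    # Step 2: a larger epoch is already known.
    if state.epoch > candidate_epoch:
        return ("NONE", False)
    # Step 3: same epoch. Grant only a repeat of the vote already cast.
    # Assumption: an equal epoch with no recorded vote is rejected.
    if state.epoch == candidate_epoch:
        return ("NONE", state.voted_id != -1 and state.voted_id == candidate_id)
    # Step 4.1: unknown candidate. Accept the epoch bump but reject the vote;
    # the caller is expected to begin a new election itself.
    if candidate_id not in state.voters:
        state.epoch = candidate_epoch
        state.voted_id = -1
        return ("NONE", False)
    # Step 4.2: grant only if the candidate's log is at least as up-to-date.
    if (last_offset_epoch, last_offset) >= (state.last_offset_epoch,
                                            state.last_offset):
        # Stand-in for persisting the quorum-state file before responding.
        state.epoch = candidate_epoch
        state.voted_id = candidate_id
        return ("NONE", True)
    # The text does not say whether the epoch is bumped on this rejection;
    # this sketch leaves the state untouched.
    return ("NONE", False)
```

Updating the state before returning mirrors the requirement that the quorum-state file be written before the vote is granted, so a voter that crashes after responding cannot vote twice in the same epoch.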

...

Code Block
{
  "type": "data",
  "name": "LeaderChangeMessage",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
      {"name": "ClusterId", "type": "string", "versions": "0+",
       "about": "Generated UUID for this cluster"},
      {"name": "LeaderId", "type": "int32", "versions": "0+",
       "about": "The ID of the newly elected leader"},
      {"name": "VotedIds", "type": "[]int32", "versions": "0+",
       "about": "The IDs of the voters who voted for the current leader"}
  ]
}

The use of the clusterId field is described in more detail in the section on Cluster Bootstrapping. Essentially it is the responsibility of the first elected leader to initialize this value and for each subsequent leader to simply repeat it.

Also note that unlike other Kafka topic partition data whose log appends are persisted asynchronously, for this special quorum topic all log appends must be synced to the file system before returning.

...

Code Block
{
  "apiKey": N,
  "type": "request",
  "name": "BeginQuorumEpochRequest",
  "validVersions": "0",
  "fields": [
      {"name": "ClusterId", "type": "string", "versions": "0+",
       "about": "Generated UUID for this cluster"},
      { "name": "Topics", "type": "[]BeginQuorumTopicRequest", 
        "versions": "0+", "fields": [
          { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
            "about": "The topic name." },
          { "name": "Partitions", "type": "[]BeginQuorumPartitionRequest", 
            "versions": "0+", "fields": [
              { "name": "PartitionIndex", "type": "int32", "versions": "0+",
              "about": "The partition index." },
              {"name": "LeaderId", "type": "int32", "versions": "0+",
                "about": "The ID of the newly elected leader"},
              {"name": "LeaderEpoch", "type": "int32", "versions": "0+",
                "about": "The epoch of the newly elected leader"}
            ]
          }
        ]
      }
  ]
}

...

A voter will accept a BeginQuorumEpoch if its leader epoch is greater than or equal to the current known epoch, so long as it doesn't conflict with previous knowledge. For example, if the leaderId for epoch 5 was known to be A, then a voter will reject a BeginQuorumEpoch request from a separate voter B for the same epoch. If the epoch is less than the known epoch, the request is rejected.

As described in the section on Cluster Bootstrapping, a broker will fail if it receives a BeginQuorumEpoch with a clusterId which is inconsistent with the cached value.

As soon as a broker accepts a BeginQuorumEpoch request, it will transition to a follower state and begin sending Fetch requests to the new leader.
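
The acceptance rule for BeginQuorumEpoch can be sketched as below. The function name is hypothetical, -1 is used for "no known leader" as elsewhere in this document, and the clusterId check is left out to keep the example focused on the epoch and leader comparison.

```python
def accept_begin_quorum_epoch(known_epoch: int, known_leader_id: int,
                              request_epoch: int,
                              request_leader_id: int) -> bool:
    """Return True if a voter should accept the BeginQuorumEpoch request.

    known_leader_id is -1 when no leader is known for known_epoch.
    """
    # An older epoch is always rejected.
    if request_epoch < known_epoch:
        return False
    # Same epoch: reject if a different leader is already known for it.
    if (request_epoch == known_epoch
            and known_leader_id not in (-1, request_leader_id)):
        return False
    # A newer epoch, or the same epoch without a conflicting leader.
    return True
```

Using the example from the text, if leader 1 is known for epoch 5, a request from voter 2 for epoch 5 is rejected, while a request from voter 2 for epoch 6 is accepted.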

...

Code Block
{
  "apiKey": N,
  "type": "request",
  "name": "EndQuorumEpochRequest",
  "validVersions": "0",
  "fields": [
    {"name": "ClusterId", "type": "string", "versions": "0+"},
    { "name": "Topics", "type": "[]EndQuorumTopicRequest", 
      "versions": "0+", "fields": [
        { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
          "about": "The topic name." },
        { "name": "Partitions", "type": "[]EndQuorumPartitionRequest", 
          "versions": "0+", "fields": [
            { "name": "PartitionIndex", "type": "int32", "versions": "0+",
            "about": "The partition index." },    
            {"name": "ReplicaId", "type": "int32", "versions": "0+",
            "about": "The ID of the replica sending this request"},
            {"name": "LeaderId", "type": "int32", "versions": "0+",
            "about": "The current leader ID or -1 if there is a vote in progress"},
            {"name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The current epoch"},
            {"name": "PreferredSuccessors", "type": "[]int32", "versions": "0+",
            "about": "A sorted list of preferred successors to start the election"}
          ]
        }
      ]
    }
  ]
}

...

  1. If the response indicates that the intended node is not the current leader, then check the response to see if the LeaderId has been set. If so, then attempt to retry the request with the new leader.
  2. If the current leader is not defined in the response (which could be the case if there is an election in progress), then back off and retry with Metadata.
  3. Otherwise the response can be returned to the application, or the request eventually times out.
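
The three cases above can be sketched as a small decision function. The action names are hypothetical labels, the sketch assumes that "not the current leader" is signalled with NOT_LEADER_FOR_PARTITION and that an unset LeaderId is represented as None or -1, and it does not model the request timeout.

```python
from typing import Optional


def next_client_action(error: str, leader_id: Optional[int]) -> str:
    """Decide what the client does with a response from the intended node."""
    if error == "NOT_LEADER_FOR_PARTITION":
        # Case 1: the response names the new leader, so retry against it.
        if leader_id is not None and leader_id >= 0:
            return "RETRY_WITH_LEADER"
        # Case 2: no leader yet (e.g. election in progress).
        return "BACKOFF_AND_REFRESH_METADATA"
    # Case 3: anything else is handed back to the application.
    return "RETURN_TO_APPLICATION"
```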

Cluster Bootstrapping

When the cluster is initialized for the first time, the voters will find each other through the static quorum.voters configuration. The first leader that is elected is responsible for writing the initial leader change message as specified above and generating a UUID to serve as the clusterId. This clusterId is propagated to the cluster through replication of the metadata log and cached in meta.properties as soon as the value is known to be committed.

Brokers that are restarted will validate the clusterId through a process similar to what is done today. Before attempting to fetch from the current leader, a broker will send a Metadata request for the metadata topic (specified by KIP-631) to one of the voters. The response will indicate the current leader (if there is one) and the current clusterId (which will be null before the initial leader election). The broker will validate this result against the value cached in meta.properties.

The invariant that we are trying to enforce is that no follower may fetch from a leader which has an inconsistent clusterId. However, we also need to protect leader election itself since there may not be a leader at the time that the broker is started. Hence we have added the clusterId as a field to the Vote and BeginQuorumEpoch requests. A voter will reject a request with the INVALID_CLUSTER_ID error code if the clusterId in the request does not match its own cached value. Note that the clusterId is only included in requests and validated if its value is known to have been committed. This protects against the case where the initial leader fails before its LeaderChange message can be committed to the log.

In summary, the initialization process for a voter will be the following:

  1. Send Metadata request to any of the current voters to find the current leader. 
    1. If a clusterId is received in a Metadata response, validate it against the current cached value if present. 
    2. If no clusterId is received and the broker has a non-null cached clusterId, then continue retrying the Metadata request until the election timeout expires.
  2. If the election timeout expires, become a candidate and send Vote requests including the current cached clusterId (or null if there is none).
    1. If the response indicates INVALID_CLUSTER_ID, the broker will not fail since the inconsistent broker might be in the minority. Instead it will continue retrying until the election completes.
  3. If at any time the broker receives a BeginQuorumEpoch request with an inconsistent clusterId, then the broker will terminate.

For an observer, it is even simpler. It would do the same validation in step 1, but in the case that no leader can be found, it would continue retrying indefinitely.
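
The Metadata-driven part of this initialization (step 1 and the transition into step 2) can be sketched as follows. The function name and the returned action labels are hypothetical, and the sketch assumes a clusterId mismatch is fatal, in line with the earlier statement that connecting to a cluster with a different clusterId shuts the broker down. Handling of Vote and BeginQuorumEpoch responses is not modeled.

```python
from typing import Optional


def next_bootstrap_step(leader_id: Optional[int],
                        received_cluster_id: Optional[str],
                        cached_cluster_id: Optional[str],
                        election_timeout_expired: bool,
                        is_voter: bool) -> str:
    """Decide the next step after a Metadata response during startup."""
    # Validate the clusterId whenever both the response and the cache have one.
    if (received_cluster_id is not None
            and cached_cluster_id is not None
            and received_cluster_id != cached_cluster_id):
        return "SHUTDOWN"
    # A leader was found, so the broker can start fetching from it.
    if leader_id is not None:
        return "FETCH_FROM_LEADER"
    # No leader: a voter becomes a candidate once the election timeout expires.
    if is_voter and election_timeout_expired:
        return "BECOME_CANDIDATE"
    # Observers, and voters still inside the timeout, keep retrying Metadata.
    return "RETRY_METADATA"
```

Passing `is_voter=False` gives the observer behaviour: the same validation applies, but the function never returns BECOME_CANDIDATE.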

Tooling Support

We will add a new utility called kafka-metadata-quorum.sh to describe and alter quorum state. As usual, this tool will require --bootstrap-server to be provided.  We will support the following options:

...