Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Added some more detail on clusterId generation and validation

...

Code Block
{
  "type": "message",
  "name": "QuorumStateMessage",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
	      {"name": "BrokerIdClusterId", "type": "int32string", "versions": "0+"},
	      {"name": "LeaderId", "type": "int32", "versions": "0+"},
      {"name": "LeaderEpoch", "type": "int32", "versions": "0+"},
      {"name": "VotedId", "type": "int32", "versions": "0+"},
      {"name": "AppliedOffset", "type": "int64", "versions": "0+"},
      {"name": "CurrentVoters", "type": "[]Voter", "versions": "0+", "fields": [
        {"name": "VoterId", "type": "int32", "versions": "0+"}
      ]},
      {"name": "TargetVoters", "type": "[]Voter", "versions": "0+", "nullableVersions": "0+", "fields": [
        {"name": "VoterId", "type": "int32", "versions": "0+"}
      ]}
  ]
}

...

Below we define the purpose of these fields:

  • BrokerId: we store this in order to detect conflicts if broker.id in the main configuration is changed unexpectedlyClusterId: the clusterId, which is persisted in the log and used to ensure that we do not mistakenly interact with the wrong cluster.
  • LeaderId: this is the last known leader of the quorum. A value of -1 indicates that there was is no leader.
  • LeaderEpoch: this is the last known leader epoch. This is initialized to 0 when the quorum is bootstrapped and should never be negative.
  • VotedId: indicates the id of the broker that this replica voted for in the current epoch. A value of -1 indicates that the replica has not (or cannot) vote.
  • AppliedOffset: Reflects the maximum offset that has been applied to this quorum state. This is used for log recovery. The broker must scan from this point on initialization to detect updates to this file.
  • CurrentVoters: the latest known set of voters for this quorum.
  • TargetVoters: the latest known target voters if the quorum is being reassigned.

The use of this file will be described in more detail below as we describe the protocol. Note one key difference of this internal topic compared with other topics is that we should always enforce fsync upon appending to local log to guarantee Raft algorithm correctness.

Leader Election and Data Replication

The key functionalities of any consensus protocol are leader election and data replication. The protocol for these two functionalities consists of 5 core RPCs:

Additionally, we make use of the current meta.properties file, which caches the value from broker.id in the configuration as well as the discovered clusterId. As is the case today, the configured broker.id does not match the cached value, then the broker will refuse to start. Similarly, the cached clusterId is included when interacting with the quorum APIs. If we connect to a cluster with a different clusterId, then the broker will receive a fatal error and shutdown.

Leader Election and Data Replication

The key functionalities of any consensus protocol are leader election and data replication. The protocol for these two functionalities consists of 5 core RPCs:

  • Vote: Sent by a voter to initiate an election.
  • BeginQuorumEpoch: Used by a new leader to inform the voters of its status.
  • Vote: Sent by a voter to initiate an election.
  • BeginQuorumEpoch: Used by a new leader to inform the voters of its status.
  • EndQuorumEpoch: Used by a leader to gracefully step down and allow a new election.
  • FetchQuorumRecords: Sent by voters and observers to the leader in order to replicate the log.
  • FindQuorum: Used to discover or view current quorum state when bootstrapping a broker.

...

Before getting into the details of these APIs, there are a few common attributes worth mentioning upfront:

  1. All The core requests have a field for the clusterId. For voters in the quorum, this must be provided initially as a static configuration. For observers, it can be discovered and cached as in KAFKA-7735. Its purpose is to prevent misconfigured brokers from connecting to the wrong This is validated when a request is received to ensure that misconfigured brokers will not cause any harm to the cluster.
  2. All requests save Vote have a field for the leader epoch. Voters and leaders are always expected to ensure that the request epoch is consistent with its own and return an error if it is not.
  3. We piggyback current leader and epoch information on all responses. This reduces latency to discover a leader change.

...

Code Block
{
  "type": "message",
  "name": "LeaderChangeMessage",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
  	    {"name": "LeaderId", "type": "int32", "versions": "0+",
       "about": "The ID of the newly elected leader"},
      {"name": "VotedIds", "type": "[]int32", "versions": "0+",
       "about": "The IDs of the voters who voted for the current leader"},

  ]
}

...

Code Block
{
  "apiKey": N,
  "type": "response",
  "name": "FindQuorumResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
      {"name": "ErrorCode", "type": "int16", "versions": "0+"},
      {"name": "ClusterId", "type": "string", "versions": "0+"},
      {"name": "LeaderId", "type": "int32", "versions": "0+",
       "about": "The ID of the current leader or -1 if the leader is unknown."},
      {"name": "LeaderEpoch", "type": "int32", "versions": "0+",
       "about": "The latest known leader epoch"}
      {"name": "Voters", "type": "[]Voter", "versions": "0+",
       "about": "The current voters" },
      {"name": "TargetVoters", "type": "[]Voter", "versions": "0+", "nullableVersion": "0+",
       "about": "The target voters if there is a reassignment in progress, null otherwise" },
  ],
  "commonStructs": [
    { "name": "Voter", "versions": "0+", "fields": [
        { "name": "VoterId", "type": "int32", "versions": "0+"},
        { "name": "BootTimestamp", "type": "int64", "versions": "0+"},
        { "name": "Host", "type": "string", "versions": "0+"},
        { "name": "Port", "type": "int32", "versions": "0+"},
        { "name": "SecurityProtocol", "type": "int16", "versions": "0+"}
    }
  ]
}

...

When a broker receives a FindQuorum request, it will do the following:

  1. If the ReplicaId from  from the request is greater than 0 and matches one of the existing voters (or target voters), update local connection information for that node if either that node is not present in the cache or if the BootTimestamp from the request is larger than the cached value.
  2. Respond with the latest known leader and epoch along with the connection information of all known voters.

FindQuorum Response Handling

After receiving a FindQuorum response

Note on AdminClient Usage: This API is also used by the admin client in order to describe/alter quorum state. In this case, we will use a sentinel -1  for ReplicaId. The values for the other fields in the request will not be inspected by the broker if the ReplicaId is less than 0, so we can choose reasonable sentinels for them as well (e.g. Host can be left as an empty string).

FindQuorum Response Handling

After receiving a FindQuorum response

  1. First check the leader epoch in the response against the last known epoch. It is First check the leader epoch in the response against the last known epoch. It is possible that the node receives the latest epoch from another source before the FindQuorum returns, so we always have to check. If the leader epoch is less than the last known epoch, we ignore the response.
  2. The node will then update its local cache of connection information with the values from the response. As when handling requests, the node will only update cached connection information corresponding to the largest observed BootTimestamp.
  3. Next check if there is a leader defined in the response. If so, then the node will update quorum-state and become a follower.
  4. If there is no leader and the sender is one of the voters, then the node will become a candidate and attempt to find the other nodes in the quorum (if they are not known) in order to send VoteRequest to. If the sender is not a voter, then it will continue to try and discover the leader by sending FindQuorum to other known voters or to bootstrap.servers.

...

  1. Upon starting up, brokers always try to bootstrap its knowledge of the quorum by first reading the quorum-state file and then scanning forward from AppliedOffset to the end of the log to see if there are any changes to the quorum state. For newly started brokers, the log / file would all be empty so no previous knowledge can be restored.
  2. If after step 1), there's some known quorum state along with a leader / epoch already, the broker would:
    1. Promote itself from observer to voter if it finds out that it's a voter for the epoch.
    2. Start sending FetchQuorumRecords request to the current leader it knows (it may not be the latest epoch's leader actually).
  3. Otherwise, it will try to learn the quorum state by sending FindQuorum to any other brokers inside the cluster via boostrap.servers as the second option of quorum state discovery.
  4. If even step 3) cannot find any quorum information – e.g. when there's no other brokers in the cluster, or there's a network partition preventing this broker to talk to others in the cluster – fallback to the third option of quorum state discover by checking if it is among the brokers listed in quorum.voters.
    1. If so, then it will promote to voter state and add its own connection information to the cached quorum state and return that in the FindQuorum responses it answers to other brokers; otherwise stays in observer state.
    2. In either case, it continues to try to send FindQuorum to all other brokers in the cluster via boostrap.servers.
  5. For any voter, after it has learned a majority number of voters in the expected quorum from FindQuorum responses, it will begin a vote.After the first leader is elected, it will write the voter state to the log as the first entry as described in the section on VoteRequest handling.number of voters in the expected quorum from FindQuorum responses, it will begin a vote.

When a leader has been elected for the first time in a cluster (i.e. if the leader's log is empty), the first thing it will do is append a VoterAssignmentMessage (described in more detail below) which will contain quorum.voters as the initial CurrentVoters. Once this message has been persisted in the log, then we no longer need `quorum.voters` and users can safely roll the cluster without this config.

ClusterId generation: Note that the first leader is also responsible for providing the ClusterId field which is part of the VoterAssignmentMessage. If the cluster is being migrated from Zookeeper, then we expect to reuse the existing clusterId. If the cluster is starting for the first time, then a new one will be generated. Once this message has been committed to the log, the leader and all future leaders will strictly validate that this value matches the ClusterId provided in requests when receiving Vote, BeginEpoch, EndEpoch, and FetchQuorumRecords requests. 

The leader will also append a LeaderChangeMessage as described in the VoteResponse handling above. This is not needed for correctness. It is just useful for debugging and to ensure that the high watermark always has an opportunity to advance after an election.

Bootstrapping Example

With this procedure in mind, a convenient way to initialize a cluster might be the following.

...

Code Block
{
  "type": "message",
  "name": "AlterQuorumMessageVoterAssignmentMessage",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
      {"name": "ClusterId", "type": "string", "versions": "0+"}
      {"name": "CurrentVoters", "type": "[]Voter", "versions": "0+", "fields": [
        {"name": "VoterId", "type": "int32", "versions": "0+"}
      ]},
      {"name": "TargetVoters", "type": "[]Voter", "versions": "0+", "nullableVersions": "0+", "fields": [
        {"name": "VoterId", "type": "int32", "versions": "0+"}
      ]}
  ]
}

...

The effect of AlterQuorum is to change the TargetVoters field in the AlterQuorumMessage VoterAssignmentMessage defined above. Once this is done, the leader will begin the process of bringing the new nodes into the quorum and kicking out the nodes which are no longer needed.

...

  1. If the target node is not the leader of the quorum, return NOT_QUORUM_LEADER code to the admin client. 
  2. If the included voter is unknown to the quorum leader, return UNKNOWN_VOTER_ID code to the admin client.
  3. Otherwise, if the TargetVoters in the request does not match the current TargetVoters, then append a new AlterQuorumMessage VoterAssignmentMessage to the log and wait for it to be committed. 
  4. Return successfully only when the desired TargetVoters has been safely committed to the log.

After returning the result of the request, the leader will begin the process to modify the quorum. This is described in more detail below, but basically the leader will begin tracking the fetch state of the target voters. It will then make a sequence of single-movement alterations by appending new AlterQuorumMessage VoterAssignmentMessage records to the log.

AlterQuorum Response Handling

...

Upon receiving an AlterQuorum request, the leader will do the following:

  1. Append an AlterQuorumMessage VoterAssignmentMessage to the log with the current voters as CurrentVoters and the TargetVoters from the AlterQuorum request.
  2. Leader will compute 3 sets based on CurrentVoters and TargetVoters:
    1. RemovingVoters: voters to be removed
    2. RetainedVoters: voters shared between current and target
    3. NewVoters: voters to be added
  3. Based on comparison between size(NewVoters) and size(RemovingVoters),
    1. If size(NewVoters) >= size(RemovingVoters), pick one of NewVoters as NV by writing a record with CurrentVoters=CurrentVoters + NV, and TargetVoters=TargetVoters.
    2. else pick one of RemovingVoters as RV, preferably a non-leader voter, by writing a record with CurrentVoters=CurrentVoters - RV, and TargetVoters=TargetVoters.
  4. Once the record is committed, the membership change is safe to be applied. Note that followers will begin acting with the new voter information as soon as the log entry has been appended. They do not wait for it to be committed.
  5. As there is a potential delay for propagating the removal message to the removing voter, we piggy-back on the `FetchQuorumRecords` to inform the voter to downgrade immediately after the new membership gets committed. See the error code NOT_FOLLOWER.
  6. The leader will continue this process until one of the following scenarios happens:
    1. If TargetVoters = CurrentVoters, then the reassignment is done. The leader will append a new entry with TargetVoters=null to the log.
    2. If the leader is the last remaining node in RemovingVoters, then it will step down by sending EndQuorumEpoch to the current voters. It will continue as a voter until the next leader removes it from the quorum.

Note that there is one main subtlety with this process. When a follower receives the new quorum state, it immediately begins acting with the new state in mind. Specifically, if the follower becomes a candidate, it will expect votes from a majority of the new voters specified by the reassignment. However, it is possible that the AlterQuorumMessage VoterAssignmentMessage gets truncated from the follower's log because a newly elected leader did not have it in its log. In this case, the follower needs to be able to revert to the previous quorum state. To make this simple, voters will only persist quorum state changes in quorum-state after they have been committed. Upon initialization, any uncommitted state changes will be found by scanning forward from the LastOffset indicated in the quorum-state.

In a similar vein, if the AlterQuorumMessage VoterAssignmentMessage fails to be copied to all voters before a leader failure, then there could be temporary disagreement about voter membership. Each voter must act on the information they have in their own log when deciding whether to grant votes. It is possible in this case for a voter to receive a request from a non-voter (according to its own information). Voters must reject votes from non-voters, but that does not mean that the non-voter cannot ultimately win the election. Hence when a voter receives a VoteRequest from a non-voter, it must then become a candidate.

...