...
Code Block |
---|
{ "type": "message", "name": "QuorumStateMessage", "validVersions": "0", "flexibleVersions": "0+", "fields": [ {"name": "BrokerId", "type": "int32", "versions": "0+"}, {"name": "ClusterId", "type": "string", "versions": "0+"}, {"name": "LeaderId", "type": "int32", "versions": "0+"}, {"name": "LeaderEpoch", "type": "int32", "versions": "0+"}, {"name": "VotedId", "type": "int32", "versions": "0+"}, {"name": "AppliedOffset", "type": "int64", "versions": "0+"}, {"name": "CurrentVoters", "type": "[]Voter", "versions": "0+", "fields": [ {"name": "VoterId", "type": "int32", "versions": "0+"} ]}, {"name": "TargetVoters", "type": "[]Voter", "versions": "0+", "nullableVersions": "0+", "fields": [ {"name": "VoterId", "type": "int32", "versions": "0+"} ]} ] } |
...
Below we define the purpose of these fields:
- BrokerId: we store this in order to detect conflicts if broker.id in the main configuration is changed unexpectedly.
- ClusterId: the clusterId, which is persisted in the log and used to ensure that we do not mistakenly interact with the wrong cluster.
- LeaderId: this is the last known leader of the quorum. A value of -1 indicates that there is no leader.
- LeaderEpoch: this is the last known leader epoch. This is initialized to 0 when the quorum is bootstrapped and should never be negative.
- VotedId: indicates the id of the broker that this replica voted for in the current epoch. A value of -1 indicates that the replica has not voted (or cannot vote).
- AppliedOffset: reflects the maximum offset that has been applied to this quorum state. This is used for log recovery. The broker must scan from this point on initialization to detect updates to this file.
- CurrentVoters: the latest known set of voters for this quorum.
- TargetVoters: the latest known target voters if the quorum is being reassigned.
The use of this file will be described in more detail below as we describe the protocol. Note that one key difference between this internal topic and other topics is that we should always enforce fsync upon appending to the local log in order to guarantee the correctness of the Raft algorithm.
Additionally, we make use of the current meta.properties file, which caches the value from broker.id in the configuration as well as the discovered clusterId. As is the case today, if the configured broker.id does not match the cached value, then the broker will refuse to start. Similarly, the cached clusterId is included when interacting with the quorum APIs. If we connect to a cluster with a different clusterId, then the broker will receive a fatal error and shut down.
Leader Election and Data Replication
The key functionalities of any consensus protocol are leader election and data replication. The protocol for these two functionalities consists of 5 core RPCs:
- Vote: Sent by a voter to initiate an election.
- BeginQuorumEpoch: Used by a new leader to inform the voters of its status.
- EndQuorumEpoch: Used by a leader to gracefully step down and allow a new election.
- FetchQuorumRecords: Sent by voters and observers to the leader in order to replicate the log.
- FindQuorum: Used to discover or view current quorum state when bootstrapping a broker.
...
Before getting into the details of these APIs, there are a few common attributes worth mentioning upfront:
- The core requests have a field for the clusterId. For voters in the quorum, this must be provided initially as a static configuration. For observers, it can be discovered and cached as in KAFKA-7735. This is validated when a request is received to ensure that misconfigured brokers will not cause any harm to the cluster.
- All requests save Vote have a field for the leader epoch. Voters and leaders are always expected to ensure that the request epoch is consistent with their own and return an error if it is not.
- We piggyback current leader and epoch information on all responses. This reduces latency to discover a leader change.
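The common checks above can be illustrated with a minimal sketch. It assumes that "consistent" means the request epoch equals the local epoch, which is a simplification; the names `LocalState`, `validate_request`, and both error strings are hypothetical placeholders rather than part of the proposal.

```python
from dataclasses import dataclass


@dataclass
class LocalState:
    cluster_id: str
    leader_id: int      # -1 when no leader is known
    leader_epoch: int


def validate_request(state, request_cluster_id, request_epoch):
    """Return (error, leader_id, leader_epoch); error is None on success.

    The current leader and epoch are piggybacked on every response,
    including error responses, so the sender can discover a leader change.
    """
    if request_cluster_id != state.cluster_id:
        error = "CLUSTER_ID_MISMATCH"   # hypothetical placeholder error name
    elif request_epoch != state.leader_epoch:
        error = "EPOCH_MISMATCH"        # hypothetical placeholder error name
    else:
        error = None
    return (error, state.leader_id, state.leader_epoch)
```

Because every return path carries the leader and epoch, a sender with stale state can correct itself from the error response alone.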
...
Code Block |
---|
{ "type": "message", "name": "LeaderChangeMessage", "validVersions": "0", "flexibleVersions": "0+", "fields": [ {"name": "LeaderId", "type": "int32", "versions": "0+", "about": "The ID of the newly elected leader"}, {"name": "VotedIds", "type": "[]int32", "versions": "0+", "about": "The IDs of the voters who voted for the current leader"} ] } |
...
Code Block |
---|
{ "apiKey": N, "type": "response", "name": "FindQuorumResponse", "validVersions": "0", "flexibleVersions": "0+", "fields": [ {"name": "ErrorCode", "type": "int16", "versions": "0+"}, {"name": "ClusterId", "type": "string", "versions": "0+"}, {"name": "LeaderId", "type": "int32", "versions": "0+", "about": "The ID of the current leader or -1 if the leader is unknown."}, {"name": "LeaderEpoch", "type": "int32", "versions": "0+", "about": "The latest known leader epoch"}, {"name": "Voters", "type": "[]Voter", "versions": "0+", "about": "The current voters" }, {"name": "TargetVoters", "type": "[]Voter", "versions": "0+", "nullableVersions": "0+", "about": "The target voters if there is a reassignment in progress, null otherwise" } ], "commonStructs": [ { "name": "Voter", "versions": "0+", "fields": [ { "name": "VoterId", "type": "int32", "versions": "0+"}, { "name": "BootTimestamp", "type": "int64", "versions": "0+"}, { "name": "Host", "type": "string", "versions": "0+"}, { "name": "Port", "type": "int32", "versions": "0+"}, { "name": "SecurityProtocol", "type": "int16", "versions": "0+"} ] } ] } |
...
When a broker receives a FindQuorum request, it will do the following:
- If the ReplicaId from the request is greater than 0 and matches one of the existing voters (or target voters), update local connection information for that node if either that node is not present in the cache or if the BootTimestamp from the request is larger than the cached value.
- Respond with the latest known leader and epoch along with the connection information of all known voters.
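The cache update rule in the first step can be sketched as follows. This is only an illustration under stated assumptions: the function name `update_connection_cache`, the shape of the cache (voter id mapped to a `(boot_timestamp, endpoint)` tuple), and the endpoint representation are all hypothetical.

```python
def update_connection_cache(cache, voters, replica_id, boot_timestamp, endpoint):
    """Apply the FindQuorum request rule to a connection cache.

    cache:  dict mapping voter id -> (boot_timestamp, endpoint)
    voters: ids of the existing voters (or target voters)
    """
    # Requests with a sentinel ReplicaId (e.g. -1 from the admin client)
    # or from nodes that are not voters never touch the cache.
    if replica_id > 0 and replica_id in voters:
        cached = cache.get(replica_id)
        # Only a missing entry or a strictly newer BootTimestamp wins.
        if cached is None or boot_timestamp > cached[0]:
            cache[replica_id] = (boot_timestamp, endpoint)
    return cache
```

Comparing on BootTimestamp means that a delayed request from an earlier incarnation of a broker cannot overwrite the connection information of a more recent one.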
Note on AdminClient Usage: This API is also used by the admin client in order to describe/alter quorum state. In this case, we will use a sentinel -1 for ReplicaId. The values for the other fields in the request will not be inspected by the broker if the ReplicaId is less than 0, so we can choose reasonable sentinels for them as well (e.g. Host can be left as an empty string).
FindQuorum Response Handling
After receiving a FindQuorum response:
- First check the leader epoch in the response against the last known epoch. It is possible that the node receives the latest epoch from another source before the FindQuorum returns, so we always have to check. If the leader epoch is less than the last known epoch, we ignore the response.
- The node will then update its local cache of connection information with the values from the response. As when handling requests, the node will only update cached connection information corresponding to the largest observed BootTimestamp.
- Next check if there is a leader defined in the response. If so, then the node will update quorum-state and become a follower.
- If there is no leader and the sender is one of the voters, then the node will become a candidate and attempt to find the other nodes in the quorum (if they are not known) in order to send VoteRequest to them. If the sender is not a voter, then it will continue to try to discover the leader by sending FindQuorum to other known voters or to bootstrap.servers.
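The response handling steps can be summarized in a short sketch. The dictionary layout of `node` and `response`, the function name, and the returned action strings are hypothetical and chosen only for illustration; persisting quorum-state and sending requests are left out.

```python
def handle_find_quorum_response(node, response):
    """Return the action the node takes after a FindQuorum response.

    node:     {"id", "epoch", "leader_id", "connections"}
    response: {"leader_epoch", "leader_id", "voters"}, where "voters"
              maps voter id -> (boot_timestamp, endpoint)
    """
    # 1. A newer epoch may have arrived from another source in the meantime.
    if response["leader_epoch"] < node["epoch"]:
        return "ignore"

    # 2. Keep only the connection info with the largest BootTimestamp.
    for voter_id, (boot_ts, endpoint) in response["voters"].items():
        cached = node["connections"].get(voter_id)
        if cached is None or boot_ts > cached[0]:
            node["connections"][voter_id] = (boot_ts, endpoint)

    # 3. A known leader means the node records it and follows.
    if response["leader_id"] >= 0:
        node["leader_id"] = response["leader_id"]
        node["epoch"] = response["leader_epoch"]
        return "follower"

    # 4. No leader: voters start an election, everyone else keeps looking.
    if node["id"] in response["voters"]:
        return "candidate"
    return "retry_find_quorum"
```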
...
1. Upon starting up, a broker always tries to bootstrap its knowledge of the quorum by first reading the quorum-state file and then scanning forward from AppliedOffset to the end of the log to see if there are any changes to the quorum state. For newly started brokers, the log and the file would both be empty, so no previous knowledge can be restored.
2. If after step 1) there is some known quorum state along with a leader / epoch already, the broker would:
   - Promote itself from observer to voter if it finds out that it's a voter for the epoch.
   - Start sending FetchQuorumRecords requests to the current leader it knows (which may not actually be the latest epoch's leader).
3. Otherwise, it will try to learn the quorum state by sending FindQuorum to any other brokers inside the cluster via bootstrap.servers as the second option of quorum state discovery.
4. If even step 3) cannot find any quorum information (e.g. when there are no other brokers in the cluster, or there is a network partition preventing this broker from talking to others in the cluster), fall back to the third option of quorum state discovery by checking if it is among the brokers listed in quorum.voters.
   - If so, then it will promote to voter state and add its own connection information to the cached quorum state and return that in the FindQuorum responses it answers to other brokers; otherwise it stays in observer state.
   - In either case, it continues to try to send FindQuorum to all other brokers in the cluster via bootstrap.servers.
5. For any voter, after it has learned a majority number of voters in the expected quorum from FindQuorum responses, it will begin a vote. After the first leader is elected, it will write the voter state to the log as the first entry as described in the section on VoteRequest handling.
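As a small illustration of the condition for beginning a vote, the check below assumes that a voter counts itself among the voters it has learned and that "majority" means strictly more than half of the expected quorum. The function name `can_begin_vote` is hypothetical.

```python
def can_begin_vote(expected_voters, known_voters):
    """True once a majority of the expected quorum has been discovered.

    expected_voters: voter ids in the expected quorum (e.g. from quorum.voters)
    known_voters:    voter ids whose connection information has been learned,
                     assumed here to include the local voter itself
    """
    expected = set(expected_voters)
    known = expected & set(known_voters)
    return len(known) > len(expected) // 2
```

With three expected voters, knowing two of them is enough; with four, three are required.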
When a leader has been elected for the first time in a cluster (i.e. if the leader's log is empty), the first thing it will do is append a VoterAssignmentMessage (described in more detail below) which will contain quorum.voters as the initial CurrentVoters. Once this message has been persisted in the log, then we no longer need `quorum.voters` and users can safely roll the cluster without this config.
ClusterId generation: Note that the first leader is also responsible for providing the ClusterId field which is part of the VoterAssignmentMessage. If the cluster is being migrated from Zookeeper, then we expect to reuse the existing clusterId. If the cluster is starting for the first time, then a new one will be generated. Once this message has been committed to the log, the leader and all future leaders will strictly validate that this value matches the ClusterId provided in requests when receiving Vote, BeginQuorumEpoch, EndQuorumEpoch, and FetchQuorumRecords requests.
The leader will also append a LeaderChangeMessage as described in the VoteResponse handling above. This is not needed for correctness. It is just useful for debugging and to ensure that the high watermark always has an opportunity to advance after an election.
Bootstrapping Example
With this procedure in mind, a convenient way to initialize a cluster might be the following.
...
Code Block |
---|
{ "type": "message", "name": "VoterAssignmentMessage", "validVersions": "0", "flexibleVersions": "0+", "fields": [ {"name": "ClusterId", "type": "string", "versions": "0+"}, {"name": "CurrentVoters", "type": "[]Voter", "versions": "0+", "fields": [ {"name": "VoterId", "type": "int32", "versions": "0+"} ]}, {"name": "TargetVoters", "type": "[]Voter", "versions": "0+", "nullableVersions": "0+", "fields": [ {"name": "VoterId", "type": "int32", "versions": "0+"} ]} ] } |
...
The effect of AlterQuorum is to change the TargetVoters field in the VoterAssignmentMessage defined above. Once this is done, the leader will begin the process of bringing the new nodes into the quorum and kicking out the nodes which are no longer needed.
...
- If the target node is not the leader of the quorum, return NOT_QUORUM_LEADER code to the admin client.
- If the included voter is unknown to the quorum leader, return UNKNOWN_VOTER_ID code to the admin client.
- Otherwise, if the TargetVoters in the request does not match the current TargetVoters, then append a new VoterAssignmentMessage to the log and wait for it to be committed.
- Return successfully only when the desired TargetVoters has been safely committed to the log.
After returning the result of the request, the leader will begin the process to modify the quorum. This is described in more detail below, but basically the leader will begin tracking the fetch state of the target voters. It will then make a sequence of single-movement alterations by appending new VoterAssignmentMessage records to the log.
AlterQuorum Response Handling
...
Upon receiving an AlterQuorum request, the leader will do the following:
- Append a VoterAssignmentMessage to the log with the current voters as CurrentVoters and the TargetVoters from the AlterQuorum request.
- The leader will compute 3 sets based on CurrentVoters and TargetVoters:
  - RemovingVoters: voters to be removed
  - RetainedVoters: voters shared between current and target
  - NewVoters: voters to be added
- Based on a comparison between size(NewVoters) and size(RemovingVoters):
  - If size(NewVoters) >= size(RemovingVoters), pick one of NewVoters as NV by writing a record with CurrentVoters=CurrentVoters + NV, and TargetVoters=TargetVoters.
  - Else pick one of RemovingVoters as RV, preferably a non-leader voter, by writing a record with CurrentVoters=CurrentVoters - RV, and TargetVoters=TargetVoters.
- Once the record is committed, the membership change is safe to be applied. Note that followers will begin acting with the new voter information as soon as the log entry has been appended. They do not wait for it to be committed.
- As there is a potential delay in propagating the removal message to the removing voter, we piggy-back on the `FetchQuorumRecords` to inform the voter to downgrade immediately after the new membership gets committed. See the error code NOT_FOLLOWER.
- The leader will continue this process until one of the following scenarios happens:
  - If TargetVoters = CurrentVoters, then the reassignment is done. The leader will append a new entry with TargetVoters=null to the log.
  - If the leader is the last remaining node in RemovingVoters, then it will step down by sending EndQuorumEpoch to the current voters. It will continue as a voter until the next leader removes it from the quorum.
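The single-movement procedure above can be sketched as a pure function over voter id sets. This is an illustration only: the function names are hypothetical, commit waiting and the actual log append are omitted, and choosing the smallest id with `min` is an arbitrary stand-in for "pick one".

```python
def next_assignment(current, target, leader_id):
    """Return the next (CurrentVoters, TargetVoters) record to append.

    Returns None when the leader is the last remaining removing voter,
    in which case it would step down via EndQuorumEpoch instead.
    """
    current, target = set(current), set(target)
    if current == target:
        return (current, None)            # reassignment done: TargetVoters=null
    new_voters = target - current         # NewVoters
    removing = current - target           # RemovingVoters
    if new_voters and len(new_voters) >= len(removing):
        return (current | {min(new_voters)}, target)
    # Prefer removing a non-leader voter.
    candidates = [v for v in removing if v != leader_id]
    if not candidates:
        return None
    return (current - {min(candidates)}, target)


def plan_reassignment(current, target, leader_id):
    """Collect the sequence of records the leader would append."""
    records = []
    current = set(current)
    while True:
        step = next_assignment(current, target, leader_id)
        if step is None:
            break                         # leader steps down here
        records.append(step)
        current, new_target = step
        if new_target is None:
            break                         # final record has been written
    return records
```

For example, moving from voters {1, 2, 3} to {1, 4, 5} with leader 1 alternates between adding and removing one voter at a time, so each record changes the voter set by exactly one member.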
Note that there is one main subtlety with this process. When a follower receives the new quorum state, it immediately begins acting with the new state in mind. Specifically, if the follower becomes a candidate, it will expect votes from a majority of the new voters specified by the reassignment. However, it is possible that the VoterAssignmentMessage gets truncated from the follower's log because a newly elected leader did not have it in its log. In this case, the follower needs to be able to revert to the previous quorum state. To make this simple, voters will only persist quorum state changes in quorum-state after they have been committed. Upon initialization, any uncommitted state changes will be found by scanning forward from the AppliedOffset indicated in the quorum-state.
In a similar vein, if the VoterAssignmentMessage fails to be copied to all voters before a leader failure, then there could be temporary disagreement about voter membership. Each voter must act on the information it has in its own log when deciding whether to grant votes. It is possible in this case for a voter to receive a request from a non-voter (according to its own information). Voters must reject vote requests from non-voters, but that does not mean that the non-voter cannot ultimately win the election. Hence when a voter receives a VoteRequest from a non-voter, it must then become a candidate.
...