...
- Stores the information of all live brokers.
  /brokers/ids/[broker_id] --> host:port (ephemeral; created by admin)
- For each partition, stores a list of the currently assigned replicas. For each replica, we store the id of the broker to which the replica is assigned. The first replica is the preferred replica. Since a partition has at most one replica on any given broker, the broker id can be used as the replica id.
  /brokers/topics/[topic]/[partition_id]/replicas --> {broker_id …} (created by admin)
- Stores the id of the replica that is the current leader of this partition.
  /brokers/topics/[topic]/[partition_id]/leader --> broker_id (ephemeral; created by leader)
- Stores the ids of the set of replicas that are in sync with the leader (the in-sync replica set, ISR).
  /brokers/topics/[topic]/[partition_id]/ISR --> {broker_id, …} (created by leader)
- This path is used when we want to reassign some partitions to a different set of brokers. For each partition to be reassigned, it stores a list of new replicas and their corresponding assigned brokers. This path is created by an administrative process and is automatically removed once the partition has been moved successfully.
  /brokers/partitions_reassigned/[topic]/[partition_id] --> {broker_id …} (created by admin)
- This path is used by the leader of a partition to enqueue state change requests to the follower replicas. The state change requests include start replica, close replica, and become follower. This path is created by the create topic or add brokers admin command, and it is deleted only by the remove brokers admin command. The path is persistent so that state changes such as delete topic and reassign partitions are handled cleanly even when a replica is temporarily unavailable (for example, while it is being bounced).
  /brokers/topics/[topic]/[partition_id]/state/[broker_id] --> { state change requests ... } (created by admin)
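The ZooKeeper layout above can be captured in a few path-building helpers. This is a minimal sketch: only the path shapes come from the list, and every function name is hypothetical.

```python
# Hypothetical helpers: the function names are illustrative, only the
# path layouts come from the list above.

def broker_path(broker_id: int) -> str:
    # Ephemeral znode whose value is "host:port" of a live broker.
    return f"/brokers/ids/{broker_id}"


def partition_path(topic: str, partition_id: int) -> str:
    return f"/brokers/topics/{topic}/{partition_id}"


def replicas_path(topic: str, partition_id: int) -> str:
    # Value: list of assigned broker ids; the first one is the preferred replica.
    return partition_path(topic, partition_id) + "/replicas"


def leader_path(topic: str, partition_id: int) -> str:
    # Ephemeral znode whose value is the broker id of the current leader.
    return partition_path(topic, partition_id) + "/leader"


def isr_path(topic: str, partition_id: int) -> str:
    return partition_path(topic, partition_id) + "/ISR"


def reassigned_path(topic: str, partition_id: int) -> str:
    return f"/brokers/partitions_reassigned/{topic}/{partition_id}"


def state_path(topic: str, partition_id: int, broker_id: int) -> str:
    # Persistent queue of state change requests for the replica on broker_id.
    return partition_path(topic, partition_id) + f"/state/{broker_id}"


def preferred_replica(replicas: list) -> int:
    # The first assigned replica is the preferred replica.
    return replicas[0]


print(leader_path("orders", 0))  # /brokers/topics/orders/0/leader
```

Because a partition has at most one replica per broker, the broker id alone identifies the replica in `state_path`.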
...
- Replica-change listener:
  - child change on /brokers/topics (new topic registered)
  - child change on /brokers/topics/[topic] (new partition registered)
  - value change on /brokers/topics/[topic]/[partition_id]/replicas (new replica assigned)
- Partition-reassigned listener:
  - child change on /brokers/partitions_reassigned
  - child change on /brokers/partitions_reassigned/[topic]
ZooKeeper listeners on all brokers
- Leader-change listener: value change on /brokers/topics/[topic]/[partition_id]/leader
- State-change listener: child change on /brokers/topics/[topic]/[partition_id]/state/[broker_id]
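The listener list can be read as a routing table from a watched path and change type to a listener. This is a minimal sketch, assuming a "child" or "value" change type per event. The listener labels, `SEG` and `route()` are hypothetical.

```python
import re
from typing import Optional

SEG = r"[^/]+"  # one path segment, standing in for [topic], [partition_id], [broker_id]

# (listener, change type, path pattern), mirroring the listener list above.
LISTENERS = [
    ("replica-change", "child", r"^/brokers/topics$"),
    ("replica-change", "child", rf"^/brokers/topics/{SEG}$"),
    ("replica-change", "value", rf"^/brokers/topics/{SEG}/{SEG}/replicas$"),
    ("partition-reassigned", "child", r"^/brokers/partitions_reassigned$"),
    ("partition-reassigned", "child", rf"^/brokers/partitions_reassigned/{SEG}$"),
    ("leader-change", "value", rf"^/brokers/topics/{SEG}/{SEG}/leader$"),
    ("state-change", "child", rf"^/brokers/topics/{SEG}/{SEG}/state/{SEG}$"),
]


def route(path: str, change: str) -> Optional[str]:
    """Return the listener that handles a change of the given type
    ("child" or "value") on path, or None if no listener watches it."""
    for listener, kind, pattern in LISTENERS:
        if kind == change and re.fullmatch(pattern, path):
            return listener
    return None
```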
Configuration parameters
- LeaderElectionWaitTime: controls the maximum amount of time that we wait during leader election.
- KeepInSyncTime: controls the maximum amount of time that a leader waits before dropping a follower from the in-sync replica set.
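KeepInSyncTime acts as a time bound on how long a follower may lag before it leaves the ISR. The sketch below assumes the leader records, per follower, the last time (in milliseconds) that follower was fully caught up. `ReplicationConfig`, `shrink_isr` and `last_caught_up_ms` are hypothetical names, not part of the design.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class ReplicationConfig:
    leader_election_wait_time_ms: int  # LeaderElectionWaitTime
    keep_in_sync_time_ms: int          # KeepInSyncTime


def shrink_isr(isr: set[int], leader_id: int, last_caught_up_ms: dict[int, int],
               now_ms: int, config: ReplicationConfig) -> set[int]:
    """Return the ISR after dropping every follower that has not been caught up
    with the leader within the last KeepInSyncTime milliseconds."""
    kept = {leader_id}  # the leader is always in sync with itself
    for broker_id in isr - {leader_id}:
        last = last_caught_up_ms.get(broker_id)
        if last is not None and now_ms - last <= config.keep_in_sync_time_ms:
            kept.add(broker_id)
    return kept
```

A follower with no recorded catch-up time is treated as out of sync. This is a design choice of the sketch.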
...
Code Block |
---|
brokerStartup()
{
  register its broker_id in /brokers/ids/[broker_id] in ZK
  get replica info from ZK and compute AR, a list of replicas assigned to this broker
  for each r in AR
  {
    register stateChangeListener() on /brokers/topics/[topic]/[partition_id]/state/[broker_id] in ZK
    startReplica(r)
  }
}
|
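The startup steps can be exercised against a toy in-memory store. In this sketch `FakeZk` (a dict of path to value plus a list of watched paths), `broker_startup` and the `start_replica` callback are hypothetical stand-ins, not a real ZooKeeper client API.

```python
from __future__ import annotations

from typing import Callable, Optional


class FakeZk:
    """Hypothetical in-memory stand-in for ZooKeeper (path -> value)."""

    def __init__(self) -> None:
        self.data: dict[str, object] = {}
        self.watched: list[str] = []

    def create(self, path: str, value: object) -> None:
        if path in self.data:
            raise KeyError(f"node already exists: {path}")
        self.data[path] = value

    def watch_children(self, path: str) -> None:
        self.watched.append(path)


def broker_startup(zk: FakeZk, broker_id: int, host_port: str,
                   start_replica: Optional[Callable[[str, int], None]] = None
                   ) -> list[tuple[str, int]]:
    # Register this broker as live under /brokers/ids/[broker_id].
    zk.create(f"/brokers/ids/{broker_id}", host_port)

    # Compute AR: every (topic, partition_id) whose replica list names this broker.
    prefix, suffix = "/brokers/topics/", "/replicas"
    assigned = []
    for path in sorted(zk.data):
        if path.startswith(prefix) and path.endswith(suffix) and broker_id in zk.data[path]:
            topic, partition_id = path[len(prefix):-len(suffix)].split("/")
            assigned.append((topic, int(partition_id)))

    # For each r in AR: listen for state change requests, then start the replica.
    for topic, partition_id in assigned:
        zk.watch_children(f"/brokers/topics/{topic}/{partition_id}/state/{broker_id}")
        if start_replica is not None:
            start_replica(topic, partition_id)
    return assigned
```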
State change listener
Each broker has a ZK path that it listens to for state change requests from the leader.
Code Block |
---|
stateChangeListener(r)
{
  // listens to state change requests issued by the leader and acts on those
  read next state change request
  Let r be the replica that the state change request is sent for.
  Throw an error if r is not hosted on this broker // this should not happen
  if(becomeFollowerRequest) becomeFollower(r)
  if(closeReplicaRequest) closeReplica(r)
  if(startReplicaRequest) startReplica(r)
}
|
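The listener's dispatch logic can be sketched as follows. It assumes requests arrive as small records with a kind string. `StateChangeRequest`, `Broker` and the kind strings are hypothetical. Instead of performing each operation, the sketch records which operation would run.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class StateChangeRequest:
    kind: str  # hypothetical encoding: "become-follower", "close-replica" or "start-replica"
    topic: str
    partition_id: int


class Broker:
    def __init__(self, broker_id: int, hosted: set[tuple[str, int]]) -> None:
        self.broker_id = broker_id
        self.hosted = hosted  # replicas assigned to this broker
        self.actions: list[tuple[str, tuple[str, int]]] = []  # log of handled requests

    def on_state_change(self, req: StateChangeRequest) -> None:
        r = (req.topic, req.partition_id)  # the replica the request is sent for
        if r not in self.hosted:
            # The design treats this as a should-not-happen error.
            raise RuntimeError(f"replica {r} is not hosted on broker {self.broker_id}")
        operations = {
            "become-follower": "becomeFollower",
            "close-replica": "closeReplica",
            "start-replica": "startReplica",
        }
        # A real broker would invoke the operation here; the sketch only records it.
        self.actions.append((operations[req.kind], r))
```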
State change events
On leader change
This leader-change listener is registered on every broker hosting a replica of partition p. Each time it is triggered, the broker executes the following procedure:
Code Block |
---|
onLeaderChange()
{
if(r is registered under /brokers/topics/[topic]/[partition_id]/replicas)
leaderElection()
else
// some other replica is already the leader, and will send the become follower state change request to this replica
}
|
On replica change
Every time a replica change event is triggered, the leader calls onReplicaChange():
Code Block |
---|
onReplicaChange()
{
  for each new replica assigned to follower R
    send start replica state change request to R
  for each replica r un-assigned from follower R
    send close replica state change request to R
}
|
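A minimal sketch of onReplicaChange(), assuming the leader can compare the previous and current replica lists of each partition. The function name, the `(topic, partition_id)` keys and the request strings are hypothetical. Brokers that appear in a list get a start replica request, and brokers that disappear get a close replica request, mirroring the two loops above.

```python
from __future__ import annotations


def on_replica_change(old: dict[tuple[str, int], list[int]],
                      new: dict[tuple[str, int], list[int]]
                      ) -> list[tuple[str, int, tuple[str, int]]]:
    """Return the (request, broker_id, partition) triples the leader would enqueue."""
    requests = []
    for partition in sorted(set(old) | set(new)):
        before = set(old.get(partition, []))
        after = set(new.get(partition, []))
        for broker_id in sorted(after - before):   # new replica assigned to follower R
            requests.append(("start-replica", broker_id, partition))
        for broker_id in sorted(before - after):   # replica un-assigned from follower R
            requests.append(("close-replica", broker_id, partition))
    return requests
```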
On reassignment of partitions
Each time a partition reassigned event is triggered, the leader calls onPartitionsReassigned():
Code Block |
---|
onPartitionsReassigned()
{
  p.RAR = the new replicas from /brokers/partitions_reassigned/[topic]/[partition_id]
  if(a new replica is to be reassigned to follower R)
    send start-replica state change request to R
}
|
State change operations
Replica state change
This state change is requested by the leader for a new replica assignment.
Code Block |
---|
replicaStateChange(r: Replica)
{
  if(r's log is not already started)
  {
    do local recovery of r's log
    r.hw = min(last checkpointed HW for r, r.leo)
    register a leader-change listener on partition r.partition.partition_id
  }
  if(a leader does not exist for partition r.partition.partition_id in ZK)
    leaderElection(r)
}
|
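The HW initialization and the election check can be sketched in a few lines. `Replica` is modeled as a list-backed log, and `replica_state_change` and the `leader_election` callback are hypothetical. Log recovery and listener registration are assumed to happen outside the function.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Callable, Optional


@dataclass
class Replica:
    partition_id: int
    log: list[bytes] = field(default_factory=list)
    log_started: bool = False
    hw: int = 0

    @property
    def leo(self) -> int:
        # Log end offset: the offset the next appended message would get.
        return len(self.log)


def replica_state_change(r: Replica, checkpointed_hw: Optional[int],
                         current_leader: Optional[int],
                         leader_election: Callable[[Replica], None]) -> None:
    if not r.log_started:
        # Local recovery of r.log is assumed to have happened already.
        r.log_started = True
        # min() guards against a checkpointed HW that is ahead of the recovered log.
        r.hw = min(checkpointed_hw if checkpointed_hw is not None else 0, r.leo)
        # Registering the leader-change listener is not modeled here.
    if current_leader is None:  # no leader for this partition in ZK
        leader_election(r)
```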
...
Code Block |
---|
becomeFollower(r: Replica)
{
  stop the current commit thread, if any // this is required if this replica was the last leader
  stop the current ReplicaFetcherThread, if any
  truncate the log to r.hw
  start a new ReplicaFetcherThread to the current leader of r, from offset r.leo
  start HW checkpoint thread for r
}
|
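The truncate-then-fetch step is small enough to show in isolation. Truncating to r.hw presumably discards messages that were appended locally but never committed, which the new leader may not have. After truncation, r.leo is where fetching resumes. `become_follower` is a hypothetical helper over a list-backed log, and thread handling is not modeled.

```python
from __future__ import annotations


def become_follower(log: list[bytes], hw: int) -> tuple[list[bytes], int]:
    """Return the follower's log truncated to its HW, and the offset (r.leo)
    from which the new ReplicaFetcherThread should start fetching."""
    # Stopping the old commit thread and ReplicaFetcherThread is not modeled here.
    truncated = log[:hw]            # drop messages beyond the high watermark
    fetch_offset = len(truncated)   # r.leo after truncation
    return truncated, fetch_offset
```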
...
HW checkpoint thread for partition p
Code Block |
---|
while(true)
{
  r = p.leader
  flush r.hw & r.log to disk
}
|
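The design only says to flush the HW to disk, so the file format here is an assumption. This sketch shows one common way to persist a single HW value: write a temporary file, fsync it, then atomically rename it over the previous checkpoint so a crash never leaves a half-written file. `checkpoint_hw` and `read_hw` are hypothetical helpers. Flushing the log itself is out of scope.

```python
import os
import tempfile


def checkpoint_hw(path: str, hw: int) -> None:
    """Durably record a high watermark via write-temp, fsync, then rename."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp = tempfile.mkstemp(dir=directory)  # same directory, so rename stays on one filesystem
    try:
        with os.fdopen(fd, "w") as f:
            f.write(str(hw))
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp, path)  # atomic on POSIX when source and target share a filesystem
    except BaseException:
        if os.path.exists(tmp):
            os.remove(tmp)
        raise


def read_hw(path: str) -> int:
    with open(path) as f:
        return int(f.read())
```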
Committer thread for partition p
...
Code Block |
---|
replicaFetch(f: ReplicaFetchRequest)
{
  // handler for ReplicaFetchRequest at leader
  leader = getLeaderReplica(f.topic, f.partition_id)
  if(leader == null) throw NotLeaderException
  response = new ReplicaFetcherResponse
  getReplica(f.topic, f.partition_id, f.replica_id).leo = f.offset
  response.messages = fetch messages starting from f.offset from leader.log
  response.hw = leader.hw
  send response back
}
|
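The leader-side handler can be sketched with plain data classes. The request's fetch offset doubles as the follower's LEO, which is what the leader records. `LeaderPartition`, `follower_leo`, the `leaders` map and the `max_messages` cap are hypothetical. How the leader advances `hw` belongs to the committer thread and is not shown.

```python
from __future__ import annotations

from dataclasses import dataclass, field


class NotLeaderException(Exception):
    pass


@dataclass
class ReplicaFetchRequest:
    topic: str
    partition_id: int
    replica_id: int
    offset: int


@dataclass
class ReplicaFetcherResponse:
    messages: list[bytes]
    hw: int


@dataclass
class LeaderPartition:
    log: list[bytes]
    hw: int
    follower_leo: dict[int, int] = field(default_factory=dict)


def replica_fetch(leaders: dict[tuple[str, int], LeaderPartition],
                  f: ReplicaFetchRequest, max_messages: int = 100) -> ReplicaFetcherResponse:
    leader = leaders.get((f.topic, f.partition_id))
    if leader is None:
        raise NotLeaderException(f"not leader for {f.topic}-{f.partition_id}")
    # The fetch offset tells the leader how far this follower has replicated.
    leader.follower_leo[f.replica_id] = f.offset
    messages = leader.log[f.offset:f.offset + max_messages]
    return ReplicaFetcherResponse(messages=messages, hw=leader.hw)
```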
At the follower
Code Block |
---|
ReplicaFetcherThread for Replica r:
while(true) {
send ReplicaFetchRequest to leader and get response:ReplicaFetcherResponse back
append response.messages to r's log
r.hw = response.hw
advance offset in ReplicaFetchRequest
}
|
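The follower loop can be simulated for a bounded number of rounds. A hypothetical `fetch(offset)` callable stands in for the network round trip and returns the messages plus the leader's HW. As in the pseudocode, the follower adopts `response.hw` unchanged.

```python
from typing import Callable, List, Tuple

# fetch(offset) -> (messages, leader_hw): a hypothetical stand-in for sending a
# ReplicaFetchRequest to the leader and reading back its ReplicaFetcherResponse.
Fetch = Callable[[int], Tuple[List[bytes], int]]


def run_fetcher(log: List[bytes], fetch: Fetch, rounds: int) -> int:
    """Run `rounds` iterations of the follower loop (the design loops forever)
    and return the follower's HW after the last one."""
    hw = 0
    for _ in range(rounds):
        messages, leader_hw = fetch(len(log))  # request starts at the follower's LEO
        log.extend(messages)                   # append response.messages to r's log
        hw = leader_hw                         # r.hw = response.hw
        # The fetch offset advances implicitly: the next round uses the new len(log).
    return hw
```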
...
Leader change event
Each broker registers a leader-change listener in ZK for every replica assigned to it. Every time a leader-change event is triggered, the broker calls onLeaderChange().
Code Block |
---|
onLeaderChange()
{
if(no leader exists under /brokers/topics/[topic]/[partition_id]/leader)
{
if(r is still registered under /brokers/topics/[topic]/[partition_id]/replicas)
leaderElection(r)
}
}
|
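The checks in onLeaderChange() can be sketched against a toy store. The election procedure itself is not specified in this section. Here it is reduced, as an assumption, to a race to create the leader znode first, in the spirit of ZooKeeper's create() failing when the node already exists. `FakeZk`, `create_if_absent` and `on_leader_change` are hypothetical.

```python
from __future__ import annotations

from typing import Optional


class FakeZk:
    """Hypothetical in-memory stand-in for ZooKeeper."""

    def __init__(self) -> None:
        self.data: dict[str, object] = {}

    def get(self, path: str) -> Optional[object]:
        return self.data.get(path)

    def create_if_absent(self, path: str, value: object) -> bool:
        # Models a create that fails when the node already exists.
        if path in self.data:
            return False
        self.data[path] = value
        return True


def on_leader_change(zk: FakeZk, topic: str, partition_id: int, broker_id: int) -> bool:
    """Return True if this broker became the leader of the partition."""
    base = f"/brokers/topics/{topic}/{partition_id}"
    if zk.get(base + "/leader") is not None:
        return False  # a leader already exists
    if broker_id not in (zk.get(base + "/replicas") or []):
        return False  # r is no longer assigned to this broker
    # Simplified stand-in for leaderElection(r): the first creator of the znode wins.
    return zk.create_if_absent(base + "/leader", broker_id)
```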