...
```
brokerStartup()
{
  create /brokers/state/[broker_id] path if it doesn't already exist
  register the state change handler to listen on child change ZK notifications on /brokers/state/[broker_id]
  register session expiration listener
  drain the state change queue
  get replica info from ZK and compute AR, the list of replicas assigned to this broker
  for each r in AR {
    subscribe to leader changes for r's partition
    startReplica(r)
  }
  // broker startup procedure is complete. Register its broker id in ZK to announce the availability of this broker
  register its broker_id in /brokers/ids/[broker_id] in ZK
}
```
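The startup ordering can be sketched in Python. This is a minimal, hypothetical sketch: `FakeZk` is an in-memory stand-in for ZooKeeper, the `assignment` map stands in for the replica info read from ZK, and watch registration and replica startup are only recorded in a log rather than performed. It illustrates the ordering the procedure relies on: the broker announces itself under `/brokers/ids/[broker_id]` only after every replica in AR has been started.

```python
from dataclasses import dataclass, field


@dataclass
class FakeZk:
    """Hypothetical in-memory ZooKeeper stand-in: znode path -> data."""
    nodes: dict = field(default_factory=dict)

    def ensure_path(self, path: str) -> None:
        # Create the node only if it does not already exist.
        self.nodes.setdefault(path, "")

    def create(self, path: str, data: str = "") -> None:
        # Like a ZK create: fail if the node already exists.
        if path in self.nodes:
            raise KeyError(f"node already exists: {path}")
        self.nodes[path] = data


@dataclass
class Broker:
    broker_id: int
    zk: FakeZk
    # Hypothetical: (topic, partition) -> replica broker ids, as read from ZK.
    assignment: dict
    log: list = field(default_factory=list)  # ordered record of startup steps

    def startup(self) -> list:
        self.zk.ensure_path(f"/brokers/state/{self.broker_id}")
        self.log.append("watch state path")      # state change handler
        self.log.append("watch session expiry")  # session expiration listener
        self.log.append("drain state queue")
        # AR: the partitions for which this broker hosts a replica.
        ar = sorted(tp for tp, replicas in self.assignment.items()
                    if self.broker_id in replicas)
        for topic, partition in ar:
            self.log.append(f"watch leader {topic}-{partition}")
            self.log.append(f"start replica {topic}-{partition}")
        # Announce availability only after every replica has been started.
        self.zk.create(f"/brokers/ids/{self.broker_id}")
        self.log.append("register broker id")
        return ar


zk = FakeZk()
broker = Broker(1, zk, {("orders", 0): [1, 2], ("orders", 1): [2, 3], ("clicks", 0): [1]})
ar = broker.startup()
print(ar)              # [('clicks', 0), ('orders', 0)]
print(broker.log[-1])  # register broker id
```

Registering the broker id last means other components never see the broker as available while it is still bringing its replicas up.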
State change events
On every broker
Leader change
A leader change listener is registered on every broker that hosts a replica of partition p. Each time it is triggered, the broker executes the following procedure:
```
onLeaderChange()
{
  if(broker_id is registered under /brokers/topics/[topic]/[partition_id]/replicas)
  {
    let r be the replica of [topic]/[partition_id] hosted on this broker
    leaderElection(r)
  }
}
```
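A minimal Python sketch of the guard in `onLeaderChange()`, assuming a hypothetical `replicas_by_partition` map that mirrors `/brokers/topics/[topic]/[partition_id]/replicas` and a caller-supplied `leader_election` callback standing in for `leaderElection(r)`, which is not defined in this section. A replica is modeled, for illustration only, as a `(topic, partition, broker_id)` tuple.

```python
def on_leader_change(broker_id, topic, partition, replicas_by_partition, leader_election):
    """Trigger leader election only if this broker hosts a replica of the partition.

    replicas_by_partition is a hypothetical map mirroring
    /brokers/topics/[topic]/[partition_id]/replicas; leader_election is a
    placeholder callback for leaderElection(r).
    """
    replicas = replicas_by_partition.get((topic, partition), [])
    if broker_id not in replicas:
        return False  # no local replica of this partition: ignore the event
    replica = (topic, partition, broker_id)  # r: the local replica of the partition
    leader_election(replica)
    return True


elections = []
replicas = {("orders", 0): [1, 2, 3]}
print(on_leader_change(2, "orders", 0, replicas, elections.append))  # True
print(on_leader_change(4, "orders", 0, replicas, elections.append))  # False
print(elections)  # [('orders', 0, 2)]
```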
On state change
Each broker has a ZK path, /brokers/state/[broker_id], on which it listens for state change requests from the leader.
```
stateChangeListener()
{
  // listens to state change requests issued by the leader and acts on them
  // drain the state change queue
  while(the state change queue is not empty)
  {
    read the next state change request
    let r be the replica that the state change request is sent for
    // this should not happen
    throw an error if r is not hosted on this broker
    let requestEpoch be the epoch of the state change request
    if(closeReplicaRequest)
    {
      // the epoch is not checked here, so that delete topic/delete partition can be handled for dead brokers
      closeReplica(r)
    }
    else if(startReplicaRequest)
    {
      let latestPartitionEpoch be the latest epoch for this partition, read from /brokers/topics/[topic]/[partition]/leader, whose data is [broker_id],[epoch]
      if(leader for r.partition doesn't exist)
      {
        // this can only happen for new topics or new partitions of existing topics
        startReplica(r)
      }
      else if(requestEpoch == latestPartitionEpoch)
      {
        // ensures that a slow follower ignores a request queued up by a previous leader
        startReplica(r)
      }
    }
  }
}
```
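The epoch check in `stateChangeListener()` can be sketched in Python as follows. This is an illustrative sketch, not the actual broker code: `StateChangeRequest`, its fields, and the `latest_epoch` callback (which returns the epoch stored in the partition's leader path, or `None` when no leader exists) are hypothetical, and "hosted on this broker" is simplified to comparing broker ids.

```python
from collections import deque
from dataclasses import dataclass


@dataclass
class StateChangeRequest:
    kind: str       # "start-replica" or "close-replica"
    topic: str
    partition: int
    broker_id: int  # broker that hosts the target replica
    epoch: int      # leader epoch when the request was issued


def drain_state_changes(queue, my_id, latest_epoch, start_replica, close_replica):
    """Process every queued request; latest_epoch(topic, partition) returns the
    epoch stored in the partition's leader path, or None if no leader exists."""
    while queue:
        req = queue.popleft()
        if req.broker_id != my_id:  # "this should not happen"
            raise RuntimeError(f"request for broker {req.broker_id} reached broker {my_id}")
        if req.kind == "close-replica":
            close_replica(req)  # epoch deliberately not checked
        elif req.kind == "start-replica":
            current = latest_epoch(req.topic, req.partition)
            if current is None or req.epoch == current:
                start_replica(req)
            # else: stale request queued by a previous leader, so ignore it


started, closed = [], []
epochs = {("orders", 0): 5}  # ("orders", 1) has no leader yet
queue = deque([
    StateChangeRequest("start-replica", "orders", 0, 1, 4),  # stale epoch: ignored
    StateChangeRequest("start-replica", "orders", 0, 1, 5),  # current epoch: started
    StateChangeRequest("start-replica", "orders", 1, 1, 0),  # no leader yet: started
    StateChangeRequest("close-replica", "orders", 0, 1, 1),  # closed regardless of epoch
])
drain_state_changes(queue, 1, lambda t, p: epochs.get((t, p)), started.append, closed.append)
print([(r.partition, r.epoch) for r in started])  # [(0, 5), (1, 0)]
print(len(closed))  # 1
```

Close requests bypass the epoch comparison, while start requests carrying an older epoch are dropped silently rather than treated as errors.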
On the leader
On reassignment of partitions
Each time a partitions-reassigned event is triggered on the leader, the leader calls onPartitionsReassigned():
```
onPartitionsReassigned()
{
  let p be the partition [topic]/[partition_id]
  if(this broker is the leader for p)
  {
    p.RAR = the new replicas from /brokers/partitions_reassigned/[topic]/[partition_id]
    p.AR = /brokers/topics/[topic]/[partition_id]/replicas
    newReplicas = p.RAR - p.AR
    // epoch: the leader's current epoch for p
    for(newReplica <- newReplicas)
      sendStateChange("start-replica", newReplica.broker_id, epoch)
  }
}
```
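The `newReplicas` computation is a set difference between the reassigned replicas and the current ones. A minimal Python sketch, assuming broker ids are plain integers and `send_state_change` is a hypothetical callback standing in for `sendStateChange`:

```python
def on_partitions_reassigned(my_id, leader_id, rar, ar, epoch, send_state_change):
    """Leader-side handling of a reassignment for one partition.

    rar: reassigned replica list (RAR); ar: currently assigned replicas (AR).
    send_state_change is a hypothetical callback standing in for sendStateChange.
    """
    if my_id != leader_id:
        return []  # only the partition's leader acts on the event
    assigned = set(ar)
    # RAR - AR, keeping RAR's order so requests go out deterministically.
    new_replicas = [b for b in rar if b not in assigned]
    for broker_id in new_replicas:
        send_state_change("start-replica", broker_id, epoch)
    return new_replicas


sent = []
new = on_partitions_reassigned(1, 1, rar=[1, 4, 5], ar=[1, 2, 3], epoch=7,
                               send_state_change=lambda *args: sent.append(args))
print(new)   # [4, 5]
print(sent)  # [('start-replica', 4, 7), ('start-replica', 5, 7)]
```

Replicas that are in AR but not in RAR are left untouched here; like the pseudocode, the sketch only starts the new ones.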