...
Code Block |
---|
onPartitionsReassigned()
{
  if(this broker is the leader for [partition_id]) {
    p.RAR = the new replicas from /brokers/partitions_reassigned/[topic]/[partition_id]
    AR = /brokers/topics/[topic]/[partition_id]/replicas
    newReplicas = p.RAR - AR
    for(newReplica <- newReplicas)
      sendStateChange("start-replica", newReplica.broker_id, epoch)
  }
}
|
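The set difference in the listener above can be sketched in Python as follows. This is only an illustration: the function name and the sample broker ids are hypothetical, and the only step taken from the pseudocode is `newReplicas = RAR - AR`.

```python
def replicas_to_start(reassigned_replicas, assigned_replicas):
    """Return the broker ids that are in RAR but not yet in AR, sorted."""
    return sorted(set(reassigned_replicas) - set(assigned_replicas))


# Hypothetical sample data standing in for the two ZK paths.
rar = [2, 3, 4]  # replicas listed under partitions_reassigned
ar = [1, 2, 3]   # replicas currently assigned to the partition

# Only broker 4 is new, so only it would be sent "start-replica".
print(replicas_to_start(rar, ar))  # [4]
```

Brokers already present in AR are skipped because they host the replica already; only the difference needs a start-replica request.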
State change communication
The leader uses this API to send a state change request to a follower.
Code Block |
---|
sendStateChange(stateChange, followerBrokerId, leaderEpoch)
{
stateChangeQ = new StateChangeQueue("/brokers/state/[followerBrokerId]")
stateChangeRequest = new StateChangeRequest(stateChange, leaderEpoch)
// check if the state change queue is full. This can happen if a broker is offline for a long time
if(stateChangeQ.isFull) {
// this operation retains only one close-replica request per partition, the one with the latest epoch. This ensures that an offline broker, on startup, deletes the old topics and partitions it hosted before going offline. No start-replica requests need to be retained for a partition
stateChangeQ.shrink
// if the queue is still full, log an error and fail the request
if(stateChangeQ.isFull)
throw new FollowerStateChangeQueueFull
}
}
stateChangeQ.put(stateChangeRequest)
}
|
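The shrink behaviour described in the comments can be sketched with an in-memory queue. This is a minimal sketch under stated assumptions, not the ZooKeeper-backed queue itself: the `topic` and `partition` fields on the request are hypothetical additions (the pseudocode's `StateChangeRequest` carries only the state change and the epoch), the capacity is arbitrary, and dropping every request other than close-replica is an assumption based on the comment about start-replica requests.

```python
from collections import namedtuple

# Hypothetical request shape: topic and partition are added so that
# shrink() can group requests per partition.
StateChangeRequest = namedtuple(
    "StateChangeRequest", "state_change topic partition epoch")


class FollowerStateChangeQueueFull(Exception):
    pass


class StateChangeQueue:
    def __init__(self, capacity):
        self.capacity = capacity
        self.items = []

    def is_full(self):
        return len(self.items) >= self.capacity

    def shrink(self):
        # Find, per partition, the close-replica request with the latest epoch.
        latest_close = {}
        for req in self.items:
            if req.state_change != "close-replica":
                continue  # assumption: all other requests are dropped
            key = (req.topic, req.partition)
            if key not in latest_close or req.epoch > latest_close[key].epoch:
                latest_close[key] = req
        # Keep only those survivors, in their original queue order.
        self.items = [
            req for req in self.items
            if latest_close.get((req.topic, req.partition)) is req
        ]

    def put(self, req):
        if self.is_full():
            self.shrink()
            if self.is_full():  # fail only if shrinking freed no space
                raise FollowerStateChangeQueueFull("queue full after shrink")
        self.items.append(req)


q = StateChangeQueue(capacity=3)
q.put(StateChangeRequest("close-replica", "t", 0, 1))
q.put(StateChangeRequest("start-replica", "t", 0, 2))
q.put(StateChangeRequest("close-replica", "t", 0, 3))
q.put(StateChangeRequest("start-replica", "t", 1, 4))  # triggers shrink
print([r.epoch for r in q.items])  # [3, 4]
```

The exception is raised only when the queue is still full after shrinking, which matches the intent of the "if the queue is still full" comment.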
State change operations
Start replica
This state change is requested by the leader, or by the admin command, for a new replica assignment.
Code Block |
---|
startReplica(r: Replica) {
if(broker_id not in /brokers/topics/[r.topic]/[r.partition]/replicas)
throw NotReplicaForPartitionException()
if( r's log is not already started) {
do local recovery of r's log
r.hw = min(last checkpointed HW for r, r.leo)
register a leader-change listener on partition r.partition.partition_id
}
if( a leader does not exist for partition r.partition.partition_id in ZK)
leaderElection(r)
else {
// this broker is in the AR list for this partition, so if it is not the leader it must be a follower
if(this broker is not the leader)
becomeFollower(r)
}
}
|
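The two decisions in `startReplica` (where the high watermark starts, and which role the broker takes) can be sketched as pure functions. The function names and the returned strings are hypothetical, and `ValueError` merely stands in for `NotReplicaForPartitionException`. The `min` guards against a checkpointed HW that lies beyond the recovered log end offset, for example if local recovery discarded part of the log.

```python
def recovered_hw(checkpointed_hw, leo):
    """HW after local recovery: never beyond the log end offset."""
    return min(checkpointed_hw, leo)


def role_after_start(broker_id, assigned_replicas, current_leader):
    """Decide what startReplica does next for this broker.

    current_leader is None when no leader is registered for the partition.
    """
    if broker_id not in assigned_replicas:
        # Stands in for NotReplicaForPartitionException.
        raise ValueError("broker is not a replica for this partition")
    if current_leader is None:
        return "leader-election"
    if current_leader != broker_id:
        return "become-follower"
    return "already-leader"


print(recovered_hw(checkpointed_hw=100, leo=80))
print(role_after_start(2, [1, 2, 3], current_leader=1))
```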
Close replica
This state change is requested by the leader when a topic or partition is deleted or moved to another broker.
Code Block |
---|
closeReplica(r: Replica)
{
stop the fetcher associated with r, if one exists
close and delete r
}
|
Become follower
This state change is requested by the leader when the leader for a replica changes.
Code Block |
---|
becomeFollower(r: Replica)
{
// this is required if this replica was the last leader
stop the commit thread, if any
stop the current ReplicaFetcherThread, if any
truncate the log to r.hw
start a new ReplicaFetcherThread to the current leader of r, from offset r.leo
start HW checkpoint thread for r
}
|
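The truncate-then-fetch step can be illustrated with a log modelled as a Python list, where a message's offset is its list index and the log end offset (leo) is the list length. This model and the function name are assumptions made for the sketch. Messages at or beyond the HW were never committed, so the sketch discards them before fetching from the new leader.

```python
def become_follower(log, hw):
    """Truncate the log to hw and return (log, offset to fetch from)."""
    truncated = log[:hw]          # drop uncommitted messages at offsets >= hw
    fetch_offset = len(truncated)  # the new leo, which equals hw after truncation
    return truncated, fetch_offset


log = ["m0", "m1", "m2", "m3", "m4"]  # leo = 5
new_log, fetch_from = become_follower(log, hw=3)
print(new_log, fetch_from)  # ['m0', 'm1', 'm2'] 3
```

After truncation the follower's leo equals its HW, so fetching "from offset r.leo" starts at the first offset the follower no longer has.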
Become leader
This state change is performed by the new leader.
Code Block |
---|
becomeLeader(r: Replica, ISR: Set[Replica], AR: Set[Replica])
{
// get a new epoch value and write it to the leader path
epoch = getNewEpoch()
write (broker_id, epoch) to /brokers/topics/[r.partition.topic]/[r.partition.pid]/leader in ZK
stop HW checkpoint thread for r
r.hw = r.leo // TODO: check if this should actually be r.hw = last checkpointed HW for r
wait until every live replica in AR catches up (i.e. its leo == r.hw) or a KeepInSyncTime has passed
r.partition.ISR = the current set of replicas in sync with r
r.partition.CUR = AR - ISR
write r.partition.ISR & r.partition.CUR in ZK
r.partition.RAR = replicas in /brokers/partitions_reassigned/[topic]/[partition_id] in ZK
r.partition.leader = r // this enables reads/writes to this partition on this broker
start a commit thread on r.partition
start HW checkpoint thread for r
get replica info from ZK and compute AR, a list of replicas assigned to this broker
for each replica in AR {
if(replica.broker_id is registered under /brokers/ids)
sendStateChange("start-replica", replica.broker_id, epoch)
}
}
|
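The ISR and CUR computation in `becomeLeader` can be sketched as below, taken as a snapshot once the wait has ended. The function name, the dictionary of per-broker log end offsets, and the sample values are hypothetical. The in-sync test (`leo == leader_hw`) and `CUR = AR - ISR` follow the pseudocode.

```python
def split_isr_cur(leader_hw, replica_leos, live_brokers):
    """Split the assigned replicas (AR) into in-sync (ISR) and catch-up (CUR).

    replica_leos maps each broker id in AR to its log end offset.
    """
    isr = {
        broker for broker, leo in replica_leos.items()
        if broker in live_brokers and leo == leader_hw
    }
    cur = set(replica_leos) - isr  # CUR = AR - ISR
    return isr, cur


# Broker 1 is the new leader with hw = leo = 10.
leos = {1: 10, 2: 10, 3: 7, 4: 10}
live = {1, 2, 3}  # broker 4 is not registered under /brokers/ids
isr, cur = split_isr_cur(leader_hw=10, replica_leos=leos, live_brokers=live)
print(sorted(isr), sorted(cur))  # [1, 2] [3, 4]
```

Broker 3 is live but lagging and broker 4 is caught up but offline, so both land in CUR rather than ISR.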