...
Paths stored in Zookeeper
Notation: When an element in a path is denoted [xyz], that means that the value of xyz is not fixed and there is in fact a znode for each possible value of xyz. For example /topics/[topic] would be a directory named /topics containing a directory for each topic name. An arrow -> is used to indicate the contents of a znode. For example /hello -> world would indicate a znode /hello containing the value "world". A path is persistent unless it's marked as ephemeral.
We store the following paths in Zookeeper:
...
- Partition-reassigned listener:
  - child change on /brokers/partitions_reassigned
  - child change on /brokers/partitions_reassigned/[topic]
Zookeeper listeners on all brokers
- Leader-change listener: value change on /brokers/topics/[topic]/[partition_id]/leader
- State-change listener: child change on /brokers/state/[broker_id]
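Both listeners boil down to Zookeeper watches, either on a znode's value or on its children. The sketch below shows how such watches might be registered with the plain ZooKeeper client, assuming the paths above; the class and handler names are illustrative, and since Zookeeper watches are one-shot, real code would re-register them after every event.

Code Block

import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}
import org.apache.zookeeper.Watcher.Event.EventType

// Illustrative sketch, not the actual broker code. Watches are one-shot:
// a production handler must re-register before (or right after) reacting.
class BrokerWatcher(zk: ZooKeeper, brokerId: Int) extends Watcher {

  def register(topic: String, partitionId: Int): Unit = {
    // value change on /brokers/topics/[topic]/[partition_id]/leader
    zk.exists(s"/brokers/topics/$topic/$partitionId/leader", this)
    // child change on /brokers/state/[broker_id]
    zk.getChildren(s"/brokers/state/$brokerId", this)
  }

  override def process(event: WatchedEvent): Unit = event.getEventType match {
    case EventType.NodeDataChanged     => onLeaderChange(event.getPath)  // leader znode rewritten
    case EventType.NodeChildrenChanged => onStateChange(event.getPath)   // state-change request queued
    case _                             => ()                             // connection/session events
  }

  private def onLeaderChange(path: String): Unit = { /* re-read leader; become follower if needed */ }
  private def onStateChange(path: String): Unit = { /* drain this broker's state-change queue */ }
}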
Configuration parameters
- LeaderElectionWaitTime: controls the maximum amount of time that we wait during leader election.
- KeepInSyncTime: controls the maximum amount of time that a leader waits before dropping a follower from the in-sync replica set.
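For illustration only, the two parameters might be carried in a small config object; the millisecond units and the defaults below are assumptions, not values taken from this design.

Code Block

// Hypothetical holder for the two knobs above; units and defaults are assumed.
case class ReplicationConfig(
  leaderElectionWaitTimeMs: Long = 1000L, // max time to wait during leader election
  keepInSyncTimeMs: Long = 6000L          // max lag before a follower leaves the in-sync set
)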
...
Code Block

onPartitionsReassigned()
{
  if(this broker is the leader for [partition_id])
  {
    p.RAR = the new replicas from /brokers/partitions_reassigned/[topic]/[partition_id]
    AR = /brokers/topics/[topic]/[partition_id]/replicas
    newReplicas = p.RAR - AR
    for(newReplica <- newReplicas)
      sendStateChange("start-replica", newReplica.broker_id, epoch)
    if(p.RAR is empty)
    {
      for(assignedReplica <- AR)
        sendStateChange("close-replica", assignedReplica.broker_id, epoch)
    }
  }
}
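The set arithmetic is the whole decision here: a replica in RAR but not in AR is new and must be started, while an empty RAR means the assignment is being dropped and every current replica is closed. A compact sketch with hypothetical types:

Code Block

// Sketch of the logic above; broker ids stand in for replicas, and
// sendStateChange is passed in rather than talking to Zookeeper.
def onPartitionsReassigned(rar: Set[Int], ar: Set[Int], epoch: Int,
                           sendStateChange: (String, Int, Int) => Unit): Unit = {
  (rar -- ar).foreach(id => sendStateChange("start-replica", id, epoch))  // brand-new replicas
  if (rar.isEmpty)
    ar.foreach(id => sendStateChange("close-replica", id, epoch))         // assignment dropped
}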
State change communication
The leader uses this API to communicate a state change request to the followers
Code Block

sendStateChange(stateChange, followerBrokerId, leaderEpoch)
{
  stateChangeQ = new StateChangeQueue("/brokers/state/" + followerBrokerId)
  stateChangeRequest = new StateChangeRequest(stateChange, leaderEpoch)
  // check if the state change Q is full. This can happen if a broker is offline for a long time
  if(stateChangeQ.isFull)
  {
    // this operation retains only one close-replica request for a partition, the one with the
    // latest epoch. This ensures that an offline broker, on startup, will delete old topics and
    // partitions that it hosted before going offline. Start-replica requests for a partition
    // don't have to be retained
    stateChangeQ.shrink
    // if the queue is still full after shrinking, raise an error
    if(stateChangeQ.isFull)
      throw new FollowerStateChangeQueueFull
  }
  stateChangeQ.put(stateChangeRequest)
}
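To make the shrink step concrete, here is a hypothetical in-memory model of its retention rule: drop all start-replica requests and keep, per partition, only the close-replica request with the highest epoch. Types and field names are illustrative.

Code Block

case class StateChangeRequest(state: String, topic: String, partition: Int, epoch: Int)

// Model of StateChangeQueue.shrink, not the real queue implementation.
def shrink(q: Vector[StateChangeRequest]): Vector[StateChangeRequest] =
  q.filter(_.state == "close-replica")      // start-replica requests need not survive
   .groupBy(r => (r.topic, r.partition))    // one entry per partition...
   .values.map(_.maxBy(_.epoch))            // ...the one with the latest epoch
   .toVector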
...
Code Block

startReplica(r: Replica)
{
  if(broker_id not in /brokers/topics/[r.topic]/[r.partition]/replicas)
    throw NotReplicaForPartitionException()
  if(r's log is not already started)
  {
    do local recovery of r's log
    r.hw = min(last checkpointed HW for r, r.leo)
    register a leader-change listener on partition r.partition.partition_id
  }
  if(a leader does not exist for partition r.partition.partition_id in ZK)
    leaderElection(r)
  else
  {
    // this broker is not the leader, so it is a follower since it is in the AR list for this partition
    if(this broker is not already the follower of the current leader)
      becomeFollower(r)
  }
}
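One detail worth calling out is the HW initialization during recovery: the checkpointed high watermark can be ahead of what actually survived on disk, so it is capped by the recovered log end offset. As a one-line sketch (names hypothetical):

Code Block

// The HW can never point past the last offset actually present in the log.
def initialHighWatermark(lastCheckpointedHw: Long, logEndOffset: Long): Long =
  math.min(lastCheckpointedHw, logEndOffset)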
Close replica
...
Code Block

becomeLeader(r: Replica, ISR: Set[Replica], AR: Set[Replica])
{
  // get a new epoch value and write it to the leader path
  epoch = getNewEpoch()
  /brokers/topics/[r.partition.topic]/[r.partition.pid]/leader = broker_id, epoch
  /brokers/topics/[r.partition.topic]/[r.partition.pid]/ISR = ISR; epoch
  stop HW checkpoint thread for r
  r.hw = r.leo // TODO: check if this should actually be r.hw = last checkpointed HW for r
  wait until every live replica in AR catches up (i.e. its leo == r.hw) or KeepInSyncTime has passed
  r.partition.ISR = the current set of replicas in sync with r
  r.partition.CUR = AR - ISR
  write r.partition.ISR & r.partition.CUR in ZK
  r.partition.RAR = replicas in /brokers/partitions_reassigned/[topic]/[partition_id] in ZK
  r.partition.leader = r // this enables reads/writes to this partition on this broker
  start a commit thread on r.partition
  start HW checkpoint thread for r
  get replica info from ZK and compute AR, a list of replicas assigned to this broker
}
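getNewEpoch() must hand out a value that no other would-be leader can reuse. One way it could work, sketched under the assumption that the epoch lives in a dedicated znode (the path is hypothetical), is a Zookeeper compare-and-set: write value+1 conditioned on the version just read, and retry if another broker won the race.

Code Block

import org.apache.zookeeper.{KeeperException, ZooKeeper}
import org.apache.zookeeper.data.Stat

// Illustrative sketch of a race-free epoch counter on top of Zookeeper.
def getNewEpoch(zk: ZooKeeper, path: String = "/brokers/epoch"): Int = {
  while (true) {
    val stat = new Stat()
    val current = new String(zk.getData(path, false, stat)).toInt
    try {
      // conditional write: fails with BADVERSION if someone updated the znode first
      zk.setData(path, (current + 1).toString.getBytes, stat.getVersion)
      return current + 1
    } catch {
      case _: KeeperException.BadVersionException => // lost the race, re-read and retry
    }
  }
  sys.error("unreachable")
}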
Admin commands
This section describes the algorithms for various admin commands like create/delete topic, add/remove partition.
...
Code Block

createTopic(topic, numPartitions, replicationFactor, replicaAssignmentStr)
{
  if(!cleanFailedTopicCreationAttempts(topic))
  {
    error("Topic topic exists with live partitions")
    exit
  }
  if(replicaAssignmentStr == "")
  {
    // assignReplicas will always assign partitions only to online brokers
    replicaAssignment = assignReplicas(topic, numPartitions, replicationFactor)
  }
  // create topic path in ZK
  create /brokers/topics/topic
  for(partition <- replicaAssignment)
  {
    addPartition(topic, partition.id, partition.replicas)
  }
  // report successfully started partitions for this topic
}

waitTillStateChangeRequestConsumed(partition.replicas, timeout)
{
  register watch on state change path for each replica
  // in the listener, use a condition variable to await(timeout); if it doesn't fire, return false, else return true
}

cleanFailedTopicCreationAttempts(topic)
{
  topicsForPartitionsReassignment = ls /brokers/partitions_reassigned
  for(topic <- topicsForPartitionsReassignment)
  {
    partitionsCreated = ls /brokers/partitions_reassigned/topic
    cleanupFailed = false
    for(partition <- partitionsCreated)
    {
      if(/brokers/topics/topic/partition/replicas path exists)
      {
        delete /brokers/partitions_reassigned/topic/partition
        error("Cannot cleanup. Topic exists with live partition")
        cleanupFailed = true
      }
    }
    if(cleanupFailed)
    {
      if(/brokers/partitions_reassigned/topic has no children)
        delete /brokers/partitions_reassigned/topic
      return false
    }
    // partition paths can be safely deleted
    for(partition <- partitionsCreated)
    {
      read the /brokers/partitions_reassigned/topic/partition path
      for each broker listed in the above step, sendStateChange("close-replica", [broker_id], -1)
      delete /brokers/topics/topic/partitions/partition
      delete /brokers/partitions_reassigned/topic/partition
    }
  }
  if(/brokers/topics/topic has no children)
    delete /brokers/topics/topic
  return true
}
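The condition-variable wait in waitTillStateChangeRequestConsumed could look like the sketch below, assuming a request counts as consumed once its znode is deleted; a CountDownLatch stands in for the condition variable, and all names are illustrative.

Code Block

import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}

// Wait until every replica's pending state-change znode is gone, or time out.
def waitTillStateChangeRequestConsumed(zk: ZooKeeper, paths: Seq[String],
                                       timeoutMs: Long): Boolean = {
  val latch = new CountDownLatch(paths.size)
  val watcher = new Watcher {
    override def process(event: WatchedEvent): Unit =
      if (event.getEventType == Watcher.Event.EventType.NodeDeleted) latch.countDown()
  }
  // set a deletion watch per path; count down any request that is already gone
  paths.foreach { p => if (zk.exists(p, watcher) == null) latch.countDown() }
  latch.await(timeoutMs, TimeUnit.MILLISECONDS) // false if the timeout fires first
}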
...
Code Block

while(true)
{
  pr = commitQ.dequeue
  canCommit = false
  while(!canCommit)
  {
    canCommit = true
    for each r in ISR
      if(!offsetReached(r, pr.offset))
      {
        canCommit = false
        break
      }
    if(!canCommit)
    {
      p.CUR.add(r)
      p.ISR.delete(r)
      write p.ISR to ZK
    }
  }
  for each c in CUR
    if(c.leo >= pr.offset)
    {
      p.ISR.add(c); p.CUR.delete(c); write p.ISR to ZK
    }
  checkReassignedReplicas(pr, p.RAR, p.ISR)
  checkLoadBalancing()
  r.hw = pr.offset // increment the HW to indicate that pr is committed
  send ACK to the client that pr is committed
}

offsetReached(r: Replica, offset: Long)
{
  if(r.leo becomes equal or larger than offset within KeepInSyncTime)
    return true
  return false
}

checkLoadBalancing()
{
  // see if we need to switch the leader to the preferred replica
  if(leader replica is not the preferred one && the preferred replica is in ISR)
  {
    delete /brokers/topics/[topic]/[partition_id]/leader in ZK
    stop this commit thread
    stop the HW checkpoint thread
  }
}

checkReassignedReplicas(pr: ProduceRequest, RAR: Set[Replica], ISR: Set[Replica])
{
  // see if all reassigned replicas have fully caught up and older replicas have
  // stopped fetching; if so, switch to those replicas
  // optimization: do the check periodically
  if(every replica in RAR has its leo >= pr.offset)
  {
    if(!sentCloseReplica.get)
    {
      oldReplicas = AR - RAR
      for(oldReplica <- oldReplicas)
      {
        if(oldReplica.broker_id != broker_id)
          sendStateChange("close-replica", oldReplica.broker_id, epoch)
      }
      sentCloseReplica.set(true)
    }
    else
    {
      // close-replica is already sent. Wait until the replicas are closed or probably timeout and raise an error
      if(broker_id is in (AR - RAR) && (other replicas in (AR - RAR) are not in ISR anymore))
      {
        // leader is not in the reassigned replicas list
        completePartitionsReassignment(RAR, ISR, AR, true)
        sentCloseReplica.set(false)
      }
      else if(every replica in (AR - RAR) is not in ISR anymore)
      {
        completePartitionsReassignment(RAR, ISR, AR, false)
        sentCloseReplica.set(false)
      }
    }
  }
}

completePartitionsReassignment(RAR: Set[Replica], ISR: Set[Replica], AR: Set[Replica], stopCommitThread: Boolean)
{
  // newly assigned replicas are in-sync; switch over to the new replicas
  // need (RAR + ISR) in case we fail right after here
  write (RAR + ISR) as the new ISR in ZK
  update /brokers/topics/[topic]/[partition_id]/replicas in ZK with the new replicas in RAR
  if(this broker_id is not in RAR)
    sendStateChange("close-replica", broker_id, epoch)
  delete /brokers/partitions_reassigned/[topic]/[partition_id] in ZK
  if(stopCommitThread || (broker_id is not preferred replica))
  {
    // triggers leader election
    delete /brokers/topics/[topic]/[partition_id]/leader in ZK
    stop this commit thread
  }
}
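The commit rule in the loop above is: a produce request commits only when every ISR member's log end offset has reached it; laggards are demoted from ISR to CUR, and caught-up CUR members are promoted back. A simplified sketch with hypothetical types (the ZK writes and the KeepInSyncTime wait are elided):

Code Block

import scala.collection.mutable

case class Replica(brokerId: Int, var leo: Long)

// One pass of the commit check for a single produce request offset.
def tryCommit(offset: Long, isr: mutable.Set[Replica], cur: mutable.Set[Replica]): Boolean = {
  val laggards = isr.filterNot(_.leo >= offset)
  if (laggards.nonEmpty) {            // demote followers that fell behind
    cur ++= laggards; isr --= laggards
    // write the new ISR to ZK here
  }
  val caughtUp = cur.filter(_.leo >= offset)
  if (caughtUp.nonEmpty) {            // promote replicas that caught back up
    isr ++= caughtUp; cur --= caughtUp
    // write the new ISR to ZK here
  }
  isr.forall(_.leo >= offset)         // committed once the whole ISR has the message
}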
...