THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block |
---|
while(true) { pr = commitQ.dequeue canCommit = false while(!canCommit) { canCommit = true for each r in ISR if(!offsetReached(r, pr.offset)) { canCommit = false break } if(!canCommit) { p.CUR.add(r) p.ISR.delete(r) write p.ISR to ZK } } for each c in CUR if(c.leo >= pr.offset) { p.ISR.add(c); p.CUR.delete(c); write p.ISR to ZK } checkReassignedReplicas(pr, p.RAR, p.ISR) checkLoadBalancing() r.hw = pr.offset // increment the HW to indicate that pr is committed send ACK to the client that pr is committed } offsetReached(r: Replica, offset: Long) { if(r.leo becomes equal or larger than offset within KeepInSyncTime) return true return false } checkLoadBalancing() { // see if we need to switch the leader to the preferred replica if(leader replica is not the preferred one & the preferred replica is in ISR) { delete /brokers/topics/[topic]/[partition_id]/leader in ZK stop this commit thread stop the HW checkpoint thread } } checkReassignedReplicas(pr: ProduceRequest, RAR: Set[Replica], ISR: Set[Replica]) { // see if all reassigned replicas have fully caught up and older replicas have stopped fetching, if so, switch to those replicas // optimization, do the check periodically If (every replica in RAR has its leo >= pr.offset) { if(!sentCloseReplica.get) { oldReplicas = AR - RAR for(oldReplica <- oldReplicas) { if(r.broker_id != broker_id) sendStateChange(“close-replica”, oldReplica.broker_id, epoch) } sentCloseReplica.set(true) }else { // close replica is already sent. Wait until the replicas are closed or probably timeout and raise error if(broker_id is in (AR - RAR) && (other replicas in (AR - RAR) are not in ISR anymore)) { // leader is not in the reassigned replicas list completePartitionReassignment(RAR, ISR, AR, true) sentCloseReplica.set(false) } else if(every replica in (AR-RAR) is not in ISR anymore) { completePartitionReassignment(RAR, ISR, AR, false) sentCloseReplica.set(false) } } } completePartitionsReassignment(RAR: Set[Replica], ISR: Set[Replica], AR: Set[Replica], stopCommitThread: Boolean) { //newly assigned replicas are in-sync, switch over to the new replicas //need (RAR + ISR) in case we fail right after here write (RAR + ISR) as the new ISR in ZK update /brokers/topics/[topic]/[partition_id]/replicas in ZK with the new replicas in RAR //triggers leader election if(stopCommitThread || (broker_id is not preferred replica)) { if(this broker_id is not in the new AR) sendStateChange(“close-replica”, broker_id, epoch) delete /brokers/partitions_reassigned/[topic]/[partition_id] in ZK if(stopCommitThread || (broker_id is not preferred replica)) { //triggers leader election delete /brokers/topics/[topic]/[partition_id]/leader in ZK stop this commit thread } } |
...