This detailed design differs from the original detailed design in the following areas:
...
Code Block
while(true) {
  pr = commitQ.dequeue
  canCommit = false
  while(!canCommit) {
    canCommit = true
    for each r in ISR
      if(!offsetReached(r, pr.offset)) {
        canCommit = false
        break
      }
    if(!canCommit) {
      // r is the replica that failed to catch up within KeepInSyncTime;
      // demote it from the in-sync set (ISR) to the catch-up set (CUR)
      p.CUR.add(r)
      p.ISR.delete(r)
      write p.ISR to ZK
    }
  }
  // promote replicas that have caught up back into the ISR
  for each c in CUR
    if(c.leo >= pr.offset) {
      p.ISR.add(c); p.CUR.delete(c); write p.ISR to ZK
    }
  checkReassignedReplicas(pr, p.RAR, p.ISR)
  checkLoadBalancing()
  leader.hw = pr.offset // advance the HW to indicate that pr is committed
  send ACK to the client that pr is committed
}

offsetReached(r: Replica, offset: Long) {
  // wait up to KeepInSyncTime for replica r's log end offset (leo) to reach offset
  if(r.leo becomes equal to or larger than offset within KeepInSyncTime) return true
  return false
}

checkLoadBalancing() { // see if we need to switch the leader to the preferred replica
  if(leader replica is not the preferred one && the preferred replica is in ISR) {
    delete /brokers/topics/[topic]/[partition_id]/leader in ZK
    stop this commit thread
    stop the HW checkpoint thread
  }
}

checkReassignedReplicas(pr: ProduceRequest, RAR: Set[Replica], ISR: Set[Replica]) {
  // see if all reassigned replicas have fully caught up and the old replicas have
  // stopped fetching; if so, switch over to the reassigned replicas
  // optimization: do the check periodically
  if(every replica in RAR has its leo >= pr.offset) {
    if(!sentCloseReplica.get) {
      oldReplicas = AR - RAR
      for(oldReplica <- oldReplicas) {
        if(oldReplica.broker_id != broker_id)
          sendStateChange("close-replica", oldReplica.broker_id, epoch)
      }
      sentCloseReplica.set(true)
    } else {
      // close-replica has already been sent; wait until the replicas are closed,
      // or eventually time out and raise an error
      if(broker_id is in (AR - RAR) && (other replicas in (AR - RAR) are not in ISR anymore)) {
        // leader is not in the reassigned replica list
        completePartitionReassignment(RAR, ISR, AR, true)
        sentCloseReplica.set(false)
      }
      else if(every replica in (AR - RAR) is not in ISR anymore) {
        completePartitionReassignment(RAR, ISR, AR, false)
        sentCloseReplica.set(false)
      }
    }
  }
}

completePartitionReassignment(RAR: Set[Replica], ISR: Set[Replica], AR: Set[Replica], stopCommitThread: Boolean) {
  // the newly assigned replicas are in sync; switch over to the new replicas
  // need (RAR + ISR) in case we fail right after this point
  write (RAR + ISR) as the new ISR in ZK
  update /brokers/topics/[topic]/[partition_id]/replicas in ZK with the new replicas in RAR
  // triggers leader election
  delete /brokers/partitions_reassigned/[topic]/[partition_id] in ZK
  if(stopCommitThread || (broker_id is not the preferred replica)) {
    delete /brokers/topics/[topic]/[partition_id]/leader in ZK
    stop this commit thread
  }
}
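The "within KeepInSyncTime" wait in offsetReached is the subtle part of the commit loop. Below is a minimal Scala sketch of one way to realize it with a lock and a condition variable; the Replica class, the updateLeo hook, and the timeout parameter are illustrative assumptions, not the actual Kafka code.

Code Block
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock

// Illustrative sketch only; this Replica is an assumption, not Kafka's class.
class Replica(val brokerId: Int) {
  private val lock = new ReentrantLock()
  private val caughtUp = lock.newCondition()
  private var leo: Long = 0L // log end offset of this replica, as seen by the leader

  // called by the leader's fetch handler when the follower's offset advances
  def updateLeo(newLeo: Long): Unit = {
    lock.lock()
    try {
      leo = newLeo
      caughtUp.signalAll() // wake any commit thread waiting in offsetReached
    } finally lock.unlock()
  }

  // block until leo >= offset, or until timeoutMs (KeepInSyncTime) elapses
  def offsetReached(offset: Long, timeoutMs: Long): Boolean = {
    val deadline = System.currentTimeMillis() + timeoutMs
    lock.lock()
    try {
      while (leo < offset) {
        val remaining = deadline - System.currentTimeMillis()
        if (remaining <= 0) return false // timed out: caller demotes r to CUR
        caughtUp.await(remaining, TimeUnit.MILLISECONDS)
      }
      true
    } finally lock.unlock()
  }
}

On a false return the commit thread moves the replica from ISR to CUR, exactly as in the pseudocode above.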
Follower fetching from leader
A follower keeps sending ReplicaFetchRequests to the leader. The processing at the leader and at the follower is described below.
Code Block
ReplicaFetchRequest {
  topic: String
  partition_id: Int
  replica_id: Int
  offset: Long
}

ReplicaFetchResponse {
  hw: Long             // the offset of the last message committed at the leader
  messages: MessageSet // fetched messages
}
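For concreteness, the two formats could be rendered as Scala case classes along these lines; Message and the Seq-based MessageSet are placeholders for the real wire types, not the actual Kafka definitions.

Code Block
// Placeholder types standing in for the real wire format.
case class Message(offset: Long, payload: Array[Byte])

case class ReplicaFetchRequest(topic: String, partitionId: Int, replicaId: Int, offset: Long)

case class ReplicaFetchResponse(
  hw: Long,              // offset of the last message committed at the leader
  messages: Seq[Message] // fetched messages, starting at the requested offset
)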
At the leader
Code Block
replicaFetch(f: ReplicaFetchRequest) { // handler for ReplicaFetchRequest at the leader
  leader = getLeaderReplica(f.topic, f.partition_id)
  if(leader == null) throw NotLeaderException
  response = new ReplicaFetchResponse
  // record the follower's log end offset (leo) so the commit thread can track its progress
  getReplica(f.topic, f.partition_id, f.replica_id).leo = f.offset
  response.messages = fetch messages starting from f.offset from leader.log
  response.hw = leader.hw
  send response back
}
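A runnable Scala sketch of the same handler, reusing the illustrative Replica and case classes above; the in-memory log and the isLeader flag are assumptions, and real error handling and I/O are elided.

Code Block
class NotLeaderException extends RuntimeException

// Illustrative leader-side state for one partition; not the actual Kafka code.
class LeaderState(replicas: Map[Int, Replica]) {
  @volatile var isLeader: Boolean = true
  @volatile var hw: Long = 0L
  @volatile var log: Vector[Message] = Vector.empty

  def replicaFetch(req: ReplicaFetchRequest): ReplicaFetchResponse = {
    if (!isLeader) throw new NotLeaderException
    // record the follower's log end offset first, so the commit thread's
    // offsetReached wait observes the follower's progress
    replicas(req.replicaId).updateLeo(req.offset)
    ReplicaFetchResponse(hw, log.filter(_.offset >= req.offset))
  }
}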
At the follower
ReplicaFetcherThread for Replica r:
Code Block
while(true) {
  send ReplicaFetchRequest to the leader and get response: ReplicaFetchResponse back
  append response.messages to r's log
  r.hw = response.hw // adopt the leader's HW
  advance the offset in the next ReplicaFetchRequest
}
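And a matching Scala sketch of the follower loop, with sendFetch standing in for the actual RPC; a real fetcher thread would add batching, backoff, and error handling.

Code Block
// Illustrative follower fetch loop; sendFetch is a stand-in for the RPC call.
def fetcherLoop(topic: String, partitionId: Int, replicaId: Int,
                sendFetch: ReplicaFetchRequest => ReplicaFetchResponse): Unit = {
  var nextOffset = 0L // a real follower resumes from its own log end offset
  var hw = 0L
  while (true) {
    val resp = sendFetch(ReplicaFetchRequest(topic, partitionId, replicaId, nextOffset))
    // append resp.messages to the local log here (elided), then adopt the leader's HW
    hw = resp.hw
    if (resp.messages.nonEmpty)
      nextOffset = resp.messages.last.offset + 1 // advance past the last fetched message
  }
}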