...
Code Block |
---|
leaderElection(r: Replica)
{
  read the current ISR and AR for r.partition.partition_id from ZK
  if( (r in AR) && (ISR is empty || r in ISR) )
  {
    wait for PreferredReplicaTime if r is not the preferred replica
    if(successfully write r as the current leader of r.partition in ZK)
      becomeLeader(r, ISR, AR)
    else
      becomeFollower(r)
  }
}
|
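The election rule above can be illustrated with a minimal Python sketch. It assumes an in-memory dictionary standing in for ZooKeeper, where the hypothetical helper `try_write_leader` succeeds only when no leader is recorded yet. The wait for PreferredReplicaTime is left out, and all function and parameter names are illustrative rather than taken from the Kafka code base.

```python
from typing import Dict, Optional, Set


def is_eligible(replica: int, assigned: Set[int], isr: Set[int]) -> bool:
    """A replica may stand for election if it is in AR and ISR is empty or contains it."""
    return replica in assigned and (not isr or replica in isr)


def try_write_leader(zk: Dict[str, int], path: str, replica: int) -> bool:
    """Hypothetical stand-in for the ZK write: succeeds only if no leader is recorded at path."""
    if path in zk:
        return False
    zk[path] = replica
    return True


def leader_election(zk: Dict[str, int], path: str, replica: int,
                    assigned: Set[int], isr: Set[int]) -> Optional[str]:
    """Return 'leader' or 'follower' for an eligible replica, or None if it is not eligible."""
    if not is_eligible(replica, assigned, isr):
        return None
    # The first eligible replica to write the leader path wins; the others follow.
    return "leader" if try_write_leader(zk, path, replica) else "follower"
```

With AR = {1, 2, 3} and ISR = {1, 2}, the first of replicas 1 and 2 to call `leader_election` becomes the leader and the other becomes a follower. Replica 3 is not eligible while the ISR is non-empty and does not contain it.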
State change events
On every broker
...
Code Block |
---|
becomeLeader(r: Replica, ISR: Set[Replica], AR: Set[Replica])
{
  // get a new epoch value and write it to the leader path
  epoch = getNewEpoch()
  /brokers/topics/[r.partition.topic]/[r.partition.pid]/leader=broker_id, epoch
  stop HW checkpoint thread for r
  r.hw = r.leo // TODO: check if this should actually be r.hw = last checkpointed HW for r
  wait until every live replica in AR catches up (i.e. its leo == r.hw) or a KeepInSyncTime has passed
  r.partition.ISR = the current set of replicas in sync with r
  r.partition.CUR = AR - ISR
  write r.partition.ISR & r.partition.CUR in ZK
  r.partition.RAR = replicas in /brokers/partitions_reassigned/[topic]/[partition_id] in ZK
  r.partition.leader = r // this enables reads/writes to this partition on this broker
  start a commit thread on r.partition
  start HW checkpoint thread for r
  get replica info from ZK and compute AR, a list of replicas assigned to this broker
  for each r in AR {
    if(r.broker_id is registered under /brokers/ids)
      sendStateChange("start-replica", r.broker_id, epoch)
  }
}
|
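The ISR and CUR bookkeeping in becomeLeader can be sketched in Python as follows. This is a simplified illustration that assumes each replica's log end offset is already known once the wait has ended. `split_isr_cur` and its parameters are hypothetical names, and the live-broker check, epoch handling and ZK writes are left out.

```python
from typing import Dict, Set, Tuple


def split_isr_cur(assigned: Set[int], leo: Dict[int, int],
                  leader: int) -> Tuple[Set[int], Set[int]]:
    """Split AR into ISR (caught up to the leader's HW) and CUR (the rest).

    Mirrors the pseudocode: the new leader sets hw to its own leo, and a
    replica counts as in sync if its leo equals that hw.
    """
    hw = leo[leader]  # r.hw = r.leo
    # A replica with no known leo is treated as not caught up.
    isr = {r for r in assigned if leo.get(r) == hw}
    cur = assigned - isr  # r.partition.CUR = AR - ISR
    return isr, cur
```

For example, with AR = {1, 2, 3}, leader 1, and log end offsets {1: 10, 2: 10, 3: 7}, the function places replicas 1 and 2 in the ISR and replica 3 in the CUR.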
Admin commands
This section describes the algorithms for admin commands such as create/delete topic and add/remove partition.
...