...
Code Block |
---|
brokerStartup()
{
create /brokers/state/[broker_id] path if it doesn’t already exist
register the state change handler to listen on child change ZK notifications on /brokers/state/[broker_id]
register session expiration listener
drain the state change queue
get replica info from ZK and compute AR, a list of replicas assigned to this broker
for each r in AR
{
subscribe to leader changes for the r’s partition
startReplica(r)
}
// broker startup procedure is complete. Register its broker id in ZK to announce the availability of this broker
register its broker_id in /brokers/ids/[broker_id] in ZK
}
|
Leader election
Code Block |
---|
leaderElection(r: Replica)
{
read the current ISR and AR for r.partition.partition_id from ZK
if( (r in AR) && (ISR is empty || r in ISR) )
{
wait for PreferredReplicaTime if r is not the preferred replica
if(successfully write r as the current leader of r.partition in ZK)
becomeLeader(r, ISR, CUR)
}
}
|
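The eligibility test and the race to write the leader path in the leader election procedure can be sketched in Python. This is a minimal in-memory illustration under stated assumptions, not the broker implementation: `FakeLeaderPath` is a hypothetical stand-in for the ZK leader path, replicas are identified by plain broker ids, and the PreferredReplicaTime wait is left out.

```python
from typing import Optional, Set


class FakeLeaderPath:
    """Hypothetical in-memory stand-in for the ZK leader path: first writer wins."""

    def __init__(self) -> None:
        self.leader: Optional[int] = None

    def try_claim(self, broker_id: int) -> bool:
        # Succeeds only if no leader is currently recorded.
        if self.leader is None:
            self.leader = broker_id
            return True
        return False


def is_eligible(replica: int, ar: Set[int], isr: Set[int]) -> bool:
    # Same condition as the pseudocode: (r in AR) && (ISR is empty || r in ISR)
    return replica in ar and (not isr or replica in isr)


def leader_election(replica: int, ar: Set[int], isr: Set[int], path: FakeLeaderPath) -> str:
    if not is_eligible(replica, ar, isr):
        return "not-eligible"
    if path.try_claim(replica):
        return "leader"  # the pseudocode calls becomeLeader at this point
    return "lost-race"   # another replica wrote the leader path first


path = FakeLeaderPath()
print(leader_election(3, {1, 2, 3}, {1, 2}, path))  # replica 3 is outside a non-empty ISR
print(leader_election(2, {1, 2, 3}, {1, 2}, path))
print(leader_election(1, {1, 2, 3}, {1, 2}, path))
```

Only one caller can win `try_claim`, which is the property the "successfully write r as the current leader" step relies on.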
State change events
On every broker
Leader change
This leader change listener is registered on every broker hosting a partition p. Each time it is triggered, the following procedure is executed:
Code Block |
---|
onLeaderChange()
{
if(broker_id is registered under /brokers/topics/[topic]/[partition_id]/replicas)
leaderElection(r)
}
|
On State change
Each broker has a ZK path that it listens to for state change requests from the leader
Code Block |
---|
stateChangeListener() {
// listens to state change requests issued by the leader and acts on those
drain the state change queue
read next state change request
Let r be the replica that the state change request is sent for.
// this should not happen
Throw an error if r is not hosted on this broker
Let requestEpoch be the epoch of the state change request
if(closeReplicaRequest)
{
closeReplica(r)
// we don’t need to check epoch here to be able to handle delete topic/delete partition for dead brokers.
}
if(startReplicaRequest)
{
Let latestPartitionEpoch be the latest epoch for this partition, got by reading /brokers/topics/[topic]/[partition]/leader=[broker_id],[epoch]
if(leader for r.partition doesn’t exist) {
// this can only happen for new topics or new partitions for existing topics
startReplica(r)
}else if(requestEpoch == latestPartitionEpoch) {
// this is to ensure that if a follower is slow, and reads a state change request queued up by a previous leader, it ignores the request
startReplica(r)
}
}
}
|
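The epoch handling in the state change listener can be illustrated with a small Python sketch. It only models the decision, not the replica operations themselves. The `StateChangeRequest` fields, the string replica names, and the use of `None` for "no leader exists" are assumptions made for the example.

```python
from dataclasses import dataclass
from typing import Optional, Set


@dataclass(frozen=True)
class StateChangeRequest:
    kind: str     # "start-replica" or "close-replica"
    replica: str  # hypothetical replica name, e.g. "topic-0"
    epoch: int


def decide(request: StateChangeRequest, hosted: Set[str],
           latest_partition_epoch: Optional[int]) -> str:
    """Return "close", "start", or "ignore" for one request."""
    if request.replica not in hosted:
        # "this should not happen" in the pseudocode
        raise ValueError(f"{request.replica} is not hosted on this broker")
    if request.kind == "close-replica":
        # No epoch check, so deletes still work for brokers that were offline.
        return "close"
    if request.kind == "start-replica":
        if latest_partition_epoch is None:
            return "start"  # no leader yet: new topic or new partition
        if request.epoch == latest_partition_epoch:
            return "start"
        return "ignore"     # request queued by a previous leader
    return "ignore"         # assumption: unknown request kinds are skipped


hosted = {"topic-0"}
print(decide(StateChangeRequest("start-replica", "topic-0", 3), hosted, 7))
print(decide(StateChangeRequest("start-replica", "topic-0", 7), hosted, 7))
print(decide(StateChangeRequest("close-replica", "topic-0", 3), hosted, 7))
```

The first call is ignored because its epoch is older than the partition's latest epoch, while the close request with the same old epoch is still honored.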
On the leader
On reassignment of partitions
Each time a partition reassignment event is triggered on the leader, it calls onPartitionsReassigned()
Code Block |
---|
onPartitionsReassigned()
{
if(this broker is the leader for [partition_id])
{
p.RAR = the new replicas from /brokers/partitions_reassigned/[topic]/[partition_id]
AR = /brokers/topics/[topic]/[partition_id]/replicas
newReplicas = p.RAR - AR
for(newReplica <- newReplicas)
sendStateChange(“start-replica”, newReplica.broker_id, epoch)
}
}
|
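The set arithmetic in the reassignment handler is small enough to show directly. In this Python sketch replicas are represented by their broker ids and the requests are returned as tuples instead of being sent; both choices are simplifications for illustration.

```python
from typing import Iterable, List, Tuple


def start_replica_requests(rar: Iterable[int], ar: Iterable[int],
                           epoch: int) -> List[Tuple[str, int, int]]:
    """Build one start-replica request per broker in RAR that is not in AR."""
    new_replicas = sorted(set(rar) - set(ar))  # newReplicas = p.RAR - AR
    return [("start-replica", broker_id, epoch) for broker_id in new_replicas]


# Partition currently on brokers 1, 2, 3 and reassigned to brokers 2, 3, 4.
print(start_replica_requests(rar=[2, 3, 4], ar=[1, 2, 3], epoch=7))
# [('start-replica', 4, 7)]
```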
State change communication
The leader uses this API to communicate a state change request to the followers
Code Block |
---|
sendStateChange(stateChange, followerBrokerId, leaderEpoch)
{
stateChangeQ = new StateChangeQueue(“/brokers/state/followerBrokerId”)
stateChangeRequest = new StateChangeRequest(stateChange, leaderEpoch)
// check if the state change Q is full. This can happen if a broker is offline for a long time
if(stateChangeQ.isFull) {
// this operation retains only one close-replica request for a partition, the one with the latest epoch. This is to ensure that an offline broker, on startup, will delete old topics and partitions, which it hosted before going offline. You don’t have to retain any start-replica requests for a partition
stateChangeQ.shrink
// if the queue is still full, log an error
if(stateChangeQ.isFull)
throw new FollowerStateChangeQueueFull
}
stateChangeQ.put(stateChangeRequest)
}
|
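The shrink step is the least obvious part of this API, so here is a hedged Python sketch of one way it could behave. The queue is an in-memory list rather than a ZK path, and the `capacity` parameter, the `partition` field, and the class layout are assumptions made for the example. The retention rule follows the comment in the pseudocode: keep only the close-replica request with the latest epoch per partition and drop every start-replica request.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass(frozen=True)
class StateChangeRequest:
    kind: str       # "start-replica" or "close-replica"
    partition: str  # hypothetical partition name, e.g. "topic-0"
    epoch: int


class FollowerStateChangeQueueFull(Exception):
    pass


class StateChangeQueue:
    def __init__(self, capacity: int) -> None:
        self.capacity = capacity
        self.items: List[StateChangeRequest] = []

    def is_full(self) -> bool:
        return len(self.items) >= self.capacity

    def shrink(self) -> None:
        # Find the close-replica request with the highest epoch per partition.
        latest_close: Dict[str, StateChangeRequest] = {}
        for req in self.items:
            if req.kind != "close-replica":
                continue
            kept = latest_close.get(req.partition)
            if kept is None or req.epoch > kept.epoch:
                latest_close[req.partition] = req
        # Keep only those requests, preserving queue order.
        self.items = [r for r in self.items if latest_close.get(r.partition) is r]

    def put(self, request: StateChangeRequest) -> None:
        if self.is_full():
            self.shrink()
            if self.is_full():
                raise FollowerStateChangeQueueFull(request.partition)
        self.items.append(request)


q = StateChangeQueue(capacity=3)
q.put(StateChangeRequest("start-replica", "topic-0", 1))
q.put(StateChangeRequest("close-replica", "topic-0", 1))
q.put(StateChangeRequest("close-replica", "topic-0", 2))
q.put(StateChangeRequest("start-replica", "topic-1", 5))  # triggers shrink
print([(r.kind, r.partition, r.epoch) for r in q.items])
```

After the fourth `put`, only the epoch-2 close request for `topic-0` survives the shrink, followed by the newly queued start request.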
State change operations
Start replica
This state change is requested by the leader or the admin command for a new replica assignment
Code Block |
---|
startReplica(r: Replica) {
if(broker_id not in /brokers/topics/[r.topic]/[r.partition]/replicas)
throw NotReplicaForPartitionException()
if( r's log is not already started) {
do local recovery of r's log
r.hw = min(last checkpointed HW for r, r.leo)
register a leader-change listener on partition r.partition.partition_id
}
if( a leader does not exist for partition r.partition.partition_id in ZK)
leaderElection(r)
else {
// a broker in the AR list for this partition that is not the leader must be a follower
if(this broker is not the leader)
becomeFollower(r)
}
}
|
Close replica
This state change is requested by the leader when a topic or partition is deleted or moved to another broker
Code Block |
---|
closeReplica(r: Replica)
{
stop the fetcher associated with r, if one exists
close and delete r
}
|
Become follower
This state change is requested by the leader when the leader for a replica changes
Code Block |
---|
becomeFollower(r: Replica)
{
// this is required if this replica was the last leader
stop the commit thread, if any
stop the current ReplicaFetcherThread, if any
truncate the log to r.hw
start a new ReplicaFetcherThread to the current leader of r, from offset r.leo
start HW checkpoint thread for r
}
|
Become leader
This state change is done by the new leader
Code Block |
---|
becomeLeader(r: Replica, ISR: Set[Replica], AR: Set[Replica])
{
// get a new epoch value and write it to the leader path
epoch = getNewEpoch()
/brokers/topics/[r.partition.topic]/[r.partition.pid]/leader=[broker_id],[epoch]
stop HW checkpoint thread for r
r.hw = r.leo // TODO: check if this should actually be r.hw = last checkpointed HW for r
wait until every live replica in AR catches up (i.e. its leo == r.hw) or a KeepInSyncTime has passed
r.partition.ISR = the current set of replicas in sync with r
r.partition.CUR = AR - ISR
write r.partition.ISR & r.partition.CUR in ZK
r.partition.RAR = replicas in /brokers/partitions_reassigned/[topic]/[partition_id] in ZK
r.partition.leader = r // this enables reads/writes to this partition on this broker
start a commit thread on r.partition
start HW checkpoint thread for r
get replica info from ZK and compute AR, a list of replicas assigned to this broker
for each r in AR {
if(r.broker_id is registered under /brokers/ids)
sendStateChange(“start-replica”, r.broker_id, epoch)
}
}
|
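The ISR and CUR bookkeeping in the become-leader procedure can be sketched as a pure function. The Python below assumes the waiting period is already over and that the caller has each assigned replica's log end offset and the set of live brokers. The function name and argument shapes are hypothetical.

```python
from typing import Dict, Set, Tuple


def split_isr_cur(leader_hw: int, leo_by_replica: Dict[int, int],
                  live: Set[int]) -> Tuple[Set[int], Set[int]]:
    """Return (ISR, CUR) for the assigned replicas in leo_by_replica.

    A replica counts as in sync if it is live and its leo equals the leader's hw.
    CUR is everything in AR that is not in the ISR (CUR = AR - ISR).
    """
    ar = set(leo_by_replica)
    isr = {r for r, leo in leo_by_replica.items() if r in live and leo == leader_hw}
    return isr, ar - isr


# Leader is broker 1 with hw 100; broker 3 lags and broker 4 is offline.
isr, cur = split_isr_cur(100, {1: 100, 2: 100, 3: 90, 4: 100}, live={1, 2, 3})
print(sorted(isr), sorted(cur))
# [1, 2] [3, 4]
```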
Create topic
The admin command does the following when creating a new topic
Code Block |
---|
createTopic(topic, numPartitions, replicationFactor, replicaAssignmentStr)
{
if(!cleanFailedTopicCreationAttempt(topic))
{
error(“Topic topic exists with live partitions”)
exit
}
if(replicaAssignmentStr == “”) {
// assignReplicas will always assign partitions only to online brokers
replicaAssignment = assignReplicas(topic, numPartitions, replicationFactor)
}
// create topic path in ZK
create /brokers/topics/topic
for(partition <- replicaAssignment) {
addPartition(topic, partition.id, partition.replicas)
}
// report successfully started partitions for this topic
}
waitTillStateChangeRequestConsumed(partition.replicas, timeout)
{
register watch on state change path for each replica
In the listener, use a condition variable to await(timeout). If it doesn’t fire return false, else return true
}
cleanFailedTopicCreationAttempt(topic)
{
topicsForPartitionsReassignment = ls /brokers/partitions_reassigned
for(topic <- topicsForPartitionsReassignment)
{
partitionsCreated = ls /brokers/topics/topic
for(partition <- partitionsCreated)
{
if(/brokers/topics/topic/partition/replicas path exists)
{
error(“Cannot cleanup. Topic exists with live partition”)
return false
}
}
// partition paths can be safely deleted
for(partition <- partitionsCreated)
delete /brokers/topics/topic/partition
delete /brokers/topics/topic
}
return true
}
|
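The "condition variable with await(timeout)" idea used by waitTillStateChangeRequestConsumed can be sketched with Python's `threading.Condition`. This is an illustration only: the ZK watches are replaced by direct calls to `mark_consumed`, and the `ConsumedLatch` class and its method names are hypothetical.

```python
import threading
from typing import Iterable


class ConsumedLatch:
    """Waits until every listed replica has consumed its state change request."""

    def __init__(self, replica_ids: Iterable[int]) -> None:
        self._pending = set(replica_ids)
        self._cond = threading.Condition()

    def mark_consumed(self, replica_id: int) -> None:
        # Would be called from the watch listener for that replica's path.
        with self._cond:
            self._pending.discard(replica_id)
            if not self._pending:
                self._cond.notify_all()

    def await_all(self, timeout: float) -> bool:
        # True if all replicas consumed their request before the timeout.
        with self._cond:
            return self._cond.wait_for(lambda: not self._pending, timeout=timeout)


latch = ConsumedLatch([1, 2])
for broker_id in (1, 2):
    threading.Thread(target=latch.mark_consumed, args=(broker_id,)).start()
print(latch.await_all(timeout=2.0))                # both replicas report in

print(ConsumedLatch([3]).await_all(timeout=0.05))  # nobody reports: times out
```

`wait_for` checks the predicate before blocking, so a notification that arrives before the wait starts is not lost.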
Delete topic
Code Block |
---|
deleteTopic(topic)
{
partitionsForTopic = ls /brokers/topics/topic
for(partition <- partitionsForTopic) {
if(!deletePartition(topic, partition))
{
error(“Failed to delete partition for topic”)
exit
}
}
// delete topic path in ZK
delete /brokers/topics/topic
}
|
Add partition to existing topic
Code Block |
---|
addPartition(topic, partition, replicas)
{
// write the partitions reassigned path for this create topic command
/brokers/partitions_reassigned/topic/partition=replicas
// start replicas for this new partition
for(replica <- replicas)
sendStateChange(“start-replica”, replica.brokerId, -1)
// wait till state change request is consumed by all replicas
if(!waitTillStateChangeRequestConsumed(partition.replicas, timeout))
{
error(“Failed to create topic partition partitionId within timeout ms”)
exit
}
// create partition paths in ZK
/brokers/topics/topic/partitionId/replicas=replicas
delete /brokers/partitions_reassigned/topic/partitionId
}
|
Remove partition for existing topic
Code Block |
---|
deletePartition(topic, partition)
{
// empty list for partition reassignment means delete partition
/brokers/partitions_reassigned/topic/partition=””
// wait till replica is closed by all replicas
if(!waitTillStateChangeRequestConsumed(partition.replicas, timeout))
{
error(“Failed to delete topic after timeout ms”)
return false
}
// delete partition paths in ZK
delete /brokers/topics/topic/partitionId
delete /brokers/partitions_reassigned/topic/partitionId
return true
}
|