...
Code Block |
---|
// Makes replica r the leader of its partition.
// r:   the replica on this broker that becomes leader.
// ISR: the in-sync set passed in by the caller; the live in-sync set is recomputed below
//      after the catch-up wait, and that recomputed set is what is published.
// AR:  the replicas assigned to r's partition (recomputed from ZK near the end).
becomeLeader(r: Replica, ISR: Set[Replica], AR: Set[Replica])
{
  // get a new epoch value and write it, with this broker's id, to the leader path
  epoch = getNewEpoch()
  /brokers/topics/[r.partition.topic]/[r.partition.pid]/leader=broker_id, epoch
  // HW checkpointing is paused while r.hw is reset and restarted once leadership is set up
  stop HW checkpoint thread for r
  r.hw = r.leo // TODO: check if this should actually be r.hw = last checkpointed HW for r
  wait until every live replica in AR catches up (i.e. its leo == r.hw) or a KeepInSyncTime has passed
  r.partition.ISR = the current set of replicas in sync with r
  // CUR must be the complement of the ISR just computed, not of the (possibly stale) ISR argument
  r.partition.CUR = AR - r.partition.ISR
  write r.partition.ISR & r.partition.CUR in ZK
  r.partition.RAR = replicas in /brokers/partitions_reassigned/[topic]/[partition_id] in ZK
  r.partition.leader = r // this enables reads/writes to this partition on this broker
  start a commit thread on r.partition
  start HW checkpoint thread for r
  // NOTE(review): this overwrites the AR argument; "assigned to this broker" may mean
  // "assigned to this partition" — confirm
  get replica info from ZK and compute AR, a list of replicas assigned to this broker
  // tell every registered (live) assigned replica to start, tagged with the new epoch;
  // `replica` is used so the leader `r` is not shadowed
  for each replica in AR {
    if(replica.broker_id is registered under /brokers/ids)
      sendStateChange(“start-replica”, replica.broker_id, epoch)
  }
}
|
Admin commands
This section describes the algorithms for the admin commands that create and delete topics and add and remove partitions.
Create topic
The admin command does the following when creating a new topic:
Code Block |
---|
// Creates `topic` in ZK and adds each of its partitions.
// replicaAssignmentStr: an explicit replica assignment; when empty, one is computed
// from numPartitions and replicationFactor by assignReplicas.
createTopic(topic, numPartitions, replicationFactor, replicaAssignmentStr)
{
  // clean up leftovers of earlier failed creation attempts; refuse if live partitions exist
  if(!cleanFailedTopicCreationAttempts(topic))
  {
    error(“Topic topic exists with live partitions”)
    exit
  }
  if(replicaAssignmentStr == “”) {
    // assignReplicas will always assign partitions only to online brokers
    replicaAssignment = assignReplicas(topic, numPartitions, replicationFactor)
  } else {
    // use the assignment supplied explicitly by the caller
    replicaAssignment = the replica assignment parsed from replicaAssignmentStr
  }
  // create topic path in ZK
  create /brokers/topics/topic
  for(partition <- replicaAssignment) {
    addPartition(topic, partition.id, partition.replicas)
  }
  // report successfully started partitions for this topic
}
// Waits until every replica in `replicas` has consumed its pending state change request.
// Returns true if all of them did so within `timeout`, false otherwise.
waitTillStateChangeRequestConsumed(replicas, timeout)
{
  register a watch on the state change path of each replica in replicas
  // listeners only record progress and signal; the waiting happens in the calling thread
  in each watch listener, mark that replica as done and signal a shared condition variable
  await on the condition variable until every replica is marked done or timeout has elapsed
  unregister the watches
  return true if every replica was marked done, else false
}
// Removes ZK topic paths left behind by earlier, failed topic-creation attempts.
// Every topic listed under /brokers/partitions_reassigned is examined in turn:
// - If one of its partitions has a replicas path (a live partition), the function
//   logs an error and returns false. Topics examined earlier may already have been deleted.
// - Otherwise its partition paths and its topic path are deleted.
// Returns true once every listed topic has been processed.
// NOTE(review): the `topic` argument is not used to filter this loop, so a live `topic`
// that is absent from /brokers/partitions_reassigned is not detected here — confirm intent.
cleanFailedTopicCreationAttempts(topic)
{
  topicsForPartitionsReassignment = ls /brokers/partitions_reassigned
  // `failedTopic` avoids shadowing the `topic` parameter
  for(failedTopic <- topicsForPartitionsReassignment)
  {
    // nothing to clean if the topic path was never created (ls on a missing path fails)
    if(/brokers/topics/failedTopic path does not exist)
      continue
    partitionsCreated = ls /brokers/topics/failedTopic
    for(partition <- partitionsCreated)
    {
      if(/brokers/topics/failedTopic/partition/replicas path exists)
      {
        error(“Cannot cleanup. Topic exists with live partition”)
        return false
      }
    }
    // partition paths can be safely deleted
    for(partition <- partitionsCreated)
      delete /brokers/topics/failedTopic/partition
    delete /brokers/topics/failedTopic
  }
  // callers test the result with `!`, so success must be reported explicitly
  return true
}
|
Delete topic
Code Block |
---|
// Deletes every partition of `topic` and then the topic path itself.
// If any partition fails to delete, logs an error and exits before the topic path is touched.
deleteTopic(topic)
{
  for(p <- ls /brokers/topics/topic)
  {
    deleted = deletePartition(topic, p)
    if(!deleted)
    {
      error(“Failed to delete partition for topic”)
      exit
    }
  }
  // all partitions are gone; now remove the topic path in ZK
  delete /brokers/topics/topic
}
|
Add partition to existing topic
Code Block |
---|
// Creates partition `partition` (a partition id) of `topic`, hosted on `replicas`.
// The partitions_reassigned entry is written first and removed only after every replica
// has consumed its start-replica request and the partition's replicas path is written.
addPartition(topic, partition, replicas)
{
  // write the partitions reassigned path for this create topic command
  /brokers/partitions_reassigned/topic/partition=replicas
  // start replicas for this new partition; epoch -1 is passed (presumably because no
  // leader has been elected yet — confirm)
  for(replica <- replicas)
    sendStateChange(“start-replica”, replica.brokerId, -1)
  // wait till state change request is consumed by all replicas
  if(!waitTillStateChangeRequestConsumed(replicas, timeout))
  {
    // note: the partitions_reassigned entry written above is not removed on this path
    error(“Failed to create topic partition partitionId for timeout ms”)
    exit
  }
  // create partition paths in ZK
  /brokers/topics/topic/partition/replicas=replicas
  delete /brokers/partitions_reassigned/topic/partition
}
|
Remove partition from existing topic
Code Block |
---|
// Deletes partition `partition` (a partition id) of `topic`.
// Returns true on success, and false if the replicas did not confirm within `timeout`.
deletePartition(topic, partition)
{
  // `partition` is only an id, so the replicas hosting it are looked up in ZK
  replicas = /brokers/topics/topic/partition/replicas
  // empty list for partition reassignment means delete partition
  /brokers/partitions_reassigned/topic/partition=””
  // NOTE(review): unlike addPartition, no sendStateChange is issued here; confirm that
  // brokers react to the empty reassignment entry, otherwise the wait below can only time out
  // wait till replica is closed by all replicas
  if(!waitTillStateChangeRequestConsumed(replicas, timeout))
  {
    error(“Failed to delete topic after timeout ms”)
    return false
  }
  // delete partition paths in ZK
  delete /brokers/topics/topic/partition
  delete /brokers/partitions_reassigned/topic/partition
  // callers test the result with `!`, so success must be reported explicitly
  return true
}
|