This detailed design differs from the original detailed design in the following areas:
- The state machine in this design is controlled entirely by the leader of each partition. Each follower changes its state only in response to a state change request from the leader of a particular partition. A leader-coordinated state machine allows state transitions to be verified centrally and allows the system to fail fast.
- This design introduces a global epoch, which is a non-decreasing value for a Kafka cluster. The epoch changes when the leader for a partition changes.
- This design handles delete partition or delete topic state changes for dead brokers by queuing up state change requests for a broker in Zookeeper.
- This design scales better with respect to the number of ZK watches, since it registers fewer watches than V1. The motivation is to reduce the load on ZK as the Kafka cluster grows to thousands of partitions. For example, for a cluster of 3 brokers hosting 1000 topics with 3 partitions each, the V1 design requires registering 15000 watches, while the V2 design requires only 3000.
- This design ensures that leader change ZK notifications are not queued up behind any other notifications, so they can be processed immediately.
Paths stored in Zookeeper
Notation: When an element in a path is denoted [xyz], that means that the value of xyz is not fixed and there is in fact a znode for each possible value of xyz. For example /topics/[topic] would be a directory named /topics containing a directory for each topic name. An arrow -> is used to indicate the contents of a znode. For example /hello -> world would indicate a znode /hello containing the value "world". A path is persistent unless it’s marked as ephemeral.
We store the following paths in Zookeeper:
- Stores the information of all live brokers.
/brokers/ids/[broker_id] --> host:port (ephemeral; created by the broker on startup)
- Stores for each partition, a list of the currently assigned replicas. For each replica, we store the id of the broker to which the replica is assigned. The first replica is the preferred replica. Note that for a given partition, there is at most 1 replica on a broker. Therefore, the broker id can be used as the replica id.
/brokers/topics/[topic]/[partition_id]/replicas --> {broker_id …} (created by admin)
- Stores the id of the replica that’s the current leader of this partition
/brokers/topics/[topic]/[partition_id]/leader --> broker_id (ephemeral) (created by leader)
- Stores the ids of the set of replicas that are in-sync with the leader
/brokers/topics/[topic]/[partition_id]/ISR --> {broker_id, …} (created by leader)
- This path is used when we want to reassign some partitions to a different set of brokers. For each partition to be reassigned, it stores a list of new replicas and their corresponding assigned brokers. This path is created by an administrative process and is automatically removed once the partition has been moved successfully.
/brokers/partitions_reassigned/[topic]/[partition_id] --> {broker_id …} (created by admin)
- This path is used by each new leader to atomically increment and read a new epoch value.
/brokers/epoch (created by admin)
- This path is used by the leader of a partition to enqueue state change requests to the follower replicas. The state change requests include start replica and close replica. This path is created by the add brokers admin command and is only deleted by the remove brokers admin command. The purpose of making this path persistent is to cleanly handle state changes like delete topic and reassign partitions even when a broker is temporarily unavailable (for example, being bounced).
/brokers/state/[broker_id] --> { state change requests ... } (created by admin)
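To make the layout concrete, here is a minimal Scala sketch of reading and writing two of these paths with the plain Apache ZooKeeper client. The comma-separated encoding of the replica list and the helper names are assumptions of the sketch, not part of the design.

import org.apache.zookeeper.{CreateMode, ZooDefs, ZooKeeper}

object ZkPathAccess {
  // /brokers/ids/[broker_id] -> host:port is ephemeral, so it disappears
  // automatically when the broker's ZK session dies
  def registerBroker(zk: ZooKeeper, brokerId: Int, hostPort: String): Unit =
    zk.create(s"/brokers/ids/$brokerId", hostPort.getBytes("UTF-8"),
              ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)

  // /brokers/topics/[topic]/[partition_id]/replicas -> {broker_id ...};
  // the first entry is the preferred replica
  def assignedReplicas(zk: ZooKeeper, topic: String, partitionId: Int): Seq[Int] = {
    val data = zk.getData(s"/brokers/topics/$topic/$partitionId/replicas", false, null)
    new String(data, "UTF-8").split(",").map(_.trim.toInt).toSeq
  }
}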
Key data structures
Every broker stores a list of partitions and replicas assigned to it. The current leader of a partition further maintains 4 sets: AR, ISR, CUR and RAR, which correspond to the set of replicas that are assigned to the partition, in-sync with the leader, catching up with the leader, and being reassigned to other brokers. Normally, ISR is a subset of AR and AR = ISR + CUR. The leader of a partition maintains a commitQ and uses it to buffer all produce requests to be committed. For each replica assigned to a broker, the broker periodically stores its HW in a checkpoint file.
Replica { // a replica of a partition
  broker_id : int
  partition : Partition
  isLocal   : Boolean // is this replica local to this broker
  log       : Log     // local log associated with this replica
  hw        : long    // offset of the last committed message
  leo       : long    // log end offset
}

Partition { // a partition in a topic
  topic        : string
  partition_id : int
  leader       : Replica      // the leader replica of this partition
  commitQ      : Queue        // produce requests pending commit at the leader
  AR           : Set[Replica] // replicas assigned to this partition
  ISR          : Set[Replica] // in-sync replica set, maintained at the leader
  CUR          : Set[Replica] // catch-up replica set, maintained at the leader
  RAR          : Set[Replica] // reassigned replica set, maintained at the leader
}
Key algorithms
Zookeeper listeners ONLY on the leader
- Partition-reassigned listener:
- child change on /brokers/partitions_reassigned
- child change on /brokers/partitions_reassigned/[topic]
Zookeeper listeners on all brokers
- Leader-change listener: value change on /brokers/topics/[topic]/[partition_id]/leader
- State-change listener: child change on /brokers/state/[broker_id]
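To illustrate how these listeners might be registered, a minimal sketch using the raw ZooKeeper watcher API follows. ZK watches fire only once, so each callback must re-register itself; the class name and wiring are illustrative assumptions, not part of the design.

import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}
import org.apache.zookeeper.Watcher.Event.EventType

class LeaderChangeWatcher(zk: ZooKeeper, topic: String, partitionId: Int) extends Watcher {
  // value change on /brokers/topics/[topic]/[partition_id]/leader
  def register(): Unit = {
    zk.exists(s"/brokers/topics/$topic/$partitionId/leader", this)
    ()
  }

  override def process(event: WatchedEvent): Unit = {
    if (event.getType == EventType.NodeDataChanged || event.getType == EventType.NodeDeleted)
      onLeaderChange()
    register() // watches are one-shot; re-arm for the next change
  }

  private def onLeaderChange(): Unit = () // run the onLeaderChange() procedure described below
}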
Configuration parameters
- LeaderElectionWaitTime: controls the maximum amount of time that we wait during leader election.
- KeepInSyncTime: controls the maximum amount of time that a leader waits before dropping a follower from the in-sync replica set.
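As an illustration of how KeepInSyncTime might be applied at the leader (the field and function names below are assumptions of this sketch):

case class ReplicationConfig(leaderElectionWaitTimeMs: Long, keepInSyncTimeMs: Long)

// the leader drops a follower from ISR once it has lagged for longer than KeepInSyncTime
def shouldDropFromISR(nowMs: Long, lastCaughtUpMs: Long, config: ReplicationConfig): Boolean =
  nowMs - lastCaughtUpMs > config.keepInSyncTimeMs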
Broker startup
Each time a broker starts up, it calls brokerStartup(); the algorithm is described below.
brokerStartup() {
  create /brokers/state/[broker_id] path if it doesn’t already exist
  register the state change handler to listen on child change ZK notifications on /brokers/state/[broker_id]
  register session expiration listener
  drain the state change queue
  get replica info from ZK and compute AR, the list of replicas assigned to this broker
  for each r in AR {
    subscribe to leader changes for r’s partition
    startReplica(r)
  }
  // broker startup procedure is complete; register its broker_id in ZK to announce the availability of this broker
  register its broker_id in /brokers/ids/[broker_id] in ZK
}
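A Scala skeleton of the same sequence, assuming the ZooKeeper client from the earlier sketch. The point to preserve is the ordering: the broker announces itself in /brokers/ids only after its state change queue has been drained and its replicas started.

import org.apache.zookeeper.{CreateMode, ZooDefs, ZooKeeper}

def brokerStartup(zk: ZooKeeper, brokerId: Int, hostPort: String): Unit = {
  val statePath = s"/brokers/state/$brokerId"
  if (zk.exists(statePath, false) == null)
    zk.create(statePath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)
  // ... register the state change and session expiration listeners,
  // drain the state change queue, and start every assigned replica ...
  // announce availability last, so no other broker sees a half-initialized broker
  zk.create(s"/brokers/ids/$brokerId", hostPort.getBytes("UTF-8"),
            ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)
}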
State change events
On every broker
Leader change
This leader change listener is registered on every broker hosting a partition p. Each time it is triggered, the following procedure is executed:
onLeaderChange() {
  if(broker_id is registered under /brokers/topics/[topic]/[partition_id]/replicas)
    leaderElection(r)
}
State change
Each broker has a ZK path that it listens to for state change requests from the leader
stateChangeListener() { // listens to state change requests issued by the leader and acts on those
  drain the state change queue
  read next state change request
  Let r be the replica that the state change request is sent for.
  // this should not happen
  Throw an error if r is not hosted on this broker
  Let requestEpoch be the epoch of the state change request
  if(closeReplicaRequest) {
    // we don’t need to check the epoch here, to be able to handle delete topic/delete partition for dead brokers
    closeReplica(r)
  }
  if(startReplicaRequest) {
    Let latestPartitionEpoch be the latest epoch for this partition, obtained by reading /brokers/topics/[topic]/[partition]/leader=[broker_id],[epoch]
    if(leader for r.partition doesn’t exist) {
      // this can only happen for new topics or new partitions for existing topics
      startReplica(r)
    } else if(requestEpoch == latestPartitionEpoch) {
      // this ensures that if a follower is slow and reads a state change request queued up by a previous leader, it ignores the request
      startReplica(r)
    }
  }
}
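A sketch of the epoch check above, assuming the leader znode stores its value as broker_id,epoch as written in this section (the parsing format is an assumption of the sketch):

import org.apache.zookeeper.ZooKeeper

def latestPartitionEpoch(zk: ZooKeeper, topic: String, partitionId: Int): Option[Long] = {
  val path = s"/brokers/topics/$topic/$partitionId/leader"
  if (zk.exists(path, false) == null)
    None // no leader yet: a new topic, or a new partition of an existing topic
  else {
    val value = new String(zk.getData(path, false, null), "UTF-8")
    Some(value.split(",")(1).trim.toLong) // "broker_id,epoch" -> epoch
  }
}

// a queued start-replica request is honored only if there is no leader yet,
// or if the request's epoch is still the partition's latest epoch
def shouldStartReplica(requestEpoch: Long, partitionEpoch: Option[Long]): Boolean =
  partitionEpoch.forall(_ == requestEpoch)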
On the leader
On reassignment of partitions
Each time a partition-reassigned event is triggered on the leader, it calls onPartitionsReassigned()
onPartitionsReassigned() {
  if(this broker is the leader for [partition_id]) {
    p.RAR = the new replicas from /brokers/partitions_reassigned/[topic]/[partition_id]
    AR = /brokers/topics/[topic]/[partition_id]/replicas
    newReplicas = p.RAR - AR
    for(newReplica <- newReplicas)
      sendStateChange("start-replica", newReplica.broker_id, epoch)
  }
}
State change communication
The leader uses this API to communicate a state change request to the followers
sendStateChange(stateChange, followerBrokerId, leaderEpoch) {
  stateChangeQ = new StateChangeQueue("/brokers/state/[followerBrokerId]")
  stateChangeRequest = new StateChangeRequest(stateChange, leaderEpoch)
  // check if the state change Q is full. This can happen if a broker is offline for a long time
  if(stateChangeQ.isFull) {
    // this operation retains only one close-replica request per partition, the one with the latest
    // epoch. This is to ensure that an offline broker, on startup, will delete old topics and
    // partitions, which it hosted before going offline. We don’t have to retain any start-replica
    // requests for a partition
    stateChangeQ.shrink
    // if the queue is still full, log an error and give up
    if(stateChangeQ.isFull)
      throw new FollowerStateChangeQueueFull
  }
  stateChangeQ.put(stateChangeRequest)
}
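A sketch of the shrink operation described in the comments above; the StateChangeRequest fields are assumptions based on this section:

case class StateChangeRequest(stateChange: String, // "start-replica" or "close-replica"
                              topic: String, partitionId: Int, epoch: Long)

def shrink(requests: Seq[StateChangeRequest]): Seq[StateChangeRequest] =
  requests
    .filter(_.stateChange == "close-replica")  // start-replica requests need not be retained
    .groupBy(r => (r.topic, r.partitionId))    // at most one request per partition
    .valuesIterator
    .map(_.maxBy(_.epoch))                     // keep the one with the latest epoch
    .toSeq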
State change operations
Start replica
This state change is requested by the leader or the admin command for a new replica assignment
startReplica(r: Replica) {
  if(broker_id not in /brokers/topics/[r.topic]/[r.partition]/replicas)
    throw NotReplicaForPartitionException()
  if(r's log is not already started) {
    do local recovery of r's log
    r.hw = min(last checkpointed HW for r, r.leo)
    register a leader-change listener on partition r.partition.partition_id
  }
  if(a leader does not exist for partition r.partition.partition_id in ZK)
    leaderElection(r)
  else {
    if(this broker is not the leader, then it is a follower since it is in the AR list for this partition)
      becomeFollower(r)
  }
}
Close replica
This state change is requested by the leader when a topic or partition is deleted or moved to another broker
closeReplica(r: Replica) {
  stop the fetcher associated with r, if one exists
  close and delete r
}
Become follower
This state change is requested by the leader when the leader for a replica changes
becomeFollower(r: Replica) {
  // this is required if this replica was the last leader
  stop the commit thread, if any
  stop the current ReplicaFetcherThread, if any
  truncate the log to r.hw
  start a new ReplicaFetcherThread to the current leader of r, from offset r.leo
  start HW checkpoint thread for r
}
Become leader
This state change is done by the new leader
becomeLeader(r: Replica, ISR: Set[Replica], AR: Set[Replica]) {
  // get a new epoch value and write it to the leader path
  epoch = getNewEpoch()
  /brokers/topics/[r.partition.topic]/[r.partition.pid]/leader = broker_id, epoch
  stop HW checkpoint thread for r
  r.hw = r.leo // TODO: check if this should actually be r.hw = last checkpointed HW for r
  wait until every live replica in AR catches up (i.e. its leo == r.hw) or KeepInSyncTime has passed
  r.partition.ISR = the current set of replicas in sync with r
  r.partition.CUR = AR - ISR
  write r.partition.ISR & r.partition.CUR in ZK
  r.partition.RAR = replicas in /brokers/partitions_reassigned/[topic]/[partition_id] in ZK
  r.partition.leader = r // this enables reads/writes to this partition on this broker
  start a commit thread on r.partition
  start HW checkpoint thread for r
  get replica info from ZK and compute AR, the list of replicas assigned to this broker
  for each r in AR {
    if(r.broker_id is registered under /brokers/ids)
      sendStateChange("start-replica", r.broker_id, epoch)
  }
}
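getNewEpoch() can be built on a conditional update of the /brokers/epoch path described earlier. A hedged sketch, assuming the epoch is stored as a decimal string; the retry on BadVersionException is what keeps the increment safe when multiple leader elections race:

import org.apache.zookeeper.{KeeperException, ZooKeeper}
import org.apache.zookeeper.data.Stat

def getNewEpoch(zk: ZooKeeper): Long = {
  val stat = new Stat()
  val current = new String(zk.getData("/brokers/epoch", false, stat), "UTF-8").toLong
  try {
    // setData with an explicit version acts as a compare-and-set on the znode
    zk.setData("/brokers/epoch", (current + 1).toString.getBytes("UTF-8"), stat.getVersion)
    current + 1
  } catch {
    case _: KeeperException.BadVersionException => getNewEpoch(zk) // lost a race with another leader; retry
  }
}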