...
- Stores information about all live brokers.
```
/brokers/ids/[broker_id] --> host:port (ephemeral; created by broker)
```
- For each partition, stores the list of currently assigned replicas. For each replica, we store the id of the broker to which the replica is assigned. The first replica is the preferred replica. For a given partition, a broker hosts at most one replica, so the broker id can also serve as the replica id.
```
/brokers/topics/[topic]/[partition_id]/replicas --> {broker_id …} (created by admin)
```
- Stores the id of the replica that is currently the leader of this partition.
```
/brokers/topics/[topic]/[partition_id]/leader --> broker_id (ephemeral; created by leader)
```
- Stores the ids of the replicas that are in sync with the leader (the ISR).
```
/brokers/topics/[topic]/[partition_id]/ISR --> {broker_id, …} (created by leader)
```
- This path is used when we want to reassign some partitions to a different set of brokers. For each partition to be reassigned, it stores the list of new replicas, identified by the ids of the brokers they are assigned to. This path is created by an administrative process and is removed automatically once the partition has been moved successfully.
```
/brokers/partitions_reassigned/[topic]/[partition_id] --> {broker_id …} (created by admin)
```
- This path holds the epoch counter. Each new leader increments it to obtain a new epoch value.
```
/brokers/epoch (created by admin)
```
- This path is used by the leader of a partition to enqueue state change requests for the follower replicas. The state change requests include start replica and close replica. This path is created by the add-brokers admin command and is deleted only by the remove-brokers admin command. It is persistent so that state changes such as topic deletion and partition reassignment are handled cleanly even when a broker is temporarily unavailable (for example, while it is being bounced).
```
/brokers/state/[broker_id] --> { state change requests ... } (created by admin)
```
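As a rough illustration of the layout above, the following Python sketch builds these paths and shows one way a new leader could "increment and get" /brokers/epoch. The helper names, the `InMemoryZK` stand-in, and the version-checked write are assumptions for illustration only. ZooKeeper's setData does accept an expected version, which makes a read-increment-write retry loop safe against concurrent leaders, but the text above does not say how the increment is actually performed.

```python
# Hypothetical helpers for the ZooKeeper paths listed above.
# Names and the InMemoryZK class are illustrative assumptions, not Kafka's API.

def broker_id_path(broker_id: int) -> str:
    return f"/brokers/ids/{broker_id}"

def replicas_path(topic: str, partition_id: int) -> str:
    return f"/brokers/topics/{topic}/{partition_id}/replicas"

def leader_path(topic: str, partition_id: int) -> str:
    return f"/brokers/topics/{topic}/{partition_id}/leader"

def isr_path(topic: str, partition_id: int) -> str:
    return f"/brokers/topics/{topic}/{partition_id}/ISR"

def reassign_path(topic: str, partition_id: int) -> str:
    return f"/brokers/partitions_reassigned/{topic}/{partition_id}"

def state_path(broker_id: int) -> str:
    return f"/brokers/state/{broker_id}"

EPOCH_PATH = "/brokers/epoch"


class VersionMismatch(Exception):
    pass


class InMemoryZK:
    """Minimal stand-in for a ZooKeeper client: each node holds (data, version)."""

    def __init__(self):
        self.nodes = {}

    def create(self, path, data):
        self.nodes[path] = (data, 0)

    def get(self, path):
        return self.nodes[path]

    def set(self, path, data, expected_version):
        # Conditional write: succeeds only if nobody updated the node since we read it.
        _, version = self.nodes[path]
        if version != expected_version:
            raise VersionMismatch(path)
        self.nodes[path] = (data, version + 1)


def next_epoch(zk) -> int:
    """Increment /brokers/epoch and return the new value, retrying on races."""
    while True:
        data, version = zk.get(EPOCH_PATH)
        new_epoch = int(data) + 1
        try:
            zk.set(EPOCH_PATH, str(new_epoch), version)
            return new_epoch
        except VersionMismatch:
            continue  # another leader incremented first; re-read and retry


zk = InMemoryZK()
zk.create(EPOCH_PATH, "0")
print(leader_path("orders", 3))        # /brokers/topics/orders/3/leader
print(next_epoch(zk), next_epoch(zk))  # 1 2
```

Because every increment is a conditional write, two leaders racing on the same counter can never both obtain the same epoch: the loser's write fails and it retries with the fresh value.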
Key data structures
Every broker stores a list of the partitions and replicas assigned to it. The current leader of a partition additionally maintains 4 sets: AR, ISR, CUR, and RAR, which correspond respectively to the replicas that are assigned to the partition, in sync with the leader, catching up with the leader, and being reassigned to other brokers. Normally, ISR is a subset of AR and AR = ISR + CUR. The leader of a partition maintains a commitQ, which buffers all produce requests waiting to be committed. For each replica assigned to a broker, the broker periodically stores the replica's HW in a checkpoint file.
```
Replica {                      // a replica of a partition
  broker_id    : int
  partition    : Partition
  isLocal      : Boolean       // is this replica local to this broker
  log          : Log           // local log associated with this replica
  hw           : long          // offset of the last committed message
  leo          : long          // log end offset
}

Partition {                    // a partition in a topic
  topic        : string
  partition_id : int
  leader       : Replica       // the leader replica of this partition
  commitQ      : Queue         // produce requests pending commit at the leader
  AR           : Set[Replica]  // replicas assigned to this partition
  ISR          : Set[Replica]  // in-sync replica set, maintained at the leader
  CUR          : Set[Replica]  // catch-up replica set, maintained at the leader
  RAR          : Set[Replica]  // reassigned replica set, maintained at the leader
}
```
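The structures above can be sketched in Python with dataclasses, together with a check of the normal-case invariants stated in the text (ISR is a subset of AR, and AR = ISR + CUR). This is a minimal sketch under stated assumptions: `Log` is replaced by a plain list, `Queue` by a `deque`, "+" is read as a disjoint union, and `check_invariants` is a hypothetical helper rather than part of the design.

```python
from collections import deque
from dataclasses import dataclass, field
from typing import Deque, List, Optional, Set


@dataclass(eq=False)  # identity-based equality/hash so replicas can be stored in sets
class Replica:
    broker_id: int
    is_local: bool                                   # is this replica local to this broker
    partition: Optional["Partition"] = field(default=None, repr=False)
    log: List[bytes] = field(default_factory=list)  # stand-in for the local Log
    hw: int = 0                                      # offset of the last committed message
    leo: int = 0                                     # log end offset


@dataclass
class Partition:
    topic: str
    partition_id: int
    leader: Optional[Replica] = None
    commitQ: Deque[object] = field(default_factory=deque)  # produce requests pending commit
    AR: Set[Replica] = field(default_factory=set)   # assigned replicas
    ISR: Set[Replica] = field(default_factory=set)  # in-sync replicas
    CUR: Set[Replica] = field(default_factory=set)  # catching-up replicas
    RAR: Set[Replica] = field(default_factory=set)  # replicas being reassigned

    def check_invariants(self) -> bool:
        """Normal-case invariants: ISR is a subset of AR, and AR = ISR + CUR
        (read here as a disjoint union)."""
        return (self.ISR <= self.AR
                and self.AR == self.ISR | self.CUR
                and not (self.ISR & self.CUR))


p = Partition("orders", 0)
r1, r2, r3 = (Replica(b, is_local=(b == 1), partition=p) for b in (1, 2, 3))
p.leader = r1
p.AR = {r1, r2, r3}
p.ISR = {r1, r2}
p.CUR = {r3}
print(p.check_invariants())  # True
```

Replicas use identity equality so that two replicas of different partitions on the same broker are never confused inside a set.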
Key algorithms
- ZooKeeper listeners ONLY on the leader
  - Partition-reassigned listener:
    - child change on /brokers/partitions_reassigned
    - child change on /brokers/partitions_reassigned/[topic]
- ZooKeeper listeners on all brokers
  - Leader-change listener: value change on /brokers/topics/[topic]/[partition_id]/leader
  - State-change listener: child change on /brokers/state/[broker_id]
- Configuration parameters
  - LeaderElectionWaitTime: controls the maximum amount of time that we wait during leader election.
  - KeepInSyncTime: controls the maximum amount of time that a leader waits before dropping a follower from the in-sync replica set.
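To make the role of KeepInSyncTime concrete, here is a minimal Python sketch of how a leader might shrink the ISR. It assumes the leader tracks, per follower, the last time that follower was fully caught up; that bookkeeping, the function name `shrink_isr`, and moving dropped followers into CUR (reading AR = ISR + CUR literally) are all assumptions. After shrinking, the leader would presumably write the new ISR to /brokers/topics/[topic]/[partition_id]/ISR.

```python
from typing import Dict, Set, Tuple


def shrink_isr(isr: Set[int], cur: Set[int], last_caught_up: Dict[int, float],
               leader_id: int, keep_in_sync_time: float,
               now: float) -> Tuple[Set[int], Set[int]]:
    """Hypothetical: move followers that have lagged longer than KeepInSyncTime
    from ISR to CUR. last_caught_up maps broker id -> time (seconds) at which
    that follower last matched the leader's log end offset."""
    lagging = {
        r for r in isr
        if r != leader_id  # the leader is always in sync with itself
        and now - last_caught_up.get(r, float("-inf")) > keep_in_sync_time
    }
    return isr - lagging, cur | lagging


isr, cur = shrink_isr({1, 2, 3}, set(), {2: 95.0, 3: 70.0}, leader_id=1,
                      keep_in_sync_time=10.0, now=100.0)
print(sorted(isr), sorted(cur))  # [1, 2] [3]
```

A follower with no recorded catch-up time is treated as lagging, which errs on the side of a smaller, trustworthy ISR.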
Broker startup
Each time a broker starts up, it calls brokerStartup(), described below.
```
brokerStartup()
{
  create /brokers/state/[broker_id] path if it doesn’t already exist
  register the state change handler to listen on child change ZK notifications on /brokers/state/[broker_id]
  register session expiration listener
  drain the state change queue
  get replica info from ZK and compute AR, the list of replicas assigned to this broker
  for each r in AR
  {
    subscribe to leader changes for r’s partition
    startReplica(r)
  }
  // broker startup procedure is complete; register its broker id in ZK to announce the availability of this broker
  register its broker_id in /brokers/ids/[broker_id] in ZK
}
```
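The ordering in brokerStartup() matters: the broker announces itself in /brokers/ids only after its state-change queue is drained and its replicas are started. The Python sketch below runs the same steps against a hypothetical in-memory `FakeZK` that records each call, so the order can be inspected. The class, its methods, the "topic/partition" string format, and the callback parameters are assumptions for illustration; session-expiration handling is omitted.

```python
from typing import Callable, Dict, List, Set


class FakeZK:
    """Hypothetical in-memory stand-in for ZooKeeper that records calls made against it."""

    def __init__(self, assignments: Dict[int, List[str]]):
        self.paths: Set[str] = set()
        self.assignments = assignments  # broker_id -> partitions ("topic/id") assigned to it
        self.calls: List[str] = []

    def ensure_path(self, path: str) -> None:
        self.paths.add(path)
        self.calls.append(f"ensure {path}")

    def watch_children(self, path: str) -> None:
        self.calls.append(f"watch-children {path}")

    def watch_value(self, path: str) -> None:
        self.calls.append(f"watch-value {path}")

    def create_ephemeral(self, path: str, data: str) -> None:
        self.paths.add(path)
        self.calls.append(f"ephemeral {path}")


def broker_startup(zk: FakeZK, broker_id: int, host_port: str,
                   drain_state_queue: Callable[[], None],
                   start_replica: Callable[[str], None]) -> None:
    state = f"/brokers/state/{broker_id}"
    zk.ensure_path(state)     # persistent state-change queue
    zk.watch_children(state)  # state-change listener
    # (session-expiration listener registration omitted in this sketch)
    drain_state_queue()       # handle requests queued while the broker was down
    for partition in zk.assignments.get(broker_id, []):
        topic, pid = partition.split("/")
        zk.watch_value(f"/brokers/topics/{topic}/{pid}/leader")  # leader-change listener
        start_replica(partition)
    # Only now announce that this broker is available.
    zk.create_ephemeral(f"/brokers/ids/{broker_id}", host_port)


zk = FakeZK({7: ["orders/0", "orders/2"]})
started: List[str] = []
broker_startup(zk, 7, "host7:9092", lambda: None, started.append)
print(started)       # ['orders/0', 'orders/2']
print(zk.calls[-1])  # ephemeral /brokers/ids/7
```

Registering the ephemeral broker node last means other brokers never see this broker as live while it is still replaying queued state changes.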