...
When a new broker is added, we will automatically move some partitions from existing brokers to the new one. Our goal is to minimize the amount of data movement while maintaining a balanced load on each broker. We use a standalone coordinator process to do the rebalance; the algorithm is given below.
...
Take brokers offline
We’d also like to support shrinking a cluster by taking an existing set of brokers offline, using an administrative command like the following:
```
alter cluster remove brokers broker-1
```
This will start the reassignment process for the partitions currently hosted on broker-1. Once that is complete, broker-1 will be taken offline. This command will also delete the state change path for broker-1.
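The reassignment triggered by this command needs a new replica list for every partition that currently has a replica on broker-1. The sketch below is one hypothetical way to compute it and is not the algorithm the design specifies. It assumes the current assignment is a dict from (topic, partition_id) to a list of broker ids. It replaces the removed broker with the least-loaded remaining broker that does not already hold a replica of that partition. The output has the shape of the lists stored under /brokers/partitions_reassigned/[topic]/[partition_id].

```python
def plan_broker_removal(assignment, removed, live_brokers):
    """Hypothetical helper: new replica lists that no longer use `removed`.

    assignment: dict mapping (topic, partition_id) -> list of broker ids
    Returns only the partitions whose replica list changes.
    """
    candidates = sorted(b for b in live_brokers if b != removed)
    # Number of replicas each remaining broker hosts, used as a load measure.
    load = {b: 0 for b in candidates}
    for replicas in assignment.values():
        for b in replicas:
            if b in load:
                load[b] += 1

    plan = {}
    for key in sorted(assignment):
        replicas = assignment[key]
        if removed not in replicas:
            continue
        # At most one replica of a partition per broker, so skip current holders.
        options = [b for b in candidates if b not in replicas]
        if not options:
            raise ValueError(f"no broker available to replace {removed} for {key}")
        target = min(options, key=lambda b: (load[b], b))  # least loaded, then lowest id
        load[target] += 1
        # Keep positions, so the preferred (first) replica changes only if it was `removed`.
        plan[key] = [target if b == removed else b for b in replicas]
    return plan
```

Replacing the broker in place, rather than recomputing the whole list, keeps data movement limited to the replicas that actually lived on broker-1.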
Data replication
We’d like to allow a client to choose either asynchronous or synchronous replication. In the former case, a message to be published is acknowledged as soon as it reaches 1 replica. In the latter case, we will make our best effort to make sure that a message is only acknowledged after it reaches multiple replicas. When a client tries to publish a message to a partition of a topic, we need to propagate the message to all replicas. We have to decide:
...
- How can atomicity be guaranteed in the 2nd type of leader failure?
- How can we avoid the problem of multiple leaders for the same partition at the same time?
- If the brokers are in multiple racks, how can we guarantee that at least one replica goes to a different rack?
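The two acknowledgement modes can be sketched as a predicate that the leader evaluates before acknowledging a message. This is a simplified illustration with hypothetical names. Asynchronous mode is modelled as "the leader has the message". Synchronous mode is modelled as "every in-sync follower has it as well", which matches the commit rule used later in the detailed design. Offsets follow that design's convention: a message's offset is the log end offset (leo) right after it was appended.

```python
def can_acknowledge(mode, message_offset, leader_leo, isr_follower_leos):
    """Hypothetical ack predicate for one published message.

    mode: "async" or "sync"
    message_offset: leo the replicas must reach for the message to be present
    leader_leo: log end offset on the leader
    isr_follower_leos: log end offsets of the in-sync followers
    """
    on_leader = leader_leo >= message_offset
    if mode == "async":
        # Acknowledge as soon as the message has reached 1 replica (the leader).
        return on_leader
    if mode == "sync":
        # Acknowledge only after every in-sync replica has the message.
        return on_leader and all(leo >= message_offset for leo in isr_follower_leos)
    raise ValueError(f"unknown mode: {mode}")
```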
Kafka Replication Detailed Design
This document describes key data structures and algorithms in Kafka replication.
Paths stored in Zookeeper
Notation: When an element in a path is denoted [xyz], the value of xyz is not fixed and there is in fact a znode for each possible value of xyz. For example, /topics/[topic] would be a directory named /topics containing a directory for each topic name. An arrow -> is used to indicate the contents of a znode. For example, /hello -> world would indicate a znode /hello containing the value "world". A path is persistent unless it’s marked as ephemeral.
We store the following paths in Zookeeper:
- Stores the information of all live brokers.
  /brokers/ids/[broker_id] -> host:port (ephemeral; created by the broker at startup)
- Stores, for each partition, a list of the currently assigned replicas. For each replica, we store the id of the broker to which the replica is assigned. The first replica is the preferred replica. Note that for a given partition, there is at most 1 replica on a broker. Therefore, the broker id can be used as the replica id.
  /brokers/topics/[topic]/[partition_id]/replicas -> {broker_id, …} (created by admin)
- Stores the id of the replica that’s the current leader of this partition.
  /brokers/topics/[topic]/[partition_id]/leader -> broker_id (ephemeral; created by leader)
- Stores the ids of the set of replicas that are in sync with the leader.
  /brokers/topics/[topic]/[partition_id]/ISR -> {broker_id, …} (created by leader)
- This path is used when we want to reassign some partitions to a different set of brokers. For each partition to be reassigned, it stores a list of new replicas and their corresponding assigned brokers. This path is created by an administrative process and is automatically removed once the partition has been moved successfully.
  /brokers/partitions_reassigned/[topic]/[partition_id] -> {broker_id, …} (created by admin)
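The path layout above can be kept in one place with a few helper functions, so that the [xyz] placeholders are filled consistently. The function names are hypothetical. Only the path strings come from the list above, and the value encoding is left out because the document does not specify it.

```python
def broker_path(broker_id: int) -> str:
    # Ephemeral; value is "host:port".
    return f"/brokers/ids/{broker_id}"


def partition_path(topic: str, partition_id: int) -> str:
    return f"/brokers/topics/{topic}/{partition_id}"


def replicas_path(topic: str, partition_id: int) -> str:
    # Value: assigned broker ids; the first one is the preferred replica.
    return partition_path(topic, partition_id) + "/replicas"


def leader_path(topic: str, partition_id: int) -> str:
    # Ephemeral; value: broker id of the current leader.
    return partition_path(topic, partition_id) + "/leader"


def isr_path(topic: str, partition_id: int) -> str:
    # Value: broker ids of the replicas in sync with the leader.
    return partition_path(topic, partition_id) + "/ISR"


def reassigned_path(topic: str, partition_id: int) -> str:
    # Value: the new replica list; removed once the move completes.
    return f"/brokers/partitions_reassigned/{topic}/{partition_id}"
```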
Key data structures
Every broker stores a list of partitions and replicas assigned to it. The current leader of a partition further maintains 4 sets: AR, ISR, CUR and RAR, which correspond to the set of replicas that are assigned to the partition, in sync with the leader, catching up with the leader, and being reassigned to other brokers, respectively. Normally, ISR ⊆ AR and AR = ISR + CUR. The leader of a partition maintains a commitQ and uses it to buffer all produce requests to be committed. For each replica assigned to a broker, the broker periodically stores its HW in a checkpoint file.
```
Replica {                      // a replica of a partition
  broker_id : int
  partition : Partition
  isLocal   : Boolean          // is this replica local to this broker
  log       : Log              // local log associated with this replica
  hw        : long             // offset of the last committed message
  leo       : long             // log end offset
}

Partition {                    // a partition in a topic
  topic        : string
  partition_id : int
  leader       : Replica       // the leader replica of this partition
  commitQ      : Queue         // produce requests pending commit at the leader
  AR           : Set[Replica]  // replicas assigned to this partition
  ISR          : Set[Replica]  // in-sync replica set, maintained at the leader
  CUR          : Set[Replica]  // catch-up replica set, maintained at the leader
  RAR          : Set[Replica]  // reassigned replica set, maintained at the leader
}
```
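The two structures can be written as Python dataclasses, together with a check for the relations that normally hold: ISR ⊆ AR, AR = ISR + CUR, and ISR and CUR disjoint. The layout is an illustrative assumption. The sets hold broker ids rather than Replica objects, which is possible because the broker id can serve as the replica id. The Replica's partition and log fields are left out.

```python
from collections import deque
from dataclasses import dataclass, field
from typing import Optional, Set


@dataclass
class Replica:
    broker_id: int
    is_local: bool = False  # is this replica local to this broker
    hw: int = 0             # offset of the last committed message
    leo: int = 0            # log end offset


@dataclass
class Partition:
    topic: str
    partition_id: int
    leader: Optional[Replica] = None
    commit_q: deque = field(default_factory=deque)  # produce requests pending commit
    # Replica sets keyed by broker id (at most one replica per broker).
    AR: Set[int] = field(default_factory=set)
    ISR: Set[int] = field(default_factory=set)
    CUR: Set[int] = field(default_factory=set)
    RAR: Set[int] = field(default_factory=set)

    def invariants_hold(self) -> bool:
        """ISR is a subset of AR, ISR and CUR are disjoint, and AR = ISR + CUR."""
        return (
            self.ISR <= self.AR
            and not (self.ISR & self.CUR)
            and (self.ISR | self.CUR) == self.AR
        )
```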
Key algorithms
Zookeeper listeners ONLY on the leader
- Replica-change listener:
  - child change on /brokers/topics (new topic registered)
  - child change on /brokers/topics/[topic] (new partition registered)
  - value change on /brokers/topics/[topic]/[partition_id]/replicas (new replica assigned)
- Partition-reassigned listener:
  - child change on /brokers/partitions_reassigned
  - child change on /brokers/partitions_reassigned/[topic]
Zookeeper listeners on all brokers
- Leader-change listener: value change on /brokers/topics/[topic]/[partition_id]/leader
- State-change listener: child change on /brokers/topics/[topic]/[partition_id]/state/[broker_id]
Configuration parameters
- LeaderElectionWaitTime: controls the maximum amount of time that we wait during leader election.
- KeepInSyncTime: controls the maximum amount of time that a leader waits before dropping a follower from the in-sync replica set.
Broker startup
Each time a broker starts up, it calls brokerStartup(), described below.
```
brokerStartup()
{
  register its broker_id in /brokers/ids/[broker_id] in ZK
  get replica info from ZK and compute AR, a list of replicas assigned to this broker
  for each r in AR
  {
    register its broker_id in /brokers/topics/[topic]/[partition_id]/state/[broker_id] in ZK
    register stateChangeListener() on /brokers/topics/[topic]/[partition_id]/state/[broker_id]
  }
}
```
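The order of the startup registrations can be traced against an in-memory stand-in for ZooKeeper. FakeZk is hypothetical: it is a dict of path to value plus a map of watchers, and it is not the API of ZooKeeper or any client library. The assignment argument stands in for the replica info read from ZK. The state path is created with an empty value because the document does not say what it stores.

```python
class FakeZk:
    """Hypothetical in-memory stand-in for ZooKeeper (path -> value, path -> watchers)."""

    def __init__(self):
        self.nodes = {}
        self.watchers = {}

    def create(self, path, value=""):
        if path in self.nodes:
            raise KeyError(f"node exists: {path}")
        self.nodes[path] = value

    def watch(self, path, callback):
        self.watchers.setdefault(path, []).append(callback)


def broker_startup(zk, broker_id, host_port, assignment, on_state_change):
    """assignment: (topic, partition_id) -> list of broker ids, as read from the replicas paths."""
    zk.create(f"/brokers/ids/{broker_id}", host_port)
    # AR for this broker: every partition with a replica assigned here.
    local = [key for key, brokers in sorted(assignment.items()) if broker_id in brokers]
    for topic, partition_id in local:
        state_path = f"/brokers/topics/{topic}/{partition_id}/state/{broker_id}"
        zk.create(state_path)
        zk.watch(state_path, on_state_change)  # stateChangeListener()
    return local
```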
State change listener
Each broker has a ZK path that it listens to for state change requests from the leader.
```
stateChangeListener(r) { // listens to state change requests issued by the leader and acts on those
  read next state change request
  if(becomeFollowerRequest) becomeFollower(r)
  if(replicaUnassignedRequest) closeReplica(r)
  if(deleteReplicaRequest) deleteReplica(r)
  if(replicaAssignedRequest) replicaStateChange(r)
}
```
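Because each request carries exactly one type, the listener amounts to a dispatch table. The sketch below is table driven, and the request-type strings and handler signature are assumptions: the document does not specify how requests are encoded in the state path.

```python
def make_state_change_listener(handlers):
    """handlers: dict mapping a request type to a function that takes the replica."""

    def listener(replica, request_type):
        handler = handlers.get(request_type)
        if handler is None:
            raise ValueError(f"unknown state change request: {request_type}")
        handler(replica)

    return listener


calls = []
listener = make_state_change_listener({
    "becomeFollower": lambda r: calls.append(("becomeFollower", r)),
    "replicaUnassigned": lambda r: calls.append(("closeReplica", r)),
    "deleteReplica": lambda r: calls.append(("deleteReplica", r)),
    "replicaAssigned": lambda r: calls.append(("replicaStateChange", r)),
})
listener("orders-0", "becomeFollower")
```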
State change operations
Replica state change
This state change is requested by the leader for a new replica assignment.
```
replicaStateChange(r: Replica) {
  if(r's log is not already started) {
    do local recovery of r's log
    r.hw = min(last checkpointed HW for r, r.leo)
    register a leader-change listener on partition r.partition.partition_id
  }
  if(a leader does not exist for partition r.partition.partition_id in ZK)
    leaderElection(r)
}
```
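The min() in the recovery step matters because local recovery can truncate a log whose tail was never fully flushed. In that case the checkpointed HW may point past the new end of the log. The following is a minimal illustration, assuming that the log is a list of (payload, is_valid) pairs, that offsets are list indices, and that leo is one past the last message kept. The function name and log model are hypothetical.

```python
def recover_replica(messages, checkpointed_hw):
    """Keep the longest prefix of valid messages, then clamp the HW to the new leo."""
    leo = 0
    for _, is_valid in messages:
        if not is_valid:
            break  # a torn or corrupt write; drop it and everything after it
        leo += 1
    recovered = messages[:leo]
    hw = min(checkpointed_hw, leo)  # the HW can never exceed the log end offset
    return recovered, leo, hw
```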
Close replica
This state change is requested by the leader when a topic is deleted.
```
closeReplica(r: Replica)
{
  stop the fetcher associated with r, if one exists
  close and delete r
}
```
Become follower
This state change is requested by the leader when the leader for a replica changes.
```
becomeFollower(r: Replica)
{
  stop the current ReplicaFetcherThread, if any
  truncate the log to r.hw  // discard messages that were never committed
  start a new ReplicaFetcherThread to the current leader of r, from offset r.leo  // r.leo == r.hw after truncation
  start HW checkpoint thread for r
}
```
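Truncating to r.hw before fetching discards messages that were appended locally but never committed. This keeps the follower from holding messages that the new leader may not have. The sketch below shows only the effect on the log. It assumes the log is a Python list indexed by offset and that hw is an exclusive end offset, consistent with the committer setting the HW to pr.offset (the LEO right after the append). The function name is hypothetical.

```python
def become_follower(log, hw):
    """Truncate `log` in place to the committed prefix; return the next fetch offset."""
    del log[hw:]     # drop the uncommitted suffix, if any
    return len(log)  # r.leo after truncation, where the new fetcher starts
```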
Become leader
This state change is done by the new leader.
```
becomeLeader(r: Replica, ISR: Set[Replica], AR: Set[Replica])
{
  stop HW checkpoint thread for r
  r.hw = r.leo // TODO: check if this should actually be r.hw = last checkpointed HW for r
  wait until every live replica in AR catches up (i.e. its leo == r.hw) or KeepInSyncTime has passed
  r.partition.ISR = the current set of replicas in sync with r
  r.partition.CUR = AR - r.partition.ISR
  write r.partition.ISR & r.partition.CUR in ZK
  r.partition.RAR = replicas in /brokers/partitions_reassigned/[topic]/[partition_id] in ZK
  r.partition.leader = r // this enables reads/writes to this partition on this broker
  start a commit thread on r.partition
  start HW checkpoint thread for r
  for each replica f in AR other than r
    send a become-follower request to f
}
```
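After the catch-up wait, the new leader splits AR into ISR and CUR by comparing each replica's leo with its own HW. The sketch below shows that split. It assumes that the leos observed at the end of the wait are available as a dict keyed by broker id and that a replica missing from the dict is not live. Both are assumptions, because the document does not say how the leader learns them.

```python
def split_isr_cur(leader_id, leader_hw, ar, observed_leo):
    """Return (ISR, CUR) for the new leader.

    ar: broker ids assigned to the partition (includes the leader)
    observed_leo: broker id -> leo seen at the end of the wait; absent means not live
    """
    isr = {leader_id}  # the leader is trivially in sync with itself
    for b in ar:
        if b != leader_id and observed_leo.get(b, -1) >= leader_hw:
            isr.add(b)
    cur = set(ar) - isr  # AR = ISR + CUR
    return isr, cur
```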
Leader election
```
leaderElection(r: Replica)
{
  read the current ISR and AR for r.partition.partition_id from ZK
  if( (r in AR) && (ISR is empty || r in ISR) ) {
    get the hw of every replica in ISR
    if(r.hw < max hw over ISR)
      wait for PreferredReplicaTime if r is not the preferred replica
    if(successfully write r as the current leader of r.partition in ZK)
      becomeLeader(r, ISR, AR)
  } else {
    // some other replica will become leader
  }
}
```
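The step "successfully write r as the current leader" is what prevents two leaders for the same partition. It works only if creating the leader znode is atomic, so that when several replicas race, exactly one create succeeds. The sketch models this with a hypothetical in-memory store whose create_if_absent is made atomic by a lock. With real ZooKeeper, the corresponding operation is an ephemeral-node create that fails if the node already exists.

```python
import threading


class LeaderStore:
    """Hypothetical stand-in for the ephemeral leader znode of one partition."""

    def __init__(self):
        self._lock = threading.Lock()
        self.leader = None

    def create_if_absent(self, broker_id):
        with self._lock:  # check-and-set happens as one atomic step
            if self.leader is not None:
                return False
            self.leader = broker_id
            return True


def try_elect(store, r, ar, isr):
    """Eligibility test from leaderElection(), followed by the atomic leader write."""
    eligible = r in ar and (not isr or r in isr)
    return eligible and store.create_if_absent(r)
```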
Produce requests
Produce request handler on the leader
```
produceRequestHandler(pr: ProduceRequest)
{
  if(the request partition pr.partition doesn't have a leader replica on this broker)
    throw NotLeaderException
  log = pr.partition.leader.log
  append pr.messages to log
  pr.offset = log.LEO // every replica in ISR must reach this offset before pr is committed
  add pr to pr.partition.commitQ
}
```
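The following is a sketch of the leader-side handler. It assumes that each partition object carries a flag saying whether this broker leads it, a log held as a Python list, and a commit queue. All of these names are hypothetical. Recording the LEO after the append gives the offset that the committer later waits for.

```python
from collections import deque
from dataclasses import dataclass, field


class NotLeaderException(Exception):
    pass


@dataclass
class LeaderPartition:
    is_leader_here: bool
    log: list = field(default_factory=list)
    commit_q: deque = field(default_factory=deque)


@dataclass
class ProduceRequest:
    messages: list
    offset: int = -1  # set by the leader once the messages are appended


def handle_produce(partition, pr):
    if not partition.is_leader_here:
        raise NotLeaderException()
    partition.log.extend(pr.messages)
    pr.offset = len(partition.log)  # LEO after the append
    partition.commit_q.append(pr)   # the committer thread picks it up from here
```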
HW checkpoint thread for partition p
```
while(true) {
  r = p.leader
  flush r.hw & r.log to disk
  sleep until the next checkpoint is due // the HW is stored periodically, not in a busy loop
}
```
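Recovery reads the checkpoint back, so a crash must never leave a half-written checkpoint file. A common way to guarantee this is to write to a temporary file and then rename it over the old one. The sketch does this with os.replace, which the Python documentation describes as atomic on POSIX when it succeeds, and keeps the temporary file in the same directory so the rename stays on one filesystem. The file format, one "topic partition_id hw" line per replica, is an assumption, not something this document specifies.

```python
import os
import tempfile


def write_hw_checkpoint(path, hws):
    """hws: dict (topic, partition_id) -> hw. Writes every entry, then swaps the file in."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".hw-checkpoint-")
    try:
        with os.fdopen(fd, "w") as f:
            for (topic, partition_id), hw in sorted(hws.items()):
                f.write(f"{topic} {partition_id} {hw}\n")
            f.flush()
            os.fsync(f.fileno())  # make the data durable before the rename
        os.replace(tmp_path, path)  # readers see either the old or the new file
    except BaseException:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise


def read_hw_checkpoint(path):
    hws = {}
    if not os.path.exists(path):
        return hws
    with open(path) as f:
        for line in f:
            topic, partition_id, hw = line.split()
            hws[(topic, int(partition_id))] = int(hw)
    return hws
```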
Committer thread for partition p
```
while(true) {
  pr = p.commitQ.dequeue
  canCommit = false
  while(!canCommit) {
    canCommit = true
    for each r in p.ISR
      if(!offsetReached(r, pr.offset)) {
        canCommit = false
        break
      }
    if(!canCommit) { // r did not catch up within KeepInSyncTime
      p.CUR.add(r)
      p.ISR.delete(r)
      write p.ISR to ZK
    }
  }
  for each c in p.CUR
    if(c.leo >= pr.offset) {
      p.ISR.add(c); p.CUR.delete(c); write p.ISR to ZK
    }
  checkReassignedReplicas(pr, p.RAR, p.ISR)
  checkLoadBalancing()
  p.leader.hw = pr.offset // increment the HW to indicate that pr is committed
  send ACK to the client that pr is committed
}

offsetReached(r: Replica, offset: Long) {
  if(r.leo becomes equal to or larger than offset within KeepInSyncTime) return true
  return false
}

checkLoadBalancing() { // see if we need to switch the leader to the preferred replica
  if(leader replica is not the preferred one & the preferred replica is in ISR) {
    delete /brokers/topics/[topic]/[partition_id]/leader in ZK
    stop this commit thread
    stop the HW checkpoint thread
  }
}

checkReassignedReplicas(pr: ProduceRequest, RAR: Set[Replica], ISR: Set[Replica])
{
  // see if all reassigned replicas have fully caught up; if so, switch to those replicas
  // optimization: do the check periodically
  if(RAR is not empty && every replica in RAR has its leo >= pr.offset) {
    // newly assigned replicas are in sync; switch over to the new replicas
    write (RAR + ISR) as the new ISR in ZK // need (RAR + ISR) in case we fail right after here
    update /brokers/topics/[topic]/[partition_id]/replicas in ZK with the new replicas in RAR
    delete /brokers/topics/[topic]/[partition_id]/leader in ZK // triggers leader election
    delete /brokers/partitions_reassigned/[topic]/[partition_id] in ZK
    stop this commit thread
  }
}
```
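The core of the committer loop is a rule with three parts. The HW advances only when every replica in ISR has reached pr.offset. A replica that is still behind after KeepInSyncTime moves to CUR. A CUR replica that has reached the offset rejoins ISR. The single-step function below captures that rule without threads or ZooKeeper writes. The wait is replaced by a hypothetical timed_out predicate, and the leader is assumed to be counted in ISR.

```python
def commit_step(pr_offset, leos, isr, cur, timed_out):
    """One evaluation of the commit rule for a request ending at pr_offset.

    leos: broker id -> current leo; isr/cur: sets of broker ids (mutated in place)
    timed_out: broker id -> True once KeepInSyncTime has passed for that replica
    Returns the new HW if the request can be committed, otherwise None.
    """
    lagging = {b for b in isr if leos[b] < pr_offset}
    waiting = False
    for b in lagging:
        if timed_out(b):
            isr.discard(b)  # drop from the in-sync set (written to ZK in the design)
            cur.add(b)
        else:
            waiting = True  # still within KeepInSyncTime, so keep waiting
    if waiting:
        return None
    # Catching-up replicas that have reached the offset rejoin ISR.
    for b in list(cur):
        if leos[b] >= pr_offset:
            cur.discard(b)
            isr.add(b)
    return pr_offset
```

Dropping a slow replica from ISR, rather than waiting for it indefinitely, is what bounds the produce latency in synchronous mode at roughly KeepInSyncTime.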
Follower fetching from leader
A follower keeps sending ReplicaFetchRequests to the leader. The processing at the leader and at the follower is described below.
```
ReplicaFetchRequest {
  topic: String
  partition_id: Int
  replica_id: Int
  offset: Long
}

ReplicaFetchResponse {
  hw: Long              // the offset of the last message committed at the leader
  messages: MessageSet  // fetched messages
}
```
At the leader
```
replicaFetch(f: ReplicaFetchRequest) { // handler for ReplicaFetchRequest at leader
  leader = getLeaderReplica(f.topic, f.partition_id)
  if(leader == null) throw NotLeaderException
  response = new ReplicaFetchResponse
  getReplica(f.topic, f.partition_id, f.replica_id).leo = f.offset
  response.messages = fetch messages starting from f.offset from leader.log
  response.hw = leader.hw
  send response back
}
```
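On the leader, the fetch offset also reports the follower's position. A request for offset f.offset implies that the follower already has everything before it, so the leader records f.offset as that replica's leo, which is what the committer compares against pr.offset. The sketch below assumes a list-backed log, a dict of replica positions, and a hypothetical max_messages cap on the batch size.

```python
from dataclasses import dataclass


@dataclass
class ReplicaFetchRequest:
    topic: str
    partition_id: int
    replica_id: int
    offset: int


@dataclass
class ReplicaFetchResponse:
    hw: int        # the offset of the last message committed at the leader
    messages: list


class NotLeaderException(Exception):
    pass


def replica_fetch(leaders, follower_leos, f, max_messages=100):
    """leaders: (topic, partition_id) -> {"log": list, "hw": int} for partitions led here."""
    leader = leaders.get((f.topic, f.partition_id))
    if leader is None:
        raise NotLeaderException()
    # The follower has everything before f.offset, so that is its leo.
    follower_leos[(f.topic, f.partition_id, f.replica_id)] = f.offset
    messages = leader["log"][f.offset:f.offset + max_messages]
    return ReplicaFetchResponse(hw=leader["hw"], messages=messages)
```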
At the follower
```
ReplicaFetcherThread for Replica r:
while(true) {
  send ReplicaFetchRequest to leader and get response: ReplicaFetchResponse back
  append response.messages to r's log
  r.hw = min(response.hw, r.leo) // the HW cannot exceed what this replica has appended
  set the offset in the next ReplicaFetchRequest to r.leo
}
```
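One iteration of the follower loop can be written as a function so that it can be driven without a network. The fetch callable stands in for the ReplicaFetchRequest/ReplicaFetchResponse round trip and is hypothetical. The follower's HW is clamped to its own leo, mirroring the min() used during recovery, because the leader's HW can be ahead of what this follower has appended so far.

```python
def follower_fetch_once(log, fetch):
    """log: list of messages (leo == len(log)); fetch(offset) -> (hw, messages).

    Returns the follower's new hw.
    """
    hw, messages = fetch(len(log))  # always request from the current leo
    log.extend(messages)
    return min(hw, len(log))


leader_log = ["a", "b", "c", "d"]
leader_hw = 3


def fake_fetch(offset, batch=2):
    # Hypothetical stand-in for the leader: at most `batch` messages per response.
    return leader_hw, leader_log[offset:offset + batch]
```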
...
The following are different proposals for the detailed implementation. In 0.8, the implementation is based on the v3 proposal.