...

  1. Stores the information of all live brokers.
    Code Block
     /brokers/ids/[broker_id] --> host:port (ephemeral; created by admin) 
  2. For each partition, stores the list of currently assigned replicas. For each replica, we store the id of the broker to which the replica is assigned. The first replica is the preferred replica. Note that for a given partition, there is at most 1 replica on a broker. Therefore, the broker id can be used as the replica id.
    Code Block
    /brokers/topics/[topic]/[partition_id]/replicas --> {broker_id …}  (created by admin) 
  3. Stores the id of the replica that’s the current leader of this partition.
    Code Block
     /brokers/topics/[topic]/[partition_id]/leader --> broker_id (ephemeral) (created by leader) 
  4. Stores the ids of the replicas that are in sync with the leader.
    Code Block
     /brokers/topics/[topic]/[partition_id]/ISR --> {broker_id, …} (created by leader) 
  5. This path is used when we want to reassign some partitions to a different set of brokers. For each partition to be reassigned, it stores a list of new replicas and their corresponding assigned brokers. This path is created by an administrative process and is automatically removed once the partition has been moved successfully.
    Code Block
     /brokers/partitions_reassigned/[topic]/[partition_id] --> {broker_id …} (created by admin) 
  6. This path is used by the leader of a partition to enqueue state change requests to the follower replicas. The state change requests include start replica, close replica, and become follower. This path is created by the add brokers admin command and is only deleted by the remove brokers admin command. Making this path persistent lets state changes such as delete topic and reassign partitions be handled cleanly even when a replica is temporarily unavailable (for example, while being bounced).
    Code Block
      /brokers/topics/[topic]/[partition_id]/state/[broker_id] --> { state change requests ... } (created by admin) 
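
The path layout above can be sketched as a handful of string builders. This is only an illustration: the function names are hypothetical and not part of the design; only the path templates come from the list.

```python
# Hypothetical helpers that build the ZooKeeper paths listed above.
# Only the path templates are taken from the design; the names are assumptions.
BROKERS_ROOT = "/brokers"


def broker_id_path(broker_id: int) -> str:
    # Ephemeral node holding host:port of a live broker.
    return f"{BROKERS_ROOT}/ids/{broker_id}"


def partition_path(topic: str, partition_id: int) -> str:
    return f"{BROKERS_ROOT}/topics/{topic}/{partition_id}"


def replicas_path(topic: str, partition_id: int) -> str:
    # List of broker ids assigned to the partition; the first is the preferred replica.
    return partition_path(topic, partition_id) + "/replicas"


def leader_path(topic: str, partition_id: int) -> str:
    # Ephemeral node holding the broker id of the current leader.
    return partition_path(topic, partition_id) + "/leader"


def isr_path(topic: str, partition_id: int) -> str:
    # Set of broker ids that are in sync with the leader.
    return partition_path(topic, partition_id) + "/ISR"


def reassigned_path(topic: str, partition_id: int) -> str:
    return f"{BROKERS_ROOT}/partitions_reassigned/{topic}/{partition_id}"


def state_path(topic: str, partition_id: int, broker_id: int) -> str:
    # Persistent queue of state change requests for one replica.
    return partition_path(topic, partition_id) + f"/state/{broker_id}"
```

For example, `state_path("events", 0, 3)` yields the queue path that broker 3 would watch for partition 0 of a topic named `events` (the topic name is made up).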

...

  1. Replica-change listener:
    1. child change on /brokers/topics (new topic registered)
    2. child change on /brokers/topics/[topic] (new partition registered)
    3. value change on /brokers/topics/[topic]/[partition_id]/replicas (new replica assigned)
  2. Partition-reassigned listener:
    1. child change on /brokers/partitions_reassigned
    2. child change on /brokers/partitions_reassigned/[topic]

Zookeeper listeners on all brokers

  1. Leader-change listener: value change on /brokers/topics/[topic]/[partition_id]/leader
  2. State-change listener: child change on /brokers/topics/[topic]/[partition_id]/state/[broker_id]

Configuration parameters

  1. LeaderElectionWaitTime: controls the maximum amount of time that we wait during leader election.
  2. KeepInSyncTime: controls the maximum amount of time that a leader waits before dropping a follower from the in-sync replica set.
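
As a rough illustration of how KeepInSyncTime might be applied, the sketch below drops followers whose last fetch is older than the configured limit. The millisecond units, the field names, and the idea of measuring lag by time since the last fetch are all assumptions made for this example; the design only states that the parameter bounds how long the leader waits.

```python
from dataclasses import dataclass


@dataclass
class ReplicationConfig:
    # Field names and millisecond units are assumptions for illustration.
    leader_election_wait_time_ms: int  # LeaderElectionWaitTime
    keep_in_sync_time_ms: int          # KeepInSyncTime


def shrink_isr(isr, last_fetch_ms, now_ms, config):
    """Return the followers that remain in the in-sync set.

    A follower is kept only if it fetched within keep_in_sync_time_ms of now.
    """
    return {
        broker_id
        for broker_id in isr
        if now_ms - last_fetch_ms[broker_id] <= config.keep_in_sync_time_ms
    }
```

With a limit of 5000 ms, a follower that last fetched 6000 ms ago would be dropped, while one that fetched 1000 ms ago would stay.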

...

Code Block
brokerStartup()
{
  register its broker_id in /brokers/ids/[broker_id] in ZK
  get replica info from ZK and compute AR, a list of replicas assigned to this broker
  for each r in AR
  {
      register stateChangeListener() on /brokers/topics/[topic]/[partition_id]/state/[broker_id] in ZK
      startReplica(r)
  }
}
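
The startup steps can be sketched against an in-memory stand-in for ZooKeeper. `FakeZk`, `broker_startup`, and the callback parameters are hypothetical names used only for this example; a real broker would use a ZooKeeper client and its watch mechanism instead.

```python
class FakeZk:
    """In-memory stand-in for ZooKeeper; not a real client API."""

    def __init__(self):
        self.nodes = {}      # path -> value
        self.listeners = {}  # path -> list of callbacks

    def create(self, path, value):
        self.nodes[path] = value

    def subscribe(self, path, callback):
        self.listeners.setdefault(path, []).append(callback)


def broker_startup(zk, broker_id, host_port, state_change_listener, start_replica):
    # Announce the broker under /brokers/ids/[broker_id].
    zk.create(f"/brokers/ids/{broker_id}", host_port)

    # Compute AR: the (topic, partition_id) pairs with a replica on this broker.
    assigned = []
    for path, replicas in zk.nodes.items():
        parts = path.split("/")
        # Expected shape: /brokers/topics/[topic]/[partition_id]/replicas
        is_replicas_node = (
            len(parts) == 6
            and parts[1:3] == ["brokers", "topics"]
            and parts[5] == "replicas"
        )
        if is_replicas_node and broker_id in replicas:
            assigned.append((parts[3], parts[4]))

    # Watch the per-replica state change path and start each assigned replica.
    for topic, partition_id in assigned:
        zk.subscribe(
            f"/brokers/topics/{topic}/{partition_id}/state/{broker_id}",
            state_change_listener,
        )
        start_replica(topic, partition_id)
    return assigned
```

The sketch scans every node to find assignments, which is fine for an illustration but would not be how a real client enumerates children.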

State change listener

Each broker has a ZK path that it listens to for state change requests from the leader

Code Block
stateChangeListener(r) {  // listens to state change requests issued by the leader and acts on those
    read next state change request
    Let r be the replica that the state change request is sent for.
    Throw an error if r is not hosted on this broker                // this should not happen 
    if(becomeFollowerRequest)   becomeFollower(r)
    if(closeReplicaRequest)  closeReplica(r)
    if(startReplicaRequest)  startReplica(r)
}

State change events

On leader change

This state change listener is registered on every broker hosting partition p. Each time it is triggered, the following procedure is executed:

Code Block

onLeaderChange() 
{
   if(r is registered under /brokers/topics/[topic]/[partition_id]/replicas)
      leaderElection()
   else
    // some other replica is already the leader, and will send the become follower state change request to this replica
}

On replica change

Every time a replica change event is triggered, the leader calls onReplicaChange()

Code Block
onReplicaChange() 
{
  for new replica assigned to follower R
    send start replica state change request to R
  for replica r un-assigned from follower R
    send close replica state change request to R
}
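
A minimal sketch of the replica-change handling, assuming the leader can compare the previous and the new replica lists for a partition. The function name and the request labels are hypothetical; the design only says that start replica and close replica requests are sent to the affected followers.

```python
def on_replica_change(old_replicas, new_replicas):
    """Return the (broker_id, request) pairs the leader would enqueue."""
    old, new = set(old_replicas), set(new_replicas)
    requests = []
    # Newly assigned followers are asked to start the replica.
    for broker_id in sorted(new - old):
        requests.append((broker_id, "start-replica"))
    # Followers that lost the assignment are asked to close it.
    for broker_id in sorted(old - new):
        requests.append((broker_id, "close-replica"))
    return requests
```

Moving a partition from brokers 1, 2, 3 to brokers 2, 3, 4 would produce one start request for broker 4 and one close request for broker 1.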

On reassignment of partitions

Each time a partition reassigned event is triggered, the leader calls onPartitionsReassigned()

Code Block
onPartitionsReassigned() 
{
  p.RAR = the new replicas from /brokers/partitions_reassigned/[topic]/[partition_id]
  if(a new replica is to be reassigned to follower R)
     send start-replica state change request to R
}

State change operations

Replica state change

This state change is requested by the leader for a new replica assignment

Code Block

replicaStateChange(r: Replica) {
  if( r's log is not already started) {
     do local recovery of r's log
     r.hw = min(last checkpointed HW for r, r.leo)
     register a leader-change listener on partition r.partition.partition_id
  }
   if( a leader does not exist for partition r.partition.partition_id in ZK)
      leaderElection(r)
}
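
The high watermark recovery in this operation can be sketched as follows. The `Replica` class, the list-backed log, and the callback for leader election are assumptions for illustration; local log recovery and listener registration are left out.

```python
from dataclasses import dataclass, field


@dataclass
class Replica:
    # Hypothetical in-memory replica; offsets are list indices.
    log: list = field(default_factory=list)
    hw: int = 0
    started: bool = False

    @property
    def leo(self) -> int:
        # Log end offset: the offset of the next message to be appended.
        return len(self.log)


def replica_state_change(r, checkpointed_hw, leader_exists, elect_leader):
    if not r.started:
        # Local recovery of the log is omitted in this sketch.
        # The HW can never point past the end of the recovered log.
        r.hw = min(checkpointed_hw, r.leo)
        r.started = True
    if not leader_exists:
        elect_leader(r)
```

Taking the minimum matters when the recovered log is shorter than the checkpointed HW, for instance after unflushed messages were lost in a crash.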

...

Code Block
becomeFollower(r: Replica)
{
  stop the commit thread, if any                  // this is required if this replica was the last leader
  stop the current ReplicaFetcherThread, if any
  truncate the log to r.hw
  start a new ReplicaFetcherThread to the current leader of r, from offset r.leo
  start HW checkpoint thread for r
}
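
The truncation step can be illustrated with a list-backed log, assuming offsets are list indices. The function name is hypothetical, and thread management is not modelled.

```python
def become_follower(log, hw):
    """Truncate the log to hw and return (truncated_log, next_fetch_offset)."""
    # Messages at or beyond the high watermark were never committed,
    # so the new follower discards them before fetching from the leader.
    truncated = log[:hw]
    # After truncation the log end offset equals hw, so fetching resumes there.
    return truncated, len(truncated)
```

A replica holding four messages with a high watermark of 2 keeps the first two and resumes fetching at offset 2.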

...

HW checkpoint thread for partition p

Code Block

while(true) {
       r = p.leader
       flush r.hw & r.log to disk
}

Committer thread for partition p

...

Code Block
replicaFetch (f: ReplicaFetchRequest) {    // handler for ReplicaFetchRequest at leader
  leader = getLeaderReplica(f.topic, f.partition_id)

  if(leader == null) throw NotLeaderException
  response = new ReplicaFetcherResponse
  getReplica(f.topic, f.partition_id, f.replica_id).leo = f.offset
  response.messages = fetch messages starting from f.offset from leader.log
  response.hw = leader.hw
  send response back
}

At the follower

Code Block

ReplicaFetcherThread for Replica r:
while(true) {
   send ReplicaFetchRequest to leader and get response:ReplicaFetcherResponse back
   append response.messages to r's log
   r.hw = response.hw
   advance offset in ReplicaFetchRequest
}
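
The leader handler and the follower loop can be put together in a small in-memory simulation. The class names, the `max_messages` batch limit, and the list-backed logs are assumptions for this sketch; there is no network or threading here.

```python
class LeaderReplica:
    def __init__(self, log, hw):
        self.log = list(log)
        self.hw = hw
        self.follower_leo = {}  # replica_id -> last offset requested by that follower

    def replica_fetch(self, replica_id, offset, max_messages=2):
        # The requested offset tells the leader how far the follower has got.
        self.follower_leo[replica_id] = offset
        messages = self.log[offset:offset + max_messages]
        return messages, self.hw


class FollowerReplica:
    def __init__(self, replica_id):
        self.replica_id = replica_id
        self.log = []
        self.hw = 0

    def fetch_once(self, leader):
        # Fetch from the current log end offset, append, and adopt the leader's HW.
        messages, hw = leader.replica_fetch(self.replica_id, len(self.log))
        self.log.extend(messages)
        self.hw = hw
        return len(messages)
```

Calling `fetch_once` repeatedly until it returns 0 brings the follower's log in line with the leader's; the final empty fetch is what lets the leader record that the follower has caught up.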

...

Leader change event

Each broker registers a leader-change listener in ZK for every replica assigned to it. Every time a leader-change event is triggered, the broker calls onLeaderChange().

Code Block

onLeaderChange() 
{
   if(no leader exists under /brokers/topics/[topic]/[partition_id]/leader)
   {
      if(r is still registered under /brokers/topics/[topic]/[partition_id]/replicas)
         leaderElection(r)
   }
}
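
The check in this handler can be sketched with a plain dictionary standing in for ZooKeeper. The function name, the dictionary, and the election callback are hypothetical; the election itself is not modelled.

```python
def on_leader_change(zk_nodes, topic, partition_id, broker_id, leader_election):
    """Start an election only if no leader exists and this broker still holds a replica."""
    base = f"/brokers/topics/{topic}/{partition_id}"
    if zk_nodes.get(f"{base}/leader") is None:
        if broker_id in zk_nodes.get(f"{base}/replicas", []):
            leader_election(broker_id)
            return True
    return False
```

The return value is only a convenience for the example: it reports whether an election was attempted.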