
...

Kafka Replication Detailed Design

This document describes the key data structures and algorithms used in Kafka replication.

Paths stored in Zookeeper

Notation: When an element in a path is denoted [xyz], the value of xyz is not fixed and there is in fact a znode for each possible value of xyz. For example, /topics/[topic] would be a directory named /topics containing a sub-directory for each topic name. An arrow -> indicates the contents of a znode; for example, /hello -> world indicates a znode /hello containing the value "world". A path is persistent unless it is marked as ephemeral.

We store the following paths in Zookeeper:

  1. Stores the information of all live brokers.
    Code Block
     /brokers/ids/[broker_id] --> host:port (ephemeral; created by admin) 
  2. Stores, for each partition, the list of currently assigned replicas. For each replica, we store the id of the broker to which the replica is assigned. The first replica is the preferred replica. Note that for a given partition, there is at most one replica on a broker. Therefore, the broker id can be used as the replica id.
    Code Block
    /brokers/topics/[topic]/[partition_id]/replicas --> {broker_id …}  (created by admin) 
  3. Stores the id of the replica that is the current leader of this partition.
    Code Block
     /brokers/topics/[topic]/[partition_id]/leader --> broker_id (ephemeral) (created by leader) 
  4. Stores the ids of the replicas that are currently in-sync with the leader (the ISR).
    Code Block
     /brokers/topics/[topic]/[partition_id]/ISR --> {broker_id, …} (created by leader) 
  5. This path is used when we want to reassign some partitions to a different set of brokers. For each partition to be reassigned, it stores a list of new replicas and their corresponding assigned brokers. This path is created by an administrative process and is automatically removed once the partition has been moved successfully.
    Code Block
     /brokers/partitions_reassigned/[topic]/[partition_id] --> {broker_id …} (created by admin) 
  6. This path is used by the leader of a partition to enqueue state change requests to the follower replicas. The various state change requests include start replica, close replica, become follower. This path is created by the add brokers admin command. This path is only deleted by the remove brokers admin command. The purpose of making this path persistent is to cleanly handle state changes like delete topic and reassign partitions even when a replica is temporarily unavailable (for example, being bounced).
    Code Block
      /brokers/state/[broker_id] --> { state change requests ... } (created by admin) 
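
For illustration, the two most basic operations on these paths, registering a live broker under an ephemeral znode and reading a partition's replica assignment, could be written against the plain Apache ZooKeeper client as in the Scala sketch below. This is not the broker's actual code; the connect string, session timeout, and raw string encoding of the znode values are assumptions of this sketch.

Code Block

import org.apache.zookeeper.{CreateMode, ZooDefs, ZooKeeper}

object ZkPathsSketch {
  // Assumed connect string and session timeout; the real broker wraps ZK access in its own utilities.
  val zk = new ZooKeeper("localhost:2181", 30000, null)

  // Path 1: register a live broker under an ephemeral znode (removed automatically when the session expires).
  def registerBroker(brokerId: Int, hostPort: String): Unit =
    zk.create(s"/brokers/ids/$brokerId", hostPort.getBytes("UTF-8"),
      ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)

  // Path 2: read the assigned replica list for a partition (a persistent znode created by the admin).
  def assignedReplicas(topic: String, partitionId: Int): String =
    new String(zk.getData(s"/brokers/topics/$topic/$partitionId/replicas", false, null), "UTF-8")
}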

Key data structures

Every broker stores a list of partitions and replicas assigned to it. The current leader of a partition further maintains four sets: AR, ISR, CUR and RAR, which correspond to the set of replicas assigned to the partition, in-sync with the leader, catching up with the leader, and being reassigned to other brokers. Normally, ISR ⊆ AR and AR = ISR + CUR. The leader of a partition maintains a commitQ and uses it to buffer all produce requests waiting to be committed. For each replica assigned to a broker, the broker periodically stores its HW in a checkpoint file.

Code Block

Replica {            // a replica of a partition
  broker_id    : int
  partition    : Partition
  isLocal      : Boolean      // is this replica local to this broker
  log          : Log          // local log associated with this replica
  hw           : long         // offset of the last committed message
  leo          : long         // log end offset
}

Partition {          // a partition in a topic
  topic        : string
  partition_id : int
  leader       : Replica      // the leader replica of this partition
  commitQ      : Queue        // produce requests pending commit at the leader
  AR           : Set[Replica] // replicas assigned to this partition
  ISR          : Set[Replica] // in-sync replica set, maintained at the leader
  CUR          : Set[Replica] // catch-up replica set, maintained at the leader
  RAR          : Set[Replica] // reassigned replica set, maintained at the leader
}
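
As a small illustration of the set relationships stated above (not broker code), the leader could assert that ISR ⊆ AR and AR = ISR + CUR with a Scala check like the following, where replicas are represented by their broker ids:

Code Block

// Illustrative check of the replica-set invariants; replicas are modeled as broker ids.
def checkReplicaSetInvariants(ar: Set[Int], isr: Set[Int], cur: Set[Int]): Unit = {
  require(isr.subsetOf(ar), "ISR must be a subset of AR")
  require(ar == (isr ++ cur), "AR must equal ISR + CUR")
}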

Key algorithms

Zookeeper listeners ONLY on the leader

  1. Replica-change listener:
    1. child change on /brokers/topics (new topic registered)
    2. child change on /brokers/topics/[topic] (new partition registered)
    3. value change on /brokers/topics/[topic]/[partition_id]/replicas (new replica assigned)
  2. Partition-reassigned listener:
    1. child change on /brokers/partitions_reassigned
    2. child change on /brokers/partitions_reassigned/[topic]

Zookeeper listeners on all brokers

  1. Leader-change listener: value change on /brokers/topics/[topic]/[partition_id]/leader
  2. State-change listener: child change on /brokers/state/[broker_id]

Configuration parameters

  1. LeaderElectionWaitTime: controls the maximum amount of time that we wait during leader election.
  2. KeepInSyncTime: controls the maximum amount of time that a leader waits before dropping a follower from the in-sync replica set.
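
For illustration only, these two parameters could be carried in a small configuration object like the Scala sketch below; the field names and default values are assumptions of this sketch, not actual Kafka configuration keys.

Code Block

// Hypothetical configuration holder for the two replication timeouts described above.
case class ReplicationConfig(
  leaderElectionWaitTimeMs: Long = 1000L,  // upper bound on the wait during leader election
  keepInSyncTimeMs: Long = 10000L          // upper bound on follower lag before it leaves the ISR
)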

Broker startup

Each time a broker starts up, it calls brokerStartup(). The algorithm is described below.

Code Block

brokerStartup()
{
  register its broker_id in /brokers/ids/[broker_id] in ZK
  register stateChangeListener() on /brokers/state/[broker_id]
  get replica info from ZK and compute AR, a list of replicas assigned to this broker
  for each r in AR
      startReplica(r)
}

State change listener

Each broker has a ZK path that it listens to for state change requests from the leader

Code Block

stateChangeListener() {  // listens to state change requests issued by the leader and acts on those
    read next state change request
    Let r be the replica that the state change request is sent for.
    Throw an error if r is not hosted on this broker                // this should not happen 
    if(becomeFollowerRequest)   becomeFollower(r)
    if(closeReplicaRequest)  closeReplica(r)
    if(startReplicaRequest)  startReplica(r)
}
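
One way to picture the requests flowing through /brokers/state/[broker_id] is as a small algebraic data type that the listener pattern-matches on, as in the Scala sketch below. The three request types mirror the ones named above; the field layout is an assumption, and the println placeholders stand in for the real startReplica/closeReplica/becomeFollower handlers described later in this document.

Code Block

// Sketch: the three state change requests modeled as an ADT (field layout assumed).
sealed trait StateChangeRequest
case class StartReplica(topic: String, partitionId: Int)   extends StateChangeRequest
case class CloseReplica(topic: String, partitionId: Int)   extends StateChangeRequest
case class BecomeFollower(topic: String, partitionId: Int) extends StateChangeRequest

// Dispatch skeleton; the println calls stand in for the real handlers.
def onStateChange(req: StateChangeRequest): Unit = req match {
  case StartReplica(t, p)   => println(s"start replica for $t-$p")
  case CloseReplica(t, p)   => println(s"close replica for $t-$p")
  case BecomeFollower(t, p) => println(s"become follower for $t-$p")
}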

State change events

On leader change

This state change listener is registered on every broker hosting partition p. Each time it is triggered, the following procedure is executed -

Code Block

onLeaderChange() 
{
   if(r is registered under /brokers/topics/[topic]/[partition_id]/replicas)
      leaderElection()
   else
    // some other replica is already the leader, and will send the become follower state change request to this replica
}

On replica change

Every time a replica change event is triggered, the leader calls onReplicaChange()

Code Block

onReplicaChange() 
{
  for each new replica r assigned to follower R
    send start replica state change request to R
  for each replica r un-assigned from follower R
    send close replica state change request to R
}

On reassignment of partitions

Each time a partition-reassigned event is triggered, the leader calls onPartitionsReassigned()

Code Block

onPartitionsReassigned() 
{
  p.RAR = the new replicas from /brokers/partitions_reassigned/[topic]/[partition_id]
  if(a new replica is to be reassigned to follower R)
     send start-replica state change request to R
}

State change operations

Start replica state change

This state change is requested by the leader for a new replica assignment

Code Block

startReplica(r: Replica) {
  if(r's log is not already started) {
     do local recovery of r's log
     r.hw = min(last checkpointed HW for r, r.leo)
     register a leader-change listener on partition r.partition.partition_id
  }
  if(a leader does not exist for partition r.partition.partition_id in ZK)
     leaderElection(r)
}

Close replica

This state change is requested by the leader when a topic is deleted

Code Block

closeReplica(r: Replica)
{
   stop the fetcher associated with r, if one exists
   close and delete r
}

Become follower

This state change is requested by the leader when the leader for a replica changes

Code Block

becomeFollower(r: Replica)
{
  stop the commit thread, if any                  // this is required if this replica was the last leader
  stop the current ReplicaFetcherThread, if any
  truncate the log to r.hw
  start a new ReplicaFetcherThread to the current leader of r, from offset r.leo
  start HW checkpoint thread for r
}
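
The truncation to r.hw is what prevents a former leader from diverging: messages beyond the HW were never committed and may differ from what the new leader holds, so they are discarded before fetching resumes. A minimal Scala sketch of just that step is shown below; the Log trait here is a hypothetical stand-in for the real log implementation.

Code Block

// Hypothetical minimal log interface, for this sketch only.
trait Log {
  def endOffset: Long                 // LEO
  def truncateTo(offset: Long): Unit  // discard messages at or beyond the given offset
}

// Discard uncommitted messages, then report the offset the fetcher should resume from.
def truncateToHighWatermark(log: Log, hw: Long): Long = {
  if (log.endOffset > hw)
    log.truncateTo(hw)  // anything past the HW was never committed and may conflict with the new leader
  log.endOffset         // equals hw after truncation; the ReplicaFetcherThread starts here
}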

Become leader

This state change is done by the new leader

Code Block

becomeLeader(r: Replica, ISR: Set[Replica], AR: Set[Replica])
{
   stop HW checkpoint thread for r
   r.hw = r.leo // TODO: check if this should actually be r.hw = last checkpointed HW for r
   wait until every live replica in AR catches up (i.e. its leo == r.hw) or KeepInSyncTime has passed
   r.partition.ISR = the current set of replicas in sync with r
   r.partition.CUR = AR - ISR
   write r.partition.ISR & r.partition.CUR in ZK
   r.partition.RAR = replicas in /brokers/partitions_reassigned/[topic]/[partition_id] in ZK
   r.partition.leader = r          // this enables reads/writes to this partition on this broker
   start a commit thread on r.partition
   start HW checkpoint thread for r
   get replica info from ZK and compute CUR, a list of replicas assigned to this broker
   for each r in CUR
      send become follower request to r
}

Leader election

Code Block

leaderElection(r: Replica)
{
  read the current ISR and AR for r.partition.partition_id from ZK
  if( (r in AR) && (ISR is empty || r in ISR) ) {
    get r.hw for all r in ISR
    if(r.hw < max(hw) for r in ISR)
      wait for PreferredReplicaTime if r is not the preferred replica
    if(successfully write r as the current leader of r.partition in ZK)
      becomeLeader(r, ISR, CUR)
  } else {
    // some other replica will become leader
  }
}
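
Because /brokers/topics/[topic]/[partition_id]/leader is an ephemeral znode, the "successfully write r as the current leader" step can be implemented as a conditional create: exactly one contender's create succeeds. A hedged Scala sketch against the plain ZooKeeper client follows; the string encoding of the broker id and the absence of retry logic are simplifications of this sketch.

Code Block

import org.apache.zookeeper.{CreateMode, KeeperException, ZooDefs, ZooKeeper}

// Returns true if this broker won the election by creating the ephemeral leader znode first.
def tryBecomeLeader(zk: ZooKeeper, topic: String, partitionId: Int, brokerId: Int): Boolean =
  try {
    zk.create(s"/brokers/topics/$topic/$partitionId/leader",
      brokerId.toString.getBytes("UTF-8"),
      ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)
    true
  } catch {
    case _: KeeperException.NodeExistsException => false  // another replica became leader first
  }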

Produce requests

Produce request handler on the leader

Code Block

produceRequestHandler(pr: ProduceRequest)
{
   if(the partition pr.partition doesn't have a leader replica on this broker)
      throw NotLeaderException
   log = pr.partition.leader.log
   append pr.messages to log
   pr.offset = log.LEO
   add pr to pr.partition.commitQ
}

HW checkpoint thread for partition p

Code Block

   r = p.leader
   flush r.hw & r.log to disk

Committer thread for partition p

Code Block

while(true) {
  pr = commitQ.dequeue
  canCommit = false
  while(!canCommit) {
    canCommit = true
    for each r in ISR
       if(!offsetReached(r, pr.offset)) {
          canCommit = false
          break
       }
    if(!canCommit) {
       p.CUR.add(r)
       p.ISR.delete(r)
       write p.ISR to ZK
    }
  }
  for each c in CUR
     if(c.leo >= pr.offset) {
        p.ISR.add(c); p.CUR.delete(c); write p.ISR to ZK
     }
  checkReassignedReplicas(pr, p.RAR, p.ISR)
  checkLoadBalancing()
  p.leader.hw = pr.offset         // increment the HW to indicate that pr is committed
  send ACK to the client that pr is committed
}

offsetReached(r: Replica, offset: Long) {
   if(r.leo becomes equal or larger than offset within KeepInSyncTime)  return true
   return false
}

checkLoadBalancing() { // see if we need to switch the leader to the preferred replica
   if(leader replica is not the preferred one & the preferred replica is in ISR) {
       delete /brokers/topics/[topic]/[partition_id]/leader  in ZK
       stop this commit thread
       stop the HW checkpoint thread
   }
 }

checkReassignedReplicas(pr: ProduceRequest, RAR: Set[Replica], ISR: Set[Replica])
{
    // see if all reassigned replicas have fully caught up; if so, switch to those replicas
    // optimization: do the check periodically
    if(every replica in RAR has its leo >= pr.offset) {
       // newly assigned replicas are in-sync, switch over to the new replicas
       write (RAR + ISR) as the new ISR in ZK    // need (RAR + ISR) in case we fail right after here
       update /brokers/topics/[topic]/[partition_id]/replicas in ZK with the new replicas in RAR
       delete /brokers/topics/[topic]/[partition_id]/leader in ZK    // triggers leader election
       delete /brokers/partitions_reassigned/[topic]/[partition_id] in ZK
       stop this commit thread
    }
}
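
In essence, the commit rule in the loop above is: a produce request is committed once every replica currently in the ISR has a log end offset at or beyond the request's offset, with KeepInSyncTime bounding how long the leader waits for any single follower. The small Scala sketch below restates just that check; the ReplicaState type is an assumption of the sketch.

Code Block

// Illustrative only: a replica is reduced to its broker id and current log end offset.
case class ReplicaState(brokerId: Int, leo: Long)

// A produce request at `offset` is committable once every ISR member has caught up to it.
def canCommit(isr: Set[ReplicaState], offset: Long): Boolean =
  isr.forall(_.leo >= offset)

// Followers that have not caught up within KeepInSyncTime are moved from the ISR to CUR by the leader.
def laggingReplicas(isr: Set[ReplicaState], offset: Long): Set[ReplicaState] =
  isr.filterNot(_.leo >= offset)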

Follower fetching from leader

A follower keeps sending ReplicaFetchRequests to the leader. The processes at the leader and the follower are described below -

Code Block

ReplicaFetchRequest {
   topic: String
   partition_id: Int
   replica_id: Int
   offset: Long
}
ReplicaFetchResponse {
   hw: Long			// the offset of the last message committed at the leader
   messages: MessageSet	// fetched messages
}

At the leader

Code Block

replicaFetch(f: ReplicaFetchRequest) {    // handler for ReplicaFetchRequest at the leader
  leader = getLeaderReplica(f.topic, f.partition_id)
  if(leader == null) throw NotLeaderException
  response = new ReplicaFetchResponse
  getReplica(f.topic, f.partition_id, f.replica_id).leo = f.offset
  response.messages = fetch messages starting from f.offset from leader.log
  response.hw = leader.hw
  send response back
}

At the follower

Code Block

ReplicaFetcherThread for Replica r:
while(true) {
   send ReplicaFetchRequest to leader and get response: ReplicaFetchResponse back
   append response.messages to r's log
   r.hw = response.hw
   advance offset in ReplicaFetchRequest
}

Leader change event

Each broker registers a leader-change listener in ZK for every replica assigned to it. Every time a leader-change event is triggered, the broker calls onLeaderChange().

...

The following are different proposals for the detailed implementation. In 0.8, the implementation is based on the v3 proposal.

V2 proposal

V3 proposal