...

  1. This design aims to remove split-brain and herd effect issues in the V1 design. A partition has only one brain (on the leader) and all brokers only respond to state changes that are meant for them (as decided by the leader).
  2. The state machine in this design is controlled entirely by the leader of each partition. Each follower changes its state only on a request from the leader for that particular partition. A leader-coordinated state machine allows the state machine to be verified centrally and to fail fast.
  3. This design introduces an epoch or generation id per partition, which is a non-decreasing value for the partition. The epoch increments when the leader for a partition changes. (A small sketch of the follower-side epoch check appears after this list.)
  4. This design handles delete partition or delete topic state changes for dead brokers by queuing up state change requests for a broker in Zookeeper.
  5. This design scales better with respect to the number of ZK watches, since it registers fewer watches compared to V1. The motivation is to reduce the load on ZK when the Kafka cluster grows to thousands of partitions. For example, if we have a cluster of 3 brokers hosting 1000 topics with 3 partitions each, the V1 design requires registering 15000 watches. The V2 design requires registering 3000 watches.
  6. This design ensures that leader change ZK notifications are not queued up behind any other notifications and can be processed instantaneously.
  7. This design allows explicit monitoring of
    1. the entire lifecycle of a state change -
      1. leader, broker id 0, requested start-replica for topic foo partition 0, to broker id 1, at epoch 10
      2. leader, broker id 0, requested start-replica for topic foo partition 0, to broker id 2, at epoch 10
      3. follower, broker id 1, received start-replica for topic foo partition 0, from leader 0, at epoch 10
      4. follower, broker id 2, received start-replica for topic foo partition 0, from leader 0, at epoch 10
      5. follower, broker id 1, completed start-replica for topic foo partition 0, request from leader 0, at epoch 10
      6. follower, broker id 2, completed start-replica for topic foo partition 0, request from leader 0, at epoch 10
    2. the backlog of state change requests on slow followers
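
Points 2 and 3 above can be made concrete with a small sketch of the follower-side check. This is a hypothetical illustration in Java; the class and field names are not taken from the Kafka code base. A state change request carries the epoch at which the leader issued it, and a follower acts on it only if that epoch matches the latest epoch it observes for that partition.

Code Block
// Hypothetical sketch only; names are illustrative, not from the Kafka code base.
public final class StateChangeRequest {
    public enum Kind { START_REPLICA, CLOSE_REPLICA }

    public final Kind kind;
    public final String topic;
    public final int partitionId;
    public final int leaderBrokerId;
    public final int epoch;           // epoch at which the leader issued this request

    public StateChangeRequest(Kind kind, String topic, int partitionId,
                              int leaderBrokerId, int epoch) {
        this.kind = kind;
        this.topic = topic;
        this.partitionId = partitionId;
        this.leaderBrokerId = leaderBrokerId;
        this.epoch = epoch;
    }

    /** A follower ignores requests queued up by an older leader (stale epoch). */
    public boolean isCurrent(int latestPartitionEpoch) {
        return epoch == latestPartitionEpoch;
    }
}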

...

  1. Stores the information of all live brokers.
    Code Block
    /brokers/ids/[broker_id] --> host:port (ephemeral; created by admin) 
  2. Stores for each partition, a list of the currently assigned replicas. For each replica, we store the id of the broker to which the replica is assigned. The first replica is the preferred replica. Note that for a given partition, there is at most 1 replica on a broker. Therefore, the broker id can be used as the replica id
    Code Block
    /brokers/topics/[topic]/[partition_id]/replicas --> {broker_id …}  (created by admin) 
  3. Stores the id of the replica that’s the current leader of this partition
    Code Block
     /brokers/topics/[topic]/[partition_id]/leader --> broker_id (ephemeral) (created by leader) 
  4. Stores the ids of the replicas that are in-sync with the leader
    Code Block
     /brokers/topics/[topic]/[partition_id]/ISR --> {broker_id, …} (created by leader) 
  5. This path is used when we want to reassign some partitions to a different set of brokers. For each partition to be reassigned, it stores a list of new replicas and their corresponding assigned brokers. This path is created by an administrative process and is automatically removed once the partition has been moved successfully
    Code Block
     /brokers/partitions_reassigned/[topic]/[partition_id] --> {broker_id …} (created by admin) 

  6. This path is used by each new leader to increment and get a new epoch value. (A sketch of this, together with the broker registration in item 1, follows this list.)
    Code Block
     /brokers/epoch (created by admin) 

  7. This path is used by the leader of a partition to enqueue state change requests to the follower replicas. The state change requests include start replica and close replica. This path is created by the add brokers admin command and is only deleted by the remove brokers admin command. The purpose of making this path persistent is to cleanly handle state changes like delete topic and reassign partitions even when a broker is temporarily unavailable (for example, being bounced). (A sketch of one possible layout of this queue follows the stateChangeListener pseudocode below.)
    Code Block
     /brokers/state/[broker_id] --> { state change requests ... } (created by admin) 
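
As a rough illustration of how two of the paths above might be used, the following sketch uses the plain ZooKeeper Java client: registerBroker creates the ephemeral node of item 1, and getNewEpoch realizes item 6 as a read followed by a conditional write on /brokers/epoch, so that two concurrent new leaders cannot obtain the same epoch. The helper names and the decimal-string encoding of the counter are assumptions, not part of this design.

Code Block
import java.nio.charset.StandardCharsets;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

// Hypothetical helpers; class and method names are illustrative only.
public final class ZkLayoutSketch {

    // Item 1: /brokers/ids/[broker_id] --> host:port, ephemeral so the node
    // disappears automatically when the broker's ZK session dies.
    static void registerBroker(ZooKeeper zk, int brokerId, String hostPort)
            throws KeeperException, InterruptedException {
        zk.create("/brokers/ids/" + brokerId,
                  hostPort.getBytes(StandardCharsets.UTF_8),
                  ZooDefs.Ids.OPEN_ACL_UNSAFE,
                  CreateMode.EPHEMERAL);
    }

    // Item 6: increment and return a new epoch value, retrying on version conflicts
    // so that concurrent new leaders never obtain the same epoch.
    static int getNewEpoch(ZooKeeper zk) throws KeeperException, InterruptedException {
        while (true) {
            Stat stat = new Stat();
            byte[] raw = zk.getData("/brokers/epoch", false, stat);
            int next = Integer.parseInt(new String(raw, StandardCharsets.UTF_8)) + 1;
            try {
                // Conditional write: succeeds only if no one else bumped the counter meanwhile.
                zk.setData("/brokers/epoch",
                           Integer.toString(next).getBytes(StandardCharsets.UTF_8),
                           stat.getVersion());
                return next;
            } catch (KeeperException.BadVersionException lostRace) {
                // Another leader incremented the counter first; re-read and retry.
            }
        }
    }
}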

...

Code Block
stateChangeListener() {  
// listens to state change requests issued by the leader and acts on those

    drain the state change queue
    read next state change request
    Let r be the replica that the state change request is sent for.
    // this should not happen
    Throw an error if r is not hosted on this broker                
    Let requestEpoch be the epoch of the state change request
    if(closeReplicaRequest)
    {
       // we don't need to check the epoch here, so that delete topic/delete partition
       // can be handled even for dead brokers
       closeReplica(r)
    }
    if(startReplicaRequest)
    {
       Let latestPartitionEpoch be the latest epoch for this partition, obtained by reading /brokers/topics/[topic]/[partition]/leader=[broker_id],[epoch]
       if(leader for r.partition doesn't exist) {
          // this can only happen for new topics or new partitions of existing topics
          startReplica(r)
       }else if(requestEpoch == latestPartitionEpoch) {
          // this ensures that a slow follower that reads a state change request queued up by a previous leader ignores the stale request
          startReplica(r)
       }
    }
}
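
The listener above starts by draining the state change queue at /brokers/state/[broker_id], but this document does not spell out how that queue is laid out in ZooKeeper. Purely as one possible sketch, assuming the leader enqueues each request as a persistent sequential child znode, a follower could drain the queue as follows (the class and method names are illustrative):

Code Block
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;

// Hypothetical sketch only: assumes state change requests are persistent sequential
// children of /brokers/state/[broker_id]; the design above does not specify this layout.
final class StateChangeQueueSketch {

    /** Drains all pending state change requests for this broker, oldest first. */
    static void drain(ZooKeeper zk, int brokerId, Consumer<String> handler)
            throws KeeperException, InterruptedException {
        String queuePath = "/brokers/state/" + brokerId;
        List<String> children = zk.getChildren(queuePath, false);
        Collections.sort(children);                 // sequential names sort in enqueue order
        for (String child : children) {
            byte[] payload = zk.getData(queuePath + "/" + child, false, null);
            handler.accept(new String(payload, StandardCharsets.UTF_8));
            zk.delete(queuePath + "/" + child, -1); // -1 skips the znode version check
        }
    }
}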

...

Code Block
becomeLeader(r: Replica, ISR: Set[Replica], AR: Set[Replica])
{
   // get a new epoch value and write it to the leader path
   epoch = getNewEpoch()
   /brokers/topics/[r.partition.topic]/[r.partition.pid]/leader=broker_id, epoch
   /brokers/topics/[r.partition.topic]/[r.partition.pid]/ISR=ISR;epoch
   stop HW checkpoint thread for r
   r.hw = r.leo // TODO: check if this should actually be r.hw = last checkpointed HW for r
   wait until every live replica in AR catches up (i.e. its leo == r.hw) or a KeepInSyncTime has passed
   r.partition.ISR = the current set of replicas in sync with r
   r.partition.CUR = AR - ISR
   write r.partition.ISR & r.partition.CUR in ZK
   r.partition.RAR = replicas in /brokers/partitions_reassigned/[topic]/[partition_id] in ZK
   r.partition.leader = r                  // this enables reads/writes to this partition on this broker
   start a commit thread on r.partition
   start HW checkpoint thread for r
}
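
The two ZooKeeper writes at the top of becomeLeader() could be realized with the plain ZooKeeper client as sketched below. The "broker_id,epoch" and "ISR;epoch" string encodings mirror the path descriptions above, but the exact serialization and the helper name are assumptions.

Code Block
import java.nio.charset.StandardCharsets;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

// Hypothetical sketch; the serialization format and method name are assumptions.
final class BecomeLeaderZkWrites {
    static void publishLeaderAndIsr(ZooKeeper zk, String topic, int partitionId,
                                    int brokerId, int epoch, String isrCsv)
            throws KeeperException, InterruptedException {
        String base = "/brokers/topics/" + topic + "/" + partitionId;
        // Ephemeral leader path: it vanishes if the leader's ZK session expires,
        // which is what allows the surviving replicas to elect a new leader.
        zk.create(base + "/leader",
                  (brokerId + "," + epoch).getBytes(StandardCharsets.UTF_8),
                  ZooDefs.Ids.OPEN_ACL_UNSAFE,
                  CreateMode.EPHEMERAL);
        // ISR path written by the leader together with the epoch; this assumes the
        // znode already exists (a real implementation would create it if missing).
        zk.setData(base + "/ISR",
                   (isrCsv + ";" + epoch).getBytes(StandardCharsets.UTF_8),
                   -1);
    }
}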

...