  1. Stores the information of all live brokers.
    /brokers/ids/[broker_id] --> host:port (ephemeral; created by admin) 
  2. Stores, for each partition, the list of currently assigned replicas. For each replica, we store the id of the broker to which the replica is assigned. The first replica is the preferred replica. Note that for a given partition, there is at most 1 replica on a broker, so the broker id can be used as the replica id.
    /brokers/topics/[topic]/[partition_id]/replicas --> {broker_id …}  (created by admin) 
  3. Stores the id of the replica that’s the current leader of this partition.
     /brokers/topics/[topic]/[partition_id]/leader --> broker_id (ephemeral) (created by leader) 
  4. Stores the ids of the replicas that are in sync with the leader.
     /brokers/topics/[topic]/[partition_id]/ISR --> {broker_id, …} (created by leader) 
  5. This path is used when we want to reassign some partitions to a different set of brokers. For each partition to be reassigned, it stores the list of new replicas and their corresponding assigned brokers. This path is created by an administrative process and is automatically removed once the partition has been moved successfully.
     /brokers/partitions_reassigned/[topic]/[partition_id] --> {broker_id …} (created by admin) 

  6. This path holds the epoch value. Each new leader uses it to increment the epoch and get the new value.
     /brokers/epoch (created by admin) 

  7. This path is used by the leader of a partition to enqueue state change requests to the follower replicas. State change requests include start replica and close replica. This path is created by the add brokers admin command and is deleted only by the remove brokers admin command. The path is persistent so that state changes like delete topic and reassign partitions are handled cleanly even when a broker is temporarily unavailable (for example, while being bounced).
     /brokers/state/[broker_id] --> { state change requests ... } (created by admin) 
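
As a quick reference, the path layout listed above can be written as a handful of string builders. This is a minimal sketch: the function and constant names are hypothetical and chosen for illustration, and only the path templates come from the list above.

```python
# Hypothetical helpers. Only the path layout is taken from the design;
# the function and constant names are illustrative assumptions.

EPOCH_PATH = "/brokers/epoch"


def broker_id_path(broker_id: int) -> str:
    """Registration node of a live broker."""
    return f"/brokers/ids/{broker_id}"


def partition_path(topic: str, partition_id: int) -> str:
    """Common parent of the replicas, leader and ISR nodes."""
    return f"/brokers/topics/{topic}/{partition_id}"


def replicas_path(topic: str, partition_id: int) -> str:
    return partition_path(topic, partition_id) + "/replicas"


def leader_path(topic: str, partition_id: int) -> str:
    return partition_path(topic, partition_id) + "/leader"


def isr_path(topic: str, partition_id: int) -> str:
    return partition_path(topic, partition_id) + "/ISR"


def reassigned_path(topic: str, partition_id: int) -> str:
    return f"/brokers/partitions_reassigned/{topic}/{partition_id}"


def state_path(broker_id: int) -> str:
    """Persistent queue of state change requests for one broker."""
    return f"/brokers/state/{broker_id}"


if __name__ == "__main__":
    print(leader_path("orders", 0))  # /brokers/topics/orders/0/leader
```

Keeping the templates in one place makes it easier to see that the leader, replicas and ISR nodes all hang off the same per-partition parent.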

Key data structures

Every broker stores a list of partitions and replicas assigned to it. The current leader of a partition further maintains 4 sets: AR, ISR, CUR, and RAR, which correspond to the sets of replicas that are assigned to the partition, in sync with the leader, catching up with the leader, and being reassigned to other brokers, respectively. Normally, ISR is a subset of AR and AR = ISR + CUR. The leader of a partition also maintains a commitQ, which buffers all produce requests waiting to be committed. For each replica assigned to a broker, the broker periodically stores the replica's HW (the offset of the last committed message) in a checkpoint file.


Replica {                    // a replica of a partition
  broker_id   : int
  partition   : Partition
  isLocal     : Boolean      // is this replica local to this broker
  log         : Log          // local log associated with this replica
  hw          : long         // offset of the last committed message
  leo         : long         // log end offset
}

Partition {                         // a partition in a topic
  topic            : string
  partition_id     : int
  leader           : Replica        // the leader replica of this partition
  commitQ          : Queue          // produce requests pending commit at the leader
  AR               : Set[Replica]   // replicas assigned to this partition
  ISR              : Set[Replica]   // In-sync replica set, maintained at the leader
  CUR              : Set[Replica]   // Catch-up replica set, maintained at the leader
  RAR              : Set[Replica]   // Reassigned replica set, maintained at the leader
}
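
The relationship between the leader's sets can be checked mechanically. The sketch below is a simplified, hypothetical rendering of the Partition structure above: replicas are represented by their broker ids (which the design allows, since a broker holds at most one replica of a partition), the log and commitQ fields are left out, and the check_invariants method is an illustrative addition rather than part of the design.

```python
from dataclasses import dataclass, field
from typing import Optional, Set


@dataclass
class Partition:
    """Leader-side view of a partition; replicas are identified by broker id."""
    topic: str
    partition_id: int
    leader: Optional[int] = None                # broker id of the leader replica
    ar: Set[int] = field(default_factory=set)   # assigned replicas
    isr: Set[int] = field(default_factory=set)  # in-sync replicas
    cur: Set[int] = field(default_factory=set)  # catch-up replicas
    rar: Set[int] = field(default_factory=set)  # reassigned replicas

    def check_invariants(self) -> bool:
        """True when ISR is a subset of AR and AR = ISR + CUR.

        The design says this holds "normally", so a False result is a
        signal to investigate, not necessarily an error.
        """
        return self.isr <= self.ar and self.ar == (self.isr | self.cur)


if __name__ == "__main__":
    p = Partition("orders", 0, leader=1, ar={1, 2, 3}, isr={1, 2}, cur={3})
    print(p.check_invariants())  # True
```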

Key algorithms

Zookeeper listeners ONLY on the leader

  1. Partition-reassigned listener:
    1. child change on /brokers/partitions_reassigned
    2. child change on /brokers/partitions_reassigned/[topic]

Zookeeper listeners on all brokers

  1. Leader-change listener: value change on /brokers/topics/[topic]/[partition_id]/leader
  2. State-change listener: child change on /brokers/state/[broker_id]

Configuration parameters

  1. LeaderElectionWaitTime: controls the maximum amount of time that we wait during leader election.
  2. KeepInSyncTime: controls the maximum amount of time that a leader waits before dropping a follower from the in-sync replica set.
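
To illustrate how a timeout like KeepInSyncTime could be applied, here is a minimal sketch. It assumes the leader records, per follower, the last time that follower was caught up. That bookkeeping, the function name and the parameter names are all assumptions made for the example; the design only states that the parameter bounds how long the leader waits.

```python
from typing import Dict, Set


def followers_to_drop(
    last_caught_up: Dict[int, float],
    now: float,
    keep_in_sync_time: float,
) -> Set[int]:
    """Return the followers that have lagged longer than keep_in_sync_time.

    last_caught_up maps a follower's broker id to the time (in seconds)
    at which the leader last saw it caught up. This tracking scheme is an
    assumption of the sketch, not something the design specifies.
    """
    return {
        broker_id
        for broker_id, seen_at in last_caught_up.items()
        if now - seen_at > keep_in_sync_time
    }


if __name__ == "__main__":
    lagging = followers_to_drop({2: 100.0, 3: 90.0}, now=105.0, keep_in_sync_time=10.0)
    print(lagging)  # {3}
```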

Broker startup

Each time a broker starts up, it calls brokerStartup(), which is described below.


brokerStartup()
{
  create /brokers/state/[broker_id] path if it doesn’t already exist
  register the state change handler to listen on child change ZK notifications on /brokers/state/[broker_id]
  register session expiration listener
  drain the state change queue 
  get replica info from ZK and compute AR, a list of replicas assigned to this broker
  for each r in AR
  {
      subscribe to leader changes for r’s partition
      startReplica(r)
  }
  // broker startup procedure is complete. Register its broker id in ZK to announce the availability of this broker
  register its broker_id in /brokers/ids/[broker_id] in ZK
}
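
The ordering in brokerStartup() matters: the broker announces itself under /brokers/ids only after its state queue is drained and its replicas are started. The sketch below walks through the same steps against an in-memory stand-in for ZooKeeper. FakeZk, assigned_partitions, and the start_replica and handle_state_change callbacks are hypothetical names introduced for illustration and do not correspond to a real ZooKeeper client API. Session expiration handling is omitted.

```python
from typing import Callable, Dict, List, Tuple


class FakeZk:
    """In-memory stand-in for ZooKeeper (illustrative only, not a client API)."""

    def __init__(self) -> None:
        self.nodes: Dict[str, object] = {}
        self.watches: List[str] = []

    def create_if_absent(self, path: str, value: object) -> None:
        self.nodes.setdefault(path, value)

    def watch(self, path: str) -> None:
        self.watches.append(path)


def assigned_partitions(zk: FakeZk, broker_id: int) -> List[Tuple[str, int]]:
    """Find (topic, partition_id) pairs whose replica list contains broker_id."""
    result = []
    for path, value in zk.nodes.items():
        parts = path.strip("/").split("/")
        # Expect: brokers/topics/<topic>/<partition_id>/replicas
        if len(parts) == 5 and parts[:2] == ["brokers", "topics"] and parts[4] == "replicas":
            if broker_id in value:
                result.append((parts[2], int(parts[3])))
    return sorted(result)


def broker_startup(
    zk: FakeZk,
    broker_id: int,
    host_port: str,
    start_replica: Callable[[str, int], None],
    handle_state_change: Callable[[object], None],
) -> None:
    state_path = f"/brokers/state/{broker_id}"
    zk.create_if_absent(state_path, [])  # create the state path if missing
    zk.watch(state_path)                 # listen for new state change requests
    queue = zk.nodes[state_path]
    while queue:                         # drain the state change queue
        handle_state_change(queue.pop(0))
    for topic, partition_id in assigned_partitions(zk, broker_id):
        zk.watch(f"/brokers/topics/{topic}/{partition_id}/leader")
        start_replica(topic, partition_id)
    # Last step: announce availability by registering the broker id.
    zk.nodes[f"/brokers/ids/{broker_id}"] = host_port
```

A caller would pre-populate the replicas nodes, then pass in callbacks that start a replica and apply a queued request. Registering the broker id last means other brokers never see this broker as live before its replicas have been started.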