This detailed design differs from the original detailed design in the following areas:

  1. The state machine in this design is controlled entirely by the leader of each partition. Each follower changes its state only on such a request from the leader for a particular partition. A leader-coordinated state machine allows the state machine to be verified centrally and to fail fast.
  2. This design introduces a global epoch, a non-decreasing value for a Kafka cluster. The epoch changes whenever the leader for a partition changes (a fencing sketch follows this list).
  3. This design handles delete partition or delete topic state changes for dead brokers by queuing up state change requests for a broker in Zookeeper.
  4. This design scales better with respect to the number of ZK watches, since it registers fewer watches than V1. The motivation is to reduce the load on ZK as the Kafka cluster grows to thousands of partitions. For example, for a cluster of 3 brokers hosting 1000 topics with 3 partitions each, the V1 design requires registering 15000 watches, while the V2 design requires only 3000.
  5. This design ensures that leader change ZK notifications are not queued up behind any other notifications, so a leader change can be acted on immediately.
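
To make the epoch's role concrete, below is a minimal, hypothetical sketch of how a follower could use the epoch to reject requests from a deposed leader. The class name, field, and exact fencing rule are illustrative assumptions and are not spelled out in this design.

Code Block
// Hypothetical follower-side fencing check. Assumes each state change
// request carries the epoch of the leader that issued it.
final class EpochFence {
    private long highestSeenEpoch = -1L; // highest epoch accepted so far

    // Accept a request only if its epoch is at least as new as any epoch
    // seen so far; requests from stale (deposed) leaders are rejected.
    synchronized boolean accept(long requestEpoch) {
        if (requestEpoch < highestSeenEpoch) {
            return false; // request from a deposed leader
        }
        highestSeenEpoch = requestEpoch;
        return true;
    }
}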

Paths stored in Zookeeper

Notation: When an element in a path is denoted [xyz], the value of xyz is not fixed; there is in fact a znode for each possible value of xyz. For example, /topics/[topic] would be a directory named /topics containing a directory for each topic name. An arrow -> is used to indicate the contents of a znode. For example, /hello -> world would indicate a znode /hello containing the value "world". A path is persistent unless it is marked as ephemeral.

We store the following paths in Zookeeper:

  1. Stores the information of all live brokers (registration is shown in the client sketch after this list).
    Code Block
    /brokers/ids/[broker_id] --> host:port (ephemeral; created by admin) 
  2. Stores, for each partition, a list of the currently assigned replicas. For each replica, we store the id of the broker to which the replica is assigned. The first replica is the preferred replica. Note that for a given partition, there is at most one replica on a broker. Therefore, the broker id can be used as the replica id.
    Code Block
    /brokers/topics/[topic]/[partition_id]/replicas --> {broker_id …}  (created by admin) 
  3. Stores the id of the replica that’s the current leader of this partition
    Code Block
     /brokers/topics/[topic]/[partition_id]/leader --> broker_id (ephemeral) (created by leader) 
  4. Stores the ids of the set of replicas that are in-sync with the leader (the ISR)
    Code Block
     /brokers/topics/[topic]/[partition_id]/ISR --> {broker_id, …} (created by leader) 
  5. This path is used when we want to reassign some partitions to a different set of brokers. For each partition to be reassigned, it stores a list of new replicas and their corresponding assigned brokers. This path is created by an administrative process and is automatically removed once the partition has been moved successfully.
    Code Block
     /brokers/partitions_reassigned/[topic]/[partition_id] --> {broker_id …} (created by admin) 

  6. This path is used to increment and fetch a new epoch value; each new leader uses it to increment the epoch (a conditional-increment appears in the client sketch after this list).
    Code Block
     /brokers/epoch (created by admin) 

  7. This path is used by the leader of a partition to enqueue state change requests to the follower replicas. The state change requests include start replica and close replica. This path is created by the add brokers admin command and is deleted only by the remove brokers admin command. Making this path persistent cleanly handles state changes like delete topic and reassign partitions even when a broker is temporarily unavailable (for example, being bounced). Enqueueing is shown in the client sketch after this list.
    Code Block
     /brokers/state/[broker_id] --> { state change requests ... } (created by admin)
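
For concreteness, here is a minimal sketch of how the paths above might be accessed with the plain ZooKeeper Java client. The znode layout is taken from this document; everything else (the connect string, the value encodings, and the use of persistent sequential children as the state change queue under /brokers/state/[broker_id]) is an illustrative assumption rather than part of the design.

Code Block
import java.nio.charset.StandardCharsets;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class ZkPathsSketch {

    public static void main(String[] args) throws Exception {
        // Connect string and session timeout are placeholders.
        ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, event -> { });

        registerBroker(zk, 0, "broker0:9092");
        System.out.println("new epoch: " + incrementEpoch(zk));
        enqueueStateChange(zk, 1,
            "{\"request\":\"close-replica\",\"topic\":\"t\",\"partition\":0}");

        zk.close();
    }

    // Path 1: live broker registration. The znode is ephemeral, so it
    // disappears automatically when the broker's ZK session expires.
    static void registerBroker(ZooKeeper zk, int brokerId, String hostPort)
            throws KeeperException, InterruptedException {
        zk.create("/brokers/ids/" + brokerId,
                  hostPort.getBytes(StandardCharsets.UTF_8),
                  ZooDefs.Ids.OPEN_ACL_UNSAFE,
                  CreateMode.EPHEMERAL);
    }

    // Path 6: epoch increment. The znode version acts as a compare-and-swap
    // token, so two leaders racing to increment can never obtain the same
    // epoch value. Assumes the admin initialized /brokers/epoch to a number.
    static long incrementEpoch(ZooKeeper zk)
            throws KeeperException, InterruptedException {
        while (true) {
            Stat stat = new Stat();
            byte[] data = zk.getData("/brokers/epoch", false, stat);
            long next = Long.parseLong(new String(data, StandardCharsets.UTF_8)) + 1;
            try {
                zk.setData("/brokers/epoch",
                           Long.toString(next).getBytes(StandardCharsets.UTF_8),
                           stat.getVersion()); // fails if another writer got in first
                return next;
            } catch (KeeperException.BadVersionException e) {
                // Lost the race; re-read the current epoch and retry.
            }
        }
    }

    // Path 7: enqueue a state change request for a follower. Using a
    // persistent sequential child znode (an assumption, not specified in
    // this document) keeps requests ordered and durable across broker
    // bounces.
    static void enqueueStateChange(ZooKeeper zk, int followerId, String request)
            throws KeeperException, InterruptedException {
        zk.create("/brokers/state/" + followerId + "/req-",
                  request.getBytes(StandardCharsets.UTF_8),
                  ZooDefs.Ids.OPEN_ACL_UNSAFE,
                  CreateMode.PERSISTENT_SEQUENTIAL);
    }
}

The compare-and-swap loop in incrementEpoch is what keeps the epoch non-decreasing even when two brokers attempt to become leader at the same time.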