...

Code Block
coordinatorElector : ZkElection                     // A ZK based elector using the coordinator path mentioned above

groupsBeingRebalanced : Map[String, AtomicBoolean]  // For each group, a bit indicating if the group is under rebalancing

consumerGroupsPerTopic : Map[String, Set[String]]   // For each topic, the consumer groups that are interested in the topic

groupsWithWildcardTopics : Set[String]              // Groups that have wildcard interests in topics

rebalanceRequestQ : LinkedBlockingQueue[String]     // A blocking queue storing all the rebalance requests; each request contains only the group name

requestHandler : RebalanceRequestHandler            // A thread handling all the rebalance requests read from rebalanceRequestQ
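
As a rough illustration of how these fields could fit together, here is a minimal Python sketch. The `AtomicBoolean` class is a small stand-in for the Java class of the same name, and `request_rebalance` is a hypothetical helper: the rule that a group is enqueued only when no request is already pending is an assumption about what "try to request rebalance" means, not something this page states.

```python
import queue
import threading
from dataclasses import dataclass, field


class AtomicBoolean:
    """Minimal stand-in for java.util.concurrent.atomic.AtomicBoolean."""

    def __init__(self, value=False):
        self._value = value
        self._lock = threading.Lock()

    def compare_and_set(self, expected, new):
        # Atomically set to `new` only if the current value equals `expected`.
        with self._lock:
            if self._value == expected:
                self._value = new
                return True
            return False

    def get(self):
        with self._lock:
            return self._value


@dataclass
class CoordinatorState:
    groups_being_rebalanced: dict = field(default_factory=dict)    # group -> AtomicBoolean
    consumer_groups_per_topic: dict = field(default_factory=dict)  # topic -> set of groups
    groups_with_wildcard_topics: set = field(default_factory=set)
    rebalance_request_q: queue.Queue = field(default_factory=queue.Queue)

    def request_rebalance(self, group):
        """Hypothetical helper: enqueue the group unless a request is already pending."""
        flag = self.groups_being_rebalanced.setdefault(group, AtomicBoolean(False))
        if flag.compare_and_set(False, True):
            self.rebalance_request_q.put(group)
            return True
        return False
```

Under this assumption, repeated triggers for the same group collapse into a single queued request until the handler resets the flag.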

A. On Coordinator Startup

...

Code Block
coordinatorStartup() :

1. Read all the topics from ZK and initialize consumerGroupsPerTopic

2. Read all the consumer groups from ZK

2.1 Get the current interested topics of each group, update consumerGroupsPerTopic by adding the group to each topic's interested group list

2.2 If the group has some consumer specifying wildcard topic-counts, then add the group to groupsWithWildcardTopics

2.3 Always try to rebalance every group by adding (group -> new AtomicBoolean(true)) to groupsBeingRebalanced and putting the group into rebalanceRequestQ

3. Register listeners for topics and their partition changes

3.1 Subscribe TopicChangeListener to /brokers/topics

3.2 Subscribe TopicPartitionChangeListener to each /brokers/topics/[topic]

4. Register listeners for consumer groups and their member changes

4.1 Subscribe registerGroupChangeListener to /consumers/groups/

4.2 Subscribe registerGroupMemberChangeListener to each /consumers/groups/[group]/ids

5. Register session expiration listener

6. Initialize and start the requestHandler thread
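
Steps 1 and 2 above can be sketched as follows. This is only an illustration: the ZK reads are replaced by plain in-memory arguments, the shape of `group_subscriptions` is hypothetical, and steps 3 to 6 (listener registration and starting the handler thread) are left out because they depend on the ZK client.

```python
import queue


def coordinator_startup(topics, group_subscriptions):
    """Build the coordinator's in-memory state.

    topics: iterable of topic names (stand-in for the topics read from ZK).
    group_subscriptions: group -> list of (topic_or_pattern, is_wildcard) pairs
                         (hypothetical shape, stand-in for the groups read from ZK).
    """
    # Step 1: one entry per known topic, initially with no interested groups.
    consumer_groups_per_topic = {topic: set() for topic in topics}
    groups_with_wildcard_topics = set()
    groups_being_rebalanced = {}
    rebalance_request_q = queue.Queue()

    # Step 2: walk every consumer group.
    for group, subscriptions in group_subscriptions.items():
        for topic, is_wildcard in subscriptions:
            if is_wildcard:
                # Step 2.2: remember groups with wildcard interests.
                groups_with_wildcard_topics.add(group)
            else:
                # Step 2.1: add the group to the topic's interested-group set.
                consumer_groups_per_topic.setdefault(topic, set()).add(group)
        # Step 2.3: always request a rebalance for the group on startup.
        groups_being_rebalanced[group] = True
        rebalance_request_q.put(group)

    return (consumer_groups_per_topic, groups_with_wildcard_topics,
            groups_being_rebalanced, rebalance_request_q)
```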

...

Code Block
TopicPartitionChangeListener.handleChildChange :

Get the set of groups that are interested in this topic from consumerGroupsPerTopic(topic) and groupsWithWildcardTopics (filtered by wildcard pattern regex), and try to request rebalance for each group
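
The group lookup described above might look like the sketch below. The `wildcard_patterns` map (group -> list of regex strings) is a hypothetical structure, and matching the whole topic name against each pattern is an assumption; the page only says the wildcard groups are filtered by their pattern regex.

```python
import re


def groups_interested_in(topic, consumer_groups_per_topic, wildcard_patterns):
    """Return the groups that should be rebalanced when `topic` changes."""
    # Groups that named the topic explicitly.
    interested = set(consumer_groups_per_topic.get(topic, set()))
    # Wildcard groups whose pattern matches the topic name.
    for group, patterns in wildcard_patterns.items():
        if any(re.fullmatch(pattern, topic) for pattern in patterns):
            interested.add(group)
    return interested
```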

D. On Rebalance Handling

The requestHandler thread keeps block-reading from rebalanceRequestQ, and for each rebalance request for a specific group it calls the rebalance function.

If the rebalance succeeds, it resets groupsBeingRebalanced(group); otherwise it retries the rebalance.

If the handler cannot complete the rebalance within config.maxRebalanceRetries retries, it throws a ConsumerRebalanceFailedException.
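
A minimal sketch of this handler loop is shown below. The `rebalance` callable and the `stop_sentinel` shutdown value are hypothetical, and treating maxRebalanceRetries as the total number of attempts is an assumption, since the text does not say whether the first attempt counts as a retry.

```python
import queue


class ConsumerRebalanceFailedException(Exception):
    """Raised when a group cannot be rebalanced within the allowed attempts."""


def handle_requests(rebalance_request_q, groups_being_rebalanced, rebalance,
                    max_rebalance_retries, stop_sentinel=None):
    while True:
        # Blocks until a rebalance request (a group name) is available.
        group = rebalance_request_q.get()
        if group is stop_sentinel:
            return
        for _ in range(max_rebalance_retries):
            if rebalance(group):
                # Success: clear the "under rebalancing" bit for the group.
                groups_being_rebalanced[group] = False
                break
        else:
            # Every attempt failed.
            raise ConsumerRebalanceFailedException(
                f"group {group} failed to rebalance after "
                f"{max_rebalance_retries} attempts")
```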

Code Block

rebalance (group) :

1. Get the topics that the group is interested in.

2. Compute the new ownership assignment after reading from ZK the number of partitions and number of threads for each topic.

3. Check if a rebalance is necessary by trying to get the current ownership from ZK for each topic.

3.1 If there is no registered ownership info in ZK, rebalance is necessary

3.2 If some partitions are not owned by any threads, rebalance is necessary

3.3 If some partitions registered in the ownership map do not exist any longer, rebalance is necessary

3.4 If the ownership map does not match the new one, rebalance is necessary

3.5 Otherwise rebalance is not necessary

4. If rebalance is necessary, do the following

4.1 For each consumer in the group, send the "stop-fetcher" command (details of the communication are described later)

4.2 Then for each consumer in the group, send the "start-fetcher" command with the part of the newly computed ownership specific to that consumer

4.3 Then wait until all the consumers have finished starting their fetchers (details of the waiting are described later)

5. If waiting has timed out, return false; otherwise return true.
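
Steps 2 and 3 for a single topic can be illustrated with the sketch below. The assignment strategy is not specified in this section, so the contiguous range-style split in `compute_ownership` is an assumption, and both function names are hypothetical. Ownership maps are modelled as plain dicts from partition to owning thread.

```python
def compute_ownership(partitions, threads):
    """Assumed range-style assignment: contiguous chunks of sorted
    partitions are handed to sorted consumer threads."""
    partitions = sorted(partitions)
    threads = sorted(threads)
    if not threads:
        return {}
    per_thread, extra = divmod(len(partitions), len(threads))
    ownership = {}
    start = 0
    for i, thread in enumerate(threads):
        # The first `extra` threads each take one additional partition.
        count = per_thread + (1 if i < extra else 0)
        for partition in partitions[start:start + count]:
            ownership[partition] = thread
        start += count
    return ownership


def rebalance_needed(current, new, live_partitions):
    """Mirror checks 3.1 to 3.5 for one topic."""
    if not current:
        return True                                   # 3.1 no registered ownership
    if any(p not in current for p in live_partitions):
        return True                                   # 3.2 unowned partitions
    if any(p not in live_partitions for p in current):
        return True                                   # 3.3 stale partitions
    return current != new                             # 3.4 mismatch, else 3.5
```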

Consumer:

A. On Consumer Startup

...