...
- Coordinator path: stores the current coordinator info.
/consumers/coordinator --> brokerId (ephemeral; created by coordinator)
- ConsumerChannel path: stores the coordinator commands to the consumer.
/consumers/groups/group/channel/consumer/commandId --> command (sequential persistent; created by coordinator, removed by consumer)
Coordinator:
The consumer coordinator keeps the following fields:
Code Block |
---|
coordinatorElector : ZkElection // A ZK based elector using the coordinator path mentioned above
groupsBeingRebalanced : Map[String, AtomicBoolean] // For each group, a bit indicating if the group is under rebalancing
consumerGroupsPerTopic : Map[String, Set[String]] // For each topic, the consumer groups that are interested in the topic
groupsWithWildcardTopics : Set[String] // Groups that have wildcard interests for topics
rebalanceRequestQ : LinkedBlockingQueue[String] // A blocking queue storing all the rebalance requests; a request contains just the group name
requestHandler : RebalanceRequestHandler // A thread handling all the rebalance requests read from the rebalanceRequestQ
|
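As a rough illustration of the fields above, the in-memory part of the coordinator state could be sketched in Java as follows. The class name `CoordinatorState` and the helpers `addInterest` and `reset` are hypothetical, introduced only for this sketch; the elector and the request handler thread are left out because they depend on ZooKeeper.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical container for the coordinator's in-memory fields (not the actual implementation).
class CoordinatorState {
    // For each group, true while a rebalance is pending or in progress.
    final Map<String, AtomicBoolean> groupsBeingRebalanced = new ConcurrentHashMap<>();
    // For each topic, the consumer groups interested in it.
    final Map<String, Set<String>> consumerGroupsPerTopic = new ConcurrentHashMap<>();
    // Groups that have at least one wildcard interest.
    final Set<String> groupsWithWildcardTopics = ConcurrentHashMap.newKeySet();
    // Pending rebalance requests; each entry is just a group name.
    final LinkedBlockingQueue<String> rebalanceRequestQ = new LinkedBlockingQueue<>();

    // Hypothetical helper: record that a group is interested in a topic.
    void addInterest(String topic, String group) {
        consumerGroupsPerTopic
            .computeIfAbsent(topic, t -> ConcurrentHashMap.newKeySet())
            .add(group);
    }

    // Hypothetical helper: clear all in-memory state, e.g. before trying to get elected again.
    void reset() {
        groupsBeingRebalanced.clear();
        consumerGroupsPerTopic.clear();
        groupsWithWildcardTopics.clear();
        rebalanceRequestQ.clear();
    }
}
```

Concurrent collections are used here on the assumption that ZK listener threads and the request handler thread touch these fields concurrently.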
A. On Coordinator Startup
...
The coordinatorElector, upon initialization, will immediately try to become the leader. If another elector has already become the leader, it will watch the coordinator path for data changes and try to get elected again whenever the current leader resigns (i.e. the data on the path is deleted).
Whenever it is elected leader, it will trigger the callback function provided by its caller, i.e. the coordinator.
Code Block |
---|
coordinatorStartup() :
1. Read all the topics from ZK and initialize consumerGroupsPerTopic
2. Read all the consumer groups from ZK
2.1 Get the current interested topics of each group, update consumerGroupsPerTopic by adding the group to each topic's interested group list
2.2 If the group has some consumer specifying wildcard topic-counts, then add the group to groupsWithWildcardTopics
2.3 Always try to rebalance every group by adding (group -> new AtomicBoolean(true)) to groupsBeingRebalanced and put group to rebalanceRequestQ
3. Register listeners for topics and their partition changes
3.1 Subscribe TopicChangeListener to /brokers/topics
3.2 Subscribe TopicPartitionChangeListener to each /brokers/topics/[topic]
4. Register listeners for consumer groups and their member changes
4.1 Subscribe registerGroupChangeListener to /consumers/groups/
4.2 Subscribe registerGroupMemeberChangeListener to each /consumers/groups/[groups]/ids
5. Register session expiration listener
6. Initialize and start the requestHandler thread
|
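Steps 1 and 2 of coordinatorStartup can be sketched in Java as below. This is a minimal sketch under stated assumptions: the ZK reads are replaced by plain method arguments (`topics`, `topicsPerGroup`, `wildcardGroups`, all hypothetical names), and steps 3 to 6 (listener registration and starting the handler thread) are omitted since they need a live ZooKeeper client.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of the in-memory part of coordinatorStartup (steps 1 and 2 only).
class StartupSketch {
    final Map<String, Set<String>> consumerGroupsPerTopic = new HashMap<>();
    final Set<String> groupsWithWildcardTopics = new HashSet<>();
    final Map<String, AtomicBoolean> groupsBeingRebalanced = new ConcurrentHashMap<>();
    final LinkedBlockingQueue<String> rebalanceRequestQ = new LinkedBlockingQueue<>();

    // topics:         all topics, standing in for what would be read from ZK (step 1)
    // topicsPerGroup: for each group, the topics it is currently interested in (step 2.1)
    // wildcardGroups: groups with some consumer specifying wildcard topic-counts (step 2.2)
    void coordinatorStartup(Set<String> topics,
                            Map<String, Set<String>> topicsPerGroup,
                            Set<String> wildcardGroups) {
        // Step 1: one (initially empty) group set per known topic.
        for (String topic : topics) {
            consumerGroupsPerTopic.put(topic, new HashSet<>());
        }
        // Step 2: walk every consumer group.
        for (Map.Entry<String, Set<String>> entry : topicsPerGroup.entrySet()) {
            String group = entry.getKey();
            // Step 2.1: add the group to each topic it is interested in.
            for (String topic : entry.getValue()) {
                consumerGroupsPerTopic.computeIfAbsent(topic, t -> new HashSet<>()).add(group);
            }
            // Step 2.2: remember groups with wildcard interests.
            if (wildcardGroups.contains(group)) {
                groupsWithWildcardTopics.add(group);
            }
            // Step 2.3: always request a rebalance for every group on startup.
            groupsBeingRebalanced.put(group, new AtomicBoolean(true));
            rebalanceRequestQ.add(group);
        }
    }
}
```

The flag is set to true before the group is enqueued, so that listener callbacks arriving during startup do not enqueue a duplicate request for the same group.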
B. On Coordinator Change/Failover
Whenever the server hosting the current coordinator dies, the electors of the other coordinators will notice this through their ZK listeners and try to get elected as the leader; whoever wins will trigger the callback function coordinatorStartup.
When the dead server comes back, its zkClient will automatically reconnect to ZK and trigger the handleNewSession function.
Code Block |
---|
handleNewSession :
1. Reset its state by clearing consumerGroupsPerTopic, groupsWithWildcardTopics and rebalanceRequestQ, etc
2. Re-register the session expiration listener (this is because ZkClient does not re-register itself once fired)
3. Try to re-elect to be the coordinator by directly calling the elect function of its coordinatorElector.
|
C. On ZK Watcher Fires
Handle group change
Code Block |
---|
GroupChangeListener.handleChildChange :
1. Get the newly added group (since /consumers/groups are persistent nodes, no groups should be deleted even if there are no longer any consumers inside the group)
2. Subscribe the registerGroupMemeberChangeListener on /consumers/groups/group
3. Read all the topics this group is interested in, for each topic:
3.1 If the topic already exists in consumerGroupsPerTopic, update its list by adding this group
3.2 If the topic is not in consumerGroupsPerTopic yet, add the entry (topic -> Set(group))
4. If some of this group's consumers have wildcard interests, add the group to groupsWithWildcardTopics
5. If the group is already interested in some existing topics, put (group -> new AtomicBoolean(true)) to groupsBeingRebalanced, and put the group to rebalanceRequestQ;
Otherwise just put (group -> new AtomicBoolean(false)) to groupsBeingRebalanced
|
Handle group member change
Code Block |
---|
GroupMemberChangeListener.handleChildChange :
1. If the group is no longer interested in some topics due to the deletion of some consumer, update consumerGroupsPerTopic by removing the group from these topics' lists
2. If the group no longer contains any consumer, do nothing;
Otherwise if groupsBeingRebalanced(group).compareAndSet(false, true) succeeds, put the group to rebalanceRequestQ.
|
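The member-change handling above can be sketched in Java as follows. The class and method names and the parameters `droppedTopics` and `remainingConsumers` are hypothetical stand-ins for what the listener would compute from ZK. The sketch also assumes that the request handler resets the flag to false once it has dealt with the group, which this section does not describe.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of GroupMemberChangeListener.handleChildChange, without ZK.
class MemberChangeSketch {
    final Map<String, Set<String>> consumerGroupsPerTopic = new ConcurrentHashMap<>();
    final Map<String, AtomicBoolean> groupsBeingRebalanced = new ConcurrentHashMap<>();
    final LinkedBlockingQueue<String> rebalanceRequestQ = new LinkedBlockingQueue<>();

    // Returns true if a new rebalance request was enqueued for the group.
    boolean onGroupMemberChange(String group, Set<String> droppedTopics, int remainingConsumers) {
        // Step 1: the group is no longer interested in these topics.
        for (String topic : droppedTopics) {
            Set<String> groups = consumerGroupsPerTopic.get(topic);
            if (groups != null) {
                groups.remove(group);
            }
        }
        // Step 2: nothing to rebalance if the group has no consumers left.
        if (remainingConsumers == 0) {
            return false;
        }
        AtomicBoolean flag =
            groupsBeingRebalanced.computeIfAbsent(group, g -> new AtomicBoolean(false));
        // Only the caller that flips false -> true enqueues the request.
        if (flag.compareAndSet(false, true)) {
            rebalanceRequestQ.add(group);
            return true;
        }
        return false;
    }
}
```

Because of the compareAndSet guard, several member changes arriving in quick succession for the same group result in a single queued request rather than one per change.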
Handle topic change
Code Block |
---|
TopicChangeListener.handleChildChange :
1. Get the newly added topics (since /brokers/topics are persistent nodes, no topics should be deleted even if no consumers are interested in them any more)
2. For each newly added topic:
2.1 Subscribe TopicPartitionChangeListener to /brokers/topics/topic
2.2 Get the set of groups that are interested in this topic from both consumerGroupsPerTopic(topic) and groupsWithWildcardTopics (filtered by wildcard pattern regex), and try to request rebalance for each group*
* By trying to request rebalance, we do the following:
if (groupsBeingRebalanced(group).compareAndSet(false, true)) rebalanceRequestQ.put(group)
|
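The group lookup and the "try to request rebalance" step can be sketched in Java as below. One assumption is labeled loudly here: the sketch keeps a hypothetical map `wildcardPatternPerGroup` from group name to its wildcard regex, whereas the design above only stores group names in groupsWithWildcardTopics and does not say where the patterns are held. Subscribing TopicPartitionChangeListener (step 2.1) is omitted.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;

// Hypothetical sketch of step 2.2 of TopicChangeListener.handleChildChange, without ZK.
class TopicChangeSketch {
    final Map<String, Set<String>> consumerGroupsPerTopic = new HashMap<>();
    // Assumption: one wildcard regex per wildcard group (not part of the original fields).
    final Map<String, Pattern> wildcardPatternPerGroup = new HashMap<>();
    final Map<String, AtomicBoolean> groupsBeingRebalanced = new ConcurrentHashMap<>();
    final LinkedBlockingQueue<String> rebalanceRequestQ = new LinkedBlockingQueue<>();

    // Groups explicitly interested in the topic plus wildcard groups whose regex matches it.
    Set<String> interestedGroups(String topic) {
        Set<String> groups = new TreeSet<>(
            consumerGroupsPerTopic.getOrDefault(topic, Collections.emptySet()));
        for (Map.Entry<String, Pattern> entry : wildcardPatternPerGroup.entrySet()) {
            if (entry.getValue().matcher(topic).matches()) {
                groups.add(entry.getKey());
            }
        }
        return groups;
    }

    // Enqueue the group only if no rebalance request is already pending for it.
    void tryRequestRebalance(String group) {
        AtomicBoolean flag =
            groupsBeingRebalanced.computeIfAbsent(group, g -> new AtomicBoolean(false));
        if (flag.compareAndSet(false, true)) {
            rebalanceRequestQ.add(group);
        }
    }

    // Called once per newly added topic.
    void onTopicAdded(String topic) {
        for (String group : interestedGroups(topic)) {
            tryRequestRebalance(group);
        }
    }
}
```

The same `interestedGroups` plus `tryRequestRebalance` combination would also serve the topic partition change handler, which performs the identical lookup for an existing topic.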
Handle topic partition change
Code Block |
---|
TopicPartitionChangeListener.handleChildChange :
Get the set of groups that are interested in this topic from consumerGroupsPerTopic(topic) and groupsWithWildcardTopics (filtered by wildcard pattern regex), and try to request rebalance for each group
|
D. On Rebalance Handling
Consumer:
...