The following is a draft design that uses a highly available consumer coordinator on the broker side to handle consumer rebalancing. By migrating the rebalance logic from the consumer to the coordinator, we can resolve the consumer split-brain problem and make the consumer client thinner.

Overview:

One of the brokers is elected as the coordinator for all the consumer groups. It will be responsible for:

  1. Watching for new topics and new groups
  2. Watching for consumer group member changes and topic partition changes
  3. Running the rebalance logic for affected groups in response to watched change events
  4. Communicating the rebalance results to consumers

When the coordinator decides that a rebalance is needed for a certain group, it first sends the stop-fetcher command to each consumer in the group (this communication channel is currently implemented using a ZK-based queue), and then sends the start-fetcher command to each consumer along with its assigned partitions. Each consumer receives only the partition info for the partitions assigned to it. The coordinator finishes the rebalance by waiting for all the consumers to finish starting their fetchers and respond (currently implemented as the consumer clearing its queue).
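The command sequence described above can be sketched as follows. This is only an illustration under stated assumptions: `ConsumerChannel` is a hypothetical in-memory stand-in for the ZK-based queue, the round-robin `assign` function is a placeholder for whatever assignment logic the coordinator actually uses, and the string encoding of the commands is invented for the example.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for the ZK-based per-consumer command queue.
final class ConsumerChannel {
  private val queue = mutable.Queue.empty[String]
  def send(command: String): Unit = queue.enqueue(command)
  def pending: List[String] = queue.toList
  // The consumer "responds" by removing the commands it has handled.
  def clear(): Unit = queue.clear()
  def isEmpty: Boolean = queue.isEmpty
}

object RebalanceSketch {
  // Round-robin assignment, used here only as a placeholder (an assumption).
  def assign(partitions: Seq[String], consumers: Seq[String]): Map[String, Seq[String]] = {
    if (consumers.isEmpty) Map.empty
    else {
      val empty = consumers.map(_ -> Vector.empty[String]).toMap
      partitions.zipWithIndex.foldLeft(empty) { case (acc, (p, i)) =>
        val c = consumers(i % consumers.size)
        acc.updated(c, acc(c) :+ p)
      }
    }
  }

  // Send stop-fetcher to every consumer, then start-fetcher with only
  // the partitions assigned to that consumer.
  def rebalance(partitions: Seq[String],
                channels: Map[String, ConsumerChannel]): Map[String, Seq[String]] = {
    val consumers = channels.keys.toSeq.sorted
    consumers.foreach(c => channels(c).send("stop-fetcher"))
    val assignment = assign(partitions, consumers)
    consumers.foreach { c =>
      channels(c).send("start-fetcher " + assignment(c).mkString(","))
    }
    assignment
  }

  // The rebalance is complete once every consumer has emptied its queue.
  def isComplete(channels: Map[String, ConsumerChannel]): Boolean =
    channels.values.forall(_.isEmpty)
}
```

In a real deployment the coordinator would watch the channel paths rather than poll `isComplete`; the polling form is used here only to keep the sketch short.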

Paths Stored in ZK:

Most of the original ZK path layout is kept. The following paths are added:

  1. Coordinator path: stores the current coordinator info.   /consumers/coordinator --> brokerId (ephemeral; created by coordinator)
  2. ConsumerChannel path: stores the coordinator commands to the consumer.     /consumers/groups/group/channel/consumer/commandId --> command (sequential persistent; created by coordinator, removed by consumer)
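As a small illustration of the two paths above, the helper below builds them from their components. The object and method names are hypothetical; only the path layout comes from the list above.

```scala
// Hypothetical helper for the two new ZK paths; only the layout is from the design.
object CoordinatorPaths {
  // Ephemeral node holding the brokerId of the current coordinator.
  val CoordinatorPath: String = "/consumers/coordinator"

  // Parent node of the command queue for one consumer in one group.
  def channelPath(group: String, consumer: String): String =
    s"/consumers/groups/$group/channel/$consumer"

  // One command in that queue (created by the coordinator, removed by the consumer).
  def commandPath(group: String, consumer: String, commandId: String): String =
    s"${channelPath(group, consumer)}/$commandId"
}
```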

Coordinator:

The consumer coordinator keeps the following fields:

coordinatorElector : ZkElection   // A ZK based elector using the coordinator path mentioned above

groupsBeingRebalanced : Map[String, AtomicBoolean]  // For each group, a bit indicating if the group is under rebalancing

consumerGroupsPerTopic : Map[String, Set[String]]   // For each topic, the consumer groups that are interested in the topic

groupsWithWildcardTopics : Set[String]   // Groups that have wildcard interests for topics

rebalanceRequestQ : LinkedBlockingQueue[String]    // A blocking queue storing all the rebalance requests, each for one group

requestHandler : RebalanceRequestHandler      // A thread handling all the rebalance requests read from the rebalanceRequestQ
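A minimal sketch of how the requestHandler thread might drain rebalanceRequestQ is shown below. The draft does not yet specify this behavior, so several things here are assumptions: the class name, the use of a `ConcurrentHashMap` for groupsBeingRebalanced, the `rebalance` callback, and the choice to clear the group's flag once its rebalance returns.

```scala
import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical handler: takes group names off the queue and rebalances them one by one.
final class RebalanceRequestHandlerSketch(
    rebalanceRequestQ: LinkedBlockingQueue[String],
    groupsBeingRebalanced: ConcurrentHashMap[String, AtomicBoolean],
    rebalance: String => Unit // assumed callback that performs the actual rebalance
) extends Runnable {
  @volatile private var running = true

  // Rebalance one group, then mark it as no longer being rebalanced (an assumption).
  def handleOne(group: String): Unit = {
    try rebalance(group)
    finally Option(groupsBeingRebalanced.get(group)).foreach(_.set(false))
  }

  // Poll with a timeout so that shutdown() is noticed even when the queue is idle.
  override def run(): Unit = {
    while (running) {
      val group = rebalanceRequestQ.poll(100, TimeUnit.MILLISECONDS)
      if (group != null) handleOne(group)
    }
  }

  def shutdown(): Unit = { running = false }
}
```

Handling all groups on a single thread keeps rebalances serialized, which avoids two concurrent rebalances of the same group at the cost of throughput.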

A. On Coordinator Startup

Every server creates a coordinator instance as a member. Its constructor only initializes the coordinatorElector, passing it a callback function called coordinatorStartup.

The coordinatorElector, upon initialization, immediately tries to become the leader. If another broker has already become the leader, it listens on the coordinator path for data changes and tries to get elected again whenever the current leader resigns (i.e. the data on the path is deleted). Whenever it is elected leader, it triggers the callback function provided by its caller, i.e. the coordinator.
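The election behavior described above can be sketched without a real ZooKeeper client. In this sketch `CoordinatorNode` is a hypothetical in-memory stand-in for the ephemeral node at the coordinator path, with one-shot deletion watches, and `ElectorSketch` is an illustrative name, not the actual ZkElection class.

```scala
// Hypothetical in-memory stand-in for the ephemeral coordinator node.
final class CoordinatorNode {
  private var owner: Option[Int] = None
  private var watchers: List[() => Unit] = Nil

  // Succeeds only if no broker currently holds the node.
  def tryCreate(brokerId: Int): Boolean = synchronized {
    if (owner.isEmpty) { owner = Some(brokerId); true } else false
  }

  def current: Option[Int] = synchronized(owner)

  // One-shot watch, fired the next time the node is deleted.
  def watchDeletion(callback: () => Unit): Unit = synchronized {
    watchers = callback :: watchers
  }

  // Simulates the leader resigning or its session expiring.
  def delete(): Unit = {
    val toNotify = synchronized {
      owner = None
      val w = watchers.reverse
      watchers = Nil
      w
    }
    toNotify.foreach(cb => cb())
  }
}

final class ElectorSketch(brokerId: Int, node: CoordinatorNode, onBecomingLeader: () => Unit) {
  // Try to become leader; on failure, watch the node and retry when it is deleted.
  def elect(): Boolean = {
    if (node.tryCreate(brokerId)) {
      onBecomingLeader()
      true
    } else {
      node.watchDeletion(() => { elect(); () })
      false
    }
  }
}
```

When the node is deleted, every waiting elector retries; the first one to recreate the node wins and the others register a fresh watch.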

coordinatorStartup() :

1. Read all the topics from ZK and initialize consumerGroupsPerTopic

2. Read all the consumer groups from ZK

2.1 Get the current interested topics of each group, update consumerGroupsPerTopic by adding the group to each topic's interested group list

2.2 If the group has some consumer specifying wildcard topic-counts, then add the group to groupsWithWildcardTopics

2.3 Always try to rebalance every group by adding (group -> new AtomicBoolean(true)) to groupsBeingRebalanced and putting the group into rebalanceRequestQ

3. Register listeners for topics and their partition changes

3.1 Subscribe TopicChangeListener to /brokers/topics

3.2 Subscribe TopicPartitionChangeListener to each /brokers/topics/[topic]

4. Register listeners for consumer groups and their member changes

4.1 Subscribe registerGroupChangeListener to /consumers/groups/

4.2 Subscribe registerGroupMemberChangeListener to each /consumers/groups/[group]/ids

5. Register session expiration listener

6. Initialize and start the requestHandler thread
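Steps 1 through 2.3 of coordinatorStartup can be sketched as below. The sketch takes the topics and groups as plain arguments instead of reading them from ZK, and it omits listener registration and the handler thread (steps 3 to 6). `GroupInfo` and `CoordinatorStateSketch` are hypothetical names introduced for the example.

```scala
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable

// Hypothetical snapshot of what would be read from ZK for one group.
final case class GroupInfo(name: String, topics: Set[String], hasWildcard: Boolean)

final class CoordinatorStateSketch {
  val groupsBeingRebalanced = mutable.Map.empty[String, AtomicBoolean]
  val consumerGroupsPerTopic = mutable.Map.empty[String, Set[String]]
  val groupsWithWildcardTopics = mutable.Set.empty[String]
  val rebalanceRequestQ = new LinkedBlockingQueue[String]()

  def startup(topics: Set[String], groups: Seq[GroupInfo]): Unit = {
    // Step 1: every known topic starts with no interested groups.
    topics.foreach(t => consumerGroupsPerTopic(t) = Set.empty[String])

    groups.foreach { g =>
      // Step 2.1: add the group to each topic it is interested in.
      g.topics.foreach { t =>
        consumerGroupsPerTopic(t) =
          consumerGroupsPerTopic.getOrElse(t, Set.empty[String]) + g.name
      }
      // Step 2.2: remember groups that have wildcard subscriptions.
      if (g.hasWildcard) groupsWithWildcardTopics += g.name
      // Step 2.3: always request a rebalance for the group.
      groupsBeingRebalanced(g.name) = new AtomicBoolean(true)
      rebalanceRequestQ.put(g.name)
    }
  }
}
```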

B. On Coordinator Change/Failover

Whenever the server hosting the current coordinator dies, the electors of the other coordinators detect this through the ZK listener and try to get elected as the leader. Whichever wins triggers the callback function coordinatorStartup.

When the dead server comes back, it first resets its state by clearing consumerGroupsPerTopic, groupsWithWildcardTopics, rebalanceRequestQ, etc., then re-registers the session expiration listener and tries to become the coordinator again by directly calling the elect function of its coordinatorElector.

C. On ZK Watcher Fires

D. On Rebalance Handling

Consumer:

A. On Consumer Startup

B. On Consumer Failover

Coordinator-Consumer Communication

A. Coordinator Commands Format:

CommandCommand {

  option : String   // Either "start-fetcher" or "stop-fetcher"

  ownershipMap : Map[topicPartition: String => consumerThread: String]   // A map from owned partitions to consumer threads; only available for "start-fetcher"

}
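The command format above could be modeled as a small case class. The draft does not define a wire format, so the string encoding below (`option|partition=thread;...`) is purely an assumption for illustration, as is the name `CoordinatorCommand`. It also assumes that partition and thread names contain none of the characters `|`, `;`, or `=`.

```scala
// Hypothetical model of a coordinator command; the encoding is an assumption.
final case class CoordinatorCommand(option: String,
                                    ownershipMap: Map[String, String] = Map.empty) {
  require(option == "start-fetcher" || option == "stop-fetcher",
    s"unknown option: $option")
  require(option == "start-fetcher" || ownershipMap.isEmpty,
    "ownershipMap is only available for start-fetcher")
}

object CoordinatorCommand {
  // Entries are sorted so that the encoded form is deterministic.
  def encode(c: CoordinatorCommand): String =
    if (c.ownershipMap.isEmpty) c.option
    else c.option + "|" + c.ownershipMap.toSeq.sorted
      .map { case (topicPartition, thread) => s"$topicPartition=$thread" }
      .mkString(";")

  def decode(s: String): CoordinatorCommand = {
    val parts = s.split("\\|", 2)
    val ownership =
      if (parts.length < 2 || parts(1).isEmpty) Map.empty[String, String]
      else parts(1).split(";").map { entry =>
        val kv = entry.split("=", 2)
        kv(0) -> kv(1)
      }.toMap
    CoordinatorCommand(parts(0), ownership)
  }
}
```

For example, a start-fetcher command that assigns partition `t-0` to thread `c1-1` would encode as `start-fetcher|t-0=c1-1` under this assumed format.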

B. Coordinator-Side Sending Commands to Consumer

C. Consumer-Side Handling of Coordinator Commands
