...
1) For each consumer group, the group metadata containing:
- List of topics the group subscribes to.
- Group configs, including session timeout, etc.
- Consumer metadata for each consumer in the group.
- Current offsets for each consumed topic/partition.
- Partition ownership metadata, including the consumer-assigned-partitions map, current group generation number, etc.
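The per-group metadata listed above could be modeled roughly as follows. This is a minimal Python sketch under assumptions: the class and field names (`GroupMetadata`, `ConsumerMetadata`, `host`, `topics`, `owner_of`) are hypothetical, and the group-level subscription is derived here as the union of its members' topics, which the design does not state.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set, Tuple

# (topic, partition) pair
TopicPartition = Tuple[str, int]


@dataclass
class ConsumerMetadata:
    # Illustrative per-consumer fields (assumed, not specified by the design)
    consumer_id: str
    host: str
    topics: Set[str]


@dataclass
class GroupMetadata:
    group_id: str
    session_timeout_ms: int  # group config
    consumers: Dict[str, ConsumerMetadata] = field(default_factory=dict)
    # Current offset per consumed (topic, partition)
    offsets: Dict[TopicPartition, int] = field(default_factory=dict)
    # Partition ownership: consumer id -> assigned partitions
    assignment: Dict[str, List[TopicPartition]] = field(default_factory=dict)
    generation: int = 0  # current group generation number

    @property
    def subscribed_topics(self) -> Set[str]:
        """Group-level subscription, derived here as the union of members' topics."""
        topics: Set[str] = set()
        for consumer in self.consumers.values():
            topics |= consumer.topics
        return topics

    def owner_of(self, tp: TopicPartition) -> Optional[str]:
        """Return the id of the consumer that currently owns tp, or None."""
        for consumer_id, partitions in self.assignment.items():
            if tp in partitions:
                return consumer_id
        return None
```

Keeping ownership as a consumer-to-partitions map mirrors the "consumer-assigned-partitions map" wording; a reverse index could be added if lookups by partition dominate.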
...
```
{
  Version       => int16
  CorrelationId => int64
  ClientId      => String
  GroupId       => String
}
```
ConsumerMetadataResponse
```
{
  CorrelationId => int64
  ErrorCode     => int16
  CoordinatorId => Broker
  GenerationId  => int64
}
```
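As an illustration of how the ConsumerMetadataRequest schema might map to bytes, here is a sketch that assumes big-endian integers and Kafka's existing String convention (int16 length followed by UTF-8 bytes). It omits any request size prefix or API key, which the schema does not show, and the helper names are hypothetical.

```python
import struct
from typing import Tuple


def encode_string(s: str) -> bytes:
    # Assumed String encoding: int16 byte length, then UTF-8 bytes
    data = s.encode("utf-8")
    return struct.pack(">h", len(data)) + data


def decode_string(buf: bytes, offset: int) -> Tuple[str, int]:
    # Returns the decoded string and the offset just past it
    (length,) = struct.unpack_from(">h", buf, offset)
    offset += 2
    return buf[offset:offset + length].decode("utf-8"), offset + length


def encode_consumer_metadata_request(version: int, correlation_id: int,
                                     client_id: str, group_id: str) -> bytes:
    # Version => int16, CorrelationId => int64, then the two Strings
    return (struct.pack(">hq", version, correlation_id)
            + encode_string(client_id)
            + encode_string(group_id))


def decode_consumer_metadata_request(buf: bytes) -> Tuple[int, int, str, str]:
    version, correlation_id = struct.unpack_from(">hq", buf, 0)
    client_id, offset = decode_string(buf, 10)  # header is 2 + 8 bytes
    group_id, offset = decode_string(buf, offset)
    return version, correlation_id, client_id, group_id
```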
...
```
{
  Version        => int16
  CorrelationId  => int64
  GroupId        => String
  ConsumerHost   => String
  SessionTimeout => int64
  Subscribing    => [String]
}
```
JoinGroupResponse
```
{
  Version         => int16
  CorrelationId   => int64
  ErrorCode       => int16
  GenerationId    => int16
  GroupId         => String
  ConsumerId      => String
  PartitionsToOwn => [TopicAndPartition]
}
```
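The JoinGroupRequest adds an int64 SessionTimeout and an array of subscribed topic names. The sketch below applies the same assumed encoding, taking `[String]` to mean an int32 element count followed by the elements, as in Kafka's existing wire protocol; the function names are illustrative only.

```python
import struct
from typing import List


def _string(s: str) -> bytes:
    # Assumed String encoding: int16 byte length, then UTF-8 bytes
    data = s.encode("utf-8")
    return struct.pack(">h", len(data)) + data


def _string_array(items: List[str]) -> bytes:
    # Assumed array encoding: int32 element count, then each element
    return struct.pack(">i", len(items)) + b"".join(_string(s) for s in items)


def encode_join_group_request(version: int, correlation_id: int, group_id: str,
                              consumer_host: str, session_timeout: int,
                              subscribing: List[str]) -> bytes:
    return (struct.pack(">hq", version, correlation_id)  # int16 + int64
            + _string(group_id)
            + _string(consumer_host)
            + struct.pack(">q", session_timeout)          # int64
            + _string_array(subscribing))
```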
HeartbeatRequest
```
{
  Version       => int16
  CorrelationId => int64
  GroupId       => String
  ConsumerId    => String
}
```
...
```
{
  CorrelationId => int64
  ErrorCode     => int16
}
```
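The heartbeat exchange presumably lets the coordinator detect failed consumers. One plausible coordinator-side bookkeeping, assuming a consumer is considered failed once no HeartbeatRequest has arrived within the group's session timeout, is sketched below; `HeartbeatTracker` and its methods are hypothetical, and the current time is passed in explicitly to keep the example deterministic.

```python
from typing import Dict, List


class HeartbeatTracker:
    """Hypothetical coordinator-side record of when each consumer last pinged."""

    def __init__(self, session_timeout_ms: int) -> None:
        self.session_timeout_ms = session_timeout_ms
        self.last_seen: Dict[str, int] = {}  # consumer id -> last heartbeat time

    def on_heartbeat(self, consumer_id: str, now_ms: int) -> None:
        # Called for every HeartbeatRequest received from a group member
        self.last_seen[consumer_id] = now_ms

    def expired(self, now_ms: int) -> List[str]:
        # Consumers whose last heartbeat is older than the session timeout
        return sorted(cid for cid, seen in self.last_seen.items()
                      if now_ms - seen > self.session_timeout_ms)
```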
Wildcard Subscription Handling
With wildcard subscription (for example, whitelist and blacklist), consumers are responsible for discovering matching topics through topic metadata requests. That is, a consumer sends a topic metadata request with an empty topic list, whose response returns the partition info of all topics; the consumer then filters the topics that match its wildcard expression and updates its subscription list. Again, if the subscription list has changed from its previous value, the consumer starts the re-join group procedure.
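The filtering step can be sketched as below, assuming whitelist and blacklist are given as Python regular expressions matched against whole topic names and `all_topics` holds the topic names from the metadata response. The returned flag tells the caller whether to start the re-join procedure; all names are illustrative.

```python
import re
from typing import Iterable, Optional, Set, Tuple


def matching_topics(all_topics: Iterable[str],
                    whitelist: Optional[str] = None,
                    blacklist: Optional[str] = None) -> Set[str]:
    result: Set[str] = set()
    for topic in all_topics:
        # Keep only topics fully matching the whitelist (if one is given)
        if whitelist is not None and not re.fullmatch(whitelist, topic):
            continue
        # Drop topics fully matching the blacklist (if one is given)
        if blacklist is not None and re.fullmatch(blacklist, topic):
            continue
        result.add(topic)
    return result


def refresh_subscription(current: Set[str], all_topics: Iterable[str],
                         whitelist: Optional[str] = None,
                         blacklist: Optional[str] = None) -> Tuple[Set[str], bool]:
    # Returns the new subscription and whether it differs from the current one
    new = matching_topics(all_topics, whitelist, blacklist)
    return new, new != current
```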
Rebalance Process: Option 2
With this approach, when a coordinator decides to rebalance a group, it must first increment the generation number, update ZK accordingly, and then close the socket.
In addition, the consumer group generation number must be propagated from ZK to all brokers, either through the controller or through the coordinator. We do not consider having ZK push this information directly to brokers upon updates, or a pull-based method, since neither is scalable.
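A minimal sketch of the coordinator's three steps, with a plain dict standing in for ZooKeeper and a hypothetical znode path `/consumers/<group>/generation`. Which sockets are closed is an assumption: the sketch closes every connection the coordinator holds to the group's consumers.

```python
from typing import Dict, List, Protocol


class Connection(Protocol):
    def close(self) -> None: ...


class CoordinatorGroupState:
    def __init__(self, group_id: str, zk: Dict[str, int],
                 connections: List[Connection]) -> None:
        self.group_id = group_id
        self.zk = zk                    # stand-in for ZooKeeper: path -> value
        self.connections = connections  # sockets to the group's consumers
        self.generation = zk.get(self._path(), 0)

    def _path(self) -> str:
        # Hypothetical znode layout for the group's generation number
        return f"/consumers/{self.group_id}/generation"

    def start_rebalance(self) -> int:
        self.generation += 1                      # 1) increment generation
        self.zk[self._path()] = self.generation   # 2) update ZK
        for conn in self.connections:             # 3) close the sockets
            conn.close()
        self.connections.clear()
        return self.generation
```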
Then the rebalance process becomes:
1) Upon coordinator failure, the consumer fetches the new coordinator id along with the generation number.
2) If the generation number is not greater than the one it currently holds, it sends a ping request to the coordinator once the connection is set up.
3) Otherwise, it prepares for the rebalance by stopping fetching and calling onPartitionDeassigned, and then sends a join request to the coordinator once the connection is set up.
4) While the coordinator is undergoing a rebalance for the group, it accepts offset commits even if their generation number is smaller (offset management is discussed in the next section).
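Steps 2 to 4 reduce to two small decisions, sketched below. The names are hypothetical, and `accept_offset_commit` encodes one reading of step 4: commits carrying the current generation are always accepted, older ones only while the group is rebalancing.

```python
from enum import Enum


class Action(Enum):
    PING = "send ping request"
    REJOIN = "stop fetching, call onPartitionDeassigned, send join request"


def on_coordinator_found(new_generation: int, held_generation: int) -> Action:
    # Step 2: generation not greater than the one held => just ping
    if new_generation <= held_generation:
        return Action.PING
    # Step 3: group generation moved on => prepare for rebalance and rejoin
    return Action.REJOIN


def accept_offset_commit(commit_generation: int, group_generation: int,
                         rebalancing: bool) -> bool:
    # Current generation: always accepted
    if commit_generation == group_generation:
        return True
    # Step 4: a smaller generation is accepted only during a rebalance
    return rebalancing and commit_generation < group_generation
```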
The consumer's logic can then be summarized as the following state machine:
-----------------------------------------------------------------------------------------------------------------
Joined (join response received, started fetching and pinging):
1) Topic Subscription Change => Joining (prepare for rebalance, send join request)
2) Coordinator Failed => Discovering (send find_coordinator)
3) Shutdown Signal => Stopped
Discovering (lost connection with coordinator, but may be fetching data)
1) Found Coordinator, Generation Number Changed => Joining
2) Found Coordinator, Generation Number OK => Joined
3) Shutdown Signal => Stopped
Joining (not fetching, waiting for join response)
1) Error Join Response => Discovering
2) Normal Join Response => Joined
3) Shutdown Signal => Stopped
Stopped (not started, killed, shutdown)
1) Startup => Discovering
-----------------------------------------------------------------------------------------------------------------
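The state machine above can be encoded as a transition table. This is an illustrative Python sketch: event names paraphrase the listed transitions, only state changes are modeled (side actions such as sending a join request are left out), Shutdown is treated as valid from every state except Stopped, and any other unlisted (state, event) pair raises an error, a case the design leaves unspecified.

```python
from enum import Enum, auto


class State(Enum):
    STOPPED = auto()
    DISCOVERING = auto()
    JOINING = auto()
    JOINED = auto()


class Event(Enum):
    STARTUP = auto()
    SUBSCRIPTION_CHANGE = auto()
    COORDINATOR_FAILED = auto()
    FOUND_GENERATION_CHANGED = auto()
    FOUND_GENERATION_OK = auto()
    JOIN_ERROR = auto()
    JOIN_OK = auto()
    SHUTDOWN = auto()


TRANSITIONS = {
    (State.JOINED, Event.SUBSCRIPTION_CHANGE): State.JOINING,
    (State.JOINED, Event.COORDINATOR_FAILED): State.DISCOVERING,
    (State.DISCOVERING, Event.FOUND_GENERATION_CHANGED): State.JOINING,
    (State.DISCOVERING, Event.FOUND_GENERATION_OK): State.JOINED,
    (State.JOINING, Event.JOIN_ERROR): State.DISCOVERING,
    (State.JOINING, Event.JOIN_OK): State.JOINED,
    (State.STOPPED, Event.STARTUP): State.DISCOVERING,
}


def next_state(state: State, event: Event) -> State:
    # Shutdown Signal => Stopped from any running state
    if event is Event.SHUTDOWN and state is not State.STOPPED:
        return State.STOPPED
    try:
        return TRANSITIONS[(state, event)]
    except KeyError:
        raise ValueError(f"{event.name} is not valid in state {state.name}") from None
```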