...
In addition, for each consumer group the coordinator also maintains the group config and partition ownership metadata in ZK as a json object. This node will be updated upon rebalance completion and read upon coordinator failover (details below).
Consumer Startup (Subscription Change)
...
-----------------------------------------------------------------------------------
FindCoordinatorRequest
Code Block |
---|
{ Version => int16 CorrelationId => int64 ClientId => String GroupId => String } |
FindCoordinatorResponse
...
Code Block |
---|
{ Version => int16 CorrelationId => int64 CoordinatorId => int16 } |
...
---------------------------------------------------------------------------------------
JoinGroupRequest
Code Block |
---|
{ Version => int16 CorrelationId => int64 GroupId => String ConsumerAddr => String SessionTimeout => int64 Subscribing => [String] } |
JoinGroupResponse
Code Block |
---|
{ Version => int16 CorrelationId => int64 ErrorCode => int16 Generation => int16 GroupId => String ConsumerId => String PartitionsToOwn => [<TopicAndPartition>] } |
...
The only difference is that for 1) the coordinator will immediately trigger a rebalance for this newly created group, whereas for 2) the coordinator needs to load its current config and ownership information from ZK but do not necessarily trigger the rebalance.
Consumer
...
The consumer config and ownership metadata can be stored in one ZK path as a json object. This node will be updated upon rebalance completion and read upon coordinator failover.
Consumer Fetching Data
As in the new producer, consumers will periodically send topic metadata request to the available brokers to find the leader of its assigned partitions. Upon 1) receiving an error while fetching data and 2) receiving a join response from the coordinator, it will also force a topic metadata refresh.
...
------------------------------------------------------------------------------------------------
PingRequest
Code Block |
---|
{ Version => int16 CorrelationId => int64 GroupId => String ConsumerId => String } |
PingResponse
Code Block |
---|
{ Version => int16 CorrelationId => int64 ErrorCode => int16 } |
...