...
------------------------------------------------------------------------------------------------
Consumer Failure Handling
When the coordinator thinks a consumer has failed, it will do the following:
- Remove the consumer from the group metadata.
- Trigger the rebalance process.
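The two coordinator-side steps can be sketched as follows. This is a minimal Python illustration, not the actual implementation. The design does not say how the coordinator decides a consumer has failed, so the sketch assumes a consumer is declared failed when no ping has arrived within a hypothetical `session_timeout_s`. The names `Coordinator`, `GroupMetadata`, `handle_ping`, and `check_failures` are all hypothetical.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass
class GroupMetadata:
    """Hypothetical per-group state held by the coordinator."""
    group_id: str
    # consumer id -> time (seconds) of the last ping received from it
    last_ping: Dict[str, float] = field(default_factory=dict)


class Coordinator:
    def __init__(self, session_timeout_s: float,
                 on_rebalance: Callable[[str], None]) -> None:
        # Assumption: "failed" means no ping within session_timeout_s seconds.
        self.session_timeout_s = session_timeout_s
        self.on_rebalance = on_rebalance
        self.groups: Dict[str, GroupMetadata] = {}

    def handle_ping(self, group_id: str, consumer_id: str, now: float) -> None:
        group = self.groups.setdefault(group_id, GroupMetadata(group_id))
        group.last_ping[consumer_id] = now

    def check_failures(self, now: float) -> List[str]:
        failed: List[str] = []
        for group in self.groups.values():
            dead = [cid for cid, seen in group.last_ping.items()
                    if now - seen > self.session_timeout_s]
            for cid in dead:
                # Step 1: remove the consumer from the group metadata.
                del group.last_ping[cid]
                failed.append(cid)
            if dead:
                # Step 2: trigger the rebalance process for this group.
                self.on_rebalance(group.group_id)
        return failed


rebalanced: List[str] = []
coord = Coordinator(session_timeout_s=30.0, on_rebalance=rebalanced.append)
coord.handle_ping("group-a", "consumer-1", now=0.0)
coord.handle_ping("group-a", "consumer-2", now=25.0)
print(coord.check_failures(now=40.0))  # ['consumer-1']
print(rebalanced)                      # ['group-a']
```

Passing `now` explicitly keeps the sketch deterministic; a real coordinator would read a clock and run the check periodically.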
Coordinator Failure Handling
When the consumer thinks its current coordinator has failed, it will do the following:
- Refresh its metadata from the known brokers to get the new coordinator information (if the metadata request fails or times out, retry until it succeeds).
- Set up the connection to the new coordinator.
- Start sending heartbeats (pings) to the new coordinator.
----------------------------------------- 1/2/3 ------------------------------------------
Coordinator (failed) <-- (ping) -- Consumer 1 (alive)
(Have not heard back from coordinator, try to reconnect to the new coordinator)
<-- (ping) -- Consumer 2 (alive)
(Have not heard back from coordinator, try to reconnect to the new coordinator)
Broker (alive) <-- (metadata) -- Consumer 1
-- (response) -->
<-- (metadata) -- Consumer 2
-- (response) -->
Coordinator (new) <-- (ping) -- Consumer 1 (alive)
-- (response) -->
<-- (ping) -- Consumer 2 (alive)
-- (response) -->
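The consumer-side failover steps can be sketched like this. It is a hypothetical Python outline: `fetch_coordinator`, `connect`, and `start_heartbeats` stand in for real network calls, and `MetadataUnavailable` is an assumed error type for a failed or timed-out metadata request. The optional `max_rounds` bound exists only so the sketch can be tested; the design itself retries until success.

```python
import time
from typing import Any, Callable, List, Optional


class MetadataUnavailable(Exception):
    """Hypothetical: a metadata request failed or timed out."""


def find_new_coordinator(brokers: List[str],
                         fetch_coordinator: Callable[[str], int],
                         backoff_s: float = 0.0,
                         max_rounds: Optional[int] = None) -> int:
    """Ask the known brokers in turn until one returns the coordinator id."""
    rounds = 0
    while max_rounds is None or rounds < max_rounds:
        for broker in brokers:
            try:
                return fetch_coordinator(broker)
            except MetadataUnavailable:
                continue  # this broker failed; try the next known broker
        rounds += 1
        time.sleep(backoff_s)  # back off before the next full round
    raise MetadataUnavailable("no broker returned coordinator metadata")


def fail_over(brokers: List[str],
              fetch_coordinator: Callable[[str], int],
              connect: Callable[[int], Any],
              start_heartbeats: Callable[[Any], None]) -> int:
    coordinator_id = find_new_coordinator(brokers, fetch_coordinator)  # 1. refresh metadata
    conn = connect(coordinator_id)  # 2. connect to the new coordinator
    start_heartbeats(conn)          # 3. start pinging it
    return coordinator_id


def flaky_fetch(broker: str) -> int:
    if broker == "broker-1":
        raise MetadataUnavailable(broker)  # broker-1 times out
    return 2                               # broker-2 reports coordinator 2


started: List[Any] = []
print(fail_over(["broker-1", "broker-2"], flaky_fetch,
                connect=lambda cid: f"conn-{cid}",
                start_heartbeats=started.append))  # 2
print(started)  # ['conn-2']
```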
PingRequest
{
  Version => int16
  CorrelationId => int64
  GroupId => String
  ConsumerId => String
}
PingResponse
{
  Version => int16
  CorrelationId => int64
  ErrorCode => int16
}
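To illustrate the field layouts above, the following Python sketch serializes a PingRequest body and parses a PingResponse body. It assumes big-endian integers and the 0.8 wire convention for strings (an int16 length followed by UTF-8 bytes); any outer size prefix or request-type framing is omitted, and the function names are hypothetical.

```python
import struct
from dataclasses import dataclass


def encode_string(s: str) -> bytes:
    # Assumption: String = int16 byte length followed by UTF-8 bytes.
    data = s.encode("utf-8")
    return struct.pack(">h", len(data)) + data


def encode_ping_request(version: int, correlation_id: int,
                        group_id: str, consumer_id: str) -> bytes:
    # Field order follows the schema: Version, CorrelationId, GroupId, ConsumerId.
    return (struct.pack(">hq", version, correlation_id)
            + encode_string(group_id)
            + encode_string(consumer_id))


@dataclass
class PingResponse:
    version: int
    correlation_id: int
    error_code: int


def decode_ping_response(buf: bytes) -> PingResponse:
    # Version (int16), CorrelationId (int64), ErrorCode (int16), big-endian.
    version, correlation_id, error_code = struct.unpack(">hqh", buf)
    return PingResponse(version, correlation_id, error_code)


req = encode_ping_request(0, 42, "group-a", "consumer-1")
print(len(req))  # 31
print(decode_ping_response(struct.pack(">hqh", 0, 42, 0)))
# PingResponse(version=0, correlation_id=42, error_code=0)
```

The consumer can match the decoded `correlation_id` against the one it sent to pair responses with outstanding pings.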
Consumer Startup (Subscription Change)
On startup, a consumer first subscribes to a list of topics. Similarly, when a consumer changes its subscription list, this can be treated as a consumer restart.
Therefore, these two processes can be merged into one behavior that is triggered by a subscription change (possibly from an empty subscription).
If the consumer has not yet connected to the coordinator, it will first find the current coordinator through a metadata refresh and set up the connection.
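A minimal sketch of treating startup as a subscription change from the empty set follows. `ConsumerSketch` is hypothetical, as are its callbacks: `discover_coordinator` stands in for the metadata refresh and `send_join` for the join request described under Consumer (re)Join-Group.

```python
from typing import Callable, FrozenSet, Iterable, Optional


class ConsumerSketch:
    """Hypothetical consumer: startup is a subscription change from {}."""

    def __init__(self, discover_coordinator: Callable[[], int],
                 send_join: Callable[[int, FrozenSet[str]], None]) -> None:
        self.subscription: FrozenSet[str] = frozenset()  # startup: empty
        self.coordinator: Optional[int] = None           # not connected yet
        self.discover_coordinator = discover_coordinator
        self.send_join = send_join

    def subscribe(self, topics: Iterable[str]) -> bool:
        new = frozenset(topics)
        if new == self.subscription:
            return False  # nothing changed, so no "restart" is needed
        self.subscription = new
        if self.coordinator is None:
            # Find the coordinator via a metadata refresh first.
            self.coordinator = self.discover_coordinator()
        # Startup and later changes take the same path: (re)join the group.
        self.send_join(self.coordinator, self.subscription)
        return True


joins = []
c = ConsumerSketch(discover_coordinator=lambda: 3,
                   send_join=lambda coord, t: joins.append((coord, sorted(t))))
c.subscribe(["orders"])              # startup: {} -> {orders}
c.subscribe(["orders", "payments"])  # change, treated as a restart
print(joins)  # [(3, ['orders']), (3, ['orders', 'payments'])]
```

Coordinator discovery happens only once here, because the connection is reused across subscription changes.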
Coordinator Discovery
Coordinator discovery is done through the metadata request, whose response currently contains 1) the cluster information and 2) the leader information for the specified topics. A third piece of metadata will be added to the response: the coordinator id for each requested consumer group.
----------------------------------------- 1 ------------------------------------------
Any Broker <-- (metadata) -- Consumer 1 (alive)
-- (response: {cluster, leaders, coordinator}) -->
---------------------------------------------------------------------------------------
MetadataRequest
{
  Version => int16
  CorrelationId => int64
  ClientId => String
  Topics => [String]
  GroupIds => [String]
}
This MetadataRequest inherits from the TopicMetadataRequest in 0.8.
MetadataResponse
{
  Version => int16
  CorrelationId => int64
  Brokers => [Broker]
  TopicsMetadata => [TopicMetadata]
  CoordinatorIds => [int16]
}
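A consumer could resolve its coordinator from these structures roughly as below. This Python sketch is hypothetical: it assumes `CoordinatorIds` is returned positionally, one entry per requested group in `GroupIds` order, and that each `Broker` entry carries a node id, host, and port as in 0.8. `TopicsMetadata` is left as an opaque placeholder.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass
class Broker:
    # Assumed to mirror the 0.8 broker entry (node id, host, port).
    node_id: int
    host: str
    port: int


@dataclass
class MetadataRequest:
    version: int
    correlation_id: int
    client_id: str
    topics: List[str]
    group_ids: List[str]         # new: groups whose coordinator is wanted


@dataclass
class MetadataResponse:
    version: int
    correlation_id: int
    brokers: List[Broker]
    topics_metadata: List[dict]  # placeholder for [TopicMetadata]
    coordinator_ids: List[int]   # new: one id per requested group (assumed)


def coordinators_by_group(req: MetadataRequest,
                          resp: MetadataResponse) -> Dict[str, int]:
    if resp.correlation_id != req.correlation_id:
        raise ValueError("response does not match this request")
    if len(resp.coordinator_ids) != len(req.group_ids):
        raise ValueError("expected one coordinator id per requested group")
    # Assumption: CoordinatorIds is positional, in the order of GroupIds.
    return dict(zip(req.group_ids, resp.coordinator_ids))


def coordinator_address(req: MetadataRequest, resp: MetadataResponse,
                        group_id: str) -> Tuple[str, int]:
    coordinator_id = coordinators_by_group(req, resp)[group_id]
    for broker in resp.brokers:
        if broker.node_id == coordinator_id:
            return broker.host, broker.port
    raise LookupError(f"coordinator {coordinator_id} not in broker list")


req = MetadataRequest(0, 7, "client-1", ["orders"], ["group-a", "group-b"])
resp = MetadataResponse(0, 7, [Broker(1, "b1", 9092), Broker(2, "b2", 9092)],
                        [], [2, 1])
print(coordinators_by_group(req, resp))           # {'group-a': 2, 'group-b': 1}
print(coordinator_address(req, resp, "group-a"))  # ('b2', 9092)
```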
---------------------------------------------------------------------------------------
Consumer (re)Join-Group
If the consumer 1) does not know which partitions to consume (for example, because it has just started), or 2) detects a topic change, either from the metadata response or from a subscription change, it will send a join request to the coordinator.
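The two join conditions can be expressed as a small predicate. This is a hypothetical Python helper, not part of the design: in particular, it assumes that a "topic change seen in the metadata response" means a subscribed topic appearing, disappearing, or changing its partition count.

```python
from typing import Dict, Optional, Set, Tuple

TopicAndPartition = Tuple[str, int]


def needs_join(owned: Optional[Set[TopicAndPartition]],
               subscribed: Set[str],
               previous_subscribed: Set[str],
               partition_counts: Dict[str, int],
               previous_partition_counts: Dict[str, int]) -> bool:
    """Return True if the consumer should send a join request (hypothetical)."""
    # 1) No known assignment yet, e.g. the consumer has just started.
    if owned is None:
        return True
    # 2a) Topic change caused by a subscription change.
    if subscribed != previous_subscribed:
        return True
    # 2b) Topic change seen in a metadata response (assumed definition).
    now = {t: partition_counts.get(t) for t in subscribed}
    before = {t: previous_partition_counts.get(t) for t in subscribed}
    return now != before


print(needs_join(None, {"orders"}, {"orders"}, {"orders": 4}, {"orders": 4}))  # True
print(needs_join({("orders", 0)}, {"orders"}, {"orders"},
                 {"orders": 4}, {"orders": 4}))  # False
print(needs_join({("orders", 0)}, {"orders"}, {"orders"},
                 {"orders": 8}, {"orders": 4}))  # True
```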
...
{
  Version => int16
  CorrelationId => int64
  ErrorCode => int16
  Generation => int16
  GroupId => String
  ConsumerId => String
  PartitionsToOwn => [<TopicAndPartition>]
}
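On receiving the response above, a consumer could adopt its new assignment as in this Python sketch. `JoinResponseFields` and `OwnedPartitions` are hypothetical names. The sketch assumes that an `ErrorCode` of 0 means success and that a response carrying an older `Generation` than the one already applied is stale and can be ignored; neither rule is stated in this section.

```python
from dataclasses import dataclass
from typing import List, Set, Tuple

TopicAndPartition = Tuple[str, int]


@dataclass
class JoinResponseFields:
    """Hypothetical container mirroring the response fields above."""
    version: int
    correlation_id: int
    error_code: int
    generation: int
    group_id: str
    consumer_id: str
    partitions_to_own: List[TopicAndPartition]


class OwnedPartitions:
    def __init__(self) -> None:
        self.generation = -1
        self.consumer_id = ""
        self.owned: Set[TopicAndPartition] = set()

    def apply(self, resp: JoinResponseFields
              ) -> Tuple[Set[TopicAndPartition], Set[TopicAndPartition]]:
        """Adopt the new assignment; return (added, revoked) partitions."""
        if resp.error_code != 0:  # assumption: 0 means "no error"
            raise RuntimeError(f"join failed with error code {resp.error_code}")
        if resp.generation < self.generation:
            # Assumption: an older generation is stale, so ignore it.
            return set(), set()
        new = set(resp.partitions_to_own)
        added, revoked = new - self.owned, self.owned - new
        self.owned = new
        self.generation = resp.generation
        self.consumer_id = resp.consumer_id
        return added, revoked


state = OwnedPartitions()
state.apply(JoinResponseFields(0, 1, 0, 1, "group-a", "consumer-1",
                               [("orders", 0), ("orders", 1)]))
added, revoked = state.apply(JoinResponseFields(0, 2, 0, 2, "group-a", "consumer-1",
                                                [("orders", 1), ("orders", 2)]))
print(sorted(added), sorted(revoked))  # [('orders', 2)] [('orders', 0)]
```

Returning the added and revoked sets separately lets the consumer stop fetching revoked partitions before it starts on new ones.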
...
Consumer Group Rebalance
The coordinator will trigger rebalance under the following conditions:
...