...

Note that the consumer client id assigned by the coordinator will also be used in the heartbeat protocol and in offset management, both discussed below.

Consumer Fetching Data

As in the new producer, consumers will periodically send topic metadata requests to the available brokers to find the leaders of their assigned partitions. A consumer will also force a topic metadata refresh upon 1) receiving an error while fetching data, or 2) receiving a join response from the coordinator.

To fetch data, consumers will issue fetch requests to the known leaders of their assigned partitions through a selector, and the poll function will return data from all the readable channels in the selector.

Wildcard Subscription Handling

With wildcard subscription (for example, whitelist and blacklist), the consumers are responsible for discovering matching topics through topic metadata requests. That is, a consumer sends a topic metadata request with an empty topic list, and the response returns the partition info of all topics. The consumer then filters the topics that match its wildcard expression and updates its subscription list. Again, if the subscription list has changed from its previous value, the consumer will start the re-join group procedure.
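The filtering step above can be sketched as follows. This is a minimal illustration, not the actual consumer code: the function names, the use of full-match regular expressions, and the sorted-list comparison are all assumptions made for the example.

```python
import re


def matching_topics(all_topics, whitelist=None, blacklist=None):
    """Return the sorted topics selected by a whitelist and/or blacklist regex.

    `whitelist` and `blacklist` are hypothetical parameters holding regular
    expressions; a topic must fully match the whitelist (if given) and must
    not fully match the blacklist (if given).
    """
    selected = []
    for topic in all_topics:
        if whitelist is not None and not re.fullmatch(whitelist, topic):
            continue
        if blacklist is not None and re.fullmatch(blacklist, topic):
            continue
        selected.append(topic)
    return sorted(selected)


def refresh_subscription(current, all_topics, whitelist=None, blacklist=None):
    """Return (new_subscription, needs_rejoin) after a full metadata response."""
    updated = matching_topics(all_topics, whitelist, blacklist)
    # A changed subscription list is what starts the re-join group procedure.
    return updated, updated != sorted(current)
```

Here `all_topics` stands for the topic names returned by a metadata request with an empty topic list; when `needs_rejoin` is true the consumer would start the re-join group procedure.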

Failure Detection

Once the consumer has successfully joined the group, it will keep sending heartbeat requests to the coordinator. This protocol is used both for consumer failure detection (by the coordinator) and for coordinator failure detection (by the consumer).

Heartbeat Protocol

1) Consumers send a heartbeat request to the coordinator every session_timeout / heartbeat_frequency, where heartbeat_frequency is an integer > 1.

2) Upon receiving the heartbeat request, the coordinator checks the generation number and the consumer id. If they are not valid (i.e., the consumer id was not assigned before, or the generation number is smaller than the current value), it sends an error response; otherwise it sends a normal response.

3) If the coordinator has not heard from a consumer after session_timeout, it treats the consumer as failed.

4) If the consumer has not heard from the coordinator after session_timeout, it treats the coordinator as failed.

5) If the consumer finds that the channel to the coordinator has been closed, it treats the coordinator as failed.
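A rough sketch of the coordinator side of steps 1) to 3) is shown below. The class name, the error code values, and the millisecond clock passed in by the caller are assumptions made for illustration; the design above does not specify them.

```python
from dataclasses import dataclass, field

# Hypothetical error codes; the real values are not defined in this document.
OK, UNKNOWN_CONSUMER, ILLEGAL_GENERATION = 0, 1, 2


def heartbeat_interval(session_timeout_ms, heartbeat_frequency):
    """Step 1: time between heartbeats, session_timeout / heartbeat_frequency."""
    if heartbeat_frequency <= 1:
        raise ValueError("heartbeat_frequency must be an integer > 1")
    return session_timeout_ms / heartbeat_frequency


@dataclass
class GroupState:
    generation: int
    # consumer id -> time (ms) of the last accepted heartbeat
    last_heartbeat_ms: dict = field(default_factory=dict)

    def on_heartbeat(self, consumer_id, generation, now_ms):
        """Step 2: validate the consumer id and generation number."""
        if consumer_id not in self.last_heartbeat_ms:
            return UNKNOWN_CONSUMER
        if generation < self.generation:
            return ILLEGAL_GENERATION
        self.last_heartbeat_ms[consumer_id] = now_ms
        return OK

    def expired_consumers(self, now_ms, session_timeout_ms):
        """Step 3: consumers not heard from within session_timeout."""
        return [c for c, seen in self.last_heartbeat_ms.items()
                if now_ms - seen > session_timeout_ms]
```

The consumer side of steps 4) and 5) would mirror `expired_consumers` with a single timestamp for the last response received from the coordinator.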

 

-----------------------------------------------------------------------------------

Coordinator                                                                             <-- (ping) --     Consumer 1 (alive)

                                                                                               -- (response) -->

                                                                                               <-- (ping) --     Consumer 2 (alive)

                                                                                               -- (response) -->

-----------------------------------------------------------------------------------

Coordinator                                                                             <-- (ping) --     Consumer 1 (alive)

                                                                                               -- (response) -->

(Have not heard from consumer 2, it must have failed)                                 Consumer 2 (failed)

Coordinator Startup (Coordinator Failover)

Coordinators will be constructed upon broker startup and initialized with empty consumer groups. There are two cases in which a new consumer group can be created on the coordinator:

1) When the first request is received from one of the consumers in this group, the coordinator needs to create the group after validating, by the group name, that it is the assigned coordinator.

2) Since the assignment of coordinators can change as the "offset" topic partition leaders change (mainly due to brokers going down), coordinators also have to take over existing consumer groups upon coordinator failover.

The only difference is that for 1) the coordinator will immediately trigger a rebalance for the newly created group, whereas for 2) the coordinator needs to load the group's current config and ownership information from ZK but does not necessarily trigger a rebalance.
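The two cases can be contrasted in a small sketch. Everything here is hypothetical: the `open_group` name, the in-memory `groups` dictionary, the shape of the group record, and the convention that `stored_metadata` is whatever was previously loaded from ZK (or `None` if nothing was stored).

```python
def open_group(groups, group_id, is_assigned_coordinator, stored_metadata=None):
    """Create or take over a group; return True when a rebalance should start.

    `is_assigned_coordinator` is a hypothetical callback that validates, by
    group name, that this broker is the coordinator for the group.
    """
    if not is_assigned_coordinator(group_id):
        raise ValueError("not the coordinator for group %r" % group_id)

    if stored_metadata is None:
        # Case 1: first request from a consumer of a brand-new group.
        groups[group_id] = {"generation": 0, "consumers": {}, "ownership": {}}
        return True  # rebalance immediately

    # Case 2: failover; adopt the config and ownership info read from ZK.
    groups[group_id] = dict(stored_metadata)
    return False  # a rebalance is not necessarily needed
```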

 

Consumer Group Metadata in ZK

The consumer config and ownership metadata can be stored in one ZK path as a JSON object. This node will be updated upon rebalance completion and read upon coordinator failover.

Note that on coordinator failover, the consumers may discover the new coordinator before or after the new coordinator has finished the failover process (loading the consumer group metadata from ZK, etc.). In the latter case, the new coordinator will just accept the consumer's ping request as normal; in the former case, the new coordinator may reject the request, causing the consumer to re-discover the coordinator and re-connect, which is fine. Also, if the consumer connects to the new coordinator too late, it may already have been expired by the new coordinator and will hence be treated as a new consumer, which is also fine.

 

-----------------------------------------------------------------------------------

Coordinator (failed)                                                                 <-- (ping) --    Consumer 1 (alive)

                                                                                               (Have not heard back from coordinator, it must have failed)

                                                                                               <-- (ping) --     Consumer 2 (alive)

                                                                                               (Have not heard back from coordinator, it must have failed)

-----------------------------------------------------------------------------------

Coordinator (crashed)                                                            <-- X --    Consumer 1 (alive)

                                                                                               (Broken pipe from coordinator, it must have failed)

                                                                                               <-- X --     Consumer 2 (alive)

                                                                                               (Broken pipe from coordinator, it must have failed)

-----------------------------------------------------------------------------------

PingRequest

Code Block
{
  Version                => int16
  CorrelationId          => int64
  GroupId                => String
  ConsumerId             => String
}

 

Consumer Failure Handling

When the coordinator thinks a consumer has failed, it will do the following:

1) Remove the consumer from the group metadata.

2) Trigger the rebalance process.

Coordinator Failure Handling

When the consumer thinks its current coordinator has failed, it will do the following:

1) Discover the new coordinator from the known brokers through find coordinator requests.

2) Set up the connection to the new coordinator.

3) Start sending heartbeats to the new coordinator.
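The three steps above could look roughly like this on the consumer side. The `find_coordinator` and `connect` callables are placeholders for the real network calls, and treating any `OSError` as "try the next broker" is an assumption of this sketch.

```python
def recover_coordinator(group_id, known_brokers, find_coordinator, connect):
    """Locate and connect to the new coordinator after a detected failure.

    `find_coordinator(broker, group_id)` and `connect(coordinator)` are
    hypothetical callables standing in for the find coordinator request and
    the connection setup.
    """
    for broker in known_brokers:
        try:
            coordinator = find_coordinator(broker, group_id)  # step 1
            connection = connect(coordinator)                 # step 2
        except OSError:
            continue  # this broker or coordinator is unreachable, try the next
        # Step 3: the caller resumes heartbeats on the returned connection.
        return coordinator, connection
    raise ConnectionError("no known broker could locate a coordinator")
```

If the new coordinator rejects the first heartbeat because it has not finished failover, the consumer would simply run the same procedure again.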

 

------------------------------------------------------------------------------------

Coordinator (failed)                                                                <-- (ping) --    Consumer 1 (alive)

                                                                                               (Have not heard back from coordinator, try to reconnect to the new coordinator)

                                                                                               <-- (ping) --     Consumer 2 (alive)

                                                                                               (Have not heard back from coordinator, try to reconnect to the new coordinator)

 

Broker (alive)                                                                         <-- (find_coordinator) -- Consumer 1

                                                                                              -- (response) -->

                                                                                              <-- (find_coordinator) -- Consumer 2

                                                                                              -- (response) -->

 

Coordinator (new)                                                                  <-- (ping) --    Consumer 1 (alive)

                                                                                               -- (response) -->

                                                                                               <-- (ping) --     Consumer 2 (alive)

                                                                                               -- (response) -->

PingResponse

Code Block
{
  Version                => int16
  CorrelationId          => int64
  ErrorCode              => int16
}
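To make the PingRequest and PingResponse layouts concrete, here is one possible byte-level encoding. The field order follows the schemas in this document; big-endian integers and strings written as an int16 length followed by UTF-8 bytes are assumptions of this sketch, as are the function names.

```python
import struct


def _encode_string(value):
    """Assumed string format: int16 length prefix followed by UTF-8 bytes."""
    data = value.encode("utf-8")
    return struct.pack(">h", len(data)) + data


def encode_ping_request(version, correlation_id, group_id, consumer_id):
    """Serialize Version (int16), CorrelationId (int64), GroupId, ConsumerId."""
    return (struct.pack(">hq", version, correlation_id)
            + _encode_string(group_id)
            + _encode_string(consumer_id))


def decode_ping_response(payload):
    """Parse Version (int16), CorrelationId (int64), ErrorCode (int16)."""
    version, correlation_id, error_code = struct.unpack(">hqh", payload)
    return {"version": version,
            "correlation_id": correlation_id,
            "error_code": error_code}
```

A consumer would match `correlation_id` in the response against the request it sent, and treat a non-zero `error_code` as a rejected heartbeat.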
 

Consumer Group Rebalance

The coordinator will trigger rebalance under the following conditions:

...