...

  1. Consumers periodically send a heartbeat, along with the generation number, to the coordinator every session_timeout / 3.
  2. Upon receiving a heartbeat request, the coordinator sends back a response immediately if the generation number is valid.
  3. If the coordinator has not heard from a consumer within session_timeout, it treats the consumer as failed.
  4. If a consumer has not heard back from the coordinator within session_timeout * 2/3 after sending a heartbeat request (possibly multiple times), it treats the coordinator as failed.
  5. If a consumer finds that its channel to the coordinator has closed, it treats the coordinator as failed.
  6. If the coordinator receives a ping request with an older or newer generation number, it rejects the request so that the consumer sends a join request.
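The heartbeat rules above can be sketched as a small timing policy. This is a minimal Python sketch, assuming millisecond clocks supplied by the caller; the class `HeartbeatPolicy`, the function `handle_ping`, and the response strings are hypothetical names, not part of the design.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class HeartbeatPolicy:
    """Timing rules from the heartbeat steps; all times are in milliseconds."""
    session_timeout_ms: int

    @property
    def heartbeat_interval_ms(self) -> int:
        # Step 1: consumers send a heartbeat every session_timeout / 3.
        return self.session_timeout_ms // 3

    def consumer_failed(self, now_ms: int, last_heartbeat_ms: int) -> bool:
        # Step 3 (coordinator side): silent for a full session timeout.
        return now_ms - last_heartbeat_ms >= self.session_timeout_ms

    def coordinator_failed(self, now_ms: int,
                           first_unanswered_ms: Optional[int],
                           channel_closed: bool) -> bool:
        # Step 5 (consumer side): a closed channel means the coordinator failed.
        if channel_closed:
            return True
        # Nothing outstanding: every heartbeat so far has been answered.
        if first_unanswered_ms is None:
            return False
        # Step 4: no response within 2/3 of the session timeout after sending
        # the (possibly repeated) heartbeat request.
        return now_ms - first_unanswered_ms >= self.session_timeout_ms * 2 // 3


def handle_ping(group_generation: int, ping_generation: int) -> str:
    # Steps 2 and 6 (coordinator side): answer immediately if the generation
    # matches; reject older or newer generations so the consumer re-joins.
    # The response strings are hypothetical placeholders, not real error codes.
    return "OK" if ping_generation == group_generation else "REJECT_REJOIN"
```

Sending three heartbeats per session timeout lets a consumer miss a couple of them before the coordinator gives up on it.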

 

...

 

Consumer Startup (Subscription Change)

On startup, a consumer first tries to subscribe to a list of topics; similarly, when a consumer changes its subscription list, this can be treated as a consumer restart.

Therefore, these two processes can be merged into one behavior that is triggered by a subscription change (possibly from an empty subscription):

If the consumer has not yet connected to the coordinator, it first finds the current coordinator through a metadata refresh and sets up the connection.
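A minimal sketch of treating startup and subscription change as one behavior, assuming a hypothetical `Consumer` class whose `find_coordinator` callback stands in for the metadata refresh and whose `outbox` list merely records the join requests it would send.

```python
from typing import Callable, FrozenSet, Iterable, List, Optional, Tuple


class Consumer:
    """Startup and subscription change handled as one code path (hypothetical names)."""

    def __init__(self, group: str, find_coordinator: Callable[[str], int]):
        self.group = group
        self.subscription: FrozenSet[str] = frozenset()  # empty before startup
        self.coordinator_id: Optional[int] = None
        self._find_coordinator = find_coordinator  # stands in for a metadata refresh
        self.outbox: List[Tuple[str, int, Tuple[str, ...]]] = []  # requests "sent"

    def subscribe(self, topics: Iterable[str]) -> None:
        new_subscription = frozenset(topics)
        if new_subscription == self.subscription:
            return  # no change, so nothing to (re)start
        # Startup is simply a change from the empty subscription.
        self.subscription = new_subscription
        if self.coordinator_id is None:
            # Not connected yet: locate the coordinator first.
            self.coordinator_id = self._find_coordinator(self.group)
        self.outbox.append(("join", self.coordinator_id, tuple(sorted(new_subscription))))
```

The coordinator lookup happens only once; later subscription changes reuse the known coordinator and just send a new join request.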

...

Coordinator Discovery

Coordinator discovery is done through the metadata request, whose response currently contains 1) cluster information and 2) leader information for the specified topics. A third piece of metadata will be added to the response: the coordinator id for the consumer group.
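The extended metadata response could be modeled as below. This is a sketch only: the field names, the `BrokerInfo` and `MetadataResponse` types, and `coordinator_address` are hypothetical, and no wire format is implied.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass
class BrokerInfo:
    broker_id: int
    host: str
    port: int


@dataclass
class MetadataResponse:
    brokers: List[BrokerInfo]            # 1) cluster information
    leaders: Dict[str, Dict[int, int]]   # 2) topic -> partition -> leader broker id
    coordinator_id: int                  # 3) proposed: coordinator for the requested group


def coordinator_address(resp: MetadataResponse) -> Tuple[str, int]:
    """Resolve the coordinator id from the response into a host/port to connect to."""
    for broker in resp.brokers:
        if broker.broker_id == resp.coordinator_id:
            return broker.host, broker.port
    raise LookupError(f"coordinator {resp.coordinator_id} not in cluster metadata")
```

Because the coordinator is just another broker id, the consumer can resolve it with the cluster information already carried in the same response.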

 

----------------------------------------- 1 ------------------------------------------

Any Broker     <-- (metadata) --                                    Consumer 1 (alive)

               -- (response: {cluster, leaders, coordinator}) -->

Coordinator    <-- (ping) --                                        Consumer 2 (alive)

               -- (response) -->

               <-- (ping) --                                        Consumer 3 (new)

               (Did not know consumer 3 before, rebalance)

               or

               (Consumer 3's subscription has changed, rebalance)

---------------------------------------------------------------------------------------

 

  1. The consumer sends a join request to the coordinator with its current subscription info, then blocks waiting for the response, which either contains the partition assignment or indicates that the join request was rejected.
  2. Upon receiving the join request, the coordinator triggers a rebalance, which generates the partition assignment and returns it to the new consumer.
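The design does not say how the coordinator computes the assignment. As an illustration only, the sketch below uses a per-topic round-robin strategy (an assumption, not the design's algorithm); `assign_partitions` is a hypothetical name.

```python
from typing import Dict, List, Tuple

Partition = Tuple[str, int]  # (topic, partition id)


def assign_partitions(subscriptions: Dict[str, List[str]],
                      partition_counts: Dict[str, int]) -> Dict[str, List[Partition]]:
    """Per-topic round-robin: partition p of a topic goes to its (p mod n)-th subscriber."""
    assignment: Dict[str, List[Partition]] = {c: [] for c in subscriptions}
    for topic in sorted(partition_counts):
        # Sort so every run over the same inputs yields the same result.
        subscribers = sorted(c for c, topics in subscriptions.items() if topic in topics)
        if not subscribers:
            continue  # nobody in the group consumes this topic
        for p in range(partition_counts[topic]):
            assignment[subscribers[p % len(subscribers)]].append((topic, p))
    return assignment
```

For example, with `c1` subscribed to `t` and `c2` subscribed to `t` and `u`, three partitions of `t` are split `c1: t-0, t-2` and `c2: t-1`, while both partitions of `u` go to `c2`. Determinism matters here because the coordinator may recompute the assignment on every retry.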

 

 

 

Consumer Group Rebalance

  1. On group rebalance, the coordinator first sends an error code indicating a rebalance in its responses to all consumers (possibly except the one whose join request triggered this rebalance).
  2. Upon receiving the error code, a consumer should stop fetching from its current partition leaders, call onPartitionDeassigned(), and then send a join request to the coordinator.
  3. Upon receiving join requests from all consumers within the group, the coordinator calculates the partition assignment and sends it back in the join responses.
  4. If a consumer's join request has not been received after the session timeout, the coordinator removes this consumer and re-triggers the rebalance by sending join responses to the remaining consumers.
  5. Upon receiving a normal join response, consumers call onPartitionAssigned() and start fetching.
  6. Upon receiving an error join response, a consumer checks whether the error is fatal; if so, it throws an exception, otherwise it re-sends the join request.
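The consumer-side steps (2, 5 and 6) can be sketched as a small listener. The error-code names `REBALANCE_IN_PROGRESS` and `FATAL_JOIN_ERRORS` and the class `RebalanceListener` are hypothetical; the design only says that some error code signals a rebalance and that some join errors are fatal.

```python
from typing import Callable, List, Optional, Sequence, Tuple

Partition = Tuple[str, int]

# Hypothetical error-code names; the design does not define concrete codes.
REBALANCE_IN_PROGRESS = "REBALANCE_IN_PROGRESS"
FATAL_JOIN_ERRORS = {"FATAL_EXAMPLE"}


class RebalanceListener:
    """Consumer-side handling of a rebalance (steps 2, 5 and 6)."""

    def __init__(self,
                 on_partition_deassigned: Callable[[List[Partition]], None],
                 on_partition_assigned: Callable[[List[Partition]], None]):
        self.on_partition_deassigned = on_partition_deassigned
        self.on_partition_assigned = on_partition_assigned
        self.owned: List[Partition] = []
        self.fetching = False

    def on_response_error(self, error: Optional[str]) -> str:
        """Step 2. Returns the next action: "continue" or "send_join"."""
        if error != REBALANCE_IN_PROGRESS:
            return "continue"
        self.fetching = False                            # stop fetching from leaders
        self.on_partition_deassigned(list(self.owned))   # release the old partitions
        self.owned = []
        return "send_join"

    def on_join_response(self, error: Optional[str],
                         partitions: Sequence[Partition] = ()) -> str:
        if error is None:                                # step 5: normal response
            self.owned = list(partitions)
            self.on_partition_assigned(list(self.owned))
            self.fetching = True                         # start fetching
            return "fetching"
        if error in FATAL_JOIN_ERRORS:                   # step 6: fatal error
            raise RuntimeError(f"fatal join error: {error}")
        return "send_join"                               # step 6: re-send the join
```

The deassign callback always runs before the join request is sent, so the application can finish work on its old partitions before they are handed to another consumer.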

 

Take the rebalance procedure upon new consumer startup as an example:

Consumer (re)Join-Group

If the consumer 1) does not know which partitions to consume (for example, because it has just started) or 2) detects a topic change, either from the metadata response or from a subscription change, it sends a join request to the coordinator.

Upon receiving a join request, the coordinator triggers a rebalance.

----------------------------------------- 2 ------------------------------------------

Coordinator     <-- (join) --     Consumer 1 (alive)

(received a join request from consumer 1, rebalance)

----------------------------------------- 1 ------------------------------------------

Coordinator     <-- (join) --     Consumer 3 (new)

                <-- (ping) --     Consumer 1 (alive)

                -- (rebalance) -->

                <-- (ping) --     Consumer 2 (alive)

                -- (rebalance) -->

---------------------------------------------------------------------------------------
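The consumer's decision to (re)join can be sketched as a predicate. `should_send_join` is a hypothetical name, and detecting a topic change by comparing per-topic partition counts from two metadata responses is an assumption about what "topic change" covers.

```python
from typing import Dict, Iterable, List, Optional, Tuple


def should_send_join(assignment: Optional[List[Tuple[str, int]]],
                     old_subscription: Iterable[str],
                     new_subscription: Iterable[str],
                     old_partition_counts: Dict[str, int],
                     new_partition_counts: Dict[str, int]) -> bool:
    # 1) No assignment yet (e.g. the consumer has just started).
    if assignment is None:
        return True
    # 2a) Topic change from a subscription change.
    if set(old_subscription) != set(new_subscription):
        return True
    # 2b) Topic change seen in a metadata response: a subscribed topic
    #     appeared, disappeared, or changed its partition count.
    watched = set(new_subscription)
    before = {t: old_partition_counts.get(t) for t in watched}
    after = {t: new_partition_counts.get(t) for t in watched}
    return before != after
```

Changes to topics the consumer does not subscribe to are ignored, so they never cause a join request.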

Consumer Group Rebalance

The coordinator triggers a rebalance under the following conditions:

  1. It detects that a consumer has failed.
  2. It receives a join request from a (possibly unknown) consumer.
  3. It detects a partition change on the topics this group subscribes to.
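A sketch of how the coordinator might record the three rebalance triggers, assuming a hypothetical `GroupState` that only marks a rebalance as needed rather than running it.

```python
from dataclasses import dataclass, field
from typing import Dict, Set


@dataclass
class GroupState:
    members: Set[str] = field(default_factory=set)
    topics: Set[str] = field(default_factory=set)
    partition_counts: Dict[str, int] = field(default_factory=dict)
    rebalance_needed: bool = False

    def on_consumer_failed(self, consumer: str) -> None:
        # 1) A known consumer failed (e.g. no heartbeat within the session timeout).
        if consumer in self.members:
            self.members.discard(consumer)
            self.rebalance_needed = True

    def on_join_request(self, consumer: str) -> None:
        # 2) Any join request, including one from a consumer never seen before.
        self.members.add(consumer)
        self.rebalance_needed = True

    def on_partition_counts(self, counts: Dict[str, int]) -> None:
        # 3) A partition change on one of the subscribed topics.
        changed = any(counts.get(t) != self.partition_counts.get(t) for t in self.topics)
        self.partition_counts = dict(counts)
        if changed:
            self.rebalance_needed = True
```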

Rebalance Process: Option 1

On group rebalance, the coordinator first sends an error code indicating a rebalance in its responses to all consumers.

  1. Upon receiving the error code, a consumer should stop fetching from its current partition leaders, call onPartitionDeassigned(), and then send a join request to the coordinator.
  2. Upon receiving a join request, the coordinator checks whether the specified group already exists.
    1. If yes, it holds the request and waits for join requests from all consumers within the group.
    2. If not, it adds the group and proceeds directly to the next step without waiting for further join requests.
  3. The coordinator calculates the partition assignment along with the new generation number and sends it back in the join responses.
  4. If a consumer's join request has not been received after the session timeout, the coordinator removes this consumer and re-triggers the rebalance by sending join responses to the remaining consumers.
  5. Upon receiving a normal join response, consumers call onPartitionAssigned() and start fetching.
  6. Upon receiving an error join response, a consumer checks whether the error is fatal; if so, it throws an exception, otherwise it re-sends the join request.

 

Take the rebalance procedure upon new consumer startup as an example:

----------------------------------------- 1 ------------------------------------------

Coordinator            <-- (join) --               Consumer 3 (new)

                       <-- (ping) --               Consumer 1 (alive)

                       -- (rebalance) -->

                       <-- (ping) --               Consumer 2 (alive)

                       -- (rebalance) -->

----------------------------------------- 2 ------------------------------------------

Coordinator                                        Consumer 3 (new)     (wait for response)

                                                   Consumer 1 (alive)   (stop fetching)

                                                                        (call onPartitionDeassigned)

                       <-- (join) --

                                                   Consumer 2 (alive)   (stop fetching)

                                                                        (call onPartitionDeassigned)

                       <-- (join) --

---------------------------------------- 3/5 -----------------------------------------

Coordinator   (calculate new assignment)

                       -- (response) -->           Consumer 3 (new)     (call onPartitionAssigned)

                                                                        (start fetching)

                       -- (response) -->           Consumer 1 (alive)   (call onPartitionAssigned)

                                                                        (start fetching)

                       -- (response) -->           Consumer 2 (alive)   (call onPartitionAssigned)

                                                                        (start fetching)

---------------------------------------- 4/6 -----------------------------------------

Coordinator   (have not heard from Consumer 2, retry rebalance)

                       -- (retry rebalance) -->    Consumer 3 (new)

                       <-- (join) --

                       -- (retry rebalance) -->    Consumer 1 (alive)

                       <-- (join) --

                       <-- X -->                   Consumer 2 (failed)

---------------------------------------------------------------------------------------
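Steps 2 to 4 of Option 1 on the coordinator side can be sketched as a join barrier with a session-timeout check. This is a minimal sketch under stated assumptions: `Coordinator`, `Group`, `on_tick`, and `RETRY_REBALANCE` are hypothetical names, time is passed in explicitly, and the round-robin split over a flat partition list stands in for whatever assignment strategy is actually used.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set

RETRY_REBALANCE = "RETRY_REBALANCE"  # hypothetical response code


@dataclass
class Group:
    members: Set[str]
    generation: int = 0
    rebalancing: bool = False
    joined: Set[str] = field(default_factory=set)
    rebalance_started_ms: int = 0


class Coordinator:
    """Option 1 join handling: hold join requests until every member has re-joined."""

    def __init__(self, session_timeout_ms: int, partitions: List[int]):
        self.session_timeout_ms = session_timeout_ms
        self.partitions = partitions  # stand-in for the group's subscribed partitions
        self.groups: Dict[str, Group] = {}

    def needs_rejoin(self, group_id: str, consumer: str) -> bool:
        # Step 1: while rebalancing, heartbeat responses carry a rebalance error
        # code for members that have not re-joined yet.
        g = self.groups.get(group_id)
        return g is not None and g.rebalancing and consumer not in g.joined

    def on_join(self, group_id: str, consumer: str, now_ms: int) -> Optional[Dict[str, tuple]]:
        g = self.groups.get(group_id)
        if g is None:
            # Step 2.2: unknown group, so add it and assign without waiting.
            g = self.groups[group_id] = Group(members={consumer})
            return self._complete(g)
        if not g.rebalancing:
            # A join on an existing group starts a rebalance (step 2.1).
            g.rebalancing, g.joined, g.rebalance_started_ms = True, set(), now_ms
        g.members.add(consumer)
        g.joined.add(consumer)
        # Hold the request (None) until every member has joined.
        return self._complete(g) if g.joined == g.members else None

    def on_tick(self, group_id: str, now_ms: int) -> Dict[str, str]:
        # Step 4: drop members that did not join within the session timeout and
        # answer the held requests with a retry, so the survivors re-join.
        g = self.groups[group_id]
        if not g.rebalancing or now_ms - g.rebalance_started_ms < self.session_timeout_ms:
            return {}
        g.members = set(g.joined)
        retry = {c: RETRY_REBALANCE for c in sorted(g.joined)}
        g.joined, g.rebalance_started_ms = set(), now_ms
        return retry

    def _complete(self, g: Group) -> Dict[str, tuple]:
        # Step 3: bump the generation and split the partitions round-robin.
        g.generation += 1
        g.rebalancing = False
        ordered = sorted(g.members)
        return {c: (g.generation, self.partitions[i::len(ordered)])
                for i, c in enumerate(ordered)}
```

Each join response carries the new generation number, which the consumer then echoes in its heartbeats and offset commits.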

 

Consumer's logic is summarized as:

-----------------------------------------------------------------------------------------------------------------

(Re)Start:

  On 1) start-up, 2) subscription change, 3) heartbeat failure, or 4) coordinator failure, send a join request to the coordinator (if the coordinator is not known, first find it through a metadata request).

Join group:

  Upon receiving the join response, if it is normal, record the generation number and the assignment and start consuming; otherwise retry or fail fatally based on the error code.

Heartbeat:

  Periodically send a ping to the coordinator; if an error is received, go to the "(Re)Start" step.

-----------------------------------------------------------------------------------------------------------------
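The three-step consumer summary maps onto a small state machine. The state and event names below are hypothetical labels for the transitions listed above.

```python
from enum import Enum, auto


class State(Enum):
    START = auto()      # not yet joined
    JOINING = auto()    # join request sent, waiting for the response
    CONSUMING = auto()  # assignment received, fetching and heart-beating
    FAILED = auto()     # fatal join error


# (Re)Start triggers from the summary; each one leads to a (new) join request.
RESTART_EVENTS = {"startup", "subscription_change", "heartbeat_error", "coordinator_failure"}


def next_state(state: State, event: str) -> State:
    if state is State.FAILED:
        raise ValueError("consumer failed fatally")
    if event in RESTART_EVENTS:
        return State.JOINING  # find the coordinator first if it is unknown
    if state is State.JOINING:
        if event == "join_ok":
            return State.CONSUMING  # record generation number and assignment
        if event == "join_retriable_error":
            return State.JOINING    # re-send the join request
        if event == "join_fatal_error":
            return State.FAILED
    if state is State.CONSUMING and event == "heartbeat_ok":
        return State.CONSUMING
    raise ValueError(f"unexpected event {event!r} in state {state.name}")
```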

 

The pro of this design is that it avoids unnecessary rebalances on coordinator failover; the con is mainly that rebalance latency is lower-bounded by the ping frequency.

Another approach would be to trigger the rebalance by closing the socket:

Rebalance Process: Option 2

On group rebalance, the coordinator closes the sockets to all consumers (probably except the one that sent it a join request).

  1. Upon getting a broken pipe from the current coordinator, a consumer should stop fetching from its current partition leaders and call onPartitionDeassigned().
  2. It then finds the new coordinator through a metadata request and sends a join request to it.
  3. Upon receiving join requests from all consumers within the group, the coordinator calculates the partition assignment along with the new generation number and sends it back in the join responses.
  4. Upon receiving a join request for an unknown group, the coordinator immediately calculates the partition assignment (without waiting for any newcomers).
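A sketch of the consumer side of Option 2, assuming a hypothetical `Option2Consumer` whose `find_coordinator` callback stands for the metadata request. There is no separate heartbeat-error path: a closed socket always leads to coordinator rediscovery and a new join.

```python
from typing import Callable, List, Optional, Tuple

Partition = Tuple[str, int]


class Option2Consumer:
    """Consumer side of Option 2: a closed coordinator socket means 'rebalance'."""

    def __init__(self, group: str,
                 find_coordinator: Callable[[str], int],
                 on_partition_deassigned: Callable[[List[Partition]], None]):
        self.group = group
        self.find_coordinator = find_coordinator  # stands for the metadata request
        self.on_partition_deassigned = on_partition_deassigned
        self.coordinator_id: Optional[int] = None
        self.owned: List[Partition] = []
        self.fetching = False

    def on_broken_pipe(self) -> Tuple[str, int]:
        # Step 1: stop fetching and release the current partitions.
        self.fetching = False
        self.on_partition_deassigned(list(self.owned))
        self.owned = []
        # Step 2: always rediscover the coordinator (the extra metadata request),
        # since a rebalance and a coordinator failure look the same here.
        self.coordinator_id = self.find_coordinator(self.group)
        return ("join", self.coordinator_id)
```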

 

The consumer's logic is similar to Option 1, except that the consumer does not need to handle the heartbeat-failure case, since it is merged into coordinator failure: the key here is to signal a rebalance to consumers as a coordinator failure.

This removes the lower bound that ping frequency places on rebalance latency, but requires one more metadata request and causes unnecessary rebalances on coordinator failover (since the two are now treated as one scenario).

 ------------------

Offset Management

Upon receiving a commit offset request, the coordinator checks whether the consumer 1) is within the specified group, 2) owns the partitions it commits offsets for, and 3) has the correct generation number. If so, it appends the offset entry to the corresponding log; otherwise it rejects the request.
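The three commit checks can be sketched as follows; `GroupRecord`, `handle_commit`, and the in-memory `offset_log` list (standing in for the offset log) are hypothetical.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Set, Tuple

Partition = Tuple[str, int]


@dataclass
class GroupRecord:
    generation: int
    members: Set[str]
    owners: Dict[Partition, str]  # partition -> owning consumer
    offset_log: List[Tuple[str, Partition, int]] = field(default_factory=list)


def handle_commit(group: GroupRecord, consumer: str, generation: int,
                  offsets: Dict[Partition, int]) -> bool:
    """Return True if the commit was appended, False if it was rejected."""
    if consumer not in group.members:                          # 1) in the group
        return False
    if any(group.owners.get(p) != consumer for p in offsets):  # 2) owns the partitions
        return False
    if generation != group.generation:                         # 3) current generation
        return False
    for p, offset in sorted(offsets.items()):
        group.offset_log.append((consumer, p, offset))         # append to the offset log
    return True
```

All checks run before anything is appended, so a rejected request leaves the log untouched.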

...

Upon receiving a reject response from the coordinator, the consumer tries to connect to the new consumer coordinator (i.e., sets up the connection, sends a heartbeat, and gets a valid response) and then retries.
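The consumer's reaction to a rejected commit can be sketched as a bounded retry loop. `commit_with_retry`, its callbacks, and the `max_attempts` bound are assumptions for illustration; the design does not say how many times to retry.

```python
from typing import Callable


def commit_with_retry(send_commit: Callable[[], bool],
                      reconnect: Callable[[], None],
                      max_attempts: int = 3) -> bool:
    """Retry a rejected offset commit after re-establishing the coordinator.

    reconnect() stands for: find the (new) coordinator, set up the connection,
    send a heartbeat and wait for a valid response. max_attempts is an
    illustrative bound, not something the design specifies.
    """
    for _ in range(max_attempts):
        if send_commit():
            return True
        reconnect()
    return False
```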

 

The pro of this design is a single-threaded consumer with a simplified consumer-coordinator communication pattern (the coordinator simply listens on each channel, receives requests, and sends responses).

The con is mainly that rebalance latency is determined by the longest heartbeat timeout among all consumers.

 

Consumer Architecture

API Implementation

...