...

One of the brokers is elected as the coordinator for all the consumer groups. It is responsible for triggering a rebalance for a consumer group when the group's membership changes or when the partitions of a topic the group is interested in change, and for communicating the rebalance results to the consumers of the group. More specifically, it does the following:

Detecting newly started consumers. The coordinator does not try to discover new consumers; instead, it waits for them to connect to it. After accepting a new connection, the coordinator receives the registration request from the new consumer. If it accepts the registration, it records the consumer's metadata, such as the session timeout value and the interested topic counts, and sends back a confirmation reply; otherwise it rejects the registration.

                                    <-- (register) -- Consumer (startup)

...

                             (record metadata)       -- (confirm) --> Consumer (registration succeeded; blocks waiting for coordinator requests such as ping, stop-fetcher and start-fetcher)

Coordinator       (record metadata)       -- (confirm) --> Consumer (same as above)

                                                                    -- (reject) -->    Consumer (registration failed due to invalid metadata; reconnects to the coordinator and registers again)
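The registration exchange above can be sketched as a coordinator-side handler. This is a minimal sketch, not the design's actual implementation: the class names, the field names, and the validation rules in `_validate` are hypothetical, and `topic_counts` is assumed to map a topic name to the number of streams the consumer wants for it.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class ConsumerMetadata:
    # Hypothetical fields mirroring the metadata named in the text.
    consumer_id: str
    group_id: str
    session_timeout_ms: int
    topic_counts: Dict[str, int]  # assumed: topic -> number of streams


class Coordinator:
    def __init__(self) -> None:
        # group_id -> consumer_id -> registered metadata
        self.groups: Dict[str, Dict[str, ConsumerMetadata]] = {}

    def handle_register(self, meta: ConsumerMetadata) -> str:
        """Record a valid registration and reply "confirm"; otherwise reply "reject"."""
        error = self._validate(meta)
        if error is not None:
            # The consumer is expected to reconnect and register again.
            return "reject: " + error
        self.groups.setdefault(meta.group_id, {})[meta.consumer_id] = meta
        return "confirm"

    @staticmethod
    def _validate(meta: ConsumerMetadata) -> Optional[str]:
        # Hypothetical validity rules; the text only says "invalid meta info".
        if meta.session_timeout_ms <= 0:
            return "session timeout must be positive"
        if not meta.topic_counts or any(n <= 0 for n in meta.topic_counts.values()):
            return "topic counts must be non-empty and positive"
        return None


if __name__ == "__main__":
    coord = Coordinator()
    print(coord.handle_register(ConsumerMetadata("c1", "g1", 6000, {"clicks": 2})))
    # prints "confirm"
    print(coord.handle_register(ConsumerMetadata("c2", "g1", 0, {"clicks": 1})))
    # prints "reject: session timeout must be positive"
```

Keeping the metadata grouped by consumer group makes it cheap to find every member of a group later, when a failure or partition change triggers that group's rebalance.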

Detecting consumer failure. The coordinator checks the liveness of every registered consumer by pinging it periodically. If auto-offset-commit is enabled, the ping response should include the consumed offset of each partition. If a consumer fails to reply to the ping request a configurable number of times, the coordinator marks the consumer as failed and triggers the rebalance procedure for the group the consumer belongs to.

Coordinator               -- (ping) -->     Consumer (alive)

                                     -- (ping) -->     Consumer (failed)

----------------------------------------------------------------------------------------------

...

                                (rebalance)         time out...         Consumer (failed)
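The ping-based failure detection can be sketched as a small state machine per consumer. This is a minimal sketch under assumptions: the text says "a configurable number of times", and this sketch interprets that as consecutive missed pings; `max_missed_pings`, `trigger_rebalance`, and the other names are hypothetical, and the actual sending of pings and timing out is left to the caller.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Optional, Set


@dataclass
class PingResponse:
    # Filled in only when auto-offset-commit is enabled on the consumer.
    offsets: Optional[Dict[str, int]] = None  # partition name -> consumed offset


class FailureDetector:
    def __init__(self, max_missed_pings: int,
                 trigger_rebalance: Callable[[str], None]) -> None:
        self.max_missed_pings = max_missed_pings  # hypothetical config value
        self.trigger_rebalance = trigger_rebalance  # called with the group id
        self.group_of: Dict[str, str] = {}  # consumer id -> group id
        self.missed: Dict[str, int] = {}  # consumer id -> consecutive missed pings
        self.failed: Set[str] = set()
        self.committed: Dict[str, Dict[str, int]] = {}  # group -> partition -> offset

    def add_consumer(self, consumer_id: str, group_id: str) -> None:
        self.group_of[consumer_id] = group_id
        self.missed[consumer_id] = 0

    def on_ping_result(self, consumer_id: str,
                       response: Optional[PingResponse]) -> None:
        """Handle one ping round; response is None if no reply arrived in time."""
        if consumer_id in self.failed:
            return
        group = self.group_of[consumer_id]
        if response is not None:
            self.missed[consumer_id] = 0
            if response.offsets:
                self.committed.setdefault(group, {}).update(response.offsets)
            return
        self.missed[consumer_id] += 1
        if self.missed[consumer_id] >= self.max_missed_pings:
            # Mark the consumer failed and rebalance its group exactly once.
            self.failed.add(consumer_id)
            self.trigger_rebalance(group)


if __name__ == "__main__":
    rebalanced = []
    detector = FailureDetector(max_missed_pings=3, trigger_rebalance=rebalanced.append)
    detector.add_consumer("c1", "g1")
    detector.on_ping_result("c1", PingResponse({"clicks-0": 42}))
    for _ in range(3):
        detector.on_ping_result("c1", None)  # three pings time out
    print(rebalanced)  # prints ['g1']
```

Because ping responses carry the consumed offsets when auto-commit is on, the coordinator already holds recent offsets for a consumer that later fails, which the rebalance can use as starting offsets.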

Detecting changes to topic partitions. The coordinator keeps a watch on every registered partition stored on the brokers. When a new partition is added (possibly due to a newly added broker) or an old partition is deleted (possibly due to a broker failure), it triggers the rebalance procedure for every group that is consuming the partition.
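Deciding which groups to rebalance after a partition change can be sketched as a set difference. This sketch assumes that groups subscribe at topic granularity, so a group "consumes" a partition exactly when it subscribes to that partition's topic; the function name and the `subscriptions` map are hypothetical.

```python
from typing import Dict, Set, Tuple

Partition = Tuple[str, int]  # (topic, partition id)


def groups_to_rebalance(old: Set[Partition], new: Set[Partition],
                        subscriptions: Dict[str, Set[str]]) -> Set[str]:
    """Return the groups subscribed to any topic whose partition set changed.

    subscriptions maps group id -> set of subscribed topic names.
    """
    changed = old ^ new  # partitions that were added or deleted
    changed_topics = {topic for topic, _ in changed}
    return {group for group, topics in subscriptions.items()
            if topics & changed_topics}


if __name__ == "__main__":
    old = {("clicks", 0), ("clicks", 1), ("views", 0)}
    new = {("clicks", 0), ("views", 0), ("views", 1)}  # clicks-1 deleted, views-1 added
    subs = {"g1": {"clicks"}, "g2": {"views"}, "g3": {"orders"}}
    print(sorted(groups_to_rebalance(old, new, subs)))  # prints ['g1', 'g2']
```

Using the symmetric difference handles additions and deletions with one expression, and groups whose topics are untouched (`g3` above) are left alone.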

Processing rebalance tasks and communicating the rebalance results to consumers. When the coordinator decides that a rebalance is needed for a group, it first sends a stop-fetcher request to each consumer in the group and waits for all of them to reply with their last consumed partition offsets. By the time the coordinator receives a consumer's reply, that consumer should already have stopped its fetchers. The coordinator then sends a start-fetcher request to each consumer in the group with the newly computed partition assignment along with the starting offsets. The coordinator finishes the rebalance by waiting for all the consumers to restart their fetchers and respond. If any reply is not received within the timeout, the coordinator retries the rebalance procedure by marking that consumer as failed and sending the stop-fetcher request to all the remaining consumers. If the rebalance cannot finish within a maximum number of rebalance retries, the coordinator logs this as an exception.

Coordinator               -- (stop-fetcher) -->     Consumer (alive)

                                     -- (stop-fetcher) -->     Consumer (alive)

----------------------------------------------------------------------------------------------

...

                                                                                       -- (start-fetcher) -->     Consumer (fails)

...
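The two-phase rebalance with retries can be sketched as a single coordinator-side loop. This is a minimal sketch under stated assumptions: the transport is abstracted into hypothetical `send_stop` and `send_start` callables that signal a timeout by returning `None` or `False`; the round-robin `assign` function is a stand-in because the text does not specify an assignment strategy; and a starting offset of 0 is assumed for partitions with no known offset.

```python
import logging
from typing import Callable, Dict, List, Optional, Set

log = logging.getLogger("coordinator")

# Hypothetical transport hooks; each signals a timeout with None or False.
StopFn = Callable[[str], Optional[Dict[str, int]]]  # -> last consumed offsets
StartFn = Callable[[str, List[str], Dict[str, int]], bool]  # -> True once restarted


def assign(consumers: List[str], partitions: List[str]) -> Dict[str, List[str]]:
    """Hypothetical round-robin assignment; the text does not specify a strategy."""
    if not consumers:
        return {}
    plan: Dict[str, List[str]] = {c: [] for c in consumers}
    for i, partition in enumerate(sorted(partitions)):
        plan[consumers[i % len(consumers)]].append(partition)
    return plan


def rebalance(members: Set[str], partitions: List[str], offsets: Dict[str, int],
              send_stop: StopFn, send_start: StartFn,
              max_retries: int) -> Optional[Dict[str, List[str]]]:
    """Run stop-fetcher / start-fetcher rounds until every live consumer replies."""
    alive = set(members)
    known = dict(offsets)  # partition -> last consumed offset
    for attempt in range(1, max_retries + 1):
        failed: Set[str] = set()
        # Phase 1: stop every fetcher and collect the last consumed offsets.
        for consumer in sorted(alive):
            reply = send_stop(consumer)
            if reply is None:
                failed.add(consumer)
            else:
                known.update(reply)
        if not failed:
            # Phase 2: send the new assignment with starting offsets (0 if unknown).
            plan = assign(sorted(alive), partitions)
            for consumer, owned in plan.items():
                start = {p: known.get(p, 0) for p in owned}
                if not send_start(consumer, owned, start):
                    failed.add(consumer)
            if not failed:
                return plan
        # A missing reply marks the consumer as failed; retry with the rest.
        log.warning("rebalance attempt %d failed; marking %s as failed",
                    attempt, sorted(failed))
        alive -= failed
    log.error("rebalance did not finish within %d attempts", max_retries)
    return None


if __name__ == "__main__":
    def stop(consumer: str) -> Optional[Dict[str, int]]:
        return {}  # every consumer stops and reports no new offsets

    def start(consumer: str, owned: List[str], start_offsets: Dict[str, int]) -> bool:
        return consumer != "c3"  # c3 fails while restarting its fetchers

    print(rebalance({"c1", "c2", "c3"}, ["clicks-0", "clicks-1", "clicks-2"],
                    {"clicks-0": 10}, stop, start, max_retries=3))
    # prints {'c1': ['clicks-0', 'clicks-2'], 'c2': ['clicks-1']}
```

The usage mirrors the diagram: `c3` fails during start-fetcher, so the first attempt is abandoned, `c3` is marked failed, and the second attempt redistributes its partition between the remaining consumers.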