...

One of the brokers is elected as the coordinator for all the consumer groups. It will be responsible for:

...

triggering rebalance attempts for consumer groups in response to changes in group membership or in the topic partitions they are interested in, and communicating the rebalance results to consumers

...

of the group. More specifically, it will try to do the following:

  1. Detecting newly started consumers. The coordinator will not try to discover new consumers; it waits for consumers to connect to it. When the coordinator receives a new connection, it reads the registration request from the new consumer. If it accepts the registration, it records the consumer's meta information, such as the session timeout value and the interested topic-counts, and sends back a confirmation reply.

                   <-- (register) --   Consumer (startup)

Coordinator        <-- (register) --   Consumer (startup)

                   <-- (register) --   Consumer (startup)

----------------------------------------------------------------------------------------------

                   (record metadata)   -- (confirm) -->   Consumer (registration succeeded; blocks reading coordinator requests such as ping, stop-fetcher, start-fetcher)

Coordinator        (record metadata)   -- (confirm) -->   Consumer (same as above)

                                       -- (reject) -->    Consumer (registration failed due to invalid meta information; tries to reconnect to the coordinator and register again)
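
The registration exchange above could be sketched roughly as follows. This is only an illustration: the `RegisterRequest` fields, the `Coordinator` class, and the validation rules are assumptions made for the example, since the design only says that invalid meta information leads to a rejection.

```python
from dataclasses import dataclass, field


@dataclass
class RegisterRequest:
    # Hypothetical request shape; the field names are illustrative only.
    consumer_id: str
    group: str
    session_timeout_ms: int
    topic_counts: dict  # topic name -> number of streams the consumer wants


@dataclass
class Coordinator:
    # consumer_id -> metadata of accepted registrations, kept in memory
    registry: dict = field(default_factory=dict)

    def handle_register(self, req: RegisterRequest) -> str:
        """Record metadata and return "confirm", or return "reject"."""
        # Assumed validity rules: positive timeout, at least one topic,
        # and a positive count for every topic.
        valid = (
            req.session_timeout_ms > 0
            and len(req.topic_counts) > 0
            and all(n > 0 for n in req.topic_counts.values())
        )
        if not valid:
            # The consumer is expected to reconnect and register again.
            return "reject"
        self.registry[req.consumer_id] = req
        return "confirm"
```

The coordinator never initiates the exchange here; it only reacts to a request that a consumer has already sent, which matches the passive discovery described above.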

  2. Detecting consumer failure. For every registered consumer, the coordinator will check its liveness by pinging it periodically. The ping response should include the consumed offset of each partition if auto-offset-commit is enabled. If the consumer fails to reply to the ping request a configurable number of times, the coordinator will mark the consumer as failed and trigger the rebalance procedure for the group this consumer belongs to.

Coordinator        -- (ping) -->   Consumer (alive)

                   -- (ping) -->   Consumer (failed)

----------------------------------------------------------------------------------------------

Coordinator        (record offset)   <-- (reply) --   Consumer (alive)

                   (rebalance)       time out...      Consumer (failed)
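
A minimal sketch of the ping bookkeeping, under stated assumptions: `LivenessTracker` and its method names are hypothetical, and the example counts consecutive missed pings, which is one possible reading of "a configurable number of times".

```python
class LivenessTracker:
    """Tracks ping replies per consumer (illustrative, not the actual design)."""

    def __init__(self, max_missed_pings: int):
        self.max_missed_pings = max_missed_pings
        self.missed = {}    # consumer_id -> consecutive missed pings
        self.offsets = {}   # consumer_id -> {partition: consumed offset}
        self.failed = set()

    def on_ping_result(self, consumer_id, reply):
        """Process one ping outcome.

        `reply` is None when the ping timed out; otherwise it is a dict of
        partition -> offset (empty if auto-offset-commit is disabled).
        Returns True when a rebalance should be triggered for the group.
        """
        if consumer_id in self.failed:
            return False
        if reply is not None:
            self.missed[consumer_id] = 0
            self.offsets.setdefault(consumer_id, {}).update(reply)
            return False
        self.missed[consumer_id] = self.missed.get(consumer_id, 0) + 1
        if self.missed[consumer_id] >= self.max_missed_pings:
            self.failed.add(consumer_id)
            return True
        return False
```

Resetting the counter on every successful reply means that an occasional slow response does not accumulate toward a failure verdict.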

  3. Detecting changes to topic partitions. The coordinator keeps a watch on every registered partition stored in the brokers. When a new partition is added (possibly due to a newly added broker) or an old partition is deleted (possibly due to a broker failure), it will trigger the rebalance procedure for every group that is consuming this partition.
  4. Processing rebalance tasks and communicating the rebalance results to consumers. When the coordinator decides that a rebalance is needed for a certain group, it will first send the stop-fetcher request to each consumer in the group and wait for all of them to reply with the last consumed offset of each partition. By the time the coordinator receives a reply, the replying consumer should have stopped its fetchers. The coordinator will then send a start-fetcher request to each consumer in the group with the newly computed partition assignment along with the starting offsets. The coordinator will finish the rebalance by waiting for all the consumers to finish restarting their fetchers and respond. If any reply is not received within the timeout value, the coordinator will retry the rebalance procedure by marking that consumer as failed and sending the stop-fetcher request to everybody else. If the rebalance cannot finish within a number of rebalance retries, the coordinator will log that as an exception.

Coordinator        -- (stop-fetcher) -->   Consumer (alive)

                   -- (stop-fetcher) -->   Consumer (alive)

----------------------------------------------------------------------------------------------

Coordinator        (record offset)   <-- (reply) --   Consumer (alive)

                   (record offset)   <-- (reply) --   Consumer (alive)

----------------------------------------------------------------------------------------------

Coordinator        (compute new assignment)   -- (start-fetcher) -->   Consumer (alive)

                                              -- (start-fetcher) -->   Consumer (become failed)

----------------------------------------------------------------------------------------------

Coordinator        <-- (reply) --   Consumer (alive)

(retry rebalance)  time out...      Consumer (failed)
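
The stop-fetcher / start-fetcher rounds with retries might look like the sketch below. Several parts are assumptions made for illustration: the round-robin assignment (the design does not name a strategy), the `send_stop` and `send_start` callbacks that stand in for the socket exchange, the default starting offset of 0 for partitions with no recorded offset, and raising an exception where the design says the coordinator logs one.

```python
def assign_round_robin(consumers, partitions):
    """Hypothetical strategy: deal sorted partitions to sorted consumers."""
    ordered = sorted(consumers)
    assignment = {c: [] for c in ordered}
    for i, p in enumerate(sorted(partitions)):
        assignment[ordered[i % len(ordered)]].append(p)
    return assignment


def rebalance(consumers, partitions, send_stop, send_start, max_retries=3):
    """Run the rebalance procedure and return the final assignment.

    send_stop(consumer) returns {partition: offset}, or None on timeout.
    send_start(consumer, {partition: offset}) returns True, or False on timeout.
    """
    alive = set(consumers)
    offsets = {}
    for _ in range(max_retries):
        if not alive:
            break
        timed_out = set()
        # Phase 1: stop all fetchers and collect the last consumed offsets.
        for c in sorted(alive):
            reply = send_stop(c)
            if reply is None:
                timed_out.add(c)
            else:
                offsets.update(reply)
        if timed_out:
            alive -= timed_out  # mark as failed, retry with everybody else
            continue
        # Phase 2: hand each consumer only its own partitions and offsets.
        assignment = assign_round_robin(alive, partitions)
        for c in sorted(alive):
            start = {p: offsets.get(p, 0) for p in assignment[c]}
            if not send_start(c, start):
                timed_out.add(c)
        if timed_out:
            alive -= timed_out
            continue
        return assignment
    raise RuntimeError("rebalance did not finish within max_retries")
```

Each retry starts again from the stop-fetcher phase, so consumers that already restarted their fetchers are stopped again before a new assignment is computed.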

When the coordinator fails, the other peer brokers will notice and try to elect a new coordinator among themselves. The new coordinator will not try to connect to the consumers; it waits for the consumers to realize that the previous coordinator is dead and to reconnect to it.

Upon receiving a new socket channel from a new consumer, the coordinator will verify its registration request and respond whether the registration is accepted or not. If the new consumer's registration is accepted, it is kept in memory, and the coordinator will keep pinging the consumer through the socket channel to track whether it is still alive.

When the coordinator decides that a rebalance is needed for a certain group, it will first send the restart-fetcher command to each consumer in the group with the newly computed assignment. Each consumer will receive the partition-info of only the partitions that are assigned to it. The coordinator will finish the rebalance by waiting for all the consumers to finish restarting their fetchers and respond.

The consumer, upon startup, will consult known brokers for the current coordinator. The known broker list is specified in the consumer properties file. The consumer will then try to create a socket channel to the coordinator and register itself through that channel. Once accepted, it will keep reading new requests from the coordinator and responding to them, but will never proactively send requests to the coordinator. When the consumer does not receive any request within a configurable amount of time, it will treat the connection as lost, stop its fetchers, and try to reconnect to the possibly new coordinator by restarting the consulting process. Its workflow logic can be found here.
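
The consumer side of this workflow can be sketched as two small functions. Everything named here is hypothetical: `ask_broker` stands in for the coordinator lookup against a known broker, and `channel.recv(timeout)` / `channel.send(reply)` stand in for the socket channel, with `recv` assumed to return None when no request arrives in time.

```python
def find_coordinator(known_brokers, ask_broker):
    """Ask each known broker in turn; return the first coordinator reported."""
    for broker in known_brokers:
        coordinator = ask_broker(broker)
        if coordinator is not None:
            return coordinator
    return None


def run_session(channel, handle_request, stop_fetchers, idle_timeout_s):
    """Serve coordinator requests until the channel goes quiet.

    The consumer only reads and replies; it never sends a request first.
    Returns the number of requests handled before the connection was
    treated as lost.
    """
    handled = 0
    while True:
        request = channel.recv(idle_timeout_s)
        if request is None:
            # No request within the configured time: assume the coordinator
            # is gone, stop fetching, and let the caller look it up again.
            stop_fetchers()
            return handled
        channel.send(handle_request(request))
        handled += 1
```

A caller would loop over `find_coordinator`, registration, and `run_session`, so that a coordinator failover is handled by the same path as the initial startup.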

3. Configuration

Config Options to Brokers/Consumers

...