...

One of the brokers is elected as the coordinator for all consumer groups. It will be responsible for triggering a rebalance of a consumer group when the group's membership changes or when the topic partitions the group is interested in change. It will also be responsible for communicating the resulting partition ownership to all consumers of the group under rebalance. More specifically, it will do the following:

Detecting newly started consumers

The coordinator will not try to discover new consumers; instead, it waits for them to connect. Once a new connection is established, the coordinator receives a registration request from the new consumer. If the consumer's config options are valid, the coordinator accepts the registration, records the consumer's meta information (such as its session timeout value and the number of streams per topic it is interested in), and sends back a connection-established acknowledgment.

...

                                                                    -- (reject) -->    Consumer (register failed due to some invalid meta info, try to reconnect to the coordinator and register again)
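The registration step can be sketched as a small handler that validates the request, records the consumer's meta information on success, and otherwise rejects it so the consumer can reconnect and retry. This is a minimal sketch under stated assumptions: `RegisterRequest`, `ConsumerMeta`, `RegistrationHandler`, the string replies, and the specific validity rules (a minimum session timeout, positive stream counts) are all hypothetical; the design only says the config options must be "valid".

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class RegisterRequest:
    # Hypothetical registration payload; field names are illustrative only.
    consumer_id: str
    group_id: str
    session_timeout_ms: int
    streams_per_topic: Dict[str, int]  # topic -> number of streams


@dataclass
class ConsumerMeta:
    # Meta information the coordinator keeps for each registered consumer.
    group_id: str
    session_timeout_ms: int
    streams_per_topic: Dict[str, int]


class RegistrationHandler:
    def __init__(self, min_session_timeout_ms: int = 1000) -> None:
        # Hypothetical lower bound used by the validity check below.
        self.min_session_timeout_ms = min_session_timeout_ms
        self.consumers: Dict[str, ConsumerMeta] = {}

    def validate(self, req: RegisterRequest) -> Optional[str]:
        """Return a reason string if the request is invalid, else None."""
        if req.session_timeout_ms < self.min_session_timeout_ms:
            return "session timeout too small"
        if not req.streams_per_topic:
            return "no topics requested"
        if any(n <= 0 for n in req.streams_per_topic.values()):
            return "stream count must be positive"
        return None

    def handle_register(self, req: RegisterRequest) -> str:
        reason = self.validate(req)
        if reason is not None:
            # The rejected consumer is expected to reconnect and register again.
            return "REJECT: " + reason
        # Accept: record the consumer's meta information, then acknowledge.
        self.consumers[req.consumer_id] = ConsumerMeta(
            req.group_id, req.session_timeout_ms, dict(req.streams_per_topic)
        )
        return "ACK"
```

The `"ACK"` and `"REJECT: ..."` strings stand in for whatever acknowledgment and rejection messages the wire protocol actually uses.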

Detecting consumer failure

For every registered consumer, the coordinator will check its liveness through periodic heartbeats. If auto-offset-commit is enabled, the heartbeat response should include the consumed offset for each partition. If the consumer fails to reply to heartbeats a configured number of times, the coordinator will mark the consumer as failed and trigger the rebalance procedure for the group this consumer belongs to.

...

                                (rebalance)         time out...         Consumer (failed)
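The failure-detection rule above amounts to a per-consumer counter of unanswered heartbeats. The sketch below assumes that a successful reply resets the counter (the text only says "a configured number of times", so treating misses as consecutive is an assumption), and `HeartbeatMonitor`, `max_missed`, and the `on_failure` callback are hypothetical names.

```python
from typing import Callable, Dict, Optional, Tuple

TopicPartition = Tuple[str, int]


class HeartbeatMonitor:
    """Tracks missed heartbeat replies per consumer (a sketch, not Kafka code)."""

    def __init__(self, max_missed: int, on_failure: Callable[[str, str], None]) -> None:
        self.max_missed = max_missed   # hypothetical config: misses before failure
        self.on_failure = on_failure   # called as on_failure(consumer_id, group_id)
        self.group_of: Dict[str, str] = {}
        self.missed: Dict[str, int] = {}
        self.offsets: Dict[str, Dict[TopicPartition, int]] = {}

    def register(self, consumer_id: str, group_id: str) -> None:
        self.group_of[consumer_id] = group_id
        self.missed[consumer_id] = 0

    def record(self, consumer_id: str, reply: Optional[Dict[TopicPartition, int]]) -> None:
        """Record one heartbeat round for a consumer.

        `reply` is None if the heartbeat timed out; otherwise it holds the
        consumed offset per partition (empty when auto-offset-commit is off).
        """
        if consumer_id not in self.group_of:
            return  # unknown or already-failed consumer
        if reply is None:
            self.missed[consumer_id] += 1
            if self.missed[consumer_id] >= self.max_missed:
                # Mark as failed and hand the group off for rebalancing.
                group_id = self.group_of.pop(consumer_id)
                del self.missed[consumer_id]
                self.on_failure(consumer_id, group_id)
        else:
            # Assumption: a successful reply resets the miss counter.
            self.missed[consumer_id] = 0
            self.offsets.setdefault(consumer_id, {}).update(reply)
```

In a coordinator, `on_failure` would be the hook that starts the rebalance procedure for the affected group.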

Detecting changes to topic partitions

The coordinator watches all topic partitions registered in the cluster that are being consumed by at least one consumer group. When a new partition is added (possibly due to a newly added broker) or an existing partition is deleted (possibly due to a broker failure), it will trigger the rebalance procedure for every group that is consuming that partition.
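Deciding which groups to rebalance after a partition change can be sketched as a set difference between the old and new partition sets, followed by a lookup of the groups interested in the affected topics. This assumes groups are modeled by the topics they subscribe to, so a newly added partition still reaches the groups consuming its topic; `groups_to_rebalance` and the `subscriptions` mapping are hypothetical.

```python
from typing import Dict, Set, Tuple

TopicPartition = Tuple[str, int]


def groups_to_rebalance(
    old: Set[TopicPartition],
    new: Set[TopicPartition],
    subscriptions: Dict[str, Set[str]],
) -> Set[str]:
    """Return the groups affected by added or deleted partitions.

    `subscriptions` maps group id -> set of subscribed topics (a hypothetical
    model of which groups consume which partitions).
    """
    # Symmetric difference = partitions that were either added or deleted.
    changed_topics = {topic for topic, _ in old ^ new}
    return {
        group
        for group, topics in subscriptions.items()
        if topics & changed_topics
    }
```

For example, adding partition `("a", 2)` affects every group subscribed to topic `a`, while groups that only consume other topics are left alone.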

Initiate rebalance operation and communicate new partition ownership to consumers

When the coordinator initiates a rebalance operation for a consumer group, it will first send a stop-fetcher request to each consumer in the group and wait for all of them to reply with the last consumed offset of each partition. By the time the coordinator receives a consumer's reply, that consumer should have stopped its fetchers. The coordinator then sends a start-fetcher request to each consumer in the group, carrying the newly computed partition assignment along with the starting offsets. The coordinator finishes the rebalance by waiting for all consumers to restart their fetchers and respond. If any reply is not received within the timeout, the coordinator will retry the rebalance operation. If the rebalance cannot finish within a maximum number of rebalance retries, the coordinator will log a FATAL error and stop rebalancing that consumer group.

...

(retry rebalance)  time out...        Consumer (failed)
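The two-phase rebalance (stop fetchers and collect offsets, then start fetchers with the new assignment) with bounded retries might look like the sketch below. Several pieces are assumptions not given in the text: `ConsumerChannel` is a hypothetical RPC interface that raises `TimeoutError` when a reply does not arrive in time, the round-robin `assign` function is a stand-in because the document does not name an assignment strategy, and partitions with no recorded offset start at a hypothetical default of 0.

```python
import logging
from typing import Dict, List, Protocol, Sequence, Tuple

TopicPartition = Tuple[str, int]
log = logging.getLogger("coordinator")


class ConsumerChannel(Protocol):
    # Hypothetical RPC interface; each call raises TimeoutError if no reply arrives.
    consumer_id: str

    def stop_fetchers(self) -> Dict[TopicPartition, int]: ...

    def start_fetchers(self, assignment: Dict[TopicPartition, int]) -> None: ...


def assign(partitions: List[TopicPartition], consumer_ids: List[str]) -> Dict[str, List[TopicPartition]]:
    """Round-robin assignment; a stand-in, since the strategy is not specified."""
    if not consumer_ids:
        return {}
    owners: Dict[str, List[TopicPartition]] = {c: [] for c in consumer_ids}
    for i, tp in enumerate(sorted(partitions)):
        owners[consumer_ids[i % len(consumer_ids)]].append(tp)
    return owners


def rebalance(
    group_id: str,
    consumers: Sequence[ConsumerChannel],
    partitions: List[TopicPartition],
    offsets: Dict[TopicPartition, int],
    max_retries: int,
) -> bool:
    for attempt in range(1, max_retries + 1):
        try:
            # Phase 1: every consumer stops its fetchers and reports last offsets.
            for c in consumers:
                offsets.update(c.stop_fetchers())
            # Phase 2: send each consumer its new partitions and starting offsets.
            owners = assign(partitions, sorted(c.consumer_id for c in consumers))
            for c in consumers:
                c.start_fetchers({tp: offsets.get(tp, 0) for tp in owners[c.consumer_id]})
            return True
        except TimeoutError:
            log.warning("rebalance attempt %d for group %s timed out", attempt, group_id)
    log.critical("FATAL: giving up rebalancing group %s after %d attempts", group_id, max_retries)
    return False
```

Collecting all offsets before any start-fetcher request goes out is what keeps two consumers from fetching the same partition at once during the handover.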

When the coordinator dies, the other brokers will notice this and elect a new coordinator. The new coordinator will not try to connect to the consumers; instead, it waits for the consumers to realize that the previous coordinator is dead and connect to the new one.

Upon startup, the consumer will query the brokers in the cluster for the current coordinator; the broker list is specified in the consumer properties file. The consumer then establishes a socket channel to the coordinator and registers itself through it. Once accepted, it keeps reading new requests from the coordinator and responding to them, but never proactively sends requests to the coordinator. If the consumer does not receive any request within a configurable amount of time, it treats the connection as lost, stops its fetchers, and tries to reconnect to the (possibly new) coordinator by restarting the discovery process. Its workflow logic can be found here. The main motivation for not having the consumer send heartbeats to the coordinator is to keep the consumer design simple: implementing blocking socket channels is significantly simpler than implementing a full-fledged socket server using non-blocking socket channels, which in turn makes it easier to rewrite the consumer in other languages.
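The consumer side described above fits in a few blocking calls: ask the configured brokers for the coordinator, connect, register, then answer requests until the channel goes quiet, at which point fetchers are stopped and discovery starts over. This is a minimal sketch: `query` (the coordinator lookup), `register`, `handle`, and `stop_fetchers` are hypothetical callables, message framing is ignored (one `recv()` is treated as one request), and `max_attempts` only bounds the sketch, whereas the described consumer keeps retrying.

```python
import socket
from typing import Callable, List, Optional, Tuple

Address = Tuple[str, int]


def find_coordinator(
    brokers: List[Address], query: Callable[[Address], Optional[Address]]
) -> Optional[Address]:
    """Ask each configured broker in turn for the current coordinator."""
    for broker in brokers:
        try:
            coordinator = query(broker)  # hypothetical lookup RPC
        except OSError:
            continue  # broker unreachable: try the next one
        if coordinator is not None:
            return coordinator
    return None


def serve_coordinator(
    conn: socket.socket, idle_timeout_s: float, handle: Callable[[bytes], bytes]
) -> None:
    """Blocking loop: read a request, send its response, repeat.

    The consumer never sends anything unsolicited. The loop returns when no
    request arrives within idle_timeout_s or the coordinator closes the socket.
    Simplification: one recv() is treated as one complete request (no framing).
    """
    conn.settimeout(idle_timeout_s)
    while True:
        try:
            request = conn.recv(4096)
        except socket.timeout:
            return  # silence for too long: treat the connection as lost
        if not request:
            return  # coordinator closed the connection
        conn.sendall(handle(request))


def run_consumer(
    brokers: List[Address],
    query: Callable[[Address], Optional[Address]],
    register: Callable[[socket.socket], None],
    handle: Callable[[bytes], bytes],
    stop_fetchers: Callable[[], None],
    idle_timeout_s: float,
    max_attempts: int,
) -> None:
    """Discover, connect, register, serve; on loss, stop fetchers and start over."""
    for _ in range(max_attempts):
        coordinator = find_coordinator(brokers, query)
        if coordinator is None:
            continue
        try:
            with socket.create_connection(coordinator) as conn:
                register(conn)  # hypothetical: send registration, await the ack
                serve_coordinator(conn, idle_timeout_s, handle)
        except OSError:
            pass  # connect failed or the connection broke mid-read
        stop_fetchers()
```

Because the consumer only ever reads and then replies, a plain blocking socket with a read timeout is enough; no selector or non-blocking server loop is needed, which is the simplicity argument made above.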

...