...

Rebalancing is the process by which a set of consumer instances belonging to the same group co-ordinate to own mutually exclusive sets of partitions of the topics that the group is subscribed to. At the end of a successful rebalance operation for a consumer group, every partition of every subscribed topic is owned by exactly one consumer instance within the group. Rebalancing works as follows. Each broker is elected as the co-ordinator for a subset of the consumer groups. The co-ordinator broker for a group is responsible for co-ordinating a rebalance operation whenever the group's membership changes or the partitions of its subscribed topics change. It is also responsible for communicating the resulting partition ownership configuration to all consumers of the group undergoing the rebalance.
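To make the end state of a rebalance concrete, here is a minimal Python sketch. It assumes two things the text does not specify: that a group's co-ordinator is picked by hashing the group id over the broker list, and that partitions are handed out round-robin. Both `coordinator_for` and `assign` are hypothetical helpers, not Kafka APIs; the point is only the invariant that every subscribed partition ends up with exactly one owner in the group.

```python
import zlib
from typing import Dict, List, Tuple

Partition = Tuple[str, int]  # (topic, partition id)


def coordinator_for(group_id: str, brokers: List[int]) -> int:
    """Pick the co-ordinator broker for a group (hypothetical scheme).

    A stable CRC32 hash of the group id spreads groups across brokers, so
    each broker co-ordinates a subset of the groups.
    """
    return brokers[zlib.crc32(group_id.encode("utf-8")) % len(brokers)]


def assign(consumers: List[str], partitions: List[Partition]) -> Dict[str, List[Partition]]:
    """Round-robin assignment: each partition gets exactly one owner."""
    owners: Dict[str, List[Partition]] = {c: [] for c in sorted(consumers)}
    if not owners:
        return owners  # no live consumers, nothing to assign to
    members = list(owners)  # sorted, so every run yields the same result
    for i, partition in enumerate(sorted(partitions)):
        owners[members[i % len(members)]].append(partition)
    return owners
```

For example, `assign(["c2", "c1"], [("orders", 0), ("orders", 1), ("orders", 2), ("clicks", 0)])` gives `c1` the partitions `("clicks", 0)` and `("orders", 1)`, and `c2` the partitions `("orders", 0)` and `("orders", 2)`; no partition is shared and none is left unowned.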

...

  1. On startup or on co-ordinator failover, the consumer sends a ConsumerMetadataRequest to any of the brokers in the bootstrap.brokers list. The ConsumerMetadataResponse gives it the location of the co-ordinator for its group and the co-ordinator's epoch.
  2. The consumer connects to the co-ordinator and sends a HeartbeatRequest. An IllegalGeneration error code in the HeartbeatResponse indicates that the co-ordinator has initiated a rebalance. The consumer then stops fetching data, commits offsets and sends a JoinGroupRequest to its co-ordinator broker. The JoinGroupResponse contains the list of topic partitions that the consumer should own and the new generation id for its group. At this point group management is complete, and the consumer starts fetching data and (optionally) committing offsets for the partitions it owns.
  3. If the HeartbeatResponse carries no error, the consumer continues fetching data for the partitions it last owned, without interruption.
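The consumer's side of steps 2 and 3 can be sketched as a single heartbeat round. This is a minimal Python sketch under stated assumptions: the numeric error codes, the `FakeCoordinator` stand-in and the `log` list (which replaces real fetch and OffsetCommitRequest traffic) are all hypothetical, and co-ordinator discovery (step 1) is omitted.

```python
from dataclasses import dataclass, field
from typing import List, Tuple

# Assumed error codes, for illustration only; the real protocol defines its own values.
NONE, ILLEGAL_GENERATION = 0, 1

Partition = Tuple[str, int]


@dataclass
class JoinGroupResponse:
    partitions: List[Partition]
    generation_id: int


class FakeCoordinator:
    """Hypothetical stand-in that validates the caller's generation id."""

    def __init__(self) -> None:
        self.generation_id = 1
        self.assignment: List[Partition] = [("orders", 0)]

    def heartbeat(self, generation_id: int) -> int:
        return NONE if generation_id == self.generation_id else ILLEGAL_GENERATION

    def join_group(self) -> JoinGroupResponse:
        return JoinGroupResponse(list(self.assignment), self.generation_id)


@dataclass
class Consumer:
    coordinator: FakeCoordinator
    owned: List[Partition] = field(default_factory=list)
    generation_id: int = -1  # no generation until the first JoinGroupResponse
    fetching: bool = False
    log: List[str] = field(default_factory=list)

    def heartbeat_round(self) -> None:
        if self.coordinator.heartbeat(self.generation_id) == ILLEGAL_GENERATION:
            # Step 2: the co-ordinator has started a rebalance.
            self.fetching = False
            if self.owned:
                self.log.append("commit offsets")  # stand-in for an OffsetCommitRequest
            resp = self.coordinator.join_group()
            self.owned, self.generation_id = resp.partitions, resp.generation_id
        # Step 3 (or the end of step 2): fetch from the partitions currently owned.
        self.fetching = True
        self.log.append(f"fetch {self.owned}")
```

Because the consumer starts with generation `-1`, its first heartbeat is rejected and it joins the group; later rounds keep fetching until the co-ordinator moves to a new generation.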

...

  1. In steady state, the co-ordinator tracks the health of each consumer in every group through its failure detection protocol.
  2. Upon election or startup, the co-ordinator reads the list of groups it manages and their membership information from ZooKeeper. If there is no previous group membership information, it does nothing until the first consumer in some group registers with it.
  3. Until the co-ordinator has finished loading the group membership information for all groups it is responsible for, it returns the CoordinatorStartupNotComplete error code in its responses to HeartbeatRequests, OffsetCommitRequests and JoinGroupRequests. The consumer then retries the request after some backoff.
  4. Upon election or startup, the co-ordinator also starts failure detection for all consumers in a group. Consumers that the co-ordinator's failure detection protocol marks as dead are removed from the group, and the co-ordinator triggers a rebalance operation for that consumer's group.
  5. A rebalance is triggered by closing the socket connection to the consumers in the group and returning the IllegalGeneration error code in the HeartbeatResponse. If the rebalance is caused by a new consumer joining, the co-ordinator closes the socket connection only to the other consumers in the group. Consumers notice the broken socket connection and trigger a co-ordinator discovery process (#1, #2 in the Consumer section above). Once all live consumers have re-registered with the co-ordinator via JoinGroupRequests, the co-ordinator communicates the new partition ownership to each of them in the JoinGroupResponse, completing the rebalance operation.
  6. The co-ordinator tracks partition changes for all topics that any consumer group has registered interest in. If it detects a new partition for any topic, it triggers a rebalance operation (as described in #5 above). It is currently not possible to reduce the number of partitions of a topic. The creation of new topics can also trigger a rebalance operation, because consumers can register for topics before they are created.
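The co-ordinator rules above can be modelled in memory in a similar way. This Python sketch makes simplifying assumptions: a rebalance is represented only by incrementing the group's generation id, so that stale heartbeats receive IllegalGeneration (socket handling and computing the partition assignment are not modelled); loading from ZooKeeper is reduced to a `finish_loading` call; and all class and method names are hypothetical.

```python
from typing import Dict, List, Set, Tuple

NONE = "NONE"
ILLEGAL_GENERATION = "IllegalGeneration"
STARTUP_NOT_COMPLETE = "CoordinatorStartupNotComplete"


class Group:
    def __init__(self) -> None:
        self.generation_id = 0
        self.members: Set[str] = set()
        self.topics: Set[str] = set()


class Coordinator:
    def __init__(self) -> None:
        self.loaded = False  # becomes True once group metadata has been read
        self.groups: Dict[str, Group] = {}

    def finish_loading(self, groups: Dict[str, Group]) -> None:
        self.groups, self.loaded = groups, True

    def rebalance(self, group: Group) -> None:
        # A new generation makes every older heartbeat fail with
        # IllegalGeneration, which sends consumers back to JoinGroupRequest.
        group.generation_id += 1

    def join_group(self, group_id: str, consumer_id: str, topics: List[str]) -> Tuple[str, int]:
        if not self.loaded:
            return STARTUP_NOT_COMPLETE, -1  # consumer retries after a backoff
        group = self.groups.setdefault(group_id, Group())
        group.topics.update(topics)
        if consumer_id not in group.members:
            group.members.add(consumer_id)
            self.rebalance(group)  # membership changed
        return NONE, group.generation_id

    def heartbeat(self, group_id: str, consumer_id: str, generation_id: int) -> str:
        if not self.loaded:
            return STARTUP_NOT_COMPLETE
        group = self.groups.get(group_id)
        if group is None or consumer_id not in group.members:
            return ILLEGAL_GENERATION
        return NONE if generation_id == group.generation_id else ILLEGAL_GENERATION

    def mark_dead(self, group_id: str, consumer_id: str) -> None:
        # Called by failure detection: remove the consumer, then rebalance.
        group = self.groups[group_id]
        group.members.discard(consumer_id)
        self.rebalance(group)

    def on_partitions_added(self, topic: str) -> None:
        # Only groups subscribed to the changed topic are rebalanced.
        for group in self.groups.values():
            if topic in group.topics:
                self.rebalance(group)
```

Using the generation id as the signal keeps the check in `heartbeat` to a single comparison: any consumer that has not rejoined since the last membership or partition change is told to rejoin.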

...

The consumer specifies a session timeout in the JoinGroupRequest that it sends to the co-ordinator to join a consumer group. Once the consumer has successfully joined a group, the failure detection process starts on both the consumer and the co-ordinator. The consumer sends periodic heartbeats (HeartbeatRequest) to the co-ordinator every session.timeout.ms/heartbeat.frequency milliseconds and waits for a response. If the co-ordinator does not receive a HeartbeatRequest from a consumer for session.timeout.ms, it marks the consumer dead. Similarly, if the consumer does not receive a HeartbeatResponse within session.timeout.ms, it assumes the co-ordinator is dead and starts the co-ordinator rediscovery process. Here is the protocol in more detail:

...
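The session-timeout rules described earlier reduce to a few timestamp comparisons. In this hypothetical Python sketch, times are passed in explicitly as milliseconds so the logic is deterministic; `SessionTracker`, `heartbeat_interval_ms` and `coordinator_presumed_dead` are illustrative names, and expired consumers are only reported, not removed (removal and the resulting rebalance follow the co-ordinator rules earlier in this section).

```python
from typing import Dict, List


def heartbeat_interval_ms(session_timeout_ms: int, heartbeat_frequency: int) -> float:
    # The consumer sends heartbeat.frequency heartbeats per session timeout.
    return session_timeout_ms / heartbeat_frequency


class SessionTracker:
    """Co-ordinator view: a consumer is dead once no HeartbeatRequest has
    arrived for its session timeout."""

    def __init__(self) -> None:
        self.timeouts: Dict[str, int] = {}
        self.last_seen: Dict[str, int] = {}

    def join(self, consumer_id: str, session_timeout_ms: int, now_ms: int) -> None:
        # The session timeout comes from the consumer's JoinGroupRequest.
        self.timeouts[consumer_id] = session_timeout_ms
        self.last_seen[consumer_id] = now_ms

    def on_heartbeat(self, consumer_id: str, now_ms: int) -> None:
        self.last_seen[consumer_id] = now_ms

    def expired(self, now_ms: int) -> List[str]:
        # Consumers whose last heartbeat is at least one session timeout old.
        return sorted(c for c, seen in self.last_seen.items()
                      if now_ms - seen >= self.timeouts[c])


def coordinator_presumed_dead(last_response_ms: int, now_ms: int, session_timeout_ms: int) -> bool:
    # Consumer view: no HeartbeatResponse within the session timeout means
    # the consumer should start co-ordinator rediscovery.
    return now_ms - last_response_ms >= session_timeout_ms
```

With a 30000 ms session timeout and a heartbeat frequency of 3, the consumer heartbeats every 10000 ms, so it can miss two consecutive heartbeats before the co-ordinator marks it dead.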