...

  1. The co-ordinator for a group detects changes to the number of partitions for the subscribed list of topics.
  2. The co-ordinator then increments the group's generation id in memory and closes the socket connections to all consumers in the group to trigger a rebalance operation.
  3. On losing its connection to the co-ordinator, each consumer goes through the co-ordinator re-discovery process. If the consumer receives an IllegalGeneration error code in either the ConsumerMetadataResponse or the HeartbeatResponse AND it is not waiting on a pending JoinGroupResponse, it stops fetching data, commits offsets and sends a JoinGroupRequest to the co-ordinator.
  4. The co-ordinator waits for all consumers in the group to send it a JoinGroupRequest. Once it receives all expected JoinGroupRequests, it increments the group's generation id in zookeeper, computes the new partition assignment and returns the updated assignment and the new generation id in the JoinGroupResponse.
  5. On receiving the JoinGroupResponse, the consumer stores the new generation id and its consumer id locally and starts fetching data for the returned set of partitions. Subsequent requests sent by the consumer to the co-ordinator use the new generation id returned in the last JoinGroupResponse.
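The rebalance steps above can be sketched from the co-ordinator's side as a join barrier. This is a minimal, hypothetical model: the class name GroupCoordinator, its fields and the round-robin assignment are assumptions (the text does not name an assignment strategy), sockets and zookeeper are not modelled, and the in-memory increment in step 2 and the zookeeper increment in step 4 are assumed to yield the same new generation id.

```python
from dataclasses import dataclass, field


@dataclass
class GroupCoordinator:
    """Hypothetical co-ordinator state for one consumer group (names are illustrative)."""
    members: set                 # consumer ids expected to send a JoinGroupRequest
    partitions: list             # partitions of the subscribed topics
    generation: int = 0          # last persisted generation id (zookeeper in the text)
    in_memory_generation: int = 0
    joined: set = field(default_factory=set)

    def on_partition_count_change(self, partitions):
        # Steps 1-2: record the new partitions and bump the generation in memory.
        # Closing the consumers' sockets is outside this sketch.
        self.partitions = list(partitions)
        self.in_memory_generation = self.generation + 1
        self.joined.clear()

    def on_join_group(self, consumer_id):
        # Step 4: hold back responses until every expected consumer has joined.
        self.joined.add(consumer_id)
        if self.joined != self.members:
            return None
        # Persist the new generation, then assign partitions round-robin
        # (an assumed strategy, not one stated in the text).
        self.generation = self.in_memory_generation
        consumers = sorted(self.members)
        assignment = {c: [] for c in consumers}
        for i, p in enumerate(sorted(self.partitions)):
            assignment[consumers[i % len(consumers)]].append(p)
        # One JoinGroupResponse per consumer: (new generation id, partitions).
        return {c: (self.generation, parts) for c, parts in assignment.items()}
```

For example, with members {"c1", "c2"} and a partition count change to ["t-0", "t-1", "t-2"], the first on_join_group("c1") returns None, and on_join_group("c2") returns generation 1 for both consumers with c1 owning t-0 and t-2 and c2 owning t-1.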

...

  1. If a consumer receives a higher generation id for its group, it stops fetching and commits its existing offsets before sending a JoinGroupRequest to the co-ordinator.
  2. The co-ordinator checks the generation id in the OffsetCommitRequest and rejects the request if its generation id is higher than the generation id on the co-ordinator. Such a request indicates a bug in the consumer code.
  3. The co-ordinator does not allow OffsetCommitRequests with generation ids older than the group's current generation id in zookeeper either. This constraint is not a problem during a rebalance: the co-ordinator does not increment the group's generation id until all consumers have sent a JoinGroupRequest, and from that point until it sends the JoinGroupResponse, it does not expect to receive any OffsetCommitRequests from consumers in the group in the current generation. So the generation id on every OffsetCommitRequest sent by a consumer should always match the current generation id on the co-ordinator. Another case worth discussing is a consumer that goes through a soft failure, e.g. a long GC pause, during a rebalance. If a consumer pauses for more than session.timeout.ms, the co-ordinator marks it dead, expires the JoinGroupRequests received so far, fails the current rebalance attempt and triggers another rebalance operation.
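Taken together, the two generation-id rules for OffsetCommitRequests reduce to a strict equality check on the co-ordinator. A minimal sketch; the function name and the returned status strings are illustrative assumptions, not the protocol's actual error codes.

```python
def check_offset_commit(request_generation: int, current_generation: int) -> str:
    """Hypothetical co-ordinator check on an OffsetCommitRequest's generation id.

    The returned strings are illustrative, not real protocol error codes.
    """
    if request_generation > current_generation:
        # A generation id from the future: the text treats this as a consumer bug.
        return "REJECT_CONSUMER_BUG"
    if request_generation < current_generation:
        # Commits from an older generation are not allowed either.
        return "REJECT_STALE_GENERATION"
    # Only commits tagged with exactly the current generation are accepted.
    return "ACCEPT"
```

Because the co-ordinator does not bump the generation until every consumer has rejoined, a well-behaved consumer should only ever hit the "ACCEPT" branch.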

Heartbeats during rebalance

  1. The consumer periodically sends a HeartbeatRequest to the co-ordinator every session_timeout / heartbeat_frequency milliseconds. If the consumer receives the IllegalGeneration error code in the HeartbeatResponse AND it is not waiting on a pending JoinGroupResponse, it stops fetching, commits offsets and sends a JoinGroupRequest to the co-ordinator.
  2. Until the consumer receives a JoinGroupResponse, it sends HeartbeatRequests to the co-ordinator every session_timeout / heartbeat_frequency milliseconds, with the last known generation id. If there is a pending JoinGroupRequest from the consumer, the co-ordinator does not return the IllegalGeneration error code, but instead returns a RebalanceInProgress error code with the older generation id. This is done to prevent a consumer from repeatedly sending JoinGroupRequests and thereby triggering rebalance operations indefinitely.
  3. The co-ordinator only returns the new generation id in the JoinGroupResponse. Until then, every HeartbeatRequest and HeartbeatResponse carries the older generation id. This is done to prevent the co-ordinator from marking consumers dead during a long rebalance operation. A single-threaded consumer may be unable to send heartbeats while it waits for the JoinGroupResponse; a workaround is to set session.timeout.ms longer than the average rebalance operation latency. For paranoid users, session_timeout / heartbeat_frequency should be set larger than the average rebalance operation latency.
  4. Once the co-ordinator sends the JoinGroupResponse, any subsequent HeartbeatRequest with a stale generation id receives an IllegalGeneration error code in the HeartbeatResponse. This can happen because there is a delay between the time the co-ordinator sends the JoinGroupResponse and the time the consumer reads it and refreshes its generation id. During this window, a multi-threaded consumer could have sent a HeartbeatRequest with the older generation id. This error should not be interpreted as a request to rebalance, since the consumer only stops fetching if it receives an IllegalGeneration error code while there is no pending JoinGroupResponse.
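The heartbeat rules above can be sketched as a few small functions: the heartbeat interval, the co-ordinator's choice of error code, and the consumer's reaction to it. This is a hypothetical model: the function names, the join_pending / awaiting_join_response flags and the action strings are assumptions, and None stands for "no error". Only the error code names IllegalGeneration and RebalanceInProgress come from the text.

```python
from typing import List, Optional


def heartbeat_interval_ms(session_timeout_ms: int, heartbeat_frequency: int) -> float:
    # Consumers heartbeat every session_timeout / heartbeat_frequency milliseconds.
    return session_timeout_ms / heartbeat_frequency


def coordinator_heartbeat_error(request_generation: int,
                                current_generation: int,
                                join_pending: bool) -> Optional[str]:
    """Error code the co-ordinator puts in a HeartbeatResponse (None = no error)."""
    if join_pending:
        # The consumer already sent a JoinGroupRequest that is still unanswered:
        # tell it to wait rather than asking it to rejoin again.
        return "RebalanceInProgress"
    if request_generation != current_generation:
        # Stale generation with no join outstanding on the co-ordinator, including
        # heartbeats that race with a JoinGroupResponse already sent.
        return "IllegalGeneration"
    return None


def consumer_on_heartbeat_error(error: Optional[str],
                                awaiting_join_response: bool) -> List[str]:
    """Actions a consumer takes on a HeartbeatResponse error (hypothetical names)."""
    if error == "IllegalGeneration" and not awaiting_join_response:
        return ["stop_fetching", "commit_offsets", "send_join_group_request"]
    # RebalanceInProgress, or an IllegalGeneration that arrives while a
    # JoinGroupResponse is still pending, is not treated as a request to rebalance.
    return []
```

For example, heartbeat_interval_ms(30000, 3) gives 10000.0, so with a 30 second session timeout and a heartbeat_frequency of 3 the consumer would heartbeat every 10 seconds. Splitting the decision between co-ordinator and consumer shows why the race in item 4 is harmless: the co-ordinator answers IllegalGeneration, but the consumer ignores it while it is still awaiting its JoinGroupResponse.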