
...

  • List of topics the group subscribes to
  • Group configs, including session timeout, etc.
  • Consumer metadata for each consumer in the group.
  • Current offsets for each consumed topic/partition.
  • Partition ownership metadata, including the consumer-assigned-partitions map, current group generation number, etc.

2) For each existing topic, a list of consumer groups that are currently subscribing to it.
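The two structures above, per-group state and a topic-to-groups reverse index, can be pictured as an in-memory registry. This is a minimal Python sketch, assuming hypothetical names (`GroupState`, `CoordinatorRegistry`, `groups_for_topic`) and ignoring how the data is persisted in ZooKeeper.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Set, Tuple


@dataclass
class GroupState:
    """Hypothetical per-group record mirroring the bullet list above."""
    topics: Set[str]                    # topics the group subscribes to
    session_timeout_ms: int             # one of the group configs
    consumers: Dict[str, dict] = field(default_factory=dict)           # consumer id -> metadata
    offsets: Dict[Tuple[str, int], int] = field(default_factory=dict)  # (topic, partition) -> offset
    assignment: Dict[str, List[Tuple[str, int]]] = field(default_factory=dict)  # consumer id -> partitions
    generation_id: int = 0


class CoordinatorRegistry:
    def __init__(self) -> None:
        self.groups: Dict[str, GroupState] = {}
        # Reverse index: topic -> groups currently subscribing to it.
        self.groups_by_topic: Dict[str, Set[str]] = {}

    def add_group(self, group_id: str, topics: Set[str], session_timeout_ms: int) -> None:
        self.groups[group_id] = GroupState(set(topics), session_timeout_ms)
        for topic in topics:
            self.groups_by_topic.setdefault(topic, set()).add(group_id)

    def groups_for_topic(self, topic: str) -> Set[str]:
        # Groups that must be notified when this topic changes.
        return self.groups_by_topic.get(topic, set())
```

The reverse index is what lets a co-ordinator find every affected group in one lookup when a topic's partition count changes, instead of scanning all groups.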

...

{
  CorrelationId          => int32
  ErrorCode              => int16
  CoordinatorId          => Broker
  CoordinatorEpoch       => int32
  GroupGenerationId      => int32
}
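The schema above (its heading is elided in this comparison) can be read field by field from a big-endian buffer. This is a minimal Python sketch under stated assumptions: the `Broker` type is not defined here, so it is assumed to be encoded as NodeId (int32), Host (string) and Port (int32); strings are assumed to use an int16 length prefix followed by UTF-8 bytes; and the names `Broker`, `CoordinatorInfo` and `decode_coordinator_info` are hypothetical.

```python
import struct
from typing import NamedTuple, Tuple


class Broker(NamedTuple):
    # Assumed layout: NodeId int32, Host string, Port int32.
    node_id: int
    host: str
    port: int


class CoordinatorInfo(NamedTuple):
    correlation_id: int
    error_code: int
    coordinator: Broker
    coordinator_epoch: int
    group_generation_id: int


def read_string(buf: bytes, pos: int) -> Tuple[str, int]:
    """Read an int16 length prefix and UTF-8 bytes; return (value, next_pos)."""
    (length,) = struct.unpack_from(">h", buf, pos)
    start = pos + 2
    return buf[start:start + length].decode("utf-8"), start + length


def decode_coordinator_info(buf: bytes) -> CoordinatorInfo:
    # CorrelationId (int32) and ErrorCode (int16), big-endian, no padding.
    correlation_id, error_code = struct.unpack_from(">ih", buf, 0)
    pos = 6
    # CoordinatorId => Broker, under the assumed layout above.
    (node_id,) = struct.unpack_from(">i", buf, pos)
    host, pos = read_string(buf, pos + 4)
    # Broker port, then CoordinatorEpoch and GroupGenerationId (all int32).
    port, epoch, generation = struct.unpack_from(">iii", buf, pos)
    return CoordinatorInfo(correlation_id, error_code,
                           Broker(node_id, host, port), epoch, generation)
```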

JoinGroupRequest

...

{
  CorrelationId          => int32
  GroupId                => String
  GroupGenerationId      => int32
  ConsumerId             => String
  ErrorCode              => int16
}
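The second schema can be written to the wire in the order its fields are listed. A minimal Python sketch, assuming big-endian integers and strings encoded as an int16 length prefix followed by UTF-8 bytes; `encode_string` and `encode_group_message` are hypothetical helper names, not part of any client library.

```python
import struct


def encode_string(value: str) -> bytes:
    """int16 length prefix followed by the UTF-8 bytes (assumed string encoding)."""
    data = value.encode("utf-8")
    if len(data) > 32767:
        raise ValueError("string too long for an int16 length prefix")
    return struct.pack(">h", len(data)) + data


def encode_group_message(correlation_id: int, group_id: str, generation_id: int,
                         consumer_id: str, error_code: int) -> bytes:
    # CorrelationId, GroupId, GroupGenerationId, ConsumerId, ErrorCode, big-endian.
    return (struct.pack(">i", correlation_id)
            + encode_string(group_id)
            + struct.pack(">i", generation_id)
            + encode_string(consumer_id)
            + struct.pack(">h", error_code))
```

For example, `encode_group_message(1, "g", 4, "c1", 0)` produces 17 bytes: 4 + (2 + 1) + 4 + (2 + 2) + 2.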

...

  1. The co-ordinator for a group detects changes to the number of partitions for the subscribed list of topics.
  2. The co-ordinator then increments the group's generation id in zookeeper.
  3. The controller watches the generation id changes for every consumer group and sends an UpdateConsumerMetadataRequest to all brokers with the new generation id for the affected group.
  4. The co-ordinator also -memory and closes the socket connection to all consumers in the group to trigger a rebalance operation.
  5. On losing connection to the co-ordinator, the consumers go through the co-ordinator re-discovery process. If a consumer receives a higher generation id or an IllegalGeneration error code in either the ConsumerMetadataResponse or the HeartbeatResponse, it stops fetching data, commits offsets and sends a JoinGroupRequest to the co-ordinator.
  6. The co-ordinator waits for all consumers in the group to send it a JoinGroupRequest. Once it has received all expected JoinGroupRequests, it increments the group's generation id in zookeeper, computes the new partition assignment and returns the updated assignment and the new generation id in the JoinGroupResponse.
  7. On receiving the JoinGroupResponse, the consumer stores the new generation id and its consumer id locally and starts fetching data for the returned set of partitions. Subsequent requests sent by the consumer to the co-ordinator use the new generation id returned by the last JoinGroupResponse.
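The co-ordinator side of steps 1, 2 and 6 can be sketched as a small state machine. This is a minimal Python sketch under assumptions: ZooKeeper and the controller (step 3) are left out and a plain attribute holds the generation id, the class and method names are hypothetical, and round-robin assignment is used only because the text does not specify an assignment strategy.

```python
from typing import Dict, List, Optional, Set, Tuple

Partition = Tuple[str, int]


class GroupCoordinator:
    """Hypothetical sketch; a plain attribute stands in for the generation id in ZooKeeper."""

    def __init__(self, consumers: Set[str], partition_counts: Dict[str, int]) -> None:
        self.expected = set(consumers)              # consumers that must rejoin
        self.partition_counts = dict(partition_counts)
        self.generation_id = 1
        self.pending: Set[str] = set()              # JoinGroupRequests received so far
        self.assignment: Dict[str, List[Partition]] = {}

    def on_partition_count(self, topic: str, count: int) -> None:
        # Steps 1-2: a changed partition count bumps the generation id.
        if self.partition_counts.get(topic) != count:
            self.partition_counts[topic] = count
            self.generation_id += 1
            self.pending.clear()

    def join_group(self, consumer_id: str) -> Optional[Dict[str, Tuple[int, List[Partition]]]]:
        # Step 6: hold responses until every expected consumer has joined.
        self.pending.add(consumer_id)
        if self.pending != self.expected:
            return None
        self.generation_id += 1
        self._assign()
        self.pending.clear()
        # One JoinGroupResponse per consumer: (new generation id, partitions).
        return {c: (self.generation_id, parts) for c, parts in self.assignment.items()}

    def _assign(self) -> None:
        # Round-robin over sorted partitions, chosen only for illustration.
        members = sorted(self.expected)
        self.assignment = {m: [] for m in members}
        partitions = [(t, p) for t in sorted(self.partition_counts)
                      for p in range(self.partition_counts[t])]
        for i, tp in enumerate(partitions):
            self.assignment[members[i % len(members)]].append(tp)
```

On the consumer side (step 7), each consumer would keep the generation id from its entry in the returned map and attach it to later requests.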

Offset commits during rebalance

  1. If a consumer receives a higher generation id for its group, it stops fetching and commits existing offsets before sending a JoinGroupRequest to the co-ordinator.
  2. The co-ordinator checks the generation id in the OffsetCommitRequest and rejects it if the generation id in the request is higher than the generation id on the co-ordinator. This indicates either a bug in the consumer logic or a consumer with stale consumer metadata sending requests to the older co-ordinator.
  3. The co-ordinator does not allow offset commit requests with generation ids older than the current group generation id in zookeeper either. This constraint is not a problem during a rebalance: until all consumers have sent a JoinGroupRequest, the co-ordinator does not increment the group's generation id, and from that point until it sends the JoinGroupResponse, it does not expect to receive any OffsetCommitRequests from any of the consumers in the group in the current generation. So the generation id on every OffsetCommitRequest sent by a consumer should always match the current generation id on the co-ordinator. Another case worth discussing is a consumer that goes through a soft failure, e.g. a long GC pause, during a rebalance. If a consumer pauses for more than session.timeout.ms, the co-ordinator marks it dead, expires the JoinGroupRequests received so far, fails the current rebalance attempt and triggers another rebalance operation.
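The commit rules and the soft-failure case above reduce to an exact generation-id match plus a session-timeout check. This is a minimal Python sketch under assumptions: the result values are placeholder strings (the text names IllegalGeneration but gives no numeric code, and `NoError` is a hypothetical name), time is passed in explicitly in milliseconds, and `GroupOffsetState` and its methods are hypothetical.

```python
from dataclasses import dataclass, field
from typing import Dict, Set, Tuple

# Placeholder result values; no numeric error codes are given in the text.
NO_ERROR = "NoError"
ILLEGAL_GENERATION = "IllegalGeneration"


@dataclass
class GroupOffsetState:
    generation_id: int
    session_timeout_ms: int
    offsets: Dict[Tuple[str, int], int] = field(default_factory=dict)
    last_heartbeat_ms: Dict[str, int] = field(default_factory=dict)
    join_requests: Set[str] = field(default_factory=set)
    rebalance_attempts: int = 0

    def commit_offset(self, generation_id: int, topic: str, partition: int, offset: int) -> str:
        # Rules 2 and 3: reject both newer and older generation ids.
        if generation_id != self.generation_id:
            return ILLEGAL_GENERATION
        self.offsets[(topic, partition)] = offset
        return NO_ERROR

    def heartbeat(self, consumer_id: str, now_ms: int) -> None:
        self.last_heartbeat_ms[consumer_id] = now_ms

    def join(self, consumer_id: str) -> None:
        self.join_requests.add(consumer_id)

    def expire_dead_consumers(self, now_ms: int) -> Set[str]:
        # A consumer silent for longer than session.timeout.ms is marked dead.
        dead = {c for c, seen in self.last_heartbeat_ms.items()
                if now_ms - seen > self.session_timeout_ms}
        if dead:
            for c in dead:
                del self.last_heartbeat_ms[c]
            # Expire the JoinGroupRequests received so far and start a new attempt.
            self.join_requests.clear()
            self.rebalance_attempts += 1
        return dead
```

Requiring an exact match keeps the check to a single comparison: both the "newer than the co-ordinator" case and the "older than the current generation" case fall out of the same test.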