  1. If the co-ordinator fails at step #1 after receiving a notification but before it can act on it, the new co-ordinator has to be able to detect the need for a rebalance operation on completing the failover. As part of failover, the co-ordinator reads a group's metadata from ZooKeeper, including the list of topics the group has subscribed to and the previous partition ownership decision. If the number of topics or the number of partitions for the subscribed topics differs from that in the previous partition ownership decision, the new co-ordinator detects the need for a rebalance and initiates one for the group. Similarly, if the consumers that connect to the new co-ordinator are different from the ones in the group's generation in ZooKeeper, it initiates a rebalance for the group.
  2. If the co-ordinator fails at step #2, it might have sent a HeartbeatResponse with the error code to some consumers but not all. As in case #1 above, the new co-ordinator will detect the need for a rebalance after failover and initiate one again. If the rebalance was initiated due to a consumer failure and that consumer recovers before the co-ordinator failover completes, the new co-ordinator will not initiate a rebalance. However, if any consumer sends it a JoinGroupRequest, it will initiate a rebalance for the entire group.
  3. If a co-ordinator fails at step #3, it might have received JoinGroupRequests from only a subset of consumers in the group. After failover, the new co-ordinator will receive a HeartbeatRequest from every live consumer. As in case #1, it will trigger a rebalance for the group if it detects changes in topics/partitions or consumer membership.
  4. If a co-ordinator fails at step #4, it might fail after writing the new generation id and group membership to ZooKeeper. The generation id and membership information are written in one atomic ZooKeeper write operation. After failover, consumers will send HeartbeatRequests to the new co-ordinator with an older generation id. The co-ordinator triggers a rebalance by returning an IllegalGeneration error code in the response, which causes each consumer to send it a JoinGroupRequest.
  5. If a co-ordinator fails at step #5, it might have sent the JoinGroupResponse to only a subset of the consumers in a group. A consumer that received a JoinGroupResponse will detect the failed co-ordinator while sending a heartbeat or committing offsets. It will then discover the new co-ordinator and send it a heartbeat with the new generation id, and the co-ordinator will reply with a HeartbeatResponse with no error code. A consumer that did not receive a JoinGroupResponse will discover the new co-ordinator and send it a JoinGroupRequest. This will cause the co-ordinator to trigger a rebalance for the group.
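
The checks a new co-ordinator runs after failover can be sketched as follows. This is a minimal illustration in Python, not the actual implementation: `GroupSnapshot`, `needs_rebalance` and `heartbeat_error` are hypothetical names, the snapshot stands in for the group metadata read back from zookeeper, and only the IllegalGeneration error code comes from the text above.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class GroupSnapshot:
    """Hypothetical view of the group metadata read back during failover."""
    generation_id: int
    partitions_per_topic: dict  # topic -> partition count in the last ownership decision
    members: frozenset          # consumer ids in the group's last generation


def needs_rebalance(snapshot, current_partitions_per_topic, connected_consumers):
    """Return True if the new co-ordinator should initiate a rebalance."""
    # A change in the set of topics or in any partition count shows up here.
    if snapshot.partitions_per_topic != current_partitions_per_topic:
        return True
    # The consumers that reconnected differ from the stored generation.
    return frozenset(connected_consumers) != snapshot.members


def heartbeat_error(snapshot, request_generation_id):
    """Return the error code for a HeartbeatRequest, or None if there is none."""
    if request_generation_id != snapshot.generation_id:
        # A stale generation id makes the consumer send a JoinGroupRequest.
        return "IllegalGeneration"
    return None
```

For example, a snapshot with generation id 3 rejects a heartbeat that carries generation id 2 with IllegalGeneration (case #4), while a heartbeat that carries generation id 3 gets no error code (case #5).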

Consumer id assignment

  1. After startup, a consumer learns its consumer id in the very first JoinGroupResponse it receives from the co-ordinator. From that point onwards, the consumer is expected to include this consumer id in every request it sends to the co-ordinator (HeartbeatRequest, JoinGroupRequest, OffsetCommitRequest). If the co-ordinator receives a HeartbeatRequest or an OffsetCommitRequest with a consumer id that is different from the ones in the group, it sends an UnknownConsumer error code in the corresponding response.
  2. The co-ordinator assigns a consumer id to a consumer on a successful rebalance and sends it in the JoinGroupResponse. The consumer should include this id in every subsequent JoinGroupRequest as well, until it is shut down or dies.
  3. The co-ordinator does consumer id assignment after it has received a JoinGroupRequest from all existing consumers in a group. At this point, it assigns a new id <group>-<consumer_host>-<sequence> to every consumer that did not send a consumer id in the JoinGroupRequest. The assumption is that such consumers are newly started up.
  4. If a consumer fails to send the same consumer id on subsequent JoinGroupRequests, it will cause a chain of rebalance attempts and can prevent the group from ever finishing a rebalance operation successfully. This is because the co-ordinator decides whether a rebalance should be triggered for a new consumer by checking the consumer id in the JoinGroupRequest. If there is no consumer id, it assumes that a new consumer wants to join the group.
  5. If a consumer id is specified in the JoinGroupRequest but does not match any id in the current group membership, the co-ordinator sends an UnknownConsumer error code in the JoinGroupResponse. This does not cause a rebalance operation, but it also prevents such a consumer from joining the existing group.
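
The id handling rules above can be sketched as follows. This is a simplified illustration in Python under stated assumptions: `ConsumerIdAssigner` and `on_join_group` are hypothetical names, the sequence number is assumed to start at 1, and each request is handled on its own rather than after JoinGroupRequests have arrived from all existing consumers, as rule 3 requires.

```python
import itertools


class ConsumerIdAssigner:
    """Hypothetical per-group helper that assigns and validates consumer ids."""

    def __init__(self, group, members=()):
        self.group = group
        self.members = set(members)           # ids in the current group membership
        self._sequence = itertools.count(1)   # assumption: sequence starts at 1

    def on_join_group(self, consumer_host, consumer_id=None):
        """Return (consumer_id, error_code) for one JoinGroupRequest."""
        if consumer_id is None:
            # No id in the request: treat the consumer as newly started and
            # assign <group>-<consumer_host>-<sequence>.
            new_id = f"{self.group}-{consumer_host}-{next(self._sequence)}"
            self.members.add(new_id)
            return new_id, None
        if consumer_id not in self.members:
            # An id that is not in the membership is rejected without a rebalance.
            return None, "UnknownConsumer"
        # A known consumer keeps the id it was assigned earlier.
        return consumer_id, None
```

A consumer that drops its id on a later JoinGroupRequest would take the first branch again and look like a new member each time, which is the repeated rebalance described in rule 4.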