...

  1. The consumer sends a HeartbeatRequest to the co-ordinator every 0.3*session.timeout.ms milliseconds. If the consumer receives the IllegalGeneration error code in the HeartbeatResponse AND it is not waiting on a pending JoinGroupResponse, it stops fetching, commits offsets and sends a JoinGroupRequest to the co-ordinator.
    Until the consumer receives a JoinGroupResponse, it does not send any more HeartbeatRequests to the co-ordinator.
     
  2. The co-ordinator pauses failure detection for a consumer that has sent a JoinGroupRequest until a JoinGroupResponse is sent out. It restarts the heartbeat timer once the JoinGroupResponse is sent out and marks a consumer dead if it does not receive a HeartbeatRequest, with the last known generation id, within another session.timeout.ms milliseconds from that time. If there is a pending JoinGroupRequest from the consumer, the co-ordinator does not return the IllegalGeneration error code, but instead returns a RebalanceInProgress error code with the older generation id. This prevents a consumer from sending JoinGroupRequests repeatedly and triggering rebalance operations indefinitely.
  3. The co-ordinator returns the new generation id only in the JoinGroupResponse. Until then, every HeartbeatRequest and HeartbeatResponse carries the older generation id. This is done to prevent the co-ordinator from marking consumers dead during a long rebalance operation. If the consumer is single-threaded, a workaround for this issue is to set session.timeout.ms longer than the average rebalance operation latency. For paranoid users, 0.3*session.timeout.ms should be set larger than the average rebalance operation latency.
  4. This prevents the co-ordinator from marking a consumer dead during a rebalance operation. Note that this does not prevent the rebalance operation from finishing if a consumer goes through a soft failure during the rebalance. If the consumer pauses before it sends a JoinGroupRequest, the co-ordinator will mark it dead during the rebalance and send a JoinGroupResponse to the rest of the consumers with a RebalanceFailed error code. If a consumer pauses after it sends a JoinGroupRequest, the co-ordinator will send it the JoinGroupResponse, assuming the rebalance completed successfully, and will restart its heartbeat timer. If the consumer resumes before session.timeout.ms elapses, consumption starts normally. If the consumer pauses for session.timeout.ms after that, it is marked dead by the co-ordinator, which triggers a rebalance operation.
  5. The co-ordinator returns the new generation id only in the JoinGroupResponse. Once the consumer receives a JoinGroupResponse, it sends the next HeartbeatRequest with the new generation id. Once the co-ordinator sends the JoinGroupResponse, all subsequent HeartbeatRequests with a stale generation id will receive an IllegalGeneration error code in the HeartbeatResponse. This can happen since there is a delay between the time the co-ordinator sends the JoinGroupResponse and the time the consumer reads it and refreshes its generation id. During this window, a multi-threaded consumer could have sent a HeartbeatRequest (with the older generation id). Note that this should not be interpreted as a request to rebalance, since the consumer only stops fetching if it receives an IllegalGeneration error code when there is no pending JoinGroupResponse.
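
The heartbeat and generation rules in the list above can be sketched as a small in-memory model. This is an illustrative sketch only, not the Kafka implementation: the class and method names (Coordinator, Consumer, Member, handle_join_group, complete_rebalance, dead_members and so on) are hypothetical, time is passed in explicitly as milliseconds, offset commits are omitted, and the RebalanceInProgress and RebalanceFailed paths are left out. It also assumes that a heartbeat carrying a stale generation id does not refresh the session timer, which the text does not specify.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List


class ErrorCode(Enum):
    NONE = 0
    ILLEGAL_GENERATION = 1


def heartbeat_interval_ms(session_timeout_ms: int) -> int:
    """Heartbeat period of 0.3 * session.timeout.ms, in integer milliseconds."""
    return session_timeout_ms * 3 // 10


@dataclass
class Member:
    last_heartbeat_ms: int
    pending_join: bool = False  # True between JoinGroupRequest and JoinGroupResponse


@dataclass
class Coordinator:
    session_timeout_ms: int
    generation_id: int = 0
    members: Dict[str, Member] = field(default_factory=dict)

    def handle_join_group(self, consumer_id: str, now_ms: int) -> None:
        # Failure detection is paused for a consumer with a pending join.
        member = self.members.setdefault(consumer_id, Member(now_ms))
        member.pending_join = True

    def complete_rebalance(self, now_ms: int) -> int:
        # The new generation id is handed out only with the JoinGroupResponse;
        # the heartbeat timer of each joined consumer restarts at that moment.
        self.generation_id += 1
        for member in self.members.values():
            if member.pending_join:
                member.pending_join = False
                member.last_heartbeat_ms = now_ms
        return self.generation_id

    def handle_heartbeat(self, consumer_id: str, generation_id: int,
                         now_ms: int) -> ErrorCode:
        if generation_id != self.generation_id:
            # Assumption: a stale heartbeat does not refresh the session timer.
            return ErrorCode.ILLEGAL_GENERATION
        # Assumes consumer_id has already joined the group.
        self.members[consumer_id].last_heartbeat_ms = now_ms
        return ErrorCode.NONE

    def dead_members(self, now_ms: int) -> List[str]:
        # A consumer is dead if no valid heartbeat arrived within the session
        # timeout, unless failure detection is paused for it.
        return [cid for cid, m in self.members.items()
                if not m.pending_join
                and now_ms - m.last_heartbeat_ms > self.session_timeout_ms]


@dataclass
class Consumer:
    consumer_id: str
    generation_id: int = 0
    awaiting_join_response: bool = False
    fetching: bool = True

    def on_heartbeat_response(self, error: ErrorCode, coordinator: Coordinator,
                              now_ms: int) -> None:
        # React to IllegalGeneration only when no JoinGroupResponse is pending.
        if error is ErrorCode.ILLEGAL_GENERATION and not self.awaiting_join_response:
            self.fetching = False  # stop fetching (offset commit omitted here)
            self.awaiting_join_response = True
            coordinator.handle_join_group(self.consumer_id, now_ms)

    def on_join_group_response(self, generation_id: int) -> None:
        self.generation_id = generation_id
        self.awaiting_join_response = False
        self.fetching = True
```

Keeping the "pending join" flag on both sides is what makes a late IllegalGeneration response harmless in this model: the co-ordinator ignores the session timer for a joining consumer, and the consumer ignores IllegalGeneration while it is already waiting for a JoinGroupResponse.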