...

The consumer specifies a session timeout, a heartbeat frequency and a failed heartbeat threshold in the RegisterConsumerRequest that it sends to the co-ordinator in order to join a consumer group. The consumer initiates periodic heartbeats (HeartbeatRequest) to the co-ordinator and waits for a response. The heartbeat frequency determines how many heartbeats a consumer sends to the co-ordinator within one session timeout window. The failed heartbeat threshold determines how many failed or missed heartbeats the co-ordinator tolerates before it marks the consumer dead. For example, with a session timeout of 1 second and a heartbeat frequency of 3, the consumer is expected to send a HeartbeatRequest to the co-ordinator every 1000/3 ≈ 333.33 milliseconds. With a failed heartbeat threshold of 3, the co-ordinator marks the consumer dead if it has not received at least one HeartbeatRequest within (1000/3)*3 = 1000 milliseconds, i.e. 1 second. When the consumer has successfully joined a group, the failure detection process starts on the consumer as well as the co-ordinator. Here is the protocol in more detail:

1) The consumer periodically sends a HeartbeatRequest to the coordinator every session_timeout / heartbeat_frequency milliseconds.

2) Upon receiving the HeartbeatRequest, the coordinator checks the generation number, the consumer id and the consumer group. If the consumer specifies an invalid or stale generation id (i.e., the consumer id was not assigned before, or the generation number is smaller than the current value), the coordinator sends an IllegalGeneration error code in the HeartbeatResponse.

...

4) If the consumer has not received a HeartbeatResponse from the coordinator within the session timeout, it treats the coordinator as failed and triggers the co-ordinator re-discovery process.
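The timing arithmetic and the checks in steps 1, 2 and 4 can be sketched as a small co-ordinator-side model. This is a minimal, hypothetical sketch rather than the actual implementation: the names `GroupState`, `handle_heartbeat`, `dead_consumers`, `coordinator_failed` and the string error codes are assumptions, time is passed in explicitly as millisecond timestamps, and a single consumer group is assumed so the group check is omitted.

```python
from dataclasses import dataclass, field
from typing import Dict, List

# Hypothetical error-code names for this sketch (not Kafka's actual constants).
NO_ERROR = "NoError"
ILLEGAL_GENERATION = "IllegalGeneration"


@dataclass
class GroupState:
    """Co-ordinator-side view of one consumer group (hypothetical model)."""
    generation: int
    session_timeout_ms: int
    heartbeat_frequency: int          # heartbeats per session timeout window
    failed_heartbeat_threshold: int   # missed heartbeats tolerated before "dead"
    last_heartbeat_ms: Dict[str, int] = field(default_factory=dict)

    def add_member(self, consumer_id: str, now_ms: int) -> None:
        # Failure detection starts once the consumer has joined the group.
        self.last_heartbeat_ms[consumer_id] = now_ms

    def heartbeat_interval_ms(self) -> float:
        # Step 1: the consumer heartbeats every session_timeout / heartbeat_frequency ms.
        return self.session_timeout_ms / self.heartbeat_frequency

    def failure_window_ms(self) -> float:
        # interval * threshold, computed as a single division to limit float rounding.
        return (self.session_timeout_ms * self.failed_heartbeat_threshold
                / self.heartbeat_frequency)

    def handle_heartbeat(self, consumer_id: str, generation: int, now_ms: int) -> str:
        # Step 2: an unassigned consumer id or a stale generation gets IllegalGeneration.
        if consumer_id not in self.last_heartbeat_ms or generation < self.generation:
            return ILLEGAL_GENERATION
        self.last_heartbeat_ms[consumer_id] = now_ms
        return NO_ERROR

    def dead_consumers(self, now_ms: int) -> List[str]:
        # A consumer is dead if no valid heartbeat arrived within the failure window.
        window = self.failure_window_ms()
        return sorted(c for c, last in self.last_heartbeat_ms.items()
                      if now_ms - last > window)


def coordinator_failed(last_response_ms: int, now_ms: int, session_timeout_ms: int) -> bool:
    # Step 4 (consumer side): no HeartbeatResponse within the session timeout.
    return now_ms - last_response_ms > session_timeout_ms
```

With `session_timeout_ms=1000`, `heartbeat_frequency=3` and `failed_heartbeat_threshold=3`, `heartbeat_interval_ms()` is about 333.33 and `failure_window_ms()` is 1000.0, matching the example above. A heartbeat rejected with IllegalGeneration deliberately does not refresh the consumer's last-heartbeat time in this sketch.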

...

  1. The consumer periodically sends a HeartbeatRequest to the coordinator every session_timeout / heartbeat_frequency milliseconds. If the consumer receives the IllegalGeneration error code in the HeartbeatResponse AND it is not waiting on a pending JoinGroupResponse, it stops fetching, commits offsets and sends a JoinGroupRequest to the co-ordinator.
  2. Until the consumer receives a JoinGroupResponse, it sends HeartbeatRequests to the co-ordinator every session_timeout / heartbeat_frequency milliseconds, with the last known generation id. If there is a pending JoinGroupRequest from the consumer, the co-ordinator does not return the IllegalGeneration error code but instead returns a RebalanceInProgress error code with the older generation id. This prevents a consumer from repeatedly sending JoinGroupRequests and thereby triggering rebalance operations indefinitely.
  3. The co-ordinator returns the new generation id only in the JoinGroupResponse. Until then, every HeartbeatRequest and HeartbeatResponse carries the older generation id. This prevents the co-ordinator from marking consumers dead during a long rebalance operation. If the consumer is single-threaded, a workaround for this issue is to increase session.timeout.ms to be longer than the average rebalance operation latency. For paranoid users, session_timeout / heartbeat_frequency should be set larger than the average rebalance operation latency.
  4. Once the co-ordinator sends the JoinGroupResponse, every subsequent HeartbeatRequest with a stale generation id receives an IllegalGeneration error code in the HeartbeatResponse. This can happen because there is a delay between the time the co-ordinator sends the JoinGroupResponse and the time the consumer reads it and refreshes its generation id. During this window, a multi-threaded consumer could have sent a HeartbeatRequest with the older generation id. This error should not be interpreted as a request to rebalance, since the consumer only stops fetching if it receives an IllegalGeneration error code while there is no pending JoinGroupResponse.
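The rebalance-time exchange in steps 1 to 4 can be sketched as two small state machines. This is a minimal, hypothetical model: the class and method names, the string error codes, and the `start_rebalance` trigger (modelled here as simply bumping the generation) are assumptions for illustration. The consumer records "commit offsets" and "send JoinGroupRequest" as action names instead of performing them, and resuming fetching after the JoinGroupResponse is also an assumption.

```python
from typing import List, Set, Tuple

# Hypothetical error-code names for this sketch (not Kafka's actual constants).
NO_ERROR = "NoError"
ILLEGAL_GENERATION = "IllegalGeneration"
REBALANCE_IN_PROGRESS = "RebalanceInProgress"


class RebalancingCoordinator:
    """Co-ordinator side: heartbeat replies while a rebalance is in flight."""

    def __init__(self, generation: int) -> None:
        self.generation = generation
        self.pending_joins: Set[str] = set()   # consumers awaiting a JoinGroupResponse

    def start_rebalance(self) -> None:
        # Assumption: a rebalance bumps the generation; consumers only learn the
        # new id from the JoinGroupResponse (step 3).
        self.generation += 1

    def handle_join_request(self, consumer_id: str) -> None:
        self.pending_joins.add(consumer_id)

    def send_join_response(self, consumer_id: str) -> int:
        self.pending_joins.discard(consumer_id)
        return self.generation

    def handle_heartbeat(self, consumer_id: str, generation: int) -> Tuple[str, int]:
        if consumer_id in self.pending_joins:
            # Step 2: pending join -> RebalanceInProgress, echoing the older generation id.
            return REBALANCE_IN_PROGRESS, generation
        if generation < self.generation:
            # Steps 1 and 4: stale generation -> IllegalGeneration.
            return ILLEGAL_GENERATION, generation
        return NO_ERROR, generation


class ConsumerRebalanceState:
    """Consumer side: decides when an IllegalGeneration means 'rejoin'."""

    def __init__(self, generation: int) -> None:
        self.generation = generation
        self.awaiting_join_response = False
        self.fetching = True
        self.actions: List[str] = []   # side effects recorded instead of performed

    def on_heartbeat_response(self, error: str) -> None:
        # Step 1: rejoin only on IllegalGeneration when no JoinGroupResponse is pending.
        # Step 4: an IllegalGeneration received while still awaiting it is ignored.
        if error == ILLEGAL_GENERATION and not self.awaiting_join_response:
            self.fetching = False
            self.actions.extend(["commit_offsets", "send_join_group_request"])
            self.awaiting_join_response = True

    def on_join_group_response(self, generation: int) -> None:
        # Step 3: the new generation id arrives only in the JoinGroupResponse.
        self.generation = generation
        self.awaiting_join_response = False
        self.fetching = True   # assumption: fetching resumes after the rebalance
```

In a typical run, `start_rebalance()` makes the consumer's next heartbeat stale, so it receives IllegalGeneration, stops fetching and rejoins. Its heartbeats then receive RebalanceInProgress until `send_join_response()`. A stale heartbeat answered after that point gets IllegalGeneration, but the consumer ignores it because it has not yet processed the JoinGroupResponse.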

...