THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block |
---|
{ Version => int16 CorrelationId => int32 GroupId => String ConsumerHost => String SessionTimeout => int32 TopicsHeartbeatFrequency => int16 Topics => [String] DoesConsumerIdExist => int16[String] ConsumerId => String } |
JoinGroupResponse
Code Block |
---|
{ Version => int16 CorrelationId => int32 ErrorCode => int16 GroupId => String GroupGenerationId => int32 ConsumerId => String PartitionsToOwn => [TopicAndPartition] } |
HeartbeatRequest
Code Block |
---|
{ Version => int16 CorrelationId => int32 GroupId => String GroupGenerationId => int32 ConsumerId => String } |
...
- Consumer periodically sends a HeartbeatRequest to the coordinator every session.timeout.ms/hearbeat.frequency milliseconds. If the consumer receives the IllegalGeneration error code in the HeartbeatResponse, it stops fetching, commits offsets and sends a JoinGroupRequest to the co-ordinator. Until the consumer receives a JoinGroupResponse, it does not send any more HearbeatRequests to the co-ordinator.
- A higher heartbeat.frequency ensures lower latency on a rebalance operation since the co-ordinator notifies a consumer of the need to rebalance only on a HeartbeatResponse. To protect the broker from receiving too many heartbeats, the broker will have a heartbeat.frequency.threshold config that puts a lower bound on the frequency of heartbeats sent by consumers.
- The co-ordinator pauses failure detection for a consumer that has sent a JoinGroupRequest until a JoinGroupResponse is sent out. It restarts the hearbeat timer once the JoinGroupResponse is sent out and marks a consumer dead if it does not receive a HeartbeatRequest from that time for another session.timeout.ms milliseconds. This prevents the co-ordinator from marking a consumer dead during a rebalance operation. Note that this does not prevent the rebalance operation from finishing if a consumer goes through a soft failure during a rebalance operation. If the consumer pauses before it sends a JoinGroupRequest, the co-ordinator will mark it dead during the rebalance and send a JoinGroupResponse to the rest of the consumers with a RebalanceFailed error code. If a consumer pauses after it sends a JoinGroupRequest, the co-ordinator will send it the JoinGroupResponse assuming the rebalance completed successfully and will restart it's heartbeat timer. If the consumer resumes before session.timeout.ms, consumption starts normally. If the consumer pauses for session.timeout.ms after that, then it is marked dead by the co-ordinator and it will trigger a rebalance operation.
- The co-ordinator returns the new generation id only in the JoinGroupResponse. Once the consumer receives a JoinGroupResponse, it sends the next HeartbeatRequest with the new generation id.
...