...

Note that on coordinator failover, the consumers may discover the new coordinator either before or after it has finished the failover process, which includes loading the consumer group metadata from ZK, among other steps. If a consumer connects after the failover has finished, the new coordinator simply accepts its heartbeat (ping) request as normal. If it connects before, the new coordinator may reject the request, causing the consumer to re-discover the coordinator and reconnect, which is fine. Also, if the consumer connects to the new coordinator too late, the coordinator may already have marked the consumer dead and will treat it as a new consumer, which is also fine.
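A minimal Python sketch of how a consumer might react to heartbeat responses during a coordinator failover. The numeric error codes and the `COORDINATOR_NOT_READY` name are hypothetical; only UnknownConsumer appears in this design, and mapping it to a fresh join without a consumer id is an assumption drawn from the consumer id assignment rules.

```python
from enum import Enum, auto

# Hypothetical error codes for illustration; the real values are protocol-defined.
NO_ERROR = 0
COORDINATOR_NOT_READY = 1  # hypothetical: new coordinator is still loading group metadata
UNKNOWN_CONSUMER = 2       # coordinator no longer knows this consumer id


class Action(Enum):
    CONTINUE = auto()       # heartbeat accepted, keep consuming
    REDISCOVER = auto()     # re-discover the coordinator and reconnect
    REJOIN_AS_NEW = auto()  # send a JoinGroupRequest without a consumer id


def on_heartbeat_response(error_code: int) -> Action:
    """Decide the consumer's next step after a HeartbeatResponse (sketch)."""
    if error_code == NO_ERROR:
        # Coordinator finished failover before we connected: accepted as normal.
        return Action.CONTINUE
    if error_code == UNKNOWN_CONSUMER:
        # We reconnected too late and were marked dead: join again as a new consumer.
        return Action.REJOIN_AS_NEW
    # Any other rejection (e.g. failover still in progress): find the coordinator again.
    return Action.REDISCOVER
```

Treating every unrecognized error as a reason to re-discover keeps the consumer safe in both failover orderings described above.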

State diagram

Consumer

Here is a description of the states -

Down - The consumer process is down

Start up & discover co-ordinator - In this state, the consumer discovers the co-ordinator for its group

Part of a group - The consumer enters this state when it receives a JoinGroupResponse with no error code, a consumer id and the generation id for its group.

Re-discover co-ord - In this state, the consumer does not stop consumption but tries to re-discover the co-ordinator. 

Stopped consumption - In this state, the consumer stops consumption and commits offsets, until it joins the group again

(Figure: consumer state diagram)
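The consumer states above can be sketched as a small transition table. The event names (`start`, `join_ok`, `coordinator_lost`, `coordinator_found`, `illegal_generation`, `shutdown`) are hypothetical, and the edges are inferred from the state descriptions rather than copied from the diagram, so treat this as an illustration under those assumptions.

```python
from enum import Enum, auto


class ConsumerState(Enum):
    DOWN = auto()
    DISCOVER_COORDINATOR = auto()    # "Start up & discover co-ordinator"
    PART_OF_GROUP = auto()
    REDISCOVER_COORDINATOR = auto()  # "Re-discover co-ord"
    STOPPED_CONSUMPTION = auto()


S = ConsumerState

# Hypothetical event names; the edges are inferred from the state descriptions,
# not copied from the diagram.
TRANSITIONS = {
    (S.DOWN, "start"): S.DISCOVER_COORDINATOR,
    (S.DISCOVER_COORDINATOR, "join_ok"): S.PART_OF_GROUP,
    (S.PART_OF_GROUP, "coordinator_lost"): S.REDISCOVER_COORDINATOR,
    (S.REDISCOVER_COORDINATOR, "coordinator_found"): S.PART_OF_GROUP,
    (S.PART_OF_GROUP, "illegal_generation"): S.STOPPED_CONSUMPTION,
    (S.STOPPED_CONSUMPTION, "join_ok"): S.PART_OF_GROUP,
}


def next_state(state: ConsumerState, event: str) -> ConsumerState:
    """Return the state after `event`; the process can go down from any state."""
    if event == "shutdown":
        return S.DOWN
    try:
        return TRANSITIONS[(state, event)]
    except KeyError:
        raise ValueError(f"event {event!r} is not valid in state {state.name}") from None


def is_consuming(state: ConsumerState) -> bool:
    # Consumption continues while re-discovering the co-ordinator, but stops
    # (after committing offsets) until the consumer joins the group again.
    return state in (S.PART_OF_GROUP, S.REDISCOVER_COORDINATOR)
```

Keeping the edges in a dictionary makes an unexpected event fail loudly instead of silently leaving the consumer in an undefined state.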

Co-ordinator

Following is a state diagram that describes the state changes on the co-ordinator for a particular group.

Here is a description of the states -

Down - The co-ordinator is dead or demoted

Catch up - In this state, the co-ordinator is elected but not ready to serve requests

Ready - In this state, the newly elected co-ordinator has finished loading the group metadata for all groups that it is responsible for

Prepare for rebalance - In this state, the co-ordinator sends the IllegalGeneration error in the HeartbeatResponse for all consumers in the group and waits for the consumers to send it a JoinGroupRequest

Rebalancing - In this state, the co-ordinator has received a JoinGroupRequest from the consumers in the current generation and it increments the group generation id, assigns consumer ids where required and does the partition assignment

Steady - In this state, the co-ordinator accepts OffsetCommitRequests and heartbeats from all consumers in the group

(Figure: co-ordinator state diagram for a group)
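A comparable sketch for the per-group co-ordinator states. All event names are hypothetical, and the Ready to Steady edge assumes that a group whose metadata has been loaded after failover resumes without a rebalance, which is consistent with the failover note but not stated explicitly in the design.

```python
from enum import Enum, auto


class CoordinatorState(Enum):
    DOWN = auto()
    CATCH_UP = auto()
    READY = auto()
    PREPARE_FOR_REBALANCE = auto()
    REBALANCING = auto()
    STEADY = auto()


C = CoordinatorState

# Hypothetical event names; edges are inferred from the state descriptions.
TRANSITIONS = {
    (C.DOWN, "elected"): C.CATCH_UP,
    (C.CATCH_UP, "metadata_loaded"): C.READY,
    # Assumption: after failover a loaded group resumes in Steady without a rebalance.
    (C.READY, "resume"): C.STEADY,
    (C.STEADY, "rebalance_needed"): C.PREPARE_FOR_REBALANCE,
    (C.PREPARE_FOR_REBALANCE, "all_consumers_joined"): C.REBALANCING,
    (C.REBALANCING, "assignment_done"): C.STEADY,
}


def next_state(state: CoordinatorState, event: str) -> CoordinatorState:
    """Return the state after `event`; the co-ordinator can die or be demoted anytime."""
    if event in ("died", "demoted"):
        return C.DOWN
    try:
        return TRANSITIONS[(state, event)]
    except KeyError:
        raise ValueError(f"event {event!r} is not valid in state {state.name}") from None


def accepts_heartbeats_and_commits(state: CoordinatorState) -> bool:
    # Only Steady accepts OffsetCommitRequests and heartbeats without error;
    # in Prepare for rebalance, heartbeats get an IllegalGeneration error instead.
    return state is C.STEADY
```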


 Consumer id assignment

  1. After startup, a consumer learns its consumer id from the very first JoinGroupResponse it receives from the co-ordinator. From that point onwards, the consumer is expected to include this consumer id in every request it sends to the co-ordinator (HeartbeatRequest, JoinGroupRequest, OffsetCommitRequest). If the co-ordinator receives a HeartbeatRequest or an OffsetCommitRequest with a consumer id that does not match any consumer in the group, it returns an UnknownConsumer error code in the corresponding response.
  2. The co-ordinator assigns a consumer id to a consumer on a successful rebalance and sends it in the JoinGroupResponse. The consumer should include this id in every subsequent JoinGroupRequest as well, until it is shut down or dies.
  3. The co-ordinator does consumer id assignment after it has received a JoinGroupRequest from all existing consumers in a group. At this point, it assigns a new id <group>-<consumer_host>-<sequence> to every consumer that did not have a consumer id in the JoinGroupRequest. The assumption is that such consumers are newly started up.
  4. If a consumer fails to send the same consumer id in subsequent JoinGroupRequests, it will cause a chain of rebalance attempts and can prevent the group from ever finishing a rebalance operation successfully. This is because the co-ordinator detects that a new consumer is joining, and hence that a rebalance must be triggered, by checking the consumer id in the JoinGroupRequest: if there is no consumer id, it assumes that a new consumer wants to join the group.
  5. If a consumer id is specified in the JoinGroupRequest but does not match any id in the current group membership, the co-ordinator sends an UnknownConsumer error code in the JoinGroupResponse and prevents the consumer from joining the group. This does not trigger a rebalance operation for the rest of the consumers in the group, but it also does not allow such a consumer to join the existing group.
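The id assignment rules above can be sketched as a small co-ordinator-side class. The class and method names, the numeric error code, and the single per-group `<sequence>` counter are assumptions for illustration; expiring dead consumers is not modeled.

```python
import itertools

NO_ERROR = 0
UNKNOWN_CONSUMER = 1  # hypothetical numeric value for the UnknownConsumer error


class GroupMembership:
    """Co-ordinator-side view of one group's consumer ids (illustrative sketch)."""

    def __init__(self, group_id: str):
        self.group_id = group_id
        self.generation_id = 0
        self.members = set()      # consumer ids in the current generation
        self.rejoined = set()     # existing members that sent a JoinGroupRequest
        self.new_hosts = []       # hosts of consumers that joined without an id
        self._sequence = itertools.count(1)  # assumption: one counter per group

    def join(self, consumer_id: str, consumer_host: str) -> int:
        if consumer_id:
            if consumer_id not in self.members:
                # Unknown id: reject, without triggering a rebalance for the others.
                return UNKNOWN_CONSUMER
            self.rejoined.add(consumer_id)
        else:
            # No id: treated as a newly started consumer.
            self.new_hosts.append(consumer_host)
        return NO_ERROR

    def all_existing_members_joined(self) -> bool:
        return self.rejoined == self.members

    def complete_rebalance(self) -> list:
        """Assign <group>-<consumer_host>-<sequence> ids and bump the generation."""
        if not self.all_existing_members_joined():
            raise RuntimeError("still waiting for JoinGroupRequests from existing members")
        new_ids = [f"{self.group_id}-{host}-{next(self._sequence)}" for host in self.new_hosts]
        self.members.update(new_ids)
        self.generation_id += 1
        self.rejoined.clear()
        self.new_hosts.clear()
        return new_ids
```

If hostA's consumer in a group like this re-joined without its id, it would be queued as a new consumer while `g1-hostA-1` never re-joins, so `all_existing_members_joined()` would stay false; this is the stalled-rebalance scenario in point 4.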

...

 

{
  Version                => int16
  CorrelationId          => int32
  GroupId                => String
  ConsumerHost           => String
  SessionTimeout         => int32
  HeartbeatFrequency     => int16
  Topics                 => [String]
  ConsumerId             => String
}
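One way the request schema above (the JoinGroupRequest) could be serialized, as a sketch. The byte layout is an assumption: big-endian integers, strings as an int16 byte length followed by UTF-8 bytes, and arrays as an int32 element count, following the conventions of Kafka's existing wire protocol; this design does not specify the encoding itself. `encode_string` is a hypothetical helper.

```python
import struct
from dataclasses import dataclass, field
from typing import List


def encode_string(value: str) -> bytes:
    """Assumed encoding: int16 byte length followed by UTF-8 bytes."""
    data = value.encode("utf-8")
    return struct.pack(">h", len(data)) + data


@dataclass
class JoinGroupRequest:
    version: int
    correlation_id: int
    group_id: str
    consumer_host: str
    session_timeout: int
    heartbeat_frequency: int
    topics: List[str] = field(default_factory=list)
    consumer_id: str = ""  # empty until the co-ordinator has assigned an id

    def encode(self) -> bytes:
        # Fields are written big-endian in the order of the schema above.
        parts = [
            struct.pack(">hi", self.version, self.correlation_id),
            encode_string(self.group_id),
            encode_string(self.consumer_host),
            struct.pack(">ih", self.session_timeout, self.heartbeat_frequency),
            struct.pack(">i", len(self.topics)),  # assumed int32 array count
            *(encode_string(t) for t in self.topics),
            encode_string(self.consumer_id),
        ]
        return b"".join(parts)
```

A consumer's first request leaves `consumer_id` empty, which the co-ordinator reads as a newly started consumer.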

JoinGroupResponse

{
  Version                => int16
  CorrelationId          => int32
  ErrorCode              => int16
  GroupId                => String 
  GroupGenerationId      => int32
  ConsumerId             => String
  PartitionsToOwn        => [TopicAndPartition]
}
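A matching decoder sketch for the JoinGroupResponse. It uses the same assumed layout (big-endian integers, int16-length-prefixed strings, int32-counted arrays), and additionally assumes that a TopicAndPartition is encoded as a topic string followed by an int32 partition; `Reader` and `decode_join_group_response` are hypothetical names.

```python
import struct
from dataclasses import dataclass
from typing import List, Tuple


class Reader:
    """Sequential big-endian reader over a response buffer."""

    def __init__(self, buf: bytes):
        self.buf = buf
        self.pos = 0

    def _read(self, fmt: str) -> int:
        (value,) = struct.unpack_from(fmt, self.buf, self.pos)
        self.pos += struct.calcsize(fmt)
        return value

    def int16(self) -> int:
        return self._read(">h")

    def int32(self) -> int:
        return self._read(">i")

    def string(self) -> str:
        length = self.int16()
        value = self.buf[self.pos:self.pos + length].decode("utf-8")
        self.pos += length
        return value


@dataclass
class JoinGroupResponse:
    version: int
    correlation_id: int
    error_code: int
    group_id: str
    group_generation_id: int
    consumer_id: str
    partitions_to_own: List[Tuple[str, int]]  # (topic, partition) pairs


def decode_join_group_response(buf: bytes) -> JoinGroupResponse:
    r = Reader(buf)
    version, correlation_id, error_code = r.int16(), r.int32(), r.int16()
    group_id = r.string()
    generation_id = r.int32()
    consumer_id = r.string()
    # Assumption: TopicAndPartition is a topic string followed by an int32 partition.
    partitions = [(r.string(), r.int32()) for _ in range(r.int32())]
    return JoinGroupResponse(version, correlation_id, error_code, group_id,
                             generation_id, consumer_id, partitions)
```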

 

HeartbeatRequest

{
  Version                => int16
  CorrelationId          => int32
  GroupId                => String
  GroupGenerationId      => int32
  ConsumerId             => String
}
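A sketch of how the co-ordinator might validate a HeartbeatRequest against its view of the group. The UnknownConsumer and IllegalGeneration behaviours come from the rules above; the numeric codes, the `GroupState` type, and rejecting a stale generation id with IllegalGeneration are assumptions.

```python
from dataclasses import dataclass

# Hypothetical numeric error codes; only the names come from the design.
NO_ERROR = 0
ILLEGAL_GENERATION = 1
UNKNOWN_CONSUMER = 2


@dataclass
class HeartbeatRequest:
    version: int
    correlation_id: int
    group_id: str
    group_generation_id: int
    consumer_id: str


@dataclass
class GroupState:
    generation_id: int
    members: frozenset
    preparing_rebalance: bool = False


def handle_heartbeat(group: GroupState, req: HeartbeatRequest) -> int:
    if req.consumer_id not in group.members:
        return UNKNOWN_CONSUMER
    # In "Prepare for rebalance", every heartbeat gets IllegalGeneration so that
    # consumers stop consumption, commit offsets and send a JoinGroupRequest.
    if group.preparing_rebalance:
        return ILLEGAL_GENERATION
    # Assumption: a stale generation id is also rejected with IllegalGeneration.
    if req.group_generation_id != group.generation_id:
        return ILLEGAL_GENERATION
    return NO_ERROR
```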

...