Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  1. On startup or on co-ordinator failover, the consumer sends a ConsumerMetadataRequest to any of the brokers in the "bootstrap.brokers" list. In the ConsumerMetadataResponse, it receives the location of the co-ordinator for it's group .  and the co-ordinator's generation id.
  2. If the returned generation id of the co-ordinator is greater than the last known generation id, it indicates that the co-ordinator has initiated a rebalance. The consumer then stops fetching data, commits offsets and The consumer sends a JoinGroupRequest to it's co-ordinator broker. In the JoinGroupResponse, it receives the list of topic partitions that it should own.
  3. At this time, group management is done and the consumer starts fetching data and (optionally) committing offsets for the list of partitions it owns.

...

3) If coordinator has not heard from a consumer at least once in failed_heartbeat_threshold*(session_timeout/heartbeat_frequency), mark the consumer as dead, break the socket connection to the consumer and trigger a rebalance process for the group.

4) If consumer has not heart received a heartbeat from the coordinator after session timeout, it treats the coordinator as failed and triggers the co-ordinator re discovery process.

...

 

Code Block
{
  Version                => int16
  CorrelationId          => int64
  GroupId                => String
  ConsumerHost           => String
  SessionTimeout         => int64
  Topics                 => [String]
}

JoinGroupResponse

Code Block
{
  Version                => int16
  CorrelationId          => int64
  ErrorCode              => int16
  GenerationId           => int16
  GroupId                => String 
  ConsumerId             => String
  PartitionsToOwn        => [TopicAndPartition]
}

 

HeartbeatRequest

Code Block
{
  Version                => int16
  CorrelationId          => int64
  GroupId                => String
  ConsumerId             => String
}

...

Code Block
{
  CorrelationId          => int64
  GroupId                => String
  ConsumerId             => String
  ErrorCode              => int16
}
 

Wildcard Subscription

With wildcard subscription (for example, whitelist and blacklist), the consumers are responsible to discover matching topics through topic metadata request. That is, its topic metadata request will contain an empty topic list, whose response then will return the partition info of all topics, it will then filter the topics that match its wildcard expression, and then update the subscription list. Again, if the subscription list has changed from the previous values, it will start the consumer re-join group procedure.

...