...

Note that on coordinator failover, consumers may discover the new coordinator either before or after it has finished the failover process (loading the consumer group metadata from ZK, etc.). In the latter case, the new coordinator will simply accept the consumer's ping request as normal; in the former case, the new coordinator may reject the request, causing the consumer to re-discover the coordinator and reconnect, which is fine. Also, if the consumer connects to the new coordinator too late, the coordinator may have already marked the consumer as dead and will treat it as a new consumer, which is also fine.
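The failover cases described above can be illustrated with a small simulation. This is only a sketch under stated assumptions: the `Coordinator` class, the outcome strings (`REJECTED`, `UNKNOWN_CONSUMER`, `OK`) and the `ping_after_failover` helper are hypothetical names introduced for illustration and are not part of the design.

```python
from dataclasses import dataclass, field


@dataclass
class Coordinator:
    """Toy coordinator; `ready` stays False until failover has finished."""
    ready: bool = False
    members: set = field(default_factory=set)

    def ping(self, consumer_id: str) -> str:
        if not self.ready:
            return "REJECTED"          # still loading group metadata
        if consumer_id not in self.members:
            return "UNKNOWN_CONSUMER"  # marked dead, or never seen
        return "OK"

    def join(self, consumer_id: str) -> None:
        self.members.add(consumer_id)

    def finish_failover(self) -> None:
        self.ready = True


def ping_after_failover(coordinator, consumer_id, max_attempts=3):
    """Return the outcomes a consumer observes while re-establishing its session."""
    outcomes = []
    for _ in range(max_attempts):
        result = coordinator.ping(consumer_id)
        outcomes.append(result)
        if result == "OK":
            break
        if result == "REJECTED":
            # The consumer re-discovers the coordinator and reconnects.
            # Simulation only: failover is assumed to complete in the meantime.
            coordinator.finish_failover()
        elif result == "UNKNOWN_CONSUMER":
            # The consumer was marked dead, so it joins again as a new consumer.
            coordinator.join(consumer_id)
    return outcomes
```

In both the early and the late case the consumer ends up with a working session after one retry, which is why neither case needs special handling.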

Request formats

For each consumer group, the coordinator stores the following information:

...

2) For each existing topic, a list of consumer groups that are currently subscribing to it.

ConsumerMetadataRequest

{
  Version                => int16
  CorrelationId          => int64
  ClientId               => String
  GroupId                => String
}

ConsumerMetadataResponse

{
  CorrelationId          => int64
  ErrorCode              => int16
  CoordinatorId          => Broker
  GenerationId           => int64
}

JoinGroupRequest

 

{
  Version                => int16
  CorrelationId          => int64
  GroupId                => String
  ConsumerHost           => String
  SessionTimeout         => int64
  Topics                 => [String]
}
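
As an illustration of how the JoinGroupRequest layout might be serialized, here is a minimal sketch. The schema above gives only field types, so the wire encoding is an assumption: big-endian integers, strings as an int16 byte length followed by UTF-8 bytes, and arrays as an int32 element count followed by the elements. The function names are hypothetical.

```python
import struct


def encode_string(value: str) -> bytes:
    """Assumed encoding: int16 byte length, then the UTF-8 bytes."""
    raw = value.encode("utf-8")
    return struct.pack(">h", len(raw)) + raw


def encode_join_group_request(version, correlation_id, group_id,
                              consumer_host, session_timeout, topics) -> bytes:
    """Serialize the fields in the order they appear in the schema."""
    out = struct.pack(">hq", version, correlation_id)  # Version, CorrelationId
    out += encode_string(group_id)                      # GroupId
    out += encode_string(consumer_host)                 # ConsumerHost
    out += struct.pack(">q", session_timeout)           # SessionTimeout
    out += struct.pack(">i", len(topics))               # Topics: assumed int32 count
    for topic in topics:
        out += encode_string(topic)
    return out
```

For example, `encode_join_group_request(0, 7, "g", "h1", 30000, ["a", "bc"])` yields a 36-byte payload under these assumptions.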

JoinGroupResponse

{
  Version                => int16
  CorrelationId          => int64
  ErrorCode              => int16
  GenerationId           => int16
  GroupId                => String 
  ConsumerId             => String
  PartitionsToOwn        => [TopicAndPartition]
}

 

HeartbeatRequest

{
  Version                => int16
  CorrelationId          => int64
  GroupId                => String
  ConsumerId             => String
}

HeartbeatResponse

{
  CorrelationId          => int64
  ErrorCode              => int16
}
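
To show how the coordinator might use SessionTimeout together with heartbeats to decide that a consumer is dead, here is a rough sketch. The `HeartbeatTracker` class, the millisecond unit, and the numeric error codes are assumptions made for illustration; the design does not define them here.

```python
class HeartbeatTracker:
    """Tracks the last heartbeat per (group, consumer); times are in milliseconds."""

    NO_ERROR = 0
    UNKNOWN_CONSUMER = 1  # hypothetical error code, not defined by the design

    def __init__(self):
        # (group_id, consumer_id) -> [session_timeout_ms, last_seen_ms]
        self._sessions = {}

    def register(self, group_id, consumer_id, session_timeout_ms, now_ms):
        """Called when a consumer joins the group with its SessionTimeout."""
        self._sessions[(group_id, consumer_id)] = [session_timeout_ms, now_ms]

    def heartbeat(self, group_id, consumer_id, now_ms):
        """Return the ErrorCode to place in the HeartbeatResponse."""
        key = (group_id, consumer_id)
        session = self._sessions.get(key)
        if session is None:
            return self.UNKNOWN_CONSUMER
        timeout_ms, last_seen_ms = session
        if now_ms - last_seen_ms > timeout_ms:
            # Session expired: the consumer is considered dead and must rejoin.
            del self._sessions[key]
            return self.UNKNOWN_CONSUMER
        session[1] = now_ms  # refresh the session
        return self.NO_ERROR
```

A consumer that receives a non-zero ErrorCode in this sketch would be expected to join the group again, matching the "treated as a new consumer" behaviour described for late reconnects.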

...