...

In addition, for each consumer group the coordinator maintains the group config and partition ownership metadata in ZK as a JSON object. This node will be updated upon rebalance completion and read upon coordinator failover (details below).

 

Consumer Startup (Subscription Change)

...

-----------------------------------------------------------------------------------

 

FindCoordinatorRequest

 

{
  Version                => int16
  CorrelationId          => int64
  ClientId               => String
  GroupId                => String
}
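
As a rough illustration of how this request might be laid out on the wire, the sketch below packs the four fields in schema order. It assumes big-endian integers and strings encoded as an int16 length followed by UTF-8 bytes; the schema above does not fix either choice, and the function names are hypothetical.

```python
import struct

def encode_string(value: str) -> bytes:
    # Assumption: int16 length prefix followed by UTF-8 bytes.
    data = value.encode("utf-8")
    return struct.pack(">h", len(data)) + data

def encode_find_coordinator_request(version: int, correlation_id: int,
                                    client_id: str, group_id: str) -> bytes:
    # Field order follows the schema: Version, CorrelationId, ClientId, GroupId.
    header = struct.pack(">hq", version, correlation_id)  # int16 + int64
    return header + encode_string(client_id) + encode_string(group_id)

# Example values are made up for illustration.
payload = encode_find_coordinator_request(0, 42, "client-1", "group-a")
```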

 

FindCoordinatorResponse

...

 

{
  Version                => int16
  CorrelationId          => int64
  CoordinatorId          => int16
}
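
A minimal sketch of how a consumer might parse this response, assuming the three fields are packed big-endian in schema order with no padding. The class and function names are hypothetical, not part of the design.

```python
import struct
from typing import NamedTuple

class FindCoordinatorResponse(NamedTuple):
    version: int
    correlation_id: int
    coordinator_id: int

def decode_find_coordinator_response(buf: bytes) -> FindCoordinatorResponse:
    # Assumption: int16 Version, int64 CorrelationId, int16 CoordinatorId,
    # big-endian and unpadded, for 12 bytes in total.
    if len(buf) < 12:
        raise ValueError("truncated FindCoordinatorResponse")
    return FindCoordinatorResponse(*struct.unpack(">hqh", buf[:12]))

# Example bytes are made up for illustration.
raw = struct.pack(">hqh", 0, 42, 3)
response = decode_find_coordinator_response(raw)
```

The consumer would then look up the broker whose id equals `response.coordinator_id` and send its join request there.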

...

---------------------------------------------------------------------------------------

 

JoinGroupRequest

 

{
  Version                => int16
  CorrelationId          => int64
  GroupId                => String
  ConsumerAddr           => String
  SessionTimeout         => int64
  Subscribing            => [String]
}
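
The request above mixes fixed-width fields, strings, and a string array. The following sketch shows one way to serialize it, assuming big-endian integers, int16-length-prefixed UTF-8 strings, and arrays encoded as an int32 element count followed by the elements. These encodings, the millisecond unit for SessionTimeout, and all function names are assumptions made for illustration.

```python
import struct
from typing import List

def encode_string(value: str) -> bytes:
    # Assumption: int16 length prefix followed by UTF-8 bytes.
    data = value.encode("utf-8")
    return struct.pack(">h", len(data)) + data

def encode_string_array(values: List[str]) -> bytes:
    # Assumption: int32 element count followed by each encoded string.
    return struct.pack(">i", len(values)) + b"".join(encode_string(v) for v in values)

def encode_join_group_request(version: int, correlation_id: int, group_id: str,
                              consumer_addr: str, session_timeout: int,
                              subscribing: List[str]) -> bytes:
    # Field order follows the schema above.
    return (struct.pack(">hq", version, correlation_id)
            + encode_string(group_id)
            + encode_string(consumer_addr)
            + struct.pack(">q", session_timeout)  # int64; unit assumed to be ms
            + encode_string_array(subscribing))

# Example values are made up for illustration.
request = encode_join_group_request(0, 7, "group-a", "10.0.0.5:9092",
                                    30000, ["orders", "payments"])
```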

 

JoinGroupResponse

 

{
  Version                => int16
  CorrelationId          => int64
  ErrorCode              => int16
  Generation             => int16
  GroupId                => String 
  ConsumerId             => String
  PartitionsToOwn        => [<TopicAndPartition>]
}
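
To show how a consumer might act on this response, the sketch below models it as a plain record and groups PartitionsToOwn by topic. It assumes that an ErrorCode of 0 means success and that each TopicAndPartition is a (topic, partition) pair; neither is specified in the schema above, and the names are hypothetical.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List, Tuple

@dataclass
class JoinGroupResponse:
    version: int
    correlation_id: int
    error_code: int
    generation: int
    group_id: str
    consumer_id: str
    partitions_to_own: List[Tuple[str, int]]  # assumed (topic, partition) pairs

def owned_partitions_by_topic(resp: JoinGroupResponse) -> Dict[str, List[int]]:
    # Assumption: ErrorCode 0 means the join succeeded.
    if resp.error_code != 0:
        raise RuntimeError(f"join failed with error code {resp.error_code}")
    grouped = defaultdict(list)
    for topic, partition in resp.partitions_to_own:
        grouped[topic].append(partition)
    return {topic: sorted(parts) for topic, parts in grouped.items()}

# Example values are made up for illustration.
resp = JoinGroupResponse(0, 7, 0, 1, "group-a", "consumer-1",
                         [("orders", 1), ("orders", 0), ("payments", 2)])
assignment = owned_partitions_by_topic(resp)
```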

 

...

The only difference is that for 1) the coordinator will immediately trigger a rebalance for the newly created group, whereas for 2) the coordinator needs to load the group's current config and ownership information from ZK but does not necessarily trigger a rebalance.
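
The two cases can be sketched as follows. This is a simplified model, not the coordinator's actual logic: a plain dict stands in for ZK, the presence of a stored node is assumed to be how the two cases are told apart, and the class and method names are hypothetical.

```python
from typing import Dict, List

class Coordinator:
    def __init__(self, zk_store: Dict[str, dict]):
        self.zk_store = zk_store             # stand-in for ZooKeeper
        self.groups: Dict[str, dict] = {}    # in-memory group state
        self.rebalances: List[str] = []      # groups with a triggered rebalance

    def on_join_for_unloaded_group(self, group_id: str) -> str:
        stored = self.zk_store.get(group_id)
        if stored is None:
            # Case 1: newly created group, rebalance immediately.
            self.groups[group_id] = {"config": {}, "ownership": {}}
            self.rebalances.append(group_id)
            return "created"
        # Case 2: existing group, load config and ownership from ZK
        # without necessarily rebalancing.
        self.groups[group_id] = stored
        return "loaded"

# Example data is made up for illustration.
coordinator = Coordinator({"group-b": {"config": {}, "ownership": {"orders-0": "consumer-9"}}})
first = coordinator.on_join_for_unloaded_group("group-a")
second = coordinator.on_join_for_unloaded_group("group-b")
```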

 

Consumer

...

The consumer config and ownership metadata can be stored in one ZK path as a JSON object. This node will be updated upon rebalance completion and read upon coordinator failover.
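
As an illustration of what such a node might hold, the sketch below serializes config and ownership into a single JSON string and reads it back. The key names ("config", "ownership") and the shape of the example values are assumptions; the design does not define the JSON layout.

```python
import json
from typing import Tuple

def serialize_group_metadata(config: dict, ownership: dict) -> str:
    # sort_keys keeps the stored string stable across writes.
    return json.dumps({"config": config, "ownership": ownership}, sort_keys=True)

def deserialize_group_metadata(payload: str) -> Tuple[dict, dict]:
    node = json.loads(payload)
    return node["config"], node["ownership"]

# Written on rebalance completion (example values are made up).
node_data = serialize_group_metadata(
    {"session_timeout_ms": 30000},
    {"orders": {"0": "consumer-1", "1": "consumer-2"}},
)

# Read back on coordinator failover.
config, ownership = deserialize_group_metadata(node_data)
```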

Consumer Fetching Data

As in the new producer, consumers will periodically send topic metadata requests to the available brokers to find the leaders of their assigned partitions. A consumer will also force a topic metadata refresh upon 1) receiving an error while fetching data or 2) receiving a join response from the coordinator.
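
The refresh rule can be sketched as a small policy object: refresh when the periodic interval has elapsed, or immediately after either forcing event. The class name, the interval parameter, and the use of plain numeric timestamps are assumptions made for illustration.

```python
class MetadataRefreshPolicy:
    def __init__(self, refresh_interval_s: float, now: float = 0.0):
        self.refresh_interval_s = refresh_interval_s
        self.last_refresh = now
        self.force = False

    def on_fetch_error(self) -> None:
        # Trigger 1: an error while fetching data.
        self.force = True

    def on_join_response(self) -> None:
        # Trigger 2: a join response from the coordinator.
        self.force = True

    def should_refresh(self, now: float) -> bool:
        # Refresh if forced, or if the periodic interval has elapsed.
        return self.force or (now - self.last_refresh) >= self.refresh_interval_s

    def mark_refreshed(self, now: float) -> None:
        self.last_refresh = now
        self.force = False

policy = MetadataRefreshPolicy(refresh_interval_s=60.0)
```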

...

------------------------------------------------------------------------------------------------

 

PingRequest

{
  Version                => int16
  CorrelationId          => int64
  GroupId                => String
  ConsumerId             => String
}

 

PingResponse

{
  Version                => int16
  CorrelationId          => int64
  ErrorCode              => int16
}
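
A minimal sketch of one ping round trip, assuming big-endian integers, int16-length-prefixed UTF-8 strings, and an ErrorCode of 0 meaning success. The function names are hypothetical; matching a response to its request by CorrelationId is also an assumption about how the field is used.

```python
import struct
from typing import Dict

def encode_string(value: str) -> bytes:
    # Assumption: int16 length prefix followed by UTF-8 bytes.
    data = value.encode("utf-8")
    return struct.pack(">h", len(data)) + data

def encode_ping_request(version: int, correlation_id: int,
                        group_id: str, consumer_id: str) -> bytes:
    # Field order follows the PingRequest schema.
    return (struct.pack(">hq", version, correlation_id)
            + encode_string(group_id)
            + encode_string(consumer_id))

def decode_ping_response(buf: bytes) -> Dict[str, int]:
    # int16 Version, int64 CorrelationId, int16 ErrorCode = 12 bytes.
    if len(buf) < 12:
        raise ValueError("truncated PingResponse")
    version, correlation_id, error_code = struct.unpack(">hqh", buf[:12])
    return {"version": version, "correlation_id": correlation_id,
            "error_code": error_code}

def ping_succeeded(sent_correlation_id: int, response: Dict[str, int]) -> bool:
    # Assumption: a response answers the request with the same CorrelationId.
    return (response["correlation_id"] == sent_correlation_id
            and response["error_code"] == 0)

# Example values are made up for illustration.
ping = encode_ping_request(0, 9, "group-a", "consumer-1")
pong = decode_ping_response(struct.pack(">hqh", 0, 9, 0))
```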

...