...

 

Code Block
{
  Version                => int16
  CorrelationId          => int64
  GroupId                => String
  ConsumerAddr           => String
  SessionTimeout         => int64
  Subscribing            => [String]
}

 

JoinGroupResponse

Code Block
{
  Version                => int16
  CorrelationId          => int64
  ErrorCode              => int16
  Generation             => int16
  GroupId                => String 
  ConsumerId             => String
  PartitionsToOwn        => [<TopicAndPartition>]
}

 

 

 

 

Note that the coordinator assigns the consumer id when the consumer joins the group; this id is then used in the heartbeat protocol and in offset management.
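A minimal sketch of this assignment, under stated assumptions: the `Coordinator` class, its method names, and the "group id plus counter" id scheme are hypothetical and only illustrate that the id originates at the coordinator and is checked on later requests such as heartbeats.

```python
import itertools


class Coordinator:
    """Hypothetical in-memory coordinator; not the actual implementation."""

    def __init__(self):
        # group_id -> {consumer_id: consumer_addr}
        self._groups = {}
        self._next_id = itertools.count(1)

    def handle_join_group(self, group_id, consumer_addr):
        """Register the consumer and return the id assigned to it."""
        members = self._groups.setdefault(group_id, {})
        # Assumed id scheme: group id plus a coordinator-local counter.
        consumer_id = f"{group_id}-{next(self._next_id)}"
        members[consumer_id] = consumer_addr
        return consumer_id

    def handle_heartbeat(self, group_id, consumer_id):
        """Accept a heartbeat only for an id issued for this group."""
        return consumer_id in self._groups.get(group_id, {})
```

The consumer would carry the returned id in its subsequent heartbeat, OffsetCommitRequest, and OffsetFetchRequest messages.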

...

This can lift the lower bound that ping frequency imposes on rebalance latency, but it requires one more metadata request and causes unnecessary rebalances on coordinator failover (because they are now treated as one scenario).

 

Offset Management

Commit Offset

Upon receiving the commit offset request, the coordinator checks whether the consumer 1) is within the group specified, 2) owns the partitions it commits offsets to, and 3) has the correct generation number. If so, it appends the offset entry to the corresponding log; otherwise it rejects the request.
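The three checks can be sketched as follows. This is an illustration only: the `Group` structure, the `ACCEPTED`/`REJECTED` codes, and the in-memory list standing in for the offset log are assumptions, not part of the design.

```python
from dataclasses import dataclass, field

ACCEPTED = 0
REJECTED = 1  # placeholder code; the design does not define its value


@dataclass
class Group:
    generation: int
    # consumer_id -> set of (topic, partition) pairs it currently owns
    ownership: dict = field(default_factory=dict)
    # append-only list standing in for the offset log
    offset_log: list = field(default_factory=list)


def handle_offset_commit(groups, group_id, consumer_id, generation, offsets):
    """offsets maps (topic, partition) -> offset to commit."""
    group = groups.get(group_id)
    # Check 1: the consumer is within the specified group.
    if group is None or consumer_id not in group.ownership:
        return REJECTED
    # Check 2: the consumer owns every partition it commits offsets to.
    if not set(offsets) <= group.ownership[consumer_id]:
        return REJECTED
    # Check 3: the request carries the current generation number.
    if generation != group.generation:
        return REJECTED
    # All checks passed: append one entry per partition to the log.
    for topic_partition, offset in offsets.items():
        group.offset_log.append((topic_partition, offset))
    return ACCEPTED
```

Rejecting the whole request on any failed check is a simplification; the OffsetCommitResponse carries an error code per partition, so a real coordinator could report failures at that granularity.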

...

OffsetCommitRequest

Code Block
{
  Version                => int16
  CorrelationId          => int64
  GroupId                => String 
  ConsumerId             => String
  Generation             => int16
  Offsets                => [TopicAndPartition -> OffsetAndError]
}

OffsetCommitResponse

Code Block
{
  Version                => int16
  CorrelationId          => int64
  responses              => [TopicAndPartition -> int16]  // error code for each partition
}

 

Fetch Offset

The fetch offset request is also handled by the coordinator.

Upon receiving the fetch offset request, the coordinator checks whether the consumer is within the group specified. If so, it returns the current offset; otherwise it rejects the request.

Upon receiving a reject response from the coordinator, the consumer will try to connect to the new coordinator (i.e., set up the connection, send a heartbeat, and get a valid response) and then retry.
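The consumer-side retry can be sketched as below. The `Rejected` exception, the `fetch` and `reconnect` callables, and the `max_attempts` bound are all assumptions of this sketch; the design only states that the consumer reconnects and retries.

```python
class Rejected(Exception):
    """Raised by a hypothetical client when the coordinator rejects a request."""


def fetch_offsets_with_retry(fetch, reconnect, max_attempts=3):
    """Call fetch(); on rejection, reconnect to the new coordinator and retry.

    fetch and reconnect are caller-supplied callables. reconnect is expected
    to set up the connection, send a heartbeat, and return only after it has
    received a valid response.
    """
    for attempt in range(max_attempts):
        try:
            return fetch()
        except Rejected:
            # Give up after the last attempt instead of retrying forever.
            if attempt == max_attempts - 1:
                raise
            reconnect()
```

Bounding the number of attempts is a design choice of the sketch, so that a consumer that has been removed from the group surfaces the error instead of looping.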

 

OffsetFetchRequest

Code Block
{
  Version                => int16
  CorrelationId          => int64
  GroupId                => String 
  ConsumerId             => String
  Generation             => int16
  partitions             => [TopicAndPartition]
}

 

OffsetFetchResponse

Code Block
{
  Version                => int16
  CorrelationId          => int64
  offsets                => [TopicAndPartition -> OffsetAndError] 
}

 

Consumer Architecture

API Implementation

...