...

1) For each consumer group, the group metadata containing:

  • List of topics the group subscribes to.
  • Group configs, including session timeout, etc.
  • Consumer metadata for each consumer in the group.
  • Current offsets for each consumed topic/partition.
  • Partition ownership metadata, including the consumer-assigned-partitions map, current group generation number, etc.
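
As a rough illustration of the group metadata listed above, the following Python sketch collects the same fields into one record. All class and field names here are hypothetical, chosen only for this example, and the value types are assumptions; the design does not prescribe an in-memory layout.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple

# (topic, partition) pair; the name is illustrative only.
TopicAndPartition = Tuple[str, int]


@dataclass
class GroupMetadata:
    """Hypothetical per-group record mirroring the bullet list above."""
    group_id: str
    session_timeout: int                      # group config, e.g. session timeout
    subscribed_topics: List[str] = field(default_factory=list)
    # consumer id -> consumer metadata (kept opaque in this sketch)
    consumers: Dict[str, dict] = field(default_factory=dict)
    # current offset for each consumed topic/partition
    offsets: Dict[TopicAndPartition, int] = field(default_factory=dict)
    # partition ownership: consumer id -> assigned partitions
    assignment: Dict[str, List[TopicAndPartition]] = field(default_factory=dict)
    generation: int = 0                       # current group generation number


group = GroupMetadata(group_id="example-group", session_timeout=30000)
group.subscribed_topics.append("orders")
group.assignment["consumer-1"] = [("orders", 0)]
group.offsets[("orders", 0)] = 42
```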

...

{
  Version                => int16
  CorrelationId          => int64
  ClientId               => String
  GroupId                => String
}

 

ConsumerMetadataResponse

{
  CorrelationId          => int64
  ErrorCode              => int16
  CoordinatorId          => Broker
  GenerationId           => int64
}

...

 

{
  Version                => int16
  CorrelationId          => int64
  GroupId                => String
  ConsumerHost           => String
  SessionTimeout         => int64
  Subscribing            => [String]
}

 

JoinGroupResponse

{
  Version                => int16
  CorrelationId          => int64
  ErrorCode              => int16
  GenerationId           => int16
  GroupId                => String
  ConsumerId             => String
  PartitionsToOwn        => [TopicAndPartition]
}
 

HeartbeatRequest

{
  Version                => int16
  CorrelationId          => int64
  GroupId                => String
  ConsumerId             => String
}

...

{
  CorrelationId          => int64
  ErrorCode              => int16
}
 

Wildcard Subscription Handling

With wildcard subscription (for example, whitelist and blacklist), each consumer is responsible for discovering matching topics through topic metadata requests. That is, the consumer sends a topic metadata request with an empty topic list, and the response returns the partition info of all topics. The consumer then filters the topics that match its wildcard expression and updates its subscription list. Again, if the subscription list has changed from its previous value, the consumer starts the re-join group procedure.
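
The filtering step can be sketched in Python as follows. This is a minimal illustration, not the consumer's actual implementation: the function names are hypothetical, the wildcard expression is assumed to be a regular expression that must match the whole topic name, and `all_topics` stands in for the topic names returned by the metadata response.

```python
import re


def match_topics(all_topics, pattern, is_whitelist=True):
    """Return the sorted topics selected by a wildcard expression.

    A whitelist keeps topics that match the pattern; a blacklist keeps
    topics that do not match it (assumed semantics for this sketch).
    """
    regex = re.compile(pattern)
    if is_whitelist:
        return sorted(t for t in all_topics if regex.fullmatch(t))
    return sorted(t for t in all_topics if not regex.fullmatch(t))


def refresh_subscription(current, all_topics, pattern, is_whitelist=True):
    """Return (new_subscription, needs_rejoin).

    needs_rejoin is True when the subscription list differs from the
    previous one, which is when the consumer would re-join the group.
    """
    new = match_topics(all_topics, pattern, is_whitelist)
    return new, new != sorted(current)


topics = ["orders", "orders-eu", "payments"]
subscription, needs_rejoin = refresh_subscription(["orders"], topics, r"orders.*")
```

In this example the new topic `orders-eu` matches the whitelist, so the subscription list changes and `needs_rejoin` is true; calling `refresh_subscription` again with the updated list reports no change.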

Rebalance Process: Option 2

For this approach, if a coordinator tries to rebalance, it first needs to increment the generation number, update ZK accordingly, and then close the socket.

The consumer group generation number metadata also needs to be propagated from ZK to all brokers, either through the controller or through the coordinator. We do not consider having ZK push this information directly to brokers upon updates, or a pull-based method, since neither is scalable.

Then the rebalance process becomes:

1) Upon coordinator failure, the consumer fetches the new coordinator id along with the generation number.

2) If the generation number is not bigger than its currently held generation number, the consumer sends a ping request to the coordinator upon connection setup.

3) Otherwise, the consumer prepares for the rebalance by stopping fetching and calling onPartitionDeassigned, and then sends a join request to the coordinator upon connection setup.

4) The coordinator accepts an offset commit even if its generation number is smaller, as long as the group is undergoing a rebalance (offset management is discussed in the next section).
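
The generation-number checks in steps 2 to 4 can be sketched as two small Python functions. The function names and the action strings are hypothetical labels for this example. The sketch also assumes that a commit carrying the current generation number is always accepted, which the steps above imply but do not state; a commit carrying a larger generation number is rejected here for lack of any stated rule.

```python
def actions_after_coordinator_found(held_generation, fetched_generation):
    """Consumer side: decide what to do once the new coordinator is known."""
    if fetched_generation <= held_generation:
        # Step 2: generation unchanged, keep fetching and resume pinging.
        return ["send_ping"]
    # Step 3: generation advanced, prepare for rebalance and re-join.
    return ["stop_fetching", "on_partition_deassigned", "send_join_request"]


def accept_offset_commit(commit_generation, group_generation, rebalancing):
    """Coordinator side: decide whether to accept an offset commit."""
    if commit_generation == group_generation:
        return True  # assumption: commits from the current generation are fine
    # Step 4: an older generation is tolerated only during a rebalance.
    return commit_generation < group_generation and rebalancing
```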

 

The consumer's logic can then be summarized as the following state machine:

-----------------------------------------------------------------------------------------------------------------

Joined (join response received, started fetching and pinging)

  1) Topic Subscription Change  => Joining (prepare for rebalance, send join request)

  2) Coordinator Failed => Discovering (send find_coordinator)

  3) Shutdown Signal => Stopped

Discovering (lost connection with coordinator, but may be fetching data)

  1) Found Coordinator, Generation Number Changed => Joining

  2) Found Coordinator, Generation Number OK => Joined

  3) Shutdown Signal => Stopped

Joining (not fetching, waiting for join response)

  1) Error Join Response => Discovering

  2) Normal Join Response => Joined

  3) Shutdown Signal => Stopped

Stopped (not started, killed, shutdown)

  1) Startup => Discovering

-----------------------------------------------------------------------------------------------------------------
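
The state machine above can be written down as a transition table. The Python sketch below is one possible encoding: the state names follow the text, while the event strings and the `next_state` helper are hypothetical names introduced for this example. Rejecting any (state, event) pair not listed in the table is also a choice of this sketch, not something the design specifies.

```python
from enum import Enum


class State(Enum):
    STOPPED = "Stopped"
    DISCOVERING = "Discovering"
    JOINING = "Joining"
    JOINED = "Joined"


# (current state, event) -> next state, one entry per numbered transition.
TRANSITIONS = {
    (State.JOINED, "topic_subscription_change"): State.JOINING,
    (State.JOINED, "coordinator_failed"): State.DISCOVERING,
    (State.JOINED, "shutdown"): State.STOPPED,
    (State.DISCOVERING, "found_coordinator_generation_changed"): State.JOINING,
    (State.DISCOVERING, "found_coordinator_generation_ok"): State.JOINED,
    (State.DISCOVERING, "shutdown"): State.STOPPED,
    (State.JOINING, "error_join_response"): State.DISCOVERING,
    (State.JOINING, "normal_join_response"): State.JOINED,
    (State.JOINING, "shutdown"): State.STOPPED,
    (State.STOPPED, "startup"): State.DISCOVERING,
}


def next_state(state, event):
    """Return the next state, or raise ValueError for an unlisted event."""
    try:
        return TRANSITIONS[(state, event)]
    except KeyError:
        raise ValueError(f"no transition from {state.value} on {event!r}") from None


# Example run: start up, find a coordinator with a newer generation, join.
state = State.STOPPED
for event in ("startup", "found_coordinator_generation_changed", "normal_join_response"):
    state = next_state(state, event)
```

After this run the consumer is in the Joined state; a subsequent `coordinator_failed` event would move it back to Discovering.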