The motivation for moving to a new set of consumer client APIs with broker-side coordination is laid out here.
The proposed consumer APIs are here. Several API usage examples are documented here.
Group management protocol
Rebalancing is the process by which the consumer instances in a group coordinate to own mutually exclusive sets of partitions of the topics that the group has subscribed to. At the end of a successful rebalance operation for a consumer group, every partition of every subscribed topic is owned by exactly one consumer instance in the group. Rebalancing works as follows. One of the brokers is elected as the coordinator for a subset of the consumer groups. It is responsible for triggering rebalance attempts for those consumer groups when group membership or the partitions of the subscribed topics change. It is also responsible for communicating the resulting partition-to-consumer ownership to all consumers of the group undergoing a rebalance operation.
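The ownership invariant described above (every partition owned by exactly one consumer in the group) can be illustrated with a small sketch. The round-robin strategy and the function name assign_partitions are assumptions made for illustration only; this page does not specify which assignment algorithm the coordinator uses.

```python
from collections import defaultdict


def assign_partitions(consumers, partitions):
    """Hypothetical round-robin assignment (not the proposal's algorithm).

    consumers:  iterable of consumer ids
    partitions: iterable of (topic, partition) tuples
    Returns a dict mapping consumer id -> list of owned (topic, partition).
    """
    members = sorted(consumers)
    if not members:
        raise ValueError("cannot assign partitions to an empty group")
    ownership = defaultdict(list)
    # Sorting makes the result deterministic regardless of input order.
    for index, topic_partition in enumerate(sorted(partitions)):
        owner = members[index % len(members)]
        ownership[owner].append(topic_partition)
    return dict(ownership)


example_assignment = assign_partitions(
    ["c1", "c2"],
    [("orders", 0), ("orders", 1), ("orders", 2)],
)
```

Whatever strategy is used, the property to check after a rebalance is that the owned sets are disjoint and together cover all partitions of the subscribed topics.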
The consumer specifies a session timeout, a heartbeat frequency and a failed heartbeat threshold in the RegisterConsumerRequest that it sends to the coordinator in order to join a consumer group. The consumer then sends periodic heartbeats (HeartbeatRequest) to the coordinator and waits for a response. The heartbeat frequency is the number of heartbeats a consumer sends to the coordinator within one session timeout window. The failed heartbeat threshold is the number of failed or missed heartbeats the coordinator tolerates before it marks the consumer dead. For example, with a session timeout of 1 second and a heartbeat frequency of 3, the consumer is expected to send a HeartbeatRequest to the coordinator every 1000/3 = 333.33 milliseconds. With a failed heartbeat threshold of 3, the coordinator marks the consumer dead if it has not received at least one HeartbeatRequest within (1000/3)*3 milliseconds, i.e. 1 second.
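The arithmetic in this example can be written out as a short sketch. The function names are hypothetical; only the three parameters and the two formulas come from the text above.

```python
def heartbeat_interval_ms(session_timeout_ms, heartbeat_frequency):
    """Time between two HeartbeatRequests sent by the consumer."""
    return session_timeout_ms / heartbeat_frequency


def failure_deadline_ms(session_timeout_ms, heartbeat_frequency,
                        failed_heartbeat_threshold):
    """Silence the coordinator tolerates before marking the consumer dead.

    Equals failed_heartbeat_threshold * (session_timeout / heartbeat_frequency).
    """
    return failed_heartbeat_threshold * session_timeout_ms / heartbeat_frequency


# Values from the example: 1 second timeout, frequency 3, threshold 3.
interval = heartbeat_interval_ms(1000, 3)
deadline = failure_deadline_ms(1000, 3, 3)
```

With these inputs the interval is roughly 333.33 milliseconds and the deadline is 1000 milliseconds, matching the figures in the example.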
Once the consumer has successfully joined a group, the failure detection process starts on the consumer as well as the coordinator. Here is the protocol in more detail:
1) The consumer sends a HeartbeatRequest to the coordinator every session_timeout / heartbeat_frequency milliseconds.
2) Upon receiving the HeartbeatRequest, the coordinator checks the generation number, the consumer id and the consumer group. If the request carries an unknown consumer id or a stale generation number (i.e., the consumer id was not assigned before, or the generation number is smaller than the current value), the coordinator sends an error code in the HeartbeatResponse.
3) If the coordinator has not heard from a consumer at least once in failed_heartbeat_threshold*(session_timeout/heartbeat_frequency) milliseconds, it marks the consumer as dead, closes the socket connection to the consumer and triggers a rebalance for the group.
4) If the consumer has not received a heartbeat response from the coordinator within the session timeout, it treats the coordinator as failed and triggers the coordinator rediscovery process.
5) If the consumer finds the socket channel to the coordinator closed, it treats the coordinator as failed and triggers the coordinator rediscovery process.
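The coordinator side of steps 2 and 3 can be sketched as follows. This is a minimal illustration under stated assumptions, not the proposal's implementation: the class name, the method names and the error-code strings are all hypothetical, time is passed in explicitly so the logic is easy to follow, and closing the socket is not modelled.

```python
class CoordinatorSketch:
    """Tracks heartbeats for a single consumer group (hypothetical helper)."""

    def __init__(self, group_id, generation_id, session_timeout_ms,
                 heartbeat_frequency, failed_heartbeat_threshold):
        self.group_id = group_id
        self.generation_id = generation_id
        # failed_heartbeat_threshold * (session_timeout / heartbeat_frequency)
        self.deadline_ms = (failed_heartbeat_threshold * session_timeout_ms
                            / heartbeat_frequency)
        self.last_seen_ms = {}          # consumer id -> time of last heartbeat
        self.rebalance_pending = False

    def register(self, consumer_id, now_ms):
        """Record a consumer that has joined the group."""
        self.last_seen_ms[consumer_id] = now_ms

    def handle_heartbeat(self, group_id, generation_id, consumer_id, now_ms):
        """Step 2: validate the request and return an error-code string."""
        if group_id != self.group_id:
            return "UNKNOWN_GROUP"
        if consumer_id not in self.last_seen_ms:
            return "UNKNOWN_CONSUMER"
        if generation_id < self.generation_id:
            return "STALE_GENERATION"
        self.last_seen_ms[consumer_id] = now_ms
        return "NONE"

    def expire_dead_consumers(self, now_ms):
        """Step 3: drop silent consumers and flag the group for rebalance."""
        dead = [consumer_id
                for consumer_id, seen in self.last_seen_ms.items()
                if now_ms - seen > self.deadline_ms]
        for consumer_id in dead:
            del self.last_seen_ms[consumer_id]
        if dead:
            self.rebalance_pending = True
        return dead
```

A rejected heartbeat deliberately does not refresh the consumer's last-seen time, so a consumer that keeps sending stale generation numbers will eventually be expired like a silent one.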
Note that on coordinator failover, the consumers may discover the new coordinator before or after the new coordinator has finished the failover process, which includes loading the consumer group metadata from ZK. If a consumer connects after the failover has finished, the new coordinator accepts its ping request as normal. If it connects before, the new coordinator may reject its request, causing the consumer to rediscover the coordinator and reconnect, which is fine. Also, if the consumer connects to the new coordinator too late, the coordinator may have marked the consumer as dead and will treat it as a new consumer, which is also fine.
The coordinator stores the following information:
1) For each consumer group, the group metadata containing:
2) For each existing topic, a list of consumer groups that are currently subscribing to it.
{
  Version => int16
  CorrelationId => int32
  ClientId => String
  GroupId => String
}

{
  CorrelationId => int32
  ErrorCode => int16
  CoordinatorId => Broker
  CoordinatorEpoch => int32
  GroupGenerationId => int32
}

{
  Version => int16
  CorrelationId => int32
  GroupId => String
  ConsumerHost => String
  SessionTimeout => int32
  Topics => [String]
}

{
  Version => int16
  CorrelationId => int32
  ErrorCode => int16
  GroupGenerationId => int32
  GroupId => String
  ConsumerId => String
  PartitionsToOwn => [TopicAndPartition]
}

{
  Version => int16
  CorrelationId => int32
  GroupId => String
  GroupGenerationId => int32
  ConsumerId => String
}

{
  CorrelationId => int32
  GroupId => String
  GroupGenerationId => int32
  ConsumerId => String
  ErrorCode => int16
}
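To make the field layouts above concrete, here is a sketch that serializes the request carrying Version, CorrelationId, GroupId, GroupGenerationId and ConsumerId, in that order. The byte-level encoding is an assumption: big-endian integers and strings written as an int16 length followed by UTF-8 bytes, which is how the Kafka wire protocol generally encodes these types. The function names are hypothetical, and any outer size or API-key framing is left out.

```python
import struct


def encode_string(value):
    """Assumed encoding: int16 length prefix followed by UTF-8 bytes."""
    data = value.encode("utf-8")
    return struct.pack(">h", len(data)) + data


def decode_string(buf, offset):
    """Return (string, offset of the byte after the string)."""
    (length,) = struct.unpack_from(">h", buf, offset)
    start = offset + 2
    return buf[start:start + length].decode("utf-8"), start + length


def encode_heartbeat_request(version, correlation_id, group_id,
                             generation_id, consumer_id):
    return (struct.pack(">hi", version, correlation_id)   # int16 + int32
            + encode_string(group_id)
            + struct.pack(">i", generation_id)             # int32
            + encode_string(consumer_id))


def decode_heartbeat_request(buf):
    version, correlation_id = struct.unpack_from(">hi", buf, 0)
    group_id, offset = decode_string(buf, 6)               # 2 + 4 header bytes
    (generation_id,) = struct.unpack_from(">i", buf, offset)
    consumer_id, _ = decode_string(buf, offset + 4)
    return version, correlation_id, group_id, generation_id, consumer_id


encoded = encode_heartbeat_request(0, 42, "g1", 7, "c-1")
decoded = decode_heartbeat_request(encoded)
```

Decoding the encoded bytes returns the original five field values, which is a quick way to check that both sides agree on field order and widths.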
With wildcard subscription (for example, whitelist and blacklist), the consumers are responsible for discovering matching topics through topic metadata requests. That is, the consumer sends a topic metadata request with an empty topic list, and the response returns the partition info of all topics. The consumer then filters the topics that match its wildcard expression and updates its subscription list. Again, if the subscription list has changed from its previous value, the consumer starts the re-join group procedure.
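The filtering step can be sketched as below. The function names are hypothetical, and the sketch assumes that the wildcard expression is a regular expression matched against the whole topic name, that a whitelist keeps matching topics and that a blacklist drops them. The topic list would come from the topic metadata response; here it is passed in directly.

```python
import re


def filter_topics(all_topics, pattern, is_whitelist=True):
    """Return the set of topics selected by a whitelist or blacklist regex."""
    regex = re.compile(pattern)
    if is_whitelist:
        return {t for t in all_topics if regex.fullmatch(t)}
    return {t for t in all_topics if not regex.fullmatch(t)}


def refresh_subscription(previous, all_topics, pattern, is_whitelist=True):
    """Return (new subscription, whether a re-join is needed)."""
    current = filter_topics(all_topics, pattern, is_whitelist)
    return current, current != set(previous)


topics_in_metadata = ["orders", "orders-eu", "payments"]
subscription, must_rejoin = refresh_subscription(
    {"orders"}, topics_in_metadata, r"orders.*")
```

In this example the newly created topic orders-eu matches the whitelist, so the subscription changes and the consumer would start the re-join group procedure.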