Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  • Note that pre-0.9, we support both of these use cases by providing a simple consumer and a zookeeper consumer (i.e. high-level consumer) interface, and users need to choose all-or-none for the functionalities; in 0.9 we will combine these two into one interface with more flexibility.

 

Consumer API

Here is the proposed consumer API interface:

Motivation

The motivation of moving to a new set of consumer client APIs with broker side co-ordination is laid out here. Some back of the envelope calculations along with experience of building a prototype for this suggest that with a single coordinator we should support up to 5K consumers (the bottleneck should be the number of socket channels a single machine can have). Every broker will be the co-ordinator for a subset of consumer groups, thereby, providing the ability to scale the number of consumer groups linearly with the size of the cluster.

Consumer API

Here is the proposed consumer API interface:

Code Block
Consumer (Properties) {

  // Incrementally subscribe to a list of topics.
  
Code Block
Consumer (Properties) {

  // Incrementally subscribe to a list of topics.
  // @throws SubscriptionNotValidException if the new subscription is not valid with existing subscriptions.
  void subscribe(String...) throws SubscriptionNotValidException;

  // Incrementally subscribe to a specified topic partition.
  // No group management will be invoked since assigned partitions have been specified.
  // @throws SubscriptionNotValidException if the new subscription is not valid with existing subscriptions.
   void subscribe(String, Int)...) throws SubscriptionNotValidException;

  // TryIncrementally subscribe to fetcha messagesspecified from the topic partition.
  // No group management will be invoked since assigned partitions have been specified.
  // @throws SubscriptionNotValidException if the new subscription is not valid with existing subscriptions.
  void subscribe(String, Int);

  // Try to fetch messages from the topic/partitions it subscribed to. 
  // Return whatever messages are available to be fetched or empty after specified timeout.
  // @thorws InitOffsetNotKnownException if the starting offset for the assigned partitions is unknown.
  List<Record> poll(Long) throws InitOffsetNotKnownException;

  // Commit latest offsets for all partitions currently consuming from.
  // If the sync flag is set to true, commit call will be sync and blocking.
  // Otherwise the commit call will be async and best-effort.
  void commit(Boolean); 

  // Commit the specified offset for the specified partitions.
  // If the sync flag is set to true, commit call will be sync and blocking.
  // Otherwise the commit call will be async and best-effort.
  // @thorws InvalidPartitionsToCommitOffsetException if the group management has invoked, sync flag is true and the specified partitions do not belong to assigned ones.
  void commit(List[(String, Int, Long)], Boolean) throws InvalidPartitionsToCommitOffsetException;

  // Specify the fetch starting offset for the specified topic/partition.
  void position(List[(String, Int, Long)])

  // Get the last committed offsets of the partitions currently consuming.
  // Block wait if the current partitions are not known yet.
  Map[(String, Int), Long] getOffsets() OffsetUnknownException;

  // Get the last committed offsets of the specified topic/partition.
  // @throws OffsetUnknownException if not offsets of the specified partitions have ever been committed.
  Long getOffset(String, Int) throws OffsetUnknownException;
  
  // --------- Call-back Below, not part of API ------------ //

  // Call-back function upon partition de-assignment.
  // Default implementation will commit offset depending on auto.commit config.
  void onPartitionDeassigned(List[(String, Int)]);

  // Call-back function upon partition re-assignment.
  // Default implementation will fetch starting offset depending on auto.commit config.
  void onPartitionAssigned(List[(String, Int)])
}

...

Code Block
void onPartitionDeassigned(partitions) {
  offsets = getOffsets();
  // store offsets in a remote DB
}

void onPartitionAssigned(partitions) {
  // read offsets from the remote DB
  position([partitions, offsets]);
}

Consumer Coordination Protocol Overview

Each consumer group will be assigned a consumer coordinator responsible for its membership management and offset management. Each broker will host a consumer coordinator so that the management of consumer groups will be distributed across all brokers. We follow the in-built offset management design described here to assign coordinators based on the "offset" topic partition leadership. That is, an "offset" topic will be used to store the offsets sent from each consumer group, and the partition id is based on the consumer group names. The leader of the partition that a consumer group's offsets commit to will be the coordinator of that group.

Since all the brokers cache the leadership for all the topic partitions, all brokers will know the coordinator for any given consumer groups. Upon receiving any requests from a consumer, the coordinator will only serve it if it is the assigned coordinator of the specified group this consumer belongs to; otherwise the request will be rejected.

 

For each consumer group, the coordinator stores the following information:

1) For each consumer group it currently serve, the group metadata containing:

  • Its subscribed topic list.
  • Group configs, including session timeout, etc.
  • Consumer metadata for each consumer in the group.
  • Current offsets for each consumed topic/partition.
  • Partition ownership metadata, including the consumer-assigned-partitions map, current group generation number, etc.

2) For each existing topic, a list of consumer groups that are currently subscribing to it.

 

In addition, for each consumer group the coordinator also maintains the group config and partition ownership metadata in ZK as a json object. This node will be updated upon rebalance completion and read upon coordinator failover (details below).

 

Consumer Startup (Subscription Change)

On consumer startup, it will usually first try to subscribe to a list of topics.

If it does not subscribe to any topics, then poll() will always return empty set; if it has subscribed to specific topic/partitions, then it does not need to communicate with the coordinator but just start fetching data (will talk about how to start fetching data below).

Only if it has changed its topic subscription (including from an empty set on startup), it will need to coordinate with the coordinators. If consumer has not connected to the coordinator, it will first try to find the current coordinator and setup the connection.

Another note is that in this proposal we do not require consumer's client id in the config but leave it as an optional config, when it is not specified a default id of will be used, and if group management is invoked the coordinator assign consumer ids upon them successfully joining the group.

Coordinator Discovery

The coordinator discovery is through the coordinator request, which can be answered by any alive brokers and whose response will contain the coordinator id based on consumer group.

 

-----------------------------------------------------------------------------------

Any Broker (alive)                                                <-- (find_coordinator) -- Consumer 1

                                                                            -- (response) -->

                                                                            <-- (find_coordinator) -- Consumer 2

                                                                            -- (response) -->

-----------------------------------------------------------------------------------

 

FindCoordinatorRequest

Code Block
{
  Version                => int16
  CorrelationId          => int64
  ClientId               => String
  GroupId                => String
}

 

FindCoordinatorResponse

Code Block
{
  Version                => int16
  CorrelationId          => int64
  CoordinatorId          => int16
}

Consumer (re)Join-Group

Upon successfully connects to the coordinator, the consumer will either try to (re-)join the group if 1) it has not own any partitions, or 2) its subscription has changed, or 3) its heartbeat responses has received an error. Otherwise it will just keep heartbeating with the coordinator (we will talk about this later). The consumer will try to join the group by doing the following:

1) Drop any fetched data, stop sending any fetch requests to brokers.

2) Release any assigned partition ownership by calling onPartitionDeassigned function.

3) Send a join group request to the coordinator and block waiting on the response.

 

Upon receiving a join group request, the coordinator will trigger a rebalance on the group this consumer belongs to.

When the rebalance finishes, it will send the response back to the consumer with the newly assigned partitions, the new consumer id along with the generation number back to the consumer.

------------------------------------------------------------------------------------

Coordinator                                                                             <-- (join) --     Consumer 1 (alive)

                                                                                              

(received a join request on consumer 1, rebalance)

...

(upon rebalance finish)

                                                                                                -- (response) -->

---------------------------------------------------------------------------------------

 

JoinGroupRequest

 

Group management protocol

Rebalancing is the process where a group of consumer instances (belonging to the same group) co-ordinate to own a mutually exclusive set of partitions of topics that the group has subscribed to. At the end of a successful rebalance operation for a consumer group, every partition for all subscribed topics will be owned by a single consumer instance. The way rebalancing works is as follows. One of the brokers is elected as the coordinator for a subset of the consumer groups. It will be responsible for triggering rebalancing attempts for certain consumer groups on consumer group membership changes or subscribed topic partition changes. It will also be responsible for communicating the resulting partition-consumer ownership configuration to all consumers of the group undergoing a rebalance operation.

Consumer

  1. On startup or on co-ordinator failover, the consumer sends a ClusterMetadataRequest to any of the brokers in the "bootstrap.brokers" list. In the ClusterMetadataResponse, it receives the location of the co-ordinator for it's group.  
  2. The consumer sends a RegisterConsumer request to it's co-ordinator broker. In the RegisterConsumerResponse, it receives the list of topic partitions that it should own.
  3. At this time, group management is done and the consumer starts fetching data and (optionally) committing offsets for the list of partitions it owns.

Co-ordinator

  1. Upon election or startup, the co-ordinator reads the list of groups it manages and their membership information from zookeeper. If there is no previous group membership information, it does nothing until the first consumer in some group registers with it.
  2. Upon election, if the co-ordinator finds previous group membership information in zookeeper, it waits for the consumers in each of the groups to re-register with it. Once all known and alive consumers register with the co-ordinator, it marks the rebalance completed. 
  3. Upon election or startup, the co-ordinator also starts failure detection for all consumers in a group. Consumers that are marked as dead by the co-ordinator's failure detection protocol are removed from the group and the co-ordinator marks the rebalance for a group completed by communicating the new partition ownership to the remaining consumers in the group.
  4. In steady state, the co-ordinator tracks the health of each consumer in every group through it's failure detection protocol.
  5. If the co-ordinator marks a consumer as dead, it triggers a rebalance operation for the remaining consumers in the group. Rebalance is triggered by killing the socket connection to the remaining consumers in the group. Consumers notice the broken socket connection and trigger a co-ordinator discovery process (#1, #2 above). Once all alive consumers re-register with the co-ordinator, it communicates the new partition ownership to each of the consumers in the RegisterConsumerResponse, thereby completing the rebalance operation.
  6. The co-ordinator tracks the changes to topic partition changes for all topics that any consumer group has registered interest for. If it detects a new partition for any topic, it triggers a rebalance operation (as described in #5 above). It is currently not possible to reduce the number of partitions for a topic. The creation of new topics can also trigger a rebalance operation as consumers can register for topics before they are created.

Failure detection protocol

The consumer specifies a session timeout in the RegisterConsumerRequest that it sends to the co-ordinator in order to join a consumer group. The consumer initiates periodic heartbeats (HeartbeatRequest) to the co-ordinator and waits for a response at a configurable heartbeat frequency. The heartbeat frequency indicates the number of times

When the consumer has succesfully joined the group, it will keep sending the heartbeat request to the coordinator, this protocol is used both for consumer failure detection (by coordinator) and coordinator failure detection (by consumer).

1) Consumers periodically send heartbeat request to the coordinator based on every session_timeout / heartbeat_frequency (which is an integer > 1).

2) Upon receiving the heartbeat request, coordinator checks the generation number and the consumer id, if they are not validate (i.e., consumer id not assigned before, or generation number is smaller than current value), send an error response; otherwise send a normal response.

3) If coordinator has not heard from a consumer after session_timeout, treat the consumer as failed.

4) If consumer has not heart from the coordinator after session timeout, treat the coordinator as failed.

5) If consumer found the channel with the coordinator has closed, treat the coordinator as failed.

Note that on coordinator failover, the consumers may discover the new coordinator before or after the new coordinator has finsihed the failover process including loading the consumer group metadata from ZK, etc. In the latter case, the new coordinator will just accept its ping request as normal; in the former case, the new coordinator may reject its request, causing it to re-dicover coordinators and re-connect again, which is fine. Also, if the consumer connects to the new coordinator too later, it may already be expired by the new coordinator and will hence be treated as a new consumer, which is also fine.

Consumer Coordination Protocol Overview

Each consumer group will be assigned a consumer coordinator responsible for its membership management and offset management. Each broker will host a consumer coordinator so that the management of consumer groups will be distributed across all brokers. We follow the in-built offset management design described here to assign coordinators based on the "offset" topic partition leadership. That is, an "offset" topic will be used to store the offsets sent from each consumer group, and the partition id is based on the consumer group names. The leader of the partition that a consumer group's offsets commit to will be the coordinator of that group.

Since all the brokers cache the leadership for all the topic partitions, all brokers will know the coordinator for any given consumer groups. Upon receiving any requests from a consumer, the coordinator will only serve it if it is the assigned coordinator of the specified group this consumer belongs to; otherwise the request will be rejected.

 

For each consumer group, the coordinator stores the following information:

1) For each consumer group it currently serve, the group metadata containing:

  • Its subscribed topic list.
  • Group configs, including session timeout, etc.
  • Consumer metadata for each consumer in the group.
  • Current offsets for each consumed topic/partition.
  • Partition ownership metadata, including the consumer-assigned-partitions map, current group generation number, etc.

2) For each existing topic, a list of consumer groups that are currently subscribing to it.

 

In addition, for each consumer group the coordinator also maintains the group config and partition ownership metadata in ZK as a json object. This node will be updated upon rebalance completion and read upon coordinator failover (details below).

 

Consumer Startup (Subscription Change)

On consumer startup, it will usually first try to subscribe to a list of topics.

If it does not subscribe to any topics, then poll() will always return empty set; if it has subscribed to specific topic/partitions, then it does not need to communicate with the coordinator but just start fetching data (will talk about how to start fetching data below).

Only if it has changed its topic subscription (including from an empty set on startup), it will need to coordinate with the coordinators. If consumer has not connected to the coordinator, it will first try to find the current coordinator and setup the connection.

Another note is that in this proposal we do not require consumer's client id in the config but leave it as an optional config, when it is not specified a default id of will be used, and if group management is invoked the coordinator assign consumer ids upon them successfully joining the group.

FindCoordinatorRequest

Code Block
{
  

...

Version                => int16
  CorrelationId          => int64
  

...

ClientId               => String
  

...

GroupId    

...

 

...

  

...

         =>

...

 

...

String

...


}

 

JoinGroupResponse

FindCoordinatorResponse

Code Block
{
  Version                => int16
  CorrelationId          => int64
  

...

CoordinatorId          

...

=> int16

...

 

}

Consumer (re)Join-Group

Upon successfully connects to the coordinator, the consumer will either try to (re-)join the group if 1) it has not own any partitions, or 2) its subscription has changed, or 3) its heartbeat responses has received an error. Otherwise it will just keep heartbeating with the coordinator (we will talk about this later). The consumer will try to join the group by doing the following:

1) Drop any fetched data, stop sending any fetch requests to brokers.

2) Release any assigned partition ownership by calling onPartitionDeassigned function.

3) Send a join group request to the coordinator and block waiting on the response.

 

Upon receiving a join group request, the coordinator will trigger a rebalance on the group this consumer belongs to.

When the rebalance finishes, it will send the response back to the consumer with the newly assigned partitions, the new consumer id along with the generation number back to the consumer.

------------------------------------------------------------------------------------

Coordinator                                                                             <-- (join) --     Consumer 1 (alive)

                                                                                              

(received a join request on consumer 1, rebalance)

...

(upon rebalance finish)

                                                                                                -- (response) -->

---------------------------------------------------------------------------------------

 

JoinGroupRequest

 

Code Block
{
  Version                => int16
  CorrelationId          => int64
  GroupId                => String
  ConsumerAddr           => String
  SessionTimeout         => int64
  Subscribing            => [String]
}

 

JoinGroupResponse

Code Block
{
  Version                => int16
  CorrelationId          => int64
  ErrorCode              => int16
  Generation             => int16
  GroupId                => String 
  ConsumerId             => String
  PartitionsToOwn        => [<TopicAndPartition>]
}

 

Note that the assigned consumer client id is assigned by the coordinator will also be used in heartbeat protocol and offset management talked below.

Coordinator Startup (Coordinator Failover)

Coordinators will be constructed upon broker startup, and initialized with empty consumer groups. There are two cases a new consumer group can be created on the coordinator:

1) When a first request is received from one of the consumers in this group, the coordinator need to create this group after validating by the group name that it is the assigned coordinator.

2) Since the assignment of coordinators can change as the "offset" topic partition leaders change (mainly due to broker down), they also have to take over existing consumer groups upon coordinator failovers.

The only difference is that for 1) the coordinator will immediately trigger a rebalance for this newly created group, whereas for 2) the coordinator needs to load its current config and ownership information from ZK but do not necessarily trigger the rebalance.

Consumer Fetching Data

As in the new producer, consumers will periodically send topic metadata request to the available brokers to find the leader of its assigned partitions. Upon 1) receiving an error while fetching data and 2) receiving a join response from the coordinator, it will also force a topic metadata refresh.

To fetch data, consumers will issue fetch requests to the known leaders of the assigned partitions with a selector, and the poll function will return data from all the readable channels in the selector.

Wildcard Subscription Handling

With wildcard subscription (for example, whitelist and blacklist), the consumers are responsible to discover matching topics through topic metadata request. That is, its topic metadata request will contain an empty topic list, whose response then will return the partition info of all topics, it will then filter the topics that match its wildcard expression, and then update the subscription list. Again, if the subscription list has changed from the previous values, it will start the consumer re-join group procedure.

Partition Change Handling

In contrast to the subscription topic change, which is handled by the consumers via re-joining the group, the partition change events is handled by the coordinator.

Coordinator keeps a listener for each topic partition info path in ZK. Upon partition addition (and probably in the future, shrink), its listener will fire and the coordinator then check which groups are currently subscribing to this topic. For each one of these group, a rebalance will be triggered.

Note that the assigned consumer client id is assigned by the coordinator will also be used in heartbeat protocol and offset management talked below.

Coordinator Startup (Coordinator Failover)

Coordinators will be constructed upon broker startup, and initialized with empty consumer groups. There are two cases a new consumer group can be created on the coordinator:

1) When a first request is received from one of the consumers in this group, the coordinator need to create this group after validating by the group name that it is the assigned coordinator.

2) Since the assignment of coordinators can change as the "offset" topic partition leaders change (mainly due to broker down), they also have to take over existing consumer groups upon coordinator failovers.

The only difference is that for 1) the coordinator will immediately trigger a rebalance for this newly created group, whereas for 2) the coordinator needs to load its current config and ownership information from ZK but do not necessarily trigger the rebalance.

Consumer Fetching Data

As in the new producer, consumers will periodically send topic metadata request to the available brokers to find the leader of its assigned partitions. Upon 1) receiving an error while fetching data and 2) receiving a join response from the coordinator, it will also force a topic metadata refresh.

To fetch data, consumers will issue fetch requests to the known leaders of the assigned partitions with a selector, and the poll function will return data from all the readable channels in the selector.

Wildcard Subscription Handling

With wildcard subscription (for example, whitelist and blacklist), the consumers are responsible to discover matching topics through topic metadata request. That is, its topic metadata request will contain an empty topic list, whose response then will return the partition info of all topics, it will then filter the topics that match its wildcard expression, and then update the subscription list. Again, if the subscription list has changed from the previous values, it will start the consumer re-join group procedure.

Partition Change Handling

In contrast to the subscription topic change, which is handled by the consumers via re-joining the group, the partition change events is handled by the coordinator.

Coordinator keeps a listener for each topic partition info path in ZK. Upon partition addition (and probably in the future, shrink), its listener will fire and the coordinator then check which groups are currently subscribing to this topic. For each one of these group, a rebalance will be triggered.

Failure Detection

When the consumer has succesfully joined the group, it will keep sending the heartbeat request to the coordinator, this protocol is used both for consumer failure detection (by coordinator) and coordinator failure detection (by consumer).

Heartbeat

1) Consumers periodically send heartbeat request to the coordinator based on every session_timeout / heartbeat_frequency (which is an integer > 1).

2) Upon receiving the heartbeat request, coordinator checks the generation number and the consumer id, if they are not validate (i.e., consumer id not assigned before, or generation number is smaller than current value), send an error response; otherwise send a normal response.

3) If coordinator has not heard from a consumer after session_timeout, treat the consumer as failed.

4) If consumer has not heart from the coordinator after session timeout, treat the coordinator as failed.

5) If consumer found the channel with the coordinator has closed, treat the coordinator as failed.

Note that on coordinator failover, the consumers may discover the new coordinator before or after the new coordinator has finsihed the failover process including loading the consumer group metadata from ZK, etc. In the latter case, the new coordinator will just accept its ping request as normal; in the former case, the new coordinator may reject its request, causing it to re-dicover coordinators and re-connect again, which is fine. Also, if the consumer connects to the new coordinator too later, it may already be expired by the new coordinator and will hence be treated as a new consumer, which is also fine.

 

-----------------------------------------------------------------------------------

Coordinator                                                                             <-- (ping) --     Consumer 1 (alive)

                                                                                               -- (response) -->

                                                                                               <-- (ping) --     Consumer 2 (alive)

                                                                                               -- (response) -->

-----------------------------------------------------------------------------------

Coordinator                                                                             <-- (ping) --     Consumer 1 (alive)

                                                                                               -- (response) -->

(Have not heard from consumer 2, it must have failed)                                 Consumer 2 (failed)

-----------------------------------------------------------------------------------

Coordinator (failed)                                                                 <-- (ping) --    Consumer 1 (alive)

                                                                                               (Have not heard back from coordinator, it must have failed)

                                                                                               <-- (ping) --     Consumer 2 (alive)

                                                                                               (Have not heard back from coordinator, it must have failed)

-----------------------------------------------------------------------------------

Coordinator (crashed)                                                            <-- X --    Consumer 1 (alive)

                                                                                               (Broken pipe from coordinator, it must have failed)

                                                                                               <-- X --     Consumer 2 (alive)

                                                                                               (Broken pipe from coordinator, it must have failed)

------------------------------------------------------------------------------------------------

 

PingRequest

Code Block
{
  Version                => int16
  CorrelationId          => int64
  GroupId                => String
  ConsumerId             => String
}

...