...

Code Block
Consumer (Properties) {

  // Incrementally subscribe to a list of topics, return immediately.
  // @throws SubscriptionNotValidException if the new subscription is not valid with existing subscriptions.
  void subscribe(String...) throws SubscriptionNotValidException;

  // Incrementally subscribe to a specified topic partition, return immediately.
  // No group management will be invoked since assigned partitions have been specified.
  // @throws SubscriptionNotValidException if the new subscription is not valid with existing subscriptions.
  void subscribe(String, Int);

  // Incrementally subscribe to a specified topic partition starting at the given offset, return immediately.
  // No group management will be invoked since assigned partitions have been specified.
  // @throws SubscriptionNotValidException if the new subscription is not valid with existing subscriptions.
  void subscribe(String, Int, Long);

  // Try to fetch messages from the topic/partitions it subscribed to. 
  // Return whatever messages are available to be fetched or empty after specified timeout.
  // @throws InitOffsetNotKnownException if the starting offset for the assigned partitions is unknown.
  List<Record> poll(Long) throws SubscriptionIsEmptyException, InitOffsetNotKnownException;

  // Commit latest offsets for all partitions currently consuming from.
  // If the sync flag is set to true, commit call will be sync and blocking.
  // Otherwise the commit call will be async and best-effort.
  void commit(Boolean) throws SubscriptionIsEmptyException;

  // Commit the specified offsets for the specified topic/partitions.
  // If the sync flag is set to true, commit call will be sync and blocking.
  // Otherwise the commit call will be async and best-effort.
  // @throws InvalidPartitionsToCommitOffsetException if group management has been invoked, the sync flag is true, and the specified partitions do not belong to the assigned ones.
  void commit(List[(String, Int, Long)], Boolean) throws InvalidPartitionsToCommitOffsetException;

  // Specify the fetch starting offset for the specified topic/partition.
  void position(List[(String, Int, Long)]);

  // Get the currently consuming partitions.
  // Block wait if the current partitions are not known yet.
  List[(String, Int)] getPartitions() throws SubscriptionIsEmptyException;

  // Get the last committed offsets of the partitions currently consuming.
  // Block wait if the current partitions are not known yet.
  Map[(String, Int), Long] getOffsets() throws SubscriptionIsEmptyException, InvalidPartitionsToGetOffsetException, OffsetUnknownException;

  // Get the last committed offsets of the specified topic/partition.
  // @throws OffsetUnknownException if no offsets of the specified partitions have ever been committed.
  Long getOffset(String, Int) throws SubscriptionIsEmptyException, OffsetUnknownException;
  
  // --------- Call-back Below, not part of API ------------ //

  // Call-back function upon partition de-assignment.
  // Default implementation will commit offset depending on auto.commit config.
  void onPartitionDeassigned(List[(String, Int)]);

  // Call-back function upon partition re-assignment.
  // Default implementation will fetch starting offset depending on auto.commit config.
  void onPartitionAssigned(List[(String, Int)]);
}

...

Each consumer group will be assigned a consumer coordinator responsible for its membership management and offset management. Each broker will host a consumer coordinator so that the management of consumer groups will be distributed across all brokers.

For each consumer group, the coordinator stores the following information:

...

  1. Its subscribed topic list.
  2. Group configs, including session timeout, etc.
  3. Consumer metadata for each consumer in the group.
  4. Current offsets for each consumed topic/partition.
  5. Partition ownership metadata, including the consumer-assigned-partitions map, current group generation number, etc.

...

The coordinator also has the following functionalities:

  1. handleNewConsumerGroup: this is called when the coordinator starts to serve a new consumer group.
  2. handleAddedConsumer: this is called when a new consumer is registered into an existing consumer group.
  3. handleConsumerFailure: this is called when the coordinator thinks a consumer has failed and hence kicks it out of the group.
  4. handleTopicChange: this is called when the coordinator detects a topic change from ZK.
  5. handleTopicPartitionChange: this is called when the coordinator detects a topic partition change from ZK.
  6. handleCommitOffset: this is called for committing offsets for certain partitions.
  7. handleConsumerGroupRebalance: this is called when the partition ownership needs to be re-assigned within a group.

The coordinator also maintains the following information in ZK:

  1. For each group, the partition ownership of the subscribed topics (in a single ZK path).

The coordinator also holds the following modules:

  1. A threadpool of rebalancer threads executing rebalance process of single consumer groups.
  2. [Optional] A socket server for receiving requests and sending responses.

Failure Detection

Both consumer failure detection (by the coordinator) and coordinator failure detection (by the consumer) are done through a heartbeat protocol.

Heartbeat Protocol

...

We follow the built-in offset management design described here to assign coordinators based on the "offset" topic partition leadership. That is, an "offset" topic will be used to store the offsets sent from each consumer group, and the partition id is derived from the consumer group name. The leader of the partition that a consumer group's offsets are committed to will be the coordinator of that group.

Since all the brokers cache the leadership for all the topic partitions, every broker will know the coordinator for any given consumer group. Upon receiving a request from a consumer, a coordinator will serve it only if it is the assigned coordinator of the group this consumer belongs to; otherwise the request will be rejected.
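
As a rough illustration of the assignment rule above, the following Python sketch maps a group name to an "offset" topic partition and then to that partition's leader. The hash function (CRC32), the leadership table, and all function names are assumptions made for the example; the design only states that the partition id is based on the group name.

```python
import zlib

def offset_partition_for(group: str, num_partitions: int) -> int:
    # Assumption: a stable hash of the group name modulo the partition count.
    return zlib.crc32(group.encode("utf-8")) % num_partitions

def coordinator_for(group: str, leaders: dict) -> int:
    # leaders maps "offset" topic partition id -> leader broker id,
    # i.e. the leadership cache that every broker is said to hold.
    return leaders[offset_partition_for(group, len(leaders))]

def should_serve(broker_id: int, group: str, leaders: dict) -> bool:
    # A broker serves a request only if it is the group's assigned coordinator.
    return coordinator_for(group, leaders) == broker_id

# Hypothetical leadership cache for a 4-partition "offset" topic.
leaders = {0: 101, 1: 102, 2: 103, 3: 101}
owner = coordinator_for("billing-consumers", leaders)
```

Because the mapping depends only on the group name and the cached leadership, any broker can compute it locally, which is what allows any alive broker to answer a coordinator lookup.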

 

For each consumer group, the coordinator stores the following information:

1) For each consumer group it currently serves, the group metadata containing:

  • Its subscribed topic list.
  • Group configs, including session timeout, etc.
  • Consumer metadata for each consumer in the group.
  • Current offsets for each consumed topic/partition.
  • Partition ownership metadata, including the consumer-assigned-partitions map, current group generation number, etc.

2) For each existing topic, a list of consumer groups that are currently subscribing to it.

 

In addition, for each consumer group the coordinator also maintains the group config and partition ownership metadata in ZK.

 

Consumer Startup (Subscription Change)

On consumer startup, it will usually first try to subscribe to a list of topics.

If it does not subscribe to any topics, poll() will always return an empty set. If it has subscribed to specific topic/partitions, it does not need to communicate with the coordinator and can start fetching data directly (how fetching starts is described below).

The consumer needs to coordinate with the coordinator only if it has changed its topic subscription (including a change from the empty set on startup). If the consumer has not yet connected to the coordinator, it will first try to find the current coordinator and set up the connection.

Note also that this proposal does not require the consumer's client id in the config but leaves it as an optional config. When it is not specified, a default id will be used; and if group management is invoked, the coordinator assigns consumer ids to consumers when they successfully join the group.

Coordinator Discovery

Coordinator discovery is done through the find coordinator request, which can be answered by any alive broker and whose response will contain the coordinator id based on consumer group

...

.

 

----------------------------------------- 1/2 ------------------------------------------

Any Broker (alive)                                                <-- (find_coordinator) --     Consumer 1 (alive)

                                                                  -- (response) -->

                                                                  <-- (find_coordinator) --     Consumer 2 (alive)

                                                                  -- (response) -->

----------------------------------------- 3 ------------------------------------------

Coordinator                                                                             <-- (ping) --     Consumer 1 (alive)

                                                                                               -- (response) -->

(Have not heard from consumer 2, it must have failed)                                 Consumer 2 (failed)

----------------------------------------- 4 ------------------------------------------

Coordinator (failed)                                                                 <-- (ping) --    Consumer 1 (alive)

                                                                                               (Have not heard back from coordinator, it must have failed)

                                                                                               <-- (ping) --     Consumer 2 (alive)

                                                                                               (Have not heard back from coordinator, it must have failed)

FindCoordinatorRequest

 

Code Block
{
  Version                => int16
  CorrelationId          => int64
  ClientId               => String
  GroupId                => String
}

 

FindCoordinatorResponse

 

Code Block
{
  Version                => int16
  CorrelationId          => int64
  CoordinatorId          => int16
}

Consumer (re)Join-Group

Upon successfully connecting to the coordinator, the consumer will try to (re-)join the group if 1) it does not own any partitions, or 2) its subscription has changed, or 3) it has received an error in a heartbeat response. Otherwise it will just keep heartbeating with the coordinator (we will talk about this later). The consumer will try to join the group by doing the following:

1) Drop any fetched data, stop sending any fetch requests to brokers.

2) Release any assigned partition ownership by calling onPartitionDeassigned function.

3) Send a join group request to the coordinator and block waiting on the response.

 

Upon receiving a join group request, the coordinator will trigger a rebalance on the group this consumer belongs to.

When the rebalance finishes, the coordinator will send the response back to the consumer with the newly assigned partitions, the new consumer id, and the generation number.
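
The consumer side of this exchange can be sketched as follows. This is a minimal Python illustration, not the proposed implementation: `ConsumerState`, `needs_rejoin`, the `send_join_group` callable, and the response dictionary keys are hypothetical names chosen for the example, and stopping in-flight fetch requests is not modeled.

```python
def needs_rejoin(owned_partitions, subscription_changed, heartbeat_error):
    # Rejoin if the consumer owns no partitions, its subscription changed,
    # or a heartbeat response carried an error.
    return bool((not owned_partitions) or subscription_changed or heartbeat_error)

class ConsumerState:
    def __init__(self):
        self.fetched = []        # records buffered from earlier fetches
        self.owned = []          # (topic, partition) pairs currently owned
        self.consumer_id = None  # assigned by the coordinator on join
        self.generation = None   # group generation from the join response

    def rejoin(self, send_join_group, on_partition_deassigned):
        # 1) Drop any fetched data.
        self.fetched.clear()
        # 2) Release partition ownership through the callback.
        released, self.owned = self.owned, []
        on_partition_deassigned(released)
        # 3) Send the join request and block until the response arrives.
        resp = send_join_group()
        self.owned = list(resp["partitions_to_own"])
        self.consumer_id = resp["consumer_id"]
        self.generation = resp["generation"]
```

Passing the network call in as `send_join_group` keeps the sketch independent of any transport; a real client would issue the JoinGroupRequest there.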

---------------------------------------------- 5 ------------------------------------------

Coordinator                                                                             <-- (join) --        Consumer 1 (alive)

(received a join request on consumer 1, rebalance)

...

(upon rebalance finish)

                                                                                                -- (response) -->

------------------------------------------------------------------------------------------------

...

JoinGroupRequest

 

Code Block
{
  Version                => int16
  CorrelationId          => int64
  GroupId                => String
  ConsumerAddr           => String
  SessionTimeout         => int64
  Subscribing            => [String]
}

 

JoinGroupResponse

 

Code Block
{
  Version                => int16
  CorrelationId          => int64
  ErrorCode              => int16
  Generation             => int16
  GroupId                => String 
  ConsumerId             => String
  PartitionsToOwn        => [<TopicAndPartition>]
}

 

Note that the consumer id assigned by the coordinator will also be used in the heartbeat protocol and the offset management discussed below.

Consumer Fetching Data

As in the new producer, consumers will periodically send topic metadata requests to the available brokers to find the leaders of their assigned partitions. A consumer will also force a topic metadata refresh upon 1) receiving an error while fetching data, or 2) receiving a join response from the coordinator.

To fetch data, consumers will issue fetch requests to the known leaders of the assigned partitions with a selector, and the poll function will return data from all the readable channels in the selector.

Wildcard Subscription Handling

With wildcard subscription (for example, whitelist and blacklist), the consumers are responsible for discovering matching topics through topic metadata requests. That is, the consumer's topic metadata request will contain an empty topic list, and the response will return the partition info of all topics. The consumer will then filter the topics that match its wildcard expression and update its subscription list. Again, if the subscription list has changed from its previous value, the consumer will start the re-join group procedure.
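
A minimal Python sketch of this filtering step is shown below. It assumes that whitelist and blacklist are regular expressions that must match the whole topic name; the document does not specify the wildcard syntax, and the function names are hypothetical.

```python
import re

def matching_topics(all_topics, whitelist=None, blacklist=None):
    # Keep topics that match the whitelist (if given) and do not match
    # the blacklist (if given). Assumption: full-match regex semantics.
    allow = re.compile(whitelist) if whitelist else None
    deny = re.compile(blacklist) if blacklist else None
    result = set()
    for topic in all_topics:
        if allow and not allow.fullmatch(topic):
            continue
        if deny and deny.fullmatch(topic):
            continue
        result.add(topic)
    return result

def refresh_subscription(current, all_topics, whitelist=None, blacklist=None):
    # Returns (new_subscription, changed). changed=True means the consumer
    # should start the re-join group procedure.
    updated = matching_topics(all_topics, whitelist, blacklist)
    return updated, updated != set(current)
```

Here `all_topics` stands for the topic names returned by a metadata request with an empty topic list.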

Failure Detection

When the consumer has successfully joined the group, it will keep sending heartbeat requests to the coordinator. This protocol is used both for consumer failure detection (by the coordinator) and coordinator failure detection (by the consumer).

Heartbeat Protocol

1) Consumers send a heartbeat request to the coordinator every session_timeout / heartbeat_frequency, where heartbeat_frequency is an integer > 1.

2) Upon receiving the heartbeat request, the coordinator checks the generation number and the consumer id. If they are not valid (i.e., the consumer id was not assigned before, or the generation number is smaller than the current value), it sends an error response; otherwise it sends a normal response.

3) If the coordinator has not heard from a consumer after session_timeout, it treats the consumer as failed.

4) If the consumer has not heard from the coordinator after session_timeout, it treats the coordinator as failed.

5) If the consumer finds that the channel with the coordinator has closed, it treats the coordinator as failed.
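
The coordinator side of steps 1) to 3) can be sketched in Python as follows. This is an illustration under stated assumptions: the class and method names, the two error-code constants, and the use of millisecond timestamps passed in by the caller are all hypothetical and not part of the proposal.

```python
OK, ERROR = 0, 1  # hypothetical error codes for the heartbeat response

def heartbeat_interval_ms(session_timeout_ms, heartbeat_frequency):
    # Step 1: heartbeats are sent every session_timeout / heartbeat_frequency.
    if heartbeat_frequency <= 1:
        raise ValueError("heartbeat_frequency must be an integer > 1")
    return session_timeout_ms / heartbeat_frequency

class GroupHeartbeats:
    def __init__(self, session_timeout_ms, generation):
        self.session_timeout_ms = session_timeout_ms
        self.generation = generation
        self.last_seen = {}  # consumer id -> time of last heartbeat (ms)

    def register(self, consumer_id, now_ms):
        # Called when a consumer id is assigned on a successful join.
        self.last_seen[consumer_id] = now_ms

    def on_heartbeat(self, consumer_id, generation, now_ms):
        # Step 2: reject unknown consumer ids and stale generations.
        if consumer_id not in self.last_seen or generation < self.generation:
            return ERROR
        self.last_seen[consumer_id] = now_ms
        return OK

    def failed_consumers(self, now_ms):
        # Step 3: consumers silent for longer than session_timeout have failed.
        return [c for c, seen in self.last_seen.items()
                if now_ms - seen > self.session_timeout_ms]
```

With a session timeout of 30000 ms and a heartbeat frequency of 3, a consumer would heartbeat every 10000 ms, so the coordinator tolerates a couple of lost heartbeats before declaring a failure.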

 

 

Consumer Failure Handling

When the coordinator thinks a consumer has failed, it will do the following:

  1. Remove the consumer from the group metadata.
  2. Trigger the rebalance process.

 

Coordinator Failure Handling

When the consumer thinks its current coordinator has failed, it will do the following:

  1. Refresh its metadata from the existing known brokers to get the new coordinator information (if the metadata request fails or times out, retry until it succeeds).
  2. Set up the connection to the new coordinator.
  3. Start sending heartbeats to the coordinator.
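
The retry behaviour in the first step could look like the Python sketch below. The `lookup` callable stands in for whatever request returns the coordinator id and is an assumption of this example, as is the optional `max_rounds` bound, which exists only so the loop can be tested; the text above calls for retrying until success. A real client would presumably also back off between rounds.

```python
def rediscover_coordinator(known_brokers, lookup, max_rounds=None):
    # lookup(broker) returns the coordinator id, or raises ConnectionError /
    # TimeoutError when the broker cannot be reached in time.
    rounds = 0
    while max_rounds is None or rounds < max_rounds:
        for broker in known_brokers:
            try:
                return lookup(broker)
            except (ConnectionError, TimeoutError):
                continue  # try the next known broker
        rounds += 1
    raise RuntimeError("no known broker answered the coordinator lookup")
```

Once a coordinator id is returned, the consumer sets up the connection and resumes heartbeating as in steps 2 and 3.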

 

----------------------------------------- 1/2/3 ------------------------------------------

Coordinator (failed)                                                                 <-- (ping) --    Consumer 1 (alive)

                                                                                               (Have not heard back from coordinator, try to reconnect to the new coordinator)

                                                                                               <-- (ping) --     Consumer 2 (alive)

                                                                                               (Have not heard back from coordinator, try to reconnect to the new coordinator)

 

Broker (alive)                                                                         <-- (metadata) -- Consumer 1

                                                                                              -- (response) -->

                                                                                              <-- (metadata) -- Consumer 2

                                                                                              -- (response) -->

 

Coordinator (new)                                                                  <-- (ping) --    Consumer 1 (alive)

                                                                                               -- (response) -->

                                                                                               <-- (ping) --     Consumer 2 (alive)

                                                                                               -- (response) -->

--------------------------------------------------------------------------------------

Coordinator                                                                             <-- (ping) --     Consumer 1 (alive)

                                                                                               -- (response) -->

                                                                                               <-- (ping) --     Consumer 2 (alive)

                                                                                               -- (response) -->

------

On consumer startup, it will first try to subscribe to a list of topics; similarly, when the consumer changes its subscription list, this can be treated as a consumer restart.

Therefore, these two processes can be merged into one behavior that is triggered by a subscription change (possibly from an empty subscription).

If the consumer has not connected to the coordinator, it will first try to find the current coordinator through a metadata refresh and set up the connection.

Coordinator Discovery

Coordinator discovery is done through the metadata request, whose response currently contains 1) the cluster information and 2) the leader information for the specified topics. We will add a third piece of metadata to the response: the coordinator id based on the consumer group.

----------------------------------------- 1 ------------------------------------------

Any Broker                                                                             <-- (metadata) --     Consumer 1 (alive)

                                                                                               -- (response: {cluster, leaders, coordinator}) -->

(Have not heard from consumer 2, it must have failed)                                 Consumer 2 (failed)

---------------------------------------------------------------------------------------

MetadataRequest

Code Block
{
  Version                => int16
  CorrelationId          => int64
  ClientId               => String
  Topics                 => [String]
  GroupIds               => [String]
}

This MetadataRequest inherits from the TopicMetadataRequest in 0.8.

MetadataResponse

-

Coordinator (failed)                                                                 <-- (ping) --    Consumer 1 (alive)

                                                                                               (Have not heard back from coordinator, it must have failed)

                                                                                               <-- (ping) --     Consumer 2 (alive)

                                                                                               (Have not heard back from coordinator, it must have failed)

-----------------------------------------------------------------------------------

Coordinator (crashed)                                                            <-- X --    Consumer 1 (alive)

                                                                                               (Broken pipe from coordinator, it must have failed)

                                                                                               <-- X --     Consumer 2 (alive)

                                                                                               (Broken pipe from coordinator, it must have failed)

------------------------------------------------------------------------------------------------

PingRequest

Code Block
{
  Version                => int16
  CorrelationId          => int64
  GroupId                => String
  ConsumerId             => String
}

 

Consumer (re)Join-Group

If the consumer 1) does not know which partitions to consume (since it has just started, for example), or 2) detects a topic change either from the metadata response or from a subscription change, it will send a join request to the coordinator.

PingResponse

Code Block
{
  Version                => int16
  CorrelationId          => int64
  ErrorCode              => int16
}
 

Consumer Failure Handling

When the coordinator thinks a consumer has failed, it will do the following:

1) Remove the consumer from the group metadata.

2) Trigger the rebalance process.

Coordinator Failure Handling

When the consumer thinks its current coordinator has failed, it will do the following:

1) Discover the new coordinator information from the existing known brokers through the find coordinator requests.

2) Set up the connection to the new coordinator.

3) Start sending heartbeats to the new coordinator.

Upon receiving a join request, the coordinator will trigger a rebalance.

----------------------------------------- 2 -------------------------------------------

Coordinator (failed)                                                                 <-- (ping) --    Consumer 1 (alive)

                                                                                               (Have not heard back from coordinator, try to reconnect to the new coordinator)

                                                                                               <-- (ping) --     Consumer 2 (alive)

                                                                                               (Have not heard back from coordinator, try to reconnect to the new coordinator)

 

Broker (alive)                                                                         <-- (find_coordinator) -- Consumer 1

                                                                                              -- (response) -->

                                                                                              <-- (find_coordinator) -- Consumer 2

                                                                                              -- (response) -->

 

Coordinator (new)                                                                  <-- (ping) --    Consumer 1 (alive)

                                                                                               -- (response) -->

                                                                                               <-- (ping) --     Consumer 2 (alive)

                                                                                               -- (response) -->

---------------------------------------------------------------------------------------

JoinGroupRequest

 

Code Block
{
  Version                => int16
  CorrelationId          => int64
  GroupId                => String
  ConsumerAddr           => String
  SessionTimeout         => int64
  Subscribing            => [String]
}

 

JoinGroupResponse

Code Block
{
  Version                => int16
  CorrelationId          => int64
  ErrorCode              => int16
  Generation             => int16
  GroupId                => String 
  ConsumerId             => String
  PartitionsToOwn        => [<TopicAndPartition>]
}

 

...

--------------------------------

Coordinator Startup (Coordinator Failover)

Coordinators will be constructed upon broker startup and initialized with empty consumer groups. There are two cases in which a new consumer group can be created on the coordinator:

1) When the first request is received from one of the consumers in this group, the coordinator needs to create this group after validating by the group name that it is the assigned coordinator.

2) Since the assignment of coordinators can change as the "offset" topic partition leaders change (mainly due to brokers going down), coordinators also have to take over existing consumer groups upon coordinator failover.

The only difference is that for 1) the coordinator will immediately trigger a rebalance for the newly created group, whereas for 2) the coordinator needs to load the group's current config and ownership information from ZK but does not necessarily trigger a rebalance.

 

Consumer Group Metadata in ZK

The consumer group config and ownership metadata can be stored in one ZK path as a JSON object. This node will be updated upon rebalance completion and read upon coordinator failover.
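
To make the idea of a single JSON node concrete, here is a hedged Python sketch of one possible layout. The field names ("config", "generation", "ownership", "session_timeout_ms") and both function names are invented for illustration; the document does not define the schema, and the ZK read and write calls are left out.

```python
import json

def encode_group_metadata(session_timeout_ms, generation, ownership):
    # ownership maps consumer id -> list of (topic, partition) pairs.
    doc = {
        "config": {"session_timeout_ms": session_timeout_ms},
        "generation": generation,
        "ownership": {
            consumer: [[topic, part] for topic, part in sorted(parts)]
            for consumer, parts in ownership.items()
        },
    }
    # sort_keys gives a deterministic payload for the same metadata.
    return json.dumps(doc, sort_keys=True)

def decode_group_metadata(payload):
    # Inverse of encode_group_metadata, as a failover coordinator would use it.
    doc = json.loads(payload)
    ownership = {
        consumer: [(topic, part) for topic, part in parts]
        for consumer, parts in doc["ownership"].items()
    }
    return doc["config"]["session_timeout_ms"], doc["generation"], ownership
```

Writing the whole object to one node means a coordinator taking over a group can restore its config and ownership with a single read.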

Consumer Group Rebalance

The coordinator will trigger rebalance under the following conditions:

...