Goals

The main goal of this project is to have a thin consumer client design that can be easily adopted/reimplemented in non-Java languages. To achieve this we have decided to:

  1. Move the group member management and offset management from the client side to the server side with a centralized coordination mechanism. By doing so we can completely remove the ZK dependency from the consumer clients. General design proposals can be found here, here and here.
  2. Make the consumer client single-threaded with a non-blocking API. Some discussions about this can be found here.

 

Besides this main goal, we also want to add support for some new features whose need we have seen while operating 0.8 and older versions. A non-exhaustive list follows:

  1. Manual partition assignment: instead of going through the centralized coordination process to get assigned consuming partitions, the consumers should be able to simply specify their target partitions and start consuming right away without any group membership management.
  2. Manual offset management: instead of using the Kafka commit() call to manage the consumed offsets, consumers should be able to store/load their offsets outside Kafka clients and simply specify the starting offsets when consuming.
  • Note that pre-0.9, we support both of these use cases by providing a simple consumer and a zookeeper consumer (i.e. high-level consumer) interface, and users need to choose all-or-none for the functionalities; in 0.9 we will combine these two into one interface with more flexibility.

 

Consumer API

Here is the proposed consumer API interface:

Consumer (Properties) {

  // Subscribe to a list of topics, return immediately. 
  // Throws an exception if the new subscription is not valid with existing subscriptions.
  void subscribe(String...) throws SubscriptionNotValidException;

  // Subscribe to a specified topic partition, return immediately.
  // Throws an exception if the new subscription is not valid with existing subscriptions.
  void subscribe(String, Int) throws SubscriptionNotValidException;

  // Subscribe to a specified topic partition at the given offset, return immediately.
  // Throws an exception if the new subscription is not valid with existing subscriptions.
  void subscribe(String, Int, Long) throws SubscriptionNotValidException;

  // Try to fetch messages from the topic/partitions it subscribed to. 
  // Return whatever messages are available to be fetched or empty after specified timeout.
  List<Record> poll(Long) throws SubscriptionIsEmptyException, ConsumeInitOffsetNotKnownException;

  // Commit latest offsets for all partitions currently consuming from.
  // If the sync flag is set to true, commit call will be sync and blocking.
  // Otherwise the commit call will be async and best-effort.
  void commit(Boolean) throws SubscriptionIsEmptyException; 

  // Commit the specified offset for the specified topic/partition.
  // If the sync flag is set to true, commit call will be sync and blocking.
  // Otherwise the commit call will be async and best-effort.
  // Throws an exception if the partitions specified are not currently consuming from.
  void commit(List[(String, Int, Long)], Boolean) throws InvalidPartitionsToCommitOffsetException;

  // Specify the fetch starting offset for the specified topic/partition.
  void pos(List[(String, Int, Long)]);

  // Get the currently consuming partitions.
  // Block wait if the current partitions are not known yet.
  List[(String, Int)] getPartitions() throws SubscriptionIsEmptyException;

  // Get the last committed offsets of the partitions currently consuming.
  // Block wait if the current partitions are not known yet.
  Map[(String, Int), Long] getOffsets() throws SubscriptionIsEmptyException;

  // Get the last committed offsets of the specified topic/partition.
  Long getOffset(String, Int) throws SubscriptionIsEmptyException, InvalidPartitionsToGetOffsetException;
  
  // --------- Call-back Below, not part of API ------------ //

  // Call-back function upon partition de-assignment.
  // Default implementation will commit offset depending on auto.commit config.
  void onPartitionDeassigned(List[(String, Int)]);

  // Call-back function upon partition re-assignment.
  // Default implementation is No-Op.
  void onPartitionAssigned(List[(String, Int)]);

  // --------- Optional Call-back Below ------------ //

  // Call-back function upon partition reassignment given the group member list and the subscribed topic partition info.
  // Return the partitions that would like to consume, and need to make sure each partition is covered by just one member in the group.
  List[(String, Int)] partitionsToConsume(List[String], List[TopicPartition]);
}
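
One way to meet the coverage requirement of the optional partitionsToConsume call-back is for every group member to run the same deterministic computation over sorted inputs. The following is a minimal Java sketch under that assumption. The class name, the explicit `self` argument, the round-robin policy, and the use of plain "topic-partition" strings in place of TopicPartition are all illustrative choices, not part of the proposal:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not part of the proposed API.
class RoundRobinAssignor {
    // Returns the partitions that member `self` would like to consume.
    // Every member sorts the same inputs the same way, so partition i
    // always goes to member (i mod groupSize) and no partition is shared.
    static List<String> partitionsToConsume(String self, List<String> members, List<String> partitions) {
        List<String> sortedMembers = new ArrayList<>(members);
        List<String> sortedPartitions = new ArrayList<>(partitions);
        Collections.sort(sortedMembers);
        Collections.sort(sortedPartitions);

        List<String> mine = new ArrayList<>();
        int selfIndex = sortedMembers.indexOf(self);
        if (selfIndex < 0) {
            return mine; // not a group member: consume nothing
        }
        for (int i = 0; i < sortedPartitions.size(); i++) {
            if (i % sortedMembers.size() == selfIndex) {
                mine.add(sortedPartitions.get(i));
            }
        }
        return mine;
    }
}
```

Because the result depends only on the two lists, members that see the same group and partition metadata arrive at disjoint sets that together cover every partition.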

 

The Records returned by the poll() function will include metadata such as offset, partition, key and partition-key. This would also help remove the decompression/recompression in mirror maker (details can be found in KAFKA-1011).
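
To make the manual partition assignment and manual offset management use cases concrete, here is a small in-memory stand-in for the proposed consumer. It is only a sketch: `ToyConsumer`, its constructor, and the reduced `Record` shape (key and partition-key omitted) are hypothetical, and the timeout argument is ignored. It mirrors the proposed subscribe(String, Int, Long) and poll(Long) calls:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory stand-in for the proposed consumer; every name here is hypothetical.
class ToyConsumer {
    // Carries part of the metadata the proposal attaches to each Record.
    static final class Record {
        final String topic;
        final int partition;
        final long offset;
        final String value;

        Record(String topic, int partition, long offset, String value) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.value = value;
        }
    }

    private final Map<String, List<String>> log;                      // "topic-partition" -> messages
    private final Map<String, Long> position = new LinkedHashMap<>(); // next offset to fetch

    ToyConsumer(Map<String, List<String>> log) {
        this.log = log;
    }

    // Manual partition assignment starting at a caller-supplied offset.
    void subscribe(String topic, int partition, long offset) {
        position.put(topic + "-" + partition, offset);
    }

    // Returns whatever is available; this toy never waits for the timeout.
    List<Record> poll(long timeoutMs) {
        if (position.isEmpty()) {
            throw new IllegalStateException("subscription is empty");
        }
        List<Record> out = new ArrayList<>();
        for (Map.Entry<String, Long> e : position.entrySet()) {
            String tp = e.getKey();
            int dash = tp.lastIndexOf('-');
            String topic = tp.substring(0, dash);
            int partition = Integer.parseInt(tp.substring(dash + 1));
            List<String> messages = log.getOrDefault(tp, new ArrayList<String>());
            long next = e.getValue();
            for (; next < messages.size(); next++) {
                out.add(new Record(topic, partition, next, messages.get((int) next)));
            }
            e.setValue(next); // advance the fetch position past what was returned
        }
        return out;
    }
}
```

A caller that keeps offsets outside Kafka would load its saved offset, pass it to subscribe(topic, partition, offset), and persist the offset of the last processed record plus one.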

 

Consumer Coordinator Overview

Each consumer group will be assigned a consumer coordinator responsible for its membership management and offset management. Each broker will host a consumer coordinator so that the management of consumer groups will be distributed across all brokers.

Each coordinator stores the following information:

  1. Consumer groups it currently serves, containing
    1. The consumer list along with their subscribed topics.
    2. Partition ownership for each consumer within the group.
    3. Latest offsets for each consumed topic/partition.
  2. List of subscribed groups for each topic.
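
The state listed above could be held in structures along the following lines. This is a minimal Java sketch; the class and field names are hypothetical, and partitions are represented as plain "topic-partition" strings for brevity:

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical per-group state (item 1 in the list above).
class GroupState {
    final Map<String, Set<String>> topicsByConsumer = new TreeMap<>(); // consumer -> subscribed topics
    final Map<String, String> ownerByPartition = new TreeMap<>();      // "topic-partition" -> consumer
    final Map<String, Long> offsetByPartition = new TreeMap<>();       // "topic-partition" -> latest offset
}

// Hypothetical coordinator-wide state: the groups it serves plus the
// topic -> groups index (item 2 in the list above).
class CoordinatorState {
    final Map<String, GroupState> groups = new TreeMap<>();
    final Map<String, Set<String>> groupsByTopic = new TreeMap<>();

    // Registers a consumer and keeps the topic -> groups index in sync.
    void addConsumer(String group, String consumer, Collection<String> topics) {
        GroupState state = groups.computeIfAbsent(group, g -> new GroupState());
        state.topicsByConsumer.computeIfAbsent(consumer, c -> new TreeSet<String>()).addAll(topics);
        for (String topic : topics) {
            groupsByTopic.computeIfAbsent(topic, t -> new TreeSet<String>()).add(group);
        }
    }

    // Groups that would need a rebalance when the given topic changes.
    Set<String> groupsSubscribedTo(String topic) {
        return groupsByTopic.getOrDefault(topic, Collections.<String>emptySet());
    }
}
```

The topic-to-groups index lets a topic or partition change be mapped directly to the affected groups without scanning every group.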

The coordinator also has the following functions:

  1. handleNewConsumerGroup: this is called when the coordinator starts to serve a new consumer group.
  2. handleAddedConsumer: this is called when a new consumer is registered into an existing consumer group.
  3. handleConsumerFailure: this is called when the coordinator thinks a consumer has failed and hence kicks it out of the group.
  4. handleTopicChange: this is called when the coordinator detects a topic change from ZK.
  5. handleTopicPartitionChange: this is called when the coordinator detects a topic partition change from ZK.
  6. handleCommitOffset: this is called for committing offsets for certain partitions.
  7. handleConsumerGroupRebalance: this is called when the partition ownership needs to be re-assigned within a group.

The coordinator also maintains the following information in ZK:

  1. For each group, the partition ownership of the subscribed topics (in a single ZK path).

The coordinator also holds the following modules:

  1. A threadpool of rebalancer threads, each executing the rebalance process of a single consumer group.
  2. [Optional] A socket server for receiving requests and sending responses.

Failure Detection

Both consumer failure detection (by coordinator) and coordinator failure detection (by consumer) are done through a heartbeat protocol:

  1. Consumers periodically send a heartbeat to the coordinator; upon receiving it, the coordinator should send back a response.
  2. If the coordinator has not heard from a consumer after the session timeout, it removes the consumer from the group and triggers a group rebalance.
  3. If the coordinator receives a heartbeat request from a new consumer, it triggers a group rebalance.
  4. If a consumer has not heard back from the coordinator after the session timeout, it discovers the new coordinator and continues sending heartbeats.
  5. If a consumer's connection to the current coordinator has been closed, it also discovers the new coordinator and continues sending heartbeats.

 

----------------------------------------- 1 ------------------------------------------

Coordinator                                                                             <-- (ping) --     Consumer 1 (alive)

                                                                                               -- (response) -->

                                                                                               <-- (ping) --     Consumer 2 (alive)

                                                                                               -- (response) -->

----------------------------------------- 2 ------------------------------------------

Coordinator                                                                             <-- (ping) --     Consumer 1 (alive)

                                                                                               -- (response) -->

(Have not heard from consumer 2, rebalance)                                              Consumer 2 (failed)

----------------------------------------- 3 ------------------------------------------

Coordinator                                                                             <-- (ping) --     Consumer 1 (alive)

                                                                                               -- (response) -->

                                                                                               <-- (ping) --     Consumer 2 (alive)

                                                                                               -- (response) -->

                                                                                               <-- (ping) --     Consumer 3 (new)

(Did not know consumer 3 before, rebalance)

----------------------------------------- 4 ------------------------------------------

Coordinator (failed)                                                                 <-- (ping) --    Consumer 1 (alive)

                                                                                               (Have not heard back from coordinator, try to reconnect to the new coordinator)

                                                                                               <-- (ping) --     Consumer 2 (alive)

                                                                                               (Have not heard back from coordinator, try to reconnect to the new coordinator)

 

Coordinator (new)                                                                  <-- (ping) --    Consumer 1 (alive)

                                                                                               -- (response) -->

                                                                                               <-- (ping) --     Consumer 2 (alive)

                                                                                               -- (response) -->

----------------------------------------- 5 ------------------------------------------

Coordinator (crashed)                                                            <-- X --    Consumer 1 (alive)

                                                                                               (Socket closed exception, try to reconnect to the new coordinator)

                                                                                               <-- X --     Consumer 2 (alive)

------------------------------------------------------------------------------------------------
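
Cases 2 and 3 of the heartbeat protocol above can be sketched on the coordinator side as follows. This is an illustrative Java sketch only: `HeartbeatTracker` and its methods are hypothetical names, time is passed in explicitly as milliseconds so the logic stays deterministic, and the actual rebalance is left to the caller:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical coordinator-side bookkeeping for one consumer group.
class HeartbeatTracker {
    private final long sessionTimeoutMs;
    private final Map<String, Long> lastHeartbeatMs = new HashMap<>();

    HeartbeatTracker(long sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    // Records a heartbeat. Returns true if the consumer was unknown before,
    // in which case the caller should trigger a rebalance (case 3).
    boolean onHeartbeat(String consumerId, long nowMs) {
        return lastHeartbeatMs.put(consumerId, nowMs) == null;
    }

    // Removes consumers that have been silent for longer than the session
    // timeout and returns them; a non-empty result means rebalance (case 2).
    List<String> expire(long nowMs) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastHeartbeatMs.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMs - entry.getValue() > sessionTimeoutMs) {
                expired.add(entry.getKey());
                it.remove();
            }
        }
        Collections.sort(expired); // stable order for callers and logs
        return expired;
    }
}
```

The consumer side of cases 4 and 5 is symmetric: it tracks the time of the last response from the coordinator and starts coordinator discovery once that gap exceeds the session timeout or the socket is closed.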

 

Subscription Change (Consumer Startup)

On startup, a consumer will first try to subscribe to a list of topics; similarly, when a consumer changes its subscription list, the change can be treated as a consumer restart.

If the consumer has not connected to the coordinator, it will first try to find the current coordinator and set up the connection.

  1. The consumer will send a join request to the coordinator with its current subscription info.
  2. The consumer will then block waiting for the coordinator to send it either the new assignment or an error response rejecting the join request.
  3. The coordinator, upon receiving a join request, will trigger a rebalance.

 

----------------------------------------- 1/2/3 ------------------------------------------

Coordinator                                                                            <-- (join) --    Consumer 1 (alive)

                                                                                               <-- (ping) --  Consumer 2 (alive)

(Consumer 1 has sent me a join request, rebalance)

------------------------------------------------------------------------------------------------

Consumer Group Rebalance

  1. On group rebalance, the coordinator first sends an error code indicating a rebalance in its responses to all the consumers (possibly excluding the one whose join request triggered this rebalance).
  2. Upon receiving the error code, a consumer should stop fetching from its current partition leaders, call onPartitionDeassigned(), and then send a join request to the coordinator.
  3. Upon receiving join requests from all the consumers within the group, the coordinator calculates the partition assignment and sends it back in the join responses.
  4. If some consumer's join request has not been received after the session timeout, the coordinator removes this consumer and re-triggers the rebalance by sending join responses to the remaining consumers.
  5. Upon receiving a normal join response, consumers call onPartitionAssigned() and start fetching.
  6. Upon receiving an error join response, a consumer checks whether the error is fatal; if so it throws an exception, otherwise it re-sends the join request.

 

Take the rebalance procedure upon new consumer startup for example:

---------------------------- 1 -------------------------------------------------------------------------------------

                               <-- (join) -- Consumer 3 (new)

 

Coordinator            <-- (ping) -- Consumer 1 (alive)

                              -- (rebalance) -->

 

                               <-- (ping) -- Consumer 2 (alive)

                              -- (rebalance) -->

--------------------------- 2 -------------------------------------------------------------------------------------

                              (wait for response) Consumer 3 (new)

 

Coordinator                                         Consumer 1 (alive)

                                                                 (stop fetching)

                                                                 (call onPartitionDeassigned)

                                        <-- (join) --

 

                                                            Consumer 2 (alive)

                                                                 (stop fetching)

                                                                 (call onPartitionDeassigned)

                                        <-- (join) --

 --------------------------- 3/5 -------------------------------------------------------------------------------------

Coordinator   (calculate new assignment)

 

                                           -- (response) --> Consumer 3 (new)

                                                                             (call onPartitionAssigned)

                                                                             (start fetching)

 

                                           -- (response) --> Consumer 1 (alive)

                                                                             (call onPartitionAssigned)

                                                                             (start fetching)


                                           -- (response) --> Consumer 2 (alive)

                                                                             (call onPartitionAssigned)

                                                                             (start fetching)

 --------------------------- 4/6 -------------------------------------------------------------------------------------

Coordinator   (have not heard from Consumer 2, retry rebalance)

 

                                           -- (retry rebalance) --> Consumer 3 (new)

                                           <-- (join) --
 

                                           -- (retry rebalance) --> Consumer 1 (alive)

                                           <-- (join) --

 

                                           <-- X -->              Consumer 2 (failed)

-----------------------------------------------------------------------------------------------------------------
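
The coordinator's side of steps 3 and 4 can be sketched as a small piece of bookkeeping per rebalance. This is a hedged Java sketch: `RebalanceRound` and its method names are hypothetical, partitions are plain strings, and the round-robin assignment is an assumed policy since the proposal does not fix one:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical coordinator-side bookkeeping for one rebalance of one group.
class RebalanceRound {
    private final Set<String> expected = new TreeSet<>(); // members that must re-join
    private final Set<String> joined = new TreeSet<>();   // members that have re-joined
    private final List<String> partitions;

    RebalanceRound(Set<String> currentMembers, List<String> partitions) {
        this.expected.addAll(currentMembers);
        this.partitions = new ArrayList<>(partitions);
    }

    // Step 3: returns the assignment once every expected member has joined,
    // or null while the coordinator is still waiting for join requests.
    Map<String, List<String>> onJoin(String consumerId) {
        expected.add(consumerId); // a brand-new consumer becomes a member by joining
        joined.add(consumerId);
        return joined.containsAll(expected) ? assign() : null;
    }

    // Step 4: drop members that never joined and restart the round.
    // Returns the consumers that must be told to re-send their join request.
    Set<String> onSessionTimeout() {
        expected.retainAll(joined);
        Set<String> toRetry = new TreeSet<>(joined);
        joined.clear();
        return toRetry;
    }

    // Assumed policy: round-robin over the sorted member list.
    private Map<String, List<String>> assign() {
        List<String> members = new ArrayList<>(joined);
        Map<String, List<String>> result = new TreeMap<>();
        for (String member : members) {
            result.put(member, new ArrayList<String>());
        }
        for (int i = 0; i < partitions.size(); i++) {
            result.get(members.get(i % members.size())).add(partitions.get(i));
        }
        return result;
    }
}
```

In the startup example above, Consumer 3's join arrives first, the round completes once Consumers 1 and 2 have re-joined, and a timeout on Consumer 2 shrinks the group to the two that did respond.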

Offset Management

Upon receiving a commit offset request, the coordinator checks that the consumer 1) is within the group specified and 2) owns the partitions it commits offsets to. If so, it appends the offset entry to the corresponding log; otherwise it rejects the request.

Upon receiving a fetch offset request, the coordinator checks that the consumer is within the group specified. If so, it returns the current offset; otherwise it rejects the request.

Upon receiving a reject response from the coordinator, the consumer will try to connect to the new coordinator (i.e., set up the connection, send a heartbeat and get a valid response) and then retry.
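
The two coordinator-side checks described above can be sketched as follows. This is a minimal Java illustration: `OffsetManager`, its method names, the string keys, and the in-memory list standing in for the offset log are all assumptions made for the example:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical coordinator-side offset handling; partitions are "topic-partition" strings.
class OffsetManager {
    private final Map<String, Set<String>> membersByGroup = new HashMap<>();
    private final Map<String, String> ownerByPartition = new HashMap<>(); // "group/partition" -> consumer
    private final Map<String, Long> committed = new HashMap<>();          // "group/partition" -> offset
    final List<String> offsetLog = new ArrayList<>();                     // stand-in for the offset log

    // Records that `consumer` is a member of `group` and owns `partition`.
    void assign(String group, String consumer, String partition) {
        membersByGroup.computeIfAbsent(group, g -> new HashSet<String>()).add(consumer);
        ownerByPartition.put(group + "/" + partition, consumer);
    }

    private boolean isMember(String group, String consumer) {
        Set<String> members = membersByGroup.get(group);
        return members != null && members.contains(consumer);
    }

    // Commit: accepted only if the consumer is in the group AND owns the partition.
    boolean commit(String group, String consumer, String partition, long offset) {
        String key = group + "/" + partition;
        if (!isMember(group, consumer)) {
            return false; // check 1 failed: reject
        }
        if (!consumer.equals(ownerByPartition.get(key))) {
            return false; // check 2 failed: reject
        }
        committed.put(key, offset);
        offsetLog.add(key + "=" + offset);
        return true;
    }

    // Fetch: any group member may read; returns null if nothing was committed yet.
    Long fetch(String group, String consumer, String partition) {
        if (!isMember(group, consumer)) {
            throw new IllegalStateException("consumer is not in group " + group);
        }
        return committed.get(group + "/" + partition);
    }
}
```

A rejected commit (false) or fetch (exception) corresponds to the reject response after which the consumer reconnects and retries.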

 

The main advantage of this design is a single-threaded consumer with a simplified consumer-coordinator communication pattern (the coordinator simply listens on each channel, receives requests and sends responses).

The main drawback is that rebalance latency is determined by the longest heartbeat timeout among all the consumers.

 

Consumer Architecture

API Implementation

 

subscribe (subscription):
 
   1. Check if the new subscription is valid with the old subscription:
 
   1.1. If yes change the subscription and return.
 
   1.2. Otherwise throw SubscriptionNotValidException.

poll (timeout):
 
   1. Check if the cluster metadata needs to be refreshed, if yes send metadata request.

   2. Check if the subscription list is empty, if yes throw SubscriptionIsEmptyException.

   3. Check if the subscription list is topic-based:
 
   3.1. If not, call fetch(). // no need to talk to coordinator

   3.2. If yes, first call coordinate() then call fetch().

commit(sync):
 
   1. Call coordinate().
 
   2. Send the commit offset request to the coordinator; if sync is true, block wait on the response, otherwise return without waiting.


getPartitions:

   1. Check if the subscription list is empty, if yes throw SubscriptionIsEmptyException

   2. If the partition info is not known, call coordinate()

   3. Return partitions info.


getOffset(s):

   1. Check if the subscription list is empty, if yes throw SubscriptionIsEmptyException

   2. If the partition info is not known, call coordinate()

   3. If the offset info is not known, then depending on the kafka.offset.manage config either send getOffset to the coordinator or throw InvalidPartitionsToGetOffsetException.


fetch:
 
   1. Select-now on readable channels to get all available responses, for each response:

   1.1. If it is fetch-response, store the fetched-data.

   1.2. If it is metadata-response, process it and check if there is topic-change:

   1.2.1. If yes, update the subscription list, and return immediately with stored data.

   2. If there is no available data after step 1.
 
   3. If there are no readable channels, check if the offset is known.

   3.1. If not throw ConsumeInitOffsetNotKnownException.

   3.2. Otherwise send the fetch request to leader and timeout-select.

 

coordinate:
 
   1. Check if the coordinator channel has been set up and not closed, and if the subscription list has not changed. If not:

   1.1. Block wait on getting the coordinator metadata.

   1.2. Call onPartitionDeassigned().

   1.3. Send the registerConsumer request to the coordinator, set up the channel and block wait on the registry response.

   1.3.a.1 The registerConsumer response contains member info and partition info; call partitionsToConsume to get the owned partitions.

   1.3.a.2 Send the partitions-to-consume info and get it confirmed by the coordinator. // two round trips

   1.3.b. The registerConsumer response contains the partition assignment from the coordinator. // one round trip

   1.4. Call onPartitionAssigned().

   2. If yes, check if the heartbeat period has elapsed; if so, send a ping to the coordinator.
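
The top-level branching of coordinate() can be reduced to a pure decision function, which keeps it easy to call from a single-threaded poll loop. This is a sketch only: `CoordinateStep`, the `Action` enum, and the parameter names are hypothetical, and the actual registration and ping I/O is left out:

```java
// Hypothetical helper capturing only the branching of coordinate().
class CoordinateStep {
    enum Action { REJOIN, PING, NONE }

    // REJOIN covers steps 1.1 to 1.4 (find the coordinator, de-assign,
    // register, assign); PING is step 2; NONE means nothing is due yet.
    static Action next(boolean channelOpen,
                       boolean subscriptionChanged,
                       long lastPingMs,
                       long nowMs,
                       long heartbeatPeriodMs) {
        if (!channelOpen || subscriptionChanged) {
            return Action.REJOIN;
        }
        if (nowMs - lastPingMs >= heartbeatPeriodMs) {
            return Action.PING;
        }
        return Action.NONE;
    }
}
```

For example, with an open channel, an unchanged subscription and a 100 ms heartbeat period, a call 99 ms after the last ping yields NONE and a call at 100 ms yields PING.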

 

Open Questions

Apart from the above proposal, we still have some open questions that are worth discussing:

  1. Do the coordinate/deserialization/decompress procedures need to be interruptible on timeout?
  2. Should getOffsets and getPartitions be blocking?
  3. Should the coordinator use its own socket server or use the broker's socket server? Although the proposal suggests the former, we can think about this more.
  4. Should we allow consumers from the same group to have different session timeouts? If yes, when do we reject a consumer registration because of its proposed session timeout?
  5. Should we allow consumers from the same group to have specific fixed subscribed topic partitions (G5)? If yes, when do we reject a consumer registration with its subscribed topic partitions?
  6. Should we provide tools to delete topics? If yes, how will this affect the coordinator's topic change logic?
  7. Should we do de-serialization/de-compression in the fetcher thread or the user thread? Although the proposal suggests de-serialization in the user thread and de-compression in the fetcher thread (or de-serialization/de-compression all in the client thread if we are following Option 1.a), we can still think about this more.
  8. Do we allow subscribe to be called multiple times during a consumer's lifetime? This is related to G5.a.
  9. Do we allow users to specify the offsets in the commit() function call, and to explicitly get the last committed offset? This is related to G6.a.
  10. Would we still keep a zookeeper string in the consumer properties for the consumer to read the bootstrap broker list, as an alternative to the broker list property?
  11. Shall we try to avoid unnecessary rebalances on coordinator failover? In the current proposal, upon coordinator failover the consumers will re-issue registration requests to the new coordinator, causing the new coordinator to trigger an unnecessary rebalance.
  12. Shall we restrict commit offset requests to only the client's own assigned partitions? For some use cases, such as clients that manage their offsets themselves and only want to call commit offset once on startup, this might be useful.
  13. Shall we avoid request forwarding for option 2.b? One way to do that is to let the consumer remember two coordinators and, upon receiving a stop-fetching request from the group management coordinator, send a commit offset request to the offset management coordinator, wait for the response, and then respond to the stop-fetching request.

 
