Goals
The main goal of this project is to have a thin consumer client design that can be easily adopted/reimplemented in non-Java languages. To achieve this we have decided to:
- Move the group member management and offset management from the client side to the server side with a centralized coordination mechanism. By doing so we can completely remove ZK-dependency from the consumer clients. General design proposal can be found here, here and here.
- Make the consumer client single-threaded with non-blocking API. Some discussions about this can be found here.
Besides this main goal we also want to add some new feature supports that we have seen while operating 0.8 and older versions. An in-comprehensive list below:
- Manual partition assignment: instead of going through the centralized coordination process to get assigned consuming partitions, the consumers should be able to simply specify their target partitions and start consume right away without any group membership management.
- Manual offset management: instead of using Kafka commit() call to manage the consumed offsets, consumers should be able to store/load their offsets outside Kafka clients and simply specify the starting offsets when consuming.
- Note that pre-0.9, we support both of these use cases by providing a simple consumer and a zookeeper consumer (i.e. high-level consumer) interface, and users need to choose all-or-none for the functionalities; in 0.9 we will combine these two into one interface with more flexibility.
Consumer API
Here is the proposed consumer API interface:
Consumer (Properties) { // Subscribe to a list of topics, return immediately. // Throws an exception if the new subscription is not valid with existing subscriptions. void subscribe(String...) throws SubscriptionNotValidException; // Subscribe to a specified topic partition, return immediately. // Throws an exception if the new subscription is not valid with existing subscriptions. void subscribe(String, Int) throws SubscriptionNotValidException; // Subscribe to a specified topic partition at the given offset, return immediately. // Throws an exception if the new subscription is not valid with existing subscriptions. void subscribe(String, Int, Long) throws SubscriptionNotValidException; // Try to fetch messages from the topic/partitions it subscribed to. // Return whatever messages are available to be fetched or empty after specified timeout. List<Record> poll(Long) throws SubscriptionIsEmptyException, ConsumeInitOffsetNotKnownException; // Commit latest offsets for all partitions currently consuming from. // If the sync flag is set to true, commit call will be sync and blocking. // Otherwise the commit call will be async and best-effort. void commit(Boolean) throws SubscriptionIsEmptyException; // Commit the specified offset for the specified topic/partition. // If the sync flag is set to true, commit call will be sync and blocking. // Otherwise the commit call will be async and best-effort. // Throws an exception if the partitions specified are not currently consuming from. void commit(List[(String, Int, Long)], Boolean) throws InvalidPartitionsToCommitOffsetException; // Specify the fetch starting offset for the specified topic/partition. void pos(List[(String, Int, Long)]) // Get the currently consuming partitions. // Block wait if the current partitions are not known yet. List[(String, Int)] getPartitions() throws SubscriptionIsEmptyException; // Get the last committed offsets of the partitions currently consuming. // Block wait if the current partitions are not known yet. Map[(String, Int), Long] getOffsets() throws SubscriptionIsEmptyException; // Get the last committed offsets of the specified topic/partition. Long getOffset(String, Int) throws SubscriptionIsEmptyException, InvalidPartitionsToGetOffsetException; // --------- Call-back Below, not part of API ------------ // // Call-back function upon partition de-assignment. // Default implementation will commit offset depending on auto.commit config. void onPartitionDeassigned(List[(String, Int)]); // Call-back function upon partition re-assignment. // Default implementation is No-Op. void onPartitionAssigned(List[(String, Int)]) // --------- Optional Call-back Below ------------ // // Call-back function upon partition reassignment given the group member list and the subscribed topic partition info. // Return the partitions that would like to consume, and need to make sure each partition is covered by just one member in the group. List[(String, Int)] partitionsToConsume(List[String], List[TopicPartition]); }
The Records returned by the poll() function will include metadata such as offset, partition, key and partition-key. This would also help removing the decompression/recompression in mirror maker (details can be found in KAFKA-1011).
Consumer Coordinator Overview
Each consumer group will be assigned a consumer coordinator responsible for its membership management and offset management. Each broker will host a consumer coordinator so that the management of consumer groups will be distributed across all brokers.
For each consumer group, the coordinator stores the following information:
- Group metadata of each consumer group it currently serve, containing
- Its subscribed topic list.
- Group configs, including session timeout, etc.
- Consumer metadata for each consumer in the group.
- Current offsets for each consumed topic/partition.
- Partition ownership metadata, including the consumer-assigned-partitions map, current group generation number, etc.
- Group lists for each existing topic that currently subscribing to it.
The coordinator also have the following functionalities:
- handleNewConsumerGroup: this is called when the coordinator starts to serve a new consumer group.
- handleAddedConsumer: this is called when a new consumer is registered into a existing consumer group.
- handleConsumerFailure: this is called when the coordinator thinks a consumer has failed and hence kick it out of the group.
- handleTopicChange: this is called when the coordinator detects a topic change from ZK.
- handleTopicPartitionChange: this is called when the coordinator detects a topic partition change from ZK.
- handleCommitOffset: this this called for committing offset for certain partitions.
- handleConsumerGroupRebalance: this is called when the partition ownership needs to be re-assigned within a group.
The coordinator also maintain the following information in ZK:
- For each group, the partition ownership of the subscribed topics (in a single ZK path).
The coordinator also holds the following modules:
- A threadpool of rebalancer threads executing rebalance process of single consumer groups.
- [Optional] A socket server for both receiving and send requests and sending responses.
Failure Detection
Both consumer failure detection (by coordinator) and coordinator failure detection (by consumer) is done through a heartbeat protocol.
Heartbeat Protocol
- Consumers periodically send heartbeat along with the generation number to the coordinator based on session_timeout * 1/3.
- And upon receiving the heartbeat request, coordinator send back response immediately if the generation is good.
- If coordinator has not heard from a consumer after session_timeout, treat the consumer as failed.
- If consumer has not heart back from the coordinator session timeout * 2/3 after it sends heartbeat request (possibly multiple times), treat the coordinator as failed.
- If consumer found the channel with the coordinator has closed, treat the coordinator as failed.
- If a ping request with an older or newer generation number is received, reject the ping request to let the consumer to send the join request.
----------------------------------------- 1/2 ------------------------------------------
Coordinator <-- (ping) -- Consumer 1 (alive)
-- (response) -->
<-- (ping) -- Consumer 2 (alive)
-- (response) -->
----------------------------------------- 3 ------------------------------------------
Coordinator <-- (ping) -- Consumer 1 (alive)
-- (response) -->
(Have not heard from consumer 2, it must have failed) Consumer 2 (failed)
----------------------------------------- 4 ------------------------------------------
Coordinator (failed) <-- (ping) -- Consumer 1 (alive)
(Have not heard back from coordinator, it must have failed)
<-- (ping) -- Consumer 2 (alive)
(Have not heard back from coordinator, it must have failed)
----------------------------------------- 5 ------------------------------------------
Coordinator (crashed) <-- X -- Consumer 1 (alive)
(Broken pipe from coordinator, it must have failed)
<-- X -- Consumer 2 (alive)
(Broken pipe from coordinator, it must have failed)
------------------------------------------------------------------------------------------------
PingRequest
{ Version => int16 CorrelationId => int64 GroupId => String ConsumerId => String }
PingResponse
{ Version => int16 CorrelationId => int64 ErrorCode => int16 }
Consumer Failure Handling
When the coordinator thinks a consumer has failed, it will do the following:
- Remove the consumer from the group metadata.
- Trigger the rebalance process.
Coordinator Failure Handling
When the consumer thinks its current coordinator has failed, it will do the following:
- Refresh its metadata from the existing known brokers to get the new coordinator information (if metadata request failed/timeout then retry until success).
- Set up the connection to the new coordinator.
- Start sending heartbeats to the coordinator.
----------------------------------------- 1/2/3 ------------------------------------------
Coordinator (failed) <-- (ping) -- Consumer 1 (alive)
(Have not heard back from coordinator, try to reconnect to the new coordinator)
<-- (ping) -- Consumer 2 (alive)
(Have not heard back from coordinator, try to reconnect to the new coordinator)
Broker (alive) <-- (metadata) -- Consumer 1
-- (response) -->
<-- (metadata) -- Consumer 2
-- (response) -->
Coordinator (new) <-- (ping) -- Consumer 1 (alive)
-- (response) -->
<-- (ping) -- Consumer 2 (alive)
-- (response) -->
-------------------------------------------------------------------------------------------
Consumer Startup (Subscription Change)
On consumer startup, it will first try to subscribe to a list of topics; and similarly when the consumer changes its subscription list, it can be treated as a consumer restart.
Therefore, these two process can be merged as one behavior that is triggered by subscription change (from possibly empty).
If consumer has not connected to the coordinator, it will first try to find the current coordinator from metada refresh and setup the connection.
Coordinator Discovery
The coordinator discovery is through the metadata request, which currently contains the 1) cluster information and the 2) leader information for specified topics. Will add a third piece of metadata in the response, which is the coordinator id based on consumer group.
----------------------------------------- 1 ------------------------------------------
Any Broker <-- (metadata) -- Consumer 1 (alive)
-- (response: {cluster, leaders, coordinator}) -->
---------------------------------------------------------------------------------------
MetadataRequest
{ Version => int16 CorrelationId => int64 ClientId => String Topics => [String] GroupIds => [String] }
This MetadataRequest inherits from the TopicMetadataRequest in 0.8.
MetadataResponse
{ Version => int16 CorrelationId => int64 Brokers => [Broker] TopicsMetadata => [TopicMetadata] CoordinatorIds => [int16] }
Consumer (re)Join-Group
If the consumer 1) did not know which partitions to consumer (since it has just started, for example), 2) realized a topic change either from the metadata response or from subscription change. It will send a join request to the coordinator.
Upon received a join request, the coordinator will trigger a rebalance.
----------------------------------------- 2 ------------------------------------------
Coordinator <-- (join) -- Consumer 1 (alive)
(received a join request on consumer 1, rebalance)
---------------------------------------------------------------------------------------
JoinGroupRequest
{ Version => int16 CorrelationId => int64 GroupId => String ConsumerAddr => String SessionTimeout => int64 Subscribing => [String] }
JoinGroupResponse
{ Version => int16 CorrelationId => int64 ErrorCode => int16 Generation => int16 GroupId => String ConsumerId => String PartitionsToOwn => [<TopicAndPartition>] }
Consumer Group Rebalance
The coordinator will trigger rebalance under the following conditions:
- It found a consumer has failed.
- It received a join request from a (possibly unknown) consumer.
- It found a partition change on the topics this group is subscribing from.
Rebalance Process: Option 1
On group rebalance, the coordinator will first sends a error code indicating a rebalance in its responses to all the consumers.
- Upon receiving the error code, consumer should stop fetching from its current partition leaders, call onPartitionDeassigned(), and then sends a join request to the coordinator.
- Upon receiving a join request, coordinator checks if the specified group has already exist.
- If yes, hold the request and wait for the join requests from all the consumers within the group.
- If not, adds the group, and do not need to wait for any more join request and directly proceed to the next step.
- Coordinator calculates the partition assigned along with the new generation number and send it back in the join response.
- If some consumer's join request has not been received after session timeout, remove this consumer and re-trigger the rebalance by sends the join responses to rest consumers.
- Upon receiving the normal join response, consumers call onPartitionAssigned() and start fetching.
- If receiving error join response, consumer check if it is fatal; if so throw exceptions, otherwise re-send the join request.
Take the rebalance procedure upon new consumer startup for example:
---------------------------- 1 -------------------------------------------------------------------------------------
<-- (join) -- Consumer 3 (new)
Coordinator <-- (ping) -- Consumer 1 (alive)
-- (rebalance) -->
<-- (ping) -- Consumer 2 (alive)
-- (rebalance) -->
--------------------------- 2/3 -------------------------------------------------------------------------------------
(wait for response) Consumer 3 (new)
Coordinator Consumer 1 (alive)
(stop fetching)
(call onPartitionDeassigned)
<-- (join) --
Consumer 2 (alive)
(stop fetching)
(call onPartitionDeassigned)
<-- (join) --
--------------------------- 3/5 -------------------------------------------------------------------------------------
Coordinator (calculate new assignment)
-- (response) --> Consumer 3 (new)
(call onPartitionAssigned)
(start fetching)
-- (response) --> Consumer 1 (alive)
(call onPartitionAssigned)
(start fetching)
-- (response) --> Consumer 2 (alive)
(call onPartitionAssigned)
(start fetching)
--------------------------- 4/6 -------------------------------------------------------------------------------------
Coordinator (have not heard from Consumer 2, retry rebalance)
-- (retry rebalance) --> Consumer 3 (new)
<-- (join) --
-- (retry rebalance) --> Consumer 1 (alive)
<-- (join) --
<-- X --> Consumer 2 (failed)
-----------------------------------------------------------------------------------------------------------------
Consumer's logic is summarized as:
-----------------------------------------------------------------------------------------------------------------
(Re)Start:
On 1) start-up, 2)subcription-change, 3) heart-beat fail, 4) coordinator fail, sends a join to the coordinator (if it is not known first find it through metadata).
Join group:
Upon receive join response, if it is normal record the sequence number and assignment and start consuming; otherwise retry or fail-fatally based on error code.
Heart beat:
Periodically send ping to coordinator, and if error received goes to the "start" step.
-----------------------------------------------------------------------------------------------------------------
The pros of this design would be avoiding unnecessary rebalances on coordinator failover, the cons is mainly at the fact that rebalance latency is lower-bounded by the ping frequency.
Another approach would be trigger the rebalance by closing the socket:
Rebalance Process: Option 2
On group rebalance, the coordinator will close sockets to all consumers (probably except the one which has sent him a join request).
- Upon getting a broken pipe from the current coordinator, consumer should stop fetching from its current partition leaders, call onPartitionDeassigned().
- It then find the new coordinator through metadata request, and send a join request to the new coordinator.
- Upon receiving join request from all the consumers within the group, coordinator calculates the partition assigned along with the new generation number and send it back in the join response.
- Upon receiving a join request from an unknown group, immediately calculates the partition assigned (without waiting for any new comers).
Consumer's logic is similar to option 2 except that consumer do not need to handle heartbeat-failure case, since it is merged into coordinator-failure: the key here is to enforce a rebalance to consumers as coordinator failures.
This can lift the lower bound of the rebalance latency on ping frequency, but will require more one metadata request along with unnecessary rebalances on coordinator failover (cause they are now treated as one scenario).
Offset Management
Upon receiving the commit offset request, the coordinator checks if the consumer is 1) within the group specified, 2) owns the partitions it commit offsets to, and 3) has the correct generation number. If yes append the offset entry to the corresponding log, otherwise rejects the request.
Upon receiving the fetch offset request, the coordinator checks if the consumer within the group specified. If yes returns the current offset, otherwise rejects the request.
Upon receiving a reject response from the coordinator, the consumer will try to connect to the new coordinator (i.e., set up the connection, send heartbeat and get valid response) and then retry.
Consumer Architecture
API Implementation
subscribe (subscription): 1. Check if the new subscription is valid with the old subscription: 1.1. If yes change the subscription and return. 1.2. Otherwise throw SubscriptionNotValidException.
poll (timeout): 1. Check if the cluster metadata needs to be refreshed, if yes send metadata request. 2. Check if the subscription list is empty, if yes throw SubscriptionIsEmptyException. 3. Check if the subscription list is topic-based: 3.1. If not, call fetch(). // no need to talk to coordinator 3.2. If yes, first call coordinate() then call fetch()
commit(sync): 1. Call coordinate(). 2. Send commit offset to coordinator, based on sync block wait on response or not.
getPartition: 1. Check if the subscription list is empty, if yes throw SubscriptionIsEmptyException 2. If the partition info is not known, call coordinate() 3. Return partitions info. getOffset(s): 1. Check if the subscription list is empty, if yes throw SubscriptionIsEmptyException 2. If the partition info is not known, call coordinate() 3. If the offset info is not known, based on kafka.offset.manage send getOffset to coordinator or throw InvalidPartitionsToGetOffsetException
fetch: 1. Select-now on readable channels to get all available responses, for each response: 1.1. If it is fetch-response, store the fetched-data. 1.2. If it is metadata-response, process it and check if there is topic-change: 1.2.1. If yes, update the subscription list, and return immediately with stored data. 2. If there is no available data after step 1. 3. If there is no readable channels, check if the offset is known. 3.1. If not throw ConsumeInitOffsetNotKnownException. 3.2. Otherwise send the fetch request to leader and timeout-select.
coordinate: 1. Check if the coordinator channel has been setup and not closed, and if subscription list has not changed. If not: 1.1. Block wait on getting the coordinator metadata. 1.2. call onPartitionDeassignment(). 1.3. Send the registerConsumer to coordinator, set up the channel and block wait on registry response. 1.3.a.1 The registerConsumer response contain member info and partition info, call partitionsToConsume to get the owned partitions. 1.3.a.2 Send the partitions to consume info and get confirmed from the coordinator. // two round trips 1.3.b. The registerConsumer response contain partition assignment from the coordinator. // one round trip 1.4. and call onPartitionAssigned(). 2. If yes, check if heartbeat period has reached, if yes, send a ping to coordinator.
Open Questions
Apart with the above proposal, we still have some open questions that worth discuss:
- coordinate/deserialization/decompress procedures need to be interruptible on timeout?
- getOffsets and getPartitions be blocking?Should coordinator use its own socket server or use broker's socket server? Although the proposal suggests former we can think about this more.
- Should we allow consumers from the same group to have different session timeout? If yes when do we reject a consumer registration because of its proposed session timeout?
- Should we allow consumers from the same group have specific fixed subscribing topic partitions (G5)? If yes when do we reject a consumer registration with its subscribed topic partitions?
- Should we provide tools to delete topics? If yes, how will this affect coordinator's topic change logic?
- Should we do de-serialization/compression in fetcher thread or user thread? Although the proposal suggests de-serialization in user thread and de-compression in fetcher thread (or deserialization/compression all in client thread if we are following Option 1.a) we can still think about this more.
- Do we allow to call subscribe multiple times during its life time? This is related to G5.a
- Do we allow users to specify the offsets in commit() function call, and explicitly get last committed offset? This is related to G6.a
- Would we still keep a zookeeper string in consumer properties for consumer to read the bootstrap broker list as an alternative to the broker list property?
- Shall we try to avoid unnecessary rebalances on coordinator failover? In the current proposal upon coordinator failover the consumers will re-issue registration requests to the new coordinator, causing the new coordinator to trigger rebalance that is unnecessary.
- Shall we restrict commit offset request to only for the client's own assigned partitions? For some use cases such as clients managing their offsets themselves and only want to call commit offset once on startup, this might be useful.
- Shall we avoid request forwarding for option 2.b? One way to do that is let consumer remember two coordinators, and upon receiving stop fetching from group management coordinator, send a commit offset request to offset management coordinator and wait on response, and then respond to the stop fetching request.