Goals
The main goal of this project is to have a thin consumer client design that can be easily adopted/reimplemented in non-Java languages. To achieve this we have decided to:
- Move the group member management and offset management from the client side to the server side with a centralized coordination mechanism. By doing so we can completely remove ZK-dependency from the consumer clients. General design proposal can be found here, here and here.
- Make the consumer client single-threaded with non-blocking API. Some discussions about this can be found here.
Besides this main goal, we also want to support some new features that we have found useful while operating 0.8 and older versions. A non-comprehensive list follows:
- Manual partition assignment: instead of going through the centralized coordination process to get assigned consuming partitions, consumers should be able to simply specify their target partitions and start consuming right away without any group membership management.
- Manual offset management: instead of using Kafka commit() call to manage the consumed offsets, consumers should be able to store/load their offsets outside Kafka clients and simply specify the starting offsets when consuming.
- Note that pre-0.9, we support both of these use cases by providing a simple consumer and a zookeeper consumer (i.e. high-level consumer) interface, and users have to choose all-or-none of these functionalities; in 0.9 we will combine these two into one interface with more flexibility.
Consumer API
Here is the proposed consumer API interface:
Consumer (Properties) {

  // Incrementally subscribe to a list of topics.
  // @throws SubscriptionNotValidException if the new subscription is not valid with existing subscriptions.
  void subscribe(String...) throws SubscriptionNotValidException;

  // Incrementally subscribe to a specified topic partition.
  // No group management will be invoked since assigned partitions have been specified.
  // @throws SubscriptionNotValidException if the new subscription is not valid with existing subscriptions.
  void subscribe(String, Int) throws SubscriptionNotValidException;

  // Incrementally subscribe to a specified topic partition starting at the given offset.
  // No group management will be invoked since assigned partitions have been specified.
  // @throws SubscriptionNotValidException if the new subscription is not valid with existing subscriptions.
  void subscribe(String, Int, Long) throws SubscriptionNotValidException;

  // Try to fetch messages from the topic/partitions it subscribed to.
  // Return whatever messages are available to be fetched, or empty after the specified timeout.
  // @throws InitOffsetNotKnownException if the starting offset for the assigned partitions is unknown.
  List<Record> poll(Long) throws InitOffsetNotKnownException;

  // Commit the latest offsets for all partitions currently being consumed.
  // If the sync flag is set to true, the commit call will be sync and blocking.
  // Otherwise the commit call will be async and best-effort.
  void commit(Boolean);

  // Commit the specified offsets for the specified partitions.
  // If the sync flag is set to true, the commit call will be sync and blocking.
  // Otherwise the commit call will be async and best-effort.
  // @throws InvalidPartitionsToCommitOffsetException if group management has been invoked, the sync flag is true, and the specified partitions do not belong to the assigned ones.
  void commit(List[(String, Int, Long)], Boolean) throws InvalidPartitionsToCommitOffsetException;

  // Specify the fetch starting offsets for the specified topic/partitions.
  void position(List[(String, Int, Long)]);

  // Get the currently consumed partitions.
  // Block-wait if the current partitions are not known yet.
  List[(String, Int)] getPartitions();

  // Get the last committed offsets of the partitions currently being consumed.
  // Block-wait if the current partitions are not known yet.
  Map[(String, Int), Long] getOffsets() throws OffsetUnknownException;

  // Get the last committed offset of the specified topic/partition.
  // @throws OffsetUnknownException if no offsets of the specified partition have ever been committed.
  Long getOffset(String, Int) throws OffsetUnknownException;

  // --------- Call-backs below, not part of API ------------ //

  // Call-back function upon partition de-assignment.
  // Default implementation will commit offsets depending on the auto.commit config.
  void onPartitionDeassigned(List[(String, Int)]);

  // Call-back function upon partition re-assignment.
  // Default implementation will fetch starting offsets depending on the auto.commit config.
  void onPartitionAssigned(List[(String, Int)]);
}
The Records returned by the poll() function will include metadata such as offset, partition, key and partition-key. This would also help remove the decompression/recompression in mirror maker (details can be found in KAFKA-1011).
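For illustration, here is a minimal usage sketch against the proposed interface; the configuration property names, the running flag and the process call are assumptions for this example, not part of the proposal:

  Properties props = new Properties();
  props.put("group.id", "my-group");              // property names here are placeholders
  Consumer consumer = new Consumer(props);

  // Group-managed consumption: subscribe by topic and let the coordinator assign partitions.
  consumer.subscribe("topicA", "topicB");
  while (running) {
      List<Record> records = consumer.poll(100L);   // return whatever is available within 100 ms
      process(records);                             // application-level processing
      consumer.commit(false);                       // async, best-effort offset commit
  }

  // Manual assignment alternative: name the partition and starting offset directly,
  // bypassing group management and Kafka-based offset management entirely.
  consumer.subscribe("topicA", 0, 12345L);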
Consumer Coordinator Overview
Each consumer group will be assigned a consumer coordinator responsible for its membership management and offset management. Each broker will host a consumer coordinator so that the management of consumer groups is distributed across all brokers. We follow the in-built offset management design described here to assign coordinators based on the "offset" topic partition leadership. That is, an "offset" topic is used to store the offsets sent from each consumer group, with the partition id determined by the consumer group name. The leader of the partition that a consumer group's offsets are committed to is the coordinator of that group.
Since all brokers cache the leadership of all topic partitions, every broker will know the coordinator for any given consumer group. Upon receiving a request from a consumer, the coordinator will only serve it if it is the assigned coordinator of the group this consumer belongs to; otherwise the request will be rejected.
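For instance, the group-to-coordinator mapping could boil down to hashing the group name into the "offset" topic's partition space. The sketch below is only illustrative; the exact hashing scheme is an implementation detail, not mandated here:

  // The coordinator of a group is the leader of the "offset" topic partition
  // that the group name maps to; the modulo-hash below is only illustrative.
  static int coordinatorPartitionFor(String groupId, int offsetsTopicPartitionCount) {
      return (groupId.hashCode() & 0x7fffffff) % offsetsTopicPartitionCount;
  }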
The coordinator stores the following information:
1) For each consumer group it currently serves, the group metadata containing:
- Its subscribed topic list.
- Group configs, including session timeout, etc.
- Consumer metadata for each consumer in the group.
- Current offsets for each consumed topic/partition.
- Partition ownership metadata, including the consumer-assigned-partitions map, current group generation number, etc.
2) For each existing topic, a list of consumer groups that are currently subscribing to it.
In addition, for each consumer group the coordinator also maintains the group config and partition ownership metadata in ZK.
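As a rough illustration only, the per-group state kept by the coordinator could be shaped as below; the field names are hypothetical, and ConsumerMetadata / TopicAndPartition stand in for the types referenced elsewhere in this document:

  import java.util.List;
  import java.util.Map;
  import java.util.Set;

  // Illustrative, in-memory view of one group's metadata on its coordinator.
  class GroupMetadata {
      Set<String> subscribedTopics;                          // subscribed topic list
      long sessionTimeoutMs;                                 // group configs, e.g. session timeout
      Map<String, ConsumerMetadata> consumers;               // consumer id -> consumer metadata
      Map<TopicAndPartition, Long> committedOffsets;         // current offsets per consumed topic/partition
      Map<String, List<TopicAndPartition>> ownedPartitions;  // consumer-assigned-partitions map
      int generation;                                        // current group generation number
  }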
Consumer Startup (Subscription Change)
On startup, the consumer will usually first try to subscribe to a list of topics.
If it does not subscribe to any topics, then poll() will always return an empty set; if it has subscribed to specific topic/partitions, then it does not need to communicate with the coordinator and can just start fetching data (we describe how fetching starts below).
Only if it has changed its topic subscription (including from an empty set on startup) will it need to coordinate with the coordinator. If the consumer has not yet connected to the coordinator, it will first try to find the current coordinator and set up the connection.
Another note: in this proposal we do not require the consumer's client id in the config but leave it as an optional config; when it is not specified a default id will be used, and if group management is invoked the coordinator will assign consumer ids upon their successfully joining the group.
Coordinator Discovery
Coordinator discovery is done through the find-coordinator request, which can be answered by any live broker and whose response contains the coordinator id for the given consumer group.
-----------------------------------------------------------------------------------
Any Broker (alive) <-- (find_coordinator) -- Consumer 1
-- (response) -->
<-- (find_coordinator) -- Consumer 2
-- (response) -->
-----------------------------------------------------------------------------------
FindCoordinatorRequest
{
  Version       => int16
  CorrelationId => int64
  ClientId      => String
  GroupId       => String
}
FindCoordinatorResponse
{
  Version       => int16
  CorrelationId => int64
  CoordinatorId => int16
}
Consumer (re)Join-Group
Upon successfully connecting to the coordinator, the consumer will try to (re-)join the group if 1) it does not own any partitions, or 2) its subscription has changed, or 3) it has received an error in a heartbeat response. Otherwise it will just keep heartbeating with the coordinator (we will talk about this later). The consumer will try to join the group by doing the following:
1) Drop any fetched data, stop sending any fetch requests to brokers.
2) Release any assigned partition ownership by calling the onPartitionDeassigned function.
3) Send a join group request to the coordinator and block waiting on the response.
Upon receiving a join group request, the coordinator will trigger a rebalance on the group this consumer belongs to.
When the rebalance finishes, the coordinator will send a response back to the consumer with the newly assigned partitions, the new consumer id and the generation number.
------------------------------------------------------------------------------------
Coordinator <-- (join) -- Consumer 1 (alive)
(received a join request on consumer 1, rebalance)
...
(upon rebalance finish)
-- (response) -->
---------------------------------------------------------------------------------------
JoinGroupRequest
{
  Version        => int16
  CorrelationId  => int64
  GroupId        => String
  ConsumerAddr   => String
  SessionTimeout => int64
  Subscribing    => [String]
}
JoinGroupResponse
{
  Version         => int16
  CorrelationId   => int64
  ErrorCode       => int16
  Generation      => int16
  GroupId         => String
  ConsumerId      => String
  PartitionsToOwn => [<TopicAndPartition>]
}
Consumer Fetching Data
As in the new producer, consumers will periodically send topic metadata requests to the available brokers to find the leaders of their assigned partitions. Upon 1) receiving an error while fetching data, or 2) receiving a join response from the coordinator, a consumer will also force a topic metadata refresh.
To fetch data, consumers will issue fetch requests to the known leaders of the assigned partitions with a selector, and the poll function will return data from all the readable channels in the selector.
Wildcard Subscription Handling
With wildcard subscription (for example, whitelist and blacklist), the consumers are responsible for discovering matching topics through topic metadata requests. That is, the consumer's topic metadata request will contain an empty topic list, so its response will return the partition info of all topics; the consumer will then filter the topics that match its wildcard expression and update the subscription list. Again, if the subscription list has changed from its previous value, it will start the consumer re-join-group procedure.
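A small sketch of the client-side filtering step, here modeling the wildcard as a regex whitelist; the method name and regex choice are illustrative, not part of the proposal:

  import java.util.HashSet;
  import java.util.Set;
  import java.util.regex.Pattern;

  // From the full topic list returned by an empty-topic-list metadata request,
  // keep only the topics matching the wildcard; if the result differs from the
  // current subscription, the consumer starts the re-join-group procedure.
  static Set<String> matchWildcard(Set<String> allTopics, Pattern whitelist) {
      Set<String> matched = new HashSet<>();
      for (String topic : allTopics) {
          if (whitelist.matcher(topic).matches())
              matched.add(topic);
      }
      return matched;
  }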
Failure Detection
When the consumer has successfully joined the group, it will keep sending heartbeat requests to the coordinator. This protocol is used both for consumer failure detection (by the coordinator) and coordinator failure detection (by the consumer).
Heartbeat Protocol
1) Consumers periodically send heartbeat requests to the coordinator, once every session_timeout / heartbeat_frequency (where heartbeat_frequency is an integer > 1); a consumer-side sketch of this timing follows the ping schemas below.
2) Upon receiving a heartbeat request, the coordinator checks the generation number and the consumer id; if they are not valid (i.e., the consumer id was not assigned before, or the generation number is smaller than the current value), it sends an error response; otherwise it sends a normal response.
3) If the coordinator has not heard from a consumer after session_timeout, it treats the consumer as failed.
4) If the consumer has not heard from the coordinator after session_timeout, it treats the coordinator as failed.
5) If the consumer finds that its channel to the coordinator has been closed, it treats the coordinator as failed.
-----------------------------------------------------------------------------------
Coordinator <-- (ping) -- Consumer 1 (alive)
-- (response) -->
<-- (ping) -- Consumer 2 (alive)
-- (response) -->
-----------------------------------------------------------------------------------
Coordinator <-- (ping) -- Consumer 1 (alive)
-- (response) -->
(Have not heard from consumer 2, it must have failed) Consumer 2 (failed)
-----------------------------------------------------------------------------------
Coordinator (failed) <-- (ping) -- Consumer 1 (alive)
(Have not heard back from coordinator, it must have failed)
<-- (ping) -- Consumer 2 (alive)
(Have not heard back from coordinator, it must have failed)
-----------------------------------------------------------------------------------
Coordinator (crashed) <-- X -- Consumer 1 (alive)
(Broken pipe from coordinator, it must have failed)
<-- X -- Consumer 2 (alive)
(Broken pipe from coordinator, it must have failed)
------------------------------------------------------------------------------------------------
PingRequest
{
  Version       => int16
  CorrelationId => int64
  GroupId       => String
  ConsumerId    => String
}
PingResponse
{
  Version       => int16
  CorrelationId => int64
  ErrorCode     => int16
}
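As a consumer-side illustration of steps 1), 4) and 5) above, a minimal heartbeat loop might look like the following; sendPing, awaitPingResponse and onCoordinatorFailed are hypothetical helpers, not part of this proposal:

  // A rough consumer-side heartbeat loop; all helper methods are hypothetical.
  void heartbeatLoop(long sessionTimeoutMs, int heartbeatFrequency) throws InterruptedException {
      long pingIntervalMs = sessionTimeoutMs / heartbeatFrequency;   // heartbeat_frequency > 1
      while (true) {
          sendPing();                                                // send PingRequest to the coordinator
          boolean alive = awaitPingResponse(sessionTimeoutMs);       // false on timeout, error code, or closed channel
          if (!alive) {
              onCoordinatorFailed();                                 // rediscover the coordinator and re-join
              return;
          }
          Thread.sleep(pingIntervalMs);                              // ping every session_timeout / heartbeat_frequency
      }
  }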
Consumer Failure Handling
When the coordinator thinks a consumer has failed, it will do the following:
1) Remove the consumer from the group metadata.
2) Trigger the rebalance process.
Coordinator Failure Handling
When the consumer thinks its current coordinator has failed, it will do the following:
1) Discover the new coordinator information from the existing known brokers through the find coordinator requests.
2) Set up the connection to the new coordinator.
3) Start sending heartbeats to the new coordinator.
------------------------------------------------------------------------------------
Coordinator (failed) <-- (ping) -- Consumer 1 (alive)
(Have not heard back from coordinator, try to reconnect to the new coordinator)
<-- (ping) -- Consumer 2 (alive)
(Have not heard back from coordinator, try to reconnect to the new coordinator)
Broker (alive) <-- (find_coordinator) -- Consumer 1
-- (response) -->
<-- (find_coordinator) -- Consumer 2
-- (response) -->
Coordinator (new) <-- (ping) -- Consumer 1 (alive)
-- (response) -->
<-- (ping) -- Consumer 2 (alive)
-- (response) -->
-------------------------------------------------------------------------------------------
Coordinator Startup (Coordinator Failover)
Coordinators will be constructed upon broker startup and initialized with empty consumer groups. There are two cases in which a new consumer group can be created on a coordinator:
1) When the first request is received from one of the consumers in the group, the coordinator needs to create this group after validating, by the group name, that it is the assigned coordinator.
2) Since the assignment of coordinators can change as the "offset" topic partition leaders change (mainly due to broker failures), coordinators also have to take over existing consumer groups upon coordinator failover.
The only difference is that for 1) the coordinator will immediately trigger a rebalance for the newly created group, whereas for 2) the coordinator needs to load the group's current config and ownership information from ZK but does not necessarily trigger a rebalance.
Consumer Group Metadata in ZK
The consumer config and ownership metadata can be stored in one ZK path as a json object. This node will be updated upon rebalance completion and read upon coordinator failover.
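As a purely illustrative example of such a node's value, the layout and field names below are not specified by this proposal:

  {
    "version": 1,
    "session_timeout_ms": 30000,
    "generation": 5,
    "subscribed_topics": ["topicA", "topicB"],
    "ownership": {
      "consumer-1": [["topicA", 0], ["topicB", 1]],
      "consumer-2": [["topicA", 1], ["topicB", 0]]
    }
  }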
Consumer Group Rebalance
The coordinator will trigger rebalance under the following conditions:
- It found a consumer has failed.
- It received a join request from a (possibly unknown) consumer.
- It found a partition change on the topics this group is subscribing to.
Rebalance Process: Option 1
On group rebalance, the coordinator will first send an error code indicating a rebalance in its responses to all the consumers.
- Upon receiving the error code, a consumer should stop fetching from its current partition leaders, call onPartitionDeassigned(), and then send a join request to the coordinator.
- Upon receiving a join request, the coordinator checks if the specified group already exists.
- If yes, it holds the request and waits for join requests from all the consumers within the group.
- If not, it adds the group and proceeds directly to the next step without waiting for any more join requests.
- The coordinator calculates the partition assignment along with the new generation number and sends it back in the join responses (a simple sketch of this assignment step follows this list).
- If some consumer's join request has not been received after the session timeout, remove that consumer and re-trigger the rebalance by sending join responses to the remaining consumers.
- Upon receiving a normal join response, consumers call onPartitionAssigned() and start fetching.
- Upon receiving an error join response, a consumer checks whether the error is fatal; if so it throws an exception, otherwise it re-sends the join request.
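A minimal sketch of the assignment calculation referenced in the list above: a round-robin spread over a fixed consumer order, which is only one possible policy since the actual algorithm is left open by this proposal. TopicAndPartition is the pair type used in the request/response schemas.

  import java.util.ArrayList;
  import java.util.HashMap;
  import java.util.List;
  import java.util.Map;

  // Spread the partitions of the group's subscribed topics over the joined consumers.
  static Map<String, List<TopicAndPartition>> assignPartitions(List<String> consumerIds,
                                                               List<TopicAndPartition> partitions) {
      Map<String, List<TopicAndPartition>> assignment = new HashMap<>();
      for (String consumerId : consumerIds)
          assignment.put(consumerId, new ArrayList<>());
      for (int i = 0; i < partitions.size(); i++)
          assignment.get(consumerIds.get(i % consumerIds.size())).add(partitions.get(i));
      return assignment;
  }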
Take the rebalance procedure upon new consumer startup as an example:
---------------------------- 1 -------------------------------------------------------------------------------------
<-- (join) -- Consumer 3 (new)
Coordinator <-- (ping) -- Consumer 1 (alive)
-- (rebalance) -->
<-- (ping) -- Consumer 2 (alive)
-- (rebalance) -->
--------------------------- 2/3 -------------------------------------------------------------------------------------
(wait for response) Consumer 3 (new)
Coordinator Consumer 1 (alive)
(stop fetching)
(call onPartitionDeassigned)
<-- (join) --
Consumer 2 (alive)
(stop fetching)
(call onPartitionDeassigned)
<-- (join) --
--------------------------- 3/5 -------------------------------------------------------------------------------------
Coordinator (calculate new assignment)
-- (response) --> Consumer 3 (new)
(call onPartitionAssigned)
(start fetching)
-- (response) --> Consumer 1 (alive)
(call onPartitionAssigned)
(start fetching)
-- (response) --> Consumer 2 (alive)
(call onPartitionAssigned)
(start fetching)
--------------------------- 4/6 -------------------------------------------------------------------------------------
Coordinator (have not heard from Consumer 2, retry rebalance)
-- (retry rebalance) --> Consumer 3 (new)
<-- (join) --
-- (retry rebalance) --> Consumer 1 (alive)
<-- (join) --
<-- X --> Consumer 2 (failed)
-----------------------------------------------------------------------------------------------------------------
Consumer's logic is summarized as:
-----------------------------------------------------------------------------------------------------------------
(Re)Start:
On 1) start-up, 2) subscription change, 3) heartbeat failure, or 4) coordinator failure, send a join request to the coordinator (if the coordinator is not known, first find it through metadata).
Join group:
Upon receiving the join response, if it is normal, record the generation number and assignment and start consuming; otherwise retry or fail fatally based on the error code.
Heart beat:
Periodically send a ping to the coordinator, and if an error is received, go back to the "(Re)Start" step.
-----------------------------------------------------------------------------------------------------------------
The pro of this design is that it avoids unnecessary rebalances on coordinator failover; the con is mainly that the rebalance latency is lower-bounded by the ping frequency.
Another approach would be to trigger the rebalance by closing the sockets:
Rebalance Process: Option 2
On group rebalance, the coordinator will close its sockets to all consumers (except possibly the one that sent it a join request).
- Upon getting a broken pipe from the current coordinator, a consumer should stop fetching from its current partition leaders and call onPartitionDeassigned().
- It then finds the new coordinator through a metadata request and sends a join request to the new coordinator.
- Upon receiving join requests from all the consumers within the group, the coordinator calculates the partition assignment along with the new generation number and sends it back in the join responses.
- Upon receiving a join request from an unknown group, it immediately calculates the partition assignment (without waiting for any newcomers).
The consumer's logic is similar to Option 1 except that consumers do not need to handle the heartbeat-failure case, since it is merged into the coordinator-failure case: the key here is to present rebalances to consumers as coordinator failures.
This removes the lower bound that the ping frequency puts on the rebalance latency, but requires one more metadata request and causes unnecessary rebalances on coordinator failover (since the two are now treated as one scenario).
Offset Management
Commit Offset
Upon receiving a commit offset request, the coordinator checks whether the consumer 1) is within the specified group, 2) owns the partitions it commits offsets for, and 3) has the correct generation number. If so, it appends the offset entries to the corresponding log; otherwise it rejects the request.
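A rough sketch of these checks on the coordinator side, reusing the illustrative GroupMetadata shape sketched earlier; the error-code constants and the method name are hypothetical:

  // Validate an OffsetCommitRequest before appending the offset entries to the log.
  static short validateOffsetCommit(GroupMetadata group, String consumerId, int generation,
                                    List<TopicAndPartition> partitions) {
      if (!group.consumers.containsKey(consumerId))
          return UNKNOWN_CONSUMER;                       // 1) not a member of the specified group
      if (generation != group.generation)
          return ILLEGAL_GENERATION;                     // 3) stale or wrong generation number
      for (TopicAndPartition tp : partitions) {
          if (!group.ownedPartitions.get(consumerId).contains(tp))
              return NOT_PARTITION_OWNER;                // 2) committing for a partition it does not own
      }
      return NO_ERROR;                                   // accept: append offsets to the corresponding log
  }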
OffsetCommitRequest
{
  Version       => int16
  CorrelationId => int64
  GroupId       => String
  ConsumerId    => String
  Generation    => int16
  Offsets       => [TopicAndPartition -> OffsetAndError]
}
OffsetCommitResponse
{
  Version       => int16
  CorrelationId => int64
  responses     => [TopicAndPartition -> int16]   // error code for each partition
}
Fetch Offset
Fetch offset request is also handled by the coordinator.
Upon receiving a fetch offset request, the coordinator checks whether the consumer is within the specified group. If so, it returns the current offsets; otherwise it rejects the request.
Upon receiving a reject response from the coordinator, the consumer will try to connect to the new coordinator (i.e., set up the connection, send a heartbeat and get a valid response) and then retry.
OffsetFetchRequest
{
  Version       => int16
  CorrelationId => int64
  GroupId       => String
  ConsumerId    => String
  Generation    => int16
  partitions    => [TopicAndPartition]
}
OffsetFetchResponse
{
  Version       => int16
  CorrelationId => int64
  offsets       => [TopicAndPartition -> OffsetAndError]
}
Consumer Architecture
API Implementation
subscribe (subscription):
  1. Check if the new subscription is valid with the old subscription:
     1.1. If yes, change the subscription and return.
     1.2. Otherwise throw SubscriptionNotValidException.

poll (timeout):
  1. Check if the cluster metadata needs to be refreshed; if yes, send a metadata request.
  2. Check if the subscription list is empty; if yes, throw SubscriptionIsEmptyException.
  3. Check if the subscription list is topic-based:
     3.1. If not, call fetch().   // no need to talk to the coordinator
     3.2. If yes, first call coordinate(), then call fetch().

commit (sync):
  1. Call coordinate().
  2. Send the commit offset request to the coordinator; based on the sync flag, block waiting on the response or not.

getPartitions:
  1. Check if the subscription list is empty; if yes, throw SubscriptionIsEmptyException.
  2. If the partition info is not known, call coordinate().
  3. Return the partition info.

getOffset(s):
  1. Check if the subscription list is empty; if yes, throw SubscriptionIsEmptyException.
  2. If the partition info is not known, call coordinate().
  3. If the offset info is not known, based on kafka.offset.manage either send getOffset to the coordinator or throw InvalidPartitionsToGetOffsetException.

fetch:
  1. Select-now on readable channels to get all available responses; for each response:
     1.1. If it is a fetch response, store the fetched data.
     1.2. If it is a metadata response, process it and check if there is a topic change:
        1.2.1. If yes, update the subscription list and return immediately with the stored data.
  2. If there is no available data after step 1, continue to step 3.
  3. If there are no readable channels, check if the offset is known:
     3.1. If not, throw ConsumeInitOffsetNotKnownException.
     3.2. Otherwise send the fetch request to the leader and timeout-select.

coordinate:
  1. Check if the coordinator channel has been set up and not closed, and if the subscription list has not changed. If not:
     1.1. Block-wait on getting the coordinator metadata.
     1.2. Call onPartitionDeassigned().
     1.3. Send the registerConsumer request to the coordinator, set up the channel and block-wait on the registry response.
        1.3.a.1. The registerConsumer response contains member info and partition info; call partitionsToConsume to get the owned partitions.
        1.3.a.2. Send the partitions-to-consume info and get confirmation from the coordinator.   // two round trips
        1.3.b.   The registerConsumer response contains the partition assignment from the coordinator.   // one round trip
     1.4. Call onPartitionAssigned().
  2. If yes, check if the heartbeat period has been reached; if yes, send a ping to the coordinator.
Open Questions
Apart from the above proposal, we still have some open questions worth discussing:
- Do the coordinate/deserialization/decompression procedures need to be interruptible on timeout?
- Should getOffsets and getPartitions be blocking?
- Should the coordinator use its own socket server or the broker's socket server? Although the proposal suggests the former, we can think about this more.
- Should we allow consumers from the same group to have different session timeouts? If yes, when do we reject a consumer registration because of its proposed session timeout?
- Should we allow consumers from the same group to have specific fixed subscribed topic partitions (G5)? If yes, when do we reject a consumer registration with its subscribed topic partitions?
- Should we provide tools to delete topics? If yes, how will this affect coordinator's topic change logic?
- Should we do deserialization/decompression in the fetcher thread or the user thread? Although the proposal suggests deserialization in the user thread and decompression in the fetcher thread (or deserialization/decompression all in the client thread if we follow Option 1.a), we can still think about this more.
- Do we allow subscribe to be called multiple times during the consumer's lifetime? This is related to G5.a.
- Do we allow users to specify the offsets in the commit() function call, and explicitly get the last committed offsets? This is related to G6.a.
- Would we still keep a zookeeper string in consumer properties for the consumer to read the bootstrap broker list, as an alternative to the broker list property?
- Shall we try to avoid unnecessary rebalances on coordinator failover? In the current proposal, upon coordinator failover the consumers will re-issue registration requests to the new coordinator, causing the new coordinator to trigger an unnecessary rebalance.
- Shall we restrict the commit offset request to only the client's own assigned partitions? For some use cases, such as clients managing their offsets themselves and only wanting to call commit offset once on startup, this might be useful.
- Shall we avoid request forwarding for option 2.b? One way to do that is to let the consumer remember two coordinators, and upon receiving a stop-fetching request from the group management coordinator, send a commit offset request to the offset management coordinator, wait on its response, and then respond to the stop-fetching request.