The main goal of this project is to have a thin consumer client design that can be easily adopted/reimplemented in non-Java languages. To achieve this we have decided to:
Besides this main goal, we also want to add support for some new features whose need we have seen while operating 0.8 and older versions. A non-exhaustive list follows:
Here is the proposed consumer API interface:
Consumer (Properties) {

  // Subscribe to a list of topics, return immediately.
  // Throws an exception if the new subscription is not valid with existing subscriptions.
  void subscribe(String...) throws SubscriptionNotValidException;

  // Subscribe to a specified topic partition, return immediately.
  // Throws an exception if the new subscription is not valid with existing subscriptions.
  void subscribe(String, Int) throws SubscriptionNotValidException;

  // Subscribe to a specified topic partition at the given offset, return immediately.
  // Throws an exception if the new subscription is not valid with existing subscriptions.
  void subscribe(String, Int, Long) throws SubscriptionNotValidException;

  // Try to fetch messages from the topic/partitions it subscribed to.
  // Return whatever messages are available to be fetched or empty after specified timeout.
  List<Record> poll(Long) throws SubscriptionIsEmptyException, ConsumeInitOffsetNotKnownException;

  // Commit latest offsets for all partitions currently consuming from.
  // If the sync flag is set to true, commit call will be sync and blocking.
  // Otherwise the commit call will be async and best-effort.
  void commit(Boolean) throws SubscriptionIsEmptyException;

  // Commit the specified offset for the specified topic/partition.
  // If the sync flag is set to true, commit call will be sync and blocking.
  // Otherwise the commit call will be async and best-effort.
  // Throws an exception if the partitions specified are not currently consuming from.
  void commit(List[(String, Int, Long)], Boolean) throws InvalidPartitionsToCommitOffsetException;

  // Specify the fetch starting offset for the specified topic/partition.
  void pos(List[(String, Int, Long)]);

  // Get the currently consuming partitions.
  // Block wait if the current partitions are not known yet.
  List[(String, Int)] getPartitions() throws SubscriptionIsEmptyException;

  // Get the last committed offsets of the partitions currently consuming.
  // Block wait if the current partitions are not known yet.
  Map[(String, Int), Long] getOffsets() throws SubscriptionIsEmptyException;

  // Get the last committed offsets of the specified topic/partition.
  Long getOffset(String, Int) throws SubscriptionIsEmptyException, InvalidPartitionsToGetOffsetException;

  // --------- Call-back Below, not part of API ------------ //

  // Call-back function upon partition de-assignment.
  // Default implementation will commit offset depending on auto.commit config.
  void onPartitionDeassigned(List[(String, Int)]);

  // Call-back function upon partition re-assignment.
  // Default implementation is No-Op.
  void onPartitionAssigned(List[(String, Int)]);

  // --------- Optional Call-back Below ------------ //

  // Call-back function upon partition reassignment given the group member list and the subscribed topic partition info.
  // Return the partitions this consumer would like to consume, making sure each partition is covered by exactly one member in the group.
  List[(String, Int)] partitionsToConsume(List[String], List[TopicPartition]);
}
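As an illustration of how the API above might be used, here is a minimal sketch of a subscribe/poll/commit loop. The `SketchConsumer` interface and `PollLoop` class are hypothetical: the interface mirrors only three of the proposed methods, uses plain strings in place of `Record`, and omits the checked exceptions listed above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified subset of the proposed consumer API. */
interface SketchConsumer {
    void subscribe(String... topics);
    List<String> poll(long timeoutMs); // strings stand in for Record
    void commit(boolean sync);
}

class PollLoop {
    /**
     * Subscribes to one topic, polls maxPolls times, and commits
     * synchronously after every non-empty batch. Returns all messages seen.
     */
    static List<String> run(SketchConsumer consumer, String topic, int maxPolls) {
        consumer.subscribe(topic);
        List<String> seen = new ArrayList<>();
        for (int i = 0; i < maxPolls; i++) {
            List<String> batch = consumer.poll(100L);
            if (batch.isEmpty()) {
                continue; // nothing arrived before the timeout
            }
            seen.addAll(batch);   // "process" the batch
            consumer.commit(true); // sync flag set: block until committed
        }
        return seen;
    }
}
```

Committing only after a batch has been processed is one possible policy; an application could equally pass `false` to get the async, best-effort commit described above.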
The Records returned by the poll() function will include metadata such as offset, partition, key and partition-key. This would also help remove the decompression/recompression in mirror maker (details can be found in KAFKA-1011).
Each consumer group will be assigned a consumer coordinator responsible for its membership management and offset management. Each broker will host a consumer coordinator so that the management of consumer groups will be distributed across all brokers.
For each consumer group, the coordinator stores the following information:
The coordinator also has the following functionalities:
The coordinator also maintains the following information in ZK:
The coordinator also holds the following modules:
Both consumer failure detection (by the coordinator) and coordinator failure detection (by the consumer) are done through a heartbeat protocol.
----------------------------------------- 1/2 ------------------------------------------
Coordinator <-- (ping) -- Consumer 1 (alive)
-- (response) -->
<-- (ping) -- Consumer 2 (alive)
-- (response) -->
----------------------------------------- 3 ------------------------------------------
Coordinator <-- (ping) -- Consumer 1 (alive)
-- (response) -->
(Have not heard from consumer 2, it must have failed) Consumer 2 (failed)
----------------------------------------- 4 ------------------------------------------
Coordinator (failed) <-- (ping) -- Consumer 1 (alive)
(Have not heard back from coordinator, it must have failed)
<-- (ping) -- Consumer 2 (alive)
(Have not heard back from coordinator, it must have failed)
----------------------------------------- 5 ------------------------------------------
Coordinator (crashed) <-- X -- Consumer 1 (alive)
(Broken pipe from coordinator, it must have failed)
<-- X -- Consumer 2 (alive)
(Broken pipe from coordinator, it must have failed)
------------------------------------------------------------------------------------------------
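The coordinator side of the heartbeat protocol above could be sketched as follows. `HeartbeatTracker` and its method names are hypothetical, and a single session timeout shared by all consumers is a simplifying assumption (the join request carries a per-consumer SessionTimeout).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical coordinator-side bookkeeping for consumer pings. */
class HeartbeatTracker {
    private final long sessionTimeoutMs;
    // consumerId -> time (ms) at which the last ping was received
    private final Map<String, Long> lastPingMs = new HashMap<>();

    HeartbeatTracker(long sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    /** Records a ping from the given consumer. */
    void onPing(String consumerId, long nowMs) {
        lastPingMs.put(consumerId, nowMs);
    }

    /** Removes and returns the consumers not heard from within the session timeout. */
    List<String> expire(long nowMs) {
        List<String> failed = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastPingMs.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMs - entry.getValue() > sessionTimeoutMs) {
                failed.add(entry.getKey());
                it.remove();
            }
        }
        return failed;
    }
}
```

A coordinator would call `expire` periodically and treat each returned consumer as failed.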
{
  Version       => int16
  CorrelationId => int64
  GroupId       => String
  ConsumerId    => String
}

{
  Version       => int16
  CorrelationId => int64
  ErrorCode     => int16
}
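To make the field layout concrete, here is a minimal sketch that serializes the request fields listed above (Version, CorrelationId, GroupId, ConsumerId) into a byte buffer and reads them back. `PingRequestCodec` is a hypothetical name, and the string encoding (an int16 byte length followed by UTF-8 bytes, big-endian integers) is an assumption; this page does not specify it.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical codec for the request fields listed above. */
class PingRequestCodec {
    // Assumption: a String is an int16 byte length followed by UTF-8 bytes.
    private static void putString(ByteBuffer buf, String s) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        buf.putShort((short) bytes.length);
        buf.put(bytes);
    }

    private static String getString(ByteBuffer buf) {
        byte[] bytes = new byte[buf.getShort()];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    /** Writes the fields in declaration order and returns a buffer ready for reading. */
    static ByteBuffer encode(short version, long correlationId, String groupId, String consumerId) {
        int size = 2 + 8
            + 2 + groupId.getBytes(StandardCharsets.UTF_8).length
            + 2 + consumerId.getBytes(StandardCharsets.UTF_8).length;
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putShort(version);
        buf.putLong(correlationId);
        putString(buf, groupId);
        putString(buf, consumerId);
        buf.flip();
        return buf;
    }

    /** Reads the fields back in the same order: {version, correlationId, groupId, consumerId}. */
    static Object[] decode(ByteBuffer buf) {
        short version = buf.getShort();
        long correlationId = buf.getLong();
        String groupId = getString(buf);
        String consumerId = getString(buf);
        return new Object[] {version, correlationId, groupId, consumerId};
    }
}
```

A fixed field order with length-prefixed strings keeps the format simple to reimplement in non-Java clients, which is in line with the main goal of this proposal.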
When the coordinator thinks a consumer has failed, it will do the following:
When the consumer thinks its current coordinator has failed, it will do the following:
----------------------------------------- 1/2/3 ------------------------------------------
Coordinator (failed) <-- (ping) -- Consumer 1 (alive)
(Have not heard back from coordinator, try to reconnect to the new coordinator)
<-- (ping) -- Consumer 2 (alive)
(Have not heard back from coordinator, try to reconnect to the new coordinator)
Broker (alive) <-- (metadata) -- Consumer 1
-- (response) -->
<-- (metadata) -- Consumer 2
-- (response) -->
Coordinator (new) <-- (ping) -- Consumer 1 (alive)
-- (response) -->
<-- (ping) -- Consumer 2 (alive)
-- (response) -->
-------------------------------------------------------------------------------------------
On startup, the consumer will first try to subscribe to a list of topics; similarly, when the consumer changes its subscription list, the change can be treated as a consumer restart.
Therefore, these two processes can be merged into one behavior that is triggered by a subscription change (possibly from an empty subscription).
If the consumer has not connected to the coordinator, it will first try to find the current coordinator through a metadata refresh and set up the connection.
Coordinator discovery is done through the metadata request, whose response currently contains 1) the cluster information and 2) the leader information for the specified topics. We will add a third piece of metadata to the response: the coordinator id for the consumer group.
----------------------------------------- 1 ------------------------------------------
Any Broker <-- (metadata) -- Consumer 1 (alive)
-- (response: {cluster, leaders, coordinator}) -->
---------------------------------------------------------------------------------------
{
  Version       => int16
  CorrelationId => int64
  ClientId      => String
  Topics        => [String]
  GroupIds      => [String]
}
This MetadataRequest inherits from the TopicMetadataRequest in 0.8.
{
  Version        => int16
  CorrelationId  => int64
  Brokers        => [Broker]
  TopicsMetadata => [TopicMetadata]
  CoordinatorIds => [int16]
}
If the consumer 1) does not know which partitions to consume (since it has just started, for example), or 2) has detected a topic change either from the metadata response or from a subscription change, it will send a join request to the coordinator.
Upon receiving a join request, the coordinator will trigger a rebalance.
----------------------------------------- 2 ------------------------------------------
Coordinator <-- (join) -- Consumer 1 (alive)
(received a join request on consumer 1, rebalance)
---------------------------------------------------------------------------------------
{
  Version        => int16
  CorrelationId  => int64
  GroupId        => String
  ConsumerAddr   => String
  SessionTimeout => int64
  Subscribing    => [String]
}

{
  Version         => int16
  CorrelationId   => int64
  ErrorCode       => int16
  Generation      => int16
  GroupId         => String
  ConsumerId      => String
  PartitionsToOwn => [<TopicAndPartition>]
}
Note that the consumer id is assigned by the coordinator when the consumer joins the group, and is then used in the heartbeat protocol and in offset management.
The coordinator will trigger rebalance under the following conditions:
On group rebalance, the coordinator will first send an error code indicating a rebalance in its responses to all the consumers.
Take the rebalance procedure upon new consumer startup for example:
---------------------------- 1 -------------------------------------------------------------------------------------
<-- (join) -- Consumer 3 (new)
Coordinator <-- (ping) -- Consumer 1 (alive)
-- (rebalance) -->
<-- (ping) -- Consumer 2 (alive)
-- (rebalance) -->
--------------------------- 2/3 -------------------------------------------------------------------------------------
(wait for response) Consumer 3 (new)
Coordinator Consumer 1 (alive)
(stop fetching)
(call onPartitionDeassigned)
<-- (join) --
Consumer 2 (alive)
(stop fetching)
(call onPartitionDeassigned)
<-- (join) --
--------------------------- 3/5 -------------------------------------------------------------------------------------
Coordinator (calculate new assignment)
-- (response) --> Consumer 3 (new)
(call onPartitionAssigned)
(start fetching)
-- (response) --> Consumer 1 (alive)
(call onPartitionAssigned)
(start fetching)
-- (response) --> Consumer 2 (alive)
(call onPartitionAssigned)
(start fetching)
--------------------------- 4/6 -------------------------------------------------------------------------------------
Coordinator (have not heard from Consumer 2, retry rebalance)
-- (retry rebalance) --> Consumer 3 (new)
<-- (join) --
-- (retry rebalance) --> Consumer 1 (alive)
<-- (join) --
<-- X --> Consumer 2 (failed)
-----------------------------------------------------------------------------------------------------------------
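The "calculate new assignment" step in the diagram above is not specified on this page. As one possible illustration, the sketch below assigns partitions round-robin over the sorted member list, so that each partition is covered by exactly one member. `RoundRobinAssignor` is hypothetical, partitions are represented as plain strings, and the extra `self` parameter (the id of the member computing its own share) is an assumption that the proposed partitionsToConsume callback does not have.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical round-robin assignment; not necessarily the coordinator's actual policy. */
class RoundRobinAssignor {
    /**
     * Returns the partitions owned by self. Members and partitions are sorted
     * first so that every member computes the same non-overlapping assignment.
     */
    static List<String> partitionsToConsume(String self, List<String> members, List<String> partitions) {
        List<String> sortedMembers = new ArrayList<>(members);
        List<String> sortedPartitions = new ArrayList<>(partitions);
        Collections.sort(sortedMembers);
        Collections.sort(sortedPartitions);

        List<String> owned = new ArrayList<>();
        int selfIndex = sortedMembers.indexOf(self);
        if (selfIndex < 0) {
            return owned; // self is not a group member: owns nothing
        }
        for (int i = 0; i < sortedPartitions.size(); i++) {
            if (i % sortedMembers.size() == selfIndex) {
                owned.add(sortedPartitions.get(i));
            }
        }
        return owned;
    }
}
```

Because the result depends only on the sorted inputs, the same function could run on the coordinator or on each consumer and produce the same assignment.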
Consumer's logic is summarized as:
-----------------------------------------------------------------------------------------------------------------
(Re)Start:
On 1) start-up, 2) subscription change, 3) heartbeat failure, or 4) coordinator failure, send a join request to the coordinator (if the coordinator is not known, first find it through a metadata request).
Join group:
Upon receiving the join response: if it is normal, record the sequence number and assignment and start consuming; otherwise retry or fail fatally based on the error code.
Heart beat:
Periodically send a ping to the coordinator; if an error is received, go back to the "(Re)Start" step.
-----------------------------------------------------------------------------------------------------------------
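The consumer logic summarized above can be read as a small state machine. The sketch below is one hypothetical encoding of it; the state and event names are illustrative and do not appear in the proposal.

```java
/** Hypothetical state machine for the consumer logic summarized above. */
class ConsumerStateMachine {
    enum State { START, JOINING, CONSUMING, FAILED }

    enum Event {
        COORDINATOR_FOUND,     // coordinator known (possibly via metadata); join request sent
        JOIN_OK,               // normal join response
        JOIN_RETRIABLE_ERROR,  // join response with an error code that allows a retry
        JOIN_FATAL_ERROR,      // join response with a fatal error code
        PING_OK,
        PING_ERROR,            // heartbeat failure, e.g. a rebalance error code
        SUBSCRIPTION_CHANGED,
        COORDINATOR_FAILED
    }

    /** Returns the next state; events that do not apply leave the state unchanged. */
    static State next(State state, Event event) {
        switch (state) {
            case START:
                return event == Event.COORDINATOR_FOUND ? State.JOINING : State.START;
            case JOINING:
                switch (event) {
                    case JOIN_OK: return State.CONSUMING;
                    case JOIN_FATAL_ERROR: return State.FAILED;
                    case JOIN_RETRIABLE_ERROR:
                    case COORDINATOR_FAILED: return State.START;
                    default: return State.JOINING;
                }
            case CONSUMING:
                switch (event) {
                    case PING_ERROR:
                    case SUBSCRIPTION_CHANGED:
                    case COORDINATOR_FAILED: return State.START;
                    default: return State.CONSUMING;
                }
            default:
                return State.FAILED; // FAILED is terminal
        }
    }
}
```

Note that three of the four restart triggers (subscription change, heartbeat failure, coordinator failure) lead from CONSUMING back to START, which is why they can share a single code path in the client.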
The pro of this design is that it avoids unnecessary rebalances on coordinator failover; the con is mainly that rebalance latency is lower-bounded by the ping interval.
Another approach would be to trigger the rebalance by closing the socket:
On group rebalance, the coordinator will close the sockets to all consumers (probably except the one that sent it the join request).
The consumer's logic is similar to option 2, except that the consumer does not need to handle the heartbeat-failure case, since it is merged into the coordinator-failure case: the key here is to present a rebalance to the consumers as a coordinator failure.
This removes the lower bound that the ping interval places on rebalance latency, but it requires one more metadata request and causes unnecessary rebalances on coordinator failover (because the two are now treated as one scenario).
Upon receiving the commit offset request, the coordinator checks that the consumer 1) is within the group specified, 2) owns the partitions it commits offsets to, and 3) has the correct generation number. If so, it appends the offset entry to the corresponding log; otherwise it rejects the request.
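The three checks on a commit offset request could be sketched as follows. `GroupState` and its fields are hypothetical, partitions are represented as plain strings, and appending to the offset log is left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical per-group state held by the coordinator. */
class GroupState {
    private final int generation;
    // consumerId -> partitions currently owned by that consumer
    private final Map<String, Set<String>> ownership = new HashMap<>();

    GroupState(int generation) {
        this.generation = generation;
    }

    void assign(String consumerId, Set<String> partitions) {
        ownership.put(consumerId, partitions);
    }

    /** Returns true if the commit should be accepted, false if it should be rejected. */
    boolean acceptCommit(String consumerId, int requestGeneration, Set<String> partitions) {
        Set<String> owned = ownership.get(consumerId);
        if (owned == null) {
            return false; // 1) the consumer is not within this group
        }
        if (!owned.containsAll(partitions)) {
            return false; // 2) the consumer does not own all the partitions it commits to
        }
        return requestGeneration == generation; // 3) the generation number must match
    }
}
```

The generation check is what rejects commits from a consumer that missed a rebalance and still believes it owns partitions from an earlier assignment.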
Upon receiving the fetch offset request, the coordinator checks that the consumer is within the group specified. If so, it returns the current offset; otherwise it rejects the request.
Upon receiving a reject response from the coordinator, the consumer will try to connect to the new coordinator (i.e., set up the connection, send a heartbeat and get a valid response) and then retry.
subscribe (subscription):
  1. Check if the new subscription is valid with the old subscription:
    1.1. If yes change the subscription and return.
    1.2. Otherwise throw SubscriptionNotValidException.

poll (timeout):
  1. Check if the cluster metadata needs to be refreshed, if yes send metadata request.
  2. Check if the subscription list is empty, if yes throw SubscriptionIsEmptyException.
  3. Check if the subscription list is topic-based:
    3.1. If not, call fetch(). // no need to talk to coordinator
    3.2. If yes, first call coordinate() then call fetch()
commit(sync):
  1. Call coordinate().
  2. Send the commit offset request to the coordinator; if sync is true, block waiting on the response, otherwise return without waiting.
getPartition:
  1. Check if the subscription list is empty, if yes throw SubscriptionIsEmptyException
  2. If the partition info is not known, call coordinate()
  3. Return partitions info.

getOffset(s):
  1. Check if the subscription list is empty, if yes throw SubscriptionIsEmptyException
  2. If the partition info is not known, call coordinate()
  3. If the offset info is not known, based on kafka.offset.manage send getOffset to coordinator or throw InvalidPartitionsToGetOffsetException

fetch:
  1. Select-now on readable channels to get all available responses, for each response:
    1.1. If it is fetch-response, store the fetched-data.
    1.2. If it is metadata-response, process it and check if there is topic-change:
      1.2.1. If yes, update the subscription list, and return immediately with stored data.
  2. If there is no available data after step 1.
  3. If there is no readable channels, check if the offset is known.
    3.1. If not throw ConsumeInitOffsetNotKnownException.
    3.2. Otherwise send the fetch request to leader and timeout-select.
coordinate:
  1. Check if the coordinator channel has been set up and not closed, and if the subscription list has not changed. If not:
    1.1. Block wait on getting the coordinator metadata.
    1.2. Call onPartitionDeassigned().
    1.3. Send the registerConsumer to the coordinator, set up the channel and block wait on the registry response.
      1.3.a.1 The registerConsumer response contains member info and partition info; call partitionsToConsume to get the owned partitions.
      1.3.a.2 Send the partitions-to-consume info and get it confirmed by the coordinator. // two round trips
      1.3.b. The registerConsumer response contains the partition assignment from the coordinator. // one round trip
    1.4. Call onPartitionAssigned().
  2. If yes, check if the heartbeat period has been reached; if yes, send a ping to the coordinator.
Apart from the above proposal, we still have some open questions worth discussing: