

Goals

The main goal of this project is to have a thin consumer client design that can be easily adopted/reimplemented in non-Java languages. To achieve this we have decided to:

...

The only difference is that for 1) the coordinator will immediately trigger a rebalance for the newly created group, whereas for 2) the coordinator needs to load the group's current config and ownership information from ZK but does not necessarily trigger a rebalance.

Consumer Fetching Data

As in the new producer, consumers will periodically send topic metadata requests to the available brokers to find the leaders of their assigned partitions. A consumer will also force a topic metadata refresh upon 1) receiving an error while fetching data or 2) receiving a join response from the coordinator.

...

With wildcard subscription (for example, whitelist and blacklist), the consumers are responsible for discovering matching topics through topic metadata requests. That is, a consumer's topic metadata request will contain an empty topic list, and the response will return the partition info of all topics. The consumer then filters the topics that match its wildcard expression and updates its subscription list. Again, if the subscription list has changed from its previous value, the consumer will start the re-join group procedure.
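
The wildcard filtering step described above can be sketched as follows. This is a minimal illustration, not the proposed client code: the function names `match_topics` and `refresh_subscription`, and the use of regular expressions for the whitelist and blacklist, are assumptions made for the example.

```python
import re


def match_topics(all_topics, whitelist=None, blacklist=None):
    """Return the sorted topics accepted by the whitelist and not the blacklist.

    Both patterns are regular expressions (an assumption of this sketch).
    """
    matched = []
    for topic in all_topics:
        if whitelist is not None and not re.fullmatch(whitelist, topic):
            continue
        if blacklist is not None and re.fullmatch(blacklist, topic):
            continue
        matched.append(topic)
    return sorted(matched)


def refresh_subscription(current, all_topics, whitelist=None, blacklist=None):
    """Return (new_subscription, must_rejoin) after a metadata response.

    must_rejoin is True when the subscription list differs from the
    previous one, in which case the consumer would re-join the group.
    """
    new = match_topics(all_topics, whitelist, blacklist)
    return new, new != sorted(current)
```

For example, a consumer subscribed to `logs.a` with whitelist `logs\..*` would pick up a newly created `logs.b` on its next metadata refresh and then start the re-join procedure.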


Partition Change Handling

In contrast to subscription topic changes, which are handled by the consumers via re-joining the group, partition change events are handled by the coordinator.

The coordinator keeps a listener for each topic partition info path in ZK. Upon partition addition (and probably, in the future, shrinking), the listener will fire and the coordinator will then check which groups are currently subscribed to this topic. For each of these groups, a rebalance will be triggered.
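
A rough sketch of the coordinator-side lookup follows. The `GroupRegistry` class and its method names are hypothetical and stand in for whatever bookkeeping the coordinator keeps; the ZK listener itself is not modeled, only the callback it would invoke.

```python
from collections import defaultdict


class GroupRegistry:
    """Tracks which groups subscribe to which topics (illustrative only)."""

    def __init__(self):
        self._groups_by_topic = defaultdict(set)

    def subscribe(self, group_id, topic):
        self._groups_by_topic[topic].add(group_id)

    def on_partition_change(self, topic):
        """Called when the ZK listener for `topic` fires.

        Returns the groups for which a rebalance should be triggered.
        """
        return sorted(self._groups_by_topic.get(topic, ()))
```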

Failure Detection

Once the consumer has successfully joined the group, it keeps sending heartbeat requests to the coordinator. This protocol is used both for consumer failure detection (by the coordinator) and for coordinator failure detection (by the consumer).

Heartbeat Protocol

1) Consumers periodically send a heartbeat request to the coordinator, once every session_timeout / heartbeat_frequency (where heartbeat_frequency is an integer > 1).

...
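
As a small worked example of the heartbeat interval in step 1, assuming the session timeout is expressed in milliseconds (the unit and the function name are assumptions of this sketch):

```python
def heartbeat_interval_ms(session_timeout_ms, heartbeat_frequency):
    """Interval between heartbeats: session_timeout / heartbeat_frequency."""
    if heartbeat_frequency <= 1:
        # The design requires heartbeat_frequency to be an integer > 1, so
        # that several heartbeats fit within a single session timeout.
        raise ValueError("heartbeat_frequency must be an integer > 1")
    return session_timeout_ms / heartbeat_frequency
```

With a 30-second session timeout and a heartbeat frequency of 3, the consumer would send a heartbeat every 10 seconds.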

 

Consumer Group Rebalance

The coordinator will trigger rebalance under the following conditions:

  1. It finds that a consumer has failed.
  2. It receives a join request from a (possibly unknown) consumer.
  3. It finds a partition change on the topics this group is subscribed to.

Rebalance Process: Option 1

On group rebalance, the coordinator will first send an error code indicating a rebalance in its responses to all the consumers.

  1. Upon receiving the error code, the consumer should stop fetching from its current partition leaders, call onPartitionDeassigned(), and then send a join request to the coordinator.
  2. Upon receiving a join request, the coordinator checks whether the specified group already exists.
    1. If yes, it holds the request and waits for the join requests from all the consumers within the group.
    2. If not, it adds the group and proceeds directly to the next step without waiting for any more join requests.
  3. The coordinator calculates the partition assignment along with the new generation number and sends them back in the join response.
  4. If some consumer's join request has not been received after the session timeout, the coordinator removes this consumer and re-triggers the rebalance by sending the join responses to the remaining consumers.
  5. Upon receiving a normal join response, consumers call onPartitionAssigned() and start fetching.
  6. Upon receiving an error join response, the consumer checks whether the error is fatal; if so it throws an exception, otherwise it re-sends the join request.

 


As we have seen, a consumer group rebalance can be initiated by the coordinator when

1) Partition information of the currently subscribed topics has changed.

2) Membership info has changed due to a detected consumer failure.

3) Membership info has changed because some consumer (re-)joins the group (which could be due to a topic subscription change).

 

The coordinator triggers the rebalance by telling all consumers remaining in the group to stop fetching, release their previously assigned partition ownership, and send it a join request indicating that they have finished these preparation steps. Hence there is a synchronization barrier, which is reached once all the consumers' join requests have been received by the coordinator. The only difference is that for 3), some consumer(s) have already stopped fetching, released their previously assigned partition ownership, and sent a join request to the coordinator, so the coordinator does not need to tell them to do so again.

Rebalance Process: Option 1

One way for the coordinator to tell the consumers to prepare for a rebalance is to piggyback an error code on the ping response; however, by doing so the rebalance latency is lower-bounded by the heartbeat frequency of the consumers. Therefore we force consumers to do so as soon as possible: the coordinator cuts the sockets of the consumers that have not finished preparing for the rebalance, so that they go through the coordinator failure handling process (discovering the new coordinator) and send a ping request immediately, to which the coordinator responds with the error code:

1) Upon receiving the error code, the consumer should stop fetching from its current partition leaders, call onPartitionDeassigned(), and then send a join request to the coordinator.

2) The coordinator waits for the join requests from all the consumers within the group.

3) Upon receiving all the join requests, the coordinator calculates the partition assignment, increments the generation number and sends both back in the join response to all consumers.

4) If some consumer's join request has not been received after the session timeout, the coordinator removes this consumer and re-triggers the rebalance by sending join responses with a retry error code to the remaining consumers.

5) Upon receiving a normal join response, consumers call onPartitionAssigned() and start fetching.

6) Upon receiving an error join response, the consumer checks whether the error is fatal; if so it executes the coordinator failure process, otherwise it re-sends the join request.
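
The synchronization barrier in steps 2) to 4) could be sketched as below. This is only an illustration under stated assumptions: the `JoinBarrier` class is hypothetical, and the round-robin assignment is a placeholder, since the proposal does not specify the assignment algorithm here.

```python
class JoinBarrier:
    """Coordinator-side join barrier for one group (illustrative sketch)."""

    def __init__(self, members, generation):
        self.expected = set(members)  # consumers that must (re-)join
        self.joined = set()           # consumers whose join request arrived
        self.generation = generation

    def on_join(self, consumer_id):
        """Record a join request; return True once everyone has joined."""
        self.expected.add(consumer_id)  # a new consumer becomes a member
        self.joined.add(consumer_id)
        return self.joined == self.expected

    def on_session_timeout(self):
        """Remove members that never joined and restart the barrier.

        The remaining consumers get a retry error code and must re-send
        their join requests, so the joined set is cleared.
        """
        missing = self.expected - self.joined
        self.expected -= missing
        self.joined.clear()
        return sorted(missing)

    def complete(self, partitions):
        """Increment the generation and assign partitions round-robin."""
        members = sorted(self.expected)
        assignment = {m: [] for m in members}
        if members:
            for i, partition in enumerate(sorted(partitions)):
                assignment[members[i % len(members)]].append(partition)
        self.generation += 1
        return self.generation, assignment
```

The barrier makes the trade-off explicit: no consumer receives a new assignment until every surviving member has released its old partitions, at the cost of waiting up to one session timeout for a failed member.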

 

Take the rebalance procedure upon new consumer startup for example:

-----------------------------------------------------------------------------------------------------------------

                               <-- (join) --            Consumer 3 (new)

Coordinator                    <-- (ping) --            Consumer 1 (alive)
                               -- (rebalance) -->

                               <-- (ping) --            Consumer 2 (alive)
                               -- (rebalance) -->

-----------------------------------------------------------------------------------------------------------------

                               (wait for response)      Consumer 3 (new)

Coordinator                                             Consumer 1 (alive)
                                                            (stop fetching)
                                                            (call onPartitionDeassigned)
                               <-- (join) --

                                                        Consumer 2 (alive)
                                                            (stop fetching)
                                                            (call onPartitionDeassigned)
                               <-- (join) --

-----------------------------------------------------------------------------------------------------------------

Coordinator   (calculate new assignment)

                               -- (response) -->        Consumer 3 (new)
                                                            (call onPartitionAssigned)
                                                            (start fetching)

                               -- (response) -->        Consumer 1 (alive)
                                                            (call onPartitionAssigned)
                                                            (start fetching)

                               -- (response) -->        Consumer 2 (alive)
                                                            (call onPartitionAssigned)
                                                            (start fetching)

-----------------------------------------------------------------------------------------------------------------

Coordinator   (have not heard from Consumer 2, retry rebalance)

                               -- (retry rebalance) --> Consumer 3 (new)
                               <-- (join) --

                               -- (retry rebalance) --> Consumer 1 (alive)
                               <-- (join) --

                               <-- X -->                Consumer 2 (failed)

-----------------------------------------------------------------------------------------------------------------

Consumer's logic is summarized as:


(Re)Start:

  On 1) start-up, 2) subscription change, 3) heartbeat failure, or 4) coordinator failure, send a join request to the coordinator (if the coordinator is not known, first find it through a metadata request).

Join group:

  Upon receiving the join response, if it is normal, record the sequence number and assignment and start consuming; otherwise retry or fail fatally based on the error code.

Heart beat:

  Periodically send a ping to the coordinator, and if an error is received go to the "start" step.

-------------------

 

Consumer's logic can be summarized as the following state-machine:

-------------------------------------------------------------------------------------------------------------------------------------

 

The advantage of this design is that it avoids unnecessary rebalances on coordinator failover; the main disadvantage is that the rebalance latency is lower-bounded by the ping frequency.

Another approach would be to trigger the rebalance by closing the socket:

Rebalance Process: Option 2

On group rebalance, the coordinator will close its sockets to all consumers (probably except the one that has sent it a join request).

  1. Upon getting a broken pipe from the current coordinator, the consumer should stop fetching from its current partition leaders and call onPartitionDeassigned().
  2. It then finds the new coordinator through a metadata request and sends a join request to the new coordinator.
  3. Upon receiving join requests from all the consumers within the group, the coordinator calculates the partition assignment along with the new generation number and sends them back in the join response.
  4. Upon receiving a join request from an unknown group, the coordinator immediately calculates the partition assignment (without waiting for any newcomers).

 

Consumer's logic is similar to option 1, except that the consumer does not need to handle the heartbeat-failure case, since it is merged into the coordinator-failure case: the key here is to present a rebalance to consumers as a coordinator failure.

This removes the dependence of the rebalance latency on the ping frequency, but requires one more metadata request and causes unnecessary rebalances on coordinator failover (because the two are now treated as one scenario).

 

Offset Management

Commit Offset

Upon receiving the commit offset request, the coordinator checks whether the consumer 1) is within the group specified, 2) owns the partitions it commits offsets to, and 3) has the correct generation number. If yes, it appends the offset entry to the corresponding log; otherwise it rejects the request.

OffsetCommitRequest

{
  Version                => int16
  CorrelationId          => int64
  GroupId                => String 
  ConsumerId             => String
  Generation             => int16
  Offsets                => [TopicAndPartition -> OffsetAndError]
}

OffsetCommitResponse

{
  Version                => int16
  CorrelationId          => int64
  responses              => [TopicAndPartition -> int16]  // error code for each partition
}

 

Fetch Offset

Fetch offset requests are also handled by the coordinator.

Upon receiving the fetch offset request, the coordinator checks whether the consumer is within the group specified. If yes, it returns the current offset; otherwise it rejects the request.

Upon receiving a reject response from the coordinator, the consumer will try to connect to the new coordinator (i.e., set up the connection, send a heartbeat and get a valid response) and then retry.

 

OffsetFetchRequest

{
  Version                => int16
  CorrelationId          => int64
  GroupId                => String 
  ConsumerId             => String
  Generation             => int16
  partitions             => [TopicAndPartition]
}

 

OffsetFetchResponse

{
  Version                => int16
  CorrelationId          => int64
  offsets                => [TopicAndPartition -> OffsetAndError] 
}

 

Consumer Architecture

API Implementation

 

subscribe (subscription):
 
   1. Check if the new subscription is valid with the old subscription:
 
   1.1. If yes change the subscription and return.
 
   1.2. Otherwise throw SubscriptionNotValidException.

poll (timeout):
 
   1. Check if the cluster metadata needs to be refreshed, if yes send metadata request.

   2. Check if the subscription list is empty, if yes throw SubscriptionIsEmptyException.

   3. Check if the subscription list is topic-based:
 
   3.1. If not, call fetch(). // no need to talk to coordinator

   3.2. If yes, first call coordinate() then call fetch().

commit(sync):
 
   1. Call coordinate().
 
   2. Send the commit offset request to the coordinator; if sync is set, block waiting for the response, otherwise return without waiting.

getPartition:

   1. Check if the subscription list is empty, if yes throw SubscriptionIsEmptyException

   2. If the partition info is not known, call coordinate()

   3. Return partitions info.


getOffset(s):

   1. Check if the subscription list is empty, if yes throw SubscriptionIsEmptyException

   2. If the partition info is not known, call coordinate()

   3. If the offset info is not known, then depending on kafka.offset.manage either send getOffset to the coordinator or throw InvalidPartitionsToGetOffsetException.

fetch:
 
   1. Select-now on readable channels to get all available responses, for each response:

   1.1. If it is fetch-response, store the fetched-data.

   1.2. If it is metadata-response, process it and check if there is topic-change:

   1.2.1. If yes, update the subscription list, and return immediately with stored data.

   2. If there is no available data after step 1.
 
   3. If there are no readable channels, check if the offset is known.

   3.1. If not throw ConsumeInitOffsetNotKnownException.

   3.2. Otherwise send the fetch request to leader and timeout-select.

 

...

Joined (join response received, started fetching and pinging):

  1) Topic Subscription Change  => Joining (prepare for rebalance, send join request)

  2) Rebalance Error Ping Response => Joining (prepare for rebalance, send join request)

  3) Coordinator Failed => Discovering (send find_coordinator)

  4) Shutdown Signal => Stopped

Discovering (lost connection with coordinator, but may be fetching data)

  1) Found Coordinator => Joined (directly start pinging the new coordinator)

  2) Shutdown Signal => Stopped

Joining (not fetching, waiting for join response)

  1) Fatal Error Join Response => Discovering

  2) Retry Error Join Response => Joining (re-send join)

  3) Normal Join Response => Joined

  4) Shutdown Signal => Stopped

Stopped (not started, killed, shutdown)

  1) Startup => Discovering

-----------------------------------------------------------------------------------------------------------------
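
The state machine above can be written down as a transition table. The sketch below is illustrative only; the event names are hypothetical spellings of the events listed above, and `next_state` is not part of any proposed API.

```python
# (state, event) -> next state, transcribed from the state machine above.
TRANSITIONS = {
    ("Joined", "topic_subscription_change"): "Joining",
    ("Joined", "rebalance_error_ping_response"): "Joining",
    ("Joined", "coordinator_failed"): "Discovering",
    ("Discovering", "found_coordinator"): "Joined",
    ("Joining", "fatal_error_join_response"): "Discovering",
    ("Joining", "retry_error_join_response"): "Joining",
    ("Joining", "normal_join_response"): "Joined",
    ("Stopped", "startup"): "Discovering",
}


def next_state(state, event):
    """Return the consumer's next state, or raise on an invalid transition."""
    # A shutdown signal stops the consumer from any running state.
    if event == "shutdown_signal" and state != "Stopped":
        return "Stopped"
    try:
        return TRANSITIONS[(state, event)]
    except KeyError:
        raise ValueError(f"no transition from {state!r} on {event!r}")
```

Note that in this option a consumer that finds the coordinator after a failover goes straight back to Joined, which is what avoids the unnecessary rebalance.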

 

 

Note that the above approaches try to avoid forcing unnecessary rebalances upon coordinator failover (otherwise we could simply let consumers always try to rejoin the group upon coordinator failure). Another way to achieve this optimization would be to let the consumers decide whether or not to re-join the group upon coordinator failover, based on the generation number.

Rebalance Process: Option 2

For this approach, if a coordinator wants to rebalance, it first needs to increment the generation number, update ZK accordingly, and then close the socket.

Also, the consumer group generation number metadata needs to be propagated to all brokers from ZK, either through the controller or through the coordinator. We do not consider having ZK push this information directly to brokers upon updates, or a pull-based method, since neither is scalable.

Then the rebalance process becomes:

1) Upon coordinator failure, the consumer fetches the new coordinator id along with the generation number.

2) If the generation number is not bigger than its currently held generation number, the consumer sends a ping request to the coordinator upon connection setup.

3) Otherwise, the consumer prepares for the rebalance by stopping fetching and calling onPartitionDeassigned, and then sends a join request to the coordinator upon connection setup.

4) The coordinator accepts an offset commit even if its generation number is smaller, provided the group is undergoing a rebalance (offset management is discussed in the next section).
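
The generation-number check in steps 2) to 4) could look roughly like this. Both function names and the returned action labels are hypothetical and exist only for this sketch.

```python
def actions_after_failover(held_generation, fetched_generation):
    """Decide what the consumer does once it has found the new coordinator."""
    if fetched_generation > held_generation:
        # The group has moved on: prepare for the rebalance and re-join.
        return ["stop_fetching", "on_partition_deassigned", "send_join"]
    # Generation unchanged: a plain failover, so just resume pinging.
    return ["send_ping"]


def accepts_commit(request_generation, group_generation, rebalancing):
    """Coordinator-side check for step 4).

    A commit carrying a smaller generation is still accepted while the
    group is undergoing a rebalance.
    """
    if request_generation == group_generation:
        return True
    return rebalancing and request_generation < group_generation
```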

 

Consumer's logic can then be summarized as the following state-machine:

-----------------------------------------------------------------------------------------------------------------

Joined (join response received, started fetching and pinging):

  1) Topic Subscription Change  => Joining (prepare for rebalance, send join request)

  2) Coordinator Failed => Discovering (send find_coordinator)

  3) Shutdown Signal => Stopped

Discovering (lost connection with coordinator, but may be fetching data)

  1) Found Coordinator, Generation Number Changed => Joining

  2) Found Coordinator, Generation Number OK => Joined

  3) Shutdown Signal => Stopped

Joining (not fetching, waiting for join response)

  1) Error Join Response => Discovering

  2) Normal Join Response => Joined

  3) Shutdown Signal => Stopped

Stopped (not started, killed, shutdown)

  1) Startup => Discovering

-----------------------------------------------------------------------------------------------------------------

 

Offset Management

Commit Offset

Upon receiving the commit offset request, the coordinator checks whether the consumer

1) Is within the group specified,

2) Owns the partitions it commits offsets to, and

3) Has the correct generation number.

If yes, it appends the offset entry to the corresponding log; otherwise it rejects the request.
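
The three checks above can be sketched as a single validation function. The `GroupState` type and the returned strings are assumptions for illustration; the proposal itself only defines an int16 error code per partition in the response.

```python
from dataclasses import dataclass, field


@dataclass
class GroupState:
    """Coordinator's view of one group (hypothetical structure)."""
    generation: int
    # consumer id -> set of (topic, partition) pairs it currently owns
    ownership: dict = field(default_factory=dict)


def validate_commit(groups, group_id, consumer_id, generation, partitions):
    """Apply the three commit checks in order and return a verdict string."""
    group = groups.get(group_id)
    if group is None or consumer_id not in group.ownership:
        return "rejected: consumer not in group"
    if not set(partitions) <= group.ownership[consumer_id]:
        return "rejected: partition not owned"
    if generation != group.generation:
        return "rejected: wrong generation"
    return "ok"
```

Only when the verdict is "ok" would the coordinator append the offset entry to the corresponding log.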

 

OffsetCommitRequest

{
  Version                => int16
  CorrelationId          => int64
  GroupId                => String 
  ConsumerId             => String
  Generation             => int16
  Offsets                => [TopicAndPartition -> OffsetAndError]
}


OffsetCommitResponse

{
  Version                => int16
  CorrelationId          => int64
  responses              => [TopicAndPartition -> int16]  // error code for each partition
}

 

Fetch Offset

Fetch offset requests are also handled by the coordinator.

Upon receiving the fetch offset request, the coordinator checks whether the consumer is within the group specified. If yes, it returns the current offset; otherwise it rejects the request.

Upon receiving a reject response from the coordinator, the consumer will try to connect to the new coordinator (i.e., transition from Joined/Joining to Discovering in the state machine).

 

OffsetFetchRequest

{
  Version                => int16
  CorrelationId          => int64
  GroupId                => String 
  ConsumerId             => String
  Generation             => int16
  partitions             => [TopicAndPartition]
}

 

OffsetFetchResponse

{
  Version                => int16
  CorrelationId          => int64
  offsets                => [TopicAndPartition -> OffsetAndError] 
}

 

 

Open Questions

Apart from the above proposal, we still have some open questions that are worth discussing:

  1. Do the coordinate/deserialization/decompress procedures need to be interruptible on timeout?
  2. Should getOffsets and getPartitions be blocking? Should the coordinator use its own socket server or the broker's socket server? Although the proposal suggests the former, we can think about this more.
  3. Should we allow consumers from the same group to have different session timeouts? If yes, when do we reject a consumer registration because of its proposed session timeout?
  4. Should we allow consumers from the same group to have specific fixed subscribed topic partitions (G5)? If yes, when do we reject a consumer registration because of its subscribed topic partitions?
  5. Should we provide tools to delete topics? If yes, how will this affect the coordinator's topic change logic?
  6. Should we do de-serialization/de-compression in the fetcher thread or the user thread? Although the proposal suggests de-serialization in the user thread and de-compression in the fetcher thread (or both in the client thread if we are following Option 1.a), we can still think about this more.
  7. Do we allow subscribe to be called multiple times during the consumer's lifetime? This is related to G5.a.
  8. Do we allow users to specify the offsets in the commit() function call, and to explicitly get the last committed offset? This is related to G6.a.
  9. Would we still keep a zookeeper string in the consumer properties for the consumer to read the bootstrap broker list, as an alternative to the broker list property?
  10. Shall we try to avoid unnecessary rebalances on coordinator failover? In the current proposal, upon coordinator failover the consumers will re-issue registration requests to the new coordinator, causing the new coordinator to trigger a rebalance that is unnecessary.
  11. Shall we restrict commit offset requests to the client's own assigned partitions only? Not restricting them might be useful for some use cases, such as clients that manage their offsets themselves and only want to call commit offset once on startup.
  12. Shall we avoid request forwarding for option 2.b? One way to do that is to let the consumer remember two coordinators: upon receiving a stop fetching request from the group management coordinator, it sends a commit offset request to the offset management coordinator, waits for the response, and then responds to the stop fetching request.

...