You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 36 Next »

Goals

The main goal of this project is to have a thin consumer client design that can be easily adopted/reimplemented in non-Java languages. To achieve this we have decided to:

  1. Move the group member management and offset management from the client side to the server side with a centralized coordination mechanism. By doing so we can completely remove ZK-dependency from the consumer clients. General design proposal can be found here, here and here.
  2. Make the consumer client single-threaded with non-blocking API. Some discussions about this can be found here.

 

Besides this main goal we also want to add some new feature supports that we have seen while operating 0.8 and older versions. An in-comprehensive list below:

  1. Manual partition assignment: instead of going through the centralized coordination process to get assigned consuming partitions, the consumers should be able to simply specify their target partitions and start consume right away without any group membership management.
  2. Manual offset management: instead of using Kafka commit() call to manage the consumed offsets, consumers should be able to store/load their offsets outside Kafka clients and simply specify the starting offsets when consuming.
  • Note that pre-0.9, we support both of these use cases by providing a simple consumer and a zookeeper consumer (i.e. high-level consumer) interface, and users need to choose all-or-none for the functionalities; in 0.9 we will combine these two into one interface with more flexibility.

Motivation

The motivation of moving to a new set of consumer client APIs with broker side co-ordination is laid out here. Some back of the envelope calculations along with experience of building a prototype for this suggest that with a single coordinator we should support up to 5K consumers (the bottleneck should be the number of socket channels a single machine can have). Every broker will be the co-ordinator for a subset of consumer groups, thereby, providing the ability to scale the number of consumer groups linearly with the size of the cluster.

Consumer API

The proposed consumer APIs are here. Several API usage examples are documented here.

Group management protocol

Rebalancing is the process where a group of consumer instances (belonging to the same group) co-ordinate to own a mutually exclusive set of partitions of topics that the group has subscribed to. At the end of a successful rebalance operation for a consumer group, every partition for all subscribed topics will be owned by a single consumer instance. The way rebalancing works is as follows. One of the brokers is elected as the coordinator for a subset of the consumer groups. It will be responsible for triggering rebalancing attempts for certain consumer groups on consumer group membership changes or subscribed topic partition changes. It will also be responsible for communicating the resulting partition-consumer ownership configuration to all consumers of the group undergoing a rebalance operation.

Consumer

  1. On startup or on co-ordinator failover, the consumer sends a ClusterMetadataRequest to any of the brokers in the "bootstrap.brokers" list. In the ClusterMetadataResponse, it receives the location of the co-ordinator for it's group.  
  2. The consumer sends a RegisterConsumer request to it's co-ordinator broker. In the RegisterConsumerResponse, it receives the list of topic partitions that it should own.
  3. At this time, group management is done and the consumer starts fetching data and (optionally) committing offsets for the list of partitions it owns.

Co-ordinator

  1. Upon election or startup, the co-ordinator reads the list of groups it manages and their membership information from zookeeper. If there is no previous group membership information, it does nothing until the first consumer in some group registers with it.
  2. Upon election, if the co-ordinator finds previous group membership information in zookeeper, it waits for the consumers in each of the groups to re-register with it. Once all known and alive consumers register with the co-ordinator, it marks the rebalance completed. 
  3. Upon election or startup, the co-ordinator also starts failure detection for all consumers in a group. Consumers that are marked as dead by the co-ordinator's failure detection protocol are removed from the group and the co-ordinator marks the rebalance for a group completed by communicating the new partition ownership to the remaining consumers in the group.
  4. In steady state, the co-ordinator tracks the health of each consumer in every group through it's failure detection protocol.
  5. If the co-ordinator marks a consumer as dead, it triggers a rebalance operation for the remaining consumers in the group. Rebalance is triggered by killing the socket connection to the remaining consumers in the group. Consumers notice the broken socket connection and trigger a co-ordinator discovery process (#1, #2 above). Once all alive consumers re-register with the co-ordinator, it communicates the new partition ownership to each of the consumers in the RegisterConsumerResponse, thereby completing the rebalance operation.
  6. The co-ordinator tracks the changes to topic partition changes for all topics that any consumer group has registered interest for. If it detects a new partition for any topic, it triggers a rebalance operation (as described in #5 above). It is currently not possible to reduce the number of partitions for a topic. The creation of new topics can also trigger a rebalance operation as consumers can register for topics before they are created.

Failure detection protocol

The consumer specifies a session timeout, a heartbeat frequency and failed heartbeat threshold in the RegisterConsumerRequest that it sends to the co-ordinator in order to join a consumer group. The consumer initiates periodic heartbeats (HeartbeatRequest) to the co-ordinator and waits for a response. The heartbeat frequency determines the number of times a consumer sends a heartbeat to the co-ordinator within a session timeout window. The failure threshold determines the number of failed or missed heartbeats allowed to the consumer before it marks the consumer dead. For example, for a session timeout of 1 second and a heartbeat frequency of 3, the consumer is expected to send a HeartbeatRequest to the co-ordinator every (1000/3=333.33 milliseconds). With a failed heartbeat threshold of 3, it means the co-ordinator will mark the consumer dead, if it hasn't received at least one HeartbeatRequest every (1000/3)*3 i.e 1 second.

When the consumer has successfully joins a group, the failure detection process starts on the consumer as well as the co-ordinator. Here is the protocol in more detail -

1) Consumer periodically sends a HeartbeatRequest to the coordinator every session_timeout / heartbeat_frequency milliseconds.

2) Upon receiving the HeartbeatRequest, coordinator checks the generation number, the consumer id and the consumer group. If the consumer specifies an invalid or stale generation id (i.e., consumer id not assigned before, or generation number is smaller than current value), it send an error code in the HeartbeatResponse.

3) If coordinator has not heard from a consumer at least once in failed_heartbeat_threshold*(session_timeout/heartbeat_frequency), mark the consumer as dead, break the socket connection to the consumer and trigger a rebalance process for the group.

4) If consumer has not heart from the coordinator after session timeout, it treats the coordinator as failed and triggers the co-ordinator re discovery process.

5) If consumer finds the socket channel to the coordinator to be closed, it treats the coordinator as failed and triggers the co-ordinator re discovery process..

Note that on coordinator failover, the consumers may discover the new coordinator before or after the new coordinator has finished the failover process including loading the consumer group metadata from ZK, etc. In the latter case, the new coordinator will just accept its ping request as normal; in the former case, the new coordinator may reject its request, causing it to re-dicover coordinators and re-connect again, which is fine. Also, if the consumer connects to the new coordinator too late, the co-ordinator may have marked the consumer has dead and will be treat the consumer as a new consumer, which is also fine.

Request formats

For each consumer group, the coordinator stores the following information:

1) For each consumer group, the group metadata containing:

  • Its subscribed topic list.
  • Group configs, including session timeout, etc.
  • Consumer metadata for each consumer in the group.
  • Current offsets for each consumed topic/partition.
  • Partition ownership metadata, including the consumer-assigned-partitions map, current group generation number, etc.

2) For each existing topic, a list of consumer groups that are currently subscribing to it.

ConsumerMetadataRequest

{
  Version                => int16
  CorrelationId          => int64
  ClientId               => String
  GroupId                => String
}

 

ConsumerMetadataResponse

{
  CorrelationId          => int64
  ErrorCode              => int16
  CoordinatorId          => Broker
  GenerationId           => int64
}

JoinGroupRequest

 

{
  Version                => int16
  CorrelationId          => int64
  GroupId                => String
  ConsumerAddr           => String
  SessionTimeout         => int64
  Subscribing            => [String]
}

 

JoinGroupResponse

{
  Version                => int16
  CorrelationId          => int64
  ErrorCode              => int16
  Generation             => int16
  GroupId                => String 
  ConsumerId             => String
  PartitionsToOwn        => [<TopicAndPartition>]
}

 

HeartbeatRequest

{
  Version                => int16
  CorrelationId          => int64
  GroupId                => String
  ConsumerId             => String
}

HeartbeatResponse

{
  CorrelationId          => int64
  ErrorCode              => int16
}
 

Wildcard Subscription Handling

With wildcard subscription (for example, whitelist and blacklist), the consumers are responsible to discover matching topics through topic metadata request. That is, its topic metadata request will contain an empty topic list, whose response then will return the partition info of all topics, it will then filter the topics that match its wildcard expression, and then update the subscription list. Again, if the subscription list has changed from the previous values, it will start the consumer re-join group procedure.

Rebalance Process: Option 2

For this approach, if a coordinator tries to rebalance, it needs to firstly increment the generation number, update the ZK accordingly, and then close the socket.

Also the consumer group generation number metadata needs to be able to propagated to all brokers from ZK, either through controller or through coordinator. We do not consider ZK directly push this information to brokers upon updates or a pull-based method since neither are scalable.

Then the rebalance process becomes:

1) Upon coordinator failure, fetch the new coordinator id along with the generation number.

2) If the generation number is not bigger than its currently held generation number, send ping request to the coordinator upon connection setup.

3) Otherwise, prepare the rebalance by stop fetching and call onPartitionDeassigned, and then send a join request to the coordinator upon connection setup.

4) Coordinator accepts the offset commit even if the generation number is smaller if it is undergoing rebalance for the group (will talk about offset management in the next section).

 

Consumer's logic can then be summarized as the following state-machine:

-----------------------------------------------------------------------------------------------------------------

Joined (join response received, started fetching and pinging):

  1) Topic Subscription Change  => Joining (prepare for rebalance, send join request)

  2) Coordinator Failed => Discovering (send find_coordinator)

  3) Shutdown Signal => Stopped

Discovering (lost connection with coordinator, but may be fetching data)

  1) Found Coordinator, Generation Number Changed => Joining

  2) Found Coordinator, Generation Number OK => Joined

  3) Shutdown Signal => Stopped

Joining (not fetching, waiting for join response)

  1) Error Join Response => Discovering

  2) Normal Join Response => Joined

  3) Shutdown Signal => Stopped

Stopped (not started, killed, shutdown)

  1) Startup => Discovering

-----------------------------------------------------------------------------------------------------------------

 

 

  • No labels