The following is a draft design that uses a highly available consumer coordinator on the broker side to handle consumer rebalancing. Migrating the rebalance logic from the consumer to the coordinator resolves the consumer split-brain problem and makes the consumer client thinner.

...

1. Consumer Scalability Goal

With a single coordinator we should support up to 5K consumers (the bottleneck should be the number of socket channels a single machine can have). By adding more coordinators (we can enable every broker to be a coordinator, so the maximum number of coordinators is the size of the server cluster), consumer scalability should increase linearly.

Rebalancing Algorithm Overview

One of the brokers is elected as the coordinator for all the consumer groups. It will be responsible for:

  1. Detecting newly started consumers
  2. Detecting consumer failure through custom RPC protocol (session timeout)
  3. Detecting changes to topic partitions through ZK watchers
  4. Triggering rebalancing attempts according to the consumer/partition changes described above and communicating the results to the consumers
  5. Watch for new topics and topic partition changes in Zookeeper
  6. Accept and maintain a socket channel request from newly added consumers
  7. Watch for consumer failures by periodically pinging the consumers via the socket channels
  8. Rebalance for affected groups in response to topic partition (through ZK watchers) and group consumer (through ping request) changes
  9. Communicate the rebalance results to consumers through the socket channels

Upon receiving a new socket channel from a new consumer, the coordinator will verify its registration request info and respond whether the registration is accepted or not. If the new consumer's registration is accepted and kept in memory, the coordinator will keep pinging it through the socket channel to track whether it is still alive.

When the coordinator decides that a rebalance is needed for a certain group, it will first send the stop-fetcher command to each consumer in the group, and then send the start-fetcher command to each consumer with the newly computed assignment. Each consumer will only receive the partition info of the partitions that are assigned to it. The coordinator finishes the rebalance by waiting for all the consumers to finish restarting their fetchers and respond.

The consumer, upon startup, will consult known brokers for the current coordinator. The known broker list is put in the consumer properties file. The consumer will then try to create a socket channel to the coordinator and register itself through that channel. Once accepted, it will keep trying to read new requests from the coordinator and respond, but will never proactively send requests to the coordinator. When the consumer does not receive any request within a configurable amount of time, it will treat the connection as lost, stop its fetchers, and try to reconnect to the possibly new coordinator by restarting the consulting process.

2. Configuration

Config Options to Brokers/Consumers

The following config options are added to brokers. These options will only be used by a broker when it is elected as the coordinator:

  1. Rebalancing threadpool size
  2. Communication threadpool (in socket server) size
  3. Minimum viable value for consumer session timeout

The following config options are added to consumers

  1. Bootstrap broker list
  2. Session timeout

In addition, the following config options are moved from consumer side to broker side

  1. Maximum rebalance retries
  2. Rebalance backoff time (in milliseconds)

Paths Stored in ZooKeeper

Most of the original ZK path storage is kept, with the addition of the coordinator path (which stores the current coordinator info):

...

Code Block
/consumers/groups/owners

3. On Coordinator Startup

Every server will create a coordinator instance as its member upon startup. The consumer coordinator keeps the following fields (the usage and format of all requests will be introduced later in this page):

...

Code Block
KafkaDelayedScheduler.run() :

While isRunning

  1. Peek the head consumer from the priority queue

  2. If consumer.scheduledTime <= current_time(), try to send the PingRequest

  2.1 If the consumer's channel is not held by the socketServer's processor for rebalance, send the PingRequest and set the timeout watcher for the consumer

  2.2 Otherwise do nothing

  3. Remove the consumer from the head of the queue and put the consumer with consumer.scheduledTime += consumer.ping_interval_ms back to the queue
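
The scheduling loop above can be sketched with a min-heap keyed on the scheduled time. This is only an illustration, not the actual KafkaDelayedScheduler: `run_due_pings`, `send_ping` and `held_for_rebalance` are hypothetical names, and the sketch assumes a ping is due once its scheduled time is no later than the current time.

```python
import heapq


def run_due_pings(queue, now_ms, send_ping, held_for_rebalance):
    """Process every consumer whose ping is due at now_ms.

    queue is a heap of (scheduled_time_ms, consumer_id, ping_interval_ms).
    send_ping and held_for_rebalance are hypothetical callbacks standing in
    for the socket server. Returns the consumer ids that were pinged.
    """
    pinged = []
    requeue = []
    while queue and queue[0][0] <= now_ms:
        scheduled, consumer_id, interval = heapq.heappop(queue)
        # Step 2.1 / 2.2: skip the ping while the channel is held for a rebalance.
        if not held_for_rebalance(consumer_id):
            send_ping(consumer_id)
            pinged.append(consumer_id)
        # Step 3: schedule the consumer again one ping interval later.
        requeue.append((scheduled + interval, consumer_id, interval))
    # Push back after the loop so each consumer is visited at most once per call.
    for entry in requeue:
        heapq.heappush(queue, entry)
    return pinged
```

Re-queuing after the loop is a design choice of this sketch: it keeps a consumer that has fallen far behind from being pinged several times in one pass.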

4. On

...

Topic Partition Changes Detection

When the ZK watchers fire to signal topic partition changes, the coordinator needs to decide which consumer groups are affected by the change and hence need a rebalance.

...

Code Block
TopicPartitionChangeListener.handleChildChange :

Get the set of groups that are interested in this topic from consumerGroupsPerTopic(topic) and groupsWithWildcardTopics (filtered by wildcard pattern regex), and try to request rebalance for each group
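
The group lookup described above could look roughly like the following. The shapes of the two maps are assumptions (the design only names `consumerGroupsPerTopic` and `groupsWithWildcardTopics`), and `affected_groups` is a hypothetical helper name.

```python
import re


def affected_groups(topic, consumer_groups_per_topic, groups_with_wildcard_topics):
    """Return the groups that should be asked to rebalance after `topic` changed.

    consumer_groups_per_topic: dict topic -> set of group ids.
    groups_with_wildcard_topics: dict group id -> regex pattern string
    (an assumed representation of the wildcard subscription).
    """
    groups = set(consumer_groups_per_topic.get(topic, ()))
    for group, pattern in groups_with_wildcard_topics.items():
        # A wildcard group is affected only if its pattern matches the topic.
        if re.fullmatch(pattern, topic):
            groups.add(group)
    return groups
```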

5. On Consumer Startup

Upon creation, the consumer will get a list of

...

Code Block
consumerStartup (initBrokers : Map[Int, (String, String)]):

1. Randomly pick a broker in the initBrokers, create a socket channel with that broker

1.1. If the socket channel cannot be created, try another broker in initBrokers

1.2. If none of the brokers in initBrokers can be connected to, throw an AllKnownBrokersNotAvailable exception and return

2. Send a ConsultRequest request to the broker and get a ConsultResponse from the broker

3. From the ConsultResponse update serverCluster and curCoordinator

4. Set up a socket channel with the current coordinator and send a RegisterRequest

5. Keep block-reading from the channel
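
Step 1 of the startup procedure (pick a random bootstrap broker, fall back to the others, give up when none is reachable) can be sketched as below. The `connect` callable is a hypothetical stand-in for creating the socket channel, and the sketch assumes it raises `OSError` on failure.

```python
import random


class AllKnownBrokersNotAvailable(Exception):
    """Raised when no broker in the bootstrap list can be reached."""


def connect_to_any_broker(init_brokers, connect, rng=random):
    """Try the bootstrap brokers in random order; return (broker_id, channel).

    init_brokers: dict broker_id -> (host, port), mirroring initBrokers.
    connect: hypothetical callable (host, port) -> channel that raises
    OSError when the socket channel cannot be created.
    """
    broker_ids = list(init_brokers)
    rng.shuffle(broker_ids)
    for broker_id in broker_ids:
        host, port = init_brokers[broker_id]
        try:
            return broker_id, connect(host, port)
        except OSError:
            continue  # step 1.1: try another broker
    # Step 1.2: every known broker failed.
    raise AllKnownBrokersNotAvailable(sorted(init_brokers))
```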

...

ClusterMetadataRequest

The ConsultRequest sent by the consumer can be received by any broker, and is handled by the broker's socketServer. Hence its request format should be compatible with the ProduceRequest and FetchRequest.

...

Code Block
{
  size: int32                // the size of this request, not including the first 4 bytes for this field
  request_type_id: int16     // the request type id, distinguish Fetch/Produce/Offset/Consult/etc requests
                             // do not need any data for the consult request
}
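
As an illustration of the layout above, the request can be packed with a length prefix that excludes its own four bytes. The byte order (big-endian) and the numeric type id are assumptions; the design does not specify either.

```python
import struct

# Hypothetical type id; the design does not assign a number to this request.
CONSULT_REQUEST_TYPE_ID = 7


def encode_consult_request(request_type_id=CONSULT_REQUEST_TYPE_ID):
    """Encode size (int32) followed by request_type_id (int16), big-endian."""
    body = struct.pack(">h", request_type_id)
    # The size field does not count its own 4 bytes.
    return struct.pack(">i", len(body)) + body


def decode_consult_request(data):
    """Return (size, request_type_id) from an encoded request."""
    (size,) = struct.unpack_from(">i", data, 0)
    (request_type_id,) = struct.unpack_from(">h", data, 4)
    return size, request_type_id
```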

...

ClusterMetadataResponse
Code Block
{
  size: int32                                             // the size of this response, not including the first 4 bytes for this field
  error_code: int16                                       // global error code for this request, if any
  coordinator_id: int16                                   // broker id of the coordinator
  brokers_info: [<broker_struct>]                         // current broker list
}

broker_struct =>
{
  creator_id: string
  id: int32
  host: string
  port: int32
}

...

RegisterConsumerRequest

The consumer does not need a response to the RegisterRequest: once it has sent the request, it starts expecting the first Stop or Ping request within ping_interval_ms.

Code Block
{
  size: int32                        // the size of this request
  request_type_id: int16             // the type of the request, currently only one type is allowed: ConnectRequest
  group_id: string                   // group that the consumer belongs to
  consumer_id: string                // consumer id
  topic_count: string                // interested topic and number of streams, can be wildcard
  auto_commit: boolean               // indicator of whether autocommit is enabled
  auto_offset_reset: string          // indicator of what to do if an offset is out of range, currently either smallest or largest
  ping_interval_ms: int32            // ping interval in milliseconds; the consumer is also expected to respond within this interval
  max_ping_retries: int16            // maximum number of allowed ping timeouts before the consumer is treated as failed
}

RegisterConsumerResponse

6. On New Consumer

...

Consumer Failure Detection

When a new connection is received from a consumer by the socketServer of the coordinator, it will wait for its RegisterRequest. Once the request is received, the coordinator will issue a rebalance request for the group of the newly added consumer. When a new group id is seen by the coordinator (hence this group does not exist yet), it needs to handle the newly added group.

...

Code Block
handleGroupMemberChange :

1. Decide the deleted consumer or the newly added consumer

2.1. If the group no longer contains any consumer, do nothing

2.2. Otherwise, if groupsBeingRebalanced(group).compareAndSet(false, true) succeeds, put the group into rebalanceRequestQ[group % requestHandler.size].
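
The "enqueue once per group" behavior above can be sketched as follows. A lock-protected set plays the role of the per-group compareAndSet flag, and a CRC32 of the group id is an assumed way to map a group to a handler queue; `RebalanceDispatcher` and its methods are hypothetical names.

```python
import threading
import zlib
from queue import Queue


class RebalanceDispatcher:
    """Routes rebalance requests to per-handler queues, at most one per group."""

    def __init__(self, num_handlers):
        self.queues = [Queue() for _ in range(num_handlers)]
        self._being_rebalanced = set()
        self._lock = threading.Lock()

    def request_rebalance(self, group):
        """Enqueue the group unless a request is already pending.

        Returns True if the group was enqueued.
        """
        with self._lock:  # stands in for compareAndSet(false, true)
            if group in self._being_rebalanced:
                return False
            self._being_rebalanced.add(group)
        # A stable hash keeps a group on the same handler queue every time.
        index = zlib.crc32(group.encode("utf-8")) % len(self.queues)
        self.queues[index].put(group)
        return True

    def rebalance_done(self, group):
        """Clear the pending flag so later changes can trigger a new rebalance."""
        with self._lock:
            self._being_rebalanced.discard(group)
```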

7. On Rebalancing

The requestHandler threads keep block-reading from their corresponding rebalanceRequestQ, each will handle the rebalancing task for a non-overlapping subset of groups (e.g. through mod hashing).

...

Code Block
rebalance (group) :

1. Get the topics that the group is interested in. For each topic:

1.1. Get the number of partitions by reading from ZK

1.2. Get the number of threads for each topic from the alive consumers connecting to it

1.3. Compute the new ownership assignment for the topic

3. Check if a rebalance is necessary by trying to get the current ownership from ZK for each topic.

3.1 If there is no registered ownership info in ZK, rebalance is necessary

3.2 If some partitions are not owned by any threads, rebalance is necessary

3.3 If some partitions registered in the ownership map do not exist any longer, rebalance is necessary

3.4 If the ownership map does not match the new one, rebalance is necessary

3.5 Otherwise rebalance is not necessary

4. If rebalance is necessary, do the following:

4.1 For each consumer in the group, send the StopRequest to the socketServer's corresponding processor's queue

4.2 Then for each consumer in the group, send the StartRequest with the newly computed ownership specific to the consumer to the socketServer's corresponding processor's queue

4.3 Then wait until socketServer has reported that all the StartResponses have been received

5. If waiting has timed out, return false; otherwise write the new ownership info to ZK and return true.
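
The necessity check in step 3 can be expressed compactly for a single topic. This is a sketch under assumed data shapes (ownership as a dict from partition to owning thread id); `rebalance_needed` is a hypothetical name and the ZK reads are left out.

```python
def rebalance_needed(current_ownership, new_ownership, existing_partitions):
    """Apply steps 3.1-3.5 to one topic.

    current_ownership / new_ownership: dict partition -> owner thread id
    (current_ownership is None or empty when nothing is registered in ZK).
    existing_partitions: set of partitions that currently exist.
    """
    if not current_ownership:
        return True  # 3.1: no registered ownership info
    owned = set(current_ownership)
    if existing_partitions - owned:
        return True  # 3.2: some partitions are not owned by any thread
    if owned - existing_partitions:
        return True  # 3.3: ownership refers to partitions that no longer exist
    # 3.4 / 3.5: rebalance only if the assignment actually changes.
    return current_ownership != new_ownership
```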

8. On Coordinator Failover

Whenever the server hosting the current coordinator dies, the electors of the other coordinators will detect this through the ZK listener and try to get elected as the leader; whoever wins will trigger the callback function coordinatorStartup.

...

Code Block
handleNewSession :

1. Reset its state by clearing consumerGroupsPerTopic, groupsWithWildcardTopics and rebalanceRequestQ, etc

2. Re-register the session expiration listener (this is because ZkClient does not re-register itself once fired)

3. Try to re-elect to be the coordinator by directly calling the elect function of its coordinatorElector.

9. On Consumer Handling Coordinator Request

There are three types of requests that a consumer can receive from the coordinator: PingRequest, StopRequest and StartRequest. For each type of request, the consumer is expected to respond within ping_interval_ms.

...

Code Block
handlePingRequest (request: PingRequest):

1.1. If auto_commit is not enabled, just send a PingResponse with no num_partitions and offset_info

1.2. Otherwise read topicRegistry and send a PingResponse with the created num_partitions and offset_info
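
A rough sketch of the two branches above, building the response as a plain dict rather than a wire message. The shape of `topic_registry` is an assumption; the design only says the offsets are read from topicRegistry.

```python
def handle_ping_request(auto_commit, topic_registry):
    """Build the PingResponse payload as a plain dict.

    topic_registry: dict (topic, partition) -> consumed offset
    (an assumed shape for the consumer's topicRegistry).
    """
    if not auto_commit:
        return {}  # 1.1: no num_partitions and no offset_info
    # 1.2: report the current offset of every owned partition.
    offset_info = [
        {"topic": topic, "partition": partition, "offset": offset}
        for (topic, partition), offset in sorted(topic_registry.items())
    ]
    return {"num_partitions": len(offset_info), "offset_info": offset_info}
```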

...

RestartFetcherRequest

When the coordinator decides to rebalance a group, it will first send a StopRequest to every consumer in the group to make them stop consuming.

...

Code Block
{
  size: int32                              // the size of this request
  request_type_id: int16                   // the type of the request, distinguish Ping/Stop/Start requests
  num_partitions: int16                    // the number of partitions assigned to the consumer
  assigned_parts: [<partition_struct>]     // the detailed info of assigned partitions, along with the starting offset
}

partition_struct =>
{
  topic: string
  partition: string                        // note that this partition string already contains the broker id
  offset: int64
}

...

RestartFetcherResponse
Code Block
{
  size: int32                     // the size of this response
  error_code: int16               // global error code for this request, if any
}

...

  1. Batch Updating ZK Offset: upon receiving a PingResponse with the current offset info, the processor does not need to reflect it in ZK immediately. It can just update an in-memory {(topic, partition) -> offset} map, and a separate daemon thread can periodically write the updated map to ZK. With a newer version of ZK that supports group writes, this can reduce the number of writes to ZK.
  2. Multiple Coordinators: in the future, we may need each broker to act as a coordinator, with consumer groups evenly distributed among all the consumer coordinators. This would enable supporting more than 5K consumers, since right now the bottleneck is the number of open socket connections to the single coordinator machine.
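
The batching idea in item 1 can be sketched as an in-memory buffer that is flushed by a periodic task. `OffsetBuffer` and `write_batch` are hypothetical names; the actual ZK write, and the daemon thread that would call `flush` on a timer, are left out.

```python
import threading


class OffsetBuffer:
    """Collects offsets in memory and hands them off in batches."""

    def __init__(self, write_batch):
        # write_batch is a hypothetical callable that persists a
        # {(topic, partition): offset} dict, for example to ZooKeeper.
        self._write_batch = write_batch
        self._dirty = {}
        self._lock = threading.Lock()

    def update(self, topic, partition, offset):
        """Record the latest offset; later updates overwrite earlier ones."""
        with self._lock:
            self._dirty[(topic, partition)] = offset

    def flush(self):
        """Write all pending offsets at once; return how many were written."""
        with self._lock:
            batch, self._dirty = self._dirty, {}
        if batch:
            self._write_batch(batch)
        return len(batch)
```

Because only the latest offset per partition is kept, several PingResponses for the same partition between two flushes collapse into a single write.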