
The following is a draft design that uses a highly available consumer coordinator on the broker side to handle consumer rebalancing. By migrating the rebalance logic from the consumer to the coordinator, we can resolve the consumer split-brain problem and make the consumer client thinner. With this design we hope to incorporate:

KAFKA-167: Move partition assignment to the broker

KAFKA-364: Add ability to disable rebalancing in ZooKeeper consumer

And solve:

KAFKA-242: Subsequent calls of ConsumerConnector.createMessageStreams cause Consumer offset to be incorrect

1. Consumer Scalability Goal

With a single coordinator we should support up to 5K consumers (the bottleneck should be the number of socket channels a single machine can hold). By adding more coordinators (every broker can be enabled as a coordinator, so the maximum number of coordinators is the size of the server cluster), consumer scalability should increase linearly.

2. Rebalancing Algorithm Overview

One of the brokers is elected as the coordinator for all the consumer groups. It will be responsible for triggering rebalancing attempts for certain consumer groups on consumer group membership changes or interested topic partition changes. It will also be responsible for communicating the resulting partition-consumer ownership configuration to all consumers of the group under rebalance. More specifically, it will try to do the following:

Detecting newly started consumers. The coordinator will not try to discover new consumers; instead it waits for consumers to connect to it. When the coordinator accepts a new connection, it receives the registration request from the new consumer. If the consumer's config options are valid, it accepts the registration, records the consumer's meta information (such as the session timeout value and the number of streams per topic that the consumer is interested in), and sends back a connection-established acknowledgment.

                                    <-- (register) -- Consumer (startup)

Coordinator              <-- (register) -- Consumer (startup)

                                    <-- (register) -- Consumer (startup)

----------------------------------------------------------------------------------------------

                             (record metadata)       - (confirm) -> Consumer (register succeed, block reading for coordinator requests such as ping, stop-fetcher, start-fetcher)

Coordinator       (record metadata)       - (confirm) -> Consumer (same as above)

                                                                    -- (reject) -->    Consumer (register failed due to some invalid meta info, try to reconnect to the coordinator and register again)

Detecting consumer failure. For every registered consumer, the coordinator will check its liveness through periodic heartbeats. The heartbeat response should include the consumed offset per partition if auto-offset-commit is enabled. If the consumer fails to reply to a heartbeat a configured number of times, the coordinator will mark the consumer as failed and trigger the rebalance procedure for the group this consumer belongs to.

Coordinator               -- (ping) -->     Consumer (alive)

                                     - (ping) ->     Consumer (failed)

----------------------------------------------------------------------------------------------

Coordinator          (record offset)      <-- (reply) --    Consumer (alive)

                                (rebalance)         time out...         Consumer (failed)

Detecting changes to topic partitions. The coordinator keeps a watch on all topic partitions registered in the cluster that are being consumed by at least one consumer group. When a new partition is added (possibly due to a newly added broker) or an existing partition is deleted (possibly due to a broker failure), it will trigger the rebalance procedure for every group that is consuming this partition.

Initiating the rebalance operation and communicating the new partition ownership to consumers. When the coordinator initiates a rebalance operation for a consumer group, it will first send a stop-fetcher request to each consumer in the group and wait for all of them to reply with the last consumed offset of each partition. By the time a consumer replies, it should have stopped its fetchers. The coordinator will then send a start-fetcher request to each consumer in the group with the newly computed partition assignment along with the starting offsets. The coordinator will finish the rebalance by waiting for all the consumers to restart their fetchers and respond. If any reply is not received within the timeout, the coordinator will retry the rebalance procedure by marking that consumer as failed and sending the stop-fetcher request to everybody else. If the rebalance cannot finish within the configured number of rebalance retries, the coordinator will log a FATAL error and stop rebalancing that consumer group.

Coordinator               -- (stop-fetcher) -->     Consumer (alive)

                                     - (stop-fetcher) ->     Consumer (alive)

----------------------------------------------------------------------------------------------

Coordinator          (record offset)      <-- (reply) --    Consumer (alive)

                                (record offset)      <-- (reply) --    Consumer (alive)

----------------------------------------------------------------------------------------------

Coordinator (compute new assignment)               -- (start-fetcher) -->     Consumer (alive)

                                                                                       - (start-fetcher) ->     Consumer (become failed)

----------------------------------------------------------------------------------------------

Coordinator          <-- (reply) --    Consumer (alive)

(retry rebalance)  time out...        Consumer (failed)
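The stop-fetcher / start-fetcher exchange with retries can be sketched roughly as follows. This is only an illustration in Python: `FakeConsumer`, `rebalance_group` and the striped assignment are hypothetical stand-ins, a `None` reply models a request that timed out, and the offsets carried by the stop-fetcher reply are not used further here.

```python
from typing import Dict, List, Optional


class FakeConsumer:
    """Hypothetical stand-in for a consumer connection (not part of the design)."""

    def __init__(self, name: str, alive: bool = True):
        self.name = name
        self.alive = alive
        self.assigned: List[str] = []

    def stop_fetcher(self) -> Optional[Dict[str, int]]:
        # A live consumer replies with its last consumed offsets; None = timeout.
        return {p: 0 for p in self.assigned} if self.alive else None

    def start_fetcher(self, partitions: List[str]) -> Optional[bool]:
        if not self.alive:
            return None
        self.assigned = list(partitions)
        return True


def rebalance_group(consumers: List[FakeConsumer], partitions: List[str],
                    max_retries: int) -> Optional[Dict[str, List[str]]]:
    """Return the new assignment, or None if every attempt timed out."""
    live = list(consumers)
    for _ in range(max_retries):
        # Phase 1: ask everybody to stop fetching and wait for the replies.
        failed = [c for c in live if c.stop_fetcher() is None]
        if not failed:
            # Phase 2: compute an assignment (simple striping, an assumption)
            # and ask everybody to restart with it.
            assignment = {c.name: partitions[i::len(live)]
                          for i, c in enumerate(live)}
            failed = [c for c in live
                      if c.start_fetcher(assignment[c.name]) is None]
            if not failed:
                return assignment
        # Mark silent consumers as failed and retry with everybody else.
        live = [c for c in live if c not in failed]
    return None
```

With one live and one dead consumer, the first attempt times out on the dead consumer and the second attempt assigns every partition to the survivor.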

When the coordinator fails, the other brokers will notice and try to get elected as the new coordinator. The new coordinator will not try to connect to the consumers; instead it waits for the consumers to realize that the previous coordinator has died and to reconnect to it.

The consumer, upon startup, will consult known brokers for the current coordinator. The known broker list is put in the consumer properties file. Then the consumer will try to create a socket channel to the coordinator and register itself through the socket channel; once accepted, it will keep reading new requests from the coordinator and responding to them, but never proactively send requests to the coordinator. When the consumer does not receive any request within a configurable amount of time, it will treat the connection as lost, stop its fetchers, and try to reconnect to the possibly new coordinator by restarting the consulting process. Its workflow logic can be found here.

3. Configuration

Config Options to Brokers/Consumers

The following config options are added to brokers. These options will only be used by a broker when it is elected as the coordinator:

  1. Rebalancing threadpool size
  2. Communication threadpool (in socket server) size
  3. Minimum viable value for consumer session timeout
  4. Maximum number of failed requests to consumers

The following config options are added to consumers

  1. Bootstrap broker list
  2. Session timeout

In addition, the following config options are moved from consumer side to broker side

  1. Maximum rebalance retries
  2. Rebalance backoff time (in milliseconds)
Paths Stored in ZooKeeper

Most of the original ZK paths are kept. In addition, a coordinator path is added (it stores the current coordinator info):

/consumers/coordinator --> broker_id (ephemeral; created by coordinator)

Besides, some of the original ZK paths are removed, including:

/consumers/groups/ids

And some of the ephemeral ZK paths are changed to persistent:

/consumers/groups/owners

4. On Coordinator Startup

Every server will create a coordinator instance as its member upon startup. The consumer coordinator keeps the following fields (all the request formats and their usage will be introduced later in this page):

  1. A ZK based elector using the coordinator path mentioned above
  2. For each group, a bit indicating if the group is under rebalancing
  3. For each topic, the consumer groups that are interested in the topic
  4. For each consumer, its meta information such as session timeout value, topic count, offset commit option, etc
  5. A list of groups that have wildcard interests for topics
  6. A configurable, fixed-size list of blocking queues storing all the rebalance requests
  7. A threadpool in which each thread handles all the rebalance requests read from one of the rebalance request queues
  8. A socket server for maintaining socket channels with all consumers. It contains a fixed number of processor threads for writing RestartFetcherRequest and reading PingResponse and RestartFetcherResponse
  9. A scheduler thread for sending PingRequest to all the consumers

The elector, upon initialization, will immediately try to get elected as the coordinator. If another broker has already become the coordinator, it will listen to the coordinator path for data changes, and try to get elected again whenever the current coordinator resigns (i.e. the data on the path is deleted).
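The election behaviour can be illustrated with a small in-memory model. This is a sketch only: `FakeZk` is a hypothetical stand-in for the single ephemeral coordinator path and does not use a real ZooKeeper client, and `Elector` and its method names are assumptions made for the example.

```python
from typing import Callable, List, Optional


class FakeZk:
    """In-memory stand-in for the /consumers/coordinator ephemeral node."""

    def __init__(self) -> None:
        self.coordinator: Optional[int] = None
        self.listeners: List[Callable[[], bool]] = []

    def create_ephemeral(self, broker_id: int) -> bool:
        # Creation only succeeds when nobody currently holds the path.
        if self.coordinator is not None:
            return False
        self.coordinator = broker_id
        return True

    def delete(self) -> None:
        # Models the node disappearing when the coordinator's session ends.
        self.coordinator = None
        for listener in list(self.listeners):
            listener()


class Elector:
    def __init__(self, zk: FakeZk, broker_id: int,
                 on_elected: Callable[[int], None]) -> None:
        self.zk = zk
        self.broker_id = broker_id
        self.on_elected = on_elected
        zk.listeners.append(self.elect)  # watch the path for deletion
        self.elect()                     # try immediately on initialization

    def elect(self) -> bool:
        if self.zk.create_ephemeral(self.broker_id):
            self.on_elected(self.broker_id)  # would run coordinatorStartup
            return True
        return False

    def close(self) -> None:
        # Models the broker going away: it stops watching the path.
        self.zk.listeners.remove(self.elect)
```

In this model the first elector to be created wins; when its node is deleted, the remaining electors race to recreate it and exactly one of them succeeds.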

Whenever it is elected as the coordinator, it will trigger the startup procedure of the coordinator described below:

coordinatorStartup :

1. Read all the topics from ZK and initialize the interested groups for each topic and the list of groups with wildcard interests

2. Register listeners for topics and their partition changes

2.1 Subscribe TopicChangeListener to /brokers/topics

2.2 Subscribe TopicPartitionChangeListener to each /brokers/topics/[topic]

3. Start up the socket server and the scheduler for ping requests

4. Register session expiration listener

5. Initialize the threadpool of rebalancing handlers

The scheduler for ping requests is a delayed scheduler that uses a priority queue of timestamps to keep track of the outstanding PingRequests. Whenever it sends a request, it needs to initialize an expire watcher for the request.

scheduler.run :

While isRunning

  1. Peek the head consumer from the priority queue

  2. If consumer.scheduledTime <= current_time(), try to send the PingRequest; otherwise sleep for (consumer.scheduledTime - current_time()) and then send it

  2.1 If the consumer's channel is not held by the socketServer's processor for rebalance, send the PingRequest and set the timeout watcher for the consumer

  2.2 Otherwise do nothing

  3. Remove the consumer from the head of the queue and put the consumer with consumer.scheduledTime += (consumer.session_timeout_ms * 1/3) back to the queue
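The loop above can be sketched with a heap-based priority queue. The version below is a simulation under assumptions: it uses a virtual clock instead of sleeping, `run_ping_scheduler` and its parameters are hypothetical names, and setting the timeout watcher is left to the `send_ping` callback.

```python
import heapq
from typing import Callable, Dict, List, Set, Tuple


def run_ping_scheduler(session_timeout_ms: Dict[str, int],
                       held_for_rebalance: Set[str],
                       send_ping: Callable[[str, int], None],
                       start_ms: int,
                       until_ms: int) -> None:
    """Simulate scheduler.run from start_ms up to and including until_ms."""
    # Priority queue of (scheduledTime, consumer id), earliest first.
    queue: List[Tuple[int, str]] = [(start_ms, cid) for cid in session_timeout_ms]
    heapq.heapify(queue)
    now = start_ms
    while queue and queue[0][0] <= until_ms:
        scheduled, cid = heapq.heappop(queue)
        now = max(now, scheduled)  # stands in for sleeping until scheduledTime
        if cid not in held_for_rebalance:
            send_ping(cid, now)    # the callback would also set the watcher
        # Re-queue the consumer one third of its session timeout later.
        interval = max(1, session_timeout_ms[cid] // 3)
        heapq.heappush(queue, (scheduled + interval, cid))
```

For example, a consumer with a 3000 ms session timeout is pinged at 0, 1000 and 2000 ms, while a consumer whose channel is held for a rebalance is skipped but stays in the queue.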

When a processor in the socket server sends a RestartFetcherRequest to a consumer's channel, it also needs to initialize an expire watcher for the request.

When the processor receives the corresponding RestartFetcherResponse or the PingResponse from the channel, it needs to clear the watcher (we will come back to this timeout protocol in more detail later in this page).

The timeout watcher mechanism will be implemented using the Purgatory component. For a detailed description of the request expire/satisfy purgatory, please read here.

5. On Topic Partition Changes Detection

When the ZK watchers are fired to notify topic partition changes, the coordinator needs to decide which consumer groups are affected by the change and hence need a rebalance.

Handle Topic Change
TopicChangeListener.handleChildChange :

1. Get the newly added topics (since the children of /brokers/topics are persistent nodes, no topics should be deleted even if no consumers are consuming them any more)

2. For each newly added topic:

2.1 Subscribe TopicPartitionChangeListener to /brokers/topics/topic

2.2 For groups that have wildcard interests, check whether the newly added topic matches their interests; if yes, add the group to the interested group list of this topic

2.3 Get the set of groups that are interested in this topic,  and if the group's rebalance bit is not set, set it and try to rebalance the group

* By saying "try to rebalance the group" we mean create a new rebalance request and send it to the corresponding rebalance thread's queue. The correspondence mapping can be for example hashing based.
Handle Topic Partition Change
TopicPartitionChangeListener.handleChildChange :

Get the set of groups that are interested in this topic, and if the group's rebalance bit is not set, set it and try to rebalance the group
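The "set the rebalance bit and try to rebalance" step, together with the hash-based mapping from groups to rebalance queues, might look roughly like this. The class and method names are hypothetical, and CRC32 is just one possible stable hash; the design only says the mapping can be hashing based.

```python
import zlib
from collections import deque
from typing import Deque, Dict, List, Set


class RebalanceDispatcher:
    """Hypothetical holder of the rebalance bits and request queues."""

    def __init__(self, num_queues: int) -> None:
        self.queues: List[Deque[str]] = [deque() for _ in range(num_queues)]
        self.rebalance_bit: Dict[str, bool] = {}
        self.groups_per_topic: Dict[str, Set[str]] = {}

    def queue_index(self, group: str) -> int:
        # Stable hash so a group always lands on the same rebalance thread.
        return zlib.crc32(group.encode("utf-8")) % len(self.queues)

    def try_rebalance(self, group: str) -> bool:
        # Skip groups whose bit is already set: a request is already pending.
        if self.rebalance_bit.get(group, False):
            return False
        self.rebalance_bit[group] = True
        self.queues[self.queue_index(group)].append(group)
        return True

    def on_topic_partition_change(self, topic: str) -> List[str]:
        """Return the groups for which a new rebalance request was queued."""
        interested = sorted(self.groups_per_topic.get(topic, set()))
        return [g for g in interested if self.try_rebalance(g)]
```

Because the bit is checked before enqueueing, a burst of watcher notifications for the same topic produces at most one pending request per group.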

6. On Consumer Startup

Upon creation, the consumer will get a list of

  {brokerId : (host, port)}

as part of the config info from its properties file. It can then consult any one of the known brokers to get the full list of current brokers and the ID of the coordinator.

Once the consumer finds out the address of the coordinator, it will try to connect to the coordinator. When the connection is set up, it will send a single RegisterConsumerRequest to the coordinator.

After that, the consumer will keep reading new requests from the coordinator. Hence the consumer does not need to maintain a socket server.

consumerStartup (initBrokers : Map[Int, (String, String)]):

1. In a round robin fashion, pick a broker in the initBrokers, create a socket channel with that broker

1.1. If the socket channel cannot be created, try the next broker in the initBrokers list

1.2. If none of the brokers in initBrokers can be connected to, log an AllKnownBrokersNotAvailable error event and go back to the first broker

1.3. The round-robin connection retries continue forever until the consumer is shut down

2. Send a ClusterMetadataRequest request to the broker and get a ClusterMetadataResponse from the broker

3. From the response update its local memory of the current server cluster info and the id of the current coordinator

4. Set up a socket channel with the current coordinator, send a RegisterConsumerRequest and receive a RegisterConsumerResponse

5. If the RegisterConsumerResponse indicates that the registration is accepted, keep block-reading from the channel; otherwise go back to step 1
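Step 1 of the startup procedure can be sketched as follows. The connection attempt is injected as a callback so the sketch stays self-contained; `find_reachable_broker`, `try_connect` and the `max_attempts` bound are assumptions added for illustration (the design retries forever until shutdown), and the port is modelled as an integer.

```python
import logging
from itertools import cycle, islice
from typing import Callable, Dict, Optional, Tuple


def find_reachable_broker(init_brokers: Dict[int, Tuple[str, int]],
                          try_connect: Callable[[str, int], bool],
                          max_attempts: int) -> Optional[int]:
    """Try the known brokers round-robin; return the first reachable id."""
    order = sorted(init_brokers)
    attempts = islice(cycle(order), max_attempts)
    for attempt, broker_id in enumerate(attempts, start=1):
        host, port = init_brokers[broker_id]
        if try_connect(host, port):
            return broker_id
        if attempt % len(order) == 0:
            # A full pass over the list failed; log and start over.
            logging.error("AllKnownBrokersNotAvailable")
    return None
```

The returned broker would then be sent the ClusterMetadataRequest of step 2.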
ClusterMetadataRequest

The ClusterMetadataRequest sent by the consumer can be received by any broker, and is handled by the broker's socket server. Hence its request format should be compatible with the ProduceRequest and FetchRequest.

Note that in this design we still use the 0.7 wire format, although a new wire format is already used in 0.8. Our implementation will keep this in mind and make it easy to adapt to the new format.

{
  size: int32                // the size of this request, not including the first 4 bytes for this field
  request_type_id: int16     // the request type id, distinguish Fetch/Produce/Offset/Consult/etc requests
                             // do not need any data for the cluster metadata request
}
ClusterMetadataResponse
{
  size: int32                                             // the size of this response, not including the first 4 bytes for this field
  error_code: int16                                       // global error code for this request, if any
  coordinator_id: int32                                   // broker id of the coordinator
  num_brokers: int16                                      // number of brokers in the following list
  brokers_info: [<broker_struct>]                         // current broker list in the cluster
}

broker_struct =>
{
  id: int32
  host: string                                            // we adopt the 0.8 protocol for strings, where the size of the string is stored at the beginning, followed by the bytes
  port: int32
}
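As an illustration of the layout above, the response could be serialized as below. This is a sketch under assumptions: fields are written big-endian, and the string size prefix is taken to be an int16, which the struct definition does not state explicitly. The function names are hypothetical.

```python
import struct
from typing import List, Tuple

Broker = Tuple[int, str, int]  # (id, host, port)


def encode_cluster_metadata_response(error_code: int, coordinator_id: int,
                                     brokers: List[Broker]) -> bytes:
    # error_code: int16, coordinator_id: int32, num_brokers: int16
    body = struct.pack(">hih", error_code, coordinator_id, len(brokers))
    for broker_id, host, port in brokers:
        host_bytes = host.encode("utf-8")
        # id: int32, host: int16 size + bytes (assumed prefix width), port: int32
        body += struct.pack(">ih", broker_id, len(host_bytes))
        body += host_bytes + struct.pack(">i", port)
    # size excludes the 4 bytes of the size field itself
    return struct.pack(">i", len(body)) + body


def decode_cluster_metadata_response(buf: bytes) -> Tuple[int, int, List[Broker]]:
    header = ">ihih"
    _size, error_code, coordinator_id, num_brokers = struct.unpack_from(header, buf, 0)
    pos = struct.calcsize(header)
    brokers: List[Broker] = []
    for _ in range(num_brokers):
        broker_id, host_len = struct.unpack_from(">ih", buf, pos)
        pos += 6
        host = buf[pos:pos + host_len].decode("utf-8")
        pos += host_len
        (port,) = struct.unpack_from(">i", buf, pos)
        pos += 4
        brokers.append((broker_id, host, port))
    return error_code, coordinator_id, brokers
```

Decoding the output of the encoder returns the original error code, coordinator id and broker list.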
RegisterConsumerRequest

The consumer does not need to wait for a response to the RegisterConsumerRequest: once it has sent the request, it starts expecting the first Stop or Ping request within ping_interval_ms.

{
  size: int32                        // the size of this request
  request_type_id: int16             // the type of the request, currently only one type is allowed: ConnectConsumerRequest
  group_id: string                   // group that the consumer belongs to
  consumer_id: string                // consumer id
  topic_count: string                // interested topic and number of streams, can be wildcard
  auto_commit: boolean               // indicator of whether autocommit is enabled
  auto_offset_reset: string          // indicator of what to do if an offset is out of range, currently either smallest or largest
  session_timeout_ms: int32          // session timeout in milliseconds
}
RegisterConsumerResponse

Upon receiving the RegisterConsumerRequest, the coordinator will check the following:

  1. The consumer's ID is unique among the consumers within the same group
  2. The consumer's specified session_timeout is larger than the coordinator's minimum allowed session timeout value

If all the checks are successful, the coordinator accepts the registration by storing the consumer's meta information in main memory and sends a RegisterConsumerResponse with no error message; otherwise it encodes the error message in the RegisterConsumerResponse and closes the connection.

{
  size: int32                        // the size of this response
  error_code: int16                  // global error code for this request, if any
                                     // no data needed
}
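The two registration checks can be sketched as a small function. The error code values and names below are hypothetical (the design only says an error is encoded in the response).

```python
from typing import Dict, Set

# Hypothetical error codes; the design does not assign concrete values.
NO_ERROR = 0
DUPLICATE_CONSUMER_ID = 1
SESSION_TIMEOUT_TOO_SMALL = 2


def validate_registration(registered: Dict[str, Set[str]],
                          group_id: str,
                          consumer_id: str,
                          session_timeout_ms: int,
                          min_session_timeout_ms: int) -> int:
    """Return the error_code to put in the RegisterConsumerResponse."""
    # Check 1: the consumer id must be new within its group.
    if consumer_id in registered.get(group_id, set()):
        return DUPLICATE_CONSUMER_ID
    # Check 2: the session timeout must be larger than the broker's minimum.
    if session_timeout_ms <= min_session_timeout_ms:
        return SESSION_TIMEOUT_TOO_SMALL
    # Accepted: remember the consumer in memory.
    registered.setdefault(group_id, set()).add(consumer_id)
    return NO_ERROR
```

On any non-zero code the coordinator would close the connection after sending the response.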

In addition, whenever a new consumer is registered to the coordinator, the coordinator will issue a rebalance request for the group of the newly added consumer (described in the next subsection). When a new group id is seen by the coordinator (i.e. this group does not exist in coordinator's memory yet), it needs to handle the newly added group.

Handle New Group
handleNewGroup :

1. Read all the topics this group is interested in from topic_count; for each topic:

1.1. If the topic already exists in coordinator's memory, update its list by adding this group

1.2. Otherwise create this topic with this group as its only interested group

2. If the interested topics are in the wildcard form, add this group to the list of wildcard-interested groups

3.1. If some of the topics the group is interested in already exist, create its rebalance bit initialized as true, and try to rebalance the group

3.2. Otherwise just create its rebalance bit initialized as false
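A minimal sketch of handleNewGroup over plain in-memory maps follows. The parameter names are hypothetical, and `existing_topics` stands in for the set of topics currently known from ZK.

```python
from typing import Dict, List, Set


def handle_new_group(groups_per_topic: Dict[str, Set[str]],
                     wildcard_groups: Set[str],
                     rebalance_bit: Dict[str, bool],
                     existing_topics: Set[str],
                     group: str,
                     topics: List[str],
                     is_wildcard: bool) -> bool:
    """Register a new group; return True if a rebalance should be attempted."""
    # Steps 1.1 / 1.2: add the group to each topic's list, creating it if needed.
    for topic in topics:
        groups_per_topic.setdefault(topic, set()).add(group)
    # Step 2: remember groups with wildcard interests.
    if is_wildcard:
        wildcard_groups.add(group)
    # Steps 3.1 / 3.2: the bit starts as true only if some topic already exists.
    needs_rebalance = any(topic in existing_topics for topic in topics)
    rebalance_bit[group] = needs_rebalance
    return needs_rebalance
```

When the function returns True, the caller would enqueue a rebalance request for the group.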
Handle Added Consumer

Note that here we assume that all the consumers in the same group have the same set of interested topics (although they can have different stream counts for each topic), so adding or deleting consumers from the group will not change the group's interested topics. We think this is a fair assumption: though in the current implementation consumers within a group can have different interested topics, users seldom do that.

handleAddedConsumer :

1. If the rebalance bit for the group that the consumer belongs to is not set, set it and try to rebalance the group

7. On Consumer Failure Detection

Upon successfully registering a consumer, the coordinator will add the consumer's PingRequest to the ping request scheduler's queue, which will then keep track of whether the consumer is still alive.

  1. The scheduler will try to send the PingRequest to the consumer every 1/3 * consumer.session_timeout, and set the timeout watcher to wait for 2/3 * consumer.session_timeout.
  2. Similarly, when a processor thread sends the RestartRequest to the consumer, it also expects to receive a response within 2/3 * consumer.session_timeout.
  3. If the watcher for a PingRequest expires, the coordinator marks that ping request as failed.
  4. If the PingRequest has failed more than the maximum number of ping retries, the coordinator will mark the consumer as failed.
    1. It removes the consumer's registry information from its memory and closes the connection.
    2. It issues a rebalance request for the group of the failed consumer.
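The timing rules above can be captured in a small per-consumer tracker. This is a sketch: `PingTracker` and its method names are hypothetical, and resetting the failure count on a successful reply is an assumption, since the text does not say whether failures must be consecutive.

```python
from dataclasses import dataclass


@dataclass
class PingTracker:
    """Hypothetical per-consumer liveness state kept by the coordinator."""
    session_timeout_ms: int
    max_failed_pings: int
    failed_pings: int = 0

    @property
    def ping_interval_ms(self) -> int:
        # A PingRequest is sent every 1/3 of the session timeout.
        return self.session_timeout_ms // 3

    @property
    def reply_timeout_ms(self) -> int:
        # The watcher waits 2/3 of the session timeout for the reply.
        return 2 * self.session_timeout_ms // 3

    def on_reply(self) -> None:
        # Assumption: a successful reply clears earlier failures.
        self.failed_pings = 0

    def on_watcher_expired(self) -> bool:
        """Record a failed ping; return True if the consumer is now failed."""
        self.failed_pings += 1
        return self.failed_pings > self.max_failed_pings
```

When `on_watcher_expired` returns True, the coordinator would drop the consumer's registry entry, close the connection and issue a rebalance request for its group.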
Handle Consumer Failure
handleConsumerFailure :

1. Decide if the group still has some consumers; if not, do nothing.

2. If the rebalance bit is not set, set it and try to rebalance the group.

8. On Rebalancing

The rebalancing threads keep block-reading from their corresponding rebalance request queue. Hence each handles the rebalancing tasks for a non-overlapping subset of groups (e.g. through hashing functions).

For each rebalance request for a specific group it triggers the following rebalance logic:

rebalance (group) :

1. Get the topics that the group is interested in. For each topic:

1.1. Get the number of partitions by reading from ZK

1.2. Get the number of threads for each topic from the meta information of consumers in the group it kept in memory

1.3. Compute the new ownership assignment for the topic

3. Check if a rebalance is necessary by trying to get the current ownership from ZK for each topic.

3.1 If there is no registered ownership info in ZK, rebalance is necessary

3.2 If some partitions are not owned by any threads, rebalance is necessary

3.3 If some partitions registered in the ownership map do not exist any longer, rebalance is necessary

3.4 If the ownership map does not match the newly computed one, rebalance is necessary

3.5 Otherwise rebalance is not necessary

4. If rebalance is necessary, do the following:

4.1 For each consumer in the group, send the RestartFetcherRequest to the socket server's corresponding processor's queue (through some hashing functions to decide which queue to use)

4.2 Then wait until the socket server has reported that all the RestartFetcherResponses have been received or a timeout has expired

5. If a timeout signal is received from the socket server, return false; otherwise write the new ownership info to ZK and return true.

If the rebalance succeeds, the handler resets the rebalance bit of the group to false; otherwise it retries the rebalance logic above.

If the handler cannot finish the rebalance successfully within config.maxRebalanceRetries retries, it will log a ConsumerRebalanceFailedException.
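Steps 1.3 and 3 can be illustrated for a single topic as follows. The design does not specify the assignment algorithm, so the contiguous range assignment used here is an assumption, and both function names are hypothetical. With ownership represented as a partition-to-thread map, checks 3.1 to 3.4 all reduce to comparing the current map with the computed one.

```python
from typing import Dict, List


def range_assign(num_partitions: int, threads: List[str]) -> Dict[int, str]:
    """Give each consumer thread a contiguous range of partitions."""
    ordered = sorted(threads)
    if not ordered:
        return {}
    per_thread, extra = divmod(num_partitions, len(ordered))
    owner: Dict[int, str] = {}
    partition = 0
    for i, thread in enumerate(ordered):
        # The first `extra` threads take one additional partition each.
        for _ in range(per_thread + (1 if i < extra else 0)):
            owner[partition] = thread
            partition += 1
    return owner


def rebalance_needed(current: Dict[int, str], computed: Dict[int, str]) -> bool:
    # An empty map (3.1), missing partitions (3.2), stale partitions (3.3)
    # and differing owners (3.4) all show up as inequality of the two maps.
    return current != computed
```

For instance, five partitions over threads "c1-0" and "c2-0" yield partitions 0 to 2 for the first thread and 3 to 4 for the second.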

9. On Coordinator Failover

Whenever the server hosting the current coordinator dies, the other coordinators' electors will notice through the ZK listener and will try to get elected as the leader; whoever wins will trigger the coordinator startup procedure.

When the dead server comes back, its elector will automatically reconnect to ZK and trigger the handleNewSession function:

handleNewSession :

1. Reset its state by clearing consumerGroupsPerTopic, groupsWithWildcardTopics and rebalanceRequestQ, etc

2. Re-register the session expiration listener (this is because ZkClient does not re-register itself once fired)

3. Try to re-elect to be the coordinator by directly calling the elect function of its coordinatorElector.

As for consumers, if a consumer has not received any request (either Ping or RestartFetcher) from the coordinator for 2 * session_timeout, it will suspect that the coordinator is dead. It will then stop its fetchers, close the connection and re-execute the startup procedure to discover the new coordinator.
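On the consumer side, this check only needs the time of the last request seen. A minimal sketch, with a hypothetical class name and with timestamps passed in explicitly so that no real clock is required:

```python
class CoordinatorWatch:
    """Hypothetical consumer-side tracker of coordinator liveness."""

    def __init__(self, session_timeout_ms: int, now_ms: int) -> None:
        self.session_timeout_ms = session_timeout_ms
        self.last_heard_ms = now_ms

    def on_request(self, now_ms: int) -> None:
        # Any Ping or RestartFetcher request counts as a sign of life.
        self.last_heard_ms = now_ms

    def coordinator_suspected_dead(self, now_ms: int) -> bool:
        # Silence for 2 * session_timeout means the coordinator is suspected dead.
        return now_ms - self.last_heard_ms >= 2 * self.session_timeout_ms
```

When the check returns True, the consumer would stop its fetchers, close the connection and rerun consumerStartup.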

10. On Consumer Handling Coordinator Request

There are three types of requests that a consumer can receive from the coordinator: PingRequest, StopFetcherRequest and StartFetcherRequest. For each type of request the consumer is expected to respond within 2/3 * session_timeout.

PingRequest

The coordinator uses PingRequest to check whether the consumer is still alive, and the consumer needs to respond with the current offset of each partition it is consuming if auto_commit is set.

{
  size: int32                     // the size of this request
  request_type_id: int16          // the type of the request, distinguish Ping/RestartFetcher requests
  consumer_id: string             // id string of the consumer
  num_retries: int32              // indicate this is the #th ping retry
}
PingResponse
{
  size: int32                     // the size of this response
  num_partition: int16            // number of partitions that the consumer is consuming
  offset_info: [<offset_struct>]  // the offset info for each partition consumed by this consumer, its size should be exactly num_partition
                                  // if auto_commit is set to false the last two fields should not exist
}

offset_struct =>
{
  topic: string
  partition: string
  offset: int64
}
Handling PingRequest
handlePingRequest (request: PingRequest):

1.1. If auto_commit is not enabled, just send a PingResponse with no num_partition and offset_info

1.2. Otherwise read topicRegistry and send a PingResponse with the created num_partition and offset_info
Handling PingResponse
handlePingResponse (response: PingResponse):

1. Get the offset information for each consumed partition, and write them to ZK
StopFetcherRequest

When the coordinator decides to rebalance a group, it will send a StopFetcherRequest to every consumer in the group to let them stop their fetchers.

{
  size: int32                              // the size of this request
  request_type_id: int16                   // the type of the request, distinguish Ping/RestartFetcher requests
  consumer_id: string                      // id string of the consumer, this is only used by the purgatory
}
Handling StopFetcherRequest
handleStopFetcherRequest (request: StopFetcherRequest):

1. Close all fetchers and clean their corresponding queues

2. Send the response along with the last consumed offset of each partition
StopFetcherResponse
{
  size: int32                     // the size of this response
  error_code: int16               // global error code for this request, if any
  num_partition: int16            // number of partitions that the consumer is consuming
  offset_info: [<offset_struct>]  // the offset info for each partition consumed by this consumer, its size should be exactly num_partition
                                  // if auto_commit is set to false the last two fields should not exist
}
StartFetcherRequest

When the coordinator has received the StopFetcherResponse from all the consumers, it will compute the new assignments and send them in the StartFetcherRequest to the consumers.

{
  size: int32                              // the size of this request
  request_type_id: int16                   // the type of the request, distinguish Ping/RestartFetcher requests
  consumer_id: string                      // id string of the consumer
  num_partitions: int16                    // the number of partitions assigned to the consumer
  assigned_parts: [<partition_struct>]     // the detailed info of assigned partitions, along with the starting offset
}

partition_struct =>
{
  topic: string
  partition: string                        // note that this partition string already contains the broker id
  offset: int64
}
Handling StartFetcherRequest
handleStartFetcherRequest (request: StartFetcherRequest):

1. If the consumer's topic interests are wildcard, re-construct topicThreadIdAndQueues and KafkaMessageAndMetadataStreams

2. Read the assigned partitions along with offsets from assigned_parts

3. Update topic registry with the newly assigned partition map

4. Start fetchers with the new topic registry

5. Send back the StartFetcherResponse with the old topic registry information
StartFetcherResponse
{
  size: int32                     // the size of this response
  error_code: int16               // global error code for this request, if any
}
Handling StartFetcherResponse
handleStartFetcherResponse (response: StartFetcherResponse):

1. Get the offset information for each consumed partition, and write them to ZK

11. Open Problems

  1. If all the brokers listed in the properties file as known brokers are gone when a consumer starts or resumes, the consumer cannot find the coordinator and thus cannot be added to the group to start consuming. This rare case should be treated as an operational error, since migration of the broker cluster should be incremental and should be reflected in the consumer properties file.
  2. The sizes of the rebalance thread pool and the socketServer's processor thread pool must be pre-specified, whereas the number of consumers can grow during operation. Hence each thread must handle more and more consumers as new consumers are added, which will increase the CPU burden.
  3. Since consumers no longer register themselves in ZooKeeper, when a new coordinator stands up it needs to wait for all the consumers to reconnect to it instead of reading the consumer info from ZK. This may increase the latency of the coordinator failover process.
  4. When a consumer handles a RestartFetcherRequest, if it fails before sending back the previously consumed partitions' offsets, then duplicate messages will be consumed.

12. Possible Improvements

  1. Batch Updating ZK Offset: when it receives a PingResponse with the current offset info, the processor does not need to write it to ZK immediately; it can just update an in-memory {(topic, partition) -> offset} map, and another daemon thread can periodically write the updated map to ZK. With a newer version of ZK that supports group writes, this can reduce the number of writes to ZK.
  2. Multiple Coordinators: In the future, we might need each broker to act as a coordinator, with consumer groups evenly distributed among all the consumer coordinators. This would allow supporting more than 5K consumers, since right now the bottleneck is the number of open socket connections to the single coordinator machine.
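The batched offset update from item 1 could be sketched as below. The names are hypothetical, the ZK write is replaced by an injected callback, and thread safety is left out; `flush` stands for the work the periodic daemon thread would do.

```python
from typing import Callable, Dict, Set, Tuple

Key = Tuple[str, str]  # (topic, partition)


class OffsetBuffer:
    """Hypothetical in-memory buffer between PingResponses and ZK."""

    def __init__(self, write_batch: Callable[[Dict[Key, int]], None]) -> None:
        self.latest: Dict[Key, int] = {}
        self.dirty: Set[Key] = set()
        self.write_batch = write_batch

    def record(self, topic: str, partition: str, offset: int) -> None:
        # Called by the processor for each offset_struct in a PingResponse.
        key = (topic, partition)
        if self.latest.get(key) != offset:
            self.latest[key] = offset
            self.dirty.add(key)

    def flush(self) -> int:
        """Write only the offsets changed since the last flush."""
        batch = {key: self.latest[key] for key in self.dirty}
        self.dirty.clear()
        if batch:
            self.write_batch(batch)
        return len(batch)
```

Several updates to the same partition between two flushes result in a single write carrying the latest offset.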