The following is a draft design that uses a highly available consumer coordinator on the broker side to handle consumer rebalance. By migrating the rebalance logic from the consumer to the coordinator we can resolve the consumer split-brain problem and make the consumer client thinner. With this design we hope to incorporate:

KAFKA-167: Move partition assignment to the broker

KAFKA-364: Add ability to disable rebalancing in ZooKeeper consumer

And solves:

KAFKA-242: Subsequent calls of ConsumerConnector.createMessageStreams cause Consumer offset to be incorrect

1. Consumer Scalability Goal

With a single coordinator we should support up to 5K consumers (the bottleneck should be the number of socket channels a single machine can have), and by adding more coordinators (we can enable every broker to be a coordinator, so the maximum number of coordinators is the size of the server cluster), consumer scalability will increase linearly.

2. Rebalancing Algorithm Overview

One of the brokers is elected as the coordinator for all the consumer groups. It will be responsible for:

...

The consumer, upon startup, will consult the known brokers for the current coordinator. The known broker list is specified in the consumer properties file. The consumer will then try to create a socket channel to the coordinator and register itself through that channel; once accepted, it will keep trying to read new requests from the coordinator and respond to them, but never pro-actively send requests to the coordinator. When the consumer does not receive any request within a configurable amount of time, it will treat the connection as lost, stop its fetchers, and try to reconnect to the (possibly new) coordinator by restarting the discovery process.
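
Purely to illustrate the discovery step, the following is a minimal sketch of how a consumer might query the configured broker list for the current coordinator; the CoordinatorDiscovery class name, the request id and the wire format are placeholders rather than the actual protocol.

Code Block
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.List;

// Hypothetical sketch: the request id, field layout and class name below are illustrative
// placeholders, not the actual Kafka wire protocol.
public class CoordinatorDiscovery {
    private static final short WHO_IS_COORDINATOR = 0; // placeholder request id

    /** Ask each known broker in turn for the host/port of the current coordinator. */
    public static InetSocketAddress findCoordinator(List<InetSocketAddress> knownBrokers,
                                                    String groupId) {
        for (InetSocketAddress broker : knownBrokers) {
            try (Socket socket = new Socket(broker.getAddress(), broker.getPort())) {
                DataOutputStream out = new DataOutputStream(socket.getOutputStream());
                DataInputStream in = new DataInputStream(socket.getInputStream());

                out.writeShort(WHO_IS_COORDINATOR);  // request type
                out.writeUTF(groupId);               // group the consumer belongs to
                out.flush();

                String host = in.readUTF();          // coordinator host
                int port = in.readInt();             // coordinator port
                return new InetSocketAddress(host, port);
            } catch (Exception e) {
                // This broker is unreachable or not aware of the coordinator; try the next one.
            }
        }
        throw new IllegalStateException("No known broker could report the current coordinator");
    }
}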

...

3. Configuration

Config Options to Brokers/Consumers

...

Code Block
/consumers/groups/owners

...

4. On Coordinator Startup

Every server will create a coordinator instance as one of its members upon startup. The consumer coordinator keeps the following fields (all the request formats and their usage will be introduced later in this page):

...

The timeout watcher mechanism will be implemented using the Purgatory component. For a detailed description of the request expire/satisfy purgatory, please refer to the Purgatory design documentation.
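
The actual mechanism will be Kafka's Purgatory component; purely to illustrate the expire/satisfy idea, here is a much-simplified sketch built on java.util.concurrent.DelayQueue (the DelayedRequest class and its fields are assumptions, not the Purgatory API).

Code Block
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Simplified stand-in for the purgatory idea: each outstanding request is tracked as a delayed
// item; items that are not marked satisfied before their deadline are treated as expired.
public class TimeoutWatcher {
    public static class DelayedRequest implements Delayed {
        final String consumerId;
        final long deadlineMs;
        volatile boolean satisfied = false;   // set to true when the expected response arrives

        public DelayedRequest(String consumerId, long timeoutMs) {
            this.consumerId = consumerId;
            this.deadlineMs = System.currentTimeMillis() + timeoutMs;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(deadlineMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    private final DelayQueue<DelayedRequest> watched = new DelayQueue<>();

    public void watch(DelayedRequest request) {
        watched.add(request);
    }

    // Expiration loop: take requests whose deadline has passed and, if they are still
    // unsatisfied, report the owning consumer as timed out.
    public void runExpirationLoop() throws InterruptedException {
        while (true) {
            DelayedRequest expired = watched.take();
            if (!expired.satisfied) {
                System.out.println("Request for consumer " + expired.consumerId + " expired");
            }
        }
    }
}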

...

5. On Topic Partition Changes Detection

When the ZK watchers fire notifying topic partition changes, the coordinator needs to decide which consumer groups are affected by the change and hence need to be rebalanced.

...

Code Block
TopicPartitionChangeListener.handleChildChange :

Get the set of groups that are interested in this topic, and if the group's rebalance bit is not set, set it and try to rebalance the group

...
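
A rough Java rendering of this handler might look like the sketch below; the topic-to-group registry, the per-group rebalance bit and the rebalance request queue are assumed data structures, not the actual coordinator internals.

Code Block
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of the watcher callback: find the groups interested in the changed topic
// and schedule a rebalance for each group whose rebalance bit is not already set.
public class TopicPartitionChangeHandler {
    private final Map<String, Set<String>> topicToGroups;       // topic -> interested groups
    private final Map<String, AtomicBoolean> rebalanceBits;     // group -> rebalance bit
    private final BlockingQueue<String> rebalanceRequestQueue;  // groups waiting to be rebalanced

    public TopicPartitionChangeHandler(Map<String, Set<String>> topicToGroups,
                                       Map<String, AtomicBoolean> rebalanceBits,
                                       BlockingQueue<String> rebalanceRequestQueue) {
        this.topicToGroups = topicToGroups;
        this.rebalanceBits = rebalanceBits;
        this.rebalanceRequestQueue = rebalanceRequestQueue;
    }

    public void handleChildChange(String topic) {
        for (String group : topicToGroups.getOrDefault(topic, Collections.emptySet())) {
            // Only enqueue the group if its rebalance bit was not already set.
            if (rebalanceBits.computeIfAbsent(group, g -> new AtomicBoolean(false))
                             .compareAndSet(false, true)) {
                rebalanceRequestQueue.offer(group);
            }
        }
    }
}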

6. On Consumer Startup

Upon creation, the consumer will get a list of

...

Code Block
handleAddedConsumer :

1. If the rebalance bit for the group that the consumer belongs to is not set, set it and try to rebalance the group

...

7. On Consumer Failure Detection

Upon successfully registering a consumer, the coordinator will add the consumer's PingRequest to the ping request scheduler's queue, which will then keep track of whether the consumer is still alive.
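
One way to picture the ping request scheduler is as a periodic task per registered consumer, as in the hedged sketch below; the PingRequest encoding and the failure callback are placeholders rather than the actual request format.

Code Block
import java.io.DataOutputStream;
import java.net.Socket;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch: periodically send a ping over the consumer's registered socket channel;
// if the write fails, report the consumer as failed so the coordinator can trigger a rebalance.
public class PingScheduler {
    private static final short PING_REQUEST = 1; // placeholder request id

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    public void track(String consumerId, Socket channel, long pingIntervalMs,
                      Consumer<String> onConsumerFailure) {
        scheduler.scheduleAtFixedRate(() -> {
            try {
                DataOutputStream out = new DataOutputStream(channel.getOutputStream());
                out.writeShort(PING_REQUEST);
                out.flush();
                // Reading and validating the PingResponse (which may carry offsets) is omitted here.
            } catch (Exception e) {
                onConsumerFailure.accept(consumerId);
            }
        }, pingIntervalMs, pingIntervalMs, TimeUnit.MILLISECONDS);
    }
}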

...

Code Block
handleConsumerFailure :

1. Decide if the group still has some consumers; if not, do nothing.

2. If the rebalance bit is not set, set it and try to rebalance the group.

...
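
A compact Java sketch of this failure handler is shown below; the group registry, rebalance bits and rebalance request queue are the same kind of assumed structures as in the earlier handler sketch.

Code Block
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of the failure handler: drop the failed consumer from its group,
// then schedule a rebalance only if the group still has members.
public class ConsumerFailureHandler {
    private final Map<String, Set<String>> groupMembers = new ConcurrentHashMap<>();
    private final Map<String, AtomicBoolean> rebalanceBits = new ConcurrentHashMap<>();
    private final BlockingQueue<String> rebalanceRequestQueue = new LinkedBlockingQueue<>();

    public void handleConsumerFailure(String group, String consumerId) {
        Set<String> members = groupMembers.get(group);
        if (members != null) {
            members.remove(consumerId);
        }
        // 1. If the group no longer has any consumers, there is nothing to rebalance.
        if (members == null || members.isEmpty()) {
            return;
        }
        // 2. Otherwise set the rebalance bit (if not already set) and schedule a rebalance.
        if (rebalanceBits.computeIfAbsent(group, g -> new AtomicBoolean(false))
                         .compareAndSet(false, true)) {
            rebalanceRequestQueue.offer(group);
        }
    }
}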

8. On Rebalancing

The rebalancing threads each block on reads from their corresponding rebalance request queue. Hence each thread handles the rebalancing tasks for a non-overlapping subset of groups (e.g. assigned through a hashing function).
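
For example, the dispatch from a group to its owning rebalance thread could be a simple hash of the group name modulo the number of rebalance threads; the sketch below assumes a fixed-size array of queues, one per thread.

Code Block
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: groups are partitioned over the rebalance threads by hashing the group
// name, so each thread owns a non-overlapping subset of groups.
public class RebalanceDispatcher {
    private final BlockingQueue<String>[] rebalanceQueues;

    @SuppressWarnings("unchecked")
    public RebalanceDispatcher(int numRebalanceThreads) {
        rebalanceQueues = new BlockingQueue[numRebalanceThreads];
        for (int i = 0; i < numRebalanceThreads; i++) {
            rebalanceQueues[i] = new LinkedBlockingQueue<>();
        }
    }

    /** Route a group to the queue owned by exactly one rebalance thread. */
    public void requestRebalance(String group) {
        int index = Math.abs(group.hashCode() % rebalanceQueues.length);
        rebalanceQueues[index].offer(group);
    }

    /** Each rebalance thread blocks on its own queue and handles the groups it owns. */
    public String nextGroupFor(int threadIndex) throws InterruptedException {
        return rebalanceQueues[threadIndex].take();
    }
}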

...

If the handler cannot finish the rebalance successfully within config.maxRebalanceRetries retries, it will write a ConsumerRebalanceFailedException to the log.
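
The retry behavior might look like the sketch below; config.maxRebalanceRetries appears in the design above, while the back-off interval and rebalanceGroup() are assumed stand-ins for the actual rebalance logic.

Code Block
// Hypothetical sketch of the retry wrapper around a single group rebalance attempt;
// rebalanceGroup() stands in for the real assignment logic and may fail transiently.
public class RebalanceRetryLoop {
    private final int maxRebalanceRetries;
    private final long rebalanceBackoffMs;   // assumed back-off between attempts

    public RebalanceRetryLoop(int maxRebalanceRetries, long rebalanceBackoffMs) {
        this.maxRebalanceRetries = maxRebalanceRetries;
        this.rebalanceBackoffMs = rebalanceBackoffMs;
    }

    public void rebalanceWithRetries(String group) throws InterruptedException {
        for (int attempt = 1; attempt <= maxRebalanceRetries; attempt++) {
            if (rebalanceGroup(group)) {
                return;                       // rebalance succeeded
            }
            Thread.sleep(rebalanceBackoffMs); // wait before the next attempt
        }
        // All retries exhausted: record the failure in the log.
        System.err.println("ConsumerRebalanceFailedException: group " + group
                + " cannot be rebalanced after " + maxRebalanceRetries + " retries");
    }

    private boolean rebalanceGroup(String group) {
        // Placeholder: compute the new partition assignment and send RestartFetcherRequests.
        return true;
    }
}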

...

9. On Coordinator Failover

Whenever the server hosting the current coordinator dies, the other coordinators' electors will realize that through their ZK listeners and will try to re-elect themselves as the leader, and whoever wins will trigger the coordinator startup procedure.

...

As for the consumers, if a consumer has not received any request (either Ping or RestartFetcher) from the coordinator for 2 * session_timeout, it will suspect that the coordinator is dead. It will then stop its fetchers, close the connection, and re-execute the startup procedure to re-discover the new coordinator.
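
On the consumer side this can be approximated with a socket read timeout of 2 * session_timeout, as in the sketch below; it reuses the hypothetical CoordinatorDiscovery helper from the earlier sketch, and the fetcher-handling stubs are placeholders.

Code Block
import java.io.DataInputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.List;

// Hypothetical sketch: block on reads from the coordinator channel and treat a read timeout of
// 2 * session_timeout as a sign that the coordinator has failed.
public class CoordinatorChannelReader {
    public void readLoop(List<InetSocketAddress> knownBrokers, String groupId,
                         int sessionTimeoutMs) throws Exception {
        while (true) {
            InetSocketAddress coordinator = CoordinatorDiscovery.findCoordinator(knownBrokers, groupId);
            try (Socket socket = new Socket(coordinator.getAddress(), coordinator.getPort())) {
                socket.setSoTimeout(2 * sessionTimeoutMs);   // suspect the coordinator after 2 * session_timeout
                DataInputStream in = new DataInputStream(socket.getInputStream());
                while (true) {
                    short requestType = in.readShort();      // blocks until a request arrives or times out
                    handleRequest(requestType, socket);
                }
            } catch (SocketTimeoutException e) {
                // No Ping or RestartFetcher request within 2 * session_timeout:
                // stop the fetchers, drop the connection, and rediscover the coordinator.
                stopFetchers();
            }
        }
    }

    private void handleRequest(short requestType, Socket socket) { /* dispatch, see next section */ }

    private void stopFetchers() { /* stop fetcher threads before reconnecting */ }
}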

...

10. On Consumer Handling Coordinator Request

A consumer can receive two types of requests from the coordinator: PingRequest and RestartFetcherRequest. For each type of request the consumer is expected to respond within 2/3 * session_timeout.
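
A possible shape for the consumer-side dispatch is sketched below, with the response deadline derived from 2/3 * session_timeout; the request ids and the handler bodies are illustrative placeholders.

Code Block
// Hypothetical dispatch for coordinator requests received on the consumer's channel.
// The request ids and the deadline handling are illustrative, not the actual protocol.
public class CoordinatorRequestDispatcher {
    private static final short PING_REQUEST = 1;            // placeholder id
    private static final short RESTART_FETCHER_REQUEST = 2; // placeholder id

    public void dispatch(short requestType, int sessionTimeoutMs) {
        // The consumer is expected to respond within 2/3 * session_timeout.
        long responseDeadlineMs = System.currentTimeMillis() + (2L * sessionTimeoutMs) / 3;
        switch (requestType) {
            case PING_REQUEST:
                handlePingRequest(responseDeadlineMs);
                break;
            case RESTART_FETCHER_REQUEST:
                handleRestartFetcherRequest(responseDeadlineMs);
                break;
            default:
                throw new IllegalArgumentException("Unknown request type " + requestType);
        }
    }

    private void handlePingRequest(long deadlineMs) { /* reply, possibly with current offsets */ }

    private void handleRestartFetcherRequest(long deadlineMs) { /* commit offsets, restart fetchers */ }
}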

...

Code Block
handleStartRequest (response: RestartFetcherResponse):

1. Get the offset information for each consumed partition, and write them to ZK
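
For the offset write itself, a minimal sketch using the plain ZooKeeper client is shown below; the /consumers/[group]/offsets/[topic]/[partition] layout follows the existing ZooKeeper consumer, while the class and method names are assumptions.

Code Block
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

// Hypothetical sketch: persist each consumed partition's offset under the existing
// /consumers/[group]/offsets/[topic]/[partition] layout before replying to the coordinator.
// Parent znodes are assumed to already exist.
public class OffsetCommitter {
    private final ZooKeeper zk;

    public OffsetCommitter(ZooKeeper zk) {
        this.zk = zk;
    }

    public void commitOffsets(String group, String topic, Map<Integer, Long> partitionOffsets)
            throws Exception {
        for (Map.Entry<Integer, Long> entry : partitionOffsets.entrySet()) {
            String path = "/consumers/" + group + "/offsets/" + topic + "/" + entry.getKey();
            byte[] data = entry.getValue().toString().getBytes(StandardCharsets.UTF_8);
            if (zk.exists(path, false) == null) {
                zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            } else {
                zk.setData(path, data, -1);   // -1 means "any version"
            }
        }
    }
}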

11. Open Problems

  1. If all the brokers listed in the properties file as known brokers are gone when a consumer starts/resumes, the consumer cannot find the coordinator and thus cannot be added to the group to start consuming. This rare case should be treated as an operational error, since migration of the broker cluster should be incremental and the consumer properties file should be updated accordingly.
  2. The sizes of the rebalance thread pool and the socketServer's processor thread pool must be pre-specified, but the number of consumers can grow during operation. Hence each thread must handle more and more consumers as new consumers are added, which will increase the CPU burden.
  3. Since consumers no longer register themselves in ZooKeeper, when a new coordinator stands up it needs to wait for all the consumers to re-connect to it instead of reading the consumer info from ZK; this may increase the latency of the coordinator failover process.
  4. When a consumer handles a RestartFetcherRequest, if it fails before sending back the previously consumed partitions' offsets, then duplicate messages will be consumed.

12. Possible Improvements

  1. Batch Updating ZK Offsets: when the processor receives a PingResponse with the current offset info, it does not need to immediately write it to ZK; it can just update an in-memory {(topic, partition) -> offset} map, and another daemon thread can periodically flush the updated map to ZK. With newer versions of ZK that support group writes, this can reduce the number of writes to ZK (see the sketch after this list).
  2. Multiple Coordinators: in the future, it might be possible to have each broker act as a coordinator, with consumer groups evenly distributed amongst all the consumer coordinators. This would enable supporting >5K consumers, since right now the bottleneck is really the number of open socket connections to the single coordinator machine.
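
A minimal sketch of the batching idea, assuming an in-memory offset map flushed by a background thread (the flush interval, class names and the ZK write stub are placeholders):

Code Block
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of batched offset commits: PingResponse handlers only update an
// in-memory map, and a background thread periodically flushes the map to ZooKeeper.
public class BatchedOffsetStore {
    // (group, topic, partition) key flattened into a string for simplicity.
    private final Map<String, Long> pendingOffsets = new ConcurrentHashMap<>();
    private final ScheduledExecutorService flusher = Executors.newSingleThreadScheduledExecutor();

    public BatchedOffsetStore(long flushIntervalMs) {
        flusher.scheduleAtFixedRate(this::flushToZookeeper, flushIntervalMs, flushIntervalMs,
                TimeUnit.MILLISECONDS);
    }

    /** Called when a PingResponse carrying offsets is processed: no ZK write on this path. */
    public void update(String group, String topic, int partition, long offset) {
        pendingOffsets.put(group + "/" + topic + "/" + partition, offset);
    }

    /** Periodic flush: write the accumulated offsets to ZK in one pass. */
    private void flushToZookeeper() {
        for (Map.Entry<String, Long> entry : pendingOffsets.entrySet()) {
            writeOffsetToZk(entry.getKey(), entry.getValue());
        }
    }

    private void writeOffsetToZk(String key, long offset) {
        // Placeholder for the actual ZK write (see the OffsetCommitter sketch in section 10).
    }
}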