
WARN: This is an obsolete design. The design that's implemented in Kafka 0.9.0 is described in this wiki.

The following is a draft design that uses a highly available consumer coordinator on the broker side to handle consumer rebalance. By migrating the rebalance logic from the consumer to the coordinator we can resolve the consumer split-brain problem and make the consumer client thinner. With this design we hope to incorporate:

...

  1. On startup or on co-ordinator failover, the consumer sends a ClusterMetadataRequest to any of the brokers in the "bootstrap.brokers" list. In the ClusterMetadataResponse, it receives the location of the co-ordinator for its group.
  2. The consumer sends a RegisterConsumer request to its co-ordinator broker. In the RegisterConsumerResponse, it receives the list of topic partitions that it should own.
  3. At this time, group management is done and the consumer starts fetching data and (optionally) committing offsets for the list of partitions it owns.
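
In code, the discovery-and-registration flow above might look roughly like the following consumer-side sketch. The request/response names follow this design, but the Broker type, the response fields, and the send callbacks are simplified assumptions for illustration, not the actual wire protocol.

Code Block
// Consumer-side sketch of steps 1-3 above (illustrative types, not the real protocol).
case class Broker(host: String, port: Int)
case class ClusterMetadataResponse(coordinator: Broker)
case class RegisterConsumerResponse(ownedPartitions: Seq[String], error: Option[String])

class ConsumerStartup(bootstrapBrokers: Seq[Broker], groupId: String, consumerId: String) {

  // Step 1: ask any broker in the bootstrap list where this group's co-ordinator lives.
  def discoverCoordinator(send: Broker => ClusterMetadataResponse): Broker =
    bootstrapBrokers.view
      .map(b => scala.util.Try(send(b)))
      .collectFirst { case scala.util.Success(resp) => resp.coordinator }
      .getOrElse(throw new RuntimeException("no bootstrap broker answered"))

  // Step 2: register with the co-ordinator; the response carries the owned partitions.
  def register(coordinator: Broker,
               send: (Broker, String, String) => RegisterConsumerResponse): Seq[String] = {
    val resp = send(coordinator, groupId, consumerId)
    resp.error.foreach(e => throw new RuntimeException("registration rejected: " + e))
    resp.ownedPartitions   // Step 3: the caller starts fetchers (and offset commits) for these.
  }
}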

Co-ordinator

Detecting newly started consumers

The coordinator will not try to discover new consumers but will wait for the consumers to connect to it. When a new connection is accepted, the coordinator receives the registration request from the new consumer. If the consumer's config options are valid, it accepts the registration, records the consumer's metadata such as its session timeout value and the number of streams per topic it is interested in, and sends back a connection-established acknowledgment.

              <-- (register) --   Consumer (startup)
Coordinator   <-- (register) --   Consumer (startup)
              <-- (register) --   Consumer (startup)
----------------------------------------------------------------------------------------------
              (record metadata)   -- (confirm) -->   Consumer (register succeeded; blocks reading coordinator requests such as ping, stop-fetcher, start-fetcher)
Coordinator   (record metadata)   -- (confirm) -->   Consumer (same as above)
                                  -- (reject) -->    Consumer (register failed due to invalid metadata; reconnects to the coordinator and registers again)
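
As a rough illustration of the registration handling described above, the coordinator validates the consumer's metadata, records it in the group registry, and then confirms or rejects. The registry shape and the validation rules below are assumptions made for the sketch.

Code Block
// Coordinator-side sketch of accepting or rejecting a registration (illustrative types).
case class ConsumerRegistry(consumerId: String,
                            sessionTimeoutMs: Long,
                            streamsPerTopic: Map[String, Int])

class GroupRegistry(val groupId: String) {
  val members = scala.collection.mutable.Map.empty[String, ConsumerRegistry]
}

sealed trait RegisterResult
case object Confirmed extends RegisterResult
case class Rejected(reason: String) extends RegisterResult

def handleRegister(group: GroupRegistry, reg: ConsumerRegistry): RegisterResult = {
  if (reg.sessionTimeoutMs <= 0)
    Rejected("invalid session timeout")                 // consumer will retry registration
  else if (reg.streamsPerTopic.exists { case (_, n) => n <= 0 })
    Rejected("invalid stream count")
  else {
    group.members(reg.consumerId) = reg                 // record the consumer's metadata
    Confirmed                                           // consumer now blocks reading coordinator requests
  }
}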

Detecting consumer failure

For every registered consumer, the coordinator will check its liveness through periodic heartbeats. The heartbeat response should include the consumed offset per partition if auto-offset-commit is enabled. If the consumer fails to reply to a heartbeat a configured number of times, the coordinator will mark the consumer as failed and trigger the rebalance procedure for the group this consumer belongs to.

Coordinator   -- (ping) -->   Consumer (alive)
              -- (ping) -->   Consumer (failed)
----------------------------------------------------------------------------------------------
Coordinator   (record offset)    <-- (reply) --   Consumer (alive)
              (rebalance)        ...timeout...    Consumer (failed)
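
A minimal sketch of this failure-detection rule, assuming the coordinator keeps a per-consumer count of missed heartbeats and a configurable maximum; the names and the offset-commit callback are illustrative, not the actual implementation.

Code Block
// Track missed heartbeats per consumer; declare the consumer failed after too many misses.
class HeartbeatTracker(maxFailedHeartbeats: Int) {
  private val failedResponses =
    scala.collection.mutable.Map.empty[String, Int].withDefaultValue(0)

  // Called when a heartbeat reply arrives; records offsets if auto-offset-commit is enabled.
  def onHeartbeatResponse(consumerId: String, offsets: Map[String, Long],
                          commit: Map[String, Long] => Unit): Unit = {
    failedResponses(consumerId) = 0
    if (offsets.nonEmpty) commit(offsets)
  }

  // Called when a heartbeat times out; returns true once the consumer should be marked
  // as failed, which triggers a rebalance of its group.
  def onHeartbeatExpired(consumerId: String): Boolean = {
    failedResponses(consumerId) += 1
    failedResponses(consumerId) >= maxFailedHeartbeats
  }
}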

Detecting changes to topic partitions

The coordinator keeps a watch on all topic partitions registered in the cluster that are being consumed by at least one consumer group. When a new partition is added (possibly due to a newly added broker) or an old partition is deleted (possibly due to a broker failure), it will trigger the rebalance procedure for every group that is consuming this partition.
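
A small sketch of the affected-group decision, assuming the coordinator metadata described in Section 12.7 (consumerGroupsPerTopic and groupsWithWildcardTopics); the function itself is illustrative.

Code Block
// Given a changed topic, return the groups whose partition assignment must be recomputed:
// groups that explicitly subscribed to the topic, plus groups with a wildcard
// subscription that may now match it.
def groupsToRebalance(changedTopic: String,
                      consumerGroupsPerTopic: Map[String, Set[String]],
                      groupsWithWildcardTopics: Set[String]): Set[String] =
  consumerGroupsPerTopic.getOrElse(changedTopic, Set.empty[String]) ++ groupsWithWildcardTopics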

Initiate rebalance operation and communicate new partition ownership to consumers

When a coordinator initiates a rebalance operation for a consumer group, it will first send a stop-fetcher request to each consumer in the group and wait for all of them to reply with their last consumed offsets per partition; by the time a consumer replies, it must have stopped its fetchers. The coordinator will then send a start-fetcher request to each consumer in the group with the newly computed partition assignment along with the starting offsets, and finish the rebalance by waiting for all the consumers to restart their fetchers and respond. If any reply is not received within the timeout, the coordinator will retry the rebalance operation. If the rebalance cannot finish within a bounded number of rebalance retries, the coordinator will log a FATAL error and stop rebalancing that consumer group.

Coordinator   -- (stop-fetcher) -->   Consumer (alive)
              -- (stop-fetcher) -->   Consumer (alive)
----------------------------------------------------------------------------------------------
Coordinator   (record offset)    <-- (reply) --   Consumer (alive)
              (record offset)    <-- (reply) --   Consumer (alive)
----------------------------------------------------------------------------------------------
Coordinator (compute new assignment)   -- (start-fetcher) -->   Consumer (alive)
                                       -- (start-fetcher) -->   Consumer (becomes failed)
----------------------------------------------------------------------------------------------
Coordinator   <-- (reply) --        Consumer (alive)
              (retry rebalance)     ...timeout...   Consumer (failed)
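
A condensed sketch of one rebalance round as described above: stop all fetchers and collect offsets, compute the new assignment, restart the fetchers, and retry the whole round up to a bounded number of times. All helper signatures are assumptions for illustration.

Code Block
// One rebalance round with bounded retries. Returns false if all retries are exhausted,
// in which case the coordinator logs a FATAL error and stops rebalancing the group.
def rebalanceGroup(consumers: Seq[String],
                   stopFetcher: String => Option[Map[String, Long]],      // offsets, or None on timeout
                   computeAssignment: Map[String, Map[String, Long]] => Map[String, Seq[String]],
                   startFetcher: (String, Seq[String]) => Boolean,        // false on timeout
                   maxRetries: Int): Boolean =
  (1 to maxRetries).exists { _ =>
    val offsets = consumers.flatMap(c => stopFetcher(c).map(c -> _)).toMap
    if (offsets.size < consumers.size) false            // some consumer timed out; retry the round
    else {
      val assignment = computeAssignment(offsets)
      consumers.forall(c => startFetcher(c, assignment.getOrElse(c, Seq.empty)))
    }
  }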

When a coordinator dies, other peer brokers will notice that and try to re-elect a new coordinator. The new coordinator will not try to connect to the consumers but wait for the consumers to realize the previous coordinator is dead and try to connect to the new one.

The consumer, upon startup, will query the brokers in the cluster for the current coordinator. The broker list is specified in the consumer properties file. The consumer will then establish a socket channel to the coordinator and register itself through that channel; once accepted, it keeps reading new requests from the coordinator and responding to them, but never proactively sends requests to the coordinator. When the consumer does not receive any request within a configurable amount of time, it treats the connection as lost, stops its fetchers, and tries to reconnect to the (possibly new) coordinator by restarting the discovery process. Its workflow logic can be found here. The main motivation behind having the consumer not send heartbeats to the co-ordinator is to keep the consumer design simple: implementing blocking socket channels is significantly simpler than implementing a full-fledged socket server using non-blocking socket channels. This goes back to making it easier to rewrite the consumer in other languages.
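
The consumer's post-registration loop could be sketched as follows, assuming a simple blocking channel abstraction; the channel trait, the handler callback, and the timeout handling are illustrative rather than the actual implementation.

Code Block
// Block-read coordinator requests, respond to each, and treat prolonged silence as a
// dead coordinator, restarting discovery.
import java.net.SocketTimeoutException

trait CoordinatorChannel {
  def readRequest(timeoutMs: Long): Array[Byte]   // blocks; throws SocketTimeoutException
  def sendResponse(resp: Array[Byte]): Unit
  def close(): Unit
}

def consumerLoop(channel: CoordinatorChannel,
                 sessionTimeoutMs: Long,
                 handle: Array[Byte] => Array[Byte],   // ping / stop-fetcher / start-fetcher
                 stopFetchers: () => Unit,
                 rediscoverCoordinator: () => Unit): Unit = {
  try {
    while (true) {
      val request = channel.readRequest(sessionTimeoutMs)   // never sends unsolicited requests
      channel.sendResponse(handle(request))
    }
  } catch {
    case _: SocketTimeoutException =>
      // No request within the session timeout: assume the coordinator is gone.
      stopFetchers()
      channel.close()
      rediscoverCoordinator()
  }
}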

...

  1. Upon election or startup, the co-ordinator reads the list of groups it manages and their membership information from zookeeper. If there is no previous group membership information, it does nothing until the first consumer in some group registers with it.
  2. Upon election, if the co-ordinator finds previous group membership information in zookeeper, it waits for the consumers in each of the groups to re-register with it. Once all known and alive consumers register with the co-ordinator, it marks the rebalance completed. 
  3. Upon election or startup, the co-ordinator also starts failure detection for all consumers in a group. Consumers that are marked as dead by the co-ordinator's failure detection protocol are removed from the group and the co-ordinator marks the rebalance for a group completed by communicating the new partition ownership to the remaining consumers in the group.
  4. In steady state, the co-ordinator tracks the health of each consumer in every group through its failure detection protocol.
  5. If the co-ordinator marks a consumer as dead, it triggers a rebalance operation for the remaining consumers in the group. Rebalance is triggered by killing the socket connection to the remaining consumers in the group. Consumers notice the broken socket connection and trigger a co-ordinator discovery process (#1, #2 above). Once all alive consumers re-register with the co-ordinator, it communicates the new partition ownership to each of the consumers in the RegisterConsumerResponse, thereby completing the rebalance operation.
  6. The co-ordinator tracks topic partition changes for all topics that any consumer group has registered interest in. If it detects a new partition for any topic, it triggers a rebalance operation (as described in #5 above). It is currently not possible to reduce the number of partitions for a topic. The creation of new topics can also trigger a rebalance operation, since consumers can register for topics before they are created.
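
As a rough illustration of step 2 above, the newly elected co-ordinator could wait a bounded amount of time for the previously known consumers to re-register before dropping the rest and rebalancing. The polling loop and timing below are assumptions, not the implemented protocol.

Code Block
// Wait (up to the session timeout) for previously known consumers to re-register with the
// new co-ordinator; consumers that do not come back are removed before the group is rebalanced.
def awaitReregistration(knownConsumers: Set[String],
                        reRegistered: () => Set[String],   // consumers seen so far
                        sessionTimeoutMs: Long): Set[String] = {
  val deadline = System.currentTimeMillis() + sessionTimeoutMs
  var seen = reRegistered()
  while (System.currentTimeMillis() < deadline && !knownConsumers.subsetOf(seen)) {
    Thread.sleep(100)   // arbitrary polling interval for the sketch
    seen = reRegistered()
  }
  knownConsumers.intersect(seen)   // the survivors; the rest are dropped before rebalance
}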

3. Failure detection protocol


4. Configuration

Config Options to Brokers/Consumers

...

Code Block
/consumers/groups/owners

...

5. On Coordinator Startup

Every server creates a coordinator object as a member field upon startup. The consumer coordinator keeps the following fields (all the request formats and their usage will be introduced later in this page):

...

The timeout watcher mechanism will be implemented using the Purgatory component. For a detailed description of the request expire/satisfy purgatory, please read here.

...

6. On Topic Partition Changes Detection

When the ZK watchers are fired notifying topic partition changes, the coordinator needs to decide which consumer groups are affected by the change and hence need rebalancing.

...

Code Block
TopicPartitionChangeListener.handleChildChange :

1. Get the set of groups that are interested in this topic; for each such group whose rebalance bit is not set, set it and try to rebalance the group

...

7. On Consumer Startup

Upon creation, the consumer will get a list of

...

Code Block
handleAddedConsumer :

1. Add the ping request task for this consumer to the scheduler.

2. Add the consumer registry to the group registry's member consumer registry list.

3. Read the subscribed topic/count of this consumer.

3.1 If it is a wildcard topic/count subscription, then add its group to the list of groups with wildcard topic subscriptions.

3.2 For each topic that it subscribed to, add its group to the list of groups that have subscribed to the topic.

4. Try to rebalance the group

...

8. On Consumer Failure Detection

Upon successfully registering a consumer, the coordinator will add the consumer's PingRequest to the ping request scheduler's queue, which will then keep track of whether the consumer is still alive.
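
The per-consumer ping scheduling could be sketched as below, using a plain ScheduledExecutorService instead of the KafkaScheduler the design actually uses; the period derived from the session timeout is also an assumption.

Code Block
// Schedule a recurring ping task per consumer; cancel it when the consumer is deregistered
// or declared dead.
import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}

class PingScheduler(threads: Int) {
  private val executor = Executors.newScheduledThreadPool(threads)

  def schedulePing(consumerId: String, sessionTimeoutMs: Long,
                   sendPing: String => Unit): ScheduledFuture[_] = {
    val periodMs = sessionTimeoutMs / 3    // ping well within the session timeout (assumed ratio)
    executor.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = sendPing(consumerId)
    }, periodMs, periodMs, TimeUnit.MILLISECONDS)
  }

  def cancel(task: ScheduledFuture[_]): Unit = task.cancel(false)
}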

...

Code Block
handleConsumerFailure :

1. Remove its ping request task from the scheduler.

2. Close its corresponding socket from the broker's SocketServer.

3. Remove its registry from its group's member consumer registry list.

4. If some rebalance handler has an ongoing rebalance procedure for this group, make it fail immediately.

5. If the group still has some member consumers, try to rebalance the group.

...

9. On Rebalancing

The rebalancing threads keep block-reading from their corresponding rebalance request queues, so each handles the rebalancing tasks for a non-overlapping subset of groups (e.g. determined by a hash function).
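
For example, a group could be mapped to a handler by hashing its group id onto the handler array (the modulo scheme below is just one possible choice):

Code Block
// Map a group id to one of the rebalance handler threads so that each group is always
// handled by the same, single handler (illustrative hashing scheme).
def handlerFor(groupId: String, numHandlers: Int): Int =
  (groupId.hashCode % numHandlers + numHandlers) % numHandlers   // non-negative index

// The chosen handler's queue then receives the rebalance task, e.g.:
//   rebalanceQueues(handlerFor(group, rebalanceQueues.length)).put(group)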

...

If the handler cannot finish the rebalance successfully within config.maxRebalanceRetries retries, it will log a ConsumerRebalanceFailedException.

...

10. On Coordinator Failover

Whenever the server hosting the current coordinator dies, the other coordinators' electors will notice through their ZK listeners and try to elect themselves as the new leader; whoever wins triggers the coordinator startup procedure.

...

As for consumers, if a consumer has not received any request (either Ping or Stop/StartFetcher) from the coordinator within session_timeout, it will suspect that the coordinator is dead. It will then stop its fetchers, close the connection, and re-execute the startup procedure to re-discover the new coordinator.

...

11. On Consumer Handling Coordinator Request

There are three types of requests that a consumer can receive from the coordinator: PingRequest, StopFetcherRequest and StartFetcherRequest. For each type of request, the consumer is expected to respond within 2/3 * session_timeout.
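
A trivial helper illustrating that deadline (purely for clarity; not part of the implementation):

Code Block
// Illustrative only: the consumer must answer within 2/3 of the session timeout.
def responseDeadlineMs(sessionTimeoutMs: Long,
                       now: Long = System.currentTimeMillis()): Long =
  now + (2L * sessionTimeoutMs) / 3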

...

Code Block
handleStartRequest (response: StartFetcherResponse):

1. Clear the corresponding expiration watcher in StartFetcherRequestPurgatory, and if all the watchers have been cleared notify the rebalance handler.

...

12. Implementation

...

12.1 kafka.api

The following classes are used to describe the request/registry formats described in Section 6 and Section 10:

...

  • All the above requests' request ids are added to RequestKeys.scala

...

12.2 kafka.cluster

Broker and Cluster add writeTo(buffer) and sizeInBytes functions to be used in ClusterInfoSend.

...

12.3 kafka.common

Add some more error codes to ErrorMapping.

...

12.4 kafka.network

  • SocketServer: hacked to recognize coordinator-specific requests, which need the coordinator's metadata to handle; also hacked to proactively send messages by assuming no partial read/write is in progress (which is valid for the coordinator's usage). This hack should be removed when SocketServer is redesigned after 0.8 to interleave reads and writes.
  • NonBlockingChannel:

...

12.5 kafka.utils

ZkElection: election module based on Zookeeper.

Also some utility functions are added to Utils and ZkUtils.

...

12.6 kafka.consumer

  • OffsetInfoSend: offset information format wrapper, used by HeartbeatResponse and StopFetcherResponse.

...

  • CoordinatorRequestHandler: contains the handler functions used by ConsumerCoordinatorConnector to handle the different types of requests from the coordinator. Its logic is described in Section 10.

...

12.7 kafka.server

  • KafkaServer contains a new ConsumerCoordinator instance, which will be initialized upon server startup if ZookeeperEnabled is true.
  • ConsumerCoordinator contains the following members and metadata structures (just following Section 4):
    • A ZKClient that is different from the one the KafkaServer uses.
    • ZkElection: a Zookeeper-based election module, which triggers the ConsumerCoordinator's startup callback upon successful election. Hence only one server's ConsumerCoordinator is "active" at any point in time.
    • Array[RebalanceRequestHandler]: a list of rebalance handler threads used for processing the rebalancing tasks for groups; each has a BlockingQueue[String] storing its assigned rebalance tasks.
    • A KafkaScheduler heartbeat request scheduling thread which periodically sends heartbeat requests to all consumers registered with this coordinator (frequency based on each consumer's session timeout value).
    • A KafkaScheduler offset committing thread which periodically writes the current consumed offset for each group to Zookeeper using the coordinator's ZKClient.
    • A HeartbeatRequestPurgatory storing expiration watchers for sent HeartbeatRequests (in the implementation we rename "ping" to "heartbeat").
    • A StopFetcherRequestPurgatory and a StartFetcherRequestPurgatory storing expiration watchers for sent Stop/StartFetcherRequests.
    • consumerGroupsPerTopic: a list of groups which have subscribed to each topic.
    • groupsWithWildcardTopics: a list of groups which have at least one consumer with a wildcard topic subscription.
    • groupRegistries: a list of group registries, each containing the group metadata for rebalancing usage and its member consumers' registered metadata.

and the main API functions include:

    • startup: the startup logic described in Section 4.
    • shutdown: called by the KafkaServer upon shutting down; it first closes the elector, then shuts down the heartbeat request scheduler, the offset committer and the rebalance handlers.
    • addRebalanceRequest: assign a rebalance task for a specific group to one of the rebalance handlers' queues (see the sketch at the end of this section).
      • If the group has already been assigned to a rebalance handler and has not finished rebalancing (i.e., currRebalancer != None), assign this task to the same handler to avoid concurrent rebalancing of the same group.
      • Otherwise choose a rebalance handler in a round-robin fashion for load balancing.
    • sendHeartbeatToConsumer: called by the heartbeat request scheduler, used to send a HeartbeatRequest to the consumer if 1) the previous request has been responded to and 2) the consumer still exists in the group's registry
    • handleHeartbeatRequestExpired: increment the number of failed responses in the consumer's registry, and if it exceeds the maximum allowed failures, mark the consumer as failed and trigger handleConsumerFailure
    • handleNewGroup: logic described in Section 6
    • handleNewGroupMember: logic of handle new consumer described in Section 6
    • handleConsumerFailure: logic described in Section 7
    • A list of utility functions used to access the group registry data outside ConsumerCoordinator; access is synchronized to avoid read/write conflicts.
  • RebalanceRequestHandler: logic of the rebalance handler thread:
    • run: while not shutting down, keep taking the next rebalance task from its queue, clear the group's rebalanceInitiated bit and process the rebalance logic (described in Section 8).
      • Before each rebalance attempt, first reset the rebalanceFailed bit in the group's registry, which could be set by another thread during the rebalance procedure.
      • Before each rebalance attempt, read a snapshot of the current group's consumer registry list, which will not change during the rebalance procedure even if the underlying registry data changes.
      • Upon successfully completing the rebalance, reset currRebalancer to None, indicating that no handler is currently assigned to this group for rebalancing.
    • sendStopFetcherRequest: if the consumer's channel has been closed while trying to send the request, immediately mark this attempt as failed.
    • sendStartFetcherRequest: read the offsets from Zookeeper before constructing the request, and if the consumer's channel has been closed while trying to send the request, immediately mark this attempt as failed.
    • isRebalanceNecessary: compare the new assignment with the current assignment written in Zookeeper to decide if rebalancing is necessary.
    • waitForAllResponses: wait for all Stop/StartFetcherResponses to be received or time out.

...

  • StopFetcherRequestPurgatory: inherited from RequestPurgatory; on expiry it calls the handleStopOrStartFetcherRequestExpired function of ConsumerCoordinator. In the 0.8 branch, with the RequestOrResponse class, these two purgatories can be combined into one.

...
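
The addRebalanceRequest assignment rule described earlier in this section (sticky if the group is already being rebalanced, otherwise round-robin) could be sketched as follows; the dispatcher, queue and registry types are simplified assumptions.

Code Block
// Assign each group's rebalance task to a handler queue: reuse the handler that is already
// rebalancing the group, otherwise pick the next handler round-robin.
import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}

class RebalanceDispatcher(numHandlers: Int) {
  val queues: Array[BlockingQueue[String]] =
    Array.fill(numHandlers)(new LinkedBlockingQueue[String]())
  private val currRebalancer = scala.collection.mutable.Map.empty[String, Int]
  private var nextHandler = 0

  def addRebalanceRequest(group: String): Unit = synchronized {
    val handler = currRebalancer.getOrElseUpdate(group, {
      val h = nextHandler
      nextHandler = (nextHandler + 1) % numHandlers
      h
    })
    queues(handler).put(group)
  }

  // Called by a handler when it has finished rebalancing the group successfully.
  def rebalanceCompleted(group: String): Unit = synchronized {
    currRebalancer.remove(group)
  }
}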

13. Open Issues / Future work

Implementation
  • Active CommitOffset on Consumers: Currently consumers can no longer actively call commitOffset() to commit their current consumed offsets to Zookeeper, but can only wait for HeartbeatRequests and respond with their offsets. To support this, the consumers must not only be able to read requests from the coordinator channel but also write requests to it. In addition, the SocketServer that the coordinator depends on for RPC communication must also be able to concurrently read from and write to the channels.

...

  • Incorporating with 0.8 branch: There are still several implementation details that are based on 0.7, which need to be incorporated with 0.8 once it is stable (these are marked as TODOs in the code):
    • Combine Request and Response into RequestOrResponse: this can allow StartFetcherPurgatory and StopFetcherPurgatory to be combined also.
    • Request format protocol: in 0.8 we need to add some more fields such as versionId, correlationId and ClientId to the requests.
    • Refactor RequestHandlers: in 0.8 the RequestHandlers are in kafka.api with SocketServer enhanced with request and response queues. Currently we hacked the RequestHandlers to treat coordinator requests/responses as special cases against other request types such as fetch/produce/offset, and also hacked SocketServer to be aware of these coordinator-related special-case requests; after 0.8 we should go back to the principle that SocketServer is ignorant of Kafka. More detailed discussions can be found here.
  • Batch Updating Offsets in Zookeeper: With a newer version of ZkClient that supports group writes, the committer would be able to update the offsets of all groups in a single write instead of multiple writes.
  • Virtual Failure in Unit Test: in order to test the failure detection functionality of the coordinator/consumer, we also need to simulate scenarios where the coordinator/consumer is virtually failed (e.g. undergoing a long GC or blocked on IO). Currently such cases are simulated by a hack in the Coordinator code, but we definitely need a way to enable them through a testing framework without touching the implementation.
  • Migrating JMX Metrics from Consumer: with the simplified consumer, some JMX metrics such as getOffsetLag, getConsumedOffset and getLatestOffset which are dependent on ZkClient need to either be modified or migrated from the consumer side to the coordinator side.
  • Deleting Group Registries from Coordinator: currently a group registry entry will never be removed from the coordinator even if the group no longer has any member consumers. With the ConsoleConsumer implementation, which creates a randomly-named group to register, this may leave a lot of "empty" group registry entries on a long-running coordinator. We need to think about the semantics of when and how a consumer group can be deleted from the coordinator.

...