Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

                             (record metadata)       -- (confirm) --> Consumer (register succeed, block reading for coordinator requests such as ping, stop-fetcher, start-fetcher)

Coordinator       (record metadata)       -- (confirm) --> Consumer (same as above)

...

                                     -- (ping) -->     Consumer (failed)

----------------------------------------------------------------------------------------------

...

                                     -- (stop-fetcher) -->     Consumer (alive)

----------------------------------------------------------------------------------------------

...

                                                                                       -- (start-fetcher) -->     Consumer (become failed)

...

12. Open Issues / Future work

Implementation
  • Active CommitOffset on Consumers: Currently consumers can no longer actively call commitOffset() to commit their current consumed offsets to Zookeeper, but can only wait for HeartbeatRequests to respond with their offsets. In order to support this the consumers must be not only able to read requests from the coordinator channel but also write requests to the channel. In addition, the SocketServer that the coordinator depends on for RPC communication must also be able to concurrently read to and write from channels.
  • Conflict of HeartbeatRequest with Stop/StartFetcherResponse: When the heartbeat scheduler of the coordinator tries to send the HeartbeatRequest to consumers, it is possible that the channel for the consumer has some partial read Stop/StartFetcherResponse, since the current SocketServer can only read or write alone at the same time, the scheduler can only drop this request and wait for the next time. On the consumer's side, it would probably suspect the coordinator has failed and re-connect to the new one, causing unnecessary rebalancing attempts. The solution of the previous issue should be able to solve this also: enabling SocketServer to be able to concurrently read to and write from channels.
  • Incorporating with 0.8 branch: There are still several implementation details that are based on 0.7, which needs to be incorporated with 0.8 after 0.8 is stable (these are marked as TODOs in the code):
    • Combine Request and Response into RequestOrResponse: this can allow StartFetcherPurgatory and StopFetcherPurgatory to be combined also.
    • Request format protocol: in 0.8 we need to

...

  1. Batch Updating Offsets in Zookeeper: With newer version of ZkClient that supports group writes, the committer would be able to update the offsets of all groups in a single write instead of multiple writes.
Design

...

    • add some more fields such as versionId, correlationId and ClientId to the requests.
    • Refactor RequestHandlers: in 0.8 the RequestHandlers are in kafka.api with SocketServer enhanced with request and response queues. Currently we hacked the RequestHandlers to treat coordinator request/response as special cases against other request types such as fetcher/produce/offset, and also hacked SocketServer to be aware of these coordinator-related special case requests; after 0.8 we should go back to the principal that SocketServer is ignorant of Kafka. More detailed discussions can be found here.
  • Batch Updating Offsets in Zookeeper: With newer version of ZkClient that supports group writes, the committer would be able to update the offsets of all groups in a single write instead of multiple writes.
  • Virtual Failure in Unit Test: in order to test the failure detection functionality of the coordinator/consumer, we also need to simulate scenarios when the coordinator/consumer is virtually failed (e.g. undergoing a long GC or blocked by IO). Currently it is hacked in the Coordinator code to simulate such cases, but we definitely needs some methods to not touch the implementation but enable them in some testing frameworks.
  • Migrating JMX Metrics from Consumer: with the simplified consumer, some JMX metrics such as getOffsetLag, getConsumedOffset and getLatestOffset which are dependent on ZkClient needs to either be modified or migrated from the consumer side to the coordinator side.
  • Deleting Group Registries from Coordinator: currently the group registry entry will never be removed from the coordinator even if the group has no member consumers any more. With ConsoleConsumer implementation which creates a random-named group to register, it may cause a lot of "empty" group registry entries in some long running coordinator. We need to think about the semantics of "when" and "how" we can delete a consumer group in the coordinator.
Design
  • Dependency on Initial Broker List: When all the brokers listed in the properties file as known brokers are gone when a consumer starts/resumes, the consumer cannot find the coordinator and thus cannot be added to the group to start consuming. This case should be treated as an operational error since the migration of broker cluster should be incremental and adapt to consumer properties file.
  • Latency Spike during Coordinator Failover: Since consumers no longer register themselves in Zookeeper, when a new coordinator stands up, it needs to wait for all the consumer to re-connect to it, causing a lot of channel connectioned and rebalancing requests to the rebalance handlers. Hence the consumers will not start their fetchers after be connected to the coordinator until it receives the StartFetcherRequest, some groups's consumers might wait longer time than usually to start consuming.
  • Multiple Coordinator: In the future, it might be possible that we need each broker to act as a coordinator, with consumer groups evenly distributed amongst all the consumer coordinators. This will enable supporting >5K consumers since right now the number of open socket connections a single machine can handle limits the scale of the consumers.
  • Rebalance Retrial Semantics needs Rethinking: currently the rebalance retrial semantics still follows the original design: whenever the current attempt has failed, sleep for some backoff time and retry again; when the maximum number of retries as reached, throw an exception. With the coordinator based rebalancing mechanism, it is not clear if the maximum rebalance retry still make sense any more: probably the rebalance handler should keep retrying until the rebalance has succeeded, and also the backoff time should be dependent to the consumers' session timeout value.