Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  1. The current version of the high level consumer suffers from herd and split brain problems, where multiple consumers in a group run a distributed algorithm to agree on the same partition ownership decision. Due to different view of the zookeeper data, they run into conflicts that makes the rebalancing attempt fail. But there is no way for a consumer to verify if a rebalancing operation completed successfully on the entire group. This also leads to some potential bugs in the rebalancing logic, for example, https://issues.apache.org/jira/browse/KAFKA-242
  2. This can be mediated by moving the failure detection and rebalancing logic to a centralized highly-available co-ordinator - https://cwiki.apache.org/confluence/display/KAFKA/Central+Consumer+Coordination

...

  1. There are a number of stateful data systems would like to manually assign partitions to consumers. The main motive is to enable them to keep some local per-partition state since the mapping from their consumer to partition never changes; also there are some use cases where it makes sense to co-locate brokers and consumer processes, hence would be nice to optimize the automatic partition assignment algorithm to consider co-location. Examples of such systems are databases, search indexers etc
  2. A side effect of this requirement is wanting to turn off automatic rebalancing in the high level consumer.
  3. This feature depends on the central co-ordination feature since it is cannot be correctly and easily implemented with the current distributed co-ordination model.

...

  • Currently Kafka have two types of consumers: "standard" high-level consumer and simple consumer. In simple consumer user can specify broker-partition and offset, but there is no groupfailover/re-balance support. So users with requirements 3 and 4 but no requirement for group/re-balance would more prefer to use the simple consumer. Basically the high-level consumer provides the following main functionalities against simple consumer:
    • Auto/Hidden Offset Management
    • Auto(Simple) Partition Assignment
    • Broker Failover => Auto Rebalance
    • Consumer Failover => Auto Rebalance
    • If user do not want any of these, then simple consumer is sufficient
    • If user want to control over offset management with others unchanged, one option is to expose the current ZK implementation of the high-level consumer to users and allow them to override; another option is to change the high-level consumer API to return the offset vector associated with messages
    • If user want to control partition assignment, one option is to change the high-level consumer
    .
    • API to allow such config info be passed in while creating the stream; another option is ad-hoc: just make a single-partition topic and assign it to the consumer.
    • If user just want the automatic partition assignment be more "smart" with co-location consideration, etc, one option is to store the host/rack info in ZK and let the rebalance algorithm read them while doing the computation.

* Bottom line: complicating the simple consumer will risk its compatibility with the current applications, hence might not be a good option compared with patching the high-level consumer

  • User specified callback upon rebalancing Requirement 5 is good to have, while users are responsible for decreasing the re-balancing performance with a heavy callback function.Requirement 6 would need 1) user specified default value when there is no data to pull, 2) probably different chunk size for the "default" chunk. Both of these would be tricky parameters to tune (if not impossible)
  • For non-blocking primitive support, again it depends on what user "really" wants:
    • Currently one stream only contains messages from one or more partitions of the same topic, and has the option to return an exception when the next() call has timed out. If what user want is simply want a stream from multiple topics, one option is to implement a special fetcher that put messages from different topics in a shared queue for the end iterator.
    • If what user want is to support some stream join operations, which are usually windowed (i.e. only two messages arrive within a relatively small window size can be correlated), then the current timeout mechanism of KafkaMessageStream is nearly sufficient.