
Motivation

We have collected the following requirements from our users which have driven this consumer client re-design project:

  1. Central Coordination - We need to eliminate the complexity in the current consumer's rebalancing operation that leads to herd effect and split brain effect.
  2. Thin Consumer Client - We need a light consumer client code base with minimum external dependencies (e.g. Zookeeper client) to easily re-implement that logic in different languages.

We've received quite a lot of feedback on the consumer side features over the past few months. Some of them are improvements to the current consumer design and some are simply new feature/API requests. I have attempted to write up the requirements that I've heard on this wiki - Kafka 0.9 Consumer Rewrite Design
This would involve some significant changes to the consumer APIs, so we would like to collect feedback on the proposal from our community. Since the list of changes is not small, we would like to understand if some features are preferred over others, and more importantly, if some features are not required at all.

Thin consumer client:

  1. We have a lot of users who have expressed interest in using and writing non-java clients. Currently, this is pretty straightforward for the SimpleConsumer but not for the high level consumer. The high level consumer does some complex failure detection and rebalancing, which is non-trivial to re-implement correctly.
  2. The goal is to have a very thin consumer client, with minimum dependencies to make this easy for the users.

Central co-ordination:

  1. The current version of the high level consumer suffers from herd and split brain problems, where multiple consumers in a group run a distributed algorithm to agree on the same partition ownership decision. Due to different views of the zookeeper data, they run into conflicts that make the rebalancing attempt fail. And there is no way for a consumer to verify if a rebalancing operation completed successfully on the entire group. This also leads to some potential bugs in the rebalancing logic, for example, https://issues.apache.org/jira/browse/KAFKA-242
  2. This can be mitigated by moving the failure detection and rebalancing logic to a centralized highly-available co-ordinator - Kafka 0.9 Consumer Rewrite Design

* We think the first two requirements are prerequisites for the rest. So we have proceeded by trying to design a centralized coordinator for consumer rebalancing without Zookeeper; for details, please read here.

...

Allow manual partition assignment

  1. Manual partition assignment - Many partitioned stateful data systems would like to manually assign partitions to consumers. The main motive is to enable them to keep some local per-partition state, since the mapping from their consumer to partition never changes; also there are some use cases where it makes sense to co-locate brokers and consumer processes, hence it would be nice to optimize the automatic partition assignment algorithm to consider co-location. Examples of such systems are databases and search indexers.
  2. Manual offset management - Some applications want to manage offsets themselves at the application level rather than using ZK. We need to provide a way to achieve that in the new consumer redesign. This would involve turning off the autocommit.offsets option in the consumer.

* One suggestion is to add one more API on the high-level consumer which allows incremental specification of "topic with any partitions" => "specific number of partitions for the topic" => "specific partitions of the topic" => "specific partitions with specific offsets of the topic". At the last level of settings, we may need to automatically turn off auto-committing of offsets.
* One problem with this incremental specification is that a single consumer client may want to auto-commit offsets for some topics but not for others. This will make the communication between consumers and the coordinator a little bit more complicated, but we can always use a signal telling the coordinator "no need to commit offsets".
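The incremental specification above could be sketched as a small class that tracks, per topic, how specific the subscription is and whether auto-committing is still allowed. All names here are hypothetical illustrations of the proposal, not part of any released Kafka API:

```java
import java.util.HashMap;
import java.util.Map;

public class SubscriptionSpec {
    // From least to most specific, following the proposed incremental levels.
    public enum Level { TOPIC, PARTITION_COUNT, SPECIFIC_PARTITIONS, PARTITIONS_WITH_OFFSETS }

    private final Map<String, Level> topicLevels = new HashMap<>();

    public void subscribe(String topic, Level level) {
        topicLevels.put(topic, level);
    }

    // Auto-commit must be turned off once the consumer supplies its own offsets,
    // per the last level of the incremental specification.
    public boolean autoCommitAllowed(String topic) {
        return topicLevels.getOrDefault(topic, Level.TOPIC) != Level.PARTITIONS_WITH_OFFSETS;
    }
}
```

A per-topic answer to `autoCommitAllowed` is exactly the piece of state the consumer would need to signal to the coordinator as "no need to commit offsets" for some topics.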

  1. Rebalance-Triggered User-Specified Callback - Some applications maintain transient state that they'd like to flush to some data system when a rebalancing operation is triggered, because the partition assignment can change after rebalancing.

* This function can be called whenever a consumer receives a stop_fetcher command.

  1. Non-Blocking Consumer API - This is to facilitate stream join operations across multiple Kafka consumer streams. The problem is that today the streams are essentially blocking, so such stream join operations are not possible. We need to look into supporting non-blocking consumer APIs.

* One way to do this is: whenever the stream finds there is no data to return, use some default value as a "null" in the iterator; this would require modifying the BlockingQueue implementation of the chunk/stream on the consumer.
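A minimal sketch of that "null as default value" idea, assuming the consumer's internal chunk queue is a standard BlockingQueue (class and method names here are illustrative, not the actual consumer internals): polling with a timeout instead of blocking indefinitely lets an empty stream yield a null placeholder that join logic can skip over.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class NonBlockingStream {
    // Stand-in for the consumer's internal chunk queue.
    private final BlockingQueue<String> chunks = new ArrayBlockingQueue<>(16);

    public void offer(String msg) {
        chunks.offer(msg);
    }

    // Returns null (the "default value") when no data arrives within the timeout,
    // instead of blocking forever like a plain take() would.
    public String nextOrNull(long timeoutMs) throws InterruptedException {
        return chunks.poll(timeoutMs, TimeUnit.MILLISECONDS);
    }
}
```

A stream-join loop could then poll each of its input streams in turn, treating a null as "no data yet on this stream" rather than stalling on the first empty one.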

  1. Rack Awareness - It might potentially be more efficient to make consumers rack-aware for more efficient consumption.
  2. A side effect of this requirement is wanting to turn off automatic rebalancing in the high level consumer.
  3. This feature depends on the central co-ordination feature since it cannot be correctly and easily implemented with the current distributed co-ordination model.

Allow manual offset management

  1. Some systems require offset management in a custom database, at specific intervals. Overall, the requirement is to have access to the message metadata like topic, partition, offset of the message, and to be able to provide per-partition offsets on consumer startup.
  2. This would require designing new consumer APIs that allow providing offsets on startup and return message metadata with the consumer iterator.
  3. One thing that needs to be thought through is whether the consumer client can be allowed to pick manual offset management for some, but not all, topics. One option is to allow the consumer to pick only one offset management scheme. This could potentially make the API a bit simpler.
  4. This feature depends on the central co-ordination feature since it cannot be correctly and easily implemented with the current distributed co-ordination model.
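The two API requirements above — providing per-partition offsets on startup and returning message metadata (topic, partition, offset) with the iterator — could take roughly the following shape. This is a sketch only; every name here is hypothetical, not the actual redesigned API:

```java
import java.util.HashMap;
import java.util.Map;

public class ManualOffsetConsumer {
    // Metadata the redesigned iterator could return alongside each message.
    public static class MessageAndMetadata {
        public final String topic;
        public final int partition;
        public final long offset;

        public MessageAndMetadata(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    // Per-partition start positions supplied by the application at startup.
    private final Map<String, Long> positions = new HashMap<>();

    public void seekOnStartup(String topic, int partition, long offset) {
        positions.put(topic + "-" + partition, offset);
    }

    // Simulate consuming one message: return its metadata and advance the position.
    public MessageAndMetadata next(String topic, int partition) {
        String key = topic + "-" + partition;
        long off = positions.getOrDefault(key, 0L);
        positions.put(key, off + 1);
        return new MessageAndMetadata(topic, partition, off);
    }
}
```

With metadata exposed this way, the application can write each message's offset to its own database in the same transaction as the message's effects, then feed those offsets back through `seekOnStartup` after a restart.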

Invocation of user specified callback on rebalance

  1. Some applications maintain transient per-partition state in-memory. On rebalance operation, they would need to “flush” the transient state to some persistent storage.
  2. The requirement is to let the user plugin some sort of callback that the high level consumer invokes when a rebalance operation is triggered.
  3. This requirement has some overlap with the manual partition assignment requirement. If we allow manual partition assignment, such applications might be able to leverage that to flush transient state. The issue, however, is that these applications do want automatic rebalancing and might not want to use the manual partition assignment feature.
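The callback plugin could look something like the following sketch, where the consumer would invoke the hook when it receives a stop-fetcher command, before partition ownership changes. The interface and class names are hypothetical illustrations:

```java
// Hypothetical hook the high-level consumer would invoke on rebalance.
interface RebalanceCallback {
    void beforeRebalance();
}

// Example user implementation that flushes transient per-partition state
// to a persistent store (modeled here as an in-memory buffer).
public class StateFlusher implements RebalanceCallback {
    private final StringBuilder persisted = new StringBuilder();
    private String transientState = "";

    public void update(String state) {
        transientState = state;
    }

    @Override
    public void beforeRebalance() {
        persisted.append(transientState); // flush before partitions may move away
        transientState = "";
    }

    public String persisted() {
        return persisted.toString();
    }
}
```

As noted in the comments further below, a heavy callback body would directly lengthen the rebalance, so implementations like this should keep the flush as cheap as possible.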

Non blocking consumer APIs

  1. This requirement is coming from stream processing applications that implement high-level stream processing primitives like filter by, group by, join operations on kafka streams.
  2. To facilitate stream join operations, it is desirable that Kafka provides non-blocking consumer APIs. Today, since the consumer streams are essentially blocking, these sort of stream join operations are not possible.
  3. This requirement seems to involve some significant redesign of the consumer APIs and the consumer stream logic. So it will be good to give this some more thought.

Comments (Guozhang)

...

  • Currently Kafka has two types of consumers: the "standard" high-level consumer and the simple consumer. In the simple consumer, users can specify broker-partition and offset, but there is no group failover/re-balance support. So users with requirements 3 and 4 but no requirement for group/re-balance would prefer to use the simple consumer. Basically the high-level consumer provides the following main functionalities over the simple consumer:
    • Auto/Hidden Offset Management
    • Auto(Simple) Partition Assignment
    • Broker Failover => Auto Rebalance
    • Consumer Failover => Auto Rebalance
    • If users do not want any of these, then the simple consumer is sufficient
    • If users want control over offset management with everything else unchanged, one option is to expose the current ZK implementation of the high-level consumer to users and allow them to override it; another option is to change the high-level consumer API to return the offset vector associated with messages
    • If users want to control partition assignment, one option is to change the high-level consumer API to allow such config info to be passed in while creating the stream; another option is ad-hoc: just make a single-partition topic and assign it to the consumer.
    • If users just want the automatic partition assignment to be more "smart" with co-location considerations, etc., one option is to store the host/rack info in ZK and let the rebalance algorithm read it while doing the computation.
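A toy sketch of that co-location-aware assignment pass, assuming the host/rack info has already been read from ZK into plain maps (all names here are illustrative): prefer giving a partition to a consumer on the same rack as the partition's leader broker, falling back to round-robin when no rack match exists.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RackAwareAssigner {
    // leaderRack: partition -> rack of its leader broker (read from ZK)
    // consumerRack: consumer id -> rack of its host (read from ZK)
    public static Map<Integer, String> assign(Map<Integer, String> leaderRack,
                                              Map<String, String> consumerRack) {
        Map<Integer, String> assignment = new HashMap<>();
        List<String> consumers = new ArrayList<>(consumerRack.keySet());
        Collections.sort(consumers); // deterministic fallback order
        int rr = 0;
        for (Map.Entry<Integer, String> e : leaderRack.entrySet()) {
            String chosen = null;
            for (String c : consumers) {
                if (consumerRack.get(c).equals(e.getValue())) {
                    chosen = c; // co-located consumer found
                    break;
                }
            }
            if (chosen == null) {
                chosen = consumers.get(rr++ % consumers.size()); // round-robin fallback
            }
            assignment.put(e.getKey(), chosen);
        }
        return assignment;
    }
}
```

A real implementation would also need to balance load across co-located consumers rather than always picking the first rack match, but this shows where the ZK-sourced rack info would enter the rebalance computation.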

* Bottom line: complicating the simple consumer will risk its compatibility with current applications, hence it might not be a good option compared with patching the high-level consumer

  • User-specified callback upon rebalancing - Requirement 5 is good to have, though users are responsible for the hit to re-balancing performance caused by a heavy callback function.
  • Requirement 6 would need 1) a user-specified default value when there is no data to pull, and 2) probably a different chunk size for the "default" chunk. Both of these would be tricky parameters to tune (if not impossible).
  • For non-blocking primitive support, again it depends on what user "really" wants:
    • Currently one stream only contains messages from one or more partitions of the same topic, and has the option to return an exception when the next() call has timed out. If what users want is simply a stream from multiple topics, one option is to implement a special fetcher that puts messages from different topics into a shared queue for the end iterator.
    • If what users want is to support stream join operations, which are usually windowed (i.e. only two messages that arrive within a relatively small window can be correlated), then the current timeout mechanism of KafkaMessageStream is nearly sufficient.