

Status

Current state: [One of "Under Discussion", "Accepted", "Rejected"]

Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]

JIRA: here [Change the link from KAFKA-1 to your own ticket]

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Consumer semantics are very useful for distributed data processing; however, the granularity of parallelism cannot satisfy scaling needs when the number of topic partitions is smaller than the number of consumers. Today, Kafka users do capacity planning up front to accommodate 5X ~ 10X future traffic growth. This is a best-effort attempt to avoid future scalability issues, but traffic can still eventually exceed the original plan, forcing users into an unpleasant online migration. One solution we have considered is online partition expansion; that proposal did not continue to evolve due to its complexity. A second option, which is also painful, is to switch the input topic on the fly. As of today, that switch is a manual and cumbersome process.

From an infrastructure cost perspective, pre-defining a higher number of partitions will definitely increase network traffic, since more metadata and replication are needed. Beyond the extra money paid, the operational overhead of keeping a broker cluster healthy grows with every topic partition beyond necessity. This has been a known pain point for Kafka stream processing scalability, and resolving it would be of great value.

Furthermore, take Kafka Streams as an example: its processing model honors partition-level ordering. However, most operations, such as joins and aggregations, are per-key, so the relative order across different keys is not necessary, except for user-customized operations.

The proposal here is to decouple consumption from the physical partition count by making consumers capable of collaborating on the same topic partition. This has a couple of benefits compared with the existing model:

  1. Consume and produce scales are no longer coupled. This means we could save money by configuring the input topic with a reasonable number of partitions.
  2. Partition-level hotkeys are easier to avoid. When a specific key is processing really slowly, the decoupled key-based consumption could bypass it and make progress on other keys.
  3. No operational overhead for scaling out. Users just need to add more consumer/stream capacity, even when there are more consumers than partitions.

Proposed Changes

We want to clarify up front that this KIP is the starting point of a transformational change to Kafka client consumption semantics. It is not possible to roll out all the design details in one shot. Instead, the focus is to define a clear roadmap of what has to be done, illustrate the dependency plan, and land some concrete tasks as step one.

Use case scenario

As stated above, the scaling cap for a consumer-based application is the number of input partitions. In the extreme scenario of a single input partition with two consumers, one consumer must be idle. If the single active consumer cannot keep up with the processing load, there is no way to add more computing capacity. Ideally, we could co-process data within one partition with two consumers when partition-level order is not required, and add as many consumer instances as we want.

So this cooperative consumption model applies with following limitations:

  1. The bottleneck is in the application processing, not in data consumption, which could have other root causes such as network saturation on the broker.
  2. The processing semantics do not require partition-level order; otherwise, only one consumer could work on the input sequentially, without parallelism.

Furthermore, for point 2, stateless and stateful operations have different requirements, such as whether key-level ordering is required. Naturally, with a key-level ordering requirement, the broker must allocate the same message key to the same consumer within one generation. For stateless operations that do not care about ordering at all, the design could be much simpler, e.g. round-robin assignment.
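The two assignment styles above can be sketched as follows. This is an illustrative sketch only, under the assumption of a fixed consumer count within one generation; `KeyRouter`, `assignStateful`, and `assignStateless` are hypothetical names, not part of any Kafka API.

```java
// Hypothetical sketch: routing records to consumers that share one partition.
// With a key-level ordering requirement, the same key must map to the same
// consumer within a generation; for stateless operations, round robin is enough.
public class KeyRouter {
    private final int numConsumers;
    private int next = 0;

    public KeyRouter(int numConsumers) {
        this.numConsumers = numConsumers;
    }

    // Stateful path: sticky, hash-based assignment preserves key-level order,
    // because a given key always lands on the same consumer index.
    public int assignStateful(byte[] key) {
        return Math.floorMod(java.util.Arrays.hashCode(key), numConsumers);
    }

    // Stateless path: ordering does not matter, so spread records evenly.
    public int assignStateless() {
        int c = next;
        next = (next + 1) % numConsumers;
        return c;
    }
}
```

Note that a real design would also have to re-derive assignments on generation change (rebalance), which this sketch does not cover.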

Proposed roadmap

| Stage name | Goal | Dependency |
| --- | --- | --- |
| Individual acknowledgement | Create a generic offset acknowledgement model beyond the current partition → offset mapping. | No |
| Support cooperative fetch on the broker level | Broker will maintain state to track which consumer | Individual ack |
| Key filtering based fetch on the topic level | Add capability to FetchRequest with a specific hashed key range or specific keys | No |
| Transactional support | Incorporate individual ack into the transaction processing model | |
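To make the "individual acknowledgement" stage concrete, here is a minimal sketch of what a generic offset acknowledgement model could look like: instead of a single committed offset per partition, acknowledged offsets are tracked as merged ranges, so out-of-order acks from cooperating consumers do not block each other. The `AckedRanges` class and its methods are hypothetical, not an actual Kafka API.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of individual acknowledgement: acknowledged offsets are
// kept as disjoint inclusive ranges, merged as neighbors become contiguous.
public class AckedRanges {
    // start offset -> end offset (inclusive) of each acknowledged range
    private final TreeMap<Long, Long> ranges = new TreeMap<>();

    public void ack(long offset) {
        Map.Entry<Long, Long> left = ranges.floorEntry(offset);
        if (left != null && left.getValue() >= offset) {
            return; // already acknowledged
        }
        long start = offset;
        long end = offset;
        if (left != null && left.getValue() == offset - 1) {
            start = left.getKey(); // extend the range ending just before us
        }
        Map.Entry<Long, Long> right = ranges.ceilingEntry(offset + 1);
        if (right != null && right.getKey() == offset + 1) {
            end = right.getValue(); // absorb the range starting just after us
            ranges.remove(right.getKey());
        }
        ranges.put(start, end);
    }

    public boolean isAcked(long offset) {
        Map.Entry<Long, Long> e = ranges.floorEntry(offset);
        return e != null && e.getValue() >= offset;
    }
}
```

For example, acking offsets 5, 7, and then 6 would collapse into the single range [5, 7], while offset 8 remains unacknowledged.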



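Similarly, the "key filtering based fetch" stage could work roughly as follows: each record key is hashed into a fixed hash space, and a fetch only returns records whose hash falls inside the range the consumer asked for. This is a sketch under assumed names (`KeyRangeFilter`, `RANGE_SIZE`) and an assumed simple hash; the real hashed key range semantics in FetchRequest are yet to be defined.

```java
// Hypothetical sketch of key-range filtering: the fetch path hashes each
// record key into [0, RANGE_SIZE) and keeps only records in the requested range.
public class KeyRangeFilter {
    static final int RANGE_SIZE = 1 << 16; // illustrative hash space size

    final int rangeStart; // inclusive
    final int rangeEnd;   // exclusive

    KeyRangeFilter(int rangeStart, int rangeEnd) {
        this.rangeStart = rangeStart;
        this.rangeEnd = rangeEnd;
    }

    static int hash(byte[] key) {
        int h = 0;
        for (byte b : key) h = 31 * h + b;
        return Math.floorMod(h, RANGE_SIZE);
    }

    boolean accepts(byte[] key) {
        int h = hash(key);
        return h >= rangeStart && h < rangeEnd;
    }
}
```

Two consumers holding complementary ranges would then partition the key space between them without any physical repartitioning of the topic.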
Public Interfaces

The 

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Rejected Alternatives

There are a couple of alternatives to this proposal.

  1. KIP-253 proposed physical partition expansion, which is a fairly complex implementation whose correctness could be hard to reason about.
  2. There has been some discussion around making Kafka Streams a multi-threading model where consumers are completely decoupled from the processing threads. This means we would have to tackle the concurrent processing challenge, and there could be further inherent work to redesign the state store semantics as well.

