...

We want to clarify beforehand that this KIP would be the starting point of a transformational change to the Kafka client consumption semantics. It is not possible to roll out all the design details in one shot. Instead, the focus is to define a clear roadmap of what has to be done, illustrate the dependency plan, and get some concrete tasks done in step one.

Use

...

Case Scenario

As stated above, the scaling cap for a consumer-based application is the number of input partitions. In the extreme scenario of a single input partition with two consumers, one consumer must be idle. If the single consumer cannot keep up with the processing speed, there is no way to add more computing capacity. It would be ideal if we could co-process data within one partition with two consumers when partition-level ordering is not required, and add as many consumer instances as we want.

...

Furthermore, for point 2, there are also different requirements for stateless and stateful operations, such as whether key-level ordering is required or not. Naturally, with a requirement for key-level ordering, the broker must allocate the same message key to the same consumer within one generation. For stateless operations, which do not care about ordering at all, the design could be much simpler, such as round-robin assignment.

Proposed

...

Roadmap

We would start by supporting a new offset commit semantic, as this unblocks the potential to process records regardless of partition-level ordering. Stateful operation support is of more value in nature, so concurrently we could add support for key-based filtering in Fetch calls. Stateless and transactional support has to be built on top of the first two steps.

Stage name | Goal | Dependency
Individual commit | Create a generic offset commit model beyond the current partition → offset mapping. | None
Key filtering based fetch on topic level | Add the capability to FetchRequest to fetch by a specific hashed key range or specific keys. | None
Rebalance support for concurrent assignment | Add assignor support to allow splitting a single topic partition by different hash ranges. | Key based filtering
Transactional support | Incorporate individual commit into the transaction processing model. | Key based filtering
Support cooperative fetch on broker level | Round-robin assign data to cooperative consumer fetches when no key range is specified. | Individual commit

...

The design is broken down into parts that align with the roadmap defined above.

Individual Commit

Note that we will use the term IC to refer to individual commit in the follow-up discussion.

[draw.io diagram: Individual commit]

...

In individual commit mode, the offset metadata will grow much faster and be harder to predict. To avoid messing up the stable offset, we propose to add another internal topic called `__individual_commit_offsets`, which stores the individual commits specifically. This isolation makes the extra overhead more controllable. And each time 

Public Interfaces

...

, and call the current __consumer_offsets topic the primary offset topic. This isolation should make development more controllable by avoiding corruption of the primary topic, and it achieves at-least-once semantics in the worst case where we need to delete a corrupted IC offset topic.

The offset commit and offset fetch workflows will change slightly under individual commit mode.

Offset Commit

The committed individual offset will be appended into the IC offset log as normal; however, the broker shall maintain an in-memory state to keep track of the individual commits. To eventually compact this topic, each IC record will go through the coordinator's memory so it can double-check whether any records can be deleted. In the illustrated scenario (a sketch of the merging logic follows the list below),

  • If there is an IC [48-49], we no longer need to maintain IC ranges [45-47] and [50-50]. We shall append two null records (tombstones) to delete those and merge the original ICs into [45-50]
  • If there is an IC [43-44], we are good to move the stable offset forward. The operation is:
    • Push the stable offset to 47 in the primary offset topic
    • Append a record to remove IC [45-47] in the IC offset topic
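To make the compaction rules above concrete, here is a minimal sketch of the coordinator's in-memory range bookkeeping, assuming IC ranges are tracked as inclusive [start, end] pairs next to a stable offset of 42 (inferred from the bullets above). The class and method names are hypothetical, not the proposed broker API; the comments mark where tombstones or primary-topic commits would be emitted.

```java
import java.util.Map;
import java.util.TreeMap;

// Minimal sketch (hypothetical names) of the coordinator's in-memory IC state:
// inclusive [start, end] ranges keyed by start offset, plus the stable offset.
public class IndividualCommitState {

    private final TreeMap<Long, Long> icRanges = new TreeMap<>();
    private long stableOffset;   // every offset <= stableOffset is committed

    public IndividualCommitState(long stableOffset) {
        this.stableOffset = stableOffset;
    }

    /** Apply a newly committed range [start, end], merging adjacent IC ranges. */
    public void commit(long start, long end) {
        // Merge with a range that ends right before (or overlaps) `start`.
        Map.Entry<Long, Long> lower = icRanges.floorEntry(start - 1);
        if (lower != null && lower.getValue() >= start - 1) {
            start = lower.getKey();
            end = Math.max(end, lower.getValue());
            icRanges.remove(lower.getKey());      // would append a tombstone to the IC topic
        }
        // Merge with ranges that begin right after (or inside) [start, end].
        Map.Entry<Long, Long> higher = icRanges.ceilingEntry(start);
        while (higher != null && higher.getKey() <= end + 1) {
            end = Math.max(end, higher.getValue());
            icRanges.remove(higher.getKey());     // would append a tombstone to the IC topic
            higher = icRanges.ceilingEntry(start);
        }
        if (start <= stableOffset + 1) {
            // Merged range touches the stable offset: advance it in the primary
            // offset topic instead of keeping an IC entry.
            stableOffset = Math.max(stableOffset, end);
        } else {
            icRanges.put(start, end);             // would append the merged IC record
        }
    }

    @Override
    public String toString() {
        return "stable=" + stableOffset + ", IC=" + icRanges;
    }

    public static void main(String[] args) {
        // Base state from the illustration: stable offset 42, ICs [45-47] and [50-50].
        IndividualCommitState a = baseState();
        a.commit(48, 49);        // [45-47], [48-49], [50-50] collapse into [45-50]
        System.out.println(a);   // stable=42, IC={45=50}

        IndividualCommitState b = baseState();
        b.commit(43, 44);        // touches the stable offset, so it moves forward to 47
        System.out.println(b);   // stable=47, IC={50=50}
    }

    private static IndividualCommitState baseState() {
        IndividualCommitState s = new IndividualCommitState(42);
        s.commit(45, 47);
        s.commit(50, 50);
        return s;
    }
}
```

Presumably this state would be rebuilt from the IC offset topic on coordinator failover, the same way the primary offset cache is rebuilt from __consumer_offsets today.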

Offset Fetch

When a consumer starts up in sequential commit mode, it attempts to fetch the stored offsets before resuming work. The step is the same in IC mode, except that the consumer is now aware of both the individual offsets and the stable offset. The individual offsets serve as a reference when the user calls #poll(): any record that is already committed will be filtered out, so the polled records only contain ones that have not yet been committed. When doing the offset commit, the consumer will also perform a "gap amending" step.

Imagine a consumer fetching with the following state:

Offset: stable: 40, IC: [43-45], [48-49]
Fetched range: [40-50]

The consumer will commit only the processed offsets:

[41-42], [46-47], [50-50]

which matches the offset commit semantic of always committing the offsets that were processed within the current batch.
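As a concrete illustration of the gap amending above, here is a minimal sketch that computes the ranges to commit as the fetched range minus the stable offset and the existing IC ranges. The class and method names and the long[]-pair range representation are illustrative assumptions, not a proposed consumer API.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch (hypothetical names) of the consumer-side "gap amending":
// commit whatever lies inside the fetched range but outside the stable offset
// and the existing IC ranges.
public class GapAmending {

    /** Each range is an inclusive pair {start, end}; icRanges must be sorted and disjoint. */
    public static List<long[]> rangesToCommit(long fetchStart, long fetchEnd,
                                              long stableOffset, List<long[]> icRanges) {
        List<long[]> result = new ArrayList<>();
        long cursor = Math.max(fetchStart, stableOffset + 1);  // skip offsets already stable
        for (long[] ic : icRanges) {
            if (ic[1] < cursor) continue;    // IC range entirely behind the cursor
            if (ic[0] > fetchEnd) break;     // IC range beyond the fetched range
            if (cursor < ic[0]) {
                result.add(new long[]{cursor, Math.min(ic[0] - 1, fetchEnd)});
            }
            cursor = ic[1] + 1;              // jump over the already-committed range
        }
        if (cursor <= fetchEnd) {
            result.add(new long[]{cursor, fetchEnd});
        }
        return result;
    }

    public static void main(String[] args) {
        // Scenario from the text: stable 40, IC [43-45] and [48-49], fetched [40-50].
        List<long[]> ic = List.of(new long[]{43, 45}, new long[]{48, 49});
        for (long[] r : rangesToCommit(40, 50, 40, ic)) {
            System.out.println("[" + r[0] + "-" + r[1] + "]");  // prints [41-42], [46-47], [50-50]
        }
    }
}
```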

Key Based Filtering

In order to allow sharing between consumers, the broker has to be able to distinguish data records by key.
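As a rough illustration of what key-based filtering could look like, the sketch below hashes each record key into a fixed hash space and only passes records whose hash falls inside the range a consumer asked for. The hash function, the size of the hash space, and the class name are assumptions made for illustration; they are not the proposed FetchRequest change.

```java
import java.nio.charset.StandardCharsets;

// Minimal sketch (hypothetical names) of hash-range based key filtering.
public class KeyRangeFilter {

    private static final int HASH_SPACE = 1 << 16;   // assumed size of the shared hash space

    private final int rangeStart;   // inclusive
    private final int rangeEnd;     // inclusive

    public KeyRangeFilter(int rangeStart, int rangeEnd) {
        this.rangeStart = rangeStart;
        this.rangeEnd = rangeEnd;
    }

    /** True if a record with this key belongs to the requesting consumer's range. */
    public boolean matches(byte[] key) {
        // Any hash agreed on by clients and brokers would work; Kafka's default
        // partitioner uses murmur2, but a plain hash keeps this sketch self-contained.
        int hash = 0;
        for (byte b : key) {
            hash = 31 * hash + b;
        }
        int bucket = (hash & 0x7fffffff) % HASH_SPACE;
        return bucket >= rangeStart && bucket <= rangeEnd;
    }

    public static void main(String[] args) {
        // Two consumers split the hash space in half and share one partition.
        KeyRangeFilter consumerA = new KeyRangeFilter(0, HASH_SPACE / 2 - 1);
        KeyRangeFilter consumerB = new KeyRangeFilter(HASH_SPACE / 2, HASH_SPACE - 1);

        byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        // Exactly one of the two consumers matches any given key.
        System.out.println(consumerA.matches(key) + " / " + consumerB.matches(key));
    }
}
```

Because a given key always hashes into the same range, the same key keeps going to the same consumer within a generation, which is the key-level ordering property the stateful case above requires.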

Public Interfaces

The 

Related Work

Pulsar has officially supported the key-shared subscription feature since 2.4, which allows multiple consumers to share the same partition's data.

The blockers for us to implement a similar feature are three-fold:

  1. Our consumer model is pull-based, which incurs random reads if we let consumers ask for specific keyed records. Sequential reads are the key performance advantage for Kafka; otherwise we could not bypass memory copies of the data. (Intermediate service)
  2. Our broker doesn't know anything about the data distribution; all the metadata is encoded, and at a minimum the broker would have to understand the message key. In reality, we could not afford to let consumers fetch with raw keys.
  3. The consumer coordinator is at a centralized location; however, we need to distribute keys across different partitions. The burden of broadcasting is high.

Compatibility, Deprecation, and Migration Plan

...