Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Current stateDraft

Discussion thread: TBD

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Consumer semantics is very useful for distributed processing of the data, however the granularity of parallelism doesn’t satisfy the scaling need when number of topic partitions < number of consumers. Nowadays Kafka client users would do the capacity planning beforehand to allow 5X ~ 10X future traffic increase. This aims to avoid hitting the future scalability issue at the best effort, but still possible that eventually the traffic goes beyond the original planning, and user has to face the unfortunate online migration. One solution we have considered is to do online partition expanding. The proposal was not continuing to evolve due to its complexity. A second option which also painful is to switch input topic on the fly. As of today, the switch process is manual and cumbersome. 

...

  1. Data consume and produce scales are no longer coupled. This means we could save money by configuring a reasonable input topic with decent amount of partitions.
  2. Better avoid partition level hotkeys. When a specific key is processing really slow, the decoupled key based consumption could bypass it and make progress on other keys.
  3. No operation overhead for scaling out. Users just need to add more consumer/stream capacity to unblock even there are fewer consumers.

Proposed Changes

We want to clarify beforehand that this KIP would be a starting point of a transformational change on the Kafka client consumption semantics. It's not possible to have all the design details rolling out in one shot. Instead, the focus is to define a clear roadmap of what things need to be done, and better illustrate the long term plan while getting some concrete tasks in starting steps.

Use Case Scenario

As stated above, the scaling cap for consumer based application is the number of input partitions. In an extreme scenario when there is single input partition with two consumers, one must be idle. If the single box consumer could not keep up the speed of processing, there is no solution to add more computing capacity. It would be ideal we could co-process data within one partition with two consumers when the partition level order is not required, and we could add consumer instances as many as we want.

...

Future more for pt 2, there are also different requirements for stateless and stateful operations, such as whether key level ordering is required or not. Naturally speaking, with a requirement of key level ordering, the broker needs to allocate the same message key to the same consumer within one generation as a must. For stateless operation which doesn't care about the ordering at all, the design could be much simplified as round robin assignment.

Proposed Roadmap

We would start from supporting a new offset commit semantics as this unblocks the potential to process regardless of partition level ordering. The stateful operation support is of more value in nature, so concurrently we could add supports on key based filtering with Fetch calls. The stateless and transactional supports have to be built on top of the first two steps.

Stage nameGoalDependency
Individual commitCreate a generic offset commit model beyond current partition → offset mapping.No
Key filtering based fetch on topic levelAdd capability to FetchRequest with specific hashed key range or specific keysNo
Rebalance support for concurrent assignment Add assignor support to allow splitting single topic partition with different hash rangeKey based filtering
Transactional supportIncorporate individual commit into the transaction processing modelKey based filtering
Support cooperative fetch on broker levelRound robin assign data to cooperative consumer fetches when there is no key-range specified Rebalance support

High Level Design

The design will be broken down, aligning with the roadmap defined above.

Individual Commit

Note that we will use term IC to refer to the individual commit for followup discussion.

...

The offset commit and offset fetch workflow will be slightly changed under individual commit mode.

Offset Commit

The committed individual offset will be appended into IC offset log as normal, however the broker shall maintain an in-memory state to keep track of the individual commits. To eventually compact this topic, each IC record will go through the coordinator memory to make double check whether there could be records deleted. In the illustration scenario,

  • If there is an IC [48-49] then we no longer need to maintain IC ranges [45-47] and [50-50]. We shall append 2 null records to delete those and augment the original IC into [45-50]
  • If there is an IC [43-44], we are good to move stable offset forward. The operation is like:
    • Push stable offset to 47 in the primary offset topic
    • Append a record to remove IC [45-47] in the IC offset topic

Offset Fetch

During a consumer startup in sequential commit mode, it will attempt to fetch the stored offsets on start up before resuming work. This would be the same step for IC mode, only the consumer will be aware of both the individual offsets and stable offsets now. The individual offsets serve as a reference when user calls #poll(), such that if there are any record that's already committed, it will be filtered and the polled records will only contain committed ones. When doing the offset commit, consumer will do a "gap amending" too.

...

In order to cleanup unnecessary IC offsets, the OffsetCommitResponse shall also contain the information about the latest stable offset, so that consumers could safely delete any IC offsets that fall behind stable offset.

Key Based Filtering

In order to allow sharing between the consumers, broker has to be able to distinguish data records by keys. For a generic purpose, the easy way to assign records by keys is to get a hashing range and allow range split when having multiple consumers talking to the same partition. This would be our starting point to support data sharing between consumers. Suppose we do a key hashing to a space of [0, Long.MAX] while two consumers are assigned to the same partition, they will each get key range [0, Long.MAX/2 - 1] and [Long.MAX/2, Long.MAX]. This hash range assignment decision will be made during rebalance phase.

...

With key based filtering + IC offset support, user is already capable of doing standalone mode of partition data sharing. We shall discuss the consumer API changes to support this feature in the public interface section.

Rebalance support for concurrent assignment 

To determine the hash range before each consumer generation stabilizes, in the group assignment phase the leader will evaluate the situation and decide whether to trigger the sharing mechanism. For the very first version, a very intuitive criteria is comparing the relative size of topic partitions vs number of consumers. For example if number of consumers m > number of partitions n, we would do the assignment based off partitions instead of  they would be sharing part of partitions for consumption in a round robin fashion. For example, if 5 consumers subscribing to 1 topic with 3 partitions, the final assignment would be:

...

The assigned ranges could then be used by the consumer to make their fetch more specific. This step unblocks the potential to allow a dynamic consumer group scaling beyond partition level capping.

Transactional Support

Eventually IC semantic has to be compatible with transaction support. Despite the ongoing discussion of any change to the transaction semantics, we are providing a rough plan on integrating with current transaction model in this KIP as well. To avoid over-complicating the semantic, we will introduce a new delayed queue or purgatory for transactional IC semantic, where the commit shall be blocked until it could hit the stable offset. Take Kafka Streams as an example, if we have a chunk of data [40-49] with stable offset at 39, with two stream threads A and B are turning on EOS at the same time in the sharing mode:

...

This high level workflow is blocking in nature, but easier to reason about its correctness. The design is still open to discussion and may change as the Kafka transaction model evolves.

Cooperative Fetch

Take a look back at the stateless operations like filter or map, there is no necessity to honor the consumer → key mappings during the processing. From KIP-283, we already know it's very costly and inefficient to copy data around. Based on client's need, broker could do a random assignment when receiving fetch requests without key level granularity. It will keep an advancing marker of who has been fetching to which position. This means we don't need to load any data into the main memory and the total of IC offsets will be significantly dropped.

...

The design is still open to discussion.

Public Interfaces

The offset commit protocol will be changed to allow IC semantic. The committed offset will include a list of offset ranges:

...

Code Block
languagejava
titleConsumer.java
public Consumer {
  ...

 /**
  * Manually assign a list of partitions with specific key ranges to this consumer. If a partition maps to an empty list,
  * that means a full ownership of the partition.
  */
  void assign(Map<TopicPartition, List<Tuple<int64, int64>>> partitionWithKeyRanges);
}

New Configurations

Broker configs

fetched.data.max.wait.ms

The maximum amount of time in ms that the fetch manager will wait

Consumer configs

allow.individual.commit

determine whether this consumer will participate in a shared consumption mode with other consumers.

Default value: false.

Related Work

Pulsar has officially supported key share feature in 2.4, which suggests multiple consumers could share the same partition data.

...

  1. Our consumer model is pull based, which incurs random read if we let consumers ask for specific keyed records. Sequential read is the key performance sugar for Kafka, as otherwise we could not bypass memory copy of the data.
  2. Our broker doesn’t know anything about data distribution, as all the metadata is encoded and looks opaque to them. In reality, we could not afford letting consumers fetch with raw keys.
  3. Consumer coordinator is at a centralized location, however we need to distribute keys in different partitions. For Pulsar their offset data is co-located with actual partitions. The burden for broadcasting the state change in Kafka would be pretty hard and very error-prone.

Compatibility, Deprecation, and Migration Plan

  1. Individual Commit. Although it's not of real use with only individual commit support, Kafka users or developers could start evaluating its performance with some customized applications.
    1. broker has to be upgraded first, until then all the commit request with hash range configured shall fail.
    2. Upgrade the client version.
  2. Key based filtering. Note that if individual commit is already launched when we finished the setup for key based filtering, users could use assign mode to co-processing same partition already.
    1. upgrade broker first
    2. Upgrade the client 
  3. Rebalance support. The config `allow.individual.commit` controls whether a consumer will choose to participate in a key-sharing assignment. If not turning on, the leader consumer shall not consider making it share with others.

As transactional support and cooperative fetch need more design discussion, we will delay putting on the upgrade path now.



Test Plan

Each module has to be well tested by unit test, integration test and system test. For system test, we need to setup different scenarios and verify the eventual progress shall be made.


Rejected Alternatives

There are a couple of alternatives to this proposal here.

...