Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

We want to clarify beforehand that this KIP would be a starting point of a transformational change on the Kafka client consumption semantics. It's not possible to have all the design details rolling out in one shot. Instead, the focus is to define a clear roadmap of what things have need to be done, and better illustrate the dependency long term plan while getting some concrete tasks in the step onestarting steps.

Use Case Scenario

As stated above, the scaling cap for consumer based application is the number of input partitions. In an extreme scenario when there is single input partition with two consumers, one must be idle. If the single box consumer could not keep up the speed of processing, there is no solution to add more computing capacity. It would be ideal we could co-process data within one partition with two consumers when the partition level order is not required, and we could add consumer instances as many as we want.

...

which matches the offset commit semantic on always committing offsets that are processed within current batch. 

In order to cleanup unnecessary IC offsets, the OffsetCommitResponse shall also contain the information about the latest stable offset, so that consumers could safely delete any IC offsets that fall behind stable offset.

Key Based Filtering

In order to allow sharing between the consumers, broker has to be able to distinguish data records by keys. For a generic purpose, the easy way to assign records by keys is to get a hashing range and allow range split when having multiple consumers talking to the same partition. This would be our starting point to support 

Public Interfaces

...

to support data sharing between consumers. Suppose we do a key hashing to a space of [0, Long.MAX] while two consumers are assigned to the same partition, they will each get key range [0, Long.MAX/2 - 1] and [Long.MAX/2, Long.MAX]. This hash range assignment decision will be made during rebalance phase.

With the key based hashing, brokers will load topic partitions data into the main memory and do the partitioning work based on the key hashed range. When a FetchRequest comes with specific key range, broker will only reply the ranges that meets the need. This workflow could be implemented as a generic server side filtering, and KIP-283 is already a good example where we also attempt to load data into memory for down-conversion.

There is obvious performance penalty compared with zero-copy mechanism for existing consumer fetching, however the value of unblocking new use cases motivates us to figure out more optimizations in the long run.

With key based filtering + IC offset support, user is already capable of doing standalone mode of partition data sharing. We shall discuss the consumer API changes to support this feature in the public interface section.

Rebalance support for concurrent assignment 

To determine the hash range before each consumer generation stabilizes, in the group assignment phase the leader will evaluate the situation and decide whether to trigger the sharing mechanism. For the very first version, a very intuitive criteria is comparing the relative size of topic partitions vs number of consumers. For example if number of consumers m > number of partitions n, we would do the assignment based off partitions instead of  they would be sharing part of partitions for consumption in a round robin fashion. For example, if 5 consumers subscribing to 1 topic with 3 partitions, the final assignment would be:

N = Long.MAX
M1: tp1[0, N/2 - 1],
M2: tp2[0, N/2 - 1],
M3: tp3,
M4: tp1[N/2, N]
M5: tp2[N/2, N]

The new assignment comes from the fact that partitions are playing a reverse mapping to consumers. So in partition perspective, our assignment looks like a round robin assignment based off partitions:

tp1: M1, M4
tp2: M2, M5
tp3: M3

The assigned ranges could then be used by the consumer to make their fetch more specific. This step unblocks the potential to allow a dynamic consumer group scaling beyond partition level capping.

Transactional Support

Eventually IC semantic has to be compatible with transaction support. Despite the ongoing discussion of any change to the transaction semantics, we are providing a rough plan on integrating with current transaction model in this KIP as well. To avoid over-complicating the semantic, we will introduce a new delayed queue or purgatory for transactional IC semantic, where the commit shall be blocked until it could hit the stable offset. Take Kafka Streams as an example, if we have a chunk of data [40-49] with stable offset at 39, with two stream threads A and B are turning on EOS at the same time in the sharing mode:

  1. Stream thread A did a fetch to get [40, 44]
  2. Stream thread B did a fetch to get [45, 49]
  3. Stream thread B finished processing and issue a transactional commit of [45, 49]
  4. As the offset range [40, 44] is held by someone at the moment, transactional commit request will be put in a purgatory to wait for stable offset advance
  5. Stream thread A finished processing and issue a transactional commit of [40, 44]
  6. We first advance the stable offset and reply stream thread A with successful commit
  7. Then we search purgatory to reply stream thread B and allow it to proceed

This high level workflow is blocking in nature, but easier to reason about its correctness. The design is still open to discussion and may change as the Kafka transaction model evolves.

Cooperative Fetch

Take a look back at the stateless operations like filter or map, there is no necessity to honor the consumer → key mappings during the processing. From KIP-283, we already know it's very costly and inefficient to copy data around. Based on client's need, broker could do a random assignment when receiving fetch requests without key level granularity. It will keep an advancing marker of who has been fetching to which position. This means we don't need to load any data into the main memory and the total of IC offsets will be significantly dropped.

For example if we have a chunk of data [40-49], where offsets 41, 43, 45, 47, 49 belong to key A and 40, 42, 44, 46, 48 belong to key B, consumers owning key A will commit 5 IC offsets, same as key B owner. If we don't maintain that mapping, for consumer fetching we will first return data range [40-44] and advance the in-memory marker, and then reply [45, 49] even potentially this consumer is trying to fetch starting from offset 40, with the hope that someone else will take care of [40-44] eventually.

Cooperative fetch is more like an optimization upon our stateless use case scenario, instead of a new feature. To make it right, we have to encode more metadata within the consumer fetch request such as consumer generation and consumer id, wisely refusing advancing the marker unless necessary when 1. the consumer is zombie, 2. the same consumer is retrying the fetch 3. session timeout. And there should also be a max wait time for commit gaps so that we don't lose the chance to process the data when consumer whoever did the fetch encounters hard failures already. It could be set the same as consumer configured session timeout.

Public Interfaces

The offset commit protocol will be changed to allow IC semantic. The committed offset will include a list of offset ranges:

Code Block
OffsetCommitRequest => GroupId Offsets GenerationId MemberId GroupInstanceId
  GroupId             => String
  Offsets             => Map<TopicPartition, CommittedOffset>
  GenerationId        => int32, default -1
  MemberId            => nullable String
  GroupInstanceId     => nullable String

CommittedOffset => offset, metadata, leaderEpoch, offsetRanges
  Offset              => int64, default -1
  Metadata            => nullable String
  LeaderEpoch         => Optional<int32>
  OffsetRanges        => List<Tuple<int64, int64>> // NEW

When the offset range is not empty, broker will only handle offset ranges and ignore the plain offset field. Optionally broker will check if the original offset field is set to -1 to make sure there is no data corruption in the request. 

The offset fetch request will also be augmented to incorporate the key hash range. Using a list instead of one hash range allows future extensibility.

Code Block
FetchRequest => MaxWaitTime ReplicaId MinBytes IsolationLevel FetchSessionId FetchSessionEpoch [Topics] [RemovedTopics] KeyRanges
  MaxWaitTime => int32
  ReplicaId => int32
  MinBytes => int32
  IsolationLevel => int8
  FetchSessionId => int32
  FetchSessionEpoch => int32
  Topics => TopicName Partitions
    TopicName => String
    Partitions => [Partition FetchOffset StartOffset LeaderEpoch MaxBytes]
        Partition => int32
        CurrentLeaderEpoch => int32
        FetchOffset => int64
        StartOffset => int64
        MaxBytes => int32
  RemovedTopics => RemovedTopicName [RemovedPartition]
    RemovedTopicName => String
    RemovedPartition => int32
  KeyRanges => List<Tuple<int64, int64>> // NEW

Related Work

Pulsar has officially supported key share feature in 2.4, which suggests multiple consumers could share the same partition data.

...

Compatibility, Deprecation, and Migration Plan

...

The upgrade path for different stages may be slightly different, but as a rule of thumb brokers need to be upgraded to the latest version first, otherwise all the interacting components would be failing.

Rejected Alternatives

There are a couple of alternatives to this proposal here.

...