...

Consumer semantics are very useful for distributed processing of data, however the granularity of parallelism doesn't satisfy the scaling need when the number of topic partitions < the number of consumers. Nowadays Kafka client users do capacity planning beforehand to allow for a 5X ~ 10X future traffic increase. This aims to avoid hitting a future scalability issue on a best-effort basis, but it is still possible that the traffic eventually goes beyond the original plan (which may be a good thing), and the user then has to face an unfortunate online migration. One solution we have considered is online partition expansion. That proposal did not continue to evolve due to its complexity. A second option, which is also painful, is to switch the input topic on the fly. To do that, the job owner needs to feed a new topic with a larger number of partitions, set up a mirrored job, make sure the numbers are aligned for the A/B jobs, and then make the switch. As of today, this whole process is manual, cumbersome and error-prone for most companies that have a no-downtime SLA.

From the infra cost perspective, pre-defining an impractically high number of partitions will definitely increase network traffic, as more metadata and replication will be needed. Besides the extra money paid, the operational overhead of keeping the broker cluster in good shape increases when there are more topic partitions than necessary. This has been a known pain point for Kafka stream processing scalability and is of great value to resolve: since the number of processing tasks matches the number of input partitions, the overhead of Kafka Streams jobs increases as the number of partitions increases.

Furthermore, taking Kafka Streams as an example, the processing model honors partition-level ordering. However, most operations such as join, aggregation and so on are at the per-key level, so the relative order across different keys is not necessary, except for user-customized operations. Many organizations are paying for more than what they actually need.

The proposal here is to decouple the consumption threads from the physical partition count, by making consumers capable of collaborating on the same individual topic partition. There are a couple of benefits compared with the existing model:

  1. Data consume and produce scales are no longer coupled. This means we could save money by configuring an input topic with a reasonable number of partitions, sized just for the input traffic.
  2. Better avoidance of partition-level hot keys. When a specific key is processed really slowly, the decoupled key-based consumption could bypass it and make progress on other keys.
  3. No operational overhead for scaling out. Users just need to add more consumer/stream capacity to unblock, even when there are only a few partitions available.

Proposed Changes

We want to clarify beforehand that this KIP would be the starting point of a transformational change to the Kafka consumer consumption semantics. It's not possible to roll out all the design details in one shot. Instead, the focus is to define a clear roadmap of what needs to be done, and to better illustrate the long-term plan while getting some concrete tasks into the starting steps. There will also be follow-up KIPs and design docs, so stay tuned.

Use Case Scenario

As stated above, the scaling cap for a consumer-based application is the number of input partitions. In an extreme scenario where there is a single input partition with two consumers, one consumer must remain idle. If the single active consumer cannot keep up with the processing speed, there is no way to add more computing capacity. It would be ideal if we could co-process data within one partition with two consumers when partition-level order is not required, such that we could add as many consumer instances as we want.

So this cooperative consumption model applies under the following limitations:

  1. The bottleneck is in the application processing, not in data consumption, which could be caused by other root causes such as network saturation of the broker.
  2. The processing semantics do not require partition-level order; otherwise only one consumer could work on the input sequentially, without parallelism.

Furthermore, for point 2, there are also different requirements for stateless and stateful operations, such as whether key-level ordering is required or not. Naturally, with a requirement of key-level ordering, the broker must allocate the same message key to the same consumer within one generation. For stateless operations that don't care about ordering at all, the design could be much simplified as round-robin assignment.
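The difference between the two assignment styles can be sketched as follows. This is only an illustration under stated assumptions: the function names, the use of CRC32 as the key hash, and the `HASH_SPACE` constant are hypothetical and not part of the proposal.

```python
import zlib
from itertools import cycle

HASH_SPACE = 2 ** 16  # hypothetical size of the hashed key space


def key_range_owner(key: bytes, consumers: list[str]) -> str:
    """Stateful case: every key maps to exactly one consumer via contiguous hash ranges."""
    slot = zlib.crc32(key) % HASH_SPACE
    # Split [0, HASH_SPACE) into len(consumers) contiguous ranges and pick the owner.
    index = slot * len(consumers) // HASH_SPACE
    return consumers[index]


def round_robin_owners(num_records: int, consumers: list[str]) -> list[str]:
    """Stateless case: ordering is irrelevant, so records are dealt out in turn."""
    turn = cycle(consumers)
    return [next(turn) for _ in range(num_records)]
```

With a fixed consumer list, `key_range_owner` always returns the same consumer for the same key, which is the property key-level ordering needs within one generation; `round_robin_owners` makes no such promise.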

...

Stage name | Goal | Dependency
Individual commit | Create a generic offset commit model beyond current partition → offset mapping. | No
Key filtering based fetch on topic level | Add capability to FetchRequest with specific hashed key range or specific keys | No
Rebalance support for concurrent assignment | Add assignor support to allow splitting single topic partition with different hash range | Key based filtering
Transactional support | Incorporate individual commit into the transaction processing model | Key based filtering
Support cooperative fetch on broker level | Round robin assign data to cooperative consumer fetches when there is no key-range specified | Rebalance support

High Level Design

The design will be broken down, aligning with the roadmap defined above.

Individual Commit

*** Note that we will use the term IC to refer to individual commit in the following discussion.

[Diagram: Individual commit]

The logic behind individual commit is very straightforward. For sequential commit, each topic partition points to only one offset number, while under individual commit it is possible to have "gaps" in between. The "stable offset" is a concept borrowed from transaction semantics, used here just for illustration; its purpose is to mark the position where all the messages before it are already committed. The `IC` blocks refer to the individual commits that mark records that are already processed.
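A minimal sketch of this bookkeeping is shown below. It assumes that committed ranges are inclusive `(start, end)` pairs and that the stable offset is the first offset not yet committed; the function name `add_commit` is hypothetical and only models the idea, not the actual coordinator implementation.

```python
def add_commit(stable: int, ranges: list[tuple[int, int]], new: tuple[int, int]):
    """Merge one inclusive committed range and advance the stable offset over filled gaps.

    `stable` is assumed to be the first offset that is NOT yet committed;
    `ranges` are the individual commits (inclusive start/end) recorded so far.
    """
    merged: list[tuple[int, int]] = []
    for start, end in sorted(ranges + [new]):
        if merged and start <= merged[-1][1] + 1:
            # Overlapping or adjacent to the previous range: extend it.
            merged[-1] = (merged[-1][0], max(merged[-1][1], end))
        else:
            merged.append((start, end))
    # Absorb every range that now touches the stable offset.
    while merged and merged[0][0] <= stable:
        stable = max(stable, merged[0][1] + 1)
        merged.pop(0)
    return stable, merged
```

For instance, under these assumptions, committing `(10, 12)` on top of a stable offset of 10 and an existing individual commit `(13, 15)` closes the gap, so the stable offset moves to 16 and no individual ranges remain.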

In individual commit mode, the offset metadata will grow much more quickly and be harder to predict. To avoid interfering with the stable offset, we propose adding another internal topic called `__individual_commit_offsets`, which stores the individual commits specifically, and calling the current `__consumer_offsets` topic the primary offset topic. This isolation should make development more controllable by keeping the primary topic untouched, and it still achieves at-least-once in the worst case, when we need to delete a corrupted IC offset topic. The individual commit offset topic shall be required to co-locate with the `__consumer_offsets` topic, which means it has to share the number of partitions, replication factor and min replicas configuration of the primary offset topic.

...

The committed individual offset will be appended to the IC offset log as normal; however, the broker shall maintain an in-memory state to keep track of the individual commits. To eventually compact this topic, each IC record will go through the coordinator memory to double-check whether any records can be deleted. In the illustration scenario,

...

During startup in sequential commit mode, a consumer attempts to fetch the stored offsets before resuming work. This would be the same step in IC mode, only the consumer will now be aware of both the individual offsets and the stable offset. The individual offsets serve as a reference when the user calls #poll(), such that any record that is already committed will be filtered out and the polled records will only contain uncommitted ones. When doing the offset commit, the consumer will do "gap amending" too.

Imagine a scenario where a consumer fetches based on the setup below:

Offset: stable: 40, IC: [43-45], [48-49]
Fetched range: [40-50]
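
The filtering step on this setup can be sketched as follows. The sketch assumes that the IC ranges and the fetched range are inclusive, and that the stable offset is the first offset not yet committed; the helper name `uncommitted_offsets` is hypothetical.

```python
def uncommitted_offsets(stable: int,
                        ic_ranges: list[tuple[int, int]],
                        fetched: tuple[int, int]) -> list[int]:
    """Return the fetched offsets that are not yet covered by any commit."""
    first, last = fetched
    committed: set[int] = set()
    for start, end in ic_ranges:
        # Inclusive ranges: both endpoints count as committed.
        committed.update(range(start, end + 1))
    # Everything below the stable offset is committed by definition.
    return [o for o in range(max(first, stable), last + 1) if o not in committed]


# The setup above: stable 40, IC [43-45], [48-49], fetched [40-50].
remaining = uncommitted_offsets(40, [(43, 45), (48, 49)], (40, 50))
```

Under these assumptions, `remaining` is `[40, 41, 42, 46, 47, 50]`, that is, only the records in the gaps are handed to the application.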

...

The offset commit protocol will be changed to allow IC semantics. The committed offset will include a list of offset ranges indicating the acknowledged messages, and a LastStableOffset will be added to the response to let the consumer purge its local IC offsets.

Code Block
OffsetCommitRequest => GroupId Offsets GenerationId MemberId GroupInstanceId
  GroupId             => String
  Offsets             => Map<TopicPartition, CommittedOffset>
  GenerationId        => int32, default -1
  MemberId            => nullable String
  GroupInstanceId     => nullable String

CommittedOffset => offset, metadata, leaderEpoch, offsetRanges
  Offset              => int64, default -1
  Metadata            => nullable String
  LeaderEpoch         => Optional<int32>
  OffsetRanges        => List<Tuple<int64, int64>> // NEW

OffsetCommitResponse => ThrottleTimeMs Topics
  Topics         => List<OffsetCommitResponseTopic>
  ErrorCode      => int16

OffsetCommitResponseTopic => Name Partitions
  Name           => String
  Partitions     => List<OffsetCommitResponsePartition>

OffsetCommitResponsePartition => PartitionIndex ErrorCode LastStableOffset
  PartitionIndex => int32
  ErrorCode => int16
  LastStableOffset => int64 // NEW

When the offset range list is not empty, the broker will only handle the offset ranges and ignore the plain offset field. Optionally, the broker will check that the original offset field is set to -1 to make sure there is no data corruption in the request. Trivially, the transactional offset commit will also include the offset range.
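The handling rule above could look roughly like the following. The dataclass mirrors the `CommittedOffset` fields from the schema, while `resolve_commit` and its `strict` flag are hypothetical names introduced only for this sketch.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class CommittedOffset:
    offset: int = -1
    metadata: Optional[str] = None
    leader_epoch: Optional[int] = None
    # NEW field: inclusive (start, end) ranges of acknowledged offsets.
    offset_ranges: list[tuple[int, int]] = field(default_factory=list)


def resolve_commit(commit: CommittedOffset, strict: bool = False):
    """Return ("ranges", [...]) for an IC commit, or ("offset", n) for a sequential one."""
    if commit.offset_ranges:
        # Optional sanity check: an IC commit is expected to leave the plain offset at -1.
        if strict and commit.offset != -1:
            raise ValueError("plain offset must be -1 when offset ranges are set")
        # Ranges take precedence; the plain offset field is ignored.
        return ("ranges", commit.offset_ranges)
    return ("offset", commit.offset)
```

A request that carries both a plain offset and ranges is treated as an individual commit; with `strict=True` it is rejected instead, which corresponds to the optional corruption check.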

...

accept.individual.commit

Determine whether the group coordinator allows individual commit.

Default: true.

max.bytes.key.range.results

The maximum memory allowed for holding partitioned records in memory to serve the next batch. Note that this is not the memory limit for handling the data range check of a single batch.

Default: 209715200 bytes.

individual.commit.log.segment.bytes

The segment size for the individual commit topic.

Default: 10485760 bytes.

Topic config


accept.range.fetch

Determine whether the topic is allowed to perform key range based fetch. Setting it to false is a user's call, for example to require that any subscribing application obeys partition-level ordering.

Default: true.

reserve.message.value.range.fetch

Determine whether, when caching the result of the last fetch request, the message values should also be kept in main memory as an optimization.

Default: false.

Consumer configs

allow.individual.commit

Determine whether this consumer will participate in a shared consumption mode with other consumers.

Default: false.

...