...

Consumer semantics are very useful for distributed processing of the data in Kafka; however, the granularity of parallelism sometimes fails to satisfy the business's scaling need when the number of topic partitions is smaller than the number of consumers. To avoid peak traffic affecting production, Kafka users normally do capacity planning beforehand to allow for a 5X ~ 10X future traffic increase. This best-effort approach aims to avoid hitting a scalability bottleneck, but there is still a chance that traffic eventually grows beyond the original plan (which may be a good thing (smile)), leaving users facing an unfortunate online migration. One solution people have considered is online partition expansion. That proposal did not continue to evolve due to its complexity and operational overhead. A second option, which is also painful, is to switch the input topic on the fly: the job owner needs to feed a new topic with more partitions, set up a mirrored job, make sure the metrics are aligned between the A/B jobs, and eventually make the switch. As of today, this whole process is manual, cumbersome and error-prone for most companies with critical SLAs.

From an infrastructure cost perspective, pre-defining an impractically high number of partitions will definitely increase network traffic, as more metadata and replication are needed. Besides the extra money paid, the operational overhead of keeping the broker cluster in good shape grows when topic partitions go beyond necessity. This has been a known pain point for Kafka stream processing scalability and is of great value to resolve: because the number of processing tasks matches the number of input partitions, the overhead of Kafka Streams jobs increases with the partition count as well.

Today most consumer-based processing models honor partition level ordering. However, most ETL operations such as joins and aggregations operate at the per-key level, so the relative order across different keys is not necessary, except for user-customized operations. Many organizations are paying more than what they actually need.

...

  1. Consume and produce scales are no longer coupled. This means we could save money by configuring the input topic with a reasonable number of partitions sized for the input traffic alone.
  2. Better avoidance of partition level hot keys. When a specific key is processing really slowly, decoupled key-based consumption could bypass it and make progress on other keys.
  3. No operational overhead for scaling out in extreme cases. Users just need to add more consumer/stream capacity to unblock the pipeline, even when only a few partitions are available.

...

We want to clarify beforehand that this KIP is a starting point for a transformational change to consumer consumption semantics. It is not possible to roll out all the design details in one shot. Instead, the focus is to define a clear roadmap of what needs to be done and to illustrate the long term plan, while defining some concrete tasks for the starting steps. There will also be follow-up KIPs and design docs, so stay tuned.

Use Case

...

As stated above, the scaling cap for a consumer-based application is the number of input partitions. In an extreme scenario where there is a single input partition with two consumers, one consumer must remain idle. If the single active consumer cannot keep up with the processing speed, there is no way to add more computing capacity and the job keeps lagging. It would be ideal if two consumers could co-process the data within one partition when partition level order is not required, such that we could add as many consumer instances as we want.

...

  1. The bottleneck is in the application processing, not in the data consumption throughput, which could be limited for other reasons such as network saturation on the broker.
  2. The processing semantics do not require partition level order; otherwise only one consumer could work on the input sequentially, without parallelism.

For point 2, stateless and stateful operations also have different requirements, such as whether key level ordering is required. Naturally, with a requirement of key level ordering, the broker must allocate records with the same message key to the same consumer within one generation. For stateless operations that don't care about ordering at all, the design could be much simpler, as the broker could just do a round robin assignment of records to fetch requests.
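
To make the distinction concrete, here is a minimal, self-contained sketch showing how key level ordering could be preserved by pinning a hashed key range to one consumer per generation, versus a simple round robin hand-off for stateless processing. This is not part of the proposed protocol; the hash function and all class/method names are illustrative assumptions only.

Code Block
languagejava
titleKeyRangeAssignmentSketch.java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

/**
 * Illustration only: how key level ordering can be preserved by pinning a
 * hashed key range to one consumer within a rebalance generation. The class
 * and method names are hypothetical, not part of the proposed protocol.
 */
public class KeyRangeAssignmentSketch {

    // Hash the record key into a stable 32-bit space (CRC32 is used here purely
    // for illustration; the actual hash function is left to the design docs).
    static long hashKey(byte[] key) {
        CRC32 crc = new CRC32();
        crc.update(key);
        return crc.getValue(); // 0 .. 2^32 - 1
    }

    // Stateful case: the same key always lands on the same consumer for the
    // duration of one generation, because the hash space is split into
    // contiguous ranges, one per consumer.
    static int consumerForKey(byte[] key, int numConsumers) {
        long hash = hashKey(key);
        long rangeWidth = (1L << 32) / numConsumers;
        return (int) Math.min(hash / rangeWidth, numConsumers - 1);
    }

    // Stateless case: ordering does not matter, so records could simply be
    // handed out round robin across consumer fetches.
    static int consumerForStatelessRecord(long recordOffset, int numConsumers) {
        return (int) (recordOffset % numConsumers);
    }

    public static void main(String[] args) {
        byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        System.out.println("key 'user-42' -> consumer " + consumerForKey(key, 3));
        System.out.println("offset 100 (stateless) -> consumer "
                + consumerForStatelessRecord(100L, 3));
    }
}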

Proposed Roadmap

We would start by supporting new offset commit semantics, as this unblocks processing that does not depend on partition level ordering. Stateful operation support is of more value in nature, so concurrently we could add support for key based filtering in fetch calls. The stateless and transactional supports have to be built on top of the first two steps.

Stage name | Goal | Dependency
Individual commit | Create a generic offset commit model beyond the current partition → offset mapping. | No
Key filtering based fetch on topic level | Add capability to FetchRequest with a specific hashed key range or specific keys. | No
Rebalance support for concurrent assignment | Add assignor support to allow splitting a single topic partition with different hash ranges. | Key based filtering
Transactional support | Incorporate individual commit into the transaction processing model. | Key based filtering
Support cooperative fetch on broker level | Round robin assign data to cooperative consumer fetches when there is no key range specified. | Rebalance support

...

The offset commit and offset fetch workflows will change slightly under individual commit mode.

Offset Commit

Committed individual commit (IC) offsets will be appended into the corresponding IC offset log as normal; however, the broker shall maintain an in-memory state to keep track of the individual commits. To eventually compact this topic, each IC record will go through the coordinator memory to check whether any records could be deleted. In the illustrated scenario (a minimal sketch of this range handling follows the list):

  • If there is an IC [48-49], then we no longer need to maintain IC ranges [45-47] and [50-50]. We shall append 2 null records to delete [45-47] and [50-50] and augment the original IC into [45-50].
  • If there is an IC [43-44], we are good to move the stable offset forward. The operation is like:
    • Push the stable offset to 47 in the primary offset topic
    • Append a record to remove IC [45-47] in the IC offset topic
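
Below is a minimal sketch of the in-memory bookkeeping described above, assuming inclusive integer offset ranges. The class and method names are hypothetical; the sketch only illustrates the merge and stable-offset-advance steps by replaying the [45-47], [50-50], [48-49], [43-44] scenario.

Code Block
languagejava
titleIndividualCommitTrackerSketch.java
import java.util.Map;
import java.util.TreeMap;

/**
 * Illustration of the in-memory bookkeeping described above: merging newly
 * committed individual-commit (IC) ranges and advancing the stable offset.
 * The real coordinator logic will be defined in follow-up design docs.
 */
public class IndividualCommitTrackerSketch {

    // Committed IC ranges keyed by start offset, value = inclusive end offset.
    private final TreeMap<Long, Long> ranges = new TreeMap<>();
    private long stableOffset; // everything <= stableOffset is committed

    public IndividualCommitTrackerSketch(long stableOffset) {
        this.stableOffset = stableOffset;
    }

    /** Add an inclusive IC range, merging it with adjacent or overlapping ranges. */
    public void commit(long start, long end) {
        // Merge with a preceding range that touches [start, end].
        Map.Entry<Long, Long> prev = ranges.floorEntry(start);
        if (prev != null && prev.getValue() >= start - 1) {
            start = prev.getKey();
            end = Math.max(end, prev.getValue());
            ranges.remove(prev.getKey()); // corresponds to appending a null record
        }
        // Merge with following ranges that touch [start, end].
        Map.Entry<Long, Long> next = ranges.ceilingEntry(start);
        while (next != null && next.getKey() <= end + 1) {
            end = Math.max(end, next.getValue());
            ranges.remove(next.getKey()); // another tombstone in the IC log
            next = ranges.ceilingEntry(start);
        }
        ranges.put(start, end);
        // If the merged range now touches the stable offset, advance it.
        if (start <= stableOffset + 1) {
            stableOffset = end;
            ranges.remove(start); // push to primary offset topic, drop the IC range
        }
    }

    public long stableOffset() {
        return stableOffset;
    }

    public static void main(String[] args) {
        IndividualCommitTrackerSketch tracker = new IndividualCommitTrackerSketch(42L);
        tracker.commit(45, 47);
        tracker.commit(50, 50);
        tracker.commit(48, 49); // merges [45-47], [48-49], [50-50] into [45-50]
        tracker.commit(43, 44); // touches the stable offset, advancing it to 50
        System.out.println("stable offset = " + tracker.stableOffset()); // 50
    }
}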

There is also some math we need to do on how frequently we should commit. The default max request size for Kafka is 1MB, which means we could not include more than roughly 1MB / 16 bytes ≈ 65,000 ranges within a request (each range being a pair of int64 offsets). This means that in auto commit mode the consumer has to break a commit down if it has already accumulated a large number of ranges. It is also undesirable to let the stable offset make no progress for a long time. For the sake of smooth progress, we define a new config on the maximum number of ranges collected before triggering a commit call.
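
As a back-of-the-envelope illustration of that sizing math, the sketch below shows how a consumer in auto commit mode might break accumulated ranges into request-sized batches. The constants and helper names are assumptions for illustration, not proposed configs or client code.

Code Block
languagejava
titleCommitBatchingSketch.java
import java.util.ArrayList;
import java.util.List;

/**
 * Back-of-the-envelope check for the commit sizing discussed above. Each IC
 * range is a pair of int64 offsets (16 bytes), so a 1 MB request fits roughly
 * 65,000 ranges. All names here are hypothetical.
 */
public class CommitBatchingSketch {

    static final int RANGE_SIZE_BYTES = 2 * Long.BYTES;       // 16 bytes per range
    static final int MAX_REQUEST_SIZE_BYTES = 1 << 20;        // 1 MB default
    static final int MAX_RANGES_PER_REQUEST =
            MAX_REQUEST_SIZE_BYTES / RANGE_SIZE_BYTES;         // 65,536

    /** Split accumulated [start, end] ranges into request-sized commit batches. */
    static List<List<long[]>> splitIntoCommits(List<long[]> accumulatedRanges) {
        List<List<long[]>> commits = new ArrayList<>();
        for (int i = 0; i < accumulatedRanges.size(); i += MAX_RANGES_PER_REQUEST) {
            int end = Math.min(i + MAX_RANGES_PER_REQUEST, accumulatedRanges.size());
            commits.add(new ArrayList<>(accumulatedRanges.subList(i, end)));
        }
        return commits;
    }

    public static void main(String[] args) {
        System.out.println("max ranges per request = " + MAX_RANGES_PER_REQUEST);
    }
}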

Offset Fetch

...

Code Block
OffsetFetchResponse => ThrottleTimeMs Topics ErrorCode
  ThrottleTimeMs => int32
  Topics         => List<OffsetFetchResponseTopic>
  ErrorCode      => int16

OffsetFetchResponseTopic => Name Partitions
  Name           => string
  Partitions     => List<OffsetFetchResponsePartition>
  
OffsetFetchResponsePartition => PartitionIndex CommittedOffset CommittedLeaderEpoch Metadata ErrorCode CommittedRangeOffsets
  PartitionIndex => int32
  CommittedOffset => int64
  CommittedLeaderEpoch => int32
  Metadata => string
  ErrorCode => int16
  CommittedRangeOffsets => List<Tuple<int64, int64>> // NEW
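
For illustration only, a consumer restarting with the new CommittedRangeOffsets field could skip records that were already individually committed. The helper below is a hypothetical sketch, not a proposed client API.

Code Block
languagejava
titleRangeOffsetFilterSketch.java
import java.util.List;

/**
 * Illustration of how a consumer could use the new CommittedRangeOffsets
 * returned by OffsetFetch: records whose offsets fall inside an already
 * committed IC range can be skipped on restart. Names are hypothetical.
 */
public class RangeOffsetFilterSketch {

    /** An inclusive committed range [start, end], mirroring Tuple<int64, int64>. */
    record CommittedRange(long start, long end) {}

    /** True if the record at this offset was already committed. */
    static boolean alreadyCommitted(long offset, long committedOffset,
                                    List<CommittedRange> committedRanges) {
        if (offset < committedOffset) {
            return true; // below the stable/committed offset
        }
        for (CommittedRange range : committedRanges) {
            if (offset >= range.start() && offset <= range.end()) {
                return true; // inside one of the IC ranges
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<CommittedRange> ranges = List.of(
                new CommittedRange(45, 47), new CommittedRange(50, 50));
        System.out.println(alreadyCommitted(46, 43, ranges)); // true
        System.out.println(alreadyCommitted(48, 43, ranges)); // false
    }
}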

For better visibility, we shall include the range offsets in the ListConsumerGroupOffsetsResult:

Code Block
languagejava
titleListConsumerGroupOffsetsResult.java
public class ListConsumerGroupOffsetsResult {
    ...
    final KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> future;
    ...
}

public class OffsetAndMetadata {

    public long offset();

    public List<Tuple<Long, Long>> rangeOffsets(); // NEW

    public String metadata();

    public Optional<Integer> leaderEpoch();
}
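
A hypothetical usage sketch: a monitoring tool could list group offsets with the existing Admin API and, once this KIP lands, also surface the proposed rangeOffsets(). The call to rangeOffsets() is commented out below because it does not exist in today's client; the group id and bootstrap address are placeholders.

Code Block
languagejava
titleInspectRangeOffsets.java
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

/**
 * Sketch of how a tool might surface the proposed rangeOffsets() alongside
 * the regular committed offsets. rangeOffsets() is the new accessor proposed
 * in this KIP and is not available in current releases.
 */
public class InspectRangeOffsets {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            ListConsumerGroupOffsetsResult result =
                    admin.listConsumerGroupOffsets("my-streams-app");
            Map<TopicPartition, OffsetAndMetadata> offsets =
                    result.partitionsToOffsetAndMetadata().get();
            offsets.forEach((tp, oam) -> {
                System.out.println(tp + " committed=" + oam.offset());
                // Proposed addition: print the individually committed ranges.
                // System.out.println("  ranges=" + oam.rangeOffsets());
            });
        }
    }
}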

To leverage key based filtering in standalone mode, we also define a new consumer API:

...