...

From an infrastructure cost perspective, pre-defining an impractically high number of partitions increases network traffic, since more metadata and replication are needed. Beyond the extra money paid, the operational overhead of keeping a broker cluster in good shape grows with every topic partition beyond necessity. This has been a known pain point for Kafka stream processing scalability, and one of great value to resolve, because the number of processing tasks matches the number of input partitions: a Kafka Streams job's overhead therefore grows as the partition count grows.

Also, in practice, for use cases such as Kafka Streams, online partition expansion is impossible for stateful operations, since the number of partitions for changelog/repartition topics is fixed. This means we cannot change the input partitions gracefully once the job is up and running. To scale up, we have to clean up the internal topics before processing can continue, which means extra downtime and data loss. By scaling on the client side directly, the operation becomes much easier.

Today most consumer-based processing models honor partition-level ordering. However, ETL operations such as joins and aggregations work per key, so the relative order across different keys does not need to be maintained, except for user-customized operations. Many organizations are paying for a stronger system guarantee than they actually need.

...

We would start by supporting a new offset commit semantic, as this unblocks the potential to process data regardless of partition-level ordering. Support for stateful operations is inherently of more value, so concurrently we could add support for key-based filtering in fetch calls. The optimized stateless and transactional supports have to be built on top of the first two steps.

Stage name | Goal | Dependency
Individual commit | Create a generic offset commit model beyond the current partition → single offset mapping | No
Key filtering based fetch | Add capability to FetchRequest to fetch by a specific hashed key range or specific keys (see the sketch below) | No
Rebalance support for concurrent assignment | Add assignor support to allow splitting a single topic partition across different hash ranges | Key filtering based fetch
Transactional support | Incorporate individual commit into the transaction processing model | Key filtering based fetch
Support cooperative fetch on broker | Round-robin assign data to cooperative consumer fetches when no key range is specified | Rebalance support
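To make the key filtering stage concrete, below is a minimal sketch of how a broker-side filter might test whether a record key falls into a requested hash range. The class name KeyRangeFilter, the range semantics, and the use of murmur2 (the hash used by Kafka's default partitioner) are illustrative assumptions, not the final wire format:

Code Block
languagejava
titleKeyRangeFilter.java
import org.apache.kafka.common.utils.Utils;

// Hypothetical helper showing how a broker-side fetch filter could decide
// whether a record key falls into the hash range a consumer asked for.
// The hash choice (murmur2, as in Kafka's default partitioner) and the
// half-open range semantics are assumptions for illustration.
public final class KeyRangeFilter {

    private final int rangeStart; // inclusive
    private final int rangeEnd;   // exclusive

    public KeyRangeFilter(int rangeStart, int rangeEnd) {
        this.rangeStart = rangeStart;
        this.rangeEnd = rangeEnd;
    }

    // Returns true if the record with this key should be served to the consumer.
    public boolean matches(byte[] key) {
        if (key == null) {
            return false; // handling of null keys is an open question, not spec
        }
        int hash = Utils.toPositive(Utils.murmur2(key));
        return hash >= rangeStart && hash < rangeEnd;
    }
}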

...

In the individual commit (IC) mode, the offset metadata grows much quicker and is harder to predict. The log needs to be highly compacted to avoid wasting disk, which is a different requirement from the existing consumer offset topic. Thus, we propose to add another internal topic called `__individual_commit_offsets` which stores the individual commits specifically, and to call the current `__consumer_offsets` topic the primary offset topic. Furthermore, this isolation should make the IC feature rollout more controllable: it avoids disturbing the stable offsets in the primary topic, and it preserves at-least-once semantics if we ever need to delete a corrupted IC offset topic. The individual commit offset topic shall be required to co-locate with the `__consumer_offsets` topic, which means it has to share the primary offset topic's configuration for number of partitions, replication factor, and min replicas.
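Because of the co-location requirement, the partition owning a group's individual commits can be derived the same way the coordinator partition is derived for `__consumer_offsets` today. A minimal sketch, assuming the same groupId-hash scheme the group coordinator currently uses:

Code Block
languagejava
titleIcPartitionResolver.java
import org.apache.kafka.common.utils.Utils;

// Sketch of co-location: a group's individual commits land in the same
// numbered partition of __individual_commit_offsets as its stable offsets
// do in __consumer_offsets, so both are owned by the same coordinator.
public final class IcPartitionResolver {

    // Mirrors how the coordinator partition is chosen for __consumer_offsets:
    // a positive hash of the group id modulo the offsets topic partition count.
    public static int partitionFor(String groupId, int offsetsTopicPartitionCount) {
        return Utils.abs(groupId.hashCode()) % offsetsTopicPartitionCount;
    }
}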

...

The IC offset will be appended into the corresponding offset log as normal; however, the broker shall maintain an in-memory state to keep track of the individual commits that are written but not yet cleaned up. To eventually compact this topic, each IC record will go through the coordinator memory to check whether any records can be deleted. In the illustrated scenario,
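One way to picture this in-memory state is a per-partition set of committed ranges that merge as gaps close; once the lowest gap is filled, the stable offset advances and everything below it becomes deletable. A sketch under those assumptions (the class name IcRangeTracker is hypothetical):

Code Block
languagejava
titleIcRangeTracker.java
import java.util.Map;
import java.util.TreeMap;

// Sketch of the coordinator's in-memory view of individual commits for one
// topic partition: committed offset ranges keyed by their start offset.
// When the range touching the stable offset fills in, the stable offset
// advances and the IC records below it become eligible for cleanup.
public final class IcRangeTracker {

    private final TreeMap<Long, Long> ranges = new TreeMap<>(); // start -> end (exclusive)
    private long stableOffset;

    public IcRangeTracker(long stableOffset) {
        this.stableOffset = stableOffset;
    }

    // Record an individual commit of [start, end) and merge adjacent ranges.
    public void commit(long start, long end) {
        Map.Entry<Long, Long> floor = ranges.floorEntry(start);
        if (floor != null && floor.getValue() >= start) {
            start = floor.getKey();
            end = Math.max(end, floor.getValue());
        }
        // absorb any following ranges that overlap or touch the new one
        Map.Entry<Long, Long> next = ranges.ceilingEntry(start);
        while (next != null && next.getKey() <= end) {
            end = Math.max(end, next.getValue());
            ranges.remove(next.getKey());
            next = ranges.ceilingEntry(start);
        }
        ranges.put(start, end);
        // advance the stable offset if the lowest gap has been filled
        Map.Entry<Long, Long> first = ranges.firstEntry();
        if (first != null && first.getKey() <= stableOffset) {
            stableOffset = first.getValue();
            ranges.remove(first.getKey());
        }
    }

    public long stableOffset() {
        return stableOffset;
    }

    public int pendingRanges() {
        return ranges.size();
    }
}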

...

There is also some math we need to do on how frequently we should commit. The default max request size for Kafka is 1 MB, which means we could not include more than 1 MB / 16 bytes ≈ 60,000 ranges within a request (each range being a pair of 8-byte offsets). This means that in auto-commit mode, the consumer has to break down commits if it has already accumulated a large number of ranges. It is also undesirable to let the stable offset make no progress for a long time. For the sake of smooth progress, we define a new config on the maximum number of ranges collected before triggering a commit call.
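The arithmetic and the breakdown rule can be sanity-checked in a few lines. The 16-byte-per-range figure (two 8-byte offsets) follows the text; the other constants are illustrative:

Code Block
languagejava
titleCommitBatching.java
// Back-of-envelope check of how many ranges fit in one commit request,
// and how an auto-commit would break a large accumulation into chunks.
public final class CommitBatching {

    static final int MAX_REQUEST_BYTES = 1024 * 1024; // Kafka default max request size
    static final int BYTES_PER_RANGE = 16;            // start + end offset, 8 bytes each

    public static void main(String[] args) {
        int maxRangesPerRequest = MAX_REQUEST_BYTES / BYTES_PER_RANGE;
        System.out.println(maxRangesPerRequest); // 65536, i.e. roughly 60,000

        int accumulated = 150_000; // ranges collected since the last commit
        int commits = (accumulated + maxRangesPerRequest - 1) / maxRangesPerRequest;
        System.out.println(commits); // 3 commit calls needed
    }
}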

We also need to take care of the memory bound for maintaining the individual offsets. If there are too many of them, we eventually could not store them all in the heap. One solution is to block progress of the active group when the number of IC offsets grows too large, which usually indicates a dead member within the group. The coordinator will call out a rebalance in this extreme case to make sure progress can be made again.
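A rough sketch of that guard, reusing the IcRangeTracker sketch above; the cap config and the rebalance trigger are illustrative of the described behavior, not a committed API:

Code Block
languagejava
titleIcMemoryGuard.java
// Sketch of the coordinator guard described above: once a group's pending
// individual commits exceed the cap (often a sign of a dead member holding
// back the stable offset), new IC commits are rejected and a rebalance is
// forced so progress can resume. Names here are illustrative only.
public final class IcMemoryGuard {

    private final int maxPendingIcOffsets; // hypothetical cap config

    public IcMemoryGuard(int maxPendingIcOffsets) {
        this.maxPendingIcOffsets = maxPendingIcOffsets;
    }

    public boolean tryAcceptCommit(IcRangeTracker tracker) {
        if (tracker.pendingRanges() >= maxPendingIcOffsets) {
            // the real coordinator would answer with MAX_INDIVIDUAL_COMMIT_REACHED
            // (proposed below) and trigger a group rebalance
            return false;
        }
        return true;
    }
}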

Offset Fetch

During startup in sequential commit mode, a consumer will attempt to fetch the stored offsets before resuming work. The same step applies in IC mode, except the consumer will now be aware of both the individual offsets and the stable offsets. The individual offsets serve as a reference when the user calls #poll(): any record that has already been committed is filtered out, so the polled records contain only uncommitted ones. When doing the offset commit, the consumer will do a "gap amending" too.
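A sketch of the poll-time filtering, assuming the committed ranges were loaded during the offset fetch; the class and method names are hypothetical:

Code Block
languagejava
titleIcPollFilter.java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Sketch of the poll-time filtering described above: records whose offsets
// fall inside an already-committed individual range are dropped, so the
// application only sees uncommitted records. committedRanges is assumed
// to be populated from the IC offset fetch at startup.
public final class IcPollFilter {

    private final NavigableMap<Long, Long> committedRanges = new TreeMap<>(); // start -> end (exclusive)

    public void addCommittedRange(long start, long end) {
        committedRanges.put(start, end);
    }

    public <K, V> List<ConsumerRecord<K, V>> filter(List<ConsumerRecord<K, V>> polled) {
        List<ConsumerRecord<K, V>> uncommitted = new ArrayList<>();
        for (ConsumerRecord<K, V> record : polled) {
            Map.Entry<Long, Long> floor = committedRanges.floorEntry(record.offset());
            boolean alreadyCommitted = floor != null && record.offset() < floor.getValue();
            if (!alreadyCommitted) {
                uncommitted.add(record);
            }
        }
        return uncommitted;
    }
}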

...

Code Block
languagejava
titleErrors.java
INDIVIDUAL_COMMIT_NOT_ALLOWED(88, "The broker hosting coordinator doesn't allow individual commit", IndividualCommitNotAllowedException::new),
TOPIC_RANGE_FETCH_NOT_ALLOWED(89, "The topic is not allowed to perform range fetch", TopicRangeFetchNotAllowedException::new),
CONSUMER_RANGE_FETCH_NOT_ACCEPTED(90, "The consumer is not allowed to do the range fetch", ConsumerRangeFetchNotAcceptedException::new),
INDIVIDUAL_COMMIT_TOO_OLD(91, "The individual commit failed due to attempting to commit some entry behind LSO", IndividualCommitTooOldException::new),
MAX_INDIVIDUAL_COMMIT_REACHED(92, "The maximum number of individual commits the coordinator could hold has reached the cap", MaxIndividualCommitReachedException::new);

New Configurations

Broker configs

accept.individual.commit

Determines whether the group coordinator allows individual commits.

Default: true

max.bytes.key.range.cache

The maximum memory allowed to hold partitioned records in memory for serving the next batch. Note that this is not the memory limit for handling one batch of data range checking.

Default: 209715200 bytes (200 MB)

max.bytes.fetcher.filtering

The maximum memory allowed for each replica fetcher to do general broker-side filtering, including the key hash processing.

Default: 104857600 bytes (100 MB)

individual.commit.log.segment.bytes

The segment size for the individual commit topic.

Default: 10485760 bytes (10 MB)
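For illustration, a broker properties fragment enabling these settings at their defaults (config names as listed above) might look like:

Code Block
titleserver.properties
accept.individual.commit=true
max.bytes.key.range.cache=209715200
max.bytes.fetcher.filtering=104857600
individual.commit.log.segment.bytes=10485760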

...