...

We would start by supporting new offset commit semantics, since this unblocks processing data regardless of partition-level ordering. Stateful operation support is inherently more valuable, so in parallel we could add key-based filtering support to fetch calls. The optimized stateless and transactional supports have to be built on top of the first two steps.

Stage name | Goal | Dependency
Individual commit | Create a generic offset commit model beyond the current partition → single offset mapping | None
Key filtering based fetch | Add capability to FetchRequest to fetch a specific hashed key range or specific keys | None
Rebalance support for concurrent assignment | Add assignor support to allow splitting a single topic partition by hash range | Key based filtering
Transactional support | Incorporate individual commit into the transaction processing model | Key based filtering
Support cooperative fetch on broker | Round-robin assign data to cooperative consumer fetches when no key range is specified | Rebalance support

...

In the individual commit mode, the offset metadata will grow much faster and be harder to predict. The log needs to be heavily compacted to avoid wasting disk, which is a different requirement from the existing consumer offset topic. Thus, we propose to add another internal topic called `__individual_commit_offsets` which stores the individual commits specifically, and to refer to the current `__consumer_offsets` topic as the primary offset topic. Furthermore, this isolation should make the IC feature rollout more controllable by avoiding any disturbance of the stable offsets in the primary topic, and it lets us fall back to at-least-once semantics if we ever need to delete a corrupted IC offset topic. The individual commit offset topic shall be required to co-locate with the `__consumer_offsets` topic, which means it has to share the number of partitions, replication factor and min replicas configuration of the primary offset topic.
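
For illustration, here is a minimal sketch of how the IC topic could be created so that its layout mirrors the primary offset topic. The class name and the use of the Admin client are purely illustrative; in practice the broker would create this internal topic itself, just as it does for `__consumer_offsets`.

Code Block
language: java
title: IndividualCommitTopicCreator.java (illustrative sketch)
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

public class IndividualCommitTopicCreator {

    /**
     * Creates the IC offset topic with the same partition count, replication
     * factor and min ISR as the primary offset topic, so each IC partition is
     * hosted by the same group coordinator as its primary counterpart.
     */
    public static void createIcOffsetsTopic(Admin admin,
                                            int primaryNumPartitions,
                                            short primaryReplicationFactor,
                                            String primaryMinInsyncReplicas) throws Exception {
        NewTopic icTopic = new NewTopic("__individual_commit_offsets",
                                        primaryNumPartitions,
                                        primaryReplicationFactor)
            .configs(Map.of(
                // highly compacted, as discussed above
                TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT,
                TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, primaryMinInsyncReplicas));
        admin.createTopics(Collections.singleton(icTopic)).all().get();
    }
}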

...

The IC offset will be appended to the corresponding offset log as normal; however, the broker shall maintain an in-memory state to keep track of the individual commits that have been written but not yet cleaned up. To eventually compact this topic, each IC record will go through the coordinator memory to check whether any records can be deleted. In the illustration scenario,
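
As a rough sketch of that in-memory state, the coordinator could keep a per-partition map of committed ranges and prune the ones that fall behind the advancing stable offset. All class and method names below are hypothetical, not part of the proposal's API.

Code Block
language: java
title: IndividualCommitState.java (illustrative sketch)
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

public class IndividualCommitState {

    // startOffset -> endOffset (inclusive) of each individually committed range
    private final TreeMap<Long, Long> committedRanges = new TreeMap<>();

    public void addRange(long start, long end) {
        committedRanges.put(start, end);
    }

    /** A range is redundant once the stable offset has moved past its end. */
    public boolean isCleanable(long end, long stableOffset) {
        return end < stableOffset;
    }

    /** Drops all ranges fully covered by the advanced stable offset, so their
     *  IC records can be tombstoned in __individual_commit_offsets. */
    public void pruneBelow(long stableOffset) {
        Iterator<Map.Entry<Long, Long>> it = committedRanges.entrySet().iterator();
        while (it.hasNext()) {
            if (it.next().getValue() < stableOffset) {
                it.remove();
            }
        }
    }
}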

...

There is also some math we need to do regarding how frequently we should commit. The default max request size for Kafka is 1MB, which means we could not include more than 1MB / 16B ≈ 60,000 ranges within a request. This means that in auto commit mode the consumer has to break down commits once it has accumulated a large number of ranges. It is also undesirable to let the stable offset make no progress for a long time. For the sake of smooth progress, we define a new config on the maximum number of ranges collected before triggering a commit call.
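
The arithmetic behind that bound, assuming roughly 16 bytes per committed range (two 8-byte offsets); the class is just a worked example, not part of the proposal:

Code Block
language: java
title: IcCommitSizing.java (illustrative sketch)
public class IcCommitSizing {

    // Each committed range is assumed to take ~16 bytes (two 8-byte offsets).
    private static final int BYTES_PER_RANGE = 16;
    private static final int DEFAULT_MAX_REQUEST_SIZE = 1024 * 1024; // 1MB

    public static void main(String[] args) {
        int maxRangesPerRequest = DEFAULT_MAX_REQUEST_SIZE / BYTES_PER_RANGE;
        // Roughly 65,536 ranges would already fill a 1MB commit request,
        // so auto commit must flush well before reaching that many ranges.
        System.out.println("Max ranges per 1MB commit request ≈ " + maxRangesPerRequest);
    }
}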

We also need to take care of the memory bound for maintaining the individual offsets. If there are too many of them, we eventually could not store them all in the heap. One solution is to block progress of the active group when the number of IC offsets grows too large, which usually indicates a dead member within the group. The coordinator will call out a rebalance in this extreme case to make sure progress can be made again.
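
A minimal sketch of that cap check, assuming a hypothetical broker-side limit on the number of IC ranges held per group; the class, method and result names are illustrative only:

Code Block
language: java
title: IndividualCommitGuard.java (illustrative sketch)
public class IndividualCommitGuard {

    public enum CommitResult { ACCEPTED, MAX_INDIVIDUAL_COMMIT_REACHED }

    private final int maxIndividualCommits;

    public IndividualCommitGuard(int maxIndividualCommits) {
        this.maxIndividualCommits = maxIndividualCommits;
    }

    public CommitResult tryCommit(int currentRangeCount) {
        if (currentRangeCount >= maxIndividualCommits) {
            // The coordinator would reject the commit and call out a rebalance
            // so a dead member holding back the stable offset can be evicted.
            return CommitResult.MAX_INDIVIDUAL_COMMIT_REACHED;
        }
        return CommitResult.ACCEPTED;
    }
}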

Offset Fetch

During consumer startup in sequential commit mode, it will attempt to fetch the stored offsets before resuming work. This would be the same step in IC mode, except that the consumer will now be aware of both the individual offsets and the stable offsets. The individual offsets serve as a reference when the user calls #poll(): any record that is already committed will be filtered out, so the polled records only contain uncommitted ones. When doing the offset commit, the consumer will also perform a "gap amending" step.
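
A sketch of that poll-time filtering, assuming the consumer keeps the fetched IC ranges for each partition as a navigable map keyed by range start offset; the helper class and method names are illustrative:

Code Block
language: java
title: IndividualCommitFilter.java (illustrative sketch)
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class IndividualCommitFilter {

    /** committedRanges: startOffset -> endOffset (inclusive) for one partition */
    public static <K, V> List<ConsumerRecord<K, V>> filterCommitted(
            List<ConsumerRecord<K, V>> polled,
            NavigableMap<Long, Long> committedRanges) {
        List<ConsumerRecord<K, V>> uncommitted = new ArrayList<>();
        for (ConsumerRecord<K, V> record : polled) {
            // A record is already individually committed if it falls inside a
            // [start, end] range previously fetched from the IC offset topic.
            Map.Entry<Long, Long> range = committedRanges.floorEntry(record.offset());
            boolean alreadyCommitted = range != null && record.offset() <= range.getValue();
            if (!alreadyCommitted) {
                uncommitted.add(record);
            }
        }
        return uncommitted;
    }
}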

...

Code Block
language: java
title: Errors.java
INDIVIDUAL_COMMIT_NOT_ALLOWED(88, "The broker hosting coordinator doesn't allow individual commit", IndividualCommitNotAllowedException::new),
TOPIC_RANGE_FETCH_NOT_ALLOWED(89, "The topic is not allowed to perform range fetch with", TopicRangeFetchNotAllowedException::new),
CONSUMER_RANGE_FETCH_NOT_ACCEPTED(90, "The consumer is not allowed to do the range fetch", ConsumerRangeFetchNotAccepted::new),
INDIVIDUAL_COMMIT_TOO_OLD(91, "The individual commit failed due to attempting to commit some entry behind LSO"),
MAX_INDIVIDUAL_COMMIT_REACHED(92, "The maximum number of individual commits coordinator could hold reaches the cap");

New Configurations

Broker configs

...