...

From the infrastructure cost perspective, pre-defining an impractically high number of partitions increases network traffic, as more metadata and replication are needed. Besides the extra cost, the operational overhead of keeping the broker cluster in good shape grows with every topic partition beyond necessity. This has been a known pain point for Kafka stream processing scalability, and one of great value to resolve, because the number of processing tasks matches the number of input partitions; a Kafka Streams job's overhead therefore grows as the partition count increases.

Today most consumer based processing models honor partition level ordering. However, ETL operations such as joins and aggregations are per-key, so the relative order across different keys does not need to be maintained, except for user customized operations. Many organizations are paying for a stronger system guarantee than they actually need.

The proposal here is to decouple the number of consumption threads from the number of physical partitions, by making consumers capable of collaborating on an individual topic partition. There are a couple of benefits compared with the existing model:

  1. Data consume and produce scales are no longer coupled. This means we could save money by configuring a topic with a reasonable number of partitions.
  2. Better avoidance of partition level hot keys. When a specific key is processed really slowly, the decoupled key based consumption could bypass it and make progress on other keys.
  3. No operational overhead for scaling out in extreme cases. Users just need to add more consumer/stream capacity to unblock processing, even when only a few partitions are available.

Proposed Changes

We want to clarify beforehand that this KIP would be a starting point of a transformational change to the consumer consumption semantics. It's not possible to have all the design details rolled out in one shot. Instead, the focus is to define a clear roadmap of what needs to be done, illustrate the long term plan, and pin down some concrete starting steps. There will also be follow-up KIPs and design docs, so stay tuned.

Use Case

As stated above, the scaling cap for a consumer based application is the number of input partitions. In an extreme scenario where there is a single input partition with two consumers, one consumer must remain idle. If the single consumer could not keep up with the processing speed, there is no other solution but lagging. It would be ideal if we could co-process data within one partition with two consumers when partition level order is not required, such that we could add as many consumer instances as we want.

So this cooperative consumption model applies with the following limitations:

  1. The bottleneck is in the application processing, not the data consumption throughput, which could be limited by other factors such as network saturation of the broker.
  2. The processing semantics do not require partition level order; otherwise only one consumer could work on the input sequentially without parallelism.

For point 2, there are also different requirements for stateless and stateful operations, such as whether key level ordering is required or not. Naturally, with a requirement of key level ordering, the broker must allocate the same message key to the same consumer within one generation. For stateless operations which don't care about ordering at all, the broker could just do a round robin assignment of records to fetch requests.
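To make the distinction concrete, below is a minimal, purely illustrative sketch of the key level ordering constraint: within one generation the same key always routes to the same consumer, while a stateless path could hand records out round robin. The class name and the hash-mod routing rule are assumptions for illustration; the actual mechanism proposed in this KIP is the key range assignment described later.

Code Block
languagejava
titleKeyToConsumerRouter.java
import java.util.Arrays;
import java.util.List;

// Illustrative only: shows the "same key -> same consumer within a generation"
// constraint needed for stateful operations, versus round robin for stateless ones.
final class KeyToConsumerRouter {
    private final List<String> membersInGeneration; // fixed member list for one generation
    private int roundRobinCursor = 0;               // used only for the stateless path

    KeyToConsumerRouter(List<String> membersInGeneration) {
        this.membersInGeneration = membersInGeneration;
    }

    // Stateful path: key level ordering requires a deterministic mapping per generation.
    String consumerForKey(byte[] key) {
        int h = Arrays.hashCode(key) & 0x7fffffff;
        return membersInGeneration.get(h % membersInGeneration.size());
    }

    // Stateless path: ordering does not matter, so records can be spread round robin.
    String nextConsumerRoundRobin() {
        String member = membersInGeneration.get(roundRobinCursor);
        roundRobinCursor = (roundRobinCursor + 1) % membersInGeneration.size();
        return member;
    }
}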

Proposed Roadmap

We would start by supporting a new offset commit semantic, as this unblocks the potential to process data regardless of partition level ordering. The stateful operation support is of more value in nature, so concurrently we could add support for key based filtering within fetch calls. The optimized stateless and transactional supports have to be built on top of the first two steps.

Also, in reality, for use cases such as Kafka Streams, online partition expansion is impossible for stateful operations, as the number of partitions for changelog/repartition topics is fixed. This means we couldn't change the input partitions gracefully once the job is up and running. To be able to scale up, we have to clean up any internal topics to continue processing, which means extra downtime and data loss. By scaling on the client side directly, the operation becomes much easier.


Stage | Stage name | Goal | Dependency
1 | Individual commit | Create a generic offset commit model beyond the current partition → single offset mapping. | No
2 | Key filtering based fetch | Add capability to FetchRequest with a specific hashed key range or specific keys. | No
3 | Rebalance support for concurrent assignment | Add assignor support to allow splitting a single topic partition with different hash ranges. | Key based filtering
4 | Transactional support | Incorporate individual commit into the transaction processing model. | Key based filtering
5 | Support cooperative fetch on broker | Round robin assign data to cooperative consumer fetches when there is no key range specified. | Rebalance support

...

In the individual commit mode, the offset metadata shall grow much more quickly and be harder to predict. The log needs to be highly compacted to avoid disk waste, which is a different requirement from the existing consumer offset topic. Thus, we propose to add another internal topic called `__individual_commit_offsets` which stores the individual commits specifically, and to call the current __consumed_offsets topic the primary offset topic. Furthermore, this isolation should make the IC feature rollout more controllable by not risking the stable offsets in the primary topic, and lets us fall back to at-least-once by deleting a corrupted IC offset topic when needed. The individual commit offset topic shall be required to co-locate with the __consumed_offsets topic, which means it has to share the number of partitions, replication factor and min replicas configuration with the primary offset topic.
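As a small illustration of the co-location requirement, the sketch below derives a group's IC partition with the same hash-mod rule as its primary offset partition, which only works because both topics share the same partition count. The class and method names are hypothetical, not broker code.

Code Block
languagejava
titleIcOffsetsPartitioner.java
// Hypothetical helper: because __individual_commit_offsets must share the partition
// count of __consumed_offsets, a group's IC partition can be derived with the same
// rule as its primary offset partition, so the two logs stay co-located.
final class IcOffsetsPartitioner {
    private final int offsetsTopicPartitionCount; // shared by both internal topics

    IcOffsetsPartitioner(int offsetsTopicPartitionCount) {
        this.offsetsTopicPartitionCount = offsetsTopicPartitionCount;
    }

    int partitionFor(String groupId) {
        // same non-negative hash-mod rule for both topics
        return (groupId.hashCode() & 0x7fffffff) % offsetsTopicPartitionCount;
    }
}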

...

The IC offset will be appended into the corresponding offset log as normal; however, the broker shall maintain an in-memory state to keep track of the individual commits that are written but not yet cleaned up. To eventually compact this topic, each IC record will go through the coordinator memory to check whether any records could be deleted (a sketch of this merge logic follows the list below). In the illustration scenario,

  • If there is an IC [48-49], then we no longer need to maintain IC ranges [45-47] and [50-50]. We shall append 2 null records to delete [45-47] and [50-50] and augment the original IC into [45-50].
  • If there is an IC [43-44], we are good to move the stable offset forward. The operation is like:
    • Push the stable offset to 47 in the primary offset topic
    • Append a record to remove IC [45-47] in the IC offset topic
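A minimal sketch of this merge-and-advance logic, assuming the coordinator keeps the un-compacted IC ranges in a sorted in-memory map; the class and method names are illustrative, not actual broker code.

Code Block
languagejava
titleIndividualCommitStateSketch.java
import java.util.Map;
import java.util.TreeMap;

// Illustrative in-memory view of one partition's individual commits. Adjacent or
// overlapping ranges are merged (removed entries correspond to null/tombstone records
// appended to the IC topic), and the stable offset advances when the lowest range
// becomes contiguous with it.
public class IndividualCommitState {
    private final TreeMap<Long, Long> icRanges = new TreeMap<>(); // start -> end (inclusive)
    private long stableOffset;

    public IndividualCommitState(long stableOffset) {
        this.stableOffset = stableOffset;
    }

    public void addRange(long start, long end) {
        // Merge with the closest range on the left if it touches or overlaps.
        Map.Entry<Long, Long> left = icRanges.floorEntry(start);
        if (left != null && left.getValue() >= start - 1) {
            start = left.getKey();
            end = Math.max(end, left.getValue());
            icRanges.remove(left.getKey()); // tombstone for the old range
        }
        // Merge with any ranges on the right that touch or overlap.
        Map.Entry<Long, Long> right = icRanges.ceilingEntry(start);
        while (right != null && right.getKey() <= end + 1) {
            end = Math.max(end, right.getValue());
            icRanges.remove(right.getKey()); // tombstone for the old range
            right = icRanges.ceilingEntry(start);
        }
        icRanges.put(start, end);

        // If the lowest range is now contiguous with the stable offset, advance it.
        Map.Entry<Long, Long> first = icRanges.firstEntry();
        if (first != null && first.getKey() <= stableOffset + 1) {
            stableOffset = first.getValue();   // pushed to the primary offset topic
            icRanges.remove(first.getKey());   // tombstone for the absorbed range
        }
    }

    public long stableOffset() {
        return stableOffset;
    }
}

For instance, with stable offset 42 and ranges [45-47] and [50-50], adding [43-44] merges into [43-47], pushes the stable offset to 47 and leaves only [50-50], matching the walkthrough above.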

There is also some math we need to do regarding how frequently we should commit. The default max request size for Kafka is 1MB, which means we could not include more than 1MB / 16B ≈ 60,000 ranges within a request. This means that if we are in auto commit mode, the consumer has to break down commits if it has already accumulated a large number of ranges. It's also undesirable to let the stable offset make no progress for a long time. For the sake of smooth progress, we define a new config for the max number of ranges collected before triggering a commit call.
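The arithmetic above translates into a simple client-side trigger. The sketch below is illustrative only (the class name and the commit plumbing are assumptions); it shows how the proposed max.num.ranges.to.commit config and the 1MB request bound would cap the number of accumulated ranges.

Code Block
languagejava
titleIcAutoCommitPolicy.java
import java.util.ArrayList;
import java.util.List;

// Illustrative consumer-side policy: flush accumulated IC ranges into a commit call
// once we hit either the configured cap or the request-size bound (~65k ranges at
// 16 bytes per range in a 1MB request).
public class IcAutoCommitPolicy {
    private static final int BYTES_PER_RANGE = 16;                    // two int64 offsets
    private static final int DEFAULT_MAX_REQUEST_BYTES = 1024 * 1024; // 1MB default
    private static final int HARD_CAP = DEFAULT_MAX_REQUEST_BYTES / BYTES_PER_RANGE;

    private final int maxNumRangesToCommit; // proposed max.num.ranges.to.commit config
    private final List<long[]> pendingRanges = new ArrayList<>();

    public IcAutoCommitPolicy(int maxNumRangesToCommit) {
        this.maxNumRangesToCommit = maxNumRangesToCommit;
    }

    /** Records a processed range and reports whether an IC commit should be sent now. */
    public boolean add(long start, long end) {
        pendingRanges.add(new long[] {start, end});
        return pendingRanges.size() >= Math.min(maxNumRangesToCommit, HARD_CAP);
    }

    /** Hands back the accumulated ranges for the commit request and resets the buffer. */
    public List<long[]> drain() {
        List<long[]> toCommit = new ArrayList<>(pendingRanges);
        pendingRanges.clear();
        return toCommit;
    }
}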

Also, we need to take care of the memory bound for maintaining the individual offsets. If there are too many of them, we eventually couldn't store them all in the heap. One solution is to block progress of the active group when the number of IC offsets is too large, which usually indicates a dead member within the group. The coordinator will call out a rebalance in this extreme case to make sure progress could be made again.

Offset Fetch

During consumer startup in sequential commit mode, the consumer will attempt to fetch the stored offsets before resuming work. This would be the same step in IC mode, only the consumer will now be aware of both the individual offsets and the stable offsets. The individual offsets serve as a reference when the user calls #poll(), such that any record that is already committed is filtered out and the polled records only contain uncommitted ones. When doing the offset commit, the consumer will do a "gap amending" too.
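A hedged sketch of the poll-time filtering described above; the structure holding the fetched individual offsets is an assumption, and only the membership check is shown.

Code Block
languagejava
titleIcPollFilter.java
import java.util.Map;
import java.util.TreeMap;

// Illustrative only: given the stable offset and the fetched individual commit ranges,
// decide whether a polled record was already committed and should be filtered out.
public class IcPollFilter {
    private final long stableOffset;
    private final TreeMap<Long, Long> committedRanges = new TreeMap<>(); // start -> end

    public IcPollFilter(long stableOffset) {
        this.stableOffset = stableOffset;
    }

    public void addCommittedRange(long start, long end) {
        committedRanges.put(start, end);
    }

    /** True if the record at this offset is covered by the stable offset or an IC range. */
    public boolean alreadyCommitted(long offset) {
        if (offset <= stableOffset) {
            return true;
        }
        Map.Entry<Long, Long> floor = committedRanges.floorEntry(offset);
        return floor != null && offset <= floor.getValue();
    }
}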

...

There is an obvious performance penalty compared with the zero-copy mechanism of the existing consumer fetch path; however, the value of unblocking new use cases motivates us to figure out more optimizations in the long run. In the near term, we could consider caching the filter results of one batch of data, so that when other fetch requests fall into this range, we could serve the in-memory result instead of doing a repetitive fetch. For example, if a fetch request asks for range [40-50] but only [40, 44] satisfies its hash range, the result for [45, 49] will be saved as message keys, available for the next fetch request instead of being evicted immediately. We also recommend a config to control how much memory we would like to spend on this feature, called max.bytes.key.range.hashing. When the cache has sufficient space and a fetch request is waiting for new data, producer results could be written directly into the cache and filtered there, to further save disk read time.

Also, the user could opt to load either just the message key or the entire message into main memory, depending on their understanding of the business needs, since the size of message values can vary hugely. This option would be introduced as a topic level config. Either way, once a cached result is sent out, it will be evicted immediately to leave room for current or future key range filter results.
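To make the hash range matching concrete, here is a small illustrative filter. The proposal does not fix a hash function; FNV-1a is used here purely as a stand-in, and the class name is hypothetical.

Code Block
languagejava
titleKeyRangeFilter.java
// Illustrative broker-side check: does a record key fall into the consumer's hash range?
// The hash function below is a placeholder, not the one the final design would pick.
final class KeyRangeFilter {
    private final long start; // inclusive
    private final long end;   // inclusive

    KeyRangeFilter(long start, long end) {
        this.start = start;
        this.end = end;
    }

    // 64-bit FNV-1a, masked to a non-negative value so ranges are simple intervals.
    static long hash(byte[] key) {
        long h = 0xcbf29ce484222325L;
        for (byte b : key) {
            h ^= (b & 0xffL);
            h *= 0x100000001b3L;
        }
        return h & Long.MAX_VALUE;
    }

    boolean matches(byte[] key) {
        long h = hash(key);
        return h >= start && h <= end;
    }
}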

With key based filtering + IC offset support, the user is already capable of doing a standalone mode of partition data sharing. By leveraging cloud operation technology such as Kubernetes, the user could generate a set of pod configurations that define which consumer subscribes to which topic partition range, and do an easy hot swap when certain nodes go down. We shall discuss the consumer API changes to support this feature in the public interface section.

Thinking even longer term, this work could become a more generic broker side filtering mechanism. A client could specify: 1. key ranges, 2. blacklist keys, 3. whitelist keys, 4. max/min record size, etc., to further reduce the network cost, with a penalty on throughput and latency.

Rebalance support for concurrent assignment 

...

Code Block
titleKeyRangeBasedRoundRobinAssignor
partitionsPerTopic := Map<String, Integer>
topicAllowKeyRangeFetch := Map<String, Boolean>
subscriptions := Map<String, Subscription>
coordinatorAllowIC := Boolean
finalAssignment := Map<String, Map<String, List<Tuple<Integer, KeyRange>>>>

// Step one: group subscriptions by topic.
// Group consumers based on their topic subscription; the value holds the partition count and the consumer ids.
subscriptionByTopic := Map<String, Tuple<Integer, List<String>>>

for topic, value in subscriptionByTopic:
    numPartitions := value.1
    consumerList := value.2
    topicFinalAssignment := Map<String, List<Tuple<Integer, KeyRange>>>
    // consumers that allow key range fetch and commit; must subscribe to this topic
    consumersAllowKeyRange := getConsumersThatAllowKeyRange(topic)
    if numPartitions < consumersAllowKeyRange.size() and coordinatorAllowIC and topicAllowKeyRangeFetch[topic]:
        // Step two: spread the key-share consumers over the partitions round robin
        partitionToConsumersMap := Map<Integer, List<String>>
        currentPartition := 0
        for c, _ in consumersAllowKeyRange:
            partitionToConsumersMap[currentPartition].add(c)
            currentPartition = (currentPartition + 1) % numPartitions

        // Step three: split each partition's key space evenly among its assigned consumers
        for partition, assignedConsumers in partitionToConsumersMap:
            keyRanges := splitRangesByAvgSize(assignedConsumers.size()) // split key ranges by Range.MAX and number of shares
            k := 0
            for c in assignedConsumers:
                topicFinalAssignment[c].add(new Tuple<>(partition, keyRanges[k]))
                k += 1
    else:
        do normal round robin assignment
    finalAssignment[topic] = topicFinalAssignment

return finalAssignment

...


Similarly, for other assignment strategies such as the range assignor, we always attempt to do a `reverse-assign` when consumers out-number any topic's total partitions.

This means that for 5 consumers subscribing to 2 topics with 2 and 3 partitions respectively, the final assignment will look like this (from the partition perspective):

t1p0: c1, c2, c3
t1p1: c4, c5
-----
t2p0: c1, c2
t2p1: c3, c4
t2p2: c5


This step unblocks the potential to allow a dynamic consumer group scaling beyond partition level capping.
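A hedged sketch of the reverse-assign idea under stated assumptions (consumers are split into contiguous groups per partition, earlier partitions absorb the remainder); the helper below is illustrative, not the actual assignor code, and it reproduces the t1/t2 layout shown above for consumers c1..c5.

Code Block
languagejava
titleReverseRangeAssign.java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: when consumers out-number a topic's partitions, "reverse-assign"
// spreads the consumers over the partitions instead of the other way around.
public class ReverseRangeAssign {

    static Map<String, List<String>> assign(List<String> consumers, String topic, int numPartitions) {
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        int base = consumers.size() / numPartitions;   // minimum consumers per partition
        int extra = consumers.size() % numPartitions;  // first `extra` partitions get one more
        int idx = 0;
        for (int p = 0; p < numPartitions; p++) {
            int count = base + (p < extra ? 1 : 0);
            assignment.put(topic + "p" + p, new ArrayList<>(consumers.subList(idx, idx + count)));
            idx += count;
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> consumers = List.of("c1", "c2", "c3", "c4", "c5");
        System.out.println(assign(consumers, "t1", 2)); // {t1p0=[c1, c2, c3], t1p1=[c4, c5]}
        System.out.println(assign(consumers, "t2", 3)); // {t2p0=[c1, c2], t2p1=[c3, c4], t2p2=[c5]}
    }
}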

...

The fetch request will also be augmented to incorporate the key hash ranges. Using a list instead of a single hash range allows future extensibility.

Code Block
FetchRequest => MaxWaitTime ReplicaId MinBytes IsolationLevel FetchSessionId FetchSessionEpoch [Topics] [RemovedTopics] KeyRanges GenerationId MemberId SessionTimeout
  MaxWaitTime => int32
  ReplicaId => int32
  MinBytes => int32
  IsolationLevel => int8
  FetchSessionId => int32
  FetchSessionEpoch => int32
  Topics => TopicName Partitions
    TopicName => String
    Partitions => [Partition FetchOffset StartOffset LeaderEpoch MaxBytes]
        Partition => int32
        CurrentLeaderEpoch => int32
        FetchOffset => int64
        StartOffset => int64
        MaxBytes => int32
  RemovedTopics => RemovedTopicName [RemovedPartition]
    RemovedTopicName => String
    RemovedPartition => int32
  KeyRanges => List<Tuple<int64, int64>> // NEW

We shall also put offset ranges as part of OffsetFetchResponse:

Code Block
OffsetFetchResponse => ThrottleTimeMs Topics ErrorCode
  ThrottleTimeMs => int32
  Topics         => List<OffsetFetchResponseTopic>
  ErrorCode      => int16

OffsetFetchResponseTopic => Name Partitions
  Name           => int32
  Partitions     => List<OffsetFetchResponsePartition>

OffsetFetchResponsePartition => PartitionIndex CommittedOffset CommittedLeaderEpoch Metadata ErrorCode
  PartitionIndex => int32
  CommittedOffset => int64
  CommittedLeaderEpoch => int32
  Metadata => string
  ErrorCode => int16
  CommittedRangeOffsets => List<Tuple<int64, int64>> // NEW

To leverage the key based filtering in standalone mode, we also define a new consumer API:

Code Block
languagejava
titleConsumer.java
public Consumer {
  ...

 /**
  * Manually assign a list of partitions with specific key ranges to this consumer. If a partition maps to an empty list,
  * that means a full ownership of the partition.
  */
  void assign(Map<TopicPartition, List<Tuple<int64, int64>>> partitionWithKeyRanges);
}
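For illustration, here is a sketch of how a standalone process might claim half of a partition's key hash space with the proposed overload. The Tuple type is the placeholder used throughout this KIP (int64 rendered as Long here), and treating the full hash space as [0, Long.MAX_VALUE] is an assumption of this sketch.

Code Block
languagejava
titleStandaloneKeyRangeAssignSketch.java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;

// Illustrative standalone usage of the proposed assign() overload. Each of the two
// processes runs this with its own half of the hash space; an empty list would mean
// full ownership of the partition.
TopicPartition tp = new TopicPartition("input-topic", 0);
long mid = Long.MAX_VALUE / 2;

Map<TopicPartition, List<Tuple<Long, Long>>> lowerHalf =
    Collections.singletonMap(tp, Collections.singletonList(new Tuple<>(0L, mid)));

consumer.assign(lowerHalf); // this process only fetches keys hashing into [0, mid]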

To recognize whether a consumer is allowed to subscribe as key-share, we shall put a new boolean field in Subscription to indicate that:

Code Block
languagejava
titleSubscription.java
public Subscription {
  ...
  public boolean allowKeyShare();
}

To leverage the key based filtering in a group subscription mode, we will also add callbacks to the ConsumerRebalanceListener to populate key range assignments if necessary:

Code Block
languagejava
titleConsumerRebalanceListener.java
public ConsumerRebalanceListener {

	@Deprecated
	void onPartitionsRevoked(Collection<TopicPartition> partitions);

	@Deprecated
	void onPartitionsAssigned(Collection<TopicPartition> partitions);

	@Deprecated
	default void onPartitionsLost(Collection<TopicPartition> partitions) {
		onPartitionsRevoked(partitions);
	}

	/**
	 * Note that if a partition maps to an empty list, that means a full ownership of the partition.
	 */
	void onPartitionWithKeyRangesAssigned(Map<TopicPartition, List<Tuple<int64, int64>>> partitionWithKeyRanges); // NEW

	void onPartitionWithKeyRangesRevoked(Map<TopicPartition, List<Tuple<int64, int64>>> partitionWithKeyRanges); // NEW

	void onPartitionWithKeyRangesLost(Map<TopicPartition, List<Tuple<int64, int64>>> partitionWithKeyRanges); // NEW
}
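As a usage sketch (purely illustrative; the key range callbacks are only proposed above, not an existing API, and Tuple is this KIP's placeholder type), an application-side listener might look like:

Code Block
languagejava
titleKeyRangeAwareListener.java
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;

// Illustrative application-side listener built against the proposed callbacks above.
public class KeyRangeAwareListener implements ConsumerRebalanceListener {

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) { }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) { }

    @Override
    public void onPartitionWithKeyRangesAssigned(Map<TopicPartition, List<Tuple<Long, Long>>> assigned) {
        // An empty range list means full ownership of the partition.
        assigned.forEach((tp, ranges) -> System.out.println("Assigned " + tp + " ranges " + ranges));
    }

    @Override
    public void onPartitionWithKeyRangesRevoked(Map<TopicPartition, List<Tuple<Long, Long>>> revoked) {
        revoked.forEach((tp, ranges) -> System.out.println("Revoked " + tp + " ranges " + ranges));
    }

    @Override
    public void onPartitionWithKeyRangesLost(Map<TopicPartition, List<Tuple<Long, Long>>> lost) {
        onPartitionWithKeyRangesRevoked(lost);
    }
}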

For better visibility, we shall include the key range information in StreamsMetadata:

Code Block
languagejava
titleStreamsMetadata.java
public class StreamsMetadata {

  public HostInfo hostInfo();

  public Set<String> stateStoreNames();

  @Deprecated
  public Set<TopicPartition> topicPartitions();

  public Map<TopicPartition, List<Tuple<Long, Long>>> topicPartitionsWithKeyRange(); // NEW

  public String host();

  public int port();
}


For better visibility, we shall include the range offsets in the ListConsumerGroupOffsetsResult:

Code Block
languagejava
titleListConsumerGroupOffsetsResult.java
public class ListConsumerGroupOffsetsResult {
	...
    final KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> future;
	...
}

public class OffsetAndMetadata {

    public long offset();

	public List<Tuple<Long, Long>> rangeOffsets(); // NEW

    public String metadata();

    public Optional<Integer> leaderEpoch();
}

New Exceptions

We are going to define a series of exceptions which are thrown when 1. an access control restriction is hit, or 2. an inconsistency is detected:

Code Block
languagejava
titleErrors.java
INDIVIDUAL_COMMIT_NOT_ALLOWED(88, "The broker hosting coordinator doesn't allow individual commit", IndividualCommitNotAllowedException::new);
TOPIC_RANGE_FETCH_NOT_ALLOWED(89, "The topic is not allowed to perform range fetch with", TopicRangeFetchNotAllowedException::new);
CONSUMER_RANGE_FETCH_NOT_ACCEPTED(90, "The consumer is not allowed to do the range fetch", ConsumerRangeFetchNotAccepted::new);
INDIVIDUAL_COMMIT_TOO_OLD(91, "The individual commit failed due to attempting to commit some entry behind LSO");
MAX_INDIVIDUAL_COMMIT_REACHED(92, "The maximum number of individual commits coordinator could hold reaches the cap");

New Configurations

Broker configs

accept.individual.commit

Determine whether the group coordinator allows individual commit.

Default: true

max.bytes.key.range.resultscache

The maximum memory allowed to hold partitioned records in memory for serving the next batch. Note that this is not the memory limit for handling one batch of range checks.

Default: 209715200 bytes

max.bytes.fetcher.filtering

The maximum memory allowed for each replica fetcher to do general broker side filtering, including the key hash processing.

Default: 104857600 bytes

individual.commit.log.segment.bytes

The segment size for the individual commit topic.

Default: 10485760 bytes

...

accept.range.fetch

Determine whether the topic allows key range based fetch. Setting it to false is essentially a user's call to force any subscribing application to obey the partition level ordering.

Default: false

reserve.message.value.range.fetch

Determine whether, when caching the last fetch request result, we should also keep the message value inside main memory for optimization.

Default: false


Consumer configs

allow.individual.commit

Determine whether this consumer will participate in a shared consumption mode with other consumers.

Default: true

max.num.ranges.to.commit

Commit the progress if accumulated offset ranges are beyond this number.

Default: 10,000


Related Work

Pulsar has officially supported the key share feature in 2.4, which allows multiple consumers to share the same partition's data.

The blockers for us to implement a similar feature are:

  1. Our consumer model is pull based, which incurs random reads if we let consumers ask for specific keyed records. Sequential read is the key performance sugar for Kafka; otherwise we could not bypass memory copies of the data.
  2. Our broker doesn't know anything about data distribution, as all the metadata is encoded and opaque to it. In reality, we could not afford letting consumers fetch with raw keys.
  3. The consumer coordinator is at a centralized location, however we need to distribute keys across different partitions. For Pulsar, the offset data is co-located with the actual partitions. The burden of broadcasting the state changes in Kafka would be pretty heavy and very error-prone.

Compatibility, Deprecation, and Migration Plan

The rollout is staged per feature:

  1. Individual Commit. Although it's not of much use on its own, Kafka users or developers could start evaluating its performance with some customized applications.
    1. The broker has to be upgraded first; until then, all commit requests with a hash range configured shall fail.
    2. Upgrade the client version.
  2. Key based filtering. Note that if individual commit is already launched when we finish the setup for key based filtering, users could already use assign mode to co-process the same partition.
    1. Upgrade the broker first.
    2. Upgrade the client.
  3. Rebalance support. The config `allow.individual.commit` controls whether a consumer will choose to participate in a key-sharing assignment. If it is not turned on, the leader consumer shall not consider making it share with others.

As transactional support and cooperative fetch need more design discussion, we will not put them on the upgrade path for now.

To upgrade an existing consumer use case to allow individual commit + key based filtering + rebalance support, the user needs to:

  1. Upgrade the broker to the latest version; by default, individual commit is turned on.
  2. Allow the input topic to be shared by turning on the range fetch config.
  3. Upgrade the consumer client to allow range sharing.
  4. Rolling bounce the group; the consumer group should then be able to share consumption of the topic.

Failure Recovery and Test Plan

We discuss the failure scenarios alongside the test plan to better justify the value.

  1. Individual Commit failure. The failure could result from:
    1. Broker restriction. If this is the case, the client will immediately fail with a restriction exception.
    2. IC log not found or corrupted. As the __consumed_offsets topic needs to co-partition with the IC log, we need to load the replica from the ISR onto the current coordinator and force a leader election to nominate the new log as the leader.
    3. Commit too old: the IC offset is smaller than the current LSO. In this case, reply with the current LSO and the client will adjust its position to catch up with the latest.
    4. In at-least-once, a duplicate commit is not a serious flag. Brokers should not respond with a separate exception but should log a warning for the user to debug later.
  2. Range Fetch failure. The failure could result from:
    1. Topic restriction. The client will immediately crash with a restriction exception.
    2. Other common fetch request errors, such as topic not found, shall be handled in the same way as today.
  3. Rebalance support failure. The failure could result from:
    1. Consumer restriction: a consumer will reject an assignment with range fetch if it is not allowed to perform it.
    2. Make sure no assignment overlap occurs; this is verified in system tests.
    3. Other common rebalance failures.



Test Plan

...


Rejected Alternatives

There are a couple of alternatives to this proposal here.

...