Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Rebalance support for concurrent assignment 

Multiple restrictions

To satisfy different administration requirements to turn off this feature at any time, we are placing several configurations on:

  1. Broker level, to determine whether to allow IC request,
  2. Topic level, to determine whether this topic is allowed to be cooperatively processed 
  3. Consumer level, to determine if this consumer opts to share processing with someone else

Abstract Assignor

To determine the hash range for every consumer generation, in the group assignment phase the leader will evaluate the situation and decide whether to trigger the sharing mechanism. For the very first version, a very intuitive criteria is comparing the relative size of topic partitions vs number of consumers that are opting into the key sharing mode. For example if number of consumers m > number of partitions n, we would do the assignment based off partitions instead of consumers. If 5 consumers subscribing to 1 topic with 3 partitions, the final assignment would be:

...

Code Block
languagejava
titleConsumerPartitionAssignor.java
public ConsumerPartitionAssignor {
  public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                    Map<String, Subscription> subscriptions)
}

Take round robin assignor for as an example

Restriction Enforcement

To satisfy different administration requirements, we are placing several configurations on:

  1. Broker level, to determine whether to allow IC request,
  2. Topic level, to determine whether this topic is allowed to be cooperatively processed 
  3. Consumer level, to determine if this consumer opts to share processing with someone else

...

while merging the restriction requirements, an abstract assignor that could take partition split into consideration looks like below:


Code Block
titleKeyRangeBasedAssignor
inputTopicPartitions := Set<TopicPartition>
brokerAllowIC := Boolean


This step unblocks the potential to allow a dynamic consumer group scaling beyond partition level capping.

...