...
Rebalance support for concurrent assignment
Multiple restrictions
To satisfy different administration requirements, including turning off this feature at any time, we are placing configurations at several levels:
- Broker level, to determine whether to allow IC requests,
- Topic level, to determine whether this topic is allowed to be cooperatively processed,
- Consumer level, to determine whether this consumer opts to share processing with other consumers.
Abstract Assignor
To determine the hash range for every consumer generation, the leader evaluates the situation during the group assignment phase and decides whether to trigger the sharing mechanism. For the very first version, an intuitive criterion is to compare the number of topic partitions with the number of consumers that opt into the key sharing mode. For example, if the number of consumers m is greater than the number of partitions n, we would do the assignment based on partitions instead of consumers. If 5 consumers subscribe to 1 topic with 3 partitions, the final assignment would be:
...
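The criterion described above can be sketched in a few lines of Java. This is only an illustration under stated assumptions: the class `SharingCriterion` and both method names are hypothetical, and the round-robin grouping of consumers onto partitions is one possible reading of "assignment based on partitions", not the assignment or hash ranges specified by this proposal.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; not part of the Kafka API.
final class SharingCriterion {

    // Sharing is triggered only when opted-in consumers (m) outnumber partitions (n).
    static boolean shouldTriggerSharing(int optedInConsumers, int partitions) {
        return optedInConsumers > partitions;
    }

    // Assumed partition-based grouping: consumers are placed round-robin onto
    // partitions, so each partition ends up with the list of consumers sharing it.
    static Map<Integer, List<String>> groupConsumersByPartition(List<String> consumers,
                                                                int partitions) {
        if (partitions <= 0) {
            throw new IllegalArgumentException("partitions must be positive");
        }
        Map<Integer, List<String>> groups = new LinkedHashMap<>();
        for (int p = 0; p < partitions; p++) {
            groups.put(p, new ArrayList<>());
        }
        for (int i = 0; i < consumers.size(); i++) {
            groups.get(i % partitions).add(consumers.get(i));
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> consumers = List.of("c0", "c1", "c2", "c3", "c4");
        int partitions = 3;
        if (shouldTriggerSharing(consumers.size(), partitions)) {
            System.out.println(groupConsumersByPartition(consumers, partitions));
        }
    }
}
```

With 5 consumers and 3 partitions the check passes, so two of the partitions are shared by two consumers each under this assumed grouping. How the key hash range of a shared partition is then split between its consumers is left out of the sketch.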
```java
// Assignor interface as used in this proposal.
public interface ConsumerPartitionAssignor {
    Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                             Map<String, Subscription> subscriptions);
}
```
Take the round robin assignor as an example,
Restriction Enforcement
To satisfy different administration requirements, we are placing configurations at several levels:
- Broker level, to determine whether to allow IC requests,
- Topic level, to determine whether this topic is allowed to be cooperatively processed,
- Consumer level, to determine whether this consumer opts to share processing with other consumers.
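A minimal sketch of how the three levels might be combined follows. It assumes that sharing is enabled only when all three levels agree (a logical AND), which the text does not state explicitly. The name `brokerAllowIC` is borrowed from the proposal's pseudocode; the class `SharingRestrictions`, the method `sharingAllowed`, and the remaining parameters are hypothetical.

```java
import java.util.Set;

// Hypothetical helper for illustration; not part of the Kafka API.
final class SharingRestrictions {

    // Assumption: any single level can veto sharing, so all three must allow it.
    static boolean sharingAllowed(boolean brokerAllowIC,
                                  Set<String> topicsAllowingSharing,
                                  String topic,
                                  boolean consumerOptsIn) {
        return brokerAllowIC
                && topicsAllowingSharing.contains(topic)
                && consumerOptsIn;
    }

    public static void main(String[] args) {
        Set<String> allowedTopics = Set.of("orders");
        // Broker and topic allow sharing, and the consumer opts in.
        System.out.println(sharingAllowed(true, allowedTopics, "orders", true));
        // The broker-level switch turns the feature off regardless of the other levels.
        System.out.println(sharingAllowed(false, allowedTopics, "orders", true));
    }
}
```

Under this reading, flipping the broker-level flag is enough for an administrator to turn the feature off everywhere, while topic owners and individual consumers can still opt out on their own.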
...
While merging the restriction requirements, an abstract assignor that takes partition splitting into consideration looks like the following:
```
inputTopicPartitions := Set<TopicPartition>
brokerAllowIC := Boolean
```
This step unblocks dynamic consumer group scaling beyond the partition-level cap.
...