...
Taking the round robin assignor as an example and merging in the restriction requirements, an abstract assignor that takes partition splitting into consideration looks like the following:
    partitionsPerTopic := Map<String, Integer>
    topicAllowKeyRangeFetch := Map<String, Boolean>
    Subscriptions := Map<String, Subscription>
    coordinatorAllowIC := Boolean
    finalAssignment := Map<String, Map<String, List<Tuple<Integer, KeyRange>>>>

    // Step one (groupSubscriptionByTopic): group subscription by topics.
    // Group consumers based on their topic subscription. Inside the value,
    // we put the partition count and the consumer.ids.
    subscriptionByTopic := Map<String, Tuple<Integer, List<String>>>

    for topic, value in subscriptionByTopic:
        numPartitions := value.1
        consumerList := value.2
        topicFinalAssignment := Map<String, List<Tuple<Integer, KeyRange>>>

        // Set of consumers who could allow KeyRange fetch and commit;
        // each must be a consumer who subscribes to this topic
        consumersAllowKeyRange := Map<String, Subscriptions>
        consumersAllowKeyRange = getConsumersThatAllowKeyRange(topic)

        if numPartitions < consumersAllowKeyRange.size() and coordinatorAllowIC and topicAllowKeyRangeFetch[topic]:
            // Assign consumers in a round robin manner to partitions
            partitionToConsumersMap := Map<Integer, List<String>>
            currentPartition := 0
            for c, _ in consumersAllowKeyRange:
                partitionToConsumersMap[currentPartition].add(c)
                currentPartition = (currentPartition + 1) % numPartitions

            for partition, assignedConsumers in partitionToConsumersMap:
                // split key ranges by Range.MAX and number of shares
                keyRanges := List<KeyRange>
                keyRanges = splitRangesByAvgSize(assignedConsumers.size())
                k = 0
                for c in assignedConsumers:
                    topicFinalAssignment[c].add(new Tuple<>(partition, keyRanges[k]))
                    k += 1
        else:
            do normal round robin assignment

        finalAssignment[topic] = topicFinalAssignment

    return finalAssignment
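The key-range branch of the pseudocode above can be sketched in Java for a single topic. This is only an illustration under stated assumptions, not the proposed implementation: `KeyRange`, `PartitionKeyRange`, `RANGE_MAX` (standing in for `Range.MAX`), and `assignTopic` are hypothetical names. The sketch assumes the coordinator, the topic, and every listed consumer allow key-range fetch, and it omits the normal round robin fallback.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class KeyRangeRoundRobinSketch {

    /** Hypothetical half-open key range [start, end) over the key hash space. */
    static final class KeyRange {
        final long start;
        final long end;
        KeyRange(long start, long end) { this.start = start; this.end = end; }
        @Override public String toString() { return "[" + start + ", " + end + ")"; }
    }

    /** Hypothetical assignment entry: a partition plus the key range owned in it. */
    static final class PartitionKeyRange {
        final int partition;
        final KeyRange range;
        PartitionKeyRange(int partition, KeyRange range) {
            this.partition = partition;
            this.range = range;
        }
        @Override public String toString() { return "p" + partition + range; }
    }

    // Assumed upper bound of the key hash space (stands in for Range.MAX).
    static final long RANGE_MAX = 1L << 32;

    /** Splits [0, RANGE_MAX) into `shares` contiguous ranges of near-equal size. */
    static List<KeyRange> splitRangesByAvgSize(int shares) {
        List<KeyRange> ranges = new ArrayList<>();
        for (int i = 0; i < shares; i++) {
            ranges.add(new KeyRange(RANGE_MAX * i / shares, RANGE_MAX * (i + 1) / shares));
        }
        return ranges;
    }

    /** Key-range branch only: requires more consumers than partitions. */
    static Map<String, List<PartitionKeyRange>> assignTopic(int numPartitions,
                                                            List<String> consumers) {
        if (numPartitions < 1 || numPartitions >= consumers.size()) {
            // The normal round robin path is intentionally left out of this sketch.
            throw new IllegalArgumentException("key-range branch needs consumers > partitions");
        }

        // Assign consumers to partitions in a round robin manner.
        Map<Integer, List<String>> partitionToConsumers = new LinkedHashMap<>();
        int currentPartition = 0;
        for (String consumer : consumers) {
            partitionToConsumers
                .computeIfAbsent(currentPartition, p -> new ArrayList<>())
                .add(consumer);
            currentPartition = (currentPartition + 1) % numPartitions;
        }

        // Split each partition's key space evenly among the consumers sharing it.
        Map<String, List<PartitionKeyRange>> assignment = new LinkedHashMap<>();
        for (Map.Entry<Integer, List<String>> entry : partitionToConsumers.entrySet()) {
            List<String> assigned = entry.getValue();
            List<KeyRange> ranges = splitRangesByAvgSize(assigned.size());
            for (int k = 0; k < assigned.size(); k++) {
                assignment
                    .computeIfAbsent(assigned.get(k), id -> new ArrayList<>())
                    .add(new PartitionKeyRange(entry.getKey(), ranges.get(k)));
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        assignTopic(2, List.of("c1", "c2", "c3"))
            .forEach((consumer, owned) -> System.out.println(consumer + " -> " + owned));
    }
}
```

With two partitions and consumers `c1`, `c2`, `c3`, the sketch places `c1` and `c3` on partition 0 with one half of the key space each, while `c2` owns the full key space of partition 1.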
Similarly, for other assignment strategies such as the range assignor, we always attempt to
This step makes it possible to scale a consumer group dynamically beyond the cap imposed by the partition count.
...