...

Taking the round robin assignor as an example and merging in the restriction requirements, an abstract assignor that takes partition splitting into consideration looks like the following:


Code Block: KeyRangeBasedRoundRobinAssignor
partitionsPerTopic := Map<String, Integer>
topicAllowKeyRangeFetch := Map<String, Boolean>
Subscriptions := Map<String, Subscription>
coordinatorAllowIC := Boolean
finalAssignment := Map<String, Map<String, List<Tuple<Integer, KeyRange>>>>

// Step one (groupSubscriptionByTopic): group consumers by their topic subscription.
// Each value holds the partition count and the list of consumer ids.
subscriptionByTopic := Map<String, Tuple<Integer, List<String>>>
for topic, value in subscriptionByTopic:
    numPartitions := value.1
    consumerList := value.2
    topicFinalAssignment := Map<String, List<Tuple<Integer, KeyRange>>>
    consumersAllowKeyRange := Map<String, Subscriptions> // set of consumers that allow KeyRange fetch and commit
    consumersAllowKeyRange = getConsumersThatAllowKeyRange(topic) // each must be a consumer who subscribes to this topic
    if numPartitions < consumersAllowKeyRange.size() and coordinatorAllowIC and topicAllowKeyRangeFetch[topic]:
        consumerPosition := 0
        partitionToConsumersMap := Map<Integer, List<String>>
        currentPartition := 0
        // Assign consumers to partitions in a round robin manner
        for c, _ in consumersAllowKeyRange:
            partitionToConsumersMap[currentPartition].add(c)
            currentPartition = (currentPartition + 1) % numPartitions

        for partition, assignedConsumers in partitionToConsumersMap:
            keyRanges := List<KeyRange>
            keyRanges = splitRangesByAvgSize(assignedConsumers.size()) // split key ranges by Range.MAX and number of shares
            k = 0
            for c in assignedConsumers:
                topicFinalAssignment[c].add(new Tuple<>(partition, keyRanges[k]))
                k += 1
    else:
        do normal round robin assignment
    finalAssignment[topic] = topicFinalAssignment

return finalAssignment
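The key range branch of the pseudocode above can be sketched in Java for a single topic. This is a minimal illustration under stated assumptions, not the proposed implementation: the class name, the `KeyRange` and `Assigned` records, the half-open range convention, and the `RANGE_MAX` value of 2^32 are all hypothetical, and the sketch assumes every consumer passed in already allows key range fetch and that consumers outnumber partitions. It needs Java 16 or later for records.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class KeyRangeRoundRobinSketch {
    // Hypothetical half-open key range [start, end) over the hash space [0, RANGE_MAX).
    record KeyRange(long start, long end) {}

    // One assigned unit: a partition plus the slice of its key space.
    record Assigned(int partition, KeyRange range) {}

    // Assumed size of the key hash space; the source only names it Range.MAX.
    static final long RANGE_MAX = 1L << 32;

    // Split [0, RANGE_MAX) into `shares` contiguous ranges of near-equal size.
    static List<KeyRange> splitRangesByAvgSize(int shares) {
        List<KeyRange> ranges = new ArrayList<>();
        for (int i = 0; i < shares; i++) {
            long start = RANGE_MAX * i / shares;
            long end = RANGE_MAX * (i + 1) / shares;
            ranges.add(new KeyRange(start, end));
        }
        return ranges;
    }

    // Assign one topic's partitions when consumers outnumber partitions.
    static Map<String, List<Assigned>> assignTopic(int numPartitions, List<String> consumers) {
        // Round robin: the i-th consumer lands on partition i % numPartitions.
        Map<Integer, List<String>> partitionToConsumers = new LinkedHashMap<>();
        int currentPartition = 0;
        for (String consumer : consumers) {
            partitionToConsumers
                .computeIfAbsent(currentPartition, p -> new ArrayList<>())
                .add(consumer);
            currentPartition = (currentPartition + 1) % numPartitions;
        }

        // Each partition's key space is split evenly among the consumers sharing it.
        Map<String, List<Assigned>> assignment = new LinkedHashMap<>();
        for (Map.Entry<Integer, List<String>> entry : partitionToConsumers.entrySet()) {
            List<String> sharing = entry.getValue();
            List<KeyRange> ranges = splitRangesByAvgSize(sharing.size());
            for (int k = 0; k < sharing.size(); k++) {
                assignment
                    .computeIfAbsent(sharing.get(k), id -> new ArrayList<>())
                    .add(new Assigned(entry.getKey(), ranges.get(k)));
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Two partitions, five consumers: partition 0 is shared three ways, partition 1 two ways.
        assignTopic(2, List.of("c1", "c2", "c3", "c4", "c5"))
            .forEach((consumer, units) -> System.out.println(consumer + " -> " + units));
    }
}
```

With two partitions and five consumers, consumers c1, c3 and c5 each receive a third of partition 0's key space, while c2 and c4 each receive half of partition 1's.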

Similarly, for other assignment strategies such as the range assignor, we always attempt to 


This step makes it possible to scale a consumer group dynamically beyond the cap imposed by the partition count.

...