...

P := the number of partitions, N := the number of consumers
C_f := floor(P/N), the floor capacity
C_c := ceil(P/N), the ceiling capacity

members := the sorted set of all consumers
partitions := the set of all partitions
unassigned_partitions := the set of partitions not yet assigned, initialized to be all partitions
unfilled_members := the set of consumers not yet at capacity, initialized to empty
max_capacity_members := the set of members with exactly C_c partitions assigned, initialized to empty
member.owned_partitions := the set of previously owned partitions encoded in the Subscription

// Reassign as many previously owned partitions as possible
for member : members
	remove any partitions that are no longer in the subscription from its owned partitions
	remove all owned_partitions if the generation is old
	if member.owned_partitions.size < C_f
		assign all owned partitions to member and remove from unassigned_partitions
		add member to unfilled_members
	else if member.owned_partitions.size == C_f
		assign first C_f owned_partitions to member and remove from unassigned_partitions
	else
		assign first C_c owned_partitions to member and remove from unassigned_partitions
		add member to max_capacity_members

sort unassigned_partitions in partition order, i.e. t0_p0, t1_p0, t2_p0, t0_p1, t1_p1, ... (for data parallelism)
sort unfilled_members by memberId (for determinism)

// Fill remaining members up to C_f
for member : unfilled_members
	compute the remaining capacity as C = C_f - num_assigned_partitions
	pop the first C partitions from unassigned_partitions and assign to member

// Steal partitions from members with max_capacity if necessary
if we run out of partitions before getting to the end of unfilled_members:
	for member : unfilled_members still below C_f
		poll the first member from max_capacity_members and remove one of its partitions
		assign this partition to the unfilled member
	
// Distribute remaining partitions, one per consumer, to fill some up to C_c if necessary
if we run out of unfilled_members before assigning all partitions:
	for partition : unassigned_partitions
		assign to next member in members that is not in max_capacity_members (then add member to max_capacity_members) 
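
The steps above can be sketched in Python as follows. This is a minimal illustration under stated assumptions, not the assignor's actual implementation: every consumer is assumed to subscribe to every topic, the generation check is assumed to have been applied already, partitions are modeled as (topic, partition_number) tuples, and the function name constrained_sticky_assign is hypothetical. The sketch also skips a previously owned partition that an earlier member has already claimed, which the pseudocode does not spell out.

```python
from collections import deque


def constrained_sticky_assign(owned, partitions):
    """Sketch of the constrained sticky assignment described above.

    owned: dict of member_id -> list of previously owned (topic, partition) tuples
           (assumed to be already filtered by generation).
    partitions: iterable of all (topic, partition) tuples; needs at least one member.
    """
    members = sorted(owned)                      # sorted for determinism
    unassigned = set(partitions)
    c_f = len(unassigned) // len(members)        # floor capacity
    c_c = -(-len(unassigned) // len(members))    # ceiling capacity

    assignment = {m: [] for m in members}
    unfilled = []                                # members below C_f
    max_capacity = deque()                       # members holding C_c > C_f partitions

    # Reassign as many previously owned partitions as possible.
    for m in members:
        # Drop partitions that no longer exist or were claimed by an earlier member.
        prev = [p for p in dict.fromkeys(owned[m]) if p in unassigned]
        keep = prev[:c_c] if len(prev) > c_f else prev
        assignment[m] = keep
        unassigned.difference_update(keep)
        if len(keep) < c_f:
            unfilled.append(m)
        elif len(keep) > c_f:
            max_capacity.append(m)

    # Partition order: p0 of every topic, then p1 of every topic, and so on.
    pool = deque(sorted(unassigned, key=lambda tp: (tp[1], tp[0])))

    # Fill members up to C_f, stealing from max-capacity members once the pool is empty.
    for m in unfilled:
        while len(assignment[m]) < c_f:
            if pool:
                assignment[m].append(pool.popleft())
            else:
                donor = max_capacity.popleft()   # donor drops from C_c to C_f
                assignment[m].append(assignment[donor].pop())

    # Distribute leftovers, one per member, raising some members to C_c.
    at_max = set(max_capacity)
    for m in members:
        if not pool:
            break
        if m not in at_max:
            assignment[m].append(pool.popleft())

    return assignment
```

For example, with four partitions of one topic and three consumers where two of them each previously owned two partitions, the third consumer receives one partition taken from the first max-capacity member, and every consumer ends with either C_f = 1 or C_c = 2 partitions.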

...