THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||
---|---|---|
| ||
C_f := (P/N)_floor, the floor capacity
C_c := (P/N)_ceil, the ceiling capacity
members := the sorted set of all consumers
partitions := the set of all partitions
unassigned_partitions := the set of partitions not yet assigned, initialized to be all partitions
unfilled_members := the set of consumers not yet at capacity, initialized to be all members
max_capacity_members := the set of members with exactly C_c partitions assigned, initialized to empty
member.owned_partitions := the set of previously owned partitions encoded in the Subscription
// Reassign as many previously owned partitions as possible
for member : members
remove any partitions that are no longer in the subscription from its owned partitions
remove all owned_partitions if the generation is unknown (indicates
if member.owned_partitions.size < C_f
assign all owned partitions to member and remove from unassigned_partitions
else if member.owned_partitions.size == C_f
assign first C_f owned_partitions to member and remove from unassigned_partitions
remove member from unfilled_members
else
assign first C_c owned_partitions to member and remove from unassigned_partitions
remove member from unfilled_members
add member to max_capacity_members
sort unassigned_partitions in partition order, ie t0_p0, t1_p0, t2_p0, t0_p1, t1_p0 .... (for data parallelism)
sort unfilled_members by memberId (for determinism)
// Fill remaining members up to C_f
for member : unfilled_members
compute the remaining capacity as C = C_f - num_assigned_partitions
pop the first C partitions from unassigned_partitions and assign to member
// Steal partitions from members with max_capacity if necessary
if we run out of partitions before getting to the end of unfilled members:
for member : unfilled_members
poll for first member in max_capacity_members and remove one partition
assign this partition to the unfilled member
// Distribute remaining partitions, one per consumer, to fill some up to C_c if necessary
if we run out of unfilled_members before assigning all partitions:
for partition : unassigned_partitions
assign to next member in members that is not in max_capacity_members (then add member to max_capacity_members) |
...