THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||
---|---|---|
| ||
C_f := (P/N)_floor, the floor capacity C_c := (P/N)_ceil, the ceiling capacity members := the sorted set of all consumers partitions := the set of all partitions unassigned_partitions := the set of partitions not yet assigned, initialized to be all partitions unfilled_members := the set of consumers not yet at capacity, initialized to be all membersempty max_capacity_members := the set of members with exactly C_c partitions assigned, initialized to empty member.owned_partitions := the set of previously owned partitions encoded in the Subscription // Reassign as many previously owned partitions as possible for member : members remove any partitions that are no longer in the subscription from its owned partitions remove all owned_partitions if the generation is old if member.owned_partitions.size < C_f assign all owned partitions to member and remove from unassigned_partitions add member to unfilled_members else if member.owned_partitions.size == C_f assign first C_f owned_partitions to member and remove from unassigned_partitions remove member from unfilled_members else assign first C_c owned_partitions to member and remove from unassigned_partitions remove member from unfilled_members add member to max_capacity_members sort unassigned_partitions in partition order, ie t0_p0, t1_p0, t2_p0, t0_p1, t1_p0 .... (for data parallelism) sort unfilled_members by memberId (for determinism) // Fill remaining members up to C_f for member : unfilled_members compute the remaining capacity as C = C_f - num_assigned_partitions pop the first C partitions from unassigned_partitions and assign to member // Steal partitions from members with max_capacity if necessary if we run out of partitions before getting to the end of unfilled members: for member : unfilled_members poll for first member in max_capacity_members and remove one partition assign this partition to the unfilled member // Distribute remaining partitions, one per consumer, to fill some up to C_c if necessary if we run out of unfilled_members before assigning all partitions: for partition : unassigned_partitions assign to next member in members that is not in max_capacity_members (then add member to max_capacity_members) |
...