...
```
given:
  T                        list of tasks to be assigned
                           (includes t.offsets, the total number of offsets in task t)
  I                        list of instances to assign tasks to
                           (includes i[t].lag, the reported lag of instance i on task t)
  balance_factor           (as defined)
  acceptable_recovery_lag  (as defined)
  num_standbys             (as defined)
  constrainedBalancedAssignment(StatefulTasksToRankedCandidates, balance_factor)
      a function that produces a stable, balanced assignment, subject to the constraint that each task may only be assigned to an instance of its minimum rank
  proposeBalancedAssignment(StatefulTasksToRankedCandidates, balance_factor)
      a function that produces a stable, balanced assignment regardless of instance ranks; it does not try to improve on balance_factor

SortedTasks: List<task> := sort(T) // define a stable ordering of tasks
SortedInstances: List<instance> := sort(I) // define a stable ordering of instances
StatefulTasksToRankedCandidates: Map<task, Map<rank, instance>> := rank(T, I, acceptable_recovery_lag)
StatelessTasks: List<task> := []

// add all stateless tasks to StatelessTasks
for task in SortedTasks if task is stateless:
  add task to StatelessTasks

// assign active tasks to instances of the min rank, using the existing algorithm from the Consumer StickyAssignor
StatefulActiveTaskAssignments: Map<instance, List<task>> := constrainedBalancedAssignment(StatefulTasksToRankedCandidates, balance_factor)
// at this point, we have an active task assignment that is as balanced as it can be while only assigning tasks to instances that are already caught up

// create a standby task for each task we want to move to another instance
StandbyTaskAssignments: Map<instance, List<task>> := {}
StandbyTasksToInstanceCount: Map<task, integer> := {} // number of standbys assigned so far per task; a missing entry counts as 0
ProposedStatefulActiveTaskAssignments: Map<instance, List<task>> := proposeBalancedAssignment(StatefulTasksToRankedCandidates, balance_factor)
Movement: <task, source instance, destination instance>
ProposedMovements: List<Movement> := getMovements(StatefulActiveTaskAssignments, ProposedStatefulActiveTaskAssignments)
for (task, _, destination instance) in ProposedMovements:
  add task to StandbyTaskAssignments[destination instance]
  StandbyTasksToInstanceCount[task] := 1 // each task appears at most once in ProposedMovements, so the count is at most 1 at this stage

// top up every stateful task to num_standbys standbys
for task, RankedCandidates in StatefulTasksToRankedCandidates:
  while (StandbyTasksToInstanceCount[task] < num_standbys):
    instance := choose a least-loaded instance that holds neither an active nor a standby copy of the task (stop if there is none)
    add task to StandbyTaskAssignments[instance]
    increment StandbyTasksToInstanceCount[task]

// now, we have an assignment of all active tasks and all standby tasks
// note that in the special case where num_standbys is 0, we still assign a standby task to accomplish each proposed movement

// finally, pack stateless tasks onto the instances
StatelessTaskAssignments: Map<instance, List<task>> := {}
ComputeTasks: Map<instance, integer> := {}
for instance in SortedInstances:
  ComputeTasks[instance] := |StatefulActiveTaskAssignments[instance]| // 0 for an instance with no stateful active tasks
for task in StatelessTasks:
  instance := pick an instance with the fewest compute tasks
  add task to StatelessTaskAssignments[instance]
  increment ComputeTasks[instance]

// We are done.
// The StatefulActiveTaskAssignments are as balanced as possible while minimizing catch-up time.
// The StandbyTaskAssignments contain a standby for every movement we want to make to balance the active tasks, and are otherwise as balanced as possible.
// The StatelessTaskAssignments equalize the compute load over the cluster, given the existing active assignment.
return new Assignment(StatefulActiveTaskAssignments, StandbyTaskAssignments, StatelessTaskAssignments)
```
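To make the control flow of the pseudocode easier to experiment with, here is a minimal Python sketch of it. It is not the Kafka Streams implementation, and several pieces are assumptions: `rank()` gives an instance rank 0 when its lag is within `acceptable_recovery_lag` and otherwise uses the lag itself; `constrained_balanced_assignment` is a simple greedy stand-in for the StickyAssignor-based algorithm; `propose_balanced_assignment` starts from the constrained assignment and moves tasks from the most- to the least-loaded instance until the spread is within `balance_factor`; and "least-loaded" for standbys counts active plus standby tasks. Candidates are stored as instance → rank rather than `Map<rank, instance>`. All function and parameter names are hypothetical.

```python
# Minimal sketch of the assignment pseudocode (hypothetical names; simplified
# stand-ins for rank(), constrainedBalancedAssignment and proposeBalancedAssignment).
INF = float("inf")


def rank(stateful_tasks, instances, lags, acceptable_recovery_lag):
    """Assumed ranking: 0 if the instance's lag is within acceptable_recovery_lag,
    otherwise the lag itself (INF if the instance has no state for the task).
    Returns task -> {instance: rank}."""
    ranked = {}
    for t in stateful_tasks:
        ranked[t] = {}
        for i in instances:
            lag = lags[i].get(t, INF)
            ranked[t][i] = 0 if lag <= acceptable_recovery_lag else lag
    return ranked


def constrained_balanced_assignment(ranked, instances):
    """Greedy stand-in: give each task to the least-loaded instance of its min rank.
    min() returns the first of equal candidates, so ties follow the sorted order."""
    active = {i: [] for i in instances}
    for t in sorted(ranked):
        best = min(ranked[t].values())
        candidates = [i for i in instances if ranked[t][i] == best]
        target = min(candidates, key=lambda i: len(active[i]))
        active[target].append(t)
    return active


def propose_balanced_assignment(active, balance_factor):
    """Assumed stand-in: move tasks from the most- to the least-loaded instance
    until the spread is within balance_factor (must be >= 1 to terminate)."""
    assert balance_factor >= 1
    proposed = {i: list(ts) for i, ts in active.items()}
    while True:
        most = max(proposed, key=lambda i: len(proposed[i]))
        least = min(proposed, key=lambda i: len(proposed[i]))
        if len(proposed[most]) - len(proposed[least]) <= balance_factor:
            return proposed
        proposed[least].append(proposed[most].pop())


def get_movements(active, proposed):
    """Return (task, source, destination) for every task whose owner changes."""
    owner = {t: i for i, ts in active.items() for t in ts}
    return [(t, owner[t], i) for i, ts in proposed.items() for t in ts if owner[t] != i]


def assign(tasks, instances, stateful, lags, balance_factor, acceptable_recovery_lag, num_standbys):
    sorted_tasks, sorted_instances = sorted(tasks), sorted(instances)
    stateful_tasks = [t for t in sorted_tasks if t in stateful]
    stateless_tasks = [t for t in sorted_tasks if t not in stateful]

    ranked = rank(stateful_tasks, sorted_instances, lags, acceptable_recovery_lag)
    active = constrained_balanced_assignment(ranked, sorted_instances)

    # One standby (warm-up) per proposed movement, even if num_standbys == 0.
    standby = {i: [] for i in sorted_instances}
    standby_count = {t: 0 for t in stateful_tasks}
    proposed = propose_balanced_assignment(active, balance_factor)
    for t, _source, destination in get_movements(active, proposed):
        standby[destination].append(t)
        standby_count[t] = 1

    # Top each task up to num_standbys on the least-loaded instances (load = active + standby).
    for t in stateful_tasks:
        while standby_count[t] < num_standbys:
            free = [i for i in sorted_instances if t not in active[i] and t not in standby[i]]
            if not free:
                break  # not enough instances to host another copy
            target = min(free, key=lambda i: len(active[i]) + len(standby[i]))
            standby[target].append(t)
            standby_count[t] += 1

    # Pack stateless tasks onto the instances with the fewest compute tasks.
    compute = {i: len(active[i]) for i in sorted_instances}
    stateless = {i: [] for i in sorted_instances}
    for t in stateless_tasks:
        target = min(sorted_instances, key=lambda i: compute[i])
        stateless[target].append(t)
        compute[target] += 1
    return active, standby, stateless


if __name__ == "__main__":
    lags = {"a": {t: 0 for t in ["t0", "t1", "t2", "t3"]}, "b": {}, "c": {}}
    print(assign(["t0", "t1", "t2", "t3", "s0", "s1"], ["a", "b", "c"],
                 {"t0", "t1", "t2", "t3"}, lags, balance_factor=1,
                 acceptable_recovery_lag=100, num_standbys=0))
```

In the example, only instance `a` is caught up on the four stateful tasks, so all four actives stay on `a`; the balancing proposal moves `t3` to `b` and `t2` to `c`, which shows up as standbys there even though `num_standbys` is 0, and the two stateless tasks go to `b` and `c` because they have no compute load yet.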
...