...

Code Block
given:
  T list of tasks to be assigned
    (includes t.offsets, the total number of offsets in task t)
  I list of instances to assign tasks to
    (includes i[t].lag, the reported lag of instance i on task t)
  balance_factor (as defined)
  acceptable_recovery_lag (as defined)
  num_standbys (as defined)
  constrainedBalancedAssignment(StatefulTasksToRankedCandidates, balance_factor)
    a function to produce a stable balanced assignment, given the constraint that each task can only be assigned to an instance of its min rank
  proposeBalancedAssignment(StatefulTasksToRankedCandidates, balance_factor)
    a function to produce a stable balanced assignment, regardless of instance ranks. Does not try to improve on balance_factor.
  
SortedTasks: List<task> := sort(T) // define a stable ordering of tasks
SortedInstances: List<instance> := sort(I) // define a stable ordering of instances

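// several instances can share a rank, so each rank maps to a set of candidate instances
StatefulTasksToRankedCandidates: Map<task, SortedMap<rank, Set<instance>>> := rank(T, I, acceptable_recovery_lag)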
StatelessTasks: List<task> := []

// add all stateless tasks to StatelessTasks
for task in SortedTasks if task is stateless:
  add task to StatelessTasks


// assign active tasks to instances of the min rank, using the existing algorithm from the Consumer StickyAssignor
StatefulActiveTaskAssignments: Map<instance, List<task>> := constrainedBalancedAssignment(StatefulTasksToRankedCandidates, balance_factor)

// at this point, we have an active task assignment that is as balanced as it can be while assigning each task only to an instance of its min rank (its most caught-up candidates)

// create a standby task for each task we want to move to another instance
StandbyTaskAssignments: Map<instance, List<task>> := {}
StandbyTasksToInstanceCount: Map<task, integer> := {} // bookkeeping the number of standbys we've assigned

ProposedStatefulActiveTaskAssignments: Map<instance, List<task>> := proposeBalancedAssignment(StatefulTasksToRankedCandidates, balance_factor)
Movement: <task, source instance, destination instance>
ProposedMovements: List<Movement> := getMovements(StatefulActiveTaskAssignments, ProposedStatefulActiveTaskAssignments)

for (task, _, destination instance) in ProposedMovements:
  add task to StandbyTaskAssignments[destination instance]
  StandbyTasksToInstanceCount[task] := 1 // each task appears at most once in ProposedMovements, so its count is exactly 1 at this stage

for task in SortedTasks if task is stateful:
  while (StandbyTasksToInstanceCount[task] < num_standbys): // a task with no entry yet counts as 0
    instance := choose a least-loaded instance that hosts neither the active task nor a standby of it
    if no such instance exists: break // too few instances to host num_standbys standbys
    add task to StandbyTaskAssignments[instance]
    increment StandbyTasksToInstanceCount[task]

// now, we have an assignment of all active tasks and all standby tasks
// note that in the special case where num_standbys is 0, we will still assign a standby task to accomplish a proposed movement

// finally, proceed to pack stateless tasks onto the instances
StatelessTaskAssignments: Map<instance, List<task>> := {}

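ComputeTasks: Map<instance, integer> := {}

for instance in SortedInstances:
  ComputeTasks[instance] := |StatefulActiveTaskAssignments[instance]| // 0 for an instance with no stateful active tasks, so it is still a packing candidate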

for task in StatelessTasks:
  instance := pick an instance with the fewest compute tasks (break ties by SortedInstances order)
  add task to StatelessTaskAssignments[instance]
  increment ComputeTasks[instance]

// We are done.
// The StatefulActiveTaskAssignments are as balanced as possible while minimizing catch-up time.
// The StandbyTaskAssignments contain a standby for every movement needed to perfectly balance the active tasks, and are otherwise as balanced as possible.
// The StatelessTaskAssignments equalize the compute load over the cluster, given the existing active assignment.
return new Assignment(StatefulActiveTaskAssignments, StandbyTaskAssignments, StatelessTaskAssignments)
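The pseudocode calls rank(T, I, acceptable_recovery_lag) without spelling it out. The Python sketch below shows one possible ranking, assuming a rule that is not stated in this section: an instance whose lag on a task is at most acceptable_recovery_lag counts as caught up and gets rank 0, every other instance is ranked by its raw lag, and an instance that reports no lag is assumed to need a full restore (lag = t.offsets). The function signature and the `lags`/`offsets` dictionaries are hypothetical; only stateful tasks are passed in.

```python
from collections import defaultdict


def rank(stateful_tasks, instances, lags, offsets, acceptable_recovery_lag):
    """Map each stateful task to {rank: [candidate instances]}, ranks ascending.

    ASSUMED ranking rule (hypothetical, not defined in the pseudocode):
    lag <= acceptable_recovery_lag is "caught up" and gets rank 0; any other
    instance is ranked by its lag. A missing lag is treated as a full restore,
    i.e. the task's total number of offsets.
    """
    ranked = {}
    for task in sorted(stateful_tasks):
        by_rank = defaultdict(list)
        for instance in sorted(instances):  # stable instance order within a rank
            lag = lags.get(instance, {}).get(task, offsets[task])
            r = 0 if lag <= acceptable_recovery_lag else lag
            by_rank[r].append(instance)
        ranked[task] = dict(sorted(by_rank.items()))  # lowest rank first
    return ranked


lags = {"a": {"t1": 0}, "b": {"t1": 50}, "c": {}}
offsets = {"t1": 1000}
ranked = rank(["t1"], ["a", "b", "c"], lags, offsets, acceptable_recovery_lag=100)
print(ranked)  # {'t1': {0: ['a', 'b'], 1000: ['c']}}

# The min-rank candidates are the first entry of each task's map.
print(next(iter(ranked["t1"].values())))  # ['a', 'b']
```

Under this rule, both a and b are equally acceptable owners of t1, which is what gives constrainedBalancedAssignment room to balance while still only using caught-up instances.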

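The movement and standby steps can be sketched the same way. In this hypothetical Python version, `get_movements` diffs the current and proposed active assignments, and `assign_standbys` places one warm-up standby on each movement's destination before topping every stateful task up to num_standbys. "Least loaded" is assumed here to mean the fewest active plus standby tasks, with ties broken by the stable instance order; the pseudocode does not pin this down.

```python
from collections import Counter


def get_movements(current, proposed):
    """Return (task, source, destination) for every task whose owner changes."""
    owner_now = {t: i for i, tasks in current.items() for t in tasks}
    owner_next = {t: i for i, tasks in proposed.items() for t in tasks}
    return [
        (t, owner_now[t], owner_next[t])
        for t in sorted(owner_now)
        if t in owner_next and owner_now[t] != owner_next[t]
    ]


def assign_standbys(instances, active, movements, stateful_tasks, num_standbys):
    """Place warm-up standbys for movements, then top each task up to num_standbys.

    `instances` is assumed to be in the stable sorted order; "least loaded" is
    assumed to mean fewest active + standby tasks (ties keep that order).
    """
    standbys = {i: [] for i in instances}
    count = Counter()  # standbys assigned per task; missing keys count as 0

    # One standby on each movement's destination, even when num_standbys == 0.
    for task, _source, dest in movements:
        standbys[dest].append(task)
        count[task] = 1

    owner = {t: i for i, tasks in active.items() for t in tasks}
    for task in stateful_tasks:
        while count[task] < num_standbys:
            eligible = [i for i in instances
                        if i != owner.get(task) and task not in standbys[i]]
            if not eligible:
                break  # not enough instances to host num_standbys copies
            target = min(eligible,
                         key=lambda i: len(active.get(i, [])) + len(standbys[i]))
            standbys[target].append(task)
            count[task] += 1
    return standbys


current = {"a": ["t1", "t2"], "b": [], "c": []}
proposed = {"a": ["t1"], "b": ["t2"], "c": []}
moves = get_movements(current, proposed)
print(moves)  # [('t2', 'a', 'b')]
print(assign_standbys(["a", "b", "c"], current, moves, ["t1", "t2"], 1))
# {'a': [], 'b': ['t2'], 'c': ['t1']}
```

Here t2's warm-up standby on b already satisfies num_standbys = 1, so only t1 needs a standby, and it goes to c because c carries no tasks yet.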
...
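For the stateless packing step, a priority queue keyed on (compute tasks, instance position) yields the least-loaded instance in O(log n) per task. The sketch below assumes the instances are already in their stable sorted order; `pack_stateless` is a hypothetical name.

```python
import heapq


def pack_stateless(instances, stateful_active, stateless_tasks):
    """Assign each stateless task to the instance with the fewest compute tasks.

    Every instance starts at its count of stateful active tasks (0 if it has
    none). Ties are broken by the instance's position in `instances`.
    """
    heap = [(len(stateful_active.get(i, [])), pos, i)
            for pos, i in enumerate(instances)]
    heapq.heapify(heap)
    assignments = {i: [] for i in instances}
    for task in stateless_tasks:
        load, pos, instance = heapq.heappop(heap)  # least-loaded instance
        assignments[instance].append(task)
        heapq.heappush(heap, (load + 1, pos, instance))
    return assignments


result = pack_stateless(["a", "b", "c"], {"a": ["t1", "t2"], "b": ["t3"]},
                        ["s1", "s2", "s3"])
print(result)  # {'a': [], 'b': ['s2'], 'c': ['s1', 's3']}
```

Because each instance's key starts at its stateful active count, instances that host few stateful tasks absorb stateless tasks first; in the example every instance ends with two compute tasks.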