Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
given:
  T list of tasks to be assigned
    (includes t.offsets, the total number of offsets in task t)
  I list of instances to assign tasks to
    (includes i[t].lag, the reported lag of instance i on task t)
  balance_factor (as defined)
  acceptable_recovery_lag (as defined)
  num_standbys (as defined)
  constrainedBalancedAssignment(StatefulTasksToRankedCandidates, balance_factor)
    a function to produce a stable balanced assignment, given the constraint that tasks can only be assigned to instances of the min rank
  proposeBalancedAssignment(StatefulTasksToRankedCandidates, balance_factor)
    a function to produce a stable balanced assignment, regardless of instance ranks. Does not try to improve on balance_factor.

priorAssignment := record the existing assignment for later comparisons

SortedTasks: List<task> := sort(T) // define a stable ordering of tasks
SortedInstances: List<instance> := sort(I) // define a stable ordering of instances

StatefulTasksToRankedCandidates: Map<task, Map<rank, instance>> := {}

// Build StatefulTasksToRankedCandidates. After this block, all tasks map to a ranked collection of all instances, where the rank corresponds to the instance's lag on that task (lower is better).
for task in SortedTasks if task is stateful:
  for instance in SortedInstances:
    if (instance does not contain task lag):
      // then the rank is equivalent to the full set of offsets in the topic, guaranteed to be greater than any instance's actual lag on the task.
      StatefulTasksToRankedCandidates[task][task.offsets] := instance
    else if (instance[task].lag <= acceptable_recovery_lag):
      // then the rank is 0, because any instance within acceptable_recovery_lag is considered caught up.
      StatefulTasksToRankedCandidates[task][0] := instance
    else:
      // then the rank is the actual lag
      StatefulTasksToRankedCandidates[task][instance[task].lag] := instance


StatelessTasks: List<task> := []

// add all stateless tasks to StatelessTasks
for task in SortedTasks if task is stateless:
  add task to StatelessTasks


// assign active tasks to instances of the min rank
StatefulActiveTaskAssignments: Map<instance, List<Task>> := constrainedBalancedAssignment(StatefulTasksToRankedCandidates, balance_factor)

// at this point, we have an active task assignment that is as balanced as it can be, while only assigning the tasks to instances that are already caught up

// create a standby task for each task we want to move to another instance
StandbyTaskAssignments: Map<instance, List<task>> := {}
StandbyTasksToInstanceCount: Map<task, integer> := {} // bookkeeping the number of standbys we've assigned

ProposedStatefulActiveTaskAssignments: Map<instance, List<task>> := proposeBalancedAssignment(StatefulTasksToRankedCandidates, balance_factor)
Movement: <task, source instance, destination instance>
ProposedMovements: List<Movement> := getMovements(StatefulActiveTaskAssignments, ProposedStatefulActiveTaskAssignments)

for (task, _, destination instance) in ProposedMovements:
  add task to StandbyTaskAssignments[destination instance]
  StandbyTasksToInstanceCount[task] := 1 // each task in ProposedMovements is unique, so the assignment count is at most 1 at this stage

for task,RankedCandidates in StatefulTasksToRankedCandidates:
  while (StandbyTasksToInstanceCount[task] < num_standbys):
    instance := choose a least-loaded instance that doesn't already have the task
    add task to StandbyTaskAssignments[instance]
    increment StandbyTasksToInstanceCount[task]

// now, we have an assignment of all active tasks and all standby tasks
// note that in the special case where num_standbys is 0, we will still assign a standby task to accomplish a proposed movement

// finally, proceed to pack stateless tasks onto the instances
StatelessTaskAssignments: Map<instance, List<task>> := {}

ComputeTasks: Map<instance, integer> := {}

for instance,Tasks in StatefulActiveTaskAssignments:
  ComputeTasks[instance] := |Tasks|

for task in StatelessTasks:
  instance := pick an instance with the least number of compute tasks
  add task to StatelessTaskAssignments[instance]
  increment ComputeTasks[instance]

// We are done.
// The StatefulActiveTaskAssignments are as balanced as possible while minimizing catch-up time.
// The StandbyTaskAssignments contain any movements we wish to make in order to perfectly balance the active tasks, and otherwise are as balanced as possible.
// The StatelessTasks are assigned in a way that equalizes the compute load over the cluster, given the existing active assignment
assignment := new Assignment(StatefulActiveTaskAssignments, StandbyTaskAssignments, StatelessTaskAssignments)

// To avoid thrashing, we will only use the new assignment if it improves the balance over the old assignment
if assignment is more balanced than priorAssignment:
  return assignment
else:
  return priorAssignment

...