...
```
given:
  T  list of tasks to be assigned (includes t.offsets, the total number of offsets in task t)
  I  list of instances to assign tasks to (includes i[t].lag, the reported lag of instance i on task t)
  balance_factor (as defined)
  acceptable_recovery_lag (as defined)
  num_standbys (as defined)
  constrainedBalancedAssignment(StatefulTasksToRankedCandidates, balance_factor)
    a function to produce a stable balanced assignment, given the constraint that tasks can only be assigned to instances of the min rank
  proposeBalancedAssignment(StatefulTasksToRankedCandidates, balance_factor)
    a function to produce a stable balanced assignment, regardless of instance ranks. Does not try to improve on balance_factor.

priorAssignment := record the existing assignment for later comparisons
SortedTasks: List<task> := sort(T)              // define a stable ordering of tasks
SortedInstances: List<instance> := sort(I)      // define a stable ordering of instances

StatefulTasksToRankedCandidates: Map<task, SortedMap<rank, Set<instance>>> := {}
// Build StatefulTasksToRankedCandidates. After this block, every stateful task maps to a ranked collection
// of all instances, where the rank corresponds to the instance's lag on that task (lower is better).
// Several instances may share a rank (for example, every caught-up instance has rank 0).
for task in SortedTasks:
  if task is stateful:
    for instance in SortedInstances:
      if instance does not contain task lag:
        // then the rank is the full set of offsets in the task, guaranteed to be at least as large as any instance's actual lag on the task.
        add instance to StatefulTasksToRankedCandidates[task][task.offsets]
      else if instance[task].lag <= acceptable_recovery_lag:
        // then the rank is 0, because any instance within acceptable_recovery_lag is considered caught up.
        add instance to StatefulTasksToRankedCandidates[task][0]
      else:
        // then the rank is the actual lag
        add instance to StatefulTasksToRankedCandidates[task][instance[task].lag]

StatelessTasks: List<task> := []
// add all stateless tasks to StatelessTasks
for task in SortedTasks:
  if task is stateless:
    add task to StatelessTasks

// assign active tasks to instances of the min rank
StatefulActiveTaskAssignments: Map<instance, List<task>> := constrainedBalancedAssignment(StatefulTasksToRankedCandidates, balance_factor)
// at this point, we have an active task assignment that is as balanced as it can be,
// while only assigning the tasks to instances that are already caught up

// create a standby task for each task we want to move to another instance
StandbyTaskAssignments: Map<instance, List<task>> := {}
StandbyTasksToInstanceCount: Map<task, integer> := {} // bookkeeping the number of standbys we've assigned; missing entries count as 0

ProposedStatefulActiveTaskAssignments: Map<instance, List<task>> := proposeBalancedAssignment(StatefulTasksToRankedCandidates, balance_factor)
Movement: <task, source instance, destination instance>
ProposedMovements: List<Movement> := getMovements(StatefulActiveTaskAssignments, ProposedStatefulActiveTaskAssignments)

for (task, _, destination instance) in ProposedMovements:
  add task to StandbyTaskAssignments[destination instance]
  StandbyTasksToInstanceCount[task] := 1 // each task in ProposedMovements is unique, so the assignment count is at most 1 at this stage

for task, RankedCandidates in StatefulTasksToRankedCandidates:
  while StandbyTasksToInstanceCount[task] < num_standbys and some instance doesn't yet have the task:
    instance := choose a least-loaded instance that doesn't already have the task (as an active or a standby)
    add task to StandbyTaskAssignments[instance]
    increment StandbyTasksToInstanceCount[task]

// now, we have an assignment of all active tasks and all standby tasks
// note that in the special case where num_standbys is 0, we will still assign a standby task to accomplish a proposed movement

// finally, proceed to pack stateless tasks onto the instances
StatelessTaskAssignments: Map<instance, List<task>> := {}
ComputeTasks: Map<instance, integer> := {}
for instance in SortedInstances:
  ComputeTasks[instance] := |StatefulActiveTaskAssignments[instance]| // 0 if the instance has no stateful active tasks
for task in StatelessTasks:
  instance := pick an instance with the least number of compute tasks
  add task to StatelessTaskAssignments[instance]
  increment ComputeTasks[instance]

// We are done.
// The StatefulActiveTaskAssignments are as balanced as possible while minimizing catch-up time.
// The StandbyTaskAssignments contain a standby for every movement needed to perfectly balance the active tasks, and are otherwise as balanced as possible.
// The StatelessTaskAssignments equalize the compute load over the cluster, given the existing active assignment.
assignment := new Assignment(StatefulActiveTaskAssignments, StandbyTaskAssignments, StatelessTaskAssignments)

// To avoid thrashing, we will only use the new assignment if it improves the balance over the old assignment
if assignment is more balanced than priorAssignment:
  return assignment
else:
  return priorAssignment
```
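
The ranking step of the pseudocode can be sketched in Python as follows. This is a minimal illustration under stated assumptions, not the Kafka Streams implementation: tasks and instances are plain strings, reported lags come from a hypothetical `lags` dict (instance → task → lag), and a task missing from an instance's dict stands for "no reported lag". Each rank maps to a set of instances so that several caught-up instances can share rank 0.

```python
from collections import defaultdict


def rank_candidates(tasks, instances, lags, offsets, acceptable_recovery_lag):
    """Map each stateful task to {rank: set(instances)}, ordered by ascending rank.

    Hypothetical inputs for illustration:
      tasks     -- iterable of stateful task ids
      instances -- iterable of instance ids
      lags      -- dict: instance -> dict: task -> reported lag
      offsets   -- dict: task -> total number of offsets in the task
    """
    ranked = {}
    for task in sorted(tasks):                      # stable task order
        by_rank = defaultdict(set)
        for instance in sorted(instances):          # stable instance order
            lag = lags.get(instance, {}).get(task)
            if lag is None:
                # No reported lag: rank by the task's total offsets.
                rank = offsets[task]
            elif lag <= acceptable_recovery_lag:
                # Within the acceptable lag: treated as caught up.
                rank = 0
            else:
                rank = lag
            by_rank[rank].add(instance)
        # Insertion order is ascending rank, so the first key is the min rank.
        ranked[task] = dict(sorted(by_rank.items()))
    return ranked
```

For example, with an `acceptable_recovery_lag` of 10, instances reporting lags of 3 and 7 on the same task both land in rank 0.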
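
The pseudocode specifies `constrainedBalancedAssignment` only by its contract. One hypothetical way to satisfy that contract is a greedy pass: visit tasks in their stable order and give each one to the least-loaded instance among its min-rank candidates, breaking ties by instance id. The sketch below assumes the rank → set-of-instances layout used above. It ignores `balance_factor` and does not claim to match any real implementation.

```python
def constrained_balanced_assignment(ranked, instances):
    """Hypothetical greedy stand-in for constrainedBalancedAssignment.

    ranked    -- dict: task -> dict: rank -> set(instances)
    instances -- all instance ids (instances with no tasks still appear in the result)
    """
    assignment = {instance: [] for instance in sorted(instances)}
    for task in sorted(ranked):
        min_rank = min(ranked[task])            # lowest rank = most caught up
        candidates = ranked[task][min_rank]
        # Least-loaded candidate first; instance id breaks ties for stability.
        chosen = min(candidates, key=lambda i: (len(assignment[i]), i))
        assignment[chosen].append(task)
    return assignment
```

Because only min-rank instances are candidates, a task whose state lives on a single caught-up instance stays there even if that instance ends up busier. The later movement step exists to address exactly this case.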
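
The movement and standby steps might look like the sketch below. `get_movements` and `assign_standbys` are hypothetical names. "Least-loaded" is assumed here to mean the fewest active plus standby tasks. The loop also stops early when no eligible instance remains, rather than spinning forever when `num_standbys` exceeds the number of other instances.

```python
def get_movements(current, proposed):
    """Return (task, source, destination) for every task whose owner would change."""
    owner_now = {t: i for i, ts in current.items() for t in ts}
    owner_next = {t: i for i, ts in proposed.items() for t in ts}
    return [
        (task, owner_now[task], owner_next[task])
        for task in sorted(owner_now)
        if task in owner_next and owner_now[task] != owner_next[task]
    ]


def assign_standbys(active, proposed, instances, num_standbys):
    """Place a warm-up standby for each movement, then top up to num_standbys per task."""
    standbys = {i: [] for i in sorted(instances)}
    count = {}  # task -> standbys assigned so far (missing = 0)
    for task, _source, dest in get_movements(active, proposed):
        standbys[dest].append(task)
        count[task] = 1
    owner = {t: i for i, ts in active.items() for t in ts}
    for task in sorted(owner):
        while count.get(task, 0) < num_standbys:
            eligible = [i for i in standbys
                        if i != owner[task] and task not in standbys[i]]
            if not eligible:
                break  # not enough instances for the requested replicas
            # Assumed load metric: active + standby tasks; id breaks ties.
            chosen = min(eligible,
                         key=lambda i: (len(standbys[i]) + len(active.get(i, [])), i))
            standbys[chosen].append(task)
            count[task] = count.get(task, 0) + 1
    return standbys
```

With `num_standbys` set to 0, a task that should move still gets one standby on its destination, matching the special case noted in the pseudocode.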
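
The stateless packing step is a least-loaded placement, and a min-heap keyed on compute-task count handles it directly. In this sketch, every instance starts from its stateful active count, including instances that have no stateful active tasks. Ties go to the smaller instance id because the heap compares `(load, instance)` tuples. The function name and inputs are illustrative assumptions.

```python
import heapq


def pack_stateless(stateful_active, stateless_tasks, instances):
    """Give each stateless task to the instance with the fewest compute tasks."""
    # Seed every instance with its stateful active count (0 if it has none).
    heap = [(len(stateful_active.get(i, [])), i) for i in sorted(instances)]
    heapq.heapify(heap)
    assignment = {i: [] for i in sorted(instances)}
    for task in stateless_tasks:
        load, instance = heapq.heappop(heap)      # currently least-loaded
        assignment[instance].append(task)
        heapq.heappush(heap, (load + 1, instance))
    return assignment
```

Each placement costs O(log n) in the number of instances, so packing many stateless tasks stays cheap.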
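
The pseudocode leaves "more balanced" undefined. As an illustrative assumption only, the sketch below measures imbalance as the spread between the most-loaded and least-loaded instances' active task counts. It keeps the prior assignment unless the new one is strictly better, and that strict comparison is what prevents equally balanced reshuffles from moving tasks.

```python
def imbalance(assignment, instances):
    """Hypothetical metric: max minus min active task count across instances."""
    loads = [len(assignment.get(i, [])) for i in instances]
    return max(loads) - min(loads)


def choose_assignment(new, prior, instances):
    """Adopt the new assignment only if it strictly improves balance."""
    if imbalance(new, instances) < imbalance(prior, instances):
        return new
    return prior
```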
...