...

At a high level, we are proposing an iterative algorithm that prioritizes recovery time, while planning task movements to improve balance over consecutive rebalances. This proposal builds on KIP-429, so we assume the overhead of rebalancing is much less than it is today.

In each rebalance, the leader assigns active tasks only to instances that are ready to begin processing them immediately. If there is no such instance, then it assigns the active task to the instance that is closest to catching up.

The task movements are accomplished by assigning the task as a standby task to the destination instance, and then rebalancing once those standbys are up to date. This means we need some mechanism to trigger a rebalance outside of group or topic changes (which are the only triggers currently).

There are a number of ways to do this, but we propose to add a notion of "probing rebalance", in which the consumer leader would trigger a rebalance after a configured interval. If there is any "normal" rebalance resulting from cluster or topic changes, the probing rebalance timer is reset. Otherwise, once the interval elapses, the leader triggers a rebalance to get all the members to report their current lag on each state store. It would then use this information to potentially produce a more balanced assignment, while still only assigning active tasks to instances that are ready to begin processing immediately.
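
For illustration, the leader-side trigger logic might look like the following minimal Java sketch. The class and method names here are hypothetical, not part of this proposal.

Code Block
import java.time.Duration;
import java.time.Instant;

// Sketch: any "normal" rebalance resets the timer; once the configured
// interval elapses with no natural rebalance, the leader probes.
class ProbingRebalanceTimer {
  private final Duration probingRebalanceInterval;
  private Instant lastRebalance = Instant.now();

  ProbingRebalanceTimer(Duration probingRebalanceInterval) {
    this.probingRebalanceInterval = probingRebalanceInterval;
  }

  void onNaturalRebalance() { // called on any cluster- or topic-change rebalance
    lastRebalance = Instant.now();
  }

  boolean shouldTriggerProbingRebalance(Instant now) {
    return Duration.between(lastRebalance, now).compareTo(probingRebalanceInterval) >= 0;
  }
}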

Parameters

  • balance_factor: A scalar integer value representing the target difference in number of tasks assigned to the node with the most tasks vs. the node with the least tasks. Defaults to 1. Must be at least 1.
  • acceptable_recovery_lag: A scalar integer value indicating a task lag (number of offsets to catch up) that is acceptable for immediate assignment. Defaults to 10,000 (should be well under a minute and typically a few seconds, depending on workload). Must be at least 0.
  • num_standbys: A scalar integer indicating the number of hot-standby task replicas to maintain in addition to the active processing tasks. Defaults to 0. Must be at least 0.
  • probing_rebalance_interval: A time interval representing the minimum amount of time the leader should wait before triggering a "probing rebalance", assuming there is no intervening natural rebalance. Assuming KIP-429, the default would be 1 hour; without KIP-429, the default would be 1 day. Must be at least 1 minute.
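
To make the parameters concrete, the following snippet shows how they might be configured. The configuration keys shown are purely illustrative; this proposal does not fix the exact names.

Code Block
import java.util.Properties;

// Hypothetical configuration keys, for illustration only.
Properties props = new Properties();
props.put("balance.factor", "1");                      // balance_factor
props.put("acceptable.recovery.lag", "10000");         // acceptable_recovery_lag, in offsets
props.put("num.standby.replicas", "0");                // num_standbys
props.put("probing.rebalance.interval.ms", "3600000"); // probing_rebalance_interval, 1 hour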

Ranking Algorithm

An algorithm for ranking each instance with respect to its up-to-date-ness on each task. Used by the main assignment algorithm.

Code Block
rank:

given:
  T list of tasks to be assigned
    (includes t.offsets, the total number of offsets in task t)
  I list of instances to assign tasks to
    (includes i[t].lag, the reported lag of instance i on task t)
  acceptable_recovery_lag (as defined)

SortedTasks: List<task> := sort(T) // define a stable ordering of tasks
SortedInstances: List<instance> := sort(I) // define a stable ordering of instances

StatefulTasksToRankedCandidates: Map<task, Map<rank, instance>> := {}

// Build StatefulTasksToRankedCandidates. After this block, all tasks map to a ranked collection of all instances, where the rank corresponds to the instance's lag on that task (lower is better).
for task in SortedTasks if task is stateful:
  for instance in SortedInstances:
    if (instance does not contain task lag):
      // then the rank is equivalent to the full set of offsets in the topic, guaranteed to be greater than any instance's actual lag on the task.
      StatefulTasksToRankedCandidates[task][task.offsets] := instance
    else if (instance[task].lag <= acceptable_recovery_lag):
      // then the rank is 0, because any instance within acceptable_recovery_lag is considered caught up.
      StatefulTasksToRankedCandidates[task][0] := instance
    else:
      // then the rank is the actual lag
      StatefulTasksToRankedCandidates[task][instance[task].lag] := instance

return StatefulTasksToRankedCandidates
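
In Java, the ranking step could look like the following sketch. It assumes simple String identifiers for tasks and instances, and it maps each rank to a list of instances (rather than a single instance) so that ties between equally-lagged instances don't overwrite each other.

Code Block
import java.util.*;

// Sketch of rank(): lower rank means more caught up.
class RankingSketch {
  // task -> (rank -> instances at that rank)
  static Map<String, SortedMap<Long, List<String>>> rank(
      Map<String, Long> taskOffsets,        // task -> total offsets in the task
      Map<String, Map<String, Long>> lags,  // instance -> (task -> reported lag)
      long acceptableRecoveryLag) {
    Map<String, SortedMap<Long, List<String>>> ranked = new TreeMap<>();
    for (String task : new TreeSet<>(taskOffsets.keySet())) {       // stable task order
      SortedMap<Long, List<String>> candidates = new TreeMap<>();
      for (String instance : new TreeSet<>(lags.keySet())) {        // stable instance order
        Long lag = lags.get(instance).get(task);
        long r;
        if (lag == null) {
          // no lag reported: rank at the task's full offset count, which is
          // at least any instance's actual lag on the task
          r = taskOffsets.get(task);
        } else if (lag <= acceptableRecoveryLag) {
          r = 0L;  // within acceptable_recovery_lag: considered caught up
        } else {
          r = lag; // rank by actual lag
        }
        candidates.computeIfAbsent(r, k -> new ArrayList<>()).add(instance);
      }
      ranked.put(task, candidates);
    }
    return ranked;
  }
}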

...

This algorithm would be used to generate an assignment for the cluster upon ConsumerGroup rebalance. It assumes the existence of two non-trivial functions (constrainedBalancedAssignment and proposeBalancedAssignment), which are not detailed here. The behavior of these functions is described in the "given" block below. The related work contains several suitable implementations, so we're confident one is possible, and the exact implementation isn't important, as long as it satisfies the required contract.
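
Before the full pseudocode, here is a Java-style sketch of the contracts those two functions would satisfy. The interface name and signatures are illustrative only.

Code Block
import java.util.*;

// Illustrative contract only; any implementation with the described
// behavior would do.
interface BalancedAssignor {
  // Balanced assignment, constrained to assign each task only to an
  // instance of the minimum rank for that task.
  Map<String, List<String>> constrainedBalancedAssignment(
      Map<String, SortedMap<Long, List<String>>> statefulTasksToRankedCandidates,
      int balanceFactor);

  // Balanced assignment regardless of instance ranks; does not try to
  // improve on balance_factor.
  Map<String, List<String>> proposeBalancedAssignment(
      Map<String, SortedMap<Long, List<String>>> statefulTasksToRankedCandidates,
      int balanceFactor);
}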

Code Block
given:
  T list of tasks to be assigned
    (includes t.offsets, the total number of offsets in task t)
  I list of instances to assign tasks to
    (includes i[t].lag, the reported lag of instance i on task t)
  balance_factor (as defined)
  acceptable_recovery_lag (as defined)
  num_standbys (as defined)
  constrainedBalancedAssignment(StatefulTasksToRankedCandidates, balance_factor)
    a function to produce a stable balanced assignment, given the constraint that tasks can only be assigned to instances of the min rank
  proposeBalancedAssignment(StatefulTasksToRankedCandidates, balance_factor)
    a function to produce a stable balanced assignment, regardless of instance ranks. Does not try to improve on balance_factor.

SortedTasks: List<task> := sort(T) // define a stable ordering of tasks
SortedInstances: List<instance> := sort(I) // define a stable ordering of instances

StatefulTasksToRankedCandidates: Map<task, Map<rank, instance>> := rank(T, I, acceptable_recovery_lag)
StatelessTasks: List<task> := []

// add all stateless tasks to StatelessTasks
for task in SortedTasks if task is stateless:
  add task to StatelessTasks


// assign active tasks to instances of the min rank
StatefulActiveTaskAssignments: Map<instance, List<Task>> := constrainedBalancedAssignment(StatefulTasksToRankedCandidates, balance_factor)

// at this point, we have an active task assignment that is as balanced as it can be, while only assigning the tasks to instances that are already caught up

// create a standby task for each task we want to move to another instance
StandbyTaskAssignments: Map<instance, List<task>> := {}
StandbyTasksToInstanceCount: Map<task, integer> := {} // bookkeeping the number of standbys we've assigned

ProposedStatefulActiveTaskAssignments: Map<instance, List<task>> := proposeBalancedAssignment(StatefulTasksToRankedCandidates, balance_factor)
Movement: <task, source instance, destination instance>
ProposedMovements: List<Movement> := getMovements(StatefulActiveTaskAssignments, ProposedStatefulActiveTaskAssignments)

for (task, _, destination instance) in ProposedMovements:
  add task to StandbyTaskAssignments[destination instance]
  StandbyTasksToInstanceCount[task] := 1 // each task in ProposedMovements is unique, so the assignment count is at most 1 at this stage

for task,RankedCandidates in StatefulTasksToRankedCandidates:
  while (StandbyTasksToInstanceCount[task] < num_standbys):
    instance := choose a least-loaded instance that doesn't already have the task
    add task to StandbyTaskAssignments[instance]
    increment StandbyTasksToInstanceCount[task]

// now, we have an assignment of all active tasks and all standby tasks
// note that in the special case where num_standbys is 0, we will still assign a standby task to accomplish a proposed movement

// finally, proceed to pack stateless tasks onto the instances
StatelessTaskAssignments: Map<instance, List<task>> := {}

ComputeTasks: Map<instance, integer> := {}

for instance,Tasks in StatefulActiveTaskAssignments:
  ComputeTasks[instance] := |Tasks|

for task in StatelessTasks:
  instance := pick an instance with the least number of compute tasks
  add task to StatelessTaskAssignments[instance]
  increment ComputeTasks[instance]

// We are done.
// The StatefulActiveTaskAssignments are as balanced as possible while minimizing catch-up time.
// The StandbyTaskAssignments contain any movements we wish to make in order to perfectly balance the active tasks, and otherwise are as balanced as possible.
// The StatelessTasks are assigned in a way that equalizes the compute load over the cluster, given the existing active assignment.
return new Assignment(StatefulActiveTaskAssignments, StandbyTaskAssignments, StatelessTaskAssignments)
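
The pseudocode above also assumes a getMovements helper that diffs the current and proposed active assignments. A minimal Java sketch, with illustrative types:

Code Block
import java.util.*;

class MovementSketch {
  // A movement relocates one task's active copy between instances.
  record Movement(String task, String source, String destination) {}

  static List<Movement> getMovements(
      Map<String, List<String>> current,    // instance -> active tasks now
      Map<String, List<String>> proposed) { // instance -> proposed active tasks

    // index the current owner of every task
    Map<String, String> currentOwner = new HashMap<>();
    current.forEach((instance, tasks) ->
        tasks.forEach(task -> currentOwner.put(task, instance)));

    // emit a movement for every task whose proposed owner differs
    List<Movement> movements = new ArrayList<>();
    proposed.forEach((instance, tasks) -> {
      for (String task : tasks) {
        String owner = currentOwner.get(task);
        if (owner != null && !owner.equals(instance)) {
          movements.add(new Movement(task, owner, instance));
        }
      }
    });
    return movements;
  }
}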

Balance Evaluation Algorithm

Used to trigger a rebalance when the currently reported task lags indicate that the cluster balance can be improved.

Code Block
given:
  T list of tasks to be assigned
    (includes t.offsets, the total number of offsets in task t)
  I list of instances to assign tasks to
    (includes i[t].lag, the reported lag of instance i on task t)
  Assignment
    StatefulActiveTaskAssignments: Map<instance, List<Task>>
    StandbyTaskAssignments: Map<instance, List<task>>
    StatelessTaskAssignments: Map<instance, List<task>>
  balance_factor (as defined)
  acceptable_recovery_lag (as defined)
  num_standbys (as defined)

StatefulTasksToRankedCandidates: Map<task, Map<rank, instance>> := rank(T, I, acceptable_recovery_lag)

ProposedStatefulActiveTaskAssignments: Map<instance, List<task>> := proposeBalancedAssignment(StatefulTasksToRankedCandidates, balance_factor)
Movement: <task, source instance, destination instance>
ProposedMovements: List<Movement> := getMovements(StatefulActiveTaskAssignments, ProposedStatefulActiveTaskAssignments)

AvailableMovements: List<Movement> := []

for movement=(task, _, destination instance) in ProposedMovements:
  if StatefulTasksToRankedCandidates[task][0] contains destination instance:
    add movement to AvailableMovements

// if there are any movements that are now available, given the current active task assignment, then just rebalance
return |AvailableMovements| > 0
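
In Java, the availability check might look like the following sketch, reusing the Movement type and ranked-candidates structure from the sketches above:

Code Block
import java.util.*;

class BalanceEvaluationSketch {
  // A proposed movement is available once its destination instance is already
  // caught up on the task, i.e. it appears at rank 0 for that task.
  static boolean shouldRebalance(
      List<MovementSketch.Movement> proposedMovements,
      Map<String, SortedMap<Long, List<String>>> statefulTasksToRankedCandidates) {
    for (MovementSketch.Movement movement : proposedMovements) {
      List<String> caughtUp =
          statefulTasksToRankedCandidates.get(movement.task()).get(0L);
      if (caughtUp != null && caughtUp.contains(movement.destination())) {
        return true; // at least one proposed movement can complete right now
      }
    }
    return false;
  }
}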


Rebalance Metadata

The Consumer rebalance protocol has a pluggable extension point, the PartitionAssignor interface, along with a black-box SubscriptionInfo field. These mechanisms can be used in conjunction to encode extra information from the group members when they join the group, and then make a custom assignment decision. Finally, the PartitionAssignor assignment response contains another black-box field to encode extra information from the leader to the members.
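
As an illustration of the kind of encoding the black-box field could carry, the following sketch packs per-task lags into a ByteBuffer. The wire format shown is purely illustrative, not the format this proposal would standardize.

Code Block
import java.nio.ByteBuffer;
import java.util.Map;

// Illustrative only: encode (taskId -> lag) pairs into the black-box
// subscription userData field.
static ByteBuffer encodeSubscriptionInfo(Map<Integer, Long> taskLags) {
  ByteBuffer buf = ByteBuffer.allocate(4 + taskLags.size() * (4 + 8));
  buf.putInt(taskLags.size());                 // number of entries
  taskLags.forEach((taskId, lag) -> {
    buf.putInt(taskId);                        // task identifier
    buf.putLong(lag);                          // reported lag for that task
  });
  buf.flip();                                  // prepare the buffer for reading
  return buf;
}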

...