...

  • balance_factor: A scalar integer value representing the target difference in number of tasks assigned to the node with the most tasks vs. the node with the least tasks. Defaults to 1. Must be at least 1.
  • acceptable_recovery_lag: A scalar integer value indicating a task lag (number of offsets to catch up) that is acceptable for immediate assignment. Defaults to 10,000 (a lag this small should take well under a minute, and typically a few seconds, to catch up, depending on workload). Must be at least 0.
  • num_standbys: A scalar integer indicating the number of hot-standby task replicas to maintain in addition to the active processing tasks. Defaults to 0. Must be at least 0.
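
As a minimal sketch of how these three parameters and their constraints could be represented, the Python snippet below validates each value against the bounds listed above. The class name AssignorConfig is hypothetical and used only for illustration; the defaults and lower bounds are taken from the list.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class AssignorConfig:
    """Hypothetical container for the three parameters described above."""
    balance_factor: int = 1
    acceptable_recovery_lag: int = 10_000
    num_standbys: int = 0

    def __post_init__(self):
        # Enforce the lower bounds stated for each parameter.
        if self.balance_factor < 1:
            raise ValueError("balance_factor must be at least 1")
        if self.acceptable_recovery_lag < 0:
            raise ValueError("acceptable_recovery_lag must be at least 0")
        if self.num_standbys < 0:
            raise ValueError("num_standbys must be at least 0")
```

Constructing AssignorConfig() with no arguments yields the documented defaults, while an out-of-range value such as balance_factor=0 raises a ValueError.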

Ranking Algorithm

This algorithm ranks each instance by how up to date it is on each task. The other algorithms here use the resulting ranking.

Code Block
rank:

given:
  T list of tasks to be assigned
    (includes t.offsets, the total number of offsets in task t)
  I list of instances to assign tasks to
    (includes i[t].lag, the reported lag of instance i on task t)
  acceptable_recovery_lag (as defined)

SortedTasks: List<task> := sort(T) // define a stable ordering of tasks
SortedInstances: List<instance> := sort(I) // define a stable ordering of instances

StatefulTasksToRankedCandidates: Map<task, Map<rank, instance>> := {}

// Build StatefulTasksToRankedCandidates. After this block, all tasks map to a ranked collection of all instances, where the rank corresponds to the instance's lag on that task (lower is better).
for task in SortedTasks if task is stateful:
  for instance in SortedInstances:
    if (instance does not contain task lag):
      // then the rank is equivalent to the full set of offsets in the topic, guaranteed to be greater than any instance's actual lag on the task.
      StatefulTasksToRankedCandidates[task][task.offsets] := instance
    else if (instance[task].lag <= acceptable_recovery_lag):
      // then the rank is 0, because any instance within acceptable_recovery_lag is considered caught up.
      StatefulTasksToRankedCandidates[task][0] := instance
    else:
      // then the rank is the actual lag
      StatefulTasksToRankedCandidates[task][instance[task].lag] := instance

return StatefulTasksToRankedCandidates
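
The ranking pseudocode above can be sketched in Python roughly as follows. This is an illustration under stated assumptions, not a definitive implementation: the input shapes (plain dictionaries) are hypothetical, task_offsets is assumed to contain only stateful tasks, and each rank maps to a list of instances so that several instances sharing a rank (for example, rank 0) are all retained.

```python
from collections import defaultdict


def rank(task_offsets, instance_lags, acceptable_recovery_lag):
    """Rank every instance on every stateful task (lower rank is better).

    task_offsets:  {task: total number of offsets in the task}
    instance_lags: {instance: {task: reported lag}}
    Returns {task: {rank: [instances]}}. Mapping each rank to a list is an
    assumption of this sketch, so that instances sharing a rank are all kept.
    """
    ranked = {}
    for task in sorted(task_offsets):            # stable task ordering
        candidates = defaultdict(list)
        for instance in sorted(instance_lags):   # stable instance ordering
            lag = instance_lags[instance].get(task)
            if lag is None:
                # No reported lag: rank by the task's total offsets.
                candidates[task_offsets[task]].append(instance)
            elif lag <= acceptable_recovery_lag:
                # Within the acceptable lag: considered caught up.
                candidates[0].append(instance)
            else:
                # Otherwise the rank is the actual lag.
                candidates[lag].append(instance)
        ranked[task] = dict(candidates)
    return ranked


ranks = rank(
    {"t1": 50_000},
    {"a": {"t1": 200}, "b": {"t1": 30_000}, "c": {}},
    acceptable_recovery_lag=10_000,
)
# ranks == {"t1": {0: ["a"], 30000: ["b"], 50000: ["c"]}}
```

In this usage example, instance a is within the acceptable lag and gets rank 0, instance b is ranked by its actual lag, and instance c, which reports no lag for t1, is ranked by the task's total offsets.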


Assignment Algorithm

This algorithm would be used to generate an assignment for the cluster upon ConsumerGroup rebalance. It assumes the existence of two non-trivial functions (constrainedBalancedAssignment and proposeBalancedAssignment), which are not detailed here. The behavior of these functions is described in the "given" block. The related work contains several suitable implementations, so we're confident one is possible, and the exact implementation isn't important, as long as it satisfies the required behavior.

Code Block
given:
  T list of tasks to be assigned
    (includes t.offsets, the total number of offsets in task t)
  I list of instances to assign tasks to
    (includes i[t].lag, the reported lag of instance i on task t)
  balance_factor (as defined)
  acceptable_recovery_lag (as defined)
  num_standbys (as defined)
  constrainedBalancedAssignment(StatefulTasksToRankedCandidates, balance_factor)
    a function to produce a stable balanced assignment, given the constraint that tasks can only be assigned to instances of the min rank
  proposeBalancedAssignment(StatefulTasksToRankedCandidates, balance_factor)
    a function to produce a stable balanced assignment, regardless of instance ranks. Does not try to improve on balance_factor.
  
SortedTasks: List<task> := sort(T) // define a stable ordering of tasks
SortedInstances: List<instance> := sort(I) // define a stable ordering of instances

StatefulTasksToRankedCandidates: Map<task, Map<rank, instance>> := rank(T, I, acceptable_recovery_lag)
StatelessTasks: List<task> := []

// add all stateless tasks to StatelessTasks
for task in SortedTasks if task is stateless:
  add task to StatelessTasks


// assign active tasks to instances of the min rank, using the existing algorithm from the Consumer StickyAssignor
StatefulActiveTaskAssignments: Map<instance, List<Task>> := constrainedBalancedAssignment(StatefulTasksToRankedCandidates, balance_factor)

// at this point, we have an active task assignment that is as balanced as it can be, while only assigning the tasks to instances that are already caught up

// create a standby task for each task we want to move to another instance
StandbyTaskAssignments: Map<instance, List<task>> := {}
StandbyTasksToInstanceCount: Map<task, integer> := {} // bookkeeping the number of standbys we've assigned

ProposedStatefulActiveTaskAssignments: Map<instance, List<task>> := proposeBalancedAssignment(StatefulTasksToRankedCandidates, balance_factor)
Movement: <task, source instance, destination instance>
ProposedMovements: List<Movement> := getMovements(StatefulActiveTaskAssignments, ProposedStatefulActiveTaskAssignments)

for (task, _, destination instance) in ProposedMovements:
  add task to StandbyTaskAssignments[destination instance]
  StandbyTasksToInstanceCount[task] := 1 // each task in ProposedMovements is unique, so the assignment count is at most 1 at this stage

for task,RankedCandidates in StatefulTasksToRankedCandidates:
  // a task with no entry in StandbyTasksToInstanceCount has a count of 0
  while (StandbyTasksToInstanceCount[task] < num_standbys) and (some instance doesn't already have the task):
    instance := choose a least-loaded instance that doesn't already have the task
    add task to StandbyTaskAssignments[instance]
    increment StandbyTasksToInstanceCount[task]

// now, we have an assignment of all active tasks and all standby tasks
// note that in the special case where num_standbys is 0, we will still assign a standby task to accomplish a proposed movement

// finally, proceed to pack stateless tasks onto the instances
StatelessTaskAssignments: Map<instance, List<task>> := {}

ComputeTasks: Map<instance, integer> := {}

for instance,Tasks in StatefulActiveTaskAssignments:
  ComputeTasks[instance] := |Tasks|

for task in StatelessTasks:
  instance := pick an instance with the least number of compute tasks
  add task to StatelessTaskAssignments[instance]
  increment ComputeTasks[instance]

// We are done.
// The StatefulActiveTaskAssignments are as balanced as possible while minimizing catch-up time.
// The StandbyTaskAssignments contain any movements needed to perfectly balance the active tasks, and otherwise are as balanced as possible.
// The StatelessTasks are assigned in a way that equalizes the compute load over the cluster, given the existing active assignment.
return new Assignment(StatefulActiveTaskAssignments, StandbyTaskAssignments, StatelessTaskAssignments)
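
The final step above, packing stateless tasks onto the least-loaded instances, can be sketched in Python with a min-heap. This is only an illustration: the function name pack_stateless_tasks and the dictionary-based inputs are hypothetical, instances are assumed to be comparable (for example, strings) so that ties break deterministically, and at least one instance is assumed to exist.

```python
import heapq


def pack_stateless_tasks(stateful_active, stateless_tasks):
    """Assign each stateless task to the instance with the fewest compute tasks.

    stateful_active: {instance: [stateful active tasks]}
    stateless_tasks: iterable of stateless tasks
    Returns {instance: [stateless tasks]}.
    """
    # Heap entries are (compute task count, instance); ties break on the
    # instance itself, which keeps the result deterministic.
    heap = [(len(tasks), inst) for inst, tasks in stateful_active.items()]
    heapq.heapify(heap)
    assignment = {inst: [] for inst in stateful_active}
    for task in sorted(stateless_tasks):          # stable task ordering
        count, inst = heapq.heappop(heap)         # least-loaded instance
        assignment[inst].append(task)
        heapq.heappush(heap, (count + 1, inst))   # account for the new task
    return assignment


packed = pack_stateless_tasks(
    {"a": ["t1", "t2"], "b": ["t3"], "c": []},
    ["s1", "s2", "s3"],
)
# packed == {"a": [], "b": ["s2"], "c": ["s1", "s3"]}
```

In the usage example, every instance ends up with two compute tasks in total, which is the equalizing effect described in the closing comments of the pseudocode.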


Balance Evaluation Algorithm

This algorithm is used to trigger a rebalance when the currently reported task lags indicate that the cluster balance can be improved.

Code Block
given:
  T list of tasks to be assigned
    (includes t.offsets, the total number of offsets in task t)
  I list of instances to assign tasks to
    (includes i[t].lag, the reported lag of instance i on task t)
  Assignment
    StatefulActiveTaskAssignments: Map<instance, List<Task>>
    StandbyTaskAssignments: Map<instance, List<task>>
    StatelessTaskAssignments: Map<instance, List<task>>
  balance_factor (as defined)
  acceptable_recovery_lag (as defined)
  num_standbys (as defined)

StatefulTasksToRankedCandidates: Map<task, Map<rank, instance>> := rank(T, I, acceptable_recovery_lag)

ProposedStatefulActiveTaskAssignments: Map<instance, List<task>> := proposeBalancedAssignment(StatefulTasksToRankedCandidates, balance_factor)
Movement: <task, source instance, destination instance>
ProposedMovements: List<Movement> := getMovements(StatefulActiveTaskAssignments, ProposedStatefulActiveTaskAssignments)

AvailableMovements: List<Movement> := []

for movement=(task, _, destination instance) in ProposedMovements:
  if StatefulTasksToRankedCandidates[task][0] contains destination instance:
    add movement to AvailableMovements

// if there are any movements that are now available, given the current active task assignment, then just rebalance
return |AvailableMovements| > 0
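
Since getMovements is used but not defined in this document, the Python sketch below shows one plausible reading of it together with the balance check. Both function names and the input shapes are assumptions made for illustration: assignments are {instance: [tasks]} dictionaries, and the ranking is assumed to have the shape {task: {rank: [instances]}}, so that rank 0 can hold several caught-up instances.

```python
def get_movements(current, proposed):
    """Return (task, source, destination) for every task whose owner differs.

    current, proposed: {instance: [tasks]}
    """
    owner = {task: inst for inst, tasks in current.items() for task in tasks}
    movements = []
    for destination in sorted(proposed):
        for task in sorted(proposed[destination]):
            source = owner.get(task)  # None if the task is currently unassigned
            if source != destination:
                movements.append((task, source, destination))
    return movements


def should_rebalance(current, proposed, ranked):
    """True if any proposed destination is already caught up (rank 0)."""
    return any(
        destination in ranked.get(task, {}).get(0, [])
        for task, _, destination in get_movements(current, proposed)
    )


current = {"a": ["t1", "t2"], "b": []}
proposed = {"a": ["t1"], "b": ["t2"]}
still_lagging = {"t2": {0: ["a"], 500_000: ["b"]}}
caught_up = {"t2": {0: ["a", "b"]}}
# should_rebalance(current, proposed, still_lagging) is False
# should_rebalance(current, proposed, caught_up) is True
```

The example moves t2 from a to b. While b is still lagging on t2, no rebalance is triggered; once b appears at rank 0 for t2, the movement becomes available and the check returns True.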

Rebalance Metadata

tbd

Heartbeat Metadata

tbd

Streams Rebalance Metadata: Remember the PrevTasks

...

Code Block
Leader crash before
Cluster has 3 stream threads S1(leader), S2, S3, and they own tasks T1 ~ T5
Group stable state: S1[T1], S2[T2, T3, T4, T5]

#First Rebalance 
New member S4 joins the group; at the same time, S1 crashes.
S2 takes over as leader, while T1 is now unassigned
S2 ~ S4 join with the following status
	S2(assigned: [T2, T3, T4, T5], revoked: [], learning: [])
	S3(assigned: [], revoked: [], learning: []) 
	S4(assigned: [], revoked: [], learning: [])
S2 performs task assignments: (no learner assignment since S2 doesn't know S4 is a new member)
	S2(assigned: [T2, T3, T4, T5], revoked: [], learning: [])
	S3(assigned: [T1], revoked: [], learning: [])
	S4(assigned: [], revoked: [], learning: [])

Now the group reaches balance, although the eventual load is skewed.

Assignment Algorithm

The examples above focus on demonstrating the expected "end picture" behavior of KStream incremental rebalancing. Next, we will present a holistic view of the new learner assignment algorithm during each actual rebalance.

...