...

The task movements are accomplished by assigning the task as a standby task to the destination instance, and then rebalancing once those standby tasks are up to date. The latter rebalance would find that a more balanced assignment is now available while still only assigning work to caught-up instances: it would make the destination instance active on that task and simply unassign the source instance from it.

This means we need some mechanism to trigger a rebalance outside of group or topic changes (which are the only triggers currently). There are a number of ways to do this, but we propose to add a notion of a "probing rebalance", discussed in detail below.

...

As of this KIP, an assignment may result in an unbalanced distribution of tasks in favor of returning to processing as soon as possible, while under-loaded nodes are assigned standby tasks to warm up. Once the under-loaded instances are warm, a subsequent rebalance would be able to produce a more balanced assignment. This proposal creates a new requirement: the ability to trigger rebalances in response to conditions other than cluster or topic changes.

Ideally, group members would be able to communicate their current status back to the group leader so that the leader could trigger rebalances when the members catch up, but there is currently no such communication channel available. In lieu of that, we propose to have the leader trigger rebalances periodically, on a configured "probing_rebalance_interval". As each member re-joins the group, they would report their current lag (as proposed below), and the leader would potentially be able to compute a more balanced assignment.

Note that "balance" in this KIP, as today, is purely a function of task distribution over the instances of the cluster. Therefore, an assignment's balance can only change in response to changes in the cluster, or changes to the topic metadata (which could change the number of tasks). Both of these conditions already trigger a rebalance, so once the leader makes a balanced assignment, it can stop performing probing rebalances, confident that any later change to the balance of the cluster would trigger a rebalance automatically.

See the "Rejected Alternatives" section for a discussion of alternatives to probing rebalances.

Assignment Algorithm

The overall balancing algorithm relies on an assignment algorithm, which is not specified in this KIP, but is left as an implementation detail. We do specify some important properties of any implementation:

  • The assignment algorithm is required to assign active stateful tasks to the most-caught-up instances (see below for a definition of "most caught-up"). I.e., active stateful tasks have the highest assignment priority.
  • As a second priority, the algorithm must assign standby tasks to the next-most-caught-up instances. Every computed assignment must have at least "num.standby.replicas" standby replicas for every task (see the configuration example after this list).
  • The assignment algorithm may assign extra standby tasks to warm up instances that it wants to move existing active or standby tasks to.
  • Of course, the algorithm must also assign stateless tasks.
  • The algorithm must stabilize: if the current assignment is already balanced, it must not alter the assignment.
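
For context, the minimum standby count referenced in the list above is governed by the existing "num.standby.replicas" Streams configuration. A minimal example of setting it (a standard properties-based setup is assumed here):

Code Block
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

class StandbyReplicasConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Every stateful task gets one standby replica in addition to its active copy.
        // The assignor described in this KIP may still place extra "warm-up" standbys
        // beyond this number.
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
    }
}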

Computing the most-caught-up instances.

The assignment algorithm is required to assign active and standby tasks to the most-caught-up instances, with priority given to the active tasks. To do this, we need to define what a "most-caught-up" instance is.

First, we'll sort all the tasks and instances to give the algorithm a stable starting order. Then we compute a "rank" for each instance on each task. The rank represents how far from "caught up" that instance is on the task; lower is more caught up. Rank has a floor of acceptable_recovery_lag: that is, all instances whose lag is under acceptable_recovery_lag are considered to have a lag of 0. This allows instances that are "nearly caught up" to be considered for active assignment.

In the case where an instance has no state for a task, we just use the total number of offsets in that task's changelog as the lag. If this isn't convenient, we can also use "max long" as a proxy.

Something like this:

Code Block
SortedTasks: List<task> := sort(Tasks) // define a stable ordering of tasks
SortedInstances: List<instance> := sort(Instances) // define a stable ordering of instances

StatefulTasksToRankedCandidates: Map<task, Map<rank, instance>> := {}

// Build StatefulTasksToRankedCandidates. After this block, every stateful task maps to a ranked collection of all instances, where the rank corresponds to the instance's lag on that task (lower is better).
for task in SortedTasks if task is stateful:
  for instance in SortedInstances:
    if (instance does not contain task lag):
      // then the rank is the total number of offsets in the changelog, guaranteed to be greater than any instance's actual lag on the task
      StatefulTasksToRankedCandidates[task][task.offsets] := instance
    else if (instance[task].lag <= acceptable_recovery_lag):
      // then the rank is 0, because any instance within acceptable_recovery_lag is considered caught up
      StatefulTasksToRankedCandidates[task][0] := instance
    else:
      // then the rank is the actual lag
      StatefulTasksToRankedCandidates[task][instance[task].lag] := instance

For a given task, all the instances with the minimum rank are "most-caught-up".
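
To illustrate the ranking in a more concrete form, the following sketch (Java is used here purely for readability; none of these names are an actual Streams API) computes the ranked candidates for a single task and selects its most-caught-up instances:

Code Block
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;

class CaughtUpRanking {
    /**
     * lagPerInstance: instance -> lag on this task's changelog, absent if the instance has no
     * state for the task. totalOffsets is the total number of offsets in the changelog, used
     * as the rank for instances with no state at all. Returns the most-caught-up instances.
     */
    static Set<String> mostCaughtUp(Map<String, Long> lagPerInstance,
                                    List<String> sortedInstances,
                                    long totalOffsets,
                                    long acceptableRecoveryLag) {
        // rank -> instances with that rank (lower rank means more caught up)
        SortedMap<Long, Set<String>> rankedCandidates = new TreeMap<>();
        for (String instance : sortedInstances) {
            Long lag = lagPerInstance.get(instance);
            final long rank;
            if (lag == null) {
                rank = totalOffsets;            // no local state: worst possible rank
            } else if (lag <= acceptableRecoveryLag) {
                rank = 0L;                      // within acceptable_recovery_lag counts as caught up
            } else {
                rank = lag;                     // otherwise the rank is the actual lag
            }
            rankedCandidates.computeIfAbsent(rank, r -> new TreeSet<>()).add(instance);
        }
        // every instance sharing the minimum rank is "most-caught-up" for this task
        if (rankedCandidates.isEmpty()) {
            return new TreeSet<>();
        }
        return rankedCandidates.get(rankedCandidates.firstKey());
    }
}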

Defining "balance"

An assignment is "balanced" if it exhibits both:

  • instance balance: The instance with the most tasks has no more than "balance_factor" more tasks than the instance with the least tasks.
  • subtopology balance: The subtopology with the most instances executing its tasks has no more than "balance_factor" more instances executing it than the subtopology with the least instances executing it. That is, a subtopology is partitioned, and each partition of a subtopology is a "task". Ideally, we want to spread the tasks of a subtopology evenly over the cluster, since we observe that workload is typically even across the partitions of a subtopology, whereas workload often varies dramatically across different subtopologies. (A sketch of this check follows the list.)
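
As noted above, a minimal sketch of this check might look as follows, assuming the per-instance task counts and per-subtopology instance counts have already been computed (all names here are illustrative):

Code Block
import java.util.Map;

class BalanceCheck {
    /** tasksPerInstance: instance -> number of assigned tasks */
    static boolean instanceBalanced(Map<String, Integer> tasksPerInstance, int balanceFactor) {
        int max = tasksPerInstance.values().stream().mapToInt(Integer::intValue).max().orElse(0);
        int min = tasksPerInstance.values().stream().mapToInt(Integer::intValue).min().orElse(0);
        return max - min <= balanceFactor;
    }

    /** instancesPerSubtopology: subtopology -> number of distinct instances executing its tasks */
    static boolean subtopologyBalanced(Map<String, Integer> instancesPerSubtopology, int balanceFactor) {
        int max = instancesPerSubtopology.values().stream().mapToInt(Integer::intValue).max().orElse(0);
        int min = instancesPerSubtopology.values().stream().mapToInt(Integer::intValue).min().orElse(0);
        return max - min <= balanceFactor;
    }

    /** An assignment is balanced only if it satisfies both conditions. */
    static boolean isBalanced(Map<String, Integer> tasksPerInstance,
                              Map<String, Integer> instancesPerSubtopology,
                              int balanceFactor) {
        return instanceBalanced(tasksPerInstance, balanceFactor)
            && subtopologyBalanced(instancesPerSubtopology, balanceFactor);
    }
}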

State with logging disabled

There is a special case to consider: stateful stores with logging disabled. We have an a priori expectation that stateful tasks are more heavyweight than stateless tasks, so from the perspective of cluster balance, we should consider tasks containing non-logged stores to be stateful (and therefore heavy). On the other hand, if a store is not logged, then there is no notion of being "caught up" or of having a "lag" on it. So, we should consider all instances to be equally caught-up on such stores.

Rather than building this special case into the code, we can simply have the members report their lag on non-logged stores as zero. Then, the task would automatically be considered stateful for assignment, but all instances would be considered equal assignment candidates. Further, because zero is the additive identity, this strategy works seamlessly when a task has both logged and non-logged stores. I.e., the "synthetic" zero-lag for the non-logged store gets added to the non-zero lag for the logged store, and we sensibly report the total lag for the task to just be the lag on the logged store.
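
A small sketch of this reporting convention, using hypothetical helpers for per-store and per-task lag:

Code Block
import java.util.Map;

class TaskLagReporting {
    /** Per-store lag: logged stores report their changelog lag; non-logged stores simply report 0. */
    static long storeLag(boolean loggingEnabled, long changelogEndOffset, long restoredOffset) {
        if (!loggingEnabled) {
            return 0L; // no changelog exists, so every instance is equally "caught up" on this store
        }
        return Math.max(0L, changelogEndOffset - restoredOffset);
    }

    /** Because 0 is the additive identity, summing per-store lags yields just the logged stores' lag. */
    static long taskLag(Map<String, Long> perStoreLags) {
        return perStoreLags.values().stream().mapToLong(Long::longValue).sum();
    }
}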

Rebalance Metadata

The Consumer rebalance protocol has a pluggable extension point, the PartitionAssignor interface, along with a black-box SubscriptionInfo field. These mechanisms can be used in conjunction to encode extra information from the group members when they join the group, and then make a custom assignment decision. Finally, the PartitionAssignor assignment response contains another black-box field to encode extra information from the leader to the members.
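
Purely as an illustration of the kind of black-box encoding involved (the actual SubscriptionInfo wire format is versioned and defined by Streams, not by this sketch), the per-task lags reported by a member could be serialized into the subscription userdata roughly like this:

Code Block
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;

class SubscriptionLagEncoding {
    /** Hypothetical layout: a count followed by (taskId length, taskId bytes, lag) triples. */
    static ByteBuffer encodeTaskLags(Map<String, Long> lagByTaskId) {
        int size = 4;
        for (String taskId : lagByTaskId.keySet()) {
            size += 4 + taskId.getBytes(StandardCharsets.UTF_8).length + 8;
        }
        ByteBuffer buffer = ByteBuffer.allocate(size);
        buffer.putInt(lagByTaskId.size());
        for (Map.Entry<String, Long> entry : lagByTaskId.entrySet()) {
            byte[] id = entry.getKey().getBytes(StandardCharsets.UTF_8);
            buffer.putInt(id.length);
            buffer.put(id);
            buffer.putLong(entry.getValue()); // reported lag on that task's changelog(s)
        }
        buffer.flip();
        return buffer;
    }
}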

...