Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Table of Contents

Status

Current state:  [Under Discussion] Accepted

Vote thread: here

Discussion thread: TBD here

JIRA:  

Jira
serverASF JIRA
columnskey,summary,type,created,updated,due,assignee,reporter,priority,status,resolution
maximumIssues20
jqlQuerykey = KAFKA-8019 or key = KAFKA-7149 or key = KAFKA-6145 or key = KAFKA-4696
serverId5aa69414-a9e9-3523-82ec-879b028fb15b

...

A note on the "moving" tasks. These are very similar to standby tasks, but we propose to give them a different name to make their role clear. This way, it's clear that they don't count against the "num.standby.replicas" config, and also that they represent an ephemeral increase in cluster storage requirements.

Parameters

...

  • acceptable_recovery_lag*: A scalar integer value indicating a task lag (number of offsets to catch up) that is acceptable for immediate assignment. Defaults to 10,000 (should be well under a minute and typically a few seconds, depending on workload). Must be at least 0.
  • num_standbys: A scalar integer indicating the number of hot-standby task replicas to maintain in addition to the active processing tasks. Defaults to 0. Must be at least 0.
  • probing_rebalance_interval_ms*:  A time interval representing the minimum amount of time (in milliseconds) that the leader should wait before triggering a "probing rebalance", assuming there is no intervening natural rebalance. Note that probing rebalances are only triggered if the current assignment is not balanced within "balance_factor". Assuming KIP-429 and KAFKA-8421, default would be  Default is 10 minutes (because there would be effectively no cost for no-op rebalances); without that prior work, default would be 1 day. Must be at least 1 minute.. Must be at least 1 minute.
  • max_warmup_replicas*: A scalar integer representing the maximum number of extra  replicas to assign for the purpose of moving a task to an instance that has neither the active nor any standby replicas of that task assigned. Used to throttle how much extra broker traffic and cluster state would be used for moving tasks. Defaults to 2. Minimum is 1.

* new config

Probing Rebalances

As of this KIP, an assignment may result in an unbalanced distribution of tasks in favor of returning to processing as soon as possible, while under-loaded nodes are assigned standby tasks to warm up. Once the under-loaded instances are warm, a next rebalance would be able to produce a more balanced assignment. This proposal creates a new requirement: the ability to trigger rebalances in response to conditions other than cluster or topic changes.

...

See the "Rejected Alternatives" section for a discussion of alternatives to probing rebalances.

Assignment Algorithm

The overall balancing algorithm relies on an assignment algorithm, which is not specified in this KIP, but is left as an implementation detail. We do specify some important properties of any implementation:

...

Iterative Balancing Assignments

As mentioned in the overall description of the algorithm, the assignment algorithm creates as balanced an assignment as possible while strictly assigning stateful tasks only among the most-caught-up

...

instances

...

. It also plans task movements to gradually improve assignment balance over time, stopping when it is able to create a balanced assignment.

As described above, to move tasks to fresh instances while still respecting

...

num.standby.replicas

...

, the assignor will assign "extra" standby replicas to the destination instances. Once these extra replicas are caught up, a subsequent assignment will make that the active (or a permanent standby) and unassign one of the prior active or standby tasks.

A consequence of this design is that the overall amount of state in the cluster will exceed the sum of active and configured standby tasks while tasks are being moved. Also, each "moving" task reads from the broker, resulting in increased broker load and network traffic. To mitigate these effects, we'll introduce the max_task_migrations configuration, which limits the number of extra tasks that can be assigned at a time (inspired by Elasticsearch). The default is set to 2, but can be decreased to 1 or increased arbitrarily.

The default setting is likely to result in needing multiple balance improvements during probing rebalances over time, but the tradeoff is less load on the cluster and the broker. At the other end of the spectrum, setting the number high would allow the algorithm to assign all the desired movements in one shot, resulting in the whole process needing just two rebalances: the initial one to assign the moving tasks, and the final one to realize the movements and drop the old, unbalanced tasks.

Assignment Algorithm

The overall balancing algorithm relies on an assignment algorithm, which is not specified in this KIP, but is left as an implementation detail. We do specify some important properties of any implementation:

  1. The assignment algorithm is required  to assign active stateful tasks to the most-caught-up-instances (see below for a definition of "most caught-up"). I.e., active stateful tasks have the highest assignment priority.
  2. As a second priority, the algorithm must assign standby tasks to the next-most-caught-up instances. All computed assignments must have at least "num.standby.replicas" number of standby replicas for every task.
  3. The assignment algorithm may assign extra standby tasks to warm up instances that it wants to move existing active or standby tasks to.
  4. Of course, the algorithm must also assign stateless tasks.
  5. The algorithm must converge: if the current assignment is already balanced, it must not alter the assignment.
  6. The algorithm should produce stable assignments. E.g., it should try not to shuffle stateless tasks randomly among nodes. Depending on the implementation, this may not be easy to guarantee, but it would improve overall performance to provide stability if possible. Note that the convergence requirement (#5) at least guarantees that once the assignment is balanced, it won't change anymore at all.

Computing the most-caught-up instances.

The assignment algorithm is required to assign active and standby tasks to the most-caught-up-instances (see below), with priority given to the active tasks. To do this, we need to define what a "most-caught-up" task is.

First, we'll sort all the tasks and instances, to give the algorithms a stable starting ordering. Then we compute a "rank" for each instance on each task. The "rank" represents how far from "caught up" that instance is on the task. Lower is more caught up. Rank has a floor of acceptable_recovery_lag . That is, all instances that have a lag under acceptable_recovery_lag are considered to have a lag of 0. This allows instances that are "nearly caught up" to be considered for active assignment.

In the case where an instance has no state for a task, we just use the total number of offsets in that task's changelog as the lag. If this isn't convenient, we can also use "max long" as a proxy.

Something like this:

Code Block
SortedTasks: List<task> := sort(Tasks) // define a stable ordering of tasks
SortedInstances := List<instance> := sort(Instances) // define a stable ordering of instances

StatefulTasksToRankedCandidates: Map<task, Map<rank, instance>> := {}

// Build StatefulTasksToRankedCandidates. After this block, all tasks map to a ranked collection of all instances, where the rank corresponds to the instance's lag on that task (lower is better).
for task in SortedTasks if task is stateful:
  for instance in SortedInstances:
    if (instance does not contain task lag):
      // then the rank is equivalent to the full set of offsets in the topic, guaranteed to be greater than any instance's actual lag on the task.
      StatefulTasksToRankedCandidates[task][task.offsets] := instance
    else if (instance[task].lag <= acceptable_recovery_lag:
      // then the rank is 0, because any instance within acceptable_recovery_lag is considered caught up.
      StatefulTasksToRankedCandidates[task][0] := instance
    else:
      // then the rank is the actual lag
      StatefulTasksToRankedCandidates[task][instance[task].lag] := instance

For a given task, all the instances with the minimum rank are "most-caught-up".

Defining "balance"

An assignment is "balanced" if it displays both:

  • instance balance: The instance with the most tasks has no more than 1 more task than the instance with the least tasks.
  • subtopology balance: The subtopology with the most instances executing its tasks has no more than 1 more instance executing it than the subtopology with the least instances executing it. I.e.: the subtopology is partitioned. Each partition of a subtopology is a "task", ideally, we want to spread the tasks for a subtopology evenly over the cluster, since we observe that workload over the partitions of a subtopology is typically even, whereas workload over different subtopologies may vary dramatically.

Rebalance Metadata

The Consumer rebalance protocol has a pluggable extension point, the PartitionAssignor interface, along with a black-box SubscriptionInfo field. These mechanisms can be used in conjunction to encode extra information from the group members when they join the group, and then make a custom assignment decision. Finally, the PartitionAssignor assignment response contains another black-box field to encode extra information from the leader to the members.

Today, Streams uses the  StreamsPartitionAssignor to implement its custom "balanced stickiness" assignment strategy, which is supported by the metadata fields in the subscription and assignment protocols.

More specifically on subscription:

Code Block
KafkaConsumer:


Subscription => TopicList SubscriptionInfo
   TopicList               => List<String>
   SubscriptionInfo        => Bytes

------------------


StreamsPartitionAssignor:

SubscriptionInfo (encoded in version 6) => VersionId LatestSupportVersionId ClientUUID PrevTasks StandbyTasks EndPoint

   VersionId               => Int32
   LatestSupportVersionId  => Int32
   ClientUUID              => 128bit
   PrevTasks               => Set<TaskId>
   StandbyTasks            => Set<TaskId>
   EndPoint                => HostInfo


To support the proposed algorithms, we're proposing a new, version 7, format for SubsriptionInfo:

Code Block
SubscriptionInfo (encoded in version 5) => VersionId LatestSupportVersionId ClientUUID TaskLags EndPoint

   VersionId               => Int32
   LatestSupportVersionId  => Int32
   ClientUUID              => 128bit
   EndPoint                => HostInfo
   Task Lags               => Map<TaskId, Int64>    // new change

The new field, TaskLags would encode the lag for every store that the instance hosts, summed to the task level. This subsumes both PrevTasks and StandbyTasks

We do not need any modifications to AssignmentInfo at this time.

State with logging disabled

There is a special case to consider: stateful stores with logging disabled. We have an a priori expectation that stateful tasks are more heavyweight than stateless tasks, so from the perspective of cluster balance, we should consider tasks containing non-logged stores to be stateful (and therefore heavy). On the other hand, if a store is not logged, then there is no notion of being "caught up" or of having a "lag" on it. So, we should consider all instances to be equally caught-up on such stores.

Rather than building this special case into the code, we can simply have the members not add any lag information for non-logged stores. If a task has both logged and non-logged stores, we'd wind up with a task lag that represents the sum of the lags on the logged stores only. If the task has only non-logged stores, the the task would be completely absent from the "task lags" map.

On the leader/assignment side, following the algorithm above for computing `StatefulTasksToRankedCandidates` (to determine the degree of caught-up-ness for each instance on each stateful task), we would fill in a lag of "number of offsets in the changelog" for any instance that doesn't report a lag for a stateful task. For each store in the stateful task that doesn't have a changelog, we would consider its "number of offsets" to be zero. This has the effect of presenting all instances as equal candidates to receive stateful but non-logged tasks.

Stateless tasks

Similar to stateful non-logged tasks, stateless tasks have no notion of "catching up", and therefore there's no concept of a "lag", and we can consider all instances in the cluster to be equal candidates to execute a stateless task.

Again similarly, there's no need to report a lag at all for these tasks. Stateless tasks are not represented at all in `StatefulTasksToRankedCandidates`, so there's no need for special handling on the assignment side, and the assignor just packs the stateless tasks into the cluster to achieve the best balance it can with respect to the assignments of stateful tasks (which are specified to have higher assignment priority)

Computing the most-caught-up instances.

The assignment algorithm is required to assign active and standby tasks to the most-caught-up-instances (see below), with priority given to the active tasks. To do this, we need to define what a "most-caught-up" task is.

First, we'll sort all the tasks and instances, to give the algorithms a stable starting ordering. Then we compute a "rank" for each instance on each task. The "rank" represents how far from "caught up" that instance is on the task. Lower is more caught up. Rank has a floor of acceptable_recovery_lag . That is, all instances that have a lag under acceptable_recovery_lag are considered to have a lag of 0. This allows instances that are "nearly caught up" to be considered for active assignment.

In the case where an instance has no state for a task, we just use the total number of offsets in that task's changelog as the lag. If this isn't convenient, we can also use "max long" as a proxy.

Something like this:

Code Block
SortedTasks: List<task> := sort(Tasks) // define a stable ordering of tasks
SortedInstances := List<instance> := sort(Instances) // define a stable ordering of instances

StatefulTasksToRankedCandidates: Map<task, Map<rank, instance>> := {}

// Build StatefulTasksToRankedCandidates. After this block, all tasks map to a ranked collection of all instances, where the rank corresponds to the instance's lag on that task (lower is better).
for task in SortedTasks if task is stateful:
  for instance in SortedInstances:
    if (instance does not contain task lag):
      // then the rank is equivalent to the full set of offsets in the topic, guaranteed to be greater than any instance's actual lag on the task.
      StatefulTasksToRankedCandidates[task][task.offsets] := instance
    else if (instance[task].lag <= acceptable_recovery_lag:
      // then the rank is 0, because any instance within acceptable_recovery_lag is considered caught up.
      StatefulTasksToRankedCandidates[task][0] := instance
    else:
      // then the rank is the actual lag
      StatefulTasksToRankedCandidates[task][instance[task].lag] := instance

For a given task, all the instances with the minimum rank are "most-caught-up".

Defining "balance"

An assignment is "balanced" if it displays both:

  • instance balance: The instance with the most tasks has no more than "balance_factor" more tasks than the instance with the least tasks.
  • subtopology balance: The subtopology with the most instances executing its tasks has no more than "balance_factor" more instances executing it than the subtopology with the least instances executing it. I.e.: the subtopology is partitioned. Each partition of a subtopology is a "task", ideally, we want to spread the tasks for a subtopology evenly over the cluster, since we observe that workload over the partitions of a subtopology is typically even, whereas workload over different subtopologies may vary dramatically.

Rebalance Metadata

The Consumer rebalance protocol has a pluggable extension point, the PartitionAssignor interface, along with a black-box SubscriptionInfo field. These mechanisms can be used in conjunction to encode extra information from the group members when they join the group, and then make a custom assignment decision. Finally, the PartitionAssignor assignment response contains another black-box field to encode extra information from the leader to the members.

Today, Streams uses the  StreamsPartitionAssignor to implement its custom "balanced stickiness" assignment strategy, which is supported by the metadata fields in the subscription and assignment protocols.

More specifically on subscription:

Code Block
KafkaConsumer:


Subscription => TopicList SubscriptionInfo
   TopicList               => List<String>
   SubscriptionInfo        => Bytes

------------------


StreamsPartitionAssignor:

SubscriptionInfo (encoded in version 4) => VersionId LatestSupportVersionId ClientUUID PrevTasks StandbyTasks EndPoint

   VersionId               => Int32
   LatestSupportVersionId  => Int32
   ClientUUID              => 128bit
   PrevTasks               => Set<TaskId>
   StandbyTasks            => Set<TaskId>
   EndPoint                => HostInfo

To support the proposed algorithms, we're proposing a new, version 5, format for SubsriptionInfo:

Code Block
SubscriptionInfo (encoded in version 5) => VersionId LatestSupportVersionId ClientUUID TaskLags EndPoint

   VersionId               => Int32
   LatestSupportVersionId  => Int32
   ClientUUID              => 128bit
   Task Lags               => Map<TaskId, Int32>    // new change
   EndPoint                => HostInfo

The new field, TaskLags would encode the lag for every store that the instance hosts, summed to the task level. This subsumes both PrevTasks and StandbyTasks

We do not need any modifications to AssignmentInfo at this time.

State with logging disabled

There is a special case to consider: stateful stores with logging disabled. We have an a priori expectation that stateful tasks are more heavyweight than stateless tasks, so from the perspective of cluster balance, we should consider tasks containing non-logged stores to be stateful (and therefore heavy). On the other hand, if a store is not logged, then there is no notion of being "caught up" or of having a "lag" on it. So, we should consider all instances to be equally caught-up on such stores.

Rather than building this special case into the code, we can simply have the members report their lag on non-logged stores as zero. Then, the task would automatically be considered stateful for assignment, but all instances would be considered equal assignment candidates. Further, because zero is the additive identity, this strategy works seamlessly when a task has both logged and non-logged stores. I.e., the "synthetic" zero-lag for the non-logged store gets added to the non-zero lag for the logged store, and we sensibly report the total lag for the task to just be the lag on the logged store.

Race between assignment and state cleanup

...

Stateful (active) Tasks := {T1, T2, T3}
Standby Tasks Replicas := {S1, S2, S3}1
Instances := {I1, I2}

Initial State

...

Stateful (active) Tasks := {T1, T2, T3, T4}
Standby Tasks Replicas := {S1, S2, S3, S4} 1
Instances := {I1, I2, I3}

The standby tasks in this scenario are assumed to be in sync with the active tasks, that is, caught up and within acceptable_recovery_lag.

...

Stateful (active) Tasks := {T1, T2, T3, T4}
Standby Tasks Replicas := {S1, S2, S3, S4} 1
Instances := {I1, I2, I3}

In this scenario, the standby tasks are lagging the active tasks by some nontrivial amount, that is, by more than the acceptable_recovery_lag.

...