
Status

Current state: Accepted

Vote thread: here

Discussion thread: here

JIRA: KAFKA-8019, KAFKA-7149, KAFKA-6145, KAFKA-4696

...

The task movements are accomplished by assigning the task as a standby task to the destination instance, and then rebalancing once those tasks are up to date. The latter rebalance would find that a more balanced solution is available while still only assigning work to caught-up instances: it can make the destination instance active on that task and unassign the task from the source instance.

This means we need some mechanism to trigger a rebalance outside of group or topic changes (which are the only triggers currently). There are a number of ways to do this, but we propose to add a notion of a "probing rebalance", discussed in detail below.

...

A note on the "moving" tasks: these are very similar to standby tasks, but we propose to give them a different name to make their role explicit. This way, it's clear that they don't count against the "num.standby.replicas" config, and also that they represent an ephemeral increase in cluster storage requirements.

Parameters

  • balance_factor: A scalar integer value representing the target difference in number of tasks assigned to the node with the most tasks vs. the node with the least tasks. Defaults to 1. Must be at least 1.
  • acceptable_recovery_lag*: A scalar integer value indicating a task lag (number of offsets to catch up) that is acceptable for immediate assignment. Defaults to 10,000 (should be well under a minute and typically a few seconds, depending on workload). Must be at least 0.
  • num_standbys: A scalar integer indicating the number of hot-standby task replicas to maintain in addition to the active processing tasks. Defaults to 0. Must be at least 0.
  • probing_rebalance_interval_ms*: A time interval representing the minimum amount of time (in milliseconds) that the leader should wait before triggering a "probing rebalance", assuming there is no intervening natural rebalance. Note that probing rebalances are only triggered if the current assignment is not balanced within "balance_factor". Default is 10 minutes (assuming KIP-429 and KAFKA-8421, there would be effectively no cost for no-op rebalances). Must be at least 1 minute.


  • max_warmup_replicas*: A scalar integer representing the maximum number of extra replicas to assign for the purpose of moving a task to an instance that has neither the active nor any standby replicas of that task assigned. Used to throttle how much extra broker traffic and cluster state would be used for moving tasks. Defaults to 2. Must be at least 1.

* new config
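For illustration, configuring these parameters in an application might look like the following sketch. Only num.standby.replicas is an existing Streams config; the dotted keys for the starred parameters are assumptions derived from the names above, since the final config names are an implementation detail.

Code Block
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public class BalancingConfigExample {

    public static Properties balancingConfigs() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "balancing-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
        // num_standbys maps to the existing num.standby.replicas config.
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
        // The keys below are assumptions based on the parameter names in this
        // KIP; the final dotted config keys may differ.
        props.put("acceptable.recovery.lag", 10_000L);
        props.put("probing.rebalance.interval.ms", 10 * 60 * 1000L);
        props.put("max.warmup.replicas", 2);
        return props;
    }
}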

Probing Rebalances

As of this KIP, an assignment may result in an unbalanced distribution of tasks in favor of returning to processing as soon as possible, while under-loaded nodes are assigned standby tasks to warm up. Once the under-loaded instances are warm, a subsequent rebalance can produce a more balanced assignment. This proposal creates a new requirement: the ability to trigger rebalances in response to conditions other than cluster or topic changes.

Ideally, group members would be able to communicate their current status back to the group leader so that the leader could trigger rebalances when the members catch up, but there is currently no such communication channel available. In lieu of that, we propose to have the leader trigger rebalances periodically, on a configured "probing_rebalance_interval_ms". As each member re-joins the group, it would report its current lag (as proposed below), and the leader would potentially be able to compute a more balanced assignment.

Note that "balance" in this KIP, as today, is purely a function of task distribution over the instances of the cluster. Therefore, an assignment's balance can only change in response to changes in the cluster, or changes to the topic metadata (which could change the number of tasks). Both of these conditions already trigger a rebalance, so once the leader makes a balanced assignment, it can stop performing probing rebalances, confident that any later change to the balance of the cluster would trigger a rebalance automatically.

See the "Rejected Alternatives" section for a discussion of alternatives to probing rebalances.

Iterative Balancing Assignments

As mentioned in the overall description of the algorithm, the assignment algorithm creates as balanced an assignment as possible while strictly assigning stateful tasks only among the most-caught-up instances. It also plans task movements to gradually improve assignment balance over time, stopping when it is able to create a balanced assignment.

As described above, to move tasks to fresh instances while still respecting num.standby.replicas, the assignor will assign "extra" standby replicas to the destination instances. Once these extra replicas are caught up, a subsequent assignment will promote one to active (or to a permanent standby) and unassign one of the prior active or standby tasks.

A consequence of this design is that the overall amount of state in the cluster will exceed the sum of active and configured standby tasks while tasks are being moved. Also, each "moving" task reads from the broker, resulting in increased broker load and network traffic. To mitigate these effects, we'll introduce the max_warmup_replicas configuration described above, which limits the number of extra tasks that can be assigned at a time (a throttle inspired by Elasticsearch). The default is 2, but it can be decreased to 1 or increased arbitrarily.

The default setting is likely to require multiple rounds of balance improvements across probing rebalances, but the tradeoff is less load on the instances and the brokers. For example, with the default of 2, moving ten tasks onto a newly added instance would take roughly five probing rebalance rounds. At the other end of the spectrum, setting the number high would allow the algorithm to assign all the desired movements in one shot, so the whole process would need just two rebalances: the initial one to assign the moving tasks, and the final one to realize the movements and drop the old, unbalanced tasks.
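For illustration, the throttle itself could be as simple as truncating the assignor's desired movement list (the types here are illustrative):

Code Block
import java.util.List;

public class WarmupThrottle {

    // A desired movement of a task from a source instance to a destination
    // that currently hosts no replica of it (illustrative type).
    record Movement(String taskId, String source, String destination) {}

    // Only the first max_warmup_replicas movements receive warmup standbys in
    // this assignment; the remainder wait for a later probing rebalance.
    static List<Movement> throttle(List<Movement> desired, int maxWarmupReplicas) {
        return desired.subList(0, Math.min(maxWarmupReplicas, desired.size()));
    }
}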

Assignment Algorithm

The overall balancing algorithm relies on an assignment algorithm, which is not specified in this KIP, but is left as an implementation detail. We do specify some important properties of any implementation:

  1. The assignment algorithm is required to assign active stateful tasks to the most-caught-up instances (see below for a definition of "most caught up"). I.e., active stateful tasks have the highest assignment priority.
  2. As a second priority, the algorithm must assign standby tasks to the next-most-caught-up instances. All computed assignments must have at least "num.standby.replicas" number of standby replicas for every task.
  3. The assignment algorithm may assign extra standby tasks to warm up instances that it wants to move existing active or standby tasks to.
  4. Of course, the algorithm must also assign stateless tasks.
  5. The algorithm must converge: if the current assignment is already balanced, it must not alter the assignment.
  6. The algorithm should produce stable assignments. E.g., it should try not to shuffle stateless tasks randomly among nodes. Depending on the implementation, this may not be easy to guarantee, but it would improve overall performance to provide stability if possible. Note that the convergence requirement (#5) at least guarantees that once the assignment is balanced, it won't change anymore at all.
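Since the algorithm itself is deliberately unspecified, the following is only a toy sketch of property #1: greedily assigning each active stateful task to the least-loaded of its most-caught-up instances. It uses the ranked-candidates structure computed in the next section, with tasks and instances simplified to strings.

Code Block
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;

public class ActiveAssignmentSketch {

    // Assign each stateful task's active copy to the least-loaded of its
    // most-caught-up instances (the candidates sharing the minimum rank).
    // Standbys and warmups would follow the same pattern at lower priority.
    public static Map<String, List<String>> assignActives(
            List<String> sortedStatefulTasks,
            Map<String, NavigableMap<Long, List<String>>> rankedCandidates) {
        Map<String, List<String>> activesPerInstance = new HashMap<>();
        for (String task : sortedStatefulTasks) {
            List<String> mostCaughtUp = rankedCandidates.get(task).firstEntry().getValue();
            String choice = mostCaughtUp.stream()
                    .min(Comparator.comparingInt(
                            instance -> activesPerInstance.getOrDefault(instance, List.of()).size()))
                    .orElseThrow();
            activesPerInstance.computeIfAbsent(choice, k -> new ArrayList<>()).add(task);
        }
        return activesPerInstance;
    }
}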

Computing the most-caught-up instances.

The assignment algorithm is required to assign active and standby tasks to the most-caught-up instances, with priority given to the active tasks. To do this, we need to define what it means for an instance to be "most caught up" on a task.

First, we'll sort all the tasks and instances, to give the algorithms a stable starting ordering. Then we compute a "rank" for each instance on each task. The "rank" represents how far from "caught up" that instance is on the task; lower is more caught up. Rank is floored at acceptable_recovery_lag: all instances that have a lag under acceptable_recovery_lag are considered to have a lag of 0. This allows instances that are "nearly caught up" to be considered for active assignment.

In the case where an instance has no state for a task, we just use the total number of offsets in that task's changelog as the lag. If this isn't convenient, we can also use "max long" as a proxy.

Something like this:

Code Block
SortedTasks: List<task> := sort(Tasks) // define a stable ordering of tasks
SortedInstances: List<instance> := sort(Instances) // define a stable ordering of instances

StatefulTasksToRankedCandidates: Map<task, Map<rank, instance>> := {}

// Build StatefulTasksToRankedCandidates. After this block, all tasks map to a ranked collection of all instances, where the rank corresponds to the instance's lag on that task (lower is better).
for task in SortedTasks if task is stateful:
  for instance in SortedInstances:
    if (instance does not contain task lag):
      // then the rank is equivalent to the full set of offsets in the topic, guaranteed to be greater than any instance's actual lag on the task.
      StatefulTasksToRankedCandidates[task][task.offsets] := instance
    else if (instance[task].lag <= acceptable_recovery_lag):
      // then the rank is 0, because any instance within acceptable_recovery_lag is considered caught up.
      StatefulTasksToRankedCandidates[task][0] := instance
    else:
      // then the rank is the actual lag
      StatefulTasksToRankedCandidates[task][instance[task].lag] := instance

For a given task, all the instances with the minimum rank are "most-caught-up".
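For concreteness, here is a hedged Java rendering of the pseudocode above, with simplified types (tasks and instances as strings). One deliberate difference: each rank maps to a list of instances, since several instances may share a rank, and the preceding paragraph depends on finding all instances with the minimum rank.

Code Block
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

public class CaughtUpRanking {

    // Builds StatefulTasksToRankedCandidates: task -> (rank -> instances),
    // where a TreeMap keeps ranks sorted so the minimum rank comes first.
    public static Map<String, NavigableMap<Long, List<String>>> rank(
            List<String> sortedTasks,              // stable ordering of stateful tasks
            List<String> sortedInstances,          // stable ordering of instances
            Map<String, Map<String, Long>> lags,   // instance -> (task -> reported lag)
            Map<String, Long> changelogOffsets,    // task -> total offsets in its changelog
            long acceptableRecoveryLag) {
        Map<String, NavigableMap<Long, List<String>>> ranked = new HashMap<>();
        for (String task : sortedTasks) {
            NavigableMap<Long, List<String>> candidates = new TreeMap<>();
            for (String instance : sortedInstances) {
                Long lag = lags.getOrDefault(instance, Map.of()).get(task);
                long rank;
                if (lag == null) {
                    // No local state: rank is the full changelog, guaranteed to
                    // exceed any real lag on the task.
                    rank = changelogOffsets.get(task);
                } else if (lag <= acceptableRecoveryLag) {
                    rank = 0L;  // near enough: treated as fully caught up
                } else {
                    rank = lag; // the rank is the actual lag
                }
                candidates.computeIfAbsent(rank, r -> new ArrayList<>()).add(instance);
            }
            ranked.put(task, candidates);
        }
        return ranked;
    }
}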

Defining "balance"

An assignment is "balanced" if it displays both:

  • instance balance: The instance with the most tasks has no more than "balance_factor" more tasks than the instance with the least tasks.
  • subtopology balance: The subtopology with the most instances executing its tasks has no more than "balance_factor" more instances executing it than the subtopology with the least instances executing it. I.e., a subtopology is partitioned, and each partition of a subtopology is a "task". Ideally, we want to spread the tasks for a subtopology evenly over the cluster, since we observe that workload over the partitions of a subtopology is typically even, whereas workload over different subtopologies may vary dramatically.
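As a sketch, the instance-balance check reduces to comparing the extremes of the per-instance task counts; subtopology balance is analogous, counting the distinct instances executing each subtopology's tasks.

Code Block
import java.util.Collections;
import java.util.Map;

public class BalanceCheck {

    // True if the busiest instance has at most balance_factor more tasks than
    // the idlest one.
    public static boolean isInstanceBalanced(Map<String, Integer> taskCounts,
                                             int balanceFactor) {
        int max = Collections.max(taskCounts.values());
        int min = Collections.min(taskCounts.values());
        return max - min <= balanceFactor;
    }
}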

Rebalance Metadata

The Consumer rebalance protocol has a pluggable extension point, the PartitionAssignor interface, along with a black-box SubscriptionInfo field. These mechanisms can be used in conjunction to encode extra information from the group members when they join the group, and then make a custom assignment decision. Finally, the PartitionAssignor assignment response contains another black-box field to encode extra information from the leader to the members.

Today, Streams uses the StreamsPartitionAssignor to implement its custom "balanced stickiness" assignment strategy, which is supported by the metadata fields in the subscription and assignment protocols.

More specifically on subscription:

Code Block
KafkaConsumer:


Subscription => TopicList SubscriptionInfo
   TopicList               => List<String>
   SubscriptionInfo        => Bytes

------------------


StreamsPartitionAssignor:

SubscriptionInfo (encoded in version 6) => VersionId LatestSupportVersionId ClientUUID PrevTasks StandbyTasks EndPoint

   VersionId               => Int32
   LatestSupportVersionId  => Int32
   ClientUUID              => 128bit
   PrevTasks               => Set<TaskId>
   StandbyTasks            => Set<TaskId>
   EndPoint                => HostInfo


To support the proposed algorithms, we're proposing a new, version 7, format for SubscriptionInfo:

Code Block
SubscriptionInfo (encoded in version 7) => VersionId LatestSupportVersionId ClientUUID TaskLags EndPoint

   VersionId               => Int32
   LatestSupportVersionId  => Int32
   ClientUUID              => 128bit
   TaskLags                => Map<TaskId, Int64>    // new field
   EndPoint                => HostInfo

The new field, TaskLags, would encode the lag for every store that the instance hosts, summed to the task level. This subsumes both PrevTasks and StandbyTasks.
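For illustration, a member-side sketch of building this map, summing store lags up to the task level (the store and task types are simplified placeholders). Note that stores with logging disabled report nothing, which is exactly the behavior relied upon in "State with logging disabled" below.

Code Block
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TaskLagReporter {

    // A local store's lag against its changelog (illustrative type).
    record StoreLag(String taskId, boolean loggingEnabled, long changelogLag) {}

    // Sum store lags up to the task level. A task whose stores are all
    // non-logged ends up absent from the map entirely.
    static Map<String, Long> computeTaskLags(List<StoreLag> localStores) {
        Map<String, Long> taskLags = new HashMap<>();
        for (StoreLag store : localStores) {
            if (!store.loggingEnabled()) {
                continue; // no changelog => no notion of lag for this store
            }
            taskLags.merge(store.taskId(), store.changelogLag(), Long::sum);
        }
        return taskLags;
    }
}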

We do not need any modifications to AssignmentInfo at this time.

State with logging disabled

There is a special case to consider: stateful stores with logging disabled. We have an a priori expectation that stateful tasks are more heavyweight than stateless tasks, so from the perspective of cluster balance, we should consider tasks containing non-logged stores to be stateful (and therefore heavy). On the other hand, if a store is not logged, then there is no notion of being "caught up" or of having a "lag" on it. So, we should consider all instances to be equally caught-up on such stores.

Rather than building this special case into the code, we can simply have the members not add any lag information for non-logged stores. If a task has both logged and non-logged stores, we'd wind up with a task lag that represents the sum of the lags on the logged stores only. If the task has only non-logged stores, the task would be completely absent from the "task lags" map.

On the leader/assignment side, following the algorithm above for computing `StatefulTasksToRankedCandidates` (to determine the degree of caught-up-ness for each instance on each stateful task), we would fill in a lag of "number of offsets in the changelog" for any instance that doesn't report a lag for a stateful task. For each store in the stateful task that doesn't have a changelog, we would consider its "number of offsets" to be zero. This has the effect of presenting all instances as equal candidates to receive stateful but non-logged tasks.

Stateless tasks

Similar to stateful non-logged tasks, stateless tasks have no notion of "catching up", and therefore there's no concept of a "lag", and we can consider all instances in the cluster to be equal candidates to execute a stateless task.

Again similarly, there's no need to report a lag at all for these tasks. Stateless tasks are not represented at all in `StatefulTasksToRankedCandidates`, so there's no need for special handling on the assignment side, and the assignor just packs the stateless tasks into the cluster to achieve the best balance it can with respect to the assignments of stateful tasks (which are specified to have higher assignment priority).

Race between assignment and state cleanup

Streams starts a background thread to clean up (delete) any state directories that are currently unassigned and have not been updated in `state.cleanup.delay.ms` amount of time.

This KIP introduces a race by reporting the lag on an instance's stores, not just the assigned ones. It is possible that a member will report a lag on an unassigned store, then, while the rebalance is ongoing, the state directory could be deleted (invalidating the lag). If the leader assigns the task to that instance, based on the now-stale lag information, then the member would actually have to rebuild the entire state from the start of the changelog.

Currently, this race is prevented because the state cleaner only runs if the Streams instance is not in "REBALANCING" state. However, as part of KIP-429, the REBALANCING state would be removed from Streams (as it would no longer be necessary). Thus, we need to prevent the race condition without relying on the REBALANCING state.

It is sufficient to create a mutex that the state cleaner must acquire before running (and hold for the duration of its run). If we also acquire the mutex before we report store lags (as part of the JoinGroup), and hold it until we get an assignment back (when the rebalance completes), then we can ensure the race cannot occur.

Therefore, we propose to create such a mutex, and acquire it during org.apache.kafka.clients.consumer.ConsumerPartitionAssignor#subscriptionUserData (when we would compute the store lags), and release it in org.apache.kafka.clients.consumer.ConsumerRebalanceListener#onPartitionsAssigned (when we know that the state directory locks have already been acquired for any newly assigned tasks). Also, we will hold the lock for the entire duration of any call to `org.apache.kafka.streams.processor.internals.StateDirectory#cleanRemovedTasks(long)`.

This builds an unfortunate three-way dependency between the state cleanup, Streams's ConsumerPartitionAssignor, and Streams's ConsumerRebalanceListener. Any cleaner solution would require changing the Consumer APIs, though.
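For illustration, a minimal sketch of the proposed mutex, assuming the cleaner runs on its own thread while both consumer callbacks run on the consumer thread (the class is illustrative, not the actual Streams internals):

Code Block
import java.util.concurrent.locks.ReentrantLock;

public class StateCleanupGuard {

    private final ReentrantLock stateLock = new ReentrantLock();

    // StateDirectory#cleanRemovedTasks: hold the lock for the entire run, so
    // no directory is deleted while its lag is being reported.
    public void cleanRemovedTasks(long cleanupDelayMs) {
        stateLock.lock();
        try {
            // ... delete unassigned state dirs untouched for cleanupDelayMs ...
        } finally {
            stateLock.unlock();
        }
    }

    // ConsumerPartitionAssignor#subscriptionUserData: acquire before computing
    // store lags, and deliberately do NOT release here.
    public byte[] subscriptionUserData() {
        stateLock.lock();
        // ... encode SubscriptionInfo, including TaskLags ...
        return new byte[0]; // placeholder
    }

    // ConsumerRebalanceListener#onPartitionsAssigned: release once the state
    // directory locks for newly assigned tasks are held. Both callbacks run on
    // the same consumer thread, so the ReentrantLock ownership rule is satisfied.
    public void onPartitionsAssigned() {
        if (stateLock.isHeldByCurrentThread()) {
            stateLock.unlock();
        }
    }
}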

Example Scenarios

Scaling Out

Stateful (active) Tasks := {T1, T2, T3}
Standby Replicas := 1
Instances := {I1, I2}

Initial State

...

Stateful (active) Tasks := {T1, T2, T3, T4}
Standby Replicas := 1
Instances := {I1, I2, I3}

The standby tasks in this scenario are assumed to be in sync with the active tasks, that is, caught up and within acceptable_recovery_lag.

...

Stateful (active) Tasks := {T1, T2, T3, T4}
Standby Replicas := 1
Instances := {I1, I2, I3}

In this scenario, the standby tasks are lagging the active tasks by some nontrivial amount, that is, by more than the acceptable_recovery_lag.

...