Status

Current state: Under Discussion

Discussion thread: TBD

JIRA:  


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This KIP follows on from KIP-429 to improve Streams' scale-out behavior. While KIP-429 focuses on decreasing the amount of time that progress is blocked on the rebalance itself, this KIP addresses a second phase of stoppage that happens today: upon receiving assignment of a stateful task, Streams has to catch it up to the head of its changelog before beginning to process it.

Currently, the Streams task assignment doesn't take into account how much work it will take to catch up a task, which leads to unfortunate cases in which, after a rebalance, some task can get hung up for hours while it rebuilds its state stores. The high-level goal of this KIP is to allow the prior owner of that task to keep it, even if the assignment is now unbalanced, until the new owner gets caught up, and then to change ownership after the catch-up phase.

In short, the goals of this KIP are:

  • Reduce unnecessary downtime due to task restoration and global application revocation.
  • Better auto scaling experience for KStream applications.
  • Stretch goal: better workload balance across KStream instances.


Background

Consumer Rebalance Protocol: Stop-The-World Effect

As mentioned in the motivation, we also want to mitigate the stop-the-world effect of the current global rebalance protocol. A quick recap of the current rebalance semantics in KStream: when a rebalance starts, all stream threads would

  1. Join the group with all currently assigned tasks revoked

  2. Wait until the group assignment finishes to get assigned tasks and resume working

  3. Replay the assigned tasks' states

  4. Once all replay jobs finish, the stream thread transitions to running mode

After KIP-429, this changes to:

  1. Join the group, optimistically keeping all assigned tasks
  2. Wait until the group assignment finishes to get the tasks to revoke
  3. Once the revocations are processed, get the assigned tasks
  4. Replay the assigned tasks' states
  5. Once all replay jobs finish, the stream thread transitions to running mode

If you want to know more about the protocol-level details, feel free to check out KIP-429.

Constraints and Cost Function

While doing the assignment, we observe the following constraint:

  • Only one copy of a particular task can be hosted on a particular instance. That is, a particular task, like `0_8`, would have an "active" (i.e., primary) copy and some number of "standby" copies (i.e., replicas). A single instance (not thread) can host only one of those, either the active task or one of the standby tasks. We can't assign both an active and a standby task with the same taskId to the same instance.
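
The constraint above can be expressed as a small validity check. The following is only an illustrative sketch: the class and method names are hypothetical and are not part of Kafka Streams, and task IDs are modeled as plain strings such as `0_8`.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper (not a Kafka Streams class): checks the one-copy-per-instance constraint. */
final class CopyConstraintSketch {

    /**
     * Returns true if no task ID appears more than once across the active and
     * standby tasks proposed for a single instance.
     */
    static boolean satisfiesConstraint(List<String> activeTasks, List<String> standbyTasks) {
        Set<String> seen = new HashSet<>();
        for (String taskId : activeTasks) {
            if (!seen.add(taskId)) return false; // duplicate active copy
        }
        for (String taskId : standbyTasks) {
            if (!seen.add(taskId)) return false; // standby collides with another copy
        }
        return true;
    }
}
```

For instance, an instance proposed to host active `0_8` together with standby `0_8` would be rejected, while active `0_8` with standby `0_2` would pass.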

We also try to minimize the cost of the following cost function:

  • data parallel workload balance. We make an assumption that all the partitions of a subtopology are roughly uniform effort. Therefore, we try to spread partitions across the cluster. For example, given two subtopologies X and Y, each with three partitions 0, 1, and 2, and three nodes A, B, and C, we'd prefer to allocate the tasks like (A: <X_0, Y_0>, B: <X_1, Y_1>, C: <X_2, Y_2>) instead of (eg) (A: <X_0, X_1>, B: <X_2, Y_0>, C: <Y_1, Y_2>).
  • even distribution of work. We try to distribute tasks evenly across the cluster, instead of letting one thread have more tasks than another.
  • replay time. We try to minimize the time it takes to replay a newly assigned task, which blocks further progress on that task. Note, although this is the focus of this KIP, this consideration was already part of Streams's assignment algorithm, as it's the reason we currently favor stickiness.

Note, this is a cost minimization problem, since it may not be possible to fully satisfy all three components of the cost function.

Streams Rebalance Metadata: Remember the PrevTasks

Today Streams embeds a full-fledged Consumer client, which hard-codes a ConsumerCoordinator inside. Streams then injects a StreamsPartitionAssignor into its pluggable PartitionAssignor interface, and inside the StreamsPartitionAssignor we also have a TaskAssignor interface whose default implementation is StickyTaskAssignor. The Streams partition assignment logic today sits in the latter two classes. Hence the hierarchy today is:

KafkaConsumer -> ConsumerCoordinator -> StreamsPartitionAssignor -> StickyTaskAssignor.


StreamsPartitionAssignor uses the subscription / assignment metadata byte array field to encode additional information for sticky partitions. More specifically on subscription:

KafkaConsumer:


Subscription => TopicList SubscriptionInfo
   TopicList               => List<String>
   SubscriptionInfo        => Bytes

------------------


StreamsPartitionAssignor:

SubscriptionInfo (encoded in version 4) => VersionId LatestSupportVersionId ClientUUID PrevTasks StandbyTasks EndPoint

   VersionId               => Int32
   LatestSupportVersionId  => Int32
   ClientUUID              => 128bit
   PrevTasks               => Set<TaskId>
   StandbyTasks            => Set<TaskId>
   EndPoint                => HostInfo


And on assignment: 

KafkaConsumer:

Assignment = AssignedPartitions AssignmentInfo
   AssignedPartitions      => List<TopicPartition>
   AssignmentInfo          => Bytes

------------------

StreamsPartitionAssignor:

AssignmentInfo (encoded in version 4) => VersionId, LatestSupportVersionId, ActiveTasks, StandbyTasks, PartitionsByHost, ErrorCode
   VersionId               => Int32
   LatestSupportVersionId  => Int32
   ActiveTasks             => List<TaskId>
   StandbyTasks            => Map<TaskId, Set<TopicPartition>>
   PartitionsByHost        => Map<HostInfo, Set<TopicPartition>>
   ErrorCode               => Int32


Streams Sticky TaskAssignor: Stickiness over Balance

Streams' StickyTaskAssignor will honor stickiness over workload balance. More specifically:

  1. First we calculate the average number of tasks each host should get as its "capacity": divide the total number of tasks by the total number of consumers (i.e. num.threads), then multiply by the number of consumers that host has.
  2. Then for each task:
    1. If there is a client that owned it as a PrevTask, and that client still has capacity, assign the task to it;
    2. Otherwise, if there is a client that owned it as a StandbyTask, and that client still has capacity, assign the task to it.
  3. If there are still unassigned tasks after step 2), we loop over them at per-sub-topology granularity (for workload balance), and again for each task:
    1. Find the client with the least load; if there are several, prefer one that previously owned the task as active, over one that previously owned it as a standby, over one that did not own it at all.

As one can see, we honor stickiness (step 2) over workload balance (step 3).
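
To make the capacity calculation and the sticky pass (steps 1 and 2) concrete, here is a minimal sketch. It is not the actual StickyTaskAssignor code: the class, method, and parameter names are hypothetical, each task is assumed to have at most one previous standby owner, and no rounding of the capacity is attempted.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of steps 1 and 2 above; not the real StickyTaskAssignor. */
final class StickyPassSketch {

    /** Step 1: capacity = (total tasks / total threads) * threads on this host. */
    static double capacity(int totalTasks, int totalThreads, int threadsOnHost) {
        if (totalThreads <= 0) throw new IllegalArgumentException("totalThreads must be positive");
        return (double) totalTasks / totalThreads * threadsOnHost;
    }

    /**
     * Step 2: place each task on its previous active owner if that client has
     * capacity left, otherwise on its previous standby owner if that client has
     * capacity left. Tasks that cannot be placed are left out for step 3.
     */
    static Map<String, String> stickyPass(List<String> tasks,
                                          Map<String, String> prevActiveOwner,
                                          Map<String, String> prevStandbyOwner,
                                          Map<String, Double> capacityByClient) {
        Map<String, Integer> load = new HashMap<>();
        Map<String, String> assignment = new LinkedHashMap<>();
        for (String task : tasks) {
            // Candidates in preference order; either entry may be null.
            for (String candidate : Arrays.asList(prevActiveOwner.get(task), prevStandbyOwner.get(task))) {
                if (candidate == null) continue;
                int current = load.getOrDefault(candidate, 0);
                if (current < capacityByClient.getOrDefault(candidate, 0.0)) {
                    assignment.put(task, candidate);
                    load.put(candidate, current + 1);
                    break;
                }
            }
        }
        return assignment;
    }
}
```

With 10 tasks and 5 threads in total, a host running 2 threads gets a capacity of 4 under this formula. Because the sticky pass runs before any balancing, a client keeps its previous tasks up to that capacity regardless of how the remaining tasks end up being spread.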


Proposed Changes

Streams Two-Phase Task Assignor

Now the second part of this KIP is on Streams' PartitionAssignor implementation on top of the consumer rebalance protocol. Remember that the difference between the eager and the (new) cooperative consumer rebalance protocol is this: in "eager" mode, we always revoke everything before joining the group; in "cooperative" mode, we revoke nothing before joining the group, but we may revoke some partitions after joining the group as indicated by the leader. The native consumer assignor would let consumer members revoke partitions immediately, based on the Intersection(total-partitions, assigned-partitions).

In Streams, however, we may want to defer the revocation as well if the intended new owner of the partition is "not ready", i.e. if the stateful task's restoration time (hence the unavailability gap) when migrating it to this new owner is long, because the new owner does not have previously restored state for this task and hence needs to restore from scratch. More generally, we can extend this term to hosts that do have some local stores for the immigrating task but are far behind the state's latest snapshot, and hence would still need to restore for a long time.

Streams SubscriptionInfo Update

The idea to resolve this is to "delay" the revocation from the current owner so that the new owner can first close the gap in state update progress, and only then revoke the task from the old owner and reassign it to the new owner. However, this cannot easily be done with a fixed "scheduled delay", since it really depends on the progress of the state store restoration on the new owner. To do that, we need to let consumers report their current standby tasks' "progress" when joining the group (some related information can be found in KAFKA-4696). More specifically, assume that we've already completed the prerequisite Jira issue (its link failed to render on this page), which refactors the existing assignmentInfo format to the following to reduce the message size:

AssignmentInfo (encoded in version 5) => VersionId, LatestSupportVersionId, ActiveTasks, StandbyTasks, PartitionsByHost, ErrorCode

   VersionId               => Int32
   LatestSupportVersionId  => Int32
   ActiveTasks             => List<TaskId>
   StandbyTasks            => List<TaskId>
   PartitionsByHost        => Map<HostInfo, Set<TaskId>>
   ErrorCode               => Int32

We can refactor the subscriptionInfo format as well to encode the "progress" factor:

SubscriptionInfo (encoded in version 5) => VersionId LatestSupportVersionId ClientUUID PrevTasks EndPoint

   VersionId               => Int32
   LatestSupportVersionId  => Int32
   ClientUUID              => 128bit
   PrevTasks               => Map<TaskId, Int32>    // new change
   EndPoint                => HostInfo


More specifically, we will associate each standby task with an int32 value indicating its gap to the current active task's state snapshot. This gap is computed as the sum, over all of the task's stores, of Diff(log_end_offset, restored_offset).

Also, we will not distinguish between previous-active-tasks and previous-standby-tasks, since prev-active-tasks are just a special type of prev-tasks whose gap is zero. A task that is absent from the prev-tasks map means "I do not have this task's state at all, and hence the gap is simply the whole log".

Stateless tasks have no state, so we will use a sentinel value (-1) in the prevTasks map to indicate that a task is stateless. Only the host of the active task would include it in the prev-tasks map.

In addition, when a Streams app is starting up, before joining the group it will also query the log-end-offset for all the local state stores in its state directory to calculate the gap; after that, the Streams app can just maintain the gap dynamically for all its standby tasks (again, the gap of an active task is just 0).
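
As a rough illustration of the gap value described above, the sketch below sums the per-store differences and applies the stateless sentinel. The names are hypothetical. Two choices in it are assumptions rather than part of the proposal: a store with no restored offset is treated as restored up to offset 0, and the sum is clamped to fit the int32 field.

```java
import java.util.Map;

/** Hypothetical sketch of the per-task gap reported in PrevTasks; not a Kafka Streams class. */
final class TaskGapSketch {

    /** Sentinel from the text above: marks a stateless task. */
    static final int STATELESS_SENTINEL = -1;

    /**
     * @param logEndOffsets   changelog end offset per store of the task
     * @param restoredOffsets locally restored offset per store (may miss stores)
     * @return sum over all stores of (log_end_offset - restored_offset)
     */
    static int taskGap(Map<String, Long> logEndOffsets, Map<String, Long> restoredOffsets) {
        if (logEndOffsets.isEmpty()) {
            return STATELESS_SENTINEL; // no stores means a stateless task
        }
        long sum = 0L;
        for (Map.Entry<String, Long> e : logEndOffsets.entrySet()) {
            // Assumption: a store without local state counts as restored up to offset 0.
            long restored = restoredOffsets.getOrDefault(e.getKey(), 0L);
            sum += Math.max(0L, e.getValue() - restored);
        }
        // Assumption: clamp so that the value fits the Int32 field.
        return (int) Math.min(sum, (long) Integer.MAX_VALUE);
    }
}
```

A fully caught-up copy (such as the active task) yields 0, which is why active and standby tasks can share one map.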


StreamsPartitionAssignor Logic Update

We will then modify our sticky assignor logic. There are two things to keep in mind: 1) there's no semantic difference between prev-active and prev-standby stateful tasks any more, and 2) the assignor should be aware of which tasks are stateful and which are stateless, which can easily be inferred from its embedded topology builder. The goal is to assign the set of stateless tasks and the set of stateful tasks independently, trying to achieve workload balance while honoring stickiness (here the term "stickiness" is interpreted based on the gap value alone). For stateless tasks, the assignor would not assign any standby tasks either (KAFKA-4696).


More specifically:

  1. For the set of stateless tasks:
    a. First calculate the average number of tasks each thread should get.
    b. For each task (sorted by topic-groupId), if there is an owner of this task from prevTask (no more than one client should be claiming to own it) that is not exceeding the average number, assign the task to it;
    c. Otherwise, find the host with the largest remaining capacity (defined as the difference between the average number and the number of currently assigned tasks) and assign the task to it.
  2. For the set of stateful tasks, first consider the active assignment:
    a. First calculate the average number of active tasks each thread should get (so yes, we are still treating all the stateful tasks equally, and no, we are not going to resolve KAFKA-4969 in this KIP).
    b. For each task (sorted by topic-groupId):
      i. Find the host with the smallest gap; if it's not exceeding the average number, assign the task to it;
      ii. Otherwise, if no host had the task before, there is nothing we can do but bite the bullet of the restoration gap, so we just pick the client with the largest remaining capacity and assign the task to it;
      iii. Otherwise, we have at least one prev-task owner, but the one with the smallest gap has already exceeded its capacity. We need to make a call here on the trade-off between workload imbalance and restoration gap (some heuristics are applicable in the first version):
        1. If we favor reducing restoration latency, we still assign the task to the host with the smallest gap, but if the standby task number N (used below in step 3) == 0, we force-assign a standby task to the new owner candidate; otherwise we do nothing and just rely on step 3) to get us some standby tasks.
        2. Otherwise, we assign the task to another host following the same logic as 2.b.i) above, but starting with the second smallest gap.
  3. Then we consider the standby assignment for stateful tasks (assuming num.replicas = N):
    a. First calculate the average number of standby tasks each thread should get.
    b. For each task (sorted by topic-groupId), ranging i from 1 to N:
      i. Find the i-th host with the smallest gap, excluding the active owner and the 1..(i-1)th standby owners; if it's not exceeding the average number, assign the task to it;
      ii. Otherwise, go to the next host with the smallest gap and go back to 3.b.i) above; once no hosts are left that had the task before, we just pick the client with the largest remaining capacity and assign the task to it.
      iii. If we run out of hosts before i == N, it means we have assigned a standby task to each host, i.e. N > num.hosts; we will throw an exception and fail.
  4. Note that since the tasks are all sorted by topic-groupId, e.g. 1-1, 1-2, 1-3, ... 2-3, we are effectively already trying to get per-sub-topology workload balance. Also, for the tie-breakers in steps 1.c), 2.b.ii), and 3.b.ii) above, we pick the host that has the smallest number of tasks assigned to it from the same topic-groupId, to further achieve per-sub-topology workload balance on a best-effort basis.
  5. Whenever we've decided to favor reducing restoration latency in step 2.b.iii.1) above, we have introduced workload imbalance, and we want to get out of this state by re-triggering a rebalance later so that the assignor can check whether some standby owner can now take over the task. To do that, we will add a new type of error code named "imbalanced-assignment" in the ErrorCode field of the assignmentInfo, and when 2.b.iii.1) happens we will set this error code for all the members that own a standby task for the task that triggered 2.b.iii.1); there must be at least one of them. Upon receiving this error code, the thread will keep track of the progress of all its owned standby tasks, and then trigger another rebalance when the gap on all of them is close to zero.
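
The active assignment for stateful tasks (step 2) could be sketched roughly as follows. This is a simplified illustration, not a proposed implementation: all names are hypothetical, every host shares one capacity value, ties are broken by host name rather than by per-sub-topology counts, and the fallback to the least-loaded host when every previous owner is over capacity is an assumption. Tasks placed over capacity under 2.b.iii.1) are collected in a caller-supplied set, standing in for the "imbalanced-assignment" error code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical sketch of step 2 (active assignment of stateful tasks). */
final class GapAwareAssignorSketch {

    /**
     * @param tasks             stateful task IDs, already sorted by topic-groupId
     * @param gapsByHost        host -> (task -> reported gap); no entry means no local state
     * @param capacity          average number of active tasks per host (step 2.a)
     * @param favorLatency      true picks 2.b.iii.1, false picks 2.b.iii.2
     * @param overCapacityTasks output: tasks assigned beyond capacity to keep the gap small
     */
    static Map<String, String> assignActive(List<String> tasks,
                                            Map<String, Map<String, Integer>> gapsByHost,
                                            int capacity,
                                            boolean favorLatency,
                                            Set<String> overCapacityTasks) {
        Map<String, Integer> load = new TreeMap<>();
        for (String host : gapsByHost.keySet()) load.put(host, 0);
        if (load.isEmpty()) throw new IllegalArgumentException("no hosts");

        Map<String, String> assignment = new LinkedHashMap<>();
        for (String task : tasks) {
            // Hosts that reported state for this task, smallest gap first.
            List<String> owners = new ArrayList<>();
            for (Map.Entry<String, Map<String, Integer>> e : gapsByHost.entrySet()) {
                if (e.getValue().containsKey(task)) owners.add(e.getKey());
            }
            owners.sort((a, b) -> {
                int byGap = Integer.compare(gapsByHost.get(a).get(task), gapsByHost.get(b).get(task));
                return byGap != 0 ? byGap : a.compareTo(b);
            });

            String chosen = null;
            if (!owners.isEmpty() && favorLatency) {
                chosen = owners.get(0);                       // 2.b.i or 2.b.iii.1
                if (load.get(chosen) >= capacity) overCapacityTasks.add(task);
            } else {
                for (String host : owners) {                  // 2.b.i, then 2.b.iii.2
                    if (load.get(host) < capacity) { chosen = host; break; }
                }
                if (chosen == null) chosen = leastLoaded(load); // 2.b.ii (and assumed fallback)
            }
            assignment.put(task, chosen);
            load.merge(chosen, 1, Integer::sum);
        }
        return assignment;
    }

    /** Host with the fewest assigned tasks; ties go to the first host in name order. */
    private static String leastLoaded(Map<String, Integer> load) {
        String best = null;
        for (Map.Entry<String, Integer> e : load.entrySet()) {
            if (best == null || e.getValue() < load.get(best)) best = e.getKey();
        }
        return best;
    }
}
```

For example, with hosts A, B, and C, a capacity of 1, and host A holding fully restored state for three tasks, the balance-favoring mode spreads the tasks over all three hosts, while the latency-favoring mode keeps all three on A and flags the two that exceed A's capacity.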


NOTE: step 5) above loses the specific information about which tasks should be on the "watch-list", and hence the thread simply needs to watch all of its standby tasks. We could, of course, inject new fields into the AssignmentInfo encoding to explicitly list those "watch-list" standby tasks. Personally I'm a bit reluctant to add them, since they seem too specific and would make the Streams assignor protocol less generalizable, but I can be convinced if there are strong motivations for the latter approach.

Please also compare this idea with the original algorithm below in "Assignment Algorithm" and let me know your thoughts.

Related Work

Note that the main concern of this KIP is how to allocate and re-allocate sharded stateful tasks, of which the state itself is the difficult part. Thus, although other stream processing systems are of prime interest, we can also look to the balancing algorithms employed by distributed databases, as long as those DBs follow the Primary/Replica model. This is advantageous both for the diversity of perspective it lends and because some of these database systems are more mature than any modern stream processing system.

One thing to note when considering other SP and DB systems is that, unlike most of them, Kafka Streams achieves durability via changelog topics. That is, in Streams, the purpose of a replica is purely to serve as a hot standby, and it's perfectly safe to run with no replicas at all. In contrast, most other systems use the replicas for durability, so they potentially need extra machinery to ensure that at all times a certain number of replicas is available, or active, or consistent.

As an example of the degrees of freedom that are unique to Streams, we would be perfectly safe to assign the active task to the most caught-up node and assign the standby task to an empty node and completely discard any other existing replicas. In any other distributed data system, this would result in a dangerous loss of durability.

Kafka Consumer StickyAssignor

The Consumer's StickyAssignor implementation is interesting. It has many of the same goals as Streams' assignor, although it only has to deal with one class of partitions. In contrast, Streams' assignor has to consider: (0) partitions that must be grouped together for tasks, (1) partitions for Stateful tasks, (2) partitions for Stateless tasks, and (3) partitions for Standby tasks. Nevertheless, we could consider generalizing the StickyAssignor algorithm for multiple classes of partitions, as well as the grouping constraint and the standby/active constraint.

The assignment algorithm begins by copying the prior assignment and then removing any assignments that have become invalid (the consumer has left or the partition no longer exists). Thus, we start with the valid subset of the prior assignment and a list of all the partitions that need to be assigned. Then we iterate over the unassigned partitions and assign each one to the consumer that can host it and that has the smallest current assignment. This is a greedy assignment that should produce an assignment that is as balanced as possible while maintaining all current assignments. Then, we enter the balancing phase.

The balancing phase is an iterative algorithm. In each pass, it attempts to move each partition to a better place, and it continues with more passes until no more improvements are possible, or until the cluster as a whole is fully balanced. (Due to the assignment constraints, full balance may not be possible).

When considering the best move for a partition, it first checks to see if that partition is currently hosted on a consumer that is unbalanced with respect to the prior host of that partition. In this case, it just moves the partition back to the prior host. This is essentially a short-circuit for the case where a partition has become "unstuck" and restoring stickiness could actually improve balance. If we get past that short-circuit, then we just propose to move the partition to the consumer that can host it and has the smallest current assignment.

As mentioned, we keep "shuffling" all partitions in this way until we get an optimal balance, given the constraints.
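
The initial greedy phase described above (before any balancing passes) might look like the following sketch. It is not the StickyAssignor source: the names are hypothetical, partitions are modeled as plain integers, and every live consumer is assumed to be able to host every partition. The balancing phase is omitted.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical sketch of the greedy phase only; the balancing passes are not shown. */
final class GreedyStickySketch {

    static Map<String, List<Integer>> assign(Map<String, List<Integer>> priorAssignment,
                                             Set<String> liveConsumers,
                                             List<Integer> partitions) {
        Map<String, List<Integer>> result = new TreeMap<>();
        for (String consumer : liveConsumers) result.put(consumer, new ArrayList<>());
        if (result.isEmpty()) throw new IllegalArgumentException("no live consumers");

        Set<Integer> existing = new HashSet<>(partitions);
        Set<Integer> assigned = new HashSet<>();

        // Keep the still-valid part of the prior assignment.
        for (Map.Entry<String, List<Integer>> e : priorAssignment.entrySet()) {
            List<Integer> kept = result.get(e.getKey());
            if (kept == null) continue; // consumer has left the group
            for (Integer p : e.getValue()) {
                if (existing.contains(p) && assigned.add(p)) kept.add(p);
            }
        }

        // Give each unassigned partition to the consumer with the smallest assignment.
        for (Integer p : partitions) {
            if (assigned.contains(p)) continue;
            String target = null;
            for (Map.Entry<String, List<Integer>> e : result.entrySet()) {
                if (target == null || e.getValue().size() < result.get(target).size()) {
                    target = e.getKey();
                }
            }
            result.get(target).add(p);
            assigned.add(p);
        }
        return result;
    }
}
```

Note that this phase never moves a partition away from a surviving owner, so the result can remain unbalanced; fixing that is the job of the balancing phase.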

reference: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java

Cruise Control

Cruise Control is a LinkedIn project to automate some aspects of Kafka broker cluster administration. One thing it does is dynamically rebalance the partition assignment over the broker instances based on a large number of metrics it collects, including CPU load, disk usage, etc. It structures the assignment optimization task as a Goal-Based Optimization problem. See https://en.wikipedia.org/wiki/Goal_programming for more information about this paradigm. Note that "goal programming" often implies that you represent the goals as a system of linear equations, and then solve the system to maximize some variables (aka Linear Programming), but that's not how Cruise Control is implemented. It just expresses the optimization problem as a system of goals and seeks a satisfactory (not optimal) solution by sequentially satisfying each goal.

The structure of the implementation is that you get a list of Goal implementations, each one corresponding to an optimization goal, like disk usage. The overall optimizer sorts the goals by priority, and then passes in the cluster assignment to the highest priority goal. That goal proposes modifications to the assignment (typically using an iterative algorithm similar to the StickyAssignor's). Once it's happy with the proposal, it returns. Then the optimizer passes in the new proposed assignment to the next goal, and so on. Each goal is responsible for ensuring that its proposals do not violate any of the prior, higher priority, goals. The API provides a hook that the goal can call during its optimization passes to ensure it doesn't violate the higher priority goals.
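
The goal-by-goal structure described above can be illustrated with a small sketch. None of the types below are Cruise Control's actual API; they are hypothetical stand-ins. The assignment is reduced to a partition count per node, the goals are assumed to arrive already sorted by priority, and both example goals share one simple move strategy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of sequential goal-based optimization. */
final class GoalOptimizerSketch {

    interface Goal {
        /** Hook for lower-priority goals: does this assignment satisfy the goal? */
        boolean accepts(Map<String, Integer> load);
        /** Returns an improved assignment that all higher-priority goals still accept. */
        Map<String, Integer> optimize(Map<String, Integer> load, List<Goal> higherPriority);
    }

    /** Example goal: no node holds more than cap partitions. */
    static final class MaxLoadGoal implements Goal {
        private final int cap;
        MaxLoadGoal(int cap) { this.cap = cap; }
        public boolean accepts(Map<String, Integer> load) {
            for (int n : load.values()) if (n > cap) return false;
            return true;
        }
        public Map<String, Integer> optimize(Map<String, Integer> load, List<Goal> higher) {
            return shiftUntilAccepted(load, higher, this);
        }
    }

    /** Example goal: node loads differ by at most one. */
    static final class BalanceGoal implements Goal {
        public boolean accepts(Map<String, Integer> load) {
            int max = Integer.MIN_VALUE, min = Integer.MAX_VALUE;
            for (int n : load.values()) { max = Math.max(max, n); min = Math.min(min, n); }
            return load.isEmpty() || max - min <= 1;
        }
        public Map<String, Integer> optimize(Map<String, Integer> load, List<Goal> higher) {
            return shiftUntilAccepted(load, higher, this);
        }
    }

    /** Moves one partition at a time from the most to the least loaded node. */
    static Map<String, Integer> shiftUntilAccepted(Map<String, Integer> load, List<Goal> higher, Goal target) {
        Map<String, Integer> current = new TreeMap<>(load);
        while (!target.accepts(current)) {
            String max = null, min = null;
            for (String node : current.keySet()) {
                if (max == null || current.get(node) > current.get(max)) max = node;
                if (min == null || current.get(node) < current.get(min)) min = node;
            }
            if (current.get(max) - current.get(min) <= 1) return current; // no move can help
            Map<String, Integer> proposal = new TreeMap<>(current);
            proposal.merge(max, -1, Integer::sum);
            proposal.merge(min, 1, Integer::sum);
            for (Goal g : higher) if (!g.accepts(proposal)) return current; // would violate a prior goal
            current = proposal;
        }
        return current;
    }

    /** Passes the assignment through each goal in priority order. */
    static Map<String, Integer> optimize(Map<String, Integer> initial, List<Goal> goalsByPriority) {
        Map<String, Integer> current = new TreeMap<>(initial);
        List<Goal> higher = new ArrayList<>();
        for (Goal goal : goalsByPriority) {
            current = goal.optimize(current, higher);
            higher.add(goal);
        }
        return current;
    }
}
```

The point of the sketch is the control flow: each goal only sees the output of the goals before it, and uses their `accepts` hook to avoid undoing their work. Adding a goal means adding one more element to the list.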

I don't think it would be very straightforward to turn Cruise Control into a general framework for allocating resources, because an awareness of the structure of the task is built in at every level (the optimization and the goals all know that they are dealing with Kafka brokers and partitions). But there are of course off-the-shelf optimization frameworks we could consider using if we want to go down a generalization path.

It would be straightforward to implement our allocation algorithm following a similar pattern, though. This might be a good choice if we want to add more optimization goals in the future. The main benefit of the goal-based orientation is that it scales naturally with adding more optimization goals (you just plug them in). As well, it's pretty easy to configure/reconfigure the optimizer to include or remove different goals.

source: https://github.com/linkedin/cruise-control

Redis Cluster

Redis is a high-query-performance database. The main use case for it is as a caching layer. Partly due to this fact, data durability isn't of tremendous importance, and the main operating mode is single-node. However, a clustered version was released in 2015. Reading the documentation, it sounds like the intent is more to provide a simple mechanism for transcending single-node mode than to provide a true distributed database with the kinds of guarantees one would expect (like consistency). Accordingly, the data distribution and primary/replica handling are quite simplistic. This is not meant to be disparaging. Redis Cluster is designed to serve specific use cases well at the expense of other use cases.

In Redis Cluster, keys are hashed into one of 16384 buckets (called slots). The hashing algorithm does not need to be consistent because the slots are fixed in number. Nodes in the cluster have unique and permanent IDs. Cluster integrity is maintained via a gossip protocol, and each node ultimately maintains a connection to every other node. Nodes don't proxy queries to non-local keys. Instead they respond with the hash slot that contains the key, along with the node that currently holds that hash slot (this is similar to Streams). Every node in the cluster is assigned some subset of the slots, and the slots can be manually assigned, unassigned, and moved to, from, and between nodes.
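
As a concrete illustration of the fixed slot space, the Redis Cluster specification maps a key to a slot as CRC16(key) mod 16384, using the XMODEM variant of CRC16. The sketch below follows that rule but deliberately ignores the hash-tag feature (the `{...}` substring rule), and the class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;

/** Sketch of the key-to-slot mapping; hash tags are intentionally not handled. */
final class RedisSlotSketch {

    static final int SLOT_COUNT = 16384;

    /** CRC16, XMODEM variant: polynomial 0x1021, initial value 0, no reflection. */
    static int crc16(byte[] data) {
        int crc = 0;
        for (byte b : data) {
            crc ^= (b & 0xFF) << 8;
            for (int i = 0; i < 8; i++) {
                crc = ((crc & 0x8000) != 0) ? ((crc << 1) ^ 0x1021) : (crc << 1);
                crc &= 0xFFFF; // keep the register at 16 bits
            }
        }
        return crc;
    }

    /** Slot for a key, in the range [0, 16383]. */
    static int slotFor(String key) {
        return crc16(key.getBytes(StandardCharsets.UTF_8)) % SLOT_COUNT;
    }
}
```

Because the number of slots never changes, adding or removing a node only changes which node owns a slot, not which slot a key maps to.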

Redis Cluster doesn't have primary and replicas for each slot (the way that Streams has primary and standbys for each task). Rather, it has primary and replicas for each node. Thus, if there are two nodes, A and B, with replicas Ar and Br, the node Ar is responsible for replicating the exact same set of slots that are assigned to A, and likewise with B and Br. This simplifies the assignment considerations somewhat, as a given node needs to host only active (primary) copies or only passive (replica) copies. The replicas can serve queries if the query specifies that it accepts stale data, but most likely this arrangement results in the replica nodes being underutilized. So, my quick judgement is that the simplicity of this arrangement is enviable, but we probably don't want to follow suit.

reference: https://redis.io/topics/cluster-spec

Elasticsearch

An Elasticsearch cluster is a collection of nodes, each an individual instance of Elasticsearch. At the cluster scope, there is a master node that is responsible for coordinating cluster changes such as adding or removing nodes, indices, etc. Each node has one or more shards, which correspond to a certain (Lucene) index that the cluster is fetching data from. An index is divided up into shards across one or more nodes, and the work for that index is distributed across the shards. Each shard has a primary shard responsible for writes, and one or more replica shards that can receive reads.

The rough translations to Streams are shown in the table below. Note that the comparisons are drawn as they relate to load balancing, rather than by literal definition. For example, an index is really more like a store; however, for our purposes it is more useful to think of it as an entire subtopology, in that each index/subtopology is an independent job that has some inherent "weight" (such as the number of stores for a subtopology), and its work is partitioned and distributed independently into some number of shards/tasks. The analogies are fairly close, and Elasticsearch has to solve a load balancing problem similar to the one that Streams faces. One main high-level difference is that the replica shards are presumed to be in sync with the active shards, which removes the complexity of "restore completeness" from their challenge.

Elasticsearch    Streams
index            subtopology
master node      group leader
node             instance
primary shard    active task
replica shard    standby task
shard            task

Elasticsearch actually breaks down the problem into two separate processes: allocation and rebalancing. Allocation refers to the assignment of (unallocated) shards to nodes, while rebalancing occurs separately and involves moving allocated shards around. By default, rebalancing can only occur when all shards are allocated (this can be configured to allow rebalancing once all primary shards are allocated, or always). Multiple rebalances can take place concurrently, up to some configurable max (defaults to 2). Note that this limit applies only to "load balancing" rebalances and not to those forced by environmental (user-defined) constraints. You can also dynamically disable/enable rebalancing for either type of shard.

The allocation algorithm is as follows – note that this applies only to placement of unassigned shards, but nodes may have other shards already assigned to them.

  1. Group shards by index, then sort by shard ID to get the order of shard allocation. First all primary shards are allocated, then one replica for each shard of each index, and repeat if number of replicas is greater than one.
  2. For each shard, build a list of possible nodes. Nodes may be eliminated from consideration based on user config (eg allocation filtering) or various constraints (no copies of a shard should be allocated to the same node, adequate remaining disk space, forced awareness, max retries)
  3. If step 2 returns no nodes, the shard will be retried later (possibly after a rebalance). Otherwise, we calculate the weight of each node if given the shard, and allocate it to the one with the lowest weight. The weighting function depends on two settings: indexBalance (0.55 by default) and shardBalance (0.45 by default). The total weight is the weighted average of the shard and index weights, weighted by the fractional shard and index balance respectively. This is computed as 
private float weight(Balancer balancer, ModelNode node, String index, int numAdditionalShards) {
    final float weightShard = node.numShards() + numAdditionalShards - balancer.avgShardsPerNode();
    final float weightIndex = node.numShards(index) + numAdditionalShards - balancer.avgShardsPerNode(index);
    return theta0 * weightShard + theta1 * weightIndex;
}

where 

theta0 = shardBalance / (indexBalance + shardBalance);
theta1 = indexBalance / (indexBalance + shardBalance);
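
To see how the weighting behaves, here is a small worked sketch that evaluates the same formula with the default settings. It is a standalone restatement for illustration: the class and parameter names are hypothetical, and the example numbers are made up.

```java
/** Hypothetical standalone version of the weight formula quoted above. */
final class ShardWeightSketch {

    static float weight(float indexBalance, float shardBalance,
                        int nodeShards, int nodeShardsOfIndex, int numAdditionalShards,
                        float avgShardsPerNode, float avgShardsOfIndexPerNode) {
        // Normalize the two settings so that theta0 + theta1 == 1.
        float theta0 = shardBalance / (indexBalance + shardBalance);
        float theta1 = indexBalance / (indexBalance + shardBalance);
        // How far the node would be above (positive) or below (negative) the average.
        float weightShard = nodeShards + numAdditionalShards - avgShardsPerNode;
        float weightIndex = nodeShardsOfIndex + numAdditionalShards - avgShardsOfIndexPerNode;
        return theta0 * weightShard + theta1 * weightIndex;
    }
}
```

With the defaults (indexBalance = 0.55, shardBalance = 0.45), theta0 is 0.45 and theta1 is 0.55. Suppose the cluster averages 4 shards per node and 1.5 shards of the index per node. A node holding 5 shards, 2 of them from the index, would have weight 0.45 * 2 + 0.55 * 1.5 = 1.725 after receiving one more shard, while a node holding 3 shards, 1 from the index, would have weight 0.45 * 0 + 0.55 * 0.5 = 0.275. The second node has the lower weight and would receive the shard.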






Old Version of KIP-441

----------------------------------------------------------------------------------------------------------------------------

OLD VERSION OF THE KIP, YET TO BE CLEANED UP

Terminology


We define several terms to make the algorithm easier to walk through.

  • Instance (a.k.a. stream instance): the KStream instance serving as the container for a set of stream threads. This could be a physical host or a k8s pod. The stream thread's capacity is essentially controlled by the instance's relative size.
  • Learner task: a special standby task that gets assigned to one stream instance to restore a current active task and transitions to active when the restoration is complete.

Learner Task Essential

A learner task shares the same semantics as a standby task, which the restore consumer uses to replicate active task state. When the restoration of a learner task is complete, the stream instance will initiate a new JoinGroupRequest to trigger another rebalance and do the task transfer. The goal of the learner task is to delay the task migration while the destination host has not finished replaying the active task.

Next we are going to look at several typical scaling scenarios and edge scenarios to better understand the design of this algorithm.

Normal Scenarios

Scale Up Running Application

The newly joined stream threads will be assigned learner tasks by the group leader, and they will first replay the corresponding changelogs locally. By the end of the first round of rebalance, there is no “real ownership transfer”. When the new member finally finishes the replay, it will re-attempt to join the group to indicate that it is “ready” to take on real active tasks. During the second rebalance, the leader will eventually transfer the task ownership.

Scale-up
Cluster has 3 stream threads S1(leader), S2, S3, and they each own some tasks T1 ~ T5
Group stable state: S1[T1, T2], S2[T3, T4], S3[T5]

#First Rebalance 
New member S4 joins the group.
S1 performs task assignments:
	S1(assigned: [T1, T2], revoked: [], learning: [])
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [], revoked: [], learning: [T1])

#Second Rebalance 
New member S5 joins the group.
Member S1~S5 join with following metadata: (S4 is not ready yet)
	S1(assigned: [T2], revoked: [T1], learning: []) // T1 revoked because it's "being learned"
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [], revoked: [], learning: [T1])
	S5(assigned: [], revoked: [], learning: [])
S1 performs task assignments: 
	S1(assigned: [T1, T2], revoked: [], learning: [])
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [], revoked: [], learning: [T1])
	S5(assigned: [], revoked: [], learning: [T3])

#Third Rebalance 
Member S4 finishes its replay and becomes ready, re-attempting to join the group.
Member S1~S5 join with following status:(S5 is not ready yet)
	S1(assigned: [T2], revoked: [T1], learning: [])
	S2(assigned: [T4], revoked: [T3], learning: []) // T3 revoked because it's "being learned"
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [], revoked: [], learning: [T1])
	S5(assigned: [], revoked: [], learning: [T3])
S1 performs task assignments:
	S1(assigned: [T2], revoked: [T1], learning: [])
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [T1], revoked: [], learning: [])
	S5(assigned: [], revoked: [], learning: [T3])

#Fourth Rebalance 
Member S5 is ready and re-attempts to join the group.
Member S1~S5 join with following status:
	S1(assigned: [T2], revoked: [], learning: [])
	S2(assigned: [T4], revoked: [T3], learning: []) // T3 revoked because it's "being learned"
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [T1], revoked: [], learning: [])
	S5(assigned: [], revoked: [], learning: [T3])
S1 performs task assignments:
	S1(assigned: [T2], revoked: [], learning: [])
	S2(assigned: [T4], revoked: [T3], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [T1], revoked: [], learning: [])
	S5(assigned: [T3], revoked: [], learning: [])
Now the group reaches balance with 5 members each owning one task.
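
The two-phase handoff in the walk-through above can be sketched in a few lines of Python. This is a minimal illustration under stated assumptions, not the Streams implementation: the function name rebalance and the dict/set representation of the group state are hypothetical and were chosen only for this example.

```python
def rebalance(active, learning, ready):
    """One leader assignment round (illustrative only).

    active:   dict task -> current active owner
    learning: dict task -> member replaying the task as a learner
    ready:    set of learner members that finished replaying
    Returns the new (active, learning) pair.
    """
    new_active = dict(active)
    new_learning = {}
    for task, learner in learning.items():
        if learner in ready:
            # The learner caught up, so ownership moves to it.
            new_active[task] = learner
        else:
            # Still replaying: the previous owner keeps processing the task.
            new_learning[task] = learner
    return new_active, new_learning


# State after the second rebalance of the walk-through.
active = {"T1": "S1", "T2": "S1", "T3": "S2", "T4": "S2", "T5": "S3"}
learning = {"T1": "S4", "T3": "S5"}

# Third rebalance: only S4 is ready, so only T1 changes owner.
after_third = rebalance(active, learning, ready={"S4"})
# Fourth rebalance: S5 is ready as well, so T3 changes owner.
after_fourth = rebalance(*after_third, ready={"S5"})
```

The point of the sketch is that an active task never moves to a member that is still replaying, so processing of that task is not blocked on restoration.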


Scale Up from Empty Group

Scaling up from scratch means all stream threads are new members. There is no need to start a learner stage because there is nothing to learn: we don’t even have a changelog topic to start with. We should be able to handle this case by checking whether the given task is in the active task bucket of any other member. If it is not, we just transfer the ownership immediately.

After deprecating group.initial.rebalance.delay.ms, we still expect the algorithm to work because every task assignment during rebalance will adhere to the rule "if the given task is currently active, reassignment must happen only to stream threads that have declared themselves ready to serve this task."

Scale-up from ground
Group empty state: unassigned tasks [T1, T2, T3, T4, T5]

#First Rebalance 
New member S1 joins the group
S1 performs task assignments:
	S1(assigned: [T1, T2, T3, T4, T5], revoked: [], learning: []) // T1~5 not previously owned

#Second Rebalance 
New members S2, S3 join the group
S1 performs task assignments:
	S1(assigned: [T1, T2, T3, T4, T5], revoked: [], learning: [])
	S2(assigned: [], revoked: [], learning: [T3, T4])
	S3(assigned: [], revoked: [], learning: [T5])

#Third Rebalance 
S2 and S3 are ready immediately after the assignment.
Member S1~S3 join with following status:
	S1(assigned: [T1, T2], revoked: [T3, T4, T5], learning: []) 
	S2(assigned: [], revoked: [], learning: [T3, T4])
	S3(assigned: [], revoked: [], learning: [T5])
S1 performs task assignments:
	S1(assigned: [T1, T2], revoked: [T3, T4, T5], learning: []) 
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])
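
The assignment rule quoted above ("if the given task is currently active, reassignment must happen only to ready stream threads") can be written as a single predicate. The following Python sketch is an assumption-laden illustration: may_take_ownership and its arguments are hypothetical names, not part of any Kafka API.

```python
def may_take_ownership(task, candidate, active_owner, ready_learners):
    """Return True if `candidate` may become the active owner of `task` now.

    active_owner:   dict task -> member currently running the task as active
    ready_learners: dict task -> set of members that declared themselves ready
    """
    owner = active_owner.get(task)
    if owner is None or owner == candidate:
        # Nobody else is processing the task, so there is nothing to learn from.
        return True
    # The task is active elsewhere: only a ready learner may take it over.
    return candidate in ready_learners.get(task, set())


# Empty group: T1 has no owner, so the first member takes it at once.
print(may_take_ownership("T1", "S1", {}, {}))                        # True
# T3 is active on S1 and S2 is still replaying: no transfer yet.
print(may_take_ownership("T3", "S2", {"T3": "S1"}, {}))              # False
# S2 has declared itself ready for T3: the transfer may happen.
print(may_take_ownership("T3", "S2", {"T3": "S1"}, {"T3": {"S2"}}))  # True
```

The first branch is also what lets a learner take over immediately when the previous active owner has disappeared.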

Scale Down Running Application

When scaling down a stream group, it is also favorable to initiate learner tasks before actually shutting down the instances. Although standby tasks could help in this case, they require the user to pre-set num.standby.replicas, which may not have been done by the time the administrator scales down. Besides, standby tasks are not guaranteed to be up-to-date. The plan is to use a command line tool to tell certain stream members that a shutdown is about to be executed. These informed members will send a join group request to indicate that they are “leaving soon”. During the assignment phase, the leader will perform the learner assignment among members who are not leaving. A leaving member will shut itself down once it receives the instruction to revoke all its active tasks.

For ease of operation, a new tool for scaling down the stream app shall be built. It will have access to the application instances, and ideally could do two types of scaling down:

  1. Percentage scaling. The tool computes which members to scale down, and the end user only needs to provide a percentage. For example, if the current cluster size is 40 and we choose to scale down to 80%, then the script will attempt to inform 8 of the 40 hosts to “prepare leaving” the group.
  2. Name-based scaling. The user names the stream instances to be shut down soon. This is built for online hot swapping and host replacement.
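
The arithmetic behind percentage scaling is simple enough to sketch. The helper below is hypothetical (the tool does not exist yet), and the rounding choice, rounding the number of remaining members up so that the tool never removes more hosts than requested, is an assumption of this example rather than something the proposal specifies.

```python
import math


def members_to_notify(cluster_size, target_percent):
    """Number of members to mark as "prepare leaving" when scaling the
    group down to `target_percent` of its current size."""
    if not 0 <= target_percent <= 100:
        raise ValueError("target_percent must be between 0 and 100")
    # Round the remaining size up so we never shrink below the target.
    remaining = math.ceil(cluster_size * target_percent / 100)
    return cluster_size - remaining


# The example from the text: 40 hosts scaled down to 80%.
print(members_to_notify(40, 80))  # 8
```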

Scale-down stream applications
Group stable state: S1[T1, T2], S2[T3, T4], S3[T5]
Scaling down the application, S2 will be leaving.

#First Rebalance 
Member S2 joins the group and claims that it is leaving.
S1 performs task assignments:
	S1(assigned: [T1, T2], revoked: [], learning: [T3])
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: [T4])

#Second Rebalance 
S3 finishes replay first and triggers another rebalance
Member S1 ~ S3 join with following status:(S1 is not ready yet)
	S1(assigned: [T1, T2], revoked: [], learning: [T3])
	S2(assigned: [], revoked: [T3, T4], learning: []) 
	S3(assigned: [T5], revoked: [], learning: [T4])
S1 performs task assignments:
	S1(assigned: [T1, T2], revoked: [], learning: [T3])
	S2(assigned: [T3], revoked: [T4], learning: [])
	S3(assigned: [T4, T5], revoked: [], learning: [])

#Third Rebalance 
S1 finishes replay and triggers rebalance.
Member S1~S3 join with following status: 
	S1(assigned: [T1, T2], revoked: [], learning: [T3])
	S2(assigned: [], revoked: [T3], learning: []) 
	S3(assigned: [T4, T5], revoked: [], learning: [])
S1 performs task assignments:
	S1(assigned: [T1, T2, T3], revoked: [], learning: [])
	S2(assigned: [], revoked: [T3], learning: [])
	S3(assigned: [T4, T5], revoked: [], learning: [])
S2 will shut itself down upon the new assignment since it has no assigned task left.

Online Host Swapping (Scaling Up Then Down)

This is a typical use case where the user wants to replace the host type of the entire application. Normally the administrator will choose to swap hosts one by one, which could cause endless KStream resource shuffling. The recommended approach under cooperative rebalancing is:

  • Increase the capacity of the current stream job to 2X by booting up instances of the new type.
  • Mark existing stream instances as leaving.
  • Once learner tasks finish on the new hosts, shut down the old ones.

Online Swapping
Group stable state: S1[T1, T2], S2[T3, T4]
Swapping application instances, adding S3, S4 with new instance type.

#First Rebalance 
Member S3, S4 join the group.
S1 performs task assignments:
	S1(assigned: [T1, T2], revoked: [], learning: [])
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [], revoked: [], learning: [T2])
	S4(assigned: [], revoked: [], learning: [T4])

Use scaling tool to indicate S1 & S2 are leaving.
#Second Rebalance 
Member S1, S2 initiate rebalance to indicate state change (leaving)
Member S1~S4 join with following status: 
	S1(assigned: [T1], revoked: [T2], learning: [])
	S2(assigned: [T3], revoked: [T4], learning: []) 
	S3(assigned: [], revoked: [], learning: [T2])
	S4(assigned: [], revoked: [], learning: [T4])
S1 performs task assignments:
	S1(assigned: [T1, T2], revoked: [], learning: [])
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [], revoked: [], learning: [T1, T2])
	S4(assigned: [], revoked: [], learning: [T3, T4])

#Third Rebalance 
S3 and S4 finish replaying T1 ~ T4 and trigger rebalance.
Member S1~S4 join with following status: 
	S1(assigned: [], revoked: [T1, T2], learning: [])
	S2(assigned: [], revoked: [T3, T4], learning: [])
	S3(assigned: [], revoked: [], learning: [T1, T2])
	S4(assigned: [], revoked: [], learning: [T3, T4])
S1 performs task assignments:
	S1(assigned: [], revoked: [], learning: [])
	S2(assigned: [], revoked: [], learning: [])
	S3(assigned: [T1, T2], revoked: [], learning: [])
	S4(assigned: [T3, T4], revoked: [], learning: [])
S1~S2 will shut themselves down upon the new assignment since they have no assigned task left.

Edge Scenarios

Backing Up Information On Leader 

Since incremental rebalancing requires certain historical information from the last round of assignment, the leader stream thread will need to maintain knowledge of:

  1. Who participated in the last round of rebalance. This information is required to track newcomers.
  2. Who will be leaving the consumer group. This is for scaling down support, as the replay could take longer than the scale-down timeout. Under static membership, since we don't send leave group information, we could leverage the leader to explicitly trigger a rebalance when the scale-down timeout is reached. Maintaining the set of leaving members is critical to making the right task shuffle judgement.
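
As a rough sketch of how the leader could use these two pieces of state, consider the Python below. The function classify_members is hypothetical. Treating a missing history as "no newcomers" is an assumption of the example, chosen to mirror the leader failover behavior described in the edge cases of this section.

```python
def classify_members(current, previous=None, leaving=frozenset()):
    """Split the current group into newcomers and staying members.

    current:  members taking part in this rebalance
    previous: members of the last rebalance round, or None when the leader
              has no history (for example right after a leader failover)
    leaving:  members that announced they are about to shut down
    """
    # Without history the leader cannot tell who is new.
    newcomers = set() if previous is None else set(current) - set(previous)
    # Only staying members are candidates for learner tasks.
    staying = set(current) - set(leaving)
    return newcomers, staying


newcomers, staying = classify_members(
    current={"S1", "S2", "S3", "S4"},
    previous={"S1", "S2", "S3"},
    leaving={"S2"},
)
print(sorted(newcomers))  # ['S4']
print(sorted(staying))    # ['S1', 'S3', 'S4']
```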

This is the essential group state the leader needs to remember. To limit the severity of a leader crash during scaling, we avoid backing up too much information on the leader for now. The following edge cases concern leader incidents during scaling.

Leader Transfer During Scaling 

A leader crash could cause historical assignment information to be lost. For the learners already assigned, however, each stream thread maintains its own assignment status, so when a learner task's id has no corresponding active task running, the transfer will happen immediately. A leader switch in this case is not a big concern.

Leader crash during scaling
Cluster has 3 stream threads S1(leader), S2, S3, and they own tasks T1 ~ T5
Group stable state: S1[T1, T2], S2[T3, T4], S3[T5]

#First Rebalance 
New member S4 joins the group
S1 performs task assignments:
	S1(assigned: [T1, T2], revoked: [], learning: [])
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [], revoked: [], learning: [T1])

#Second Rebalance
S1 crashes/gets killed before S4 is ready, S2 takes over as leader.
Member S2~S4 join with following status: 
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: []) 
	S4(assigned: [], revoked: [], learning: [T1])
Note that T2 is unassigned, and S4 is learning T1 which has no current active task. We 
could rebalance T1, T2 immediately.	
S2 performs task assignments:
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5, T2], revoked: [], learning: [])
	S4(assigned: [T1], revoked: [], learning: [])
Now the group reaches balance.

Leader Transfer Before Scaling 

However, if the leader dies before new instances join, the risk is that the new leader cannot differentiate which stream instances are "new", because that relies on the historical information. For version 1.0, the final assignment is probably not ideal in this case if we only attempt to assign learner tasks to newcomers. This also motivates us to figure out a better task coordination strategy for load balance in the long term.

Leader crash before
Cluster has 3 stream threads S1(leader), S2 and they own tasks T1 ~ T5
Group stable state: S1[T1], S2[T2, T3, T4, T5]

#First Rebalance 
New member S4 joins the group, at the same time S1 crashes.
S2 takes over as leader, while T1 is not assigned now
S2 ~ S4 join with following status
	S2(assigned: [T2, T3, T4, T5], revoked: [], learning: [])
	S3(assigned: [], revoked: [], learning: []) 
	S4(assigned: [], revoked: [], learning: [])
S2 performs task assignments: (no learner assignment since S2 doesn't know S4 is new member)
	S2(assigned: [T2, T3, T4, T5], revoked: [], learning: [])
	S3(assigned: [T1], revoked: [], learning: [])
	S4(assigned: [], revoked: [], learning: [])

Now the group reaches balance, although the eventual load is skewed.

Assignment Algorithm

The above examples focus on demonstrating the expected "end picture" behavior of KStream incremental rebalancing. Next, we present a holistic view of the new learner assignment algorithm during each actual rebalance.

The assignment will be broken down in the order of: active, learner and standby tasks.

Algorithm incremental-rebalancing

Input Set of Tasks,
      Set of Instances,
      Set of Stream Threads,

      Where each stream thread contains:
          Set of active Tasks,
          Set of standby Tasks,
          owned by which instance

Main Function
	
	Assign active tasks: (if any)
		To instances with learner tasks that indicate "ready"
		To previous owners
		To unready learner tasks owners
  	 	To instances with standby tasks
		To instances who are not marked "leaving"	
		To resource available instances

	Keep existing learner tasks' assignment unchanged

 	Pick new learner tasks out of heaviest loaded instances
 
	Assign learner tasks: (if any)
		To new-coming instances with abundant resource
		To instances who are not marked "leaving"	
		To instances with corresponding standby tasks
	Prerequisite is that the instance version supports learner mechanism. 

	Assign standby tasks: (if any)
		To instances without matching active tasks
		To previous active task owners after learner transfer in this round
		To resource abundant instances
		To instances who are not marked "leaving"
	Based on the num.standby.replicas config, standby task assignment could take multiple rounds

Output Finalized Task Assignment
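
To make the priority order of the active task step concrete, here is a minimal Python sketch. It is one possible reading of the pseudocode, not a definitive implementation: the Instance class, its fields, and the helper names are all hypothetical, and the last tier ("resource available instances") is reduced to "anything else" because the proposal does not define how capacity is measured.

```python
from dataclasses import dataclass, field


@dataclass
class Instance:
    name: str
    active: set = field(default_factory=set)         # tasks currently active here
    ready_learner: set = field(default_factory=set)  # learner tasks fully replayed
    learner: set = field(default_factory=set)        # learner tasks still replaying
    standby: set = field(default_factory=set)        # standby tasks hosted here
    leaving: bool = False                            # marked "leaving" by the tool


def active_rank(task, inst):
    """Lower rank means higher priority, following the order in the pseudocode."""
    if task in inst.ready_learner:
        return 0  # learner that indicated "ready"
    if task in inst.active:
        return 1  # previous owner
    if task in inst.learner:
        return 2  # learner that is not ready yet
    if task in inst.standby:
        return 3  # instance with a matching standby task
    if not inst.leaving:
        return 4  # any instance not marked "leaving"
    return 5      # fallback: whatever is left


def pick_active_owner(task, instances):
    # min() keeps the first instance among equally ranked candidates.
    return min(instances, key=lambda inst: active_rank(task, inst)).name


s1 = Instance("S1", active={"T1", "T2"})
s4_replaying = Instance("S4", learner={"T1"})
s4_ready = Instance("S4", ready_learner={"T1"})

print(pick_active_owner("T1", [s1, s4_replaying]))  # S1
print(pick_active_owner("T1", [s1, s4_ready]))      # S4
```

A real assignor would also need to break ties by load. The sketch deliberately leaves that out.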

Stream Task Tagging

To enable learner resource shuffling behavior, we need the following task status indicators:

Tag Name | Task Type | Explanation
isStateful | both | Indicate whether given task has a state to restore.
isLearner | standby | Indicate whether standby task is a learner task.
beingLearned | active | Indicate whether active task is being learned by some other stream thread.
isReady | standby | Indicate whether standby task is ready to serve as active task.
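
One way to picture these tags is as a small record attached to each task. The Python sketch below is purely illustrative: the TaskTags class and its validation are assumptions made for this example, and the proposal does not prescribe how the tags are encoded.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TaskTags:
    task_type: str               # "active" or "standby"
    is_stateful: bool = False    # applies to both task types
    is_learner: bool = False     # standby only
    being_learned: bool = False  # active only
    is_ready: bool = False       # standby only

    def __post_init__(self):
        if self.task_type not in ("active", "standby"):
            raise ValueError("task_type must be 'active' or 'standby'")
        if self.task_type == "active" and (self.is_learner or self.is_ready):
            raise ValueError("isLearner and isReady apply to standby tasks only")
        if self.task_type == "standby" and self.being_learned:
            raise ValueError("beingLearned applies to active tasks only")


# A learner that has finished replaying, and the active task it shadows.
learner = TaskTags("standby", is_stateful=True, is_learner=True, is_ready=True)
owner = TaskTags("active", is_stateful=True, being_learned=True)
```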

Optimizations

Stateful vs Stateless Tasks

For stateless tasks the ownership transfer should happen immediately without the need for a learning stage, because there is nothing to restore. We should fall back to the KIP-415 algorithm, where stateless tasks will only be revoked during the second rebalance. This feature requires us to add a new tag to a stream task, so that when we eventually consider the load balance of the stream applications, we can separate tasks into two buckets and rebalance them independently.

Eager Rebalance 

Sometimes the restoration times of learner tasks are not equal. When assigned multiple tasks to replay, the stream thread could request an immediate rebalance as soon as a subset of its learner tasks finish, in order to speed up load balancing and reduce the resource waste of double task processing, at the cost of global efficiency since many more rebalances are introduced. We could supply the user with a config to decide whether they want to take the eager approach or the stable approach, along with some follow-up benchmark tools for rebalance efficiency. Example:

A stream thread S1 takes two learner tasks T1, T2, where restoring time time(T1) < time(T2). Under the eager rebalance approach, the stream thread will call for a rebalance immediately when T1 finishes replaying. Under the conservative approach, the stream thread will not rejoin the group until it finishes replaying both T1 and T2.
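
The difference between the two approaches comes down to when the learner thread rejoins the group. A minimal Python sketch, assuming the learner tasks replay in parallel and each finishes at a known time (the function rejoin_times and the time units are hypothetical):

```python
def rejoin_times(restore_times, eager):
    """Times at which a learner thread rejoins the group.

    restore_times: dict learner task -> time at which its replay finishes
    eager:         True for the eager approach, False for the conservative one
    """
    if not restore_times:
        return []
    if eager:
        # Rejoin every time some learner task finishes replaying.
        return sorted(set(restore_times.values()))
    # Rejoin once, after the slowest learner task has finished.
    return [max(restore_times.values())]


times = {"T1": 5, "T2": 20}              # time(T1) < time(T2)
print(rejoin_times(times, eager=True))   # [5, 20]
print(rejoin_times(times, eager=False))  # [20]
```

With the eager approach T1 can change owner at time 5 instead of 20, at the price of one extra rebalance for the whole group.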

Standby Task Utilization

Don’t forget that the original purpose of standby tasks is to mitigate the issue during scaling down. When performing learner assignment, we shall prioritize stream threads which currently have standby tasks that match the learner assignment. The group should therefore rebalance quickly and let the leaving members shut themselves down fairly soon.

Scale Down Timeout

The user naturally wants to reach a sweet spot between ongoing task transfer and freeing up streaming resources. So we want to take a similar approach to KIP-415, where we shall introduce a client config to make sure the scale down is time-bounded. If the time it takes to migrate tasks exceeds this config, the leader will send out a join group request, forcibly remove active tasks from the leaving members, and transfer those tasks to the staying members, so that leaving members will shut themselves down immediately after this round of rebalance.
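
The timeout check itself is small. In this hypothetical Python sketch, should_force_revoke is an illustrative name and None stands in for an infinite timeout. Both are assumptions of the example.

```python
def should_force_revoke(leaving_since_ms, now_ms, scale_down_timeout_ms=None):
    """Decide whether the leader should stop waiting for learners and force
    the active tasks off the leaving members.

    scale_down_timeout_ms=None models an unbounded (infinite) timeout.
    """
    if scale_down_timeout_ms is None:
        return False
    return now_ms - leaving_since_ms > scale_down_timeout_ms


# A member announced it was leaving at t=1000 ms, with a 30 s timeout.
print(should_force_revoke(1_000, 20_000, 30_000))  # False
print(should_force_revoke(1_000, 40_000, 30_000))  # True
```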

Trade-offs

More Rebalances vs Global Efficiency

As one would expect, the new algorithm will invoke many more rebalances than the current protocol. As we have discussed in the overall incremental rebalancing design, it is not always bad to have multiple rebalances when we do it wisely, and after KIP-345 we have a future proposal to avoid scale up rebalances for static members. The goal is to pre-register the members that are planning to be added. The broker coordinator will augment the member list and wait for all the new members to join the group before rebalancing, since by default a stream application’s rebalance timeout is infinity. The conclusion is that it is the server’s responsibility to avoid excessive rebalances, and the client’s responsibility to make each rebalance more efficient.

Metadata Space vs Allocation Efficiency

Since we are carrying more information during rebalance, we should be alert to the metadata size increase. So far the hard limit is 1MB per metadata response, which means that if we add too much information, the new protocol could hit a hard failure. Finding a better encoding scheme for metadata is a common pain point if we are promoting incremental rebalancing KIPs like 415 and 429. Some thoughts from Guozhang have started in this JIRA, and we plan to have a separate KIP discussing different encoding technologies to see which one could work.

Iteration Plan

For the smooth delivery of all the features discussed so far, the iteration is divided into four stages:

Version 1.0

Delivery goal: Scale up support, conservative rebalance

The goal of the first version is to realize the foundation of the learner algorithm for the scaling up scenario. The leader stream thread will use the previous round's assignment to figure out which instances are new, and learner tasks shall only be assigned to new instances once. The reason for only implementing the new-instance logic is that there is a potential edge case that could break the current naive learner assignment: when the number of tasks is much smaller than the total cluster capacity, we could fall into endless resource shuffling. We plan to better address this issue in version 4.0, where we take eventual load balance into consideration. Some discussions on marking task weight have been going on for a while. To me, it is unclear so far what kind of eventual balance model we are going to implement at the current stage. In conclusion, we want to postpone the finalized design for eventual balance until the last version.

Version 2.0

Delivery goal: Scale down support

We will focus on delivering scaling down support upon the success of version 1.0. We need to extend the v1 protocol since we need existing instances to take the extra learning load. We shall break the statement in v1 which claims that "only new instances could take learner tasks". To make this happen, we need to deliver the following steps:

  1. Create new tooling for marking instances as ready to scale down.
  2. Tag the leaving information for targeted members.
  3. Scale down timeout support.

Version 3.0

Delivery goal: Eager rebalance

A detailed analysis and benchmark test need to be built before fully devoting effort to this feature. Intuitively, most applications should be able to tolerate minor discrepancies in task replaying time, while the cost of extra rebalances and the increased debugging complexity are definitely unfavorable.

Version 3.0 builds on the success of version 1.0, and could be done concurrently with version 2.0. We may choose to adopt or discard this change, depending on the benchmark result.

Version 4.0 (Stretch)

Delivery goal: Task state labeling, eventual workload balance

Question here: we could deviate a bit from designing the ultimate goal, instead providing user a handy tool to do that.

Version 4.0, the final version, will take application eventual load balance into consideration. If we define a balancing factor x, the total number of tasks each instance owns should be within the range of ±x% of the expected number of tasks (according to relative instance capacity), which buffers some capacity in order to avoid imbalance. A stream.imbalance.percentage config will be provided for the user to configure. The smaller this number is set, the stricter the assignment protocol will be.

Eventual balance example
A group of 4 instances with capacities 5, 10, 10, 15 and a total of 80 tasks shall expect a perfect balance of:

10(5/40), 20(10/40), 20(10/40), 30(15/40) tasks.

In case we set the imbalance factor to 20%,
then an eventual assignment like
12, 18, 23, 27 should be stable, as each instance is within 20% of its expected load.
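
The example above can be checked with a short Python sketch. The function names are hypothetical, and the inclusive comparison (an instance exactly 20% off still counts as balanced) is an assumption that matches the 12-task instance in the example.

```python
def expected_load(capacities, total_tasks):
    """Ideal number of tasks per instance, proportional to its capacity."""
    total_capacity = sum(capacities)
    return [total_tasks * c / total_capacity for c in capacities]


def is_balanced(assignment, capacities, imbalance):
    """True if every instance is within +-imbalance of its expected load."""
    expected = expected_load(capacities, sum(assignment))
    return all(abs(a - e) <= imbalance * e for a, e in zip(assignment, expected))


capacities = [5, 10, 10, 15]
print(expected_load(capacities, 80))                   # [10.0, 20.0, 20.0, 30.0]
print(is_balanced([12, 18, 23, 27], capacities, 0.2))  # True
print(is_balanced([14, 18, 21, 27], capacities, 0.2))  # False
```

In the last call the first instance holds 14 tasks against an expected 10, which is 40% over and therefore outside the 20% tolerance.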


Some optimizations such as balancing the load separately for stateful tasks and stateless tasks could also be applied here. So far version 4.0 still has many unknowns and is slightly beyond the incremental rebalancing scope. Our plan is to keep iterating on the details or bake a separate KIP for balancing algorithm in the future.

Public Interfaces

We are going to add a new protocol type called "stream".

Protocol Type
ProtocolTypes : {"consumer","connect","stream"}


We are also adding new configs for the user to better apply and customize the scaling change.

stream.rebalancing.mode

Default: incremental

Version 1.0

The setting to help ensure no downtime upgrade of online application.

Options : upgrading, incremental

scale.down.timeout.ms

Default: infinity

Version 2.0

Time in milliseconds to force terminate the stream thread when informed to be scaled down.

learner.partial.rebalance

Default : true

Version 3.0

If this config is set to true, a new member will proactively trigger a rebalance each time it finishes restoring one learner task's state, until it eventually finishes all the replaying. Otherwise, the new stream thread will batch the ready calls and ask for a single round of rebalance.

stream.imbalance.percentage

Default: 0.2 (20%)

Version 4.0

The tolerance of task imbalance factor between hosts to trigger rebalance.

Implementation Plan

To make sure the delivery is smooth given the fundamental changes to KStream internals, we have put together a separate, shareable Google Doc here that outlines the steps of the change. Feel free to give your feedback on this plan while reviewing the algorithm, because some of the algorithm requirements are highly coupled with internal architecture reasoning.

Compatibility, Deprecation, and Migration Plan

Minimum Version Requirement

This change requires Kafka broker version >= 0.9, where the broker will react with a rebalance when a normal consumer rejoins with the encoded metadata. The client application needs to update to the earliest version which includes the KIP-429 version 1.0 change.

Recommended Upgrade Procedure

As we have mentioned above, a new protocol type shall be created. To ensure a smooth upgrade, we need to make sure the existing job won't fail. The procedure is:

  • Set `stream.rebalancing.mode` to `upgrading`, which will force the stream application to stay with protocol type "consumer".
  • Perform a rolling restart of the stream application, and the change is applied automatically. This is safe because we are not changing the protocol type.

In the long term we are proposing a smoother and more elegant upgrade approach than the current one. However, it requires a broker upgrade, which may not be a trivial effort for the end user. For now, the user could choose to take this much easier workaround.

Rejected Alternatives

N/A for the algorithm part. For implementation plan trade-off, please review the doc in implementation plan.


