...
Status
Current state: Accepted
Vote thread: here
Discussion thread: here
JIRA:
...
Motivation
This KIP follows on KIP-429 to improve Streams' scaling-out behavior. While KIP-429 focuses on decreasing the amount of time that progress is blocked on the rebalance itself, this KIP addresses a second phase of stoppage that happens today: upon receiving the assignment of a stateful task, Streams has to catch it up to the head of its changelog before beginning to process it.
...
For more details at the protocol level, feel free to check out KIP-429.
Constraints and Cost Function
...
Note, this is a cost minimization problem, since it may not be possible to fully satisfy all three components of the cost function.
Streams Rebalance Metadata: Remember the PrevTasks
Today Streams embeds a full-fledged Consumer client, which hard-codes a ConsumerCoordinator inside. Streams then injects a StreamsPartitionAssignor into its pluggable PartitionAssignor interface, and inside the StreamsPartitionAssignor we also have a TaskAssignor interface whose default implementation is the StickyTaskAssignor. Streams' partition assignor logic today sits in the latter two classes. Hence the hierarchy today is:
Code Block:
KafkaConsumer -> ConsumerCoordinator -> StreamsPartitionAssignor -> StickyTaskAssignor
StreamsPartitionAssignor uses the subscription / assignment metadata byte array field to encode additional information for sticky partitions. More specifically on subscription:
Code Block:
KafkaConsumer:
Subscription => TopicList SubscriptionInfo
TopicList => List<String>
SubscriptionInfo => Bytes
------------------
StreamsPartitionAssignor:
SubscriptionInfo (encoded in version 4) => VersionId LatestSupportVersionId ClientUUID PrevTasks StandbyTasks EndPoint
VersionId => Int32
LatestSupportVersionId => Int32
ClientUUID => 128bit
PrevTasks => Set<TaskId>
StandbyTasks => Set<TaskId>
EndPoint => HostInfo
And on assignment:
Code Block:
KafkaConsumer:
Assignment = AssignedPartitions AssignmentInfo
AssignedPartitions => List<TopicPartition>
AssignmentInfo => Bytes
------------------
StreamsPartitionAssignor:
AssignmentInfo (encoded in version 4) => VersionId, LatestSupportedVersionId, ActiveTasks, StandbyTasks, PartitionsByHost, ErrorCode
VersionId => Int32
LatestSupportVersionId => Int32
ActiveTasks => List<TaskId>
StandbyTasks => Map<TaskId, Set<TopicPartition>>
PartitionsByHost => Map<HostInfo, Set<TopicPartition>>
ErrorCode => Int32
Streams Sticky TaskAssignor: Stickiness over Balance
Streams' StickyTaskAssignor will honor stickiness over workload balance. More specifically:
- First we calculate the average number of tasks each host should get as its "capacity", by dividing the total number of tasks by the total number of consumers (i.e. num.threads) and then multiplying by the number of consumers that host has.
- Then for each task:
  - If there is a client that owns it as its PrevTask and that client still has capacity, assign it to that client;
  - Otherwise, if there is a client that owns it as its StandbyTask and that client still has capacity, assign it to that client.
- If there are still unassigned tasks after step 2), then we loop over them at the per-sub-topology granularity (for workload balance), and again for each task:
  - Find the client with the least load; if there are multiple, prefer the one that previously owned it as an active task, then the one that previously owned it as a standbyTask, then the ones that did not own it at all.
As one can see, we honor stickiness (step 2) over workload balance (step 3).
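As a rough illustration of the stickiness-over-balance logic above, here is a minimal, self-contained sketch in Java; the class and method names are illustrative and this is not the actual Streams implementation:
Code Block:
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Set;

// Simplified sketch of the sticky assignment logic described above. A "client" stands for a
// Streams instance; its capacity is proportional to the number of consumers (threads) it runs.
public class StickyAssignmentSketch {

    static class Client {
        final String id;
        final int numConsumers;
        final Set<Integer> prevActiveTasks;
        final Set<Integer> prevStandbyTasks;
        final List<Integer> assigned = new ArrayList<>();
        double capacity;

        Client(String id, int numConsumers, Set<Integer> prevActive, Set<Integer> prevStandby) {
            this.id = id;
            this.numConsumers = numConsumers;
            this.prevActiveTasks = prevActive;
            this.prevStandbyTasks = prevStandby;
        }
    }

    static void assign(List<Integer> tasks, List<Client> clients) {
        int totalConsumers = clients.stream().mapToInt(c -> c.numConsumers).sum();
        // step 1: capacity = (total tasks / total consumers) * number of consumers on the client
        for (Client c : clients) {
            c.capacity = ((double) tasks.size() / totalConsumers) * c.numConsumers;
        }

        // step 2: honor stickiness first, as long as the previous (active or standby) owner
        // still has spare capacity
        List<Integer> unassigned = new ArrayList<>();
        for (int task : tasks) {
            Client owner = findOwnerWithCapacity(clients, task);
            if (owner != null) {
                owner.assigned.add(task);
            } else {
                unassigned.add(task);
            }
        }

        // step 3: place the remaining tasks on the least-loaded client, breaking ties by
        // previous active ownership, then previous standby ownership, then neither
        for (int task : unassigned) {
            clients.stream()
                    .min(Comparator.comparingInt((Client c) -> c.assigned.size())
                            .thenComparingInt(c -> c.prevActiveTasks.contains(task) ? 0
                                    : c.prevStandbyTasks.contains(task) ? 1 : 2))
                    .ifPresent(c -> c.assigned.add(task));
        }
    }

    private static Client findOwnerWithCapacity(List<Client> clients, int task) {
        for (Client c : clients) {
            if (c.prevActiveTasks.contains(task) && c.assigned.size() < c.capacity) return c;
        }
        for (Client c : clients) {
            if (c.prevStandbyTasks.contains(task) && c.assigned.size() < c.capacity) return c;
        }
        return null;
    }
}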
Proposed Changes
Streams Two-Phase Task Assignor
Now the second part of this KIP is about Streams' PartitionAssignor implementation on top of the consumer rebalance protocol. Remember the difference between the eager and the (new) cooperative consumer rebalance protocols: in "eager" mode we always revoke everything before joining the group, while in "cooperative" mode we revoke nothing before joining the group, but we may revoke some partitions after joining the group, as indicated by the leader. The native consumer assignor would let consumer members revoke partitions immediately based on the Intersection(total-partitions, assigned-partitions).
In Streams, however, we may want to defer the revocation as well if the intended new owner of the partition is "not ready", i.e. if the stateful task's restoration time (and hence the unavailability gap) when migrating it to this new owner is long, since it does not have previously restored state for this task and hence needs to restore from scratch. More generally, we can extend this term to hosts that may have some local state for the migrating task but are far behind the state's latest snapshot, and hence would still need to restore for a long time.
Streams SubscriptionInfo Update
The idea to resolve this is to "delay" the revocation from the current owner, letting the new owner first try to close the gap in state-update progress, and only then revoke the task from the old owner and reassign it to the new owner. However, this cannot easily be done with a fixed "scheduled delay", since it really depends on the progress of the state store restoration on the new owner. To do that, we need to let consumers report their standby tasks' current "progress" when joining the group (some related information can be found in KAFKA-4696). More specifically, assuming that we have already done the JIRA that refactors the AssignmentInfo as follows:
Code Block:
AssignmentInfo (encoded in version 5) => VersionId, LatestSupportedVersionId, ActiveTasks, StandbyTasks, PartitionsByHost, ErrorCode
VersionId => Int32
LatestSupportVersionId => Int32
ActiveTasks => List<TaskId>
StandbyTasks => List<TaskId>
PartitionsByHost => Map<HostInfo, Set<TaskId>>
ErrorCode => Int32
We can refactor the subscriptionInfo format as well to encode the "progress" factor:
Code Block:
SubscriptionInfo (encoded in version 5) => VersionId LatestSupportVersionId ClientUUID PrevTasks EndPoint
VersionId => Int32
LatestSupportVersionId => Int32
ClientUUID => 128bit
PrevTasks => Map<TaskId, Int32> // new change
EndPoint => HostInfo
More specifically, we will associate each standby task with an int32 value indicating its gap to the current active task's state snapshot. This gap is computed as the sum, over all of the task's stores, of (log_end_offset - restored_offset).
Also, we will not distinguish between previous-active-tasks and previous-standby-tasks, since prev-active-tasks are just a special type of prev-task whose gap is zero. For tasks that are not in the prev-tasks map, the client is indicating "I do not have this task's state at all, and hence the gap is simply the whole log".
For stateless tasks there is no state, and we will use a sentinel value (-1) to indicate that it is a stateless task in the prevTasks map. Only the host of the active task would include it in the prev-tasks map.
In addition, when a Streams app is starting up, before joining the group it will also query the log-end-offsets for all the local state stores in its state directory to calculate the gaps; after that, the Streams app can just maintain the gaps dynamically for all its standby tasks (again, an active task's gap is just 0).
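For illustration, here is a minimal sketch of this gap computation, assuming the changelog end offsets and the locally restored offsets have already been looked up; the names and input shapes are illustrative, not the actual Streams code:
Code Block:
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch: compute the per-task "gap" that a member would report in its SubscriptionInfo.
// Inputs are plain maps keyed by changelog partition name so the example is self-contained.
public class TaskGapSketch {

    static final int STATELESS_SENTINEL = -1;

    static Map<String, Integer> computeGaps(Map<String, List<String>> taskToChangelogs,
                                            Map<String, Long> changelogEndOffsets,
                                            Map<String, Long> locallyRestoredOffsets) {
        Map<String, Integer> gaps = new HashMap<>();
        for (Map.Entry<String, List<String>> entry : taskToChangelogs.entrySet()) {
            String taskId = entry.getKey();
            List<String> changelogs = entry.getValue();
            if (changelogs.isEmpty()) {
                // stateless task: report the sentinel value instead of a gap
                gaps.put(taskId, STATELESS_SENTINEL);
                continue;
            }
            long gap = 0;
            for (String changelog : changelogs) {
                long end = changelogEndOffsets.getOrDefault(changelog, 0L);
                long restored = locallyRestoredOffsets.getOrDefault(changelog, 0L);
                // gap = sum over all of the task's stores of (log end offset - restored offset)
                gap += Math.max(0L, end - restored);
            }
            gaps.put(taskId, (int) Math.min(gap, Integer.MAX_VALUE));
        }
        return gaps;
    }
}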
StreamsPartitionAssignor Logic Update
We will then modify our sticky assignor logic. There are two things to keep in mind: 1) there is no semantic difference between prev-active and prev-standby stateful tasks any more, and 2) the assignor should be aware of which tasks are stateful and which are stateless, which can easily be inferred from its embedded topology builder. The goal is to assign the sets of stateless and stateful tasks independently, trying to achieve workload balance while honoring stickiness (here the term "stickiness" is interpreted based on the gap value alone). For stateless tasks, the assignor would not assign any standby tasks either (KAFKA-4696).
More specifically:
- For the set of stateless tasks:
  - First calculate the average number of tasks each thread should get.
  - For each task (sorted by topic-groupId), if there is a client that owns it as a PrevTask (no more than one client should be claiming to own it as the owner) and that client is not exceeding the average number, assign the task to it;
  - Otherwise, find the client with the largest remaining capacity (defined as the difference between the average number and the number of currently assigned tasks) and assign the task to it.
- For the set of stateful tasks, first consider the active assignment:
  - First calculate the average number of active tasks each thread should get (so yes, we are still treating all the stateful tasks equally, and no, we are not going to resolve KAFKA-4969 in this KIP).
  - For each task (sorted by topic-groupId):
    - Find the host with the smallest gap; if it is not exceeding the average number, assign the task to it;
    - Otherwise, if there are no hosts that owned it before, there is nothing we can do but bite the bullet of the restoration gap, and we just pick the client with the largest remaining capacity and assign the task to it;
    - Otherwise, it means we have at least one prev-task owner, but the one with the smallest gap has already exceeded its capacity. We need to make a call here on the trade-off of workload imbalance vs. restoration gap (some heuristics are applicable in the first version):
      - If we favor reducing restoration latency, we will still assign the task to the host with the smallest gap, but if the standby task number N (used below in step 3) == 0, we force-assign a standby task to the new owner candidate – otherwise we do nothing and just rely on step 3) to get us some standby tasks.
      - Otherwise, we will assign the task to another host following the same logic as 2.b.i) above, but starting with the second-smallest gap.
- Then we consider the standby assignment for stateful tasks (assuming num.replicas = N):
  - First calculate the average number of standby tasks each thread should get.
  - For each task (sorted by topic-groupId), with i ranging from 1 to N:
    - Find the i-th host with the smallest gap, excluding the active owner and the 1..(i-1)-th standby owners; if it is not exceeding the average number, assign the standby task to it;
    - Otherwise, go to the next host with the smallest gap and go back to 3.b.i) above; once there are no hosts left that owned it before, we just pick the client with the largest remaining capacity and assign to it.
  - If we run out of hosts before i == N, it means we have already assigned a standby task to each host, i.e. N > num.hosts; we will throw an exception and fail.
- Note that since the tasks are all sorted by topic-groupId, e.g. 1-1, 1-2, 1-3, ... 2-3, we are effectively trying to achieve per-sub-topology workload balance already. Also, in the tie-breakers of steps 1.c), 2.b.ii), and 3.b.ii) above, we will pick the client that has the smallest number of tasks assigned to it from the same topic-groupId, to further achieve per-sub-topology workload balance on a best-effort basis.
- Whenever we have decided to favor reducing restoration latency in step 2.b.iii.1) above, we have introduced workload imbalance, and we want to get out of this state by re-triggering a rebalance later so that the assignor can check whether some standby owner can now take over the task. To do that, we will add a new type of error code named "imbalanced-assignment" to the ErrorCode field of the AssignmentInfo, and when 2.b.iii.1) happens we will set this error code for all the members that own a standby task for the task that triggered 2.b.iii.1) – there must be at least one of them. Upon receiving this error code, the thread will keep track of the progress of all its owned standby tasks, and then trigger another rebalance when the gap on all of them is close to zero.
NOTE that the last step above indeed loses the specific information about which tasks should be on the "watch list", and hence the thread just needs to watch all its standby tasks. We could, of course, inject new fields into the AssignmentInfo encoding to explicitly add those "watch-list" standby tasks. Personally I'm a bit reluctant to add them since they seem too specific and would make the Streams assignor protocol less generalizable, but I can be convinced if there are strong motivations for the latter approach.
Please also compare this idea with the original algorithm below in "Assignment Algorithm" and let me know your thoughts.
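To make the active stateful assignment above (steps 2.a and 2.b) more concrete, here is a highly simplified sketch; it ignores the capacity-exceeded trade-off of 2.b.iii) and the error-code signalling, and all names are illustrative rather than the actual assignor code:
Code Block:
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch: assign each stateful task to the host with the smallest reported gap that still has
// capacity; if no previous owner is suitable, fall back to the largest remaining capacity.
public class StatefulActiveAssignmentSketch {

    static Map<String, List<String>> assignActive(List<String> statefulTasks,   // sorted by topic-groupId
                                                  List<String> clients,
                                                  Map<String, Map<String, Long>> gapPerClientPerTask) {
        Map<String, List<String>> assignment = new HashMap<>();
        clients.forEach(c -> assignment.put(c, new ArrayList<>()));
        // step 2.a): average number of active stateful tasks per client ("capacity")
        double capacity = (double) statefulTasks.size() / clients.size();

        for (String task : statefulTasks) {
            // sort candidates by their reported gap for this task; clients that never owned the
            // task are treated as having an "infinite" gap (the whole changelog)
            List<String> byGap = new ArrayList<>(clients);
            byGap.sort(Comparator.comparingLong((String c) ->
                    gapPerClientPerTask.getOrDefault(c, Map.of()).getOrDefault(task, Long.MAX_VALUE)));

            String chosen = null;
            for (String candidate : byGap) {
                boolean ownedBefore = gapPerClientPerTask.getOrDefault(candidate, Map.of()).containsKey(task);
                if (!ownedBefore) {
                    break; // no previous owner left with spare capacity; fall through below
                }
                if (assignment.get(candidate).size() < capacity) {
                    chosen = candidate; // step 2.b.i): smallest gap that still has capacity
                    break;
                }
            }
            if (chosen == null) {
                // step 2.b.ii) (simplified): largest remaining capacity, i.e. fewest assigned tasks
                chosen = clients.stream()
                        .min(Comparator.comparingInt((String c) -> assignment.get(c).size()))
                        .orElseThrow();
            }
            assignment.get(chosen).add(task);
        }
        return assignment;
    }
}
The standby assignment in step 3) would follow the same pattern, skipping the chosen active owner and any already-chosen standby owners for each task.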
Related Work
Note that the main concern of this KIP is how to allocate and re-allocate sharded stateful tasks, of which the state itself is the difficult part. Thus, although other stream processing systems are of prime interest, we can also look to the balancing algorithms employed by distributed databases, as long as those dbs follow the Primary/Replica model. This is advantageous both for the diversity of perspective it lends, but also because some of these database systems are more mature than any modern Stream Processing system.
One thing to note when considering other SP and DB systems is that, unlike most of them, Kafka Streams achieves durability via changelog topics. That is, in Streams, the purpose of a replica is purely a hot standby, and it's perfectly safe to run with no replicas at all. In contrast, most other systems use the replicas for durability, so they potentially need extra machinery to ensure that at all times a certain number of replicas is available, or active, or consistent.
As an example of the degrees of freedom that are unique to Streams, we would be perfectly safe to assign the active task to the most caught-up node and assign the standby task to an empty node and completely discard any other existing replicas. In any other distributed data system, this would result in a dangerous loss of durability.
Redis Cluster
Redis is a high-query-performance database. The main use case for it is as a caching layer. Partly due to this fact, data durability isn't of tremendous importance, and the main operating mode is single-node. However, a clustered version was released in 2015. Reading the documentation, it sounds like the intent is more to provide a simple mechanism for transcending single-node mode than to provide a true distributed database with the kinds of guarantees one would expect (like consistency). Accordingly, the data distribution and primary/replica handling are quite simplistic.
...
Old Version of KIP-441
----------------------------------------------------------------------------------------------------------------------------
OLD VERSION OF THE KIP, YET TO BE CLEANED UP
Terminology
We shall define several terms for an easy walkthrough of the algorithm.
- Instance (A.K.A. stream instance): the Kafka Streams instance serving as a container for a set of stream threads. This could be a physical host or a k8s pod. The stream threads' capacity is essentially controlled by the relative size of the instance.
- Learner task: a special standby task that gets assigned to one stream instance to restore a current active task's state, and transitions to active when the restoration is complete.
Learner Task Essential
A learner task shares the same semantics as a standby task: it is utilized by the restore consumer to replicate the active task's state. When the restoration of a learner task is complete, the stream instance will initiate a new JoinGroupRequest to trigger another rebalance and complete the task transfer. The goal of learner tasks is to delay the task migration while the destination host has not yet finished replaying the active task's state.
Next we are going to look at several typical scaling scenarios and edge scenarios to better understand the design of this algorithm.
Normal Scenarios
Scale Up Running Application
The newly joined stream threads will be assigned learner tasks by the group leader, and they will first replay the corresponding changelogs locally. By the end of the first round of rebalance, there is no "real ownership transfer". When a new member finally finishes the replay, it will re-attempt to join the group to indicate that it is "ready" to take on real active tasks. During the second rebalance, the leader will eventually transfer the task ownership.
Code Block:
Cluster has 3 stream threads S1(leader), S2, S3, and they each own some tasks T1 ~ T5
Group stable state: S1[T1, T2], S2[T3, T4], S3[T5]
#First Rebalance
New member S4 joins the group.
S1 performs task assignments:
S1(assigned: [T1, T2], revoked: [], learning: [])
S2(assigned: [T3, T4], revoked: [], learning: [])
S3(assigned: [T5], revoked: [], learning: [])
S4(assigned: [], revoked: [], learning: [T1])
#Second Rebalance
New member S5 joins the group.
Member S1~S5 join with following metadata: (S4 is not ready yet)
S1(assigned: [T2], revoked: [T1], learning: []) // T1 revoked because it's "being learned"
S2(assigned: [T3, T4], revoked: [], learning: [])
S3(assigned: [T5], revoked: [], learning: [])
S4(assigned: [], revoked: [], learning: [T1])
S5(assigned: [], revoked: [], learning: [])
S1 performs task assignments:
S1(assigned: [T1, T2], revoked: [], learning: [])
S2(assigned: [T3, T4], revoked: [], learning: [])
S3(assigned: [T5], revoked: [], learning: [])
S4(assigned: [], revoked: [], learning: [T1])
S5(assigned: [], revoked: [], learning: [T3])
#Third Rebalance
Member S4 finishes its replay and becomes ready, re-attempting to join the group.
Member S1~S5 join with following status:(S5 is not ready yet)
S1(assigned: [T2], revoked: [T1], learning: [])
S2(assigned: [T4], revoked: [T3], learning: []) // T3 revoked because it's "being learned"
S3(assigned: [T5], revoked: [], learning: [])
S4(assigned: [], revoked: [], learning: [T1])
S5(assigned: [], revoked: [], learning: [T3])
S1 performs task assignments:
S1(assigned: [T2], revoked: [T1], learning: [])
S2(assigned: [T3, T4], revoked: [], learning: [])
S3(assigned: [T5], revoked: [], learning: [])
S4(assigned: [T1], revoked: [], learning: [])
S5(assigned: [], revoked: [], learning: [T3])
#Fourth Rebalance
Member S5 is ready, re-attempting to join the group.
Member S1~S5 join with following status: (S5 is now ready)
S1(assigned: [T2], revoked: [], learning: [])
S2(assigned: [T4], revoked: [T3], learning: []) // T3 revoked because it's "being learned"
S3(assigned: [T5], revoked: [], learning: [])
S4(assigned: [T1], revoked: [], learning: [])
S5(assigned: [], revoked: [], learning: [T3])
S1 performs task assignments:
S1(assigned: [T2], revoked: [], learning: [])
S2(assigned: [T4], revoked: [T3], learning: [])
S3(assigned: [T5], revoked: [], learning: [])
S4(assigned: [T1], revoked: [], learning: [])
S5(assigned: [T3], revoked: [], learning: [])
Now the group reaches balance with 5 members each owning one task.
...
Scaling up from scratch means all stream threads are new members. There is no need to start a learner stage because there is nothing to learn: we don't even have a changelog topic to start with. We should be able to handle this case by identifying whether the given task is in the active task bucket of any other member; if not, we just transfer the ownership immediately.
After deprecating group.initial.rebalance.delay, we still expect the algorithm to work because every task assignment during rebalance will adhere to the rule: "if a given task is currently active, reassignment must happen only to stream threads that have declared themselves ready to serve this task."
Code Block:
Group empty state: unassigned tasks [T1, T2, T3, T4, T5]
#First Rebalance
New member S1 joins the group
S1 performs task assignments:
S1(assigned: [T1, T2, T3, T4, T5], revoked: [], learning: []) // T1~5 not previously owned
#Second Rebalance
New member S2, S3 joins the group
S1 performs task assignments:
S1(assigned: [T1, T2, T3, T4, T5], revoked: [], learning: [])
S2(assigned: [], revoked: [], learning: [T3, T4])
S3(assigned: [], revoked: [], learning: [T5])
#Third Rebalance
S2 and S3 are ready immediately after the assignment.
Member S1~S3 join with following status:
S1(assigned: [T1, T2], revoked: [T3, T4, T5], learning: [])
S2(assigned: [], revoked: [], learning: [T3, T4])
S3(assigned: [], revoked: [], learning: [T5])
S1 performs task assignments:
S1(assigned: [T1, T2], revoked: [T3, T4, T5], learning: [])
S2(assigned: [T3, T4], revoked: [], learning: [])
S3(assigned: [T5], revoked: [], learning: [])
Scale Down Running Application
When performing a scale-down of the stream group, it is also favorable to initiate learner tasks before actually shutting down the instances. Although standby tasks could help in this case, they require the user to pre-set num.standby.tasks, which may not have been done when the administrator performs the scale-down. Besides, the standby tasks are not guaranteed to be up-to-date. The plan is to use a command line tool to tell certain stream members that a shutdown is about to be executed. These informed members will send a join-group request to indicate that they are "leaving soon". During the assignment phase, the leader will perform the learner assignment among the members who are not leaving. And a leaving member will shut itself down once it has received the instruction to revoke all its active tasks.
For ease of operation, a new tool for scaling down the stream app shall be built. It will have access to the application instances, and ideally could do two types of scaling down:
...
Proposed Assignment/Rebalance Algorithm
At a high level, we are proposing an iterative algorithm that prioritizes recovery time, while planning task movements to improve balance over consecutive rebalances. This proposal builds on KIP-429, so we assume the overhead of rebalancing is much less than it is today.
In each rebalance, the leader assigns active tasks only to instances that are ready to begin processing them immediately. If there is no such instance, then it assigns the active task to the instance that is closest to catching up.
The task movements are accomplished by assigning the task as a standby task to the destination instance, and then rebalancing once those tasks are up to date. The latter rebalance would find that a more balanced solution is available while still only assigning work to caught-up instances.
This means we need some mechanism to trigger a rebalance outside of group or topic changes (which are the only triggers currently). There are a number of ways to do this, but we propose to add a notion of "probing rebalance", discussed in detail below.
If the leader determines that the current assignment is optimally balanced already, then it does not need to trigger any probing rebalances.
A note on the "moving" tasks. These are very similar to standby tasks, but we propose to give them a different name to make their role clear. This way, it's clear that they don't count against the "num.standby.replicas" config, and also that they represent an ephemeral increase in cluster storage requirements.
Parameters
- acceptable_recovery_lag*: A scalar integer value indicating a task lag (number of offsets to catch up) that is acceptable for immediate assignment. Defaults to 10,000 (should be well under a minute and typically a few seconds, depending on workload). Must be at least 0.
- num_standbys: A scalar integer indicating the number of hot-standby task replicas to maintain in addition to the active processing tasks. Defaults to 0. Must be at least 0.
- probing_rebalance_interval_ms*: A time interval representing the minimum amount of time (in milliseconds) that the leader should wait before triggering a "probing rebalance", assuming there is no intervening natural rebalance. Default is 10 minutes (because there would be effectively no cost for no-op rebalances). Must be at least 1 minute.
- max_warmup_replicas*: A scalar integer representing the maximum number of extra replicas to assign for the purpose of moving a task to an instance that has neither the active nor any standby replicas of that task assigned. Used to throttle how much extra broker traffic and cluster state would be used for moving tasks. Defaults to 2. Minimum is 1.
* new config
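For illustration, this is how an application might set these parameters, assuming the dotted config key names below for the three new configs (num.standby.replicas already exists today):
Code Block:
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class Kip441ConfigExample {
    public static Properties config() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // existing config: number of permanent hot-standby replicas per stateful task
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
        // new configs proposed by this KIP (dotted key names assumed here)
        props.put("acceptable.recovery.lag", 10_000L);
        props.put("max.warmup.replicas", 2);
        props.put("probing.rebalance.interval.ms", 10 * 60_000L);
        return props;
    }
}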
Probing Rebalances
As of this KIP, an assignment may result in an unbalanced distribution of tasks in favor of returning to processing as soon as possible, while under-loaded nodes are assigned standby tasks to warm up. Once the under-loaded instances are warm, a next rebalance would be able to produce a more balanced assignment. This proposal creates a new requirement: the ability to trigger rebalances in response to conditions other than cluster or topic changes.
Ideally, group members would be able to communicate their current status back to the group leader so that the leader could trigger rebalances when the members catch up, but there is currently no such communication channel available. In lieu of that, we propose to have the leader trigger rebalances periodically, on a configured "probing_rebalance_interval". As each member re-joins the group, they would report their current lag (as proposed below), and the leader would potentially be able to compute a more balanced assignment.
Note that "balance" in this KIP, as today, is purely a function of task distribution over the instances of the cluster. Therefore, an assignment's balance can only change in response to changes in the cluster, or changes to the topic metadata (which could change the number of tasks). Both of these conditions already trigger a rebalance, so once the leader makes a balanced assignment, it can stop performing probing rebalances, confident that any later change to the balance of the cluster would trigger a rebalance automatically.
See the "Rejected Alternatives" section for a discussion of alternatives to probing rebalances.
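Here is a minimal sketch of the leader-side bookkeeping for probing rebalances, assuming some external mechanism (e.g. having the leader's member re-join the group) is available to actually force the rebalance; the class and method names are illustrative:
Code Block:
import java.time.Duration;
import java.time.Instant;

// Illustrative sketch (not the actual implementation): after computing an assignment that is not
// yet balanced, the leader remembers when the next probing rebalance is due and signals the
// caller to force a re-join once that time has passed.
public class ProbingRebalanceSketch {

    private final Duration probingRebalanceInterval;
    private Instant nextProbingRebalance = Instant.MAX; // none scheduled

    public ProbingRebalanceSketch(Duration probingRebalanceInterval) {
        this.probingRebalanceInterval = probingRebalanceInterval;
    }

    // called by the leader right after it has computed an assignment
    public void onAssignmentComputed(boolean assignmentIsBalanced, Instant now) {
        nextProbingRebalance = assignmentIsBalanced
                ? Instant.MAX                          // balanced: no more probing needed
                : now.plus(probingRebalanceInterval);  // unbalanced: probe again later
    }

    // polled periodically; true means the caller should force a new rebalance
    public boolean shouldTriggerProbingRebalance(Instant now) {
        return !now.isBefore(nextProbingRebalance);
    }
}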
Iterative Balancing Assignments
As mentioned in the overall description of the algorithm, the assignment algorithm creates as balanced an assignment as possible while strictly assigning stateful tasks only among the most-caught-up instances. It also plans task movements to gradually improve assignment balance over time, stopping when it is able to create a balanced assignment.
As described above, to move tasks to fresh instances while still respecting num.standby.replicas, the assignor will assign "extra" standby replicas to the destination instances. Once these extra replicas are caught up, a subsequent assignment will make one of them the active (or a permanent standby) and unassign one of the prior active or standby tasks.
A consequence of this design is that the overall amount of state in the cluster will exceed the sum of active and configured standby tasks while tasks are being moved. Also, each "moving" task reads from the broker, resulting in increased broker load and network traffic. To mitigate these effects, we'll introduce the max_warmup_replicas configuration described above, which limits the number of extra tasks that can be assigned at a time (inspired by Elasticsearch). The default is set to 2, but it can be decreased to 1 or increased arbitrarily.
The default setting is likely to result in needing multiple balance improvements during probing rebalances over time, but the tradeoff is less load on the cluster and the broker. At the other end of the spectrum, setting the number high would allow the algorithm to assign all the desired movements in one shot, resulting in the whole process needing just two rebalances: the initial one to assign the moving tasks, and the final one to realize the movements and drop the old, unbalanced tasks.
Assignment Algorithm
The overall balancing algorithm relies on an assignment algorithm, which is not specified in this KIP, but is left as an implementation detail. We do specify some important properties of any implementation:
- The assignment algorithm is required to assign active stateful tasks to the most-caught-up-instances (see below for a definition of "most caught-up"). I.e., active stateful tasks have the highest assignment priority.
- As a second priority, the algorithm must assign standby tasks to the next-most-caught-up instances. All computed assignments must have at least "num.standby.replicas" number of standby replicas for every task.
- The assignment algorithm may assign extra standby tasks to warm up instances that it wants to move existing active or standby tasks to.
- Of course, the algorithm must also assign stateless tasks.
- The algorithm must converge: if the current assignment is already balanced, it must not alter the assignment.
- The algorithm should produce stable assignments. E.g., it should try not to shuffle stateless tasks randomly among nodes. Depending on the implementation, this may not be easy to guarantee, but it would improve overall performance to provide stability if possible. Note that the convergence requirement (#5) at least guarantees that once the assignment is balanced, it won't change anymore at all.
Computing the most-caught-up instances.
The assignment algorithm is required to assign active and standby tasks to the most-caught-up-instances (see below), with priority given to the active tasks. To do this, we need to define what a "most-caught-up" task is.
First, we'll sort all the tasks and instances, to give the algorithms a stable starting ordering. Then we compute a "rank" for each instance on each task. The "rank" represents how far from "caught up" that instance is on the task; lower is more caught up. The rank computation applies a floor at acceptable_recovery_lag: that is, all instances that have a lag under acceptable_recovery_lag are considered to have a lag of 0. This allows instances that are "nearly caught up" to be considered for active assignment.
In the case where an instance has no state for a task, we just use the total number of offsets in that task's changelog as the lag. If this isn't convenient, we can also use "max long" as a proxy.
Something like this:
Code Block:
SortedTasks: List<task> := sort(Tasks) // define a stable ordering of tasks
SortedInstances: List<instance> := sort(Instances) // define a stable ordering of instances
StatefulTasksToRankedCandidates: Map<task, Map<rank, instance>> := {}
// Build StatefulTasksToRankedCandidates. After this block, all tasks map to a ranked collection of all instances, where the rank corresponds to the instance's lag on that task (lower is better).
for task in SortedTasks if task is stateful:
for instance in SortedInstances:
if (instance does not contain task lag):
// then the rank is equivalent to the full set of offsets in the topic, guaranteed to be greater than any instance's actual lag on the task.
StatefulTasksToRankedCandidates[task][task.offsets] := instance
else if (instance[task].lag <= acceptable_recovery_lag):
// then the rank is 0, because any instance within acceptable_recovery_lag is considered caught up.
StatefulTasksToRankedCandidates[task][0] := instance
else:
// then the rank is the actual lag
StatefulTasksToRankedCandidates[task][instance[task].lag] := instance
For a given task, all the instances with the minimum rank are "most-caught-up".
Defining "balance"
An assignment is "balanced" if it displays both:
- instance balance: The instance with the most tasks has no more than 1 more task than the instance with the least tasks.
- subtopology balance: The subtopology with the most instances executing its tasks has no more than 1 more instance executing it than the subtopology with the least instances executing it. I.e., the subtopology is partitioned, and each partition of a subtopology is a "task". Ideally, we want to spread the tasks for a subtopology evenly over the cluster, since we observe that workload over the partitions of a subtopology is typically even, whereas workload over different subtopologies may vary dramatically.
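Here is a minimal sketch of these two balance criteria, using illustrative input maps (tasks per instance, and instances executing each subtopology):
Code Block:
import java.util.Collection;
import java.util.Map;
import java.util.Set;

// Sketch: an assignment is balanced if both the per-instance task counts and the per-subtopology
// instance counts differ by at most 1 between the most- and least-loaded entries.
public class BalanceCheckSketch {

    static boolean isBalanced(Map<String, Set<String>> tasksPerInstance,
                              Map<String, Set<String>> instancesPerSubtopology) {
        return spread(tasksPerInstance.values()) <= 1              // instance balance
                && spread(instancesPerSubtopology.values()) <= 1;  // subtopology balance
    }

    // difference between the largest and smallest set size
    private static int spread(Collection<Set<String>> sets) {
        int max = sets.stream().mapToInt(Set::size).max().orElse(0);
        int min = sets.stream().mapToInt(Set::size).min().orElse(0);
        return max - min;
    }
}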
Rebalance Metadata
The Consumer rebalance protocol has a pluggable extension point, the PartitionAssignor
interface, along with a black-box SubscriptionInfo
field. These mechanisms can be used in conjunction to encode extra information from the group members when they join the group, and then make a custom assignment decision. Finally, the PartitionAssignor
assignment response contains another black-box field to encode extra information from the leader to the members.
Today, Streams uses the StreamsPartitionAssignor
to implement its custom "balanced stickiness" assignment strategy, which is supported by the metadata fields in the subscription and assignment protocols.
More specifically on subscription:
Code Block:
KafkaConsumer:
Subscription => TopicList SubscriptionInfo
TopicList => List<String>
SubscriptionInfo => Bytes
------------------
StreamsPartitionAssignor:
SubscriptionInfo (encoded in version 6) => VersionId LatestSupportVersionId ClientUUID PrevTasks StandbyTasks EndPoint
VersionId => Int32
LatestSupportVersionId => Int32
ClientUUID => 128bit
PrevTasks => Set<TaskId>
StandbyTasks => Set<TaskId>
EndPoint => HostInfo
To support the proposed algorithms, we're proposing a new, version 7, format for SubscriptionInfo:
Code Block:
SubscriptionInfo (encoded in version 7) => VersionId LatestSupportVersionId ClientUUID TaskLags EndPoint
VersionId => Int32
LatestSupportVersionId => Int32
ClientUUID => 128bit
EndPoint => HostInfo
TaskLags => Map<TaskId, Int64> // new change
The new field, TaskLags, would encode the lag for every store that the instance hosts, summed to the task level. This subsumes both PrevTasks and StandbyTasks.
We do not need any modifications to AssignmentInfo at this time.
State with logging disabled
There is a special case to consider: stateful stores with logging disabled. We have an a priori expectation that stateful tasks are more heavyweight than stateless tasks, so from the perspective of cluster balance, we should consider tasks containing non-logged stores to be stateful (and therefore heavy). On the other hand, if a store is not logged, then there is no notion of being "caught up" or of having a "lag" on it. So, we should consider all instances to be equally caught-up on such stores.
Rather than building this special case into the code, we can simply have the members not add any lag information for non-logged stores. If a task has both logged and non-logged stores, we'd wind up with a task lag that represents the sum of the lags on the logged stores only. If the task has only non-logged stores, the task would be completely absent from the "task lags" map.
On the leader/assignment side, following the algorithm above for computing `StatefulTasksToRankedCandidates` (to determine the degree of caught-up-ness for each instance on each stateful task), we would fill in a lag of "number of offsets in the changelog" for any instance that doesn't report a lag for a stateful task. For each store in the stateful task that doesn't have a changelog, we would consider its "number of offsets" to be zero. This has the effect of presenting all instances as equal candidates to receive stateful but non-logged tasks.
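A small sketch of this leader-side fill-in rule, with illustrative names and plain maps as inputs (not the actual assignor code):
Code Block:
import java.util.HashMap;
import java.util.Map;

// Sketch: for every stateful task, give every instance a lag value. Instances that reported no
// lag for a task get the total number of offsets in that task's logged changelogs, and stores
// without a changelog contribute zero offsets, so tasks with only non-logged stores look equally
// caught up on every instance.
public class NonLoggedStoreLagSketch {

    static Map<String, Map<String, Long>> fillInLags(
            Iterable<String> statefulTasks,
            Iterable<String> instances,
            Map<String, Map<String, Long>> reportedLags,    // instance -> (task -> reported lag)
            Map<String, Long> loggedChangelogOffsets) {     // task -> offsets in its logged changelogs
        Map<String, Map<String, Long>> lags = new HashMap<>();
        for (String instance : instances) {
            Map<String, Long> reported = reportedLags.getOrDefault(instance, Map.of());
            Map<String, Long> perTask = new HashMap<>();
            for (String task : statefulTasks) {
                perTask.put(task, reported.getOrDefault(task, loggedChangelogOffsets.getOrDefault(task, 0L)));
            }
            lags.put(instance, perTask);
        }
        return lags;
    }
}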
Stateless tasks
Similar to stateful non-logged tasks, stateless tasks have no notion of "catching up", and therefore there's no concept of a "lag", and we can consider all instances in the cluster to be equal candidates to execute a stateless task.
Again similarly, there's no need to report a lag at all for these tasks. Stateless tasks are not represented at all in `StatefulTasksToRankedCandidates`, so there's no need for special handling on the assignment side, and the assignor just packs the stateless tasks into the cluster to achieve the best balance it can with respect to the assignments of stateful tasks (which are specified to have higher assignment priority).
Race between assignment and state cleanup
Streams starts a background thread to clean up (delete) any state directories that are currently unassigned and have not been updated in `state.cleanup.delay.ms` amount of time.
This KIP introduces a race by reporting the lag on an instance's stores, not just the assigned ones. It is possible that a member will report a lag on an unassigned store, then, while the rebalance is ongoing, the state directory could be deleted (invalidating the lag). If the leader assigns the task to that instance, based on the now-stale lag information, then the member would actually have to rebuild the entire state from the start of the changelog.
Currently, this race is prevented because the state cleaner only runs if the Streams instance is not in "REBALANCING" state. However, as part of KIP-429, the REBALANCING state would be removed from Streams (as it would no longer be necessary). Thus, we need to prevent the race condition without relying on the REBALANCING state.
It is sufficient to create a mutex that the state cleaner must acquire before running (and hold for the duration of its run). If we also acquire the mutex before we report store lags (as part of the JoinGroup), and hold it until we get an assignment back (when the rebalance completes), then we can ensure the race cannot occur.
Therefore, we propose to create such a mutex, and acquire it during org.apache.kafka.clients.consumer.ConsumerPartitionAssignor#subscriptionUserData (when we would compute the store lags), and release it in org.apache.kafka.clients.consumer.ConsumerRebalanceListener#onPartitionsAssigned (when we know that the state directory locks have already been acquired for any newly assigned tasks). Also, we will hold the lock for the entire duration of any call to `org.apache.kafka.streams.processor.internals.StateDirectory#cleanRemovedTasks(long)`.
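Here is a minimal sketch of the proposed locking, assuming a simple ReentrantLock; the class and method names are illustrative stand-ins for the real Streams internals:
Code Block:
import java.nio.ByteBuffer;
import java.util.concurrent.locks.ReentrantLock;

// Sketch: the state cleaner and the rebalance path share one mutex, so the store lags reported in
// the subscription cannot be invalidated by a concurrent state directory cleanup.
public class StateCleanerLockSketch {

    private final ReentrantLock stateCleanerLock = new ReentrantLock();

    // called from the assignor's subscriptionUserData(): lock before computing store lags
    public ByteBuffer subscriptionUserData() {
        stateCleanerLock.lock();
        return encodeSubscriptionWithTaskLags();
    }

    // called from the rebalance listener's onPartitionsAssigned(): the newly assigned tasks now
    // hold their state directory locks, so cleanup can safely resume
    public void onPartitionsAssigned() {
        if (stateCleanerLock.isHeldByCurrentThread()) {
            stateCleanerLock.unlock();
        }
    }

    // called by the background cleaner thread; holds the lock for the whole cleanup pass
    public void cleanRemovedTasks(long cleanupDelayMs) {
        stateCleanerLock.lock();
        try {
            deleteUnassignedStaleDirectories(cleanupDelayMs);
        } finally {
            stateCleanerLock.unlock();
        }
    }

    private ByteBuffer encodeSubscriptionWithTaskLags() { return ByteBuffer.allocate(0); } // placeholder
    private void deleteUnassignedStaleDirectories(long cleanupDelayMs) { }                 // placeholder
}
Note that the lock is intentionally acquired in one callback and released in another on the stream thread, which is why a plain synchronized block would not fit here.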
This builds an unfortunate three-way dependency between the state cleanup, Streams's ConsumerPartitionAssignor, and Streams's ConsumerRebalanceListener. Any cleaner solution would require changing the Consumer APIs, though.
Example Scenarios
Scaling Out
Stateful (active) Tasks := {T1, T2, T3}
Standby Replicas := 1
Instances := {I1, I2}
Initial State
  | I1 | I2 |
---|---|---|
active | T1, T3 | T2 |
standby | T2 | T1, T3 |
-- New instance (I3) joins --
First Rebalance (catching-up)
  | I1 | I2 | I3 |
---|---|---|---|
active | T1, T3 | T2 | |
standby | T2 | T1, T3 | T1, T3 |
-- I3 finishes catching up, another rebalance is triggered --
Second Rebalance (stable state)
  | I1 | I2 | I3 |
---|---|---|---|
active | T1 | T2 | T3 |
standby | T2 | T3 | T1 |
Note that following KIP-429, the second rebalance will technically itself be composed of two rebalances since we are revoking the active task T3 from I1.
Scaling In (Pt. 1 – standbys in sync)
Stateful (active) Tasks := {T1, T2, T3, T4}
Standby Replicas := 1
Instances := {I1, I2, I3}
The standby tasks in this scenario are assumed to be in sync with the active tasks, that is, caught up and within acceptable_recovery_lag.
Initial State
  | I1 | I2 | I3 |
---|---|---|---|
active | T1, T4 | T2 | T3 |
standby | T3 | T1, T4 | T2 |
-- One instance (I1) is brought down --
First Rebalance
  | I2 | I3 |
---|---|---|
active | T1, T4 | T2, T3 |
standby | T2, T3 | T1, T4 |
In this case, no subsequent rebalance is necessary (note that as of KIP-429 the first rebalance technically consists of two rebalances as an active task is being revoked). Processing resumes immediately as the standby tasks are ready to take over as active tasks.
Scaling In (Pt. 2 – standbys lag nontrivially)
Stateful (active) Tasks := {T1, T2, T3, T4}
Standby Replicas := 1
Instances := {I1, I2, I3}
In this scenario, the standby tasks are lagging the active tasks by some nontrivial amount, that is, by more than the acceptable_recovery_lag.
Initial State
  | I1 | I2 | I3 |
---|---|---|---|
active | T1, T4 | T2 | T3 |
standby | T3 | T1, T4 | T2 |
-- One instance (I1) is brought down --
First Rebalance (catching-up)
  | I2 | I3 |
---|---|---|
active | T1, T2, T4 | T3 |
standby | T3 | T1, T2, T4 |
-- I3 finishes restoring T4 --
Second Rebalance (stable state)
  | I2 | I3 |
---|---|---|
active | T1, T2 | T3, T4 |
standby | T3, T4 | T1, T2 |
Unlike the previous example, I3 cannot immediately take over T2 as an active task since it is not sufficiently caught up. In this case the first rebalance would produce an imbalanced assignment where I2 bears the major processing load while I3 focuses on restoring the task (T4) it will eventually be assigned as active.
Related Work
Note that the main concern of this KIP is how to allocate and re-allocate sharded stateful tasks, of which the state itself is the difficult part. Thus, although other stream processing systems are of prime interest, we can also look to the balancing algorithms employed by distributed databases, as long as those dbs follow the Primary/Replica model. This is advantageous both for the diversity of perspective it lends, but also because some of these database systems are more mature than any modern Stream Processing system.
One thing to note when considering other SP and DB systems is that, unlike most of them, Kafka Streams achieves durability via changelog topics. That is, in Streams, the purpose of a replica is purely a hot standby, and it's perfectly safe to run with no replicas at all. In contrast, most other systems use the replicas for durability, so they potentially need extra machinery to ensure that at all times a certain number of replicas is available, or active, or consistent.
As an example of the degrees of freedom that are unique to Streams, we would be perfectly safe to assign the active task to the most caught-up node and assign the standby task to an empty node and completely discard any other existing replicas. In any other distributed data system, this would result in a dangerous loss of durability.
Kafka Consumer StickyAssignor
The Consumer's StickyAssignor implementation is interesting. It has many of the same goals as Streams' assignor, although it only has to deal with one class of partitions. In contrast, Streams' assignor has to consider: (0) partitions that must be grouped together for tasks, (1) partitions for Stateful tasks, (2) partitions for Stateless tasks, and (3) partitions for Standby tasks. Nevertheless, we could consider generalizing the StickyAssignor algorithm for multiple classes of partitions, as well as the grouping constraint and the standby/active constraint.
The assignment algorithm begins by copying the prior assignment and then removing any assignments that have become invalid (the consumer has left or the partition no longer exists). Thus, we start with the valid sub-set of the prior assignment and a list of all the partitions that need to be assigned. Then we iterate over the unassigned partitions and assign each one to the consumer that can host it and that has the smallest current assignment. This is a greedy assignment that should produce an assignment that is as balanced as possible while maintaining all current assignments. Then, we enter the balancing phase.
The balancing phase is an iterative algorithm. In each pass, it attempts to move each partition to a better place, and it continues with more passes until no more improvements are possible, or until the cluster as a whole is fully balanced. (Due to the assignment constraints, full balance may not be possible).
When considering the best move for a partition, it first checks to see if that partition is currently hosted on a consumer that is unbalanced with respect to the prior host of that partition. In this case, it just moves the partition back to the prior host. This is essentially a short-circuit for the case where a partition has become "unstuck" and restoring stickiness could actually improve balance. If we get past that short-circuit, then we just propose to move the partition to the consumer that can host it and has the smallest current assignment.
As mentioned, we keep "shuffling" all partitions in this way until we get an optimal balance, given the constraints.
Cruise Control
Cruise Control is a LinkedIn project to automate some aspects of Kafka broker cluster administration. One thing it does is dynamically rebalance the partition assignment over the broker instances based on a large number of metrics it collects, including CPU load, disk usage, etc. It structures the assignment optimization task as a Goal-Based Optimization problem. See https://en.wikipedia.org/wiki/Goal_programming for more information about this paradigm. Note that "goal programming" often implies that you represent the goals as a system of linear equations, and then solve the system to maximize some variables (aka Linear Programming), but that's not how Cruise Control is implemented. It just expresses the optimization problem as a system of goals and seeks a satisfactory (not optimal) solution by sequentially satisfying each goal.
The structure of the implementation is that you get a list of Goal implementations, each one corresponding to an optimization goal, like disk usage. The overall optimizer sorts the goals by priority, and then passes in the cluster assignment to the highest priority goal. That goal proposes modifications to the assignment (typically using an iterative algorithm similar to the StickyAssignor's). Once it's happy with the proposal, it returns. Then the optimizer passes in the new proposed assignment to the next goal, and so on. Each goal is responsible for ensuring that its proposals do not violate any of the prior, higher priority, goals. The API provides a hook that the goal can call during its optimization passes to ensure it doesn't violate the higher priority goals.
I don't think it would be very straightforward to turn Cruise Control into a general framework for allocating resources, because an awareness of the structure of the task is built in at every level (the optimization and the goals all know that they are dealing with Kafka brokers and partitions). But there are of course off-the-shelf optimization frameworks we could consider using if we want to go down a generalization path.
It would be straightforward to implement our allocation algorithm following a similar pattern, though. This might be a good choice if we want to add more optimization goals in the future. The main benefit of the goal-based orientation is that it scales naturally with adding more optimization goals (you just plug them in). As well, it's pretty easy to configure/reconfigure the optimizer to include or remove different goals.
source: https://github.com/linkedin/cruise-control
Redis Cluster
Redis is a high-query-performance database. The main use case for it is as a caching layer. Partly due to this fact, data durability isn't of tremendous importance, and the main operating mode is single-node. However, a clustered version was released in 2015. Reading the documentation, it sounds like the intent is more to provide a simple mechanism for transcending single-node mode than to provide a true distributed database with the kinds of guarantees one would expect (like consistency). Accordingly, the data distribution and primary/replica handling are quite simplistic. This is not meant to be disparaging. Redis Cluster is designed to serve specific use cases well at the expense of other use cases.
In Redis Cluster, keys are hashed into one of 16384 buckets (called slots). The hashing algorithm does not need to be consistent because the slots are fixed in number. Nodes in the cluster have unique and permanent IDs. Cluster integrity is maintained via a gossip protocol, and each node ultimately maintains a connection to every other node. Nodes don't proxy queries to non-local keys. Instead they respond with the hash slot that contains the key, along with the node that currently holds that hash slot (this is similar to Streams). Every node in the cluster is assigned some subset of the slots, and the slots can be manually assigned, unassigned, and moved to, from, and between nodes.
Redis Cluster doesn't have primary and replicas for each slot (the way that Streams has primary and standbys for each task). Rather, it has primary and replicas for each node . Thus, if there are two nodes, A and B with replicas Ar and Br, the node Ar is responsible for replicating the exact same set of slots that are assigned to A, and likewise with B and Br. This simplifies the assignment considerations somewhat, as a given node only needs to host active (primary) copies or passive (replica) copies. The replicas can serve queries if the query specifies that it accepts stale data, but most likely this arrangement results in the replica nodes being underutilized. So, my quick judgement is that the simplicity of this arrangement is enviable, but we probably don't want to follow suit.
reference: https://redis.io/topics/cluster-spec
Elasticsearch
An Elasticsearch cluster is a collection of nodes, an individual instance of Elasticsearch. At the cluster scope, there is a master node that is responsible for coordinating cluster changes such as adding or removing nodes, indices, etc. Each node has one or more shards, which correspond to a certain (Lucene) index that the cluster is fetching data from. An index is divided up into shards across one or more nodes, where the work for that index is distributed across the shards. Each shard has a primary shard responsible for writes, and one or more replica shards that can receive reads.
The rough translations to Streams are shown in the table below. Note that the comparisons are drawn as they relate to load balancing, rather than literal definition (for example, an index is really more like a store; however, for our purposes it is more useful to think of it as an entire subtopology, in that each index/subtopology is an independent job that has some inherent "weight" – such as the number of stores for a subtopology – and its work is partitioned and distributed independently, into some number of shards/tasks). The analogies are fairly close, and Elasticsearch has to solve a load balancing problem similar to the one that Streams faces – one main high-level difference to point out is that the replica shards are presumed to be in sync with the active shards, removing the complexity of "restore completeness" from their challenge.
Elasticsearch | Streams |
---|---|
index | subtopology |
master node | group leader |
node | instance |
primary shard | active task |
replica shard | standby task |
shard | task |
Elasticsearch actually breaks down the problem into two separate processes: allocation and rebalancing. Allocation refers to the assignment of (unallocated) shards to nodes, while rebalancing occurs separately and involves moving allocated shards around. By default, rebalancing can only occur when all shards are allocated (can be configured to be allowed only when active shards, or always). Multiple rebalances can take place concurrently, up to some configurable max (defaults to 2) – note that this limit applies only to "load balancing" rebalances and not those forced by environmental (user-defined) constraints. You can also dynamically disable/enable rebalancing either type of shard.
The allocation algorithm is as follows – note that this applies only to placement of unassigned shards, but nodes may have other shards already assigned to them.
- Group shards by index, then sort by shard ID to get the order of shard allocation. First all primary shards are allocated, then one replica for each shard of each index, and repeat if number of replicas is greater than one.
- For each shard, build a list of possible nodes. Nodes may be eliminated from consideration based on user config (eg allocation filtering) or various constraints (no copies of a shard should be allocated to the same node, adequate remaining disk space, forced awareness, max retries)
- If step 2 returns no nodes, the shard will be retried later (possibly after a rebalance). Otherwise, we calculate the weight of each node if given the shard, and allocate it to the one with the lowest weight. The weighting function depends on two settings: indexBalance (0.55 by default) and shardBalance (0.45 by default). The total weight is the weighted average of the shard and index weights, weighted by the fractional shard and index balance respectively. This is computed as
Code Block:
private float weight(Balancer balancer, ModelNode node, String index, int numAdditionalShards) {
final float weightShard = node.numShards() + numAdditionalShards - balancer.avgShardsPerNode();
final float weightIndex = node.numShards(index) + numAdditionalShards - balancer.avgShardsPerNode(index);
return theta0 * weightShard + theta1 * weightIndex;
}
where
Code Block:
theta0 = shardBalance / (indexBalance + shardBalance);
theta1 = indexBalance / (indexBalance + shardBalance);
The shardBalance and indexBalance can be thought of as controlling the tendency to equalize the number of shards (shardBalance) or number of shards per index (indexBalance) across nodes. This is analogous to Streams assigning a balanced number of tasks to each StreamThread for the former, and spreading the tasks of a given subtopology across StreamThreads for the latter. The weighting formula used by Elasticsearch indicates there is some tradeoff between the two, but that applies only to Elasticsearch's allocation, where some shards have already been allocated to some subset of nodes. For example, if you add a new node and a new index, you could allocate all the new shards to the empty node for shard number balance, or give each node a shard for shard index balance, but likely not both.
If you start with a "clean assignment" where all shards must be (re)allocated, both can be balanced at once. You can simply group the shards by index, then loop through the nodes assigning a shard from the list in round-robin fashion. This would be the case when first starting up in Streams. However, the more "sticky" the assignment in Streams the better, so we tend to avoid this kind of simple round-robin assignment on subsequent rebalances. So the tradeoff in Streams is between balance (however that is defined) and stickiness – whereas in Elasticsearch, the tradeoff is first between types of balance where stickiness is effectively a constraint (of allocation), and the stickiness is then (potentially) traded for further overall balance by the rebalancing operation. One interesting thing to note is that, while Streams effectively enforces "total balance" (defined as the number of tasks across threads), this is actually configurable in Elasticsearch. You can tune the cluster to be more or less aggressive about balancing the shards across nodes. This might be useful if users just want to get up and running again as fast as possible, and would be satisfied with a slightly imbalanced workload.
Of course, the workload in Streams is already potentially unbalanced because our definition of balance is simple and does not account for degree of statefulness. But maybe assigning weights that account for "stickiness" – how much would this instance have to catch up? – and "load" – some measure of the number of stores – would be useful for distributing tasks.
YARN
“Yet Another Resource Negotiator” is a cluster management technology introduced in Hadoop 2.0 that decouples resource management from job scheduling. The YARN framework consists of a global master daemon called the ResourceManager, a per-application ApplicationMaster, and per-node NodeManagers. NodeManagers are responsible for allocating containers and running/managing processes within them, and monitoring/reporting their resource usage to the ResourceManager, who in turn is responsible for keeping track of live NodeManagers and arbitrating resources among competing applications. The ApplicationMaster manages the application life cycle, negotiating resources from the ResourceManager and triggering NodeManagers to begin executing tasks for that application.
The ResourceManager itself has two main components, the Scheduler and the ApplicationsManager. The ApplicationsManager accepts jobs and is responsible for managing the application's ApplicationMaster (including starting & restarting it if necessary). The Scheduler is then free to allocate resources to applications without concerning itself with monitoring applications or restarting failed tasks. The actual scheduling policy can be plugged in to partition the cluster resources.
This scheduling, however, is fairly orthogonal to the balancing/restoring problem we face, since it involves the question of how to distribute new/freed resources to existing jobs rather than how to distribute all existing "jobs" (tasks) to all available "resources" (instances/threads). The interesting difference here is that in Streams, a new resource (fresh instance) necessitates revoking tasks from another instance, whereas in YARN a new resource can simply be given away. Maybe the right way to look at it is to consider the YARN resources as Stream tasks, and YARN jobs as Streams instances (or threads; just using instance for now) – one resource/task can only ever run one job/be run on one instance, while new instances/jobs are started and stopped, and require some (re)allocation of resources/tasks.
Flink
Flink jobs are composed of operators, a chain of which forms a task, which can be executed on threads as one or more subtasks. The total number of subtasks is the parallelism of that operator. A Flink task corresponds to a Streams subtopology, and a Flink subtask corresponds to a Streams task. In the following section, "task" will refer to a Flink task, and any references to a Streams task will be labelled as such.
The Flink runtime consists of two types of processes: the JobManager, which schedules tasks and coordinates checkpoints, failure recovery, etc., and one or more TaskManagers, which execute the subtasks and buffer/exchange streams of data. TaskManagers, similar to a Streams app instance, are brought up and connect to the JobManager to announce themselves as available, then are assigned work. Each TaskManager has one or more task slots corresponding to a fixed subset of the TaskManager's resources; they share network connections similar to how Streams tasks share a StreamThread consumer.
Also similarly to StreamThreads executing one or more Streams tasks, Flink allows subtasks of the same job to share task slots (note that a single task slot may use one or more threads, though Flink does recommend matching the number of task slots to the number of CPU cores). This can be controlled to a hard or soft degree by the user-defined CoLocationGroup (which subtasks must share a slot) and SlotSharingGroup (which subtasks can share a slot).
Flink integrates with other cluster resource managers including YARN, but can also be run as a stand-alone cluster. Unlike YARN, in general Flink jobs are continuous (like Streams or Elasticsearch) so the "load balancing" aspect is more important than the "scheduling/distributing transient resources" aspect. This brings it closer to Streams, but unlike Streams, Flink has no notion of standby tasks – instead, for high availability some distributed storage is required for checkpoints to be saved/recovered from. This greatly simplifies the assignment logic relative to the Streams/KIP-441 case.
Samza
Out of all the other systems we've looked at, Samza is the closest to Kafka Streams.
Tasks and Containers
Like Streams, the unit of parallelism is a task. Samza assigns each input stream partition to a task. Tasks in Samza also perform periodic checkpointing; the checkpointing allows processing to resume from the latest offset on a different worker in the case of failures. Tasks themselves are hosted in a Container, which is the physical unit of work, compared with tasks, which are the logical unit of work. A Container hosts one or more tasks, and Containers are distributed across different hosts.
Coordinator
To handle the distribution of tasks to containers, Samza uses a Coordinator. The Coordinator also monitors the containers, and when a failed container is detected, the tasks of the failed container are distributed to the remaining healthy containers. The Coordinator is pluggable, giving Samza the unique ability to run either in standalone/embedded mode or in cluster mode with a cluster manager such as YARN.
State
Samza also offers stateful operation and uses RocksDB as the implementation of its persistent storage. Each task has its own state store, and storage engines are pluggable. For fault tolerance, state stores likewise use (compacted) changelog topics.
Host Affinity
To deal with the possibly long startup times needed to read an entire changelog, Samza persists metadata containing the host that each task is running on. When users enable the job.host-affinity.enabled configuration, Samza will attempt to place the container for a given task (or tasks) on the same host every time a job is deployed. The host affinity feature works on a best-effort basis, and in some cases a container will get assigned to another available resource.
Given the closeness of Samza's architecture to Kafka Streams, IMHO it makes sense to take a quick look into the algorithm used by the Coordinator concerning load distribution and stateful tasks.
Heron
Twitter developed Heron to overcome the shortcomings it encountered when trying to handle its production data load with Apache Storm (which Twitter had been running since 2011).
Specifically, the items Twitter needed to address were:
- Resource isolation
- Resource efficiency
- Throughput
Heron claims to be compatible with Storm, but there are a few steps developers need to take (https://apache.github.io/incubator-heron/docs/migrate-storm-to-heron/), so it's not entirely backward compatible out of the box.
Heron API
Heron is similar to Kafka Streams in that it offers a high-level DSL (https://apache.github.io/incubator-heron/docs/concepts/streamlet-api/) and a lower level API (https://apache.github.io/incubator-heron/docs/concepts/topologies/#the-topology-api) based on the original Storm API.
Topology
Heron topologies are composed of two main components: Spouts and Bolts.
Spouts
Spouts inject data into Heron from external sources such as Kafka or Pulsar, so a Spout is analogous to an input topic in Kafka Streams.
Bolts
Bolts apply the processing logic defined by developers on the data supplied by Spouts.
Streams of data connect Spouts and Bolts.
Architecture
Heron needs to run on a cluster of machines. Once developers build a topology, they submit it to the cluster, much like Spark or Flink, in that you develop the application and then distribute it to the worker nodes.
Resource Management
When using the high-level Streamlet API, Heron allows you to specify resources for the topology:
- The number of containers into which the topology's physical plan is divided.
- The total number of CPUs allocated to be used by the topology
- The total amount of RAM allocated to be used by the topology
Operations
Heron offers stateless and stateful operations. There are windowing operations similar to Streams' sliding, tumbling, and time-based windows. Heron also offers the concept of "counting" windows.
State Management
Heron uses either Zookeeper (https://apache.github.io/incubator-heron/docs/operators/deployment/statemanagers/zookeeper/) or the local file system (https://apache.github.io/incubator-heron/docs/operators/deployment/statemanagers/localfs/) for managing state. Details about state management are relatively sparse, but IMHO the approach to state management is far enough from Kafka Streams that there is nothing we can gain from Heron's state management.
The overall recommendation from the Heron documentation is that the local file system state management is for local development only, and Zookeeper is the preferred approach in a production environment.
Compatibility, Deprecation, and Migration Plan
We will implement the new algorithm as a new PartitionAssignor implementation without removing the existing StreamsPartitionAssignor. Thus, the existing upgrade/downgrade procedure makes moving in both directions seamless. As long as any member does not support the new assignment protocol, all members will use the existing assignment strategy. As soon as all members indicate they support the new strategy, the group will automatically encode the SubscriptionInfo in the new format, and the leader will automatically start using the new PartitionAssignor, including the new algorithm and the probing rebalances.
Thus, the upgrade should be seamless.
Ultimately, we will want to remove the old StreamsPartitionAssignor. For example, we may remove it in version 3.0. In that case, we will document that operators must upgrade the whole cluster to a version in [2.4, 3.0) first, before upgrading to version 3.0+.
Rejected Alternatives
Triggering a probing rebalance when Moving tasks reach the acceptable lag
One idea was, instead of triggering a probing rebalance on an interval, to trigger it when Moving tasks actually catch up. One advantage of this is that you don't get rebalances when none of the Moving tasks have caught up yet. In the interval-based approach, you are guaranteed not to trigger probing rebalances more frequently than the interval, because there is just one node (the leader) that triggers the rebalance. On the other hand, if you want to trigger when members catch up, the members themselves have to do the triggering. Since they are all independent, they will cause "storms" of rebalances: first one member catches up and the cluster rebalances, then the next catches up and we rebalance again, and so forth.
Ideally, the members would have a way to inform the leader that they have caught up, and the leader could then make a global decision about whether to rebalance now, or wait a bit. Unfortunately, there is currently no channel for this communication. The only one is for a member to join the group again, which causes a full rebalance; there's no way to avoid it. Instead, we've proposed the interval approach for now, to ensure that the frequency of rebalances can be controlled, and we plan to add this informational channel in the future.
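For illustration, the interval-based approach could look roughly like the following leader-side check. This is only a sketch; the class, method, and field names are hypothetical and do not come from the actual implementation:
Code Block |
---|
// Hypothetical sketch of the interval-based trigger: only the leader runs this check,
// so probing rebalances cannot happen more often than the configured interval.
public final class ProbingRebalanceTrigger {
    private final long probingIntervalMs;
    private long lastProbingRebalanceMs;

    public ProbingRebalanceTrigger(long probingIntervalMs, long nowMs) {
        this.probingIntervalMs = probingIntervalMs;
        this.lastProbingRebalanceMs = nowMs;
    }

    // Returns true if the leader should schedule another probing rebalance now.
    public boolean maybeTrigger(long nowMs, boolean warmingUpTasksRemain) {
        if (!warmingUpTasksRemain) {
            return false; // everything has caught up, no need to keep probing
        }
        if (nowMs - lastProbingRebalanceMs >= probingIntervalMs) {
            lastProbingRebalanceMs = nowMs;
            return true;
        }
        return false;
    }
} |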
Adding a new kind of task ("moving", "recovering", "learner") for task movements
During the KIP discussion, an idea came up, instead of assigning standbys to accomplish task movements, to create a new kind of task with a name that doesn't suggest the idea of a "hot standby". One rationale is that it might be confusing as a user to configure `num.standby.replicas:=0`, but then still see standbys in the logs, etc. Another thought is that it seems more straightforward to have two different names for what seems like two different roles.
However, it turns out to cause more conflicts than it solves. To start off, here's a "happy path" scenario:
Happy path scenarios
Looking at a cluster with two nodes and two tasks, with no standbys, immediately after having lost and replaced a node, the KIP would assign active tasks like this:
Code Block |
---|
num.standby.replicas:0
Node1: [Task1(active), Task2(active)]
Node2: []
(no standbys reported in logs, etc.) |
To achieve balance, we need to move one of the tasks over, so we'd create a "moving" task for it to catch up:
Code Block |
---|
num.standby.replicas:0
Node1: [Task1(active), Task2(active)]
Node2: [Task2(moving)]
(no standbys reported in logs, etc.) |
Trouble
However, what should we do if we did have standbys configured?
Looking at the same cluster with two nodes and two tasks, but now with one standby replica configured, immediately after having lost and replaced a node, the KIP would assign tasks like this:
Code Block |
---|
num.standby.replicas:1
Node1: [Task1(active), Task2(active)]
Node2: [Task1(standby, not caught up), Task2(standby, not caught up)]
(standbys ARE reported in logs, etc.) |
To achieve balance, we need to move one of the tasks over, but now we cannot create our "moving" task because Node2 already has a standby task for Task2, and it can't host two different "types" of the same task. We have several choices, and none of them are satisfying.
- We could make Task2 a "moving" task instead of a "standby" task. This is true to the idea of adding a "moving" type, because it tells us that Task2 is intended to become active after it catches up. But the bummer is that we would not report it as a standby task. So operators would be confused and probably concerned to see that Task2 has no standby task in this scenario.
- We could somehow mark the task as both "moving" and "standby", so that it's a "moving" task for the purpose of assignment, but it's also reported as a "standby" task in the logs, etc. This increases the code complexity, since we have to handle the task properly all over Streams, whenever we log it or store it in a collection, to decide whether we're going to treat it as a standby or a moving task.
In retrospect, if we had called these standbys "replicas", it would have helped the situation. Then, it would seem more OK to have an extra replica for moving a shard sometimes, and you'd always expect to see at least "num.standby.replicas" replicas in your cluster. But it's a little hard at this point to consider completely renaming "StandbyTask" to "ReplicaTask", etc.
...
Code Block |
---|
Group stable state: S1[T1, T2], S2[T3, T4], S3[T5]
Scaling down the application, S2 will be leaving.
#First Rebalance
Member S2 joins the group and claims that it is leaving.
S1 performs task assignments:
S1(assigned: [T1, T2], revoked: [], learning: [T3])
S2(assigned: [T3, T4], revoked: [], learning: [])
S3(assigned: [T5], revoked: [], learning: [T4])
#Second Rebalance
S3 finishes replay first and triggers another rebalance
Member S1 ~ S3 join with following status:(S1 is not ready yet)
S1(assigned: [T1, T2], revoked: [], learning: [T3])
S2(assigned: [], revoked: [T3, T4], learning: [])
S3(assigned: [T5], revoked: [], learning: [T4])
S1 performs task assignments:
S1(assigned: [T1, T2], revoked: [], learning: [T3])
S2(assigned: [T3], revoked: [T4], learning: [])
S3(assigned: [T4, T5], revoked: [], learning: [])
#Third Rebalance
S1 finishes replay and triggers a rebalance.
Member S1~S3 join with following status:
S1(assigned: [T1, T2], revoked: [], learning: [T3])
S2(assigned: [], revoked: [T3], learning: [])
S3(assigned: [T4, T5], revoked: [], learning: [])
S1 performs task assignments:
S1(assigned: [T1, T2, T3], revoked: [], learning: [])
S2(assigned: [], revoked: [T3], learning: [])
S3(assigned: [T4, T5], revoked: [], learning: [])
S2 will shutdown itself upon new assignment since there is no assigned task left. |
Online Host Swapping (Scaling Up Then Down)
This is a typical use case where the user wants to replace the entire application's host type. Normally an administrator would choose to swap hosts one by one, which could cause endless KStream resource shuffling. The recommended approach under cooperative rebalancing is:
- Increase the capacity of the current stream job to 2X and bring up instances of the new type.
- Mark the existing stream instances as leaving.
- Once the learner tasks finish on the new hosts, shut down the old ones.
Code Block |
---|
Group stable state: S1[T1, T2], S2[T3, T4]
Swapping application instances, adding S3, S4 with new instance type.
#First Rebalance
Member S3, S4 join the group.
S1 performs task assignments:
S1(assigned: [T1, T2], revoked: [], learning: [])
S2(assigned: [T3, T4], revoked: [], learning: [])
S3(assigned: [], revoked: [], learning: [T2])
S4(assigned: [], revoked: [], learning: [T4])
Use scaling tool to indicate S1 & S2 are leaving.
#Second Rebalance
Member S1, S2 initiate rebalance to indicate state change (leaving)
Member S1~S4 join with following status:
S1(assigned: [T1], revoked: [T2], learning: [])
S2(assigned: [T3], revoked: [T4], learning: [])
S3(assigned: [], revoked: [], learning: [T2])
S4(assigned: [], revoked: [], learning: [T4])
S1 performs task assignments:
S1(assigned: [T1, T2], revoked: [], learning: [])
S2(assigned: [T3, T4], revoked: [], learning: [])
S3(assigned: [], revoked: [], learning: [T1, T2])
S4(assigned: [], revoked: [], learning: [T3, T4])
#Third Rebalance
S3 and S4 finish replaying T1 ~ T4 and trigger a rebalance.
Member S1~S4 join with following status:
S1(assigned: [], revoked: [T1, T2], learning: [])
S2(assigned: [], revoked: [T3, T4], learning: [])
S3(assigned: [], revoked: [], learning: [T1, T2])
S4(assigned: [], revoked: [], learning: [T3, T4])
S1 performs task assignments:
S1(assigned: [], revoked: [], learning: [])
S2(assigned: [], revoked: [], learning: [])
S3(assigned: [T1, T2], revoked: [], learning: [])
S4(assigned: [T3, T4], revoked: [], learning: [])
S1~S2 will shutdown themselves upon new assignment since there is no assigned task left. |
Edge Scenarios
Backing Up Information On Leader
Since incremental rebalancing requires certain historical information about the last round's assignment, the leader stream thread will need to maintain knowledge of:
- Who participated in the last round of rebalance. This is required to track newcomers.
- Who will be leaving the consumer group. This is for scale-down support, since the replay could take longer than the scale-down timeout. Under static membership, since we don't send leave-group information, we could leverage the leader to explicitly trigger a rebalance when the scale-down timeout is reached. Maintaining the set of leaving members is critical to making the right task shuffle judgement.
This is essential group state that the leader needs to remember. To limit the impact of a leader crash during scaling, we avoid backing up too much information on the leader for now. The following edge cases are around leader incidents during scaling.
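As a rough illustration, the bookkeeping could be as small as the following hypothetical sketch (the class and method names are illustrative, not actual Streams code):
Code Block |
---|
// Hypothetical sketch of the in-memory bookkeeping the leader keeps between rebalances:
// who participated in the last round (to spot newcomers) and who announced it is
// leaving (to bound scale down).
import java.util.HashSet;
import java.util.Set;

public final class LeaderGroupMemory {
    private final Set<String> lastRoundMembers = new HashSet<>();
    private final Set<String> leavingMembers = new HashSet<>();

    // Members present now but absent from the previous round are treated as newcomers.
    public Set<String> newcomers(Set<String> currentMembers) {
        Set<String> fresh = new HashSet<>(currentMembers);
        fresh.removeAll(lastRoundMembers);
        return fresh;
    }

    public void markLeaving(String memberId) {
        leavingMembers.add(memberId);
    }

    public boolean isLeaving(String memberId) {
        return leavingMembers.contains(memberId);
    }

    // Called after an assignment is computed so the next round can detect newcomers.
    public void rememberRound(Set<String> currentMembers) {
        lastRoundMembers.clear();
        lastRoundMembers.addAll(currentMembers);
    }
} |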
Leader Transfer During Scaling
A leader crash could cause the loss of historical assignment information. For the learners already assigned, however, each stream thread maintains its own assignment status, so when a learner task's id has no corresponding active task running, the transfer will happen immediately. A leader switch in this case is not a big concern.
Code Block |
---|
Cluster has 3 stream threads S1(leader), S2, S3, and they own tasks T1 ~ T5
Group stable state: S1[T1, T2], S2[T3, T4], S3[T5]
#First Rebalance
New member S4 joins the group
S1 performs task assignments:
S1(assigned: [T1, T2], revoked: [], learning: [])
S2(assigned: [T3, T4], revoked: [], learning: [])
S3(assigned: [T5], revoked: [], learning: [])
S4(assigned: [], revoked: [], learning: [T1])
#Second Rebalance
S1 crashes/gets killed before S4 is ready, S2 takes over the leader.
Member S2~S4 join with following status:
S2(assigned: [T3, T4], revoked: [], learning: [])
S3(assigned: [T5], revoked: [], learning: [])
S4(assigned: [], revoked: [], learning: [T1])
Note that T2 is unassigned, and S4 is learning T1 which has no current active task. We
could rebalance T1, T2 immediately.
S2 performs task assignments:
S2(assigned: [T3, T4], revoked: [], learning: [])
S3(assigned: [T5, T2], revoked: [], learning: [])
S4(assigned: [T1], revoked: [], learning: [])
Now the group reaches balance. |
Leader Transfer Before Scaling
However, if the leader dies before the new instances join, the potential risk is that the leader cannot differentiate which stream instances are "new", because it relies on the historical information. For version 1.0, the final assignment is probably not ideal in this case if we only attempt to assign learner tasks to newcomers. This also motivates us to figure out a better task coordination strategy for load balance in the long term.
Code Block |
---|
Cluster has 3 stream threads S1(leader), S2, S3, and they own tasks T1 ~ T5
Group stable state: S1[T1], S2[T2, T3, T4, T5]
#First Rebalance
New member S4 joins the group; at the same time, S1 crashes.
S2 takes over the leader, while T1 is not assigned now
S2 ~ S4 join with following status
S2(assigned: [T2, T3, T4, T5], revoked: [], learning: [])
S3(assigned: [], revoked: [], learning: [])
S4(assigned: [], revoked: [], learning: [])
S2 performs task assignments: (no learner assignment since S2 doesn't know S4 is new member)
S2(assigned: [T2, T3, T4, T5], revoked: [], learning: [])
S3(assigned: [T1], revoked: [], learning: [])
S4(assigned: [], revoked: [], learning: [])
Now the group reaches balance, although the eventual load is skewed. |
Assignment Algorithm
The above examples focus on demonstrating the expected behaviors and the "end picture" of KStream incremental rebalancing. Next, we present a holistic view of the new learner assignment algorithm during each actual rebalance.
The assignment is broken down in the order of active, learner, and standby tasks.
Code Block |
---|
Algorithm incremental-rebalancing
Input Set of Tasks,
Set of Instances,
Set of Stream Threads,
Where each stream thread contains:
Set of active Tasks,
Set of standby Tasks,
owned by which instance
Main Function
Assign active tasks: (if any)
To instances with learner tasks that indicate "ready"
To previous owners
To unready learner tasks owners
To instances with standby tasks
To instances who are not marked "leaving"
To resource available instances
Keep existing learner tasks' assignment unchanged
Pick new learner tasks out of heaviest loaded instances
Assign learner tasks: (if any)
To new-coming instances with abundant resource
To instances who are not marked "leaving"
To instances with corresponding standby tasks
Prerequisite is that the instance version supports learner mechanism.
Assign standby tasks: (if any)
To instances without matching active tasks
To previous active task owners after learner transfer in this round
To resource abundant instances
To instances who are not marked "leaving"
Based on num.standby.task config, standby task assignment could take multiple rounds
Output Finalized Task Assignment |
Stream Task Tagging
To enable the learner resource shuffling behavior, we need the following task status indicators to be provided:
...
Optimizations
Stateful vs Stateless Tasks
For stateless tasks the ownership transfer should happen immediately, without the need for a learning stage, because there is nothing to restore. We should fall back to the KIP-415 behavior, where stateless tasks are only revoked during the second rebalance. This feature requires adding a new tag to a stream task, so that when we eventually consider the load balance of stream applications, we can separate tasks into two buckets and rebalance them independently.
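A minimal sketch of that split, assuming a hypothetical isStateful predicate and a generic task type (neither is part of the proposal):
Code Block |
---|
// Hypothetical sketch of the two-bucket split: stateless tasks can be revoked and
// moved right away, while stateful tasks go through the learner/restore stage.
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public final class TaskBuckets<T> {
    public final List<T> stateful = new ArrayList<>();
    public final List<T> stateless = new ArrayList<>();

    public static <T> TaskBuckets<T> split(List<T> tasks, Predicate<T> isStateful) {
        TaskBuckets<T> buckets = new TaskBuckets<>();
        for (T task : tasks) {
            if (isStateful.test(task)) {
                buckets.stateful.add(task);   // needs a learner phase before transfer
            } else {
                buckets.stateless.add(task);  // transfer immediately, nothing to restore
            }
        }
        return buckets;
    }
} |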
Eager Rebalance
The restoration times of learner tasks are not always equivalent. When assigned more than one task to replay, a stream thread could request an immediate rebalance as soon as a subset of its learner tasks has finished, in order to speed up load balancing and reduce the resource waste of double task processing, at the cost of global efficiency through many more rebalances. We could supply the user with a config to decide whether to take the eager or the conservative approach, along with some follow-up benchmark tooling for rebalance efficiency. Example:
A stream thread S1 takes two learner tasks T1 and T2, where the restoration time time(T1) < time(T2). Under the eager approach, the stream thread will trigger a rebalance immediately when T1 finishes replaying, while under the conservative approach it will rejoin the group only after it finishes replaying both T1 and T2.
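With hypothetical names (the config flag and counters are assumptions, not proposed APIs), the decision on the stream thread boils down to something like:
Code Block |
---|
// Hypothetical sketch of the eager-vs-conservative rejoin decision on a stream thread.
public final class RejoinPolicy {
    public static boolean shouldRejoinNow(boolean eagerRebalanceEnabled,
                                          int restoredLearnerTasks,
                                          int totalLearnerTasks) {
        if (totalLearnerTasks == 0) {
            return false; // nothing being learned, nothing to report
        }
        return eagerRebalanceEnabled
                ? restoredLearnerTasks > 0                    // rejoin on the first finished task
                : restoredLearnerTasks == totalLearnerTasks;  // rejoin only when all are done
    }
} |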
Standby Task Utilization
Don't forget that the original purpose of standby tasks is to mitigate the impact of scaling down. When performing learner assignment, we shall prioritize stream threads that currently host standby tasks matching the learner assignment. That way the group should rebalance fairly soon and let the leaving members shut themselves down quickly.
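As a hypothetical illustration of that preference (the types and comparator are assumptions), the learner placement could rank candidates like this:
Code Block |
---|
// Hypothetical sketch of preferring standby owners when placing a learner task: an
// instance that already hosts the matching standby has most of the state locally, so
// the learner catches up quickly and leaving members can shut down sooner.
import java.util.Comparator;
import java.util.List;
import java.util.function.Predicate;

public final class LearnerPlacement {
    public static <I> I pickLearnerOwner(List<I> candidates,
                                         Predicate<I> hasMatchingStandby,
                                         Comparator<I> byCurrentLoad) {
        return candidates.stream()
                .min(Comparator
                        // standby owners first: they are mostly caught up already ...
                        .comparing((I instance) -> hasMatchingStandby.test(instance) ? 0 : 1)
                        // ... then break ties by current load
                        .thenComparing(byCurrentLoad))
                .orElseThrow(() -> new IllegalStateException("no candidate instances"));
    }
} |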
Scale Down Timeout
Users naturally want to hit a sweet spot between ongoing task transfer and freeing up streaming resources. We therefore want to take an approach similar to KIP-415 and introduce a client config to make sure the scale down is time-bounded. If migrating the tasks takes longer than this config allows, the leader will send out a join-group request, force-remove the active tasks from the leaving members, and transfer those tasks to the staying members, so that the leaving members shut themselves down immediately after this round of rebalance.
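A minimal sketch of that deadline check on the leader, with hypothetical names (none of them are proposed configs or APIs):
Code Block |
---|
// Hypothetical sketch of the time-bounded scale down: once the configured timeout
// expires while leaving members still own active tasks, the leader forces a rebalance
// that migrates those tasks to the staying members.
public final class ScaleDownDeadline {
    private final long scaleDownTimeoutMs;
    private final long scaleDownStartedMs;

    public ScaleDownDeadline(long scaleDownTimeoutMs, long scaleDownStartedMs) {
        this.scaleDownTimeoutMs = scaleDownTimeoutMs;
        this.scaleDownStartedMs = scaleDownStartedMs;
    }

    public boolean shouldForceMigration(long nowMs, boolean leavingMembersOwnActiveTasks) {
        return leavingMembersOwnActiveTasks
                && nowMs - scaleDownStartedMs >= scaleDownTimeoutMs;
    }
} |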
Trade-offs
More Rebalances vs Global Efficiency
As one might expect, the new algorithm will trigger many more rebalances than the current protocol. As we have discussed in the overall incremental rebalancing design, it is not always bad to have multiple rebalances when we do it wisely, and after KIP-345 we have a future proposal to avoid scale-up rebalances for static members. The goal is to pre-register the members that are planning to be added. The broker coordinator will augment the member list and wait for all the new members to join the group before rebalancing, since by default the stream application's rebalance timeout is infinity. The conclusion is: it is the server's responsibility to avoid excessive rebalances, and the client's responsibility to make each rebalance more efficient.
Metadata Space vs Allocation Efficiency
Since we are carrying over more information during rebalance, we should be alert to the metadata size increase. So far the hard limit is 1MB per metadata response, which means that if we add too much information, the new protocol could hit a hard failure. Finding a better encoding scheme for metadata is a common pain point for incremental rebalancing KIPs like 415 and 429. Some thoughts from Guozhang have been started in this JIRA, and we plan to have a separate KIP discussing different encoding technologies to see which one could work.
Iteration Plan
For the smooth delivery of all the features discussed so far, the iteration is divided into four stages:
...
Delivery goal: Scale up support, conservative rebalance
The goal of the first version is to build the foundation of the learner algorithm for the scale-up scenario. The leader stream thread will use the previous round's assignment to figure out which instances are new, and the learner tasks shall be assigned only to new instances. The reason for only implementing the new-instance logic is that there is a potential edge case that could break the current naive learner assignment: when the number of tasks is much smaller than the total cluster capacity, we could fall into endless resource shuffling. We plan to better address this issue in version 4.0, where we take eventual load balance into consideration. Discussions on marking task weight have been going on for a while, but it is so far unclear what kind of eventual balance model we are going to implement at the current stage. In conclusion, we want to postpone the finalized design for eventual balance until the last version.
Version 2.0
Delivery goal: Scale down support
We will focus on delivering scale-down support upon the success of version 1.0. We need to extend the v1 protocol, since we need existing instances to take on the extra learning load. We shall break the v1 statement that "only new instances could take learner tasks". To make this happen, we need to deliver the following steps:
- Create new tooling for marking instances as ready to scale down.
- Tag the leaving information for targeted members.
- Scale down timeout support.
Version 3.0
Delivery goal: Eager rebalance
A detailed analysis and benchmark test need to be done before fully devoting effort to this feature. Intuitively, most applications should be able to tolerate minor discrepancies in task replaying time, while the cost of extra rebalances and the increased debugging complexity are definitely unfavorable.
Version 3.0 builds on the success of version 1.0 and could be done concurrently with version 2.0. We may choose to adopt or discard this change, depending on the benchmark results.
Version 4.0 (Stretch)
Delivery goal: Task state labeling, eventual workload balance
Open question: we could deviate a bit from designing for the ultimate goal and instead provide the user with a handy tool to achieve it.
Version 4.0, the final version, will take the application's eventual load balance into consideration. If we define a balancing factor x, the total number of tasks each instance owns should be within the range of +-x% of the expected number of tasks (according to relative instance capacity), which buffers some capacity in order to avoid imbalance. A stream.imbalance.percentage config will be provided for the user. The smaller this number is set, the stricter the assignment protocol will behave.
Code Block |
---|
A group with 4 instances that have following capacities: 5, 10, 10, 15, and 80 tasks shall expect the perfect balances into:
10(5/40), 20(10/40), 20(10/40), 30(15/40) tasks.
In case we set imbalance factor to 20%
then an eventual assignment like
12, 18, 23, 27 should be stable, as all of them are not way-off the expected load assignment. |
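A minimal sketch of that check, with hypothetical names, matching the arithmetic of the example above:
Code Block |
---|
// Hypothetical sketch of the balance check: an instance counts as balanced if its task
// count lies within +-imbalancePercentage of its capacity-weighted share. With the
// example above (capacity 5 of 40, 80 tasks, factor 0.2) the allowed range is [8, 12],
// so 12 assigned tasks is accepted.
public final class BalanceCheck {
    public static boolean isBalanced(int assignedTasks,
                                     int totalTasks,
                                     double instanceCapacity,
                                     double totalCapacity,
                                     double imbalancePercentage) {
        double expected = totalTasks * (instanceCapacity / totalCapacity);
        double lower = expected * (1.0 - imbalancePercentage);
        double upper = expected * (1.0 + imbalancePercentage);
        return assignedTasks >= lower && assignedTasks <= upper;
    }
} |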
...
Public Interfaces
We are going to add a new protocol type called "stream".
Code Block |
---|
ProtocolTypes : {"consumer","connect","stream"} |
...
Default: incremental
Version 1.0
...
This setting helps ensure a no-downtime upgrade of an online application.
Options : upgrading, incremental
...
Default: infinity
Version 2.0
...
Time in milliseconds after which to force-terminate the stream thread once it has been informed it is being scaled down.
...
learner.partial.rebalance
Default : true
Version 3.0
...
If this config is set to true, a new member will proactively trigger a rebalance each time it finishes restoring one learner task's state, until it eventually finishes all the replaying. Otherwise, the new stream thread will batch its ready calls and ask for a single round of rebalance.
...
Default: 0.2 (20%)
Version 4.0
...
The tolerated task imbalance factor between hosts before triggering a rebalance.
Implementation Plan
To make sure the delivery is smooth given the fundamental changes to KStream internals, we have built a separate, shareable Google Doc here that outlines the steps of the changes. Feel free to give your feedback on this plan while reviewing the algorithm, because some of the algorithm requirements are highly coupled with the internal architecture reasoning.
Compatibility, Deprecation, and Migration Plan
Minimum Version Requirement
This change requires Kafka broker version >= 0.9, where the broker will react with a rebalance when a normal consumer rejoins with the encoded metadata. The client application needs to update to at least the earliest version that includes the KIP-429 version 1.0 change.
Recommended Upgrade Procedure
As we have mentioned above, a new protocol type shall be created. To ensure a smooth upgrade, we need to make sure the existing job won't fail. The procedure is as follows:
- Set `stream.rebalancing.mode` to `upgrading`, which will force the stream application to stay with protocol type "consumer" (see the sketch after this list).
- Rolling-restart the stream application; the change is automatically applied. This is safe because we are not changing the protocol type.
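A minimal sketch of the first step; only the `stream.rebalancing.mode` key and its `upgrading` value come from the procedure above, everything else (class name, remaining configs) is illustrative:
Code Block |
---|
// Hypothetical sketch: build the Streams config for the first rolling bounce.
import java.util.Properties;

public final class UpgradeStepOne {
    public static Properties configForFirstBounce() {
        Properties props = new Properties();
        // Keep the group on protocol type "consumer" during the rolling restart.
        props.put("stream.rebalancing.mode", "upgrading");
        // ... add the application's existing Streams configs here ...
        return props;
    }
} |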
In the long term we are proposing a smoother and more elegant upgrade approach than the current one. However, it requires a broker upgrade, which may not be a trivial effort for the end user. For now, users can choose this much easier workaround.
Rejected Alternatives
N/A for the algorithm part. For implementation-plan trade-offs, please review the doc linked in the implementation plan section.