...
- Worker (a.k.a. stream worker): a thread-level stream processor that actually executes stream tasks.
- Instance (a.k.a. stream instance): the KStream instance that serves as the container for a set of stream workers. This could be a physical host or a k8s pod. For most of the discussion concerning a "working member" we will use worker and instance interchangeably, because capacity is essentially determined by the relative size of the instance, not the worker.
- Learner task: a special standby task that is assigned to one stream instance to restore the state of a current active task from another instance.
...
A learner task shares the same semantics as a standby task: the restore consumer uses it to replicate the state of the main active task. When restoration of a learner task completes, the stream instance initiates a new JoinGroupRequest to trigger a rebalance for the new task assignment. The goal of the learner task is to delay task migration while the destination host has not finished, or has not even started, replaying the active task's state.
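The learner lifecycle described above can be sketched as follows. This is a minimal illustration, not the KStream implementation: `LearnerTask`, `restore`, `is_ready` and `should_rejoin` are hypothetical names, and it assumes that "restoration complete" means the learner has replayed the changelog up to a known end offset.

```python
from dataclasses import dataclass


@dataclass
class LearnerTask:
    """Hypothetical model of a learner task restoring one active task's state."""
    task_id: str
    restored_offset: int = 0  # how far the changelog has been replayed so far

    def restore(self, records: int, end_offset: int) -> None:
        # Replay up to `records` changelog entries, never past the end offset.
        self.restored_offset = min(self.restored_offset + records, end_offset)

    def is_ready(self, end_offset: int) -> bool:
        # Assumption: the learner is ready once it has caught up to the end offset.
        return self.restored_offset >= end_offset


def should_rejoin(learners, end_offsets):
    """Return True once any learner task has finished restoring.

    In the proposal this is the point where the instance would send a new
    JoinGroupRequest; this sketch only computes the decision.
    """
    return any(l.is_ready(end_offsets[l.task_id]) for l in learners)


learner = LearnerTask("T1")
end_offsets = {"T1": 100}
learner.restore(60, end_offsets["T1"])
before = should_rejoin([learner], end_offsets)  # still catching up
learner.restore(60, end_offsets["T1"])
after = should_rejoin([learner], end_offsets)   # caught up, rejoin would be triggered
```

Until `should_rejoin` turns true, the active task stays with its current owner, which is what keeps the migration from happening before the destination is prepared.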
Stop-The-World Effect
As mentioned in the motivation section, we also want to mitigate the stop-the-world effect of the current global rebalance protocol. A quick recap of the current rebalance semantics on KStream: when a rebalance starts, all workers would
...
Code Block | ||||
---|---|---|---|---|
| ||||
Cluster has 3 stream workers S1(leader), S2, S3, and they own tasks T1 ~ T5
Group stable state: S1[T1, T2], S2[T3, T4], S3[T5]
#First Rebalance
New member S4 joins the group
S1 performs task assignments:
S1(assigned: [T1, T2], revoked: [], learning: [])
S2(assigned: [T3, T4], revoked: [], learning: [])
S3(assigned: [T5], revoked: [], learning: [])
S4(assigned: [], revoked: [], learning: [T1])
#Second Rebalance
S1 crashes/gets killed before S4 is ready, S2 takes over the leader.
Member S2~S4 join with following status:
S2(assigned: [T3, T4], revoked: [], learning: [])
S3(assigned: [T5], revoked: [], learning: [])
S4(assigned: [], revoked: [], learning: [T1])
Note that T2 is unassigned, and S4 is learning T1 which has no current active task. We could rebalance T1, T2 immediately.
S2 performs task assignments:
S2(assigned: [T3, T4], revoked: [], learning: [])
S3(assigned: [T5, T2], revoked: [], learning: [])
S4(assigned: [T1], revoked: [], learning: [])
Now the group reaches balance. |
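The second rebalance in the walkthrough above can be reproduced with a small sketch. The function name and the dictionary layout are hypothetical, and the tie-breaking rule (least-loaded member, then member name) is an assumption chosen only to make the result deterministic; the proposal does not prescribe it.

```python
def reassign_after_owner_loss(members, all_tasks):
    """Compute a new assignment after an active task owner leaves the group.

    members: dict of member name -> {"assigned": [...], "learning": [...]}
    all_tasks: list of every task id
    """
    result = {
        name: {"assigned": list(state["assigned"]), "revoked": [],
               "learning": list(state["learning"])}
        for name, state in members.items()
    }
    owned = {t for state in result.values() for t in state["assigned"]}
    orphaned = [t for t in all_tasks if t not in owned]

    # Pass 1: an orphaned task goes to a member that is already learning it.
    remaining = []
    for task in orphaned:
        learners = sorted(n for n, s in result.items() if task in s["learning"])
        if learners:
            result[learners[0]]["learning"].remove(task)
            result[learners[0]]["assigned"].append(task)
        else:
            remaining.append(task)

    # Pass 2: everything else goes to the least-loaded member (ties by name).
    for task in remaining:
        target = min(result, key=lambda n: (len(result[n]["assigned"]), n))
        result[target]["assigned"].append(task)
    return result


joined = {
    "S2": {"assigned": ["T3", "T4"], "learning": []},
    "S3": {"assigned": ["T5"], "learning": []},
    "S4": {"assigned": [], "learning": ["T1"]},
}
new_assignment = reassign_after_owner_loss(joined, ["T1", "T2", "T3", "T4", "T5"])
```

With the member states from the example, S4 is promoted from learner to owner of T1 and the unassigned T2 lands on S3, matching the final assignment shown above.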
Leader Transfer Before Scaling
However, if the leader dies before new instances join, the risk is that the new leader cannot tell which stream instances are "new", because that distinction relies on historical information. The final assignment is probably not ideal in this case if we only attempt to assign learner tasks to newcomers in version 1.0. This also motivates us to figure out a better task coordination strategy for load balance in the long term.
Code Block | ||||
---|---|---|---|---|
| ||||
Cluster has 3 stream workers S1(leader), S2, S3 and they own tasks T1 ~ T5
Group stable state: S1[T1], S2[T2, T3, T4, T5]
#First Rebalance
New member S4 joins the group, at the same time S1 crashes.
S2 takes over as the leader
S2 ~ S4 join with the following status:
S2(assigned: [T2, T3, T4, T5], revoked: [], learning: [])
S3(assigned: [], revoked: [], learning: [])
S4(assigned: [], revoked: [], learning: [])
S2 performs task assignments:
S2(assigned: [T2, T3, T4, T5], revoked: [], learning: [])
S3(assigned: [T1], revoked: [], learning: [])
S4(assigned: [], revoked: [], learning: [])
Now the group reaches balance, although the eventual load is skewed. |
Optimizations
Stateful vs Stateless Tasks
For stateless tasks, the ownership transfer should happen immediately without a learning stage, because there is nothing to restore. We should fall back to the KIP-415 algorithm, where the stateless tasks will only be revoked during the second rebalance. This feature requires us to add a new tag to a stream task, so that when we eventually consider the load balance of stream applications, we can separate tasks into two buckets and rebalance them independently.
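As a rough illustration of the tagging idea, the sketch below splits tasks into the two buckets. The `Task` class and its `stateful` field are hypothetical; the proposal only states that a new tag is needed, not what it looks like.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Task:
    task_id: str
    stateful: bool  # hypothetical tag; the real representation is not specified


def split_by_state(tasks):
    """Separate tasks into the two buckets that are rebalanced independently."""
    stateful = [t.task_id for t in tasks if t.stateful]
    stateless = [t.task_id for t in tasks if not t.stateful]
    return stateful, stateless


tasks = [Task("T1", True), Task("T2", False), Task("T3", True)]
needs_learner, move_directly = split_by_state(tasks)
```

Tasks in `needs_learner` would go through the learner stage before ownership changes, while tasks in `move_directly` have no state to restore and can be handed over without one.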
...
Code Block | ||
---|---|---|
| ||
Algorithm incremental-rebalancing

Input Set of Tasks,
      Set of Instances,
      Set of Workers,

      Where each worker contains:
        Set of active Tasks,
        Set of standby Tasks,
        owned by which instance

Main Function

    Assign active tasks: (if any)
        To instances with learner tasks that indicates "ready"
        To previous owners
        To unready learner tasks owners
        To instances with standby tasks
        To resource available instances

    Keep existing learner task assignment
    Pick new learner tasks out of heaviest loaded instances

    Assign learner tasks: (if any)
        To new-coming instances with abundant resource
        To instances with corresponding standby tasks

    Assign standby tasks: (if any)
        To instances without matching active tasks
        To previous active task owners after learner transfer in this round
        To resource available instances
        Based on num.standby.task config, this could take multiple rounds

Output Finalized Task Assignment |
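The priority order for placing an active task can be sketched in a few lines. This is an illustration under stated assumptions rather than the final algorithm: the per-instance dictionary shape, the `instance` helper, and the use of the lowest task count as a stand-in for "resource available" are all hypothetical.

```python
# Priority order for active task placement, taken from the listing above.
ACTIVE_PRIORITY = ["ready_learners", "previous_owners", "unready_learners", "standbys"]


def pick_active_owner(task, instances):
    """Pick the instance that should own `task`.

    instances: name -> {"ready_learners": set, "previous_owners": set,
    "unready_learners": set, "standbys": set, "load": int} (hypothetical shape).
    """
    for key in ACTIVE_PRIORITY:
        candidates = sorted(n for n, s in instances.items() if task in s[key])
        if candidates:
            return candidates[0]
    # Fallback: approximate "resource available" by the lowest current load.
    return min(instances, key=lambda n: (instances[n]["load"], n))


def instance(load, **roles):
    # Helper that fills in empty sets for roles an instance does not hold.
    state = {key: set(roles.get(key, ())) for key in ACTIVE_PRIORITY}
    state["load"] = load
    return state


instances = {
    "A": instance(3, previous_owners={"T1", "T2"}),
    "B": instance(1, ready_learners={"T1"}, standbys={"T2"}),
    "C": instance(0),
}
owner_t1 = pick_active_owner("T1", instances)  # ready learner beats previous owner
owner_t2 = pick_active_owner("T2", instances)  # previous owner beats standby holder
owner_t9 = pick_active_owner("T9", instances)  # nobody has history, least loaded wins
```

Placing ready learners ahead of previous owners is what actually moves a task: once the learner has caught up, it outranks the instance that currently holds the task.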
For the smooth delivery of all the features we have discussed so far, an iteration plan for reaching the final algorithm is defined below:
Version 1.0
Delivery goal: Scale up support, conservative rebalance
...
stream.imbalance.percentage Default: 0.2 Version 4.0 | The tolerance of task imbalance factor between hosts to trigger rebalance. |
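The proposal does not spell out how the imbalance factor is computed, so the following is only one plausible reading, shown to make the role of `stream.imbalance.percentage` concrete. The function name and the formula (relative excess of the busiest host over the best achievable per-host maximum) are assumptions.

```python
import math


def should_rebalance(loads, imbalance_percentage=0.2):
    """Decide whether the task skew between hosts exceeds the tolerance.

    loads: number of tasks per host. The formula is an assumption made for
    illustration; it is not taken from the proposal.
    """
    total = sum(loads)
    if not loads or total == 0:
        return False
    ideal = math.ceil(total / len(loads))    # best achievable per-host maximum
    overload = (max(loads) - ideal) / ideal  # relative excess on the busiest host
    return overload > imbalance_percentage
```

Under this reading, a distribution such as `[2, 2, 1]` is left alone because no host exceeds the ideal maximum, while `[4, 1, 0]` is well past the default tolerance of 0.2 and would trigger a rebalance.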
Implementation Plan
We want to call out this portion because the algorithm we are going to design is fairly complicated. To make sure the delivery is smooth given the fundamental changes to KStream internals, I built a separate Google Doc here that can be shared to outline the steps of the changes. Feel free to give your feedback on this plan while reviewing the algorithm, because some of the changes are highly coupled with internal changes. Without these details, the algorithm does not make sense.
...