...
Code Block | ||||
---|---|---|---|---|
| ||||
Cluster has 3 stream workers S1(leader), S2, S3, and they own tasks T1 ~ T5
Group stable state: S1[T1, T2], S2[T3, T4], S3[T5]
#First Rebalance
New member S4 joins the group
S1 performs task assignments:
    S1(assigned: [T1, T2], revoked: [], learning: [])
    S2(assigned: [T3, T4], revoked: [], learning: [])
    S3(assigned: [T5], revoked: [], learning: [])
    S4(assigned: [], revoked: [], learning: [T1])
#Second Rebalance
S1 crashes/gets killed before S4 is ready, S2 takes over the leader.
Member S2~S4 join with following status:
    S2(assigned: [T3, T4], revoked: [], learning: [])
    S3(assigned: [T5], revoked: [], learning: [])
    S4(assigned: [], revoked: [], learning: [T1])
Note that T2 is unassigned, and S4 is learning T1 which has no current active task. We could rebalance T1, T2 immediately.
S2 performs task assignments:
    S2(assigned: [T3, T4], revoked: [], learning: [])
    S3(assigned: [T5, T2], revoked: [], learning: [])
    S4(assigned: [T1], revoked: [], learning: [])
Now the group reaches balance. |
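The observation in the second rebalance ("T2 is unassigned, and S4 is learning T1 which has no current active task") can be reproduced with a small check over the join statuses. The following is only a sketch: `MemberStatus` and `find_orphaned_tasks` are hypothetical names chosen for illustration, and the field names simply mirror the example above rather than any actual protocol schema.

```python
from dataclasses import dataclass, field


@dataclass
class MemberStatus:
    """Status a member reports when joining (fields mirror the example above)."""
    assigned: list = field(default_factory=list)
    revoked: list = field(default_factory=list)
    learning: list = field(default_factory=list)


def find_orphaned_tasks(all_tasks, members):
    """Split tasks that have no active owner by whether some member is learning them."""
    owned = {t for m in members.values() for t in m.assigned}
    orphaned = [t for t in all_tasks if t not in owned]
    learners = {t: name for name, m in members.items() for t in m.learning}
    with_learner = {t: learners[t] for t in orphaned if t in learners}
    without_learner = [t for t in orphaned if t not in learners]
    return with_learner, without_learner


# Join statuses of S2~S4 after S1 is gone.
all_tasks = ["T1", "T2", "T3", "T4", "T5"]
members = {
    "S2": MemberStatus(assigned=["T3", "T4"]),
    "S3": MemberStatus(assigned=["T5"]),
    "S4": MemberStatus(learning=["T1"]),
}
with_learner, without_learner = find_orphaned_tasks(all_tasks, members)
print(with_learner)     # {'T1': 'S4'}
print(without_learner)  # ['T2']
```

T1 can go straight to its learner S4, while T2 has no learner and needs to be placed on some other member.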
...
Code Block | ||
---|---|---|
| ||
Algorithm incremental-rebalancing
Input Set of Tasks,
      Set of Instances,
      Set of Workers,
      Where each worker contains:
        Set of active Tasks,
        Set of standby Tasks,
        owned by which instance
Main Function
  Assign active tasks: (if any)
    To instances with learner tasks that indicates "ready"
    To previous owners
    To unready learner tasks owners
    To instances with standby tasks
    To resource available instances
  Pick learner tasks out of heaviest loaded instances
  Assign learner tasks: (if any)
    To previous owners (no half way bounce at least in the first version)
    To new-coming instances with abundant resource (first version)
    To instances with corresponding standby tasks
  Assign standby tasks: (if any)
    To instances without matching active tasks
    To previous active task owners
    To resource available instances
    Based on num.standby.task config, this could take multiple rounds
Output Finalized Task Assignment |
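The "Assign active tasks" step above is essentially a priority-ordered lookup, which might be sketched as follows. This is an illustration under stated assumptions, not the actual assignor: the function name and its arguments are hypothetical, each lookup table maps a task to an instance name, and "resource available instances" is modelled simply as "least loaded".

```python
def assign_active_tasks(tasks, instances, ready_learners, previous_owners,
                        unready_learners, standby_owners):
    """Assign each task to the first live candidate found in priority order."""
    load = {inst: 0 for inst in instances}
    assignment = {inst: [] for inst in instances}
    # Priority order taken from the algorithm outline above.
    priority = [ready_learners, previous_owners, unready_learners, standby_owners]

    unplaced = []
    for task in tasks:
        # Candidates that are no longer in the group are skipped.
        target = next((c[task] for c in priority if c.get(task) in load), None)
        if target is None:
            unplaced.append(task)
            continue
        assignment[target].append(task)
        load[target] += 1

    # Assumption: "resource available" means the currently least loaded instance.
    for task in unplaced:
        target = min(instances, key=load.get)
        assignment[target].append(task)
        load[target] += 1
    return assignment


# Second rebalance from the earlier example: S1 is gone, S4 is an unready learner of T1.
result = assign_active_tasks(
    tasks=["T1", "T2", "T3", "T4", "T5"],
    instances=["S2", "S3", "S4"],
    ready_learners={},
    previous_owners={"T1": "S1", "T2": "S1", "T3": "S2", "T4": "S2", "T5": "S3"},
    unready_learners={"T1": "S4"},
    standby_owners={},
)
print(result)  # {'S2': ['T3', 'T4'], 'S3': ['T5', 'T2'], 'S4': ['T1']}
```

Tasks with a sticky candidate are placed first, so the fallback sees the real load before placing the leftovers. In this run that sends T2 to S3, matching the example.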
Also, for the smooth delivery of all the features we have discussed so far, the iteration plan for this final algorithm is as follows:
Version 1.0
...
The goal of the first version is to lay the foundation of the learner algorithm for the scale-up scenario. Newly spun-up instances will tag themselves as new comers.
A brief explanation of the 1.0 goal:
The leader worker will use the previous round's assignment to figure out which instances are new, and learner tasks shall be assigned to them only once. We want to implement the learner task logic only on new instances because there is a potential edge case that could break the existing naive learner assignment: when the number of tasks is much smaller than the total cluster capacity, we could fall into endless resource shuffling.
Also, we care more about a smooth transition than about resource balance in stage one. This is because we have had some historical discussion on marking weights for different types of tasks; if we aim for task balance too early, we are potentially over-optimizing. So we will assign learner tasks only to "new comers", which means every stream worker will denote itself as a "new member" when it has no local information. The assignment, however, will be on the task ...
In conclusion, we want to delay the finalized design for eventual balance until the last version.
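The "new comer" rule can be sketched as a comparison against the previous round's assignment. This is a minimal illustration only: `assign_learner_tasks` and `max_learners_per_newcomer` are hypothetical names, taking the learner task from the heaviest loaded member follows the algorithm outline, and refusing to take from a member that owns a single task is an extra assumption added here to avoid shuffling when tasks are scarce.

```python
def assign_learner_tasks(previous_assignment, current_members,
                         max_learners_per_newcomer=1):
    """Give learner tasks only to members absent from the previous assignment."""
    newcomers = [m for m in current_members if m not in previous_assignment]
    # Copy the task lists so the caller's previous assignment is not mutated.
    load = {m: list(tasks) for m, tasks in previous_assignment.items()
            if m in current_members}

    learners = {}
    for newcomer in newcomers:
        picked = []
        for _ in range(max_learners_per_newcomer):
            donor = max(load, key=lambda m: len(load[m]), default=None)
            # Assumption: never learn the only task of an existing member.
            if donor is None or len(load[donor]) <= 1:
                break
            picked.append(load[donor].pop(0))
        learners[newcomer] = picked
    return learners


previous = {"S1": ["T1", "T2"], "S2": ["T3", "T4"], "S3": ["T5"]}
group = ["S1", "S2", "S3", "S4"]

first_round = assign_learner_tasks(previous, group)
print(first_round)   # {'S4': ['T1']}

# Next round S4 already appears in the previous assignment, so it is no longer new.
second_round = assign_learner_tasks({**previous, "S4": []}, group)
print(second_round)  # {}
```

Because membership in the previous assignment is the only signal used, a member receives learner tasks in exactly one round, which is the "only once" behaviour described above.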
Version 2.0
Delivery goal: Scale down support
We will focus on the delivery of scaling down support upon the success of version 1.0
...
. We need to extend the v1 protocol because existing instances must take on the extra learning load. This breaks the v1 statement that "only new instances could take learner tasks". To make this happen, we need to deliver the following steps:
- Create new tooling for marking instances as ready to scale down
- Tag the leaving information for targeted members
- Scale down timeout implementation
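The three steps above could fit together roughly as in the sketch below. Everything here is hypothetical: `ScaleDownTracker` and `plan_scale_down_learners` are illustrative names, the timeout is modelled as a plain elapsed-time check, and what the group should do once the timeout fires is not specified in this section, so the sketch only exposes the predicate.

```python
import time


class ScaleDownTracker:
    """Records which members were tagged as leaving and when (hypothetical helper)."""

    def __init__(self, timeout_s, clock=time.monotonic):
        self.timeout_s = timeout_s
        self._clock = clock
        self._marked_at = {}

    def mark_leaving(self, member):
        # Keep the first mark so repeated tagging does not reset the timeout.
        self._marked_at.setdefault(member, self._clock())

    def is_leaving(self, member):
        return member in self._marked_at

    def timed_out(self, member):
        marked = self._marked_at.get(member)
        return marked is not None and self._clock() - marked >= self.timeout_s


def plan_scale_down_learners(assignment, tracker):
    """Spread learner tasks for leaving members' tasks over the remaining members."""
    remaining = [m for m in assignment if not tracker.is_leaving(m)]
    if not remaining:
        return {}
    load = {m: len(assignment[m]) for m in remaining}
    learners = {m: [] for m in remaining}
    for member, tasks in assignment.items():
        if not tracker.is_leaving(member):
            continue
        for task in tasks:
            # Assumption: the least loaded remaining member learns the task.
            target = min(remaining, key=load.get)
            learners[target].append(task)
            load[target] += 1
    return learners


now = [0.0]  # fake clock so the example is deterministic
tracker = ScaleDownTracker(timeout_s=30, clock=lambda: now[0])
tracker.mark_leaving("S1")
plan = plan_scale_down_learners(
    {"S1": ["T1", "T2"], "S2": ["T3", "T4"], "S3": ["T5"]}, tracker)
print(plan)  # {'S2': ['T2'], 'S3': ['T1']}

now[0] = 31.0
print(tracker.timed_out("S1"))  # True
```

Unlike version 1.0, the learner tasks here land on existing instances, which is the protocol extension this version calls for.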
Version 3.0
Delivery goal: eager rebalance analysis
...