...
```
Algorithm incremental-rebalancing

Input:
  Set of Tasks,
  Set of Instances,
  Set of Workers, where each worker contains:
    Set of active Tasks,
    Set of standby Tasks,
    owned by which instance

Main Function:
  Assign active tasks: (if any)
    To instances with learner tasks that indicate "ready"
    To previous owners
    To owners of unready learner tasks
    To instances with standby tasks
    To resource available instances
  Keep existing learner tasks' assignment unchanged
  Pick new learner tasks out of heaviest loaded instances
  Assign learner tasks: (if any)
    To new-coming instances with abundant resource
    To instances with corresponding standby tasks
  Assign standby tasks: (if any)
    To instances without matching active tasks
    To previous active task owners after learner transfer in this round
    To resource available instances
    (Based on num.standby.task config, this could take multiple rounds)

Output:
  Finalized Task Assignment
```
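The active-task part of the algorithm is a priority cascade. The Python sketch below shows it under stated assumptions and is not the assignor's actual implementation. The `Instance` class and `assign_active` function are hypothetical. Breaking ties by current load and then by name is an assumption. Because no capacity model is defined here, "resource available instances" is approximated as the least loaded instance. Learner and standby placement are not shown.

```python
from dataclasses import dataclass, field


@dataclass
class Instance:
    """Hypothetical per-instance view derived from the previous round's assignment."""
    name: str
    prev_active: set = field(default_factory=set)       # active tasks owned last round
    standby: set = field(default_factory=set)           # regular standby tasks
    ready_learners: set = field(default_factory=set)    # learner tasks tagged isReady
    unready_learners: set = field(default_factory=set)  # learner tasks still restoring
    assigned: list = field(default_factory=list)        # active tasks given this round


def assign_active(tasks, instances):
    """Assign each active task using the priority order from the pseudocode."""
    # Sticky rules, tried in order; the first rule with any matching instance wins.
    sticky_rules = [
        lambda inst, t: t in inst.ready_learners,    # 1. ready learner takes over
        lambda inst, t: t in inst.prev_active,       # 2. previous owner keeps it
        lambda inst, t: t in inst.unready_learners,  # 3. owner of an unready learner
        lambda inst, t: t in inst.standby,           # 4. instance holding a standby
    ]
    owners = {}
    for task in sorted(tasks):
        candidates = []
        for rule in sticky_rules:
            candidates = [i for i in instances if rule(i, task)]
            if candidates:
                break
        if not candidates:
            # 5. No sticky match: any instance qualifies (assumed: least loaded wins).
            candidates = instances
        # Break ties by current load, then by name for determinism.
        owner = min(candidates, key=lambda i: (len(i.assigned), i.name))
        owner.assigned.append(task)
        owners[task] = owner.name
    return owners


a = Instance("A", prev_active={"t0", "t1"})
b = Instance("B", ready_learners={"t1"})
c = Instance("C")
print(assign_active({"t0", "t1", "t2"}, [a, b, c]))  # {'t0': 'A', 't1': 'B', 't2': 'C'}
```

In the usage example, `t1` moves to `B` because B's learner is ready, even though `A` owned it previously. `t2` has no sticky match, so it goes to the idle instance `C`.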
Task Tagging
To enable the learner resource-shuffling behavior described above, each task needs to provide the following status indicators:
Tag Name | Task Type | Explanation |
---|---|---|
isStateful | both | Indicates whether the task has state to restore. |
isLearner | standby | Indicates whether the standby task is a learner task. |
beingLearned | active | Indicates whether the active task is being learned by another stream worker. |
isReady | standby | Indicates whether the standby task is ready to serve as an active task. |
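One way to carry these tags is a small immutable record per task. The sketch below is hypothetical: `TaskStatus`, its snake_case field names and `can_take_over` are illustrative, and task ids such as "0_1" are placeholders. The constructor check enforces the Task Type column of the table. `can_take_over` reflects the first active-task rule of the algorithm, under which a learner that reports isReady can be promoted.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TaskStatus:
    """Hypothetical container for the four task tags."""
    task_id: str
    active: bool                 # True for an active task, False for a standby task
    is_stateful: bool            # isStateful: meaningful for both task types
    is_learner: bool = False     # isLearner: standby tasks only
    being_learned: bool = False  # beingLearned: active tasks only
    is_ready: bool = False       # isReady: standby tasks only

    def __post_init__(self):
        # Reject tags that do not apply to this task type.
        if self.active and (self.is_learner or self.is_ready):
            raise ValueError("isLearner/isReady apply to standby tasks only")
        if not self.active and self.being_learned:
            raise ValueError("beingLearned applies to active tasks only")

    def can_take_over(self):
        """A learner standby task that reports isReady may become the active task."""
        return not self.active and self.is_learner and self.is_ready


learner = TaskStatus("0_1", active=False, is_stateful=True, is_learner=True, is_ready=True)
print(learner.can_take_over())  # True
```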
...
Optimizations
Stateful vs Stateless Tasks
...
The goal of the first version is to build the foundation of the learner algorithm for the scale-up scenario. The leader worker will use the previous round's assignment to figure out which instances are new, and learner tasks shall only be assigned to them once. We want to restrict learners to new instances because of a potential edge case that could break the existing naive learner assignment: when the number of tasks is much smaller than the total cluster capacity, we could fall into endless resource shuffling. Also, for stage one we care more about a smooth transition than about resource balance. There has been some historical discussion on assigning weights to different types of tasks, but aiming for task balance too early risks over-optimization. In conclusion, we want to defer the finalized design for eventual balance until the last version.
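The Python sketch below shows how the leader could find new instances from the previous round's assignment and pick learner tasks for them from the heaviest loaded instances. It is a minimal sketch under assumptions, not the planned implementation. `plan_learners` is a hypothetical name. The per-instance quota of floor(total tasks / group size) is an assumption, since version 1.0 does not define a balance target. "Only once" is read here as "only while the instance is absent from the previous assignment."

```python
def plan_learners(previous_assignment, current_members):
    """Return {new_instance: [task ids to learn]}.

    previous_assignment: instance name -> set of active task ids from last round.
    current_members: instance names in the current group.
    """
    # New instances are those absent from the previous round. After this round they
    # appear in previous_assignment, so they receive learners only once.
    new = sorted(set(current_members) - set(previous_assignment))
    # Surviving old instances, with a mutable copy of their task lists.
    remaining = {i: sorted(t) for i, t in previous_assignment.items()
                 if i in current_members}
    if not new or not remaining:
        return {}
    total = sum(len(t) for t in remaining.values())
    # Assumed goal: each new instance learns up to floor(total / group size) tasks.
    quota = total // (len(remaining) + len(new))
    plan = {}
    for inst in new:
        plan[inst] = []
        for _ in range(quota):
            # Take from the currently heaviest loaded instance (ties broken by name).
            donor = max(remaining, key=lambda i: (len(remaining[i]), i))
            if not remaining[donor]:
                break
            # The donor keeps the active task until the learner reports isReady;
            # popping here only prevents the same task from being learned twice.
            plan[inst].append(remaining[donor].pop())
    return plan


prev = {"A": {"t0", "t1", "t2", "t3"}, "B": {"t4", "t5"}}
print(plan_learners(prev, ["A", "B", "C"]))  # {'C': ['t3', 't2']}
```

Both learners come from `A`, because `A` stays the heaviest loaded instance after the first pick.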
We also don't want to include the eager rebalance optimization in version 1.0, for the concerns explained earlier.
Version 2.0
Delivery goal: Scale down support
...
Delivery goal: Eager rebalance analysis
A detailed analysis and benchmarking support need to be built before we fully commit effort to this feature. Intuitively, most applications should be able to tolerate minor differences in task replay time, whereas extra rebalances and increased debugging complexity are costs we definitely want to avoid.
...
Version 4.0, the final version, will take the application's eventual load balance into consideration. If we define a balancing factor x, the number of tasks each instance owns should be within ±x% of its expected number of tasks (proportional to its relative instance weight). This range acts as a buffer, so the assignment does not have to match the expected numbers exactly to be considered balanced. A stream.imbalance.percentage config will be provided for users to set x. The smaller the value, the stricter the assignment protocol.
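The stability condition can be written as a formula. The symbols are introduced here for illustration only: N is the total number of tasks, w_i is the relative weight of instance i, e_i is its expected task count, t_i is the number of tasks it actually owns, and x is the balancing factor in percent.

```latex
e_i = N \cdot \frac{w_i}{\sum_j w_j}, \qquad
\left(1 - \frac{x}{100}\right) e_i \;\le\; t_i \;\le\; \left(1 + \frac{x}{100}\right) e_i
\quad \text{for every instance } i
```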
Example:
A group of 4 instances with relative capacities 5, 10, 10, and 15, sharing 80 tasks in total, should ideally be balanced as:
10, 20, 20, 30 tasks.
If we set the imbalance factor to 20%,
then an eventual assignment like
12, 18, 23, 27 should be stable, because every instance is within ±20% of its expected load. The largest deviation, 12 vs. 10, sits exactly on the +20% bound.
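The Python sketch below checks the example. It uses exact fractions so that boundary cases such as 12 vs. 10 at 20% are not affected by floating-point rounding. `expected_loads` and `is_stable` are hypothetical helpers, and `imbalance_pct` stands in for stream.imbalance.percentage. The sketch assumes the assignment covers every task, so the total is the sum of the assignment.

```python
from fractions import Fraction


def expected_loads(weights, total_tasks):
    """Expected task count per instance, proportional to its relative weight."""
    total_weight = sum(weights)
    return [Fraction(total_tasks * w, total_weight) for w in weights]


def is_stable(assignment, weights, imbalance_pct):
    """True if every instance is within +-imbalance_pct% of its expected load."""
    expected = expected_loads(weights, sum(assignment))
    tolerance = Fraction(imbalance_pct) / 100
    return all(abs(t - e) <= tolerance * e for t, e in zip(assignment, expected))


print([int(e) for e in expected_loads([5, 10, 10, 15], 80)])  # [10, 20, 20, 30]
print(is_stable([12, 18, 23, 27], [5, 10, 10, 15], 20))       # True
print(is_stable([13, 18, 22, 27], [5, 10, 10, 15], 20))       # False: 13 is 30% above 10
```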
We also want to balance the load separately for stateful and stateless tasks, as discussed above. Version 4.0 still has many unknowns and is slightly beyond the scope of incremental rebalancing. Our plan is to keep iterating on the details or to propose a separate KIP in the future.
Public Interfaces
We are going to add a new protocol type called "stream".
```
ProtocolTypes : {"consumer", "connect", "stream"}
```
We will also add several new configs that let users apply and customize the scaling behavior.
...