Status
Current state: [Under Discussion]
...
A learner task shares the same semantics as a standby task: the restore consumer uses it to replicate the state of an active task. When the restoration of a learner task is complete, the stream instance will initiate a new JoinGroupRequest to trigger another rebalance that performs the task transfer. The goal of the learner task is to delay task migration while the destination host has not finished, or even started, replaying the active task.
...
A leader crash could cause the loss of historical assignment information. For learners already assigned, however, each worker maintains its own assignment status, so when a learner task's id has no corresponding active task running, the transfer will happen immediately. A leader switch in this case is not a big concern. The essence is that we don't rely on leader information to do the assignment.
```
Cluster has 3 stream workers S1(leader), S2, S3, and they own tasks T1 ~ T5
Group stable state: S1[T1, T2], S2[T3, T4], S3[T5]

#First Rebalance
New member S4 joins the group
S1 performs task assignments:
    S1(assigned: [T1, T2], revoked: [], learning: [])
    S2(assigned: [T3, T4], revoked: [], learning: [])
    S3(assigned: [T5], revoked: [], learning: [])
    S4(assigned: [], revoked: [], learning: [T1])

#Second Rebalance
S1 crashes/gets killed before S4 is ready, S2 takes over the leader.
Member S2~S4 join with following status:
    S2(assigned: [T3, T4], revoked: [], learning: [])
    S3(assigned: [T5], revoked: [], learning: [])
    S4(assigned: [], revoked: [], learning: [T1])
Note that T2 is unassigned, and S4 is learning T1 which has no current active task. We could rebalance T1, T2 immediately.
S2 performs task assignments:
    S2(assigned: [T3, T4], revoked: [], learning: [])
    S3(assigned: [T5, T2], revoked: [], learning: [])
    S4(assigned: [T1], revoked: [], learning: [])
Now the group reaches balance.
```
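The second rebalance in the scenario above can be sketched in a few lines of Python. This is only an illustration under stated assumptions, not the proposed assignor: the function name `reassign_after_leader_loss`, the dictionary layout of the member state, and the "least-loaded member, ties broken by name" rule for orphaned tasks are all hypothetical choices made for the example.

```python
from typing import Dict, List


def reassign_after_leader_loss(
    members: Dict[str, Dict[str, List[str]]], all_tasks: List[str]
) -> Dict[str, List[str]]:
    """Build a new active assignment from member-reported state only (hypothetical helper)."""
    # Start from what each surviving member reports as actively owned.
    assignment = {name: list(state["assigned"]) for name, state in members.items()}
    owned = {task for tasks in assignment.values() for task in tasks}

    # A learner task with no running active owner is transferred immediately.
    for name in sorted(members):
        for task in members[name]["learning"]:
            if task not in owned:
                assignment[name].append(task)
                owned.add(task)

    # Assumption: remaining orphaned tasks go to the least-loaded member (ties by name).
    for task in all_tasks:
        if task not in owned:
            target = min(sorted(assignment), key=lambda n: len(assignment[n]))
            assignment[target].append(task)
            owned.add(task)
    return assignment


# Member state as reported in the second rebalance, after S1 is gone.
members = {
    "S2": {"assigned": ["T3", "T4"], "learning": []},
    "S3": {"assigned": ["T5"], "learning": []},
    "S4": {"assigned": [], "learning": ["T1"]},
}
result = reassign_after_leader_loss(members, ["T1", "T2", "T3", "T4", "T5"])
```

The point of the sketch is that the new leader needs no history from S1: the member-reported `assigned` and `learning` sets are enough to see that T1 has no active owner and T2 is unassigned.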
...
However, if the leader dies before new instances join, the potential risk is that the leader cannot differentiate which stream instance is "new", because it relies on the historical information. For version 1.0, the final assignment is probably not ideal in this case if we only attempt to assign learner tasks to newcomers. This also motivates us to figure out a better task coordination strategy for load balance in the long term.
...
The restoration times of learner tasks are not always equal. When assigned more than one task to replay, the stream worker could request an immediate rebalance as soon as a subset of its learning tasks finish, in order to speed up load balancing and reduce the resource waste of double task processing, at the cost of global efficiency because many more rebalances are introduced. We could eventually supply users with a config to decide whether they want the eager approach or the stable approach, along with follow-up benchmark tools for rebalance efficiency. Example:
A stream worker S1 takes two learner tasks T1, T2, where the restoring time satisfies time(T1) < time(T2). Under the eager rebalance approach, the worker will call for a rebalance immediately when T1 finishes replaying. Under the conservative approach, the worker will not rejoin the group until it finishes replaying both T1 and T2.
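To make the difference between the two approaches concrete, here is a minimal sketch of when the worker would rejoin the group. It assumes the learner tasks restore in parallel and that each task's restore duration is known; the function name `rejoin_times` and the durations are hypothetical values chosen for illustration.

```python
from typing import Dict, List


def rejoin_times(restore_seconds: Dict[str, float], eager: bool) -> List[float]:
    """Times (seconds since restoration started) at which the worker rejoins the group."""
    if not restore_seconds:
        return []
    if eager:
        # Eager: one rejoin per distinct completion time.
        return sorted(set(restore_seconds.values()))
    # Conservative: a single rejoin once the slowest learner task is done.
    return [max(restore_seconds.values())]


# Hypothetical durations satisfying time(T1) < time(T2).
durations = {"T1": 30.0, "T2": 120.0}
eager_rejoins = rejoin_times(durations, eager=True)
conservative_rejoins = rejoin_times(durations, eager=False)
```

With these numbers the eager worker triggers two rebalances while the conservative worker triggers one, which is exactly the trade-off between faster task transfer and fewer rebalances.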
...
The above examples focus on demonstrating the expected behaviors of the KStream incremental rebalancing "end picture". However, we also want a holistic view of the new learner assignment algorithm during each actual rebalance.
We shall assign tasks in the order of: active, learner and standby. The assignment will be broken down into the following steps:
```
Algorithm incremental-rebalancing

Input Set of Tasks,
      Set of Instances,
      Set of Workers,

      Where each worker contains:
        Set of active Tasks,
        Set of standby Tasks,
        owned by which instance

Main Function

  Assign active tasks: (if any)
    To instances with learner tasks that indicate "ready"
    To previous owners
    To unready learner tasks owners
    To instances with standby tasks
    To resource available instances

  Keep existing learner tasks' assignment unchanged

  Pick new learner tasks out of heaviest loaded instances

  Assign learner tasks: (if any)
    To new-coming instances with abundant resource
    To instances with corresponding standby tasks

  Assign standby tasks: (if any)
    To instances without matching active tasks
    To previous active task owners after learner transfer in this round
    To resource available instances
    Based on num.standby.task config, this could take multiple rounds

Output Finalized Task Assignment
```
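The priority order for active tasks in the algorithm above can be read as a ranking over candidate instances. The following is a minimal sketch of that ranking only, not the full assignor. The function name `pick_active_owner`, the per-instance fields (`ready_learners`, `learners`, `standbys`, `load`), and the tie-break by load and then name are assumptions introduced for the example.

```python
from typing import Dict, Optional


def pick_active_owner(
    task: str, instances: Dict[str, dict], previous_owner: Optional[str] = None
) -> str:
    """Pick the instance that should own `task` as active, by priority tier."""

    def rank(name: str):
        state = instances[name]
        if task in state["ready_learners"]:
            tier = 0  # learner that has indicated "ready"
        elif name == previous_owner:
            tier = 1  # previous owner
        elif task in state["learners"]:
            tier = 2  # learner that is not ready yet
        elif task in state["standbys"]:
            tier = 3  # instance holding a standby of the task
        else:
            tier = 4  # any instance with available resources
        # Assumption: ties inside a tier go to the lighter instance, then by name.
        return (tier, state["load"], name)

    return min(instances, key=rank)


# Hypothetical cluster state for task T1.
instances = {
    "A": {"ready_learners": set(), "learners": set(), "standbys": set(), "load": 2},
    "B": {"ready_learners": set(), "learners": {"T1"}, "standbys": set(), "load": 0},
    "C": {"ready_learners": {"T1"}, "learners": {"T1"}, "standbys": set(), "load": 1},
}
owner_when_ready = pick_active_owner("T1", instances, previous_owner="A")

# If C has not finished restoring, the previous owner keeps the task.
instances["C"]["ready_learners"] = set()
owner_when_unready = pick_active_owner("T1", instances, previous_owner="A")
```

Encoding the tiers as the first element of a sort key keeps the ordering explicit, so changing the priority list means changing only the `rank` function.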
...
Version 3.0
Delivery goal: Eager rebalance analysis
A detailed analysis and benchmark support need to be built before fully devoting effort to this feature. Intuitively, most applications should be able to tolerate minor discrepancies in task replaying time, while the cost of extra rebalances and the increased debugging complexity are definitely things we are not in favor of.
Version 3.0 builds upon the success of version 1.0, and could be done concurrently with version 2.0. We may choose to adopt or discard this change, depending on the benchmark result.
Version 4.0 (Stretch)
Delivery goal: Task labeling, eventual workload balance
Version 4.0, the final version, will take the application's eventual load balance into consideration. If we define a balancing factor x, the total number of tasks each instance owns should be within +-x% of the expected number of tasks, which leaves some buffer capacity in order to avoid imbalance. A stream.imbalance.percentage config will be provided for the user to configure. The smaller this number is set, the stricter the assignment protocol will behave.
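As a worked example of the balancing factor, the sketch below checks whether every instance's task count falls within +-x% of the expected count. It assumes the expected number of tasks per instance is simply the total divided by the number of instances, which the text does not spell out; the function names and the sample counts are hypothetical.

```python
from typing import Dict, Tuple


def allowed_task_range(
    total_tasks: int, num_instances: int, imbalance_percentage: float
) -> Tuple[float, float]:
    """Lower and upper bound on tasks per instance for a balancing factor of x percent."""
    expected = total_tasks / num_instances  # assumption: even split is the target
    slack = expected * imbalance_percentage / 100.0
    return expected - slack, expected + slack


def is_balanced(task_counts: Dict[str, int], imbalance_percentage: float) -> bool:
    """True when every instance is inside the allowed range."""
    low, high = allowed_task_range(
        sum(task_counts.values()), len(task_counts), imbalance_percentage
    )
    return all(low <= count <= high for count in task_counts.values())


# 30 tasks over 3 instances: the expected count is 10 per instance.
counts = {"S1": 11, "S2": 10, "S3": 9}
balanced_at_10 = is_balanced(counts, 10)  # allowed range is 9 to 11
balanced_at_5 = is_balanced(counts, 5)    # allowed range is 9.5 to 10.5
```

The same assignment passes at x = 10 and fails at x = 5, which shows how a smaller value makes the protocol stricter.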
We also want to balance the load separately for stateful tasks and stateless tasks as discussed above. So far version 4.0 still has many unknowns and is slightly beyond the incremental rebalancing scope. A separate KIP could be initiated to discuss this approach in detail.
Trade-offs
More
...
Rebalances
The new algorithm will invoke many more rebalances than the current protocol. As we have discussed in the overall incremental rebalancing design, it is not always bad to have multiple rebalances when we do it wisely, and after KIP-345 we have a future proposal to avoid scale-up rebalances for static members. The goal is to pre-register the members that are planning to be added. The broker coordinator will augment the member list and wait for all the new members to join the group before rebalancing, since by default a stream application’s rebalance timeout is infinity. The conclusion is that it is the server’s responsibility to avoid excessive rebalances, and the client’s responsibility to make each rebalance more efficient.
Metadata
...
Space vs
...
Rebalance Efficiency
Since we are carrying more information during rebalance, we should be alert to the metadata size increase. So far the hard limit is 1MB per metadata response, which means that if we add on too much information, the new protocol could hit a hard failure. Finding a better encoding scheme for metadata is a common pain point if we are promoting incremental rebalancing KIPs like 415 and 429. Some thoughts from Guozhang have been started in this JIRA, and we are planning a separate KIP to discuss different encoding technologies and see which one could work.
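A rough way to reason about the limit is to measure the encoded size of an assignment before sending it. The sketch below uses compact JSON purely as a stand-in encoding, which is an assumption for illustration; the real metadata encoding is different, and `encoded_size`, `fits_limit`, and the interpretation of 1MB as 1024 * 1024 bytes are hypothetical choices.

```python
import json

# Assumption: the 1MB hard limit mentioned above, taken as 1024 * 1024 bytes.
METADATA_LIMIT_BYTES = 1024 * 1024


def encoded_size(assignment: dict) -> int:
    """Size in bytes of the assignment under a stand-in compact JSON encoding."""
    return len(json.dumps(assignment, separators=(",", ":")).encode("utf-8"))


def fits_limit(assignment: dict, limit: int = METADATA_LIMIT_BYTES) -> bool:
    """True when the encoded assignment stays within the metadata limit."""
    return encoded_size(assignment) <= limit


small = {"S1": {"assigned": ["T1", "T2"], "learning": []}}
# Hypothetical oversized payload: one member carrying 200,000 task ids.
large = {"S1": {"assigned": [f"T{i}" for i in range(200_000)], "learning": []}}

small_ok = fits_limit(small)
large_ok = fits_limit(large)
```

Whatever encoding is eventually chosen, a guard of this shape would turn a hard protocol failure into an early, explicit check.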
...
We are going to add a new protocol type called "stream".
```
ProtocolTypes : {"consumer", "connect", "stream"}
```
We will be adding the following new configs:
stream.rebalancing.mode Default: incremental Version 1.0 | The setting to help ensure a no-downtime upgrade of an online application. Options: upgrading, incremental |
scale.down.timeout.ms Default: infinity Version 2.0 | Time in milliseconds to force terminate the stream worker when informed to be scaled down. |
...
learner.partial.rebalance Default: true Version 3.0 | If this config is set to true, a new member will proactively trigger a rebalance each time it finishes restoring one learner task's state, until it eventually finishes replaying all tasks. Otherwise, the new worker will batch the ready-stage calls to ask for a single round of rebalance. |
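For illustration, the configs listed above could be resolved against their stated defaults as follows. This is a sketch only: the helper `resolve_config` and its validation rules are hypothetical, it covers just the three configs shown here, and any other keys are passed through untouched.

```python
from typing import Any, Dict

# Defaults as stated in the config list above.
DEFAULTS: Dict[str, Any] = {
    "stream.rebalancing.mode": "incremental",
    "scale.down.timeout.ms": float("inf"),
    "learner.partial.rebalance": True,
}
ALLOWED_MODES = {"upgrading", "incremental"}


def resolve_config(overrides: Dict[str, Any]) -> Dict[str, Any]:
    """Merge user overrides over the defaults and validate the known keys."""
    config = {**DEFAULTS, **overrides}
    if config["stream.rebalancing.mode"] not in ALLOWED_MODES:
        raise ValueError("stream.rebalancing.mode must be one of: upgrading, incremental")
    if config["scale.down.timeout.ms"] < 0:
        raise ValueError("scale.down.timeout.ms must not be negative")
    if not isinstance(config["learner.partial.rebalance"], bool):
        raise ValueError("learner.partial.rebalance must be true or false")
    return config


# During a rolling upgrade, a user would override only the mode.
upgrade_config = resolve_config({"stream.rebalancing.mode": "upgrading"})
```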
...