...
Next we are going to look at several typical scaling scenarios to better understand the algorithm.
...
Scale Up Running Application
Newly joined members are assigned learner tasks by the group leader and first replay the corresponding changelogs locally. By the end of the first rebalance, no “real” task transfer has taken place. When a new member finishes replaying, it re-attempts to join the group to indicate that it is “ready” to take on real active tasks. During that subsequent rebalance, the leader transfers the task ownership to it.

    Cluster has 3 stream instances S1(leader), S2, S3, and they each own some of the tasks T1 ~ T5
    Group stable state: S1[T1, T2], S2[T3, T4], S3[T5]

    #First Rebalance
    New member S4 joins the group
    S1 performs task assignments:
        S1(assigned: [T1, T2], revoked: [], learning: [])
        S2(assigned: [T3, T4], revoked: [], learning: [])
        S3(assigned: [T5], revoked: [], learning: [])
        S4(assigned: [], revoked: [], learning: [T1])

    #Second Rebalance
    New member S5 joins the group.
    Member S1~S5 join with following metadata: (S4 is not ready yet)
        S1(assigned: [T2], revoked: [T1], learning: []) // T1 revoked because it's "being learned"
        S2(assigned: [T3, T4], revoked: [], learning: [])
        S3(assigned: [T5], revoked: [], learning: [])
        S4(assigned: [], revoked: [], learning: [T1])
        S5(assigned: [], revoked: [], learning: [T3])
    S1 performs task assignments:
        S1(assigned: [T1, T2], revoked: [], learning: [])
        S2(assigned: [T3, T4], revoked: [], learning: [])
        S3(assigned: [T5], revoked: [], learning: [])
        S4(assigned: [], revoked: [], learning: [T1])
        S5(assigned: [], revoked: [], learning: [T3])

    #Third Rebalance
    Member S4 finishes its replay and becomes ready, re-attempts to join the group.
    Member S1~S5 join with following status: (S5 is not ready yet)
        S1(assigned: [T2], revoked: [T1], learning: [])
        S2(assigned: [T4], revoked: [T3], learning: []) // T3 revoked because it's "being learned"
        S3(assigned: [T5], revoked: [], learning: [])
        S4(assigned: [], revoked: [], learning: [T1])
        S5(assigned: [], revoked: [], learning: [T3])
    S1 performs task assignments:
        S1(assigned: [T2], revoked: [T1], learning: [])
        S2(assigned: [T3, T4], revoked: [], learning: [])
        S3(assigned: [T5], revoked: [], learning: [])
        S4(assigned: [T1], revoked: [], learning: [])
        S5(assigned: [], revoked: [], learning: [T3])

    #Fourth Rebalance
    Member S5 is ready, re-attempts to join the group.
    Member S1~S5 join with following status: (S5 is now ready)
        S1(assigned: [T2], revoked: [], learning: [])
        S2(assigned: [T4], revoked: [T3], learning: []) // T3 revoked because it's "being learned"
        S3(assigned: [T5], revoked: [], learning: [])
        S4(assigned: [T1], revoked: [], learning: [])
        S5(assigned: [], revoked: [], learning: [T3])
    S1 performs task assignments:
        S1(assigned: [T2], revoked: [], learning: [])
        S2(assigned: [T4], revoked: [T3], learning: [])
        S3(assigned: [T5], revoked: [], learning: [])
        S4(assigned: [T1], revoked: [], learning: [])
        S5(assigned: [T3], revoked: [], learning: [])

    Now the group reaches balance with 5 members and 5 tasks.

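The two-phase handover in this scenario can be sketched in a few lines of Python. This is only an illustrative model, not the proposed implementation: the `Member` class and the `assign_learner` and `promote_ready_learners` helpers are hypothetical names, and the rule for choosing which task a new member learns (take the first unclaimed task from the member with the most unclaimed tasks) is an assumption chosen to reproduce the walkthrough.

```python
from dataclasses import dataclass, field


@dataclass
class Member:
    """Hypothetical leader-side view of one stream instance."""
    name: str
    assigned: list = field(default_factory=list)  # active tasks
    learning: list = field(default_factory=list)  # tasks being replayed
    ready: bool = False                           # replay finished


def assign_learner(new_member, members):
    """Give a new member one learner task (assumed donor heuristic)."""
    learned = {t for m in members for t in m.learning}

    def free_tasks(m):
        # Active tasks not already promised to some learner.
        return [t for t in m.assigned if t not in learned]

    donor = max(members, key=lambda m: len(free_tasks(m)))
    tasks = free_tasks(donor)
    if len(tasks) > 1:  # leave the donor at least one task
        new_member.learning.append(tasks[0])


def promote_ready_learners(members):
    """Transfer ownership of learned tasks to members that report ready."""
    owner_of = {t: m for m in members for t in m.assigned}
    for m in members:
        if not m.ready:
            continue
        for task in m.learning:
            owner_of[task].assigned.remove(task)
            m.assigned.append(task)
        m.learning = []


def snapshot(members):
    return {m.name: (list(m.assigned), list(m.learning)) for m in members}


group = [
    Member("S1", assigned=["T1", "T2"]),
    Member("S2", assigned=["T3", "T4"]),
    Member("S3", assigned=["T5"]),
]

# First rebalance: S4 joins and only receives a learner task.
s4 = Member("S4")
group.append(s4)
assign_learner(s4, group)
after_first = snapshot(group)

# Second rebalance: S5 joins while S4 is still replaying.
s5 = Member("S5")
group.append(s5)
assign_learner(s5, group)

# Third rebalance: S4 is ready, so T1 moves from S1 to S4.
s4.ready = True
promote_ready_learners(group)

# Fourth rebalance: S5 is ready, so T3 moves from S2 to S5.
s5.ready = True
promote_ready_learners(group)
final = snapshot(group)
```

After the first rebalance no active task has moved; ownership changes only once a learner reports ready, which is the property the learner design is meant to guarantee.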
Scaling Up from Empty Group
Normal Scale Down
As discussed for the “learner” logic, when scaling down a stream group it is also preferable to initiate learner tasks before actually shutting down the instances. Standby tasks could help here, but they must be configured by the user in advance and may not be available when an admin performs the scale-down. The plan is to use a command line tool to tell certain stream members that a shutdown is about to happen. The informed members send a join group request with a join reason indicating they are “leaving soon”. During the rebalance assignment, the leader assigns learner tasks among the members that do not intend to leave. A leaving member shuts itself down once it receives the instruction to revoke all of its active tasks.
For ease of operation, a new tool for scaling down the stream app shall be built. It will have access to the application instances and compute which members to scale down; the end user only needs to provide a scale-down percentage. For example, if the current cluster size is 40 and we choose to scale down to 80%, the script will attempt to inform 8 of the 40 hosts to “prepare leaving” the group.
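The arithmetic behind the example is small enough to sketch. The function name `members_to_notify` is hypothetical, and rounding the target size up (so the group never shrinks below the requested percentage) is an assumption; the text does not say how fractional results are handled.

```python
def members_to_notify(cluster_size, target_percent):
    """Number of hosts to inform when scaling down to target_percent of the cluster."""
    if cluster_size < 0:
        raise ValueError("cluster_size must be non-negative")
    if not 0 < target_percent <= 100:
        raise ValueError("target_percent must be in (0, 100]")
    # Assumption: round the target size up using integer arithmetic.
    target_size = (cluster_size * target_percent + 99) // 100
    return cluster_size - target_size


to_notify = members_to_notify(40, 80)
```

With a cluster of 40 and a target of 80%, the target size is 32, so 8 hosts are asked to prepare to leave, as in the example above.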

    Group stable state: S1[T1, T2], S2[T3, T4], S3[T5]
    Scaling down the application, S2 will be leaving.

    #First Rebalance
    Member S2 joins the group and claims that it is leaving
    S1 performs task assignments:
        S1(assigned: [T1, T2], revoked: [], learning: [T3])
        S2(assigned: [T3, T4], revoked: [], learning: [])
        S3(assigned: [T5], revoked: [], learning: [T4])

    #Second Rebalance
    S3 finishes replay first and triggers another rebalance
    Member S1~S3 join with following status: (S1 is not ready yet)
        S1(assigned: [T1, T2], revoked: [], learning: [T3])
        S2(assigned: [T3], revoked: [T4], learning: [])
        S3(assigned: [T5], revoked: [], learning: [T4])
    S1 performs task assignments:
        S1(assigned: [T1, T2], revoked: [], learning: [T3])
        S2(assigned: [T3], revoked: [T4], learning: [])
        S3(assigned: [T4, T5], revoked: [], learning: [])

    #Third Rebalance
    S1 finishes replay and triggers a rebalance.
    Member S1~S3 join with following status:
        S1(assigned: [T1, T2], revoked: [], learning: [T3])
        S2(assigned: [], revoked: [T3], learning: [])
        S3(assigned: [T4, T5], revoked: [], learning: [])
    S1 performs task assignments:
        S1(assigned: [T1, T2, T3], revoked: [], learning: [])
        S2(assigned: [], revoked: [T3], learning: [])
        S3(assigned: [T4, T5], revoked: [], learning: [])

    Now S2 will shut itself down upon the new assignment since it has no assigned task left.

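The scale-down handover can be modelled the same way. The sketch below is a simplified illustration with hypothetical helper names (`assign_learners_for_leaving`, `hand_over`). Spreading the leaving member's tasks round-robin over the remaining members is an assumption that happens to reproduce the walkthrough; the actual assignment policy is up to the leader.

```python
from itertools import cycle


def assign_learners_for_leaving(assigned, leaving):
    """Plan learner tasks on staying members for every task owned by a leaving member."""
    staying = [name for name in assigned if name not in leaving]
    if not staying:
        raise ValueError("no remaining member can take over the tasks")
    learning = {name: [] for name in staying}
    targets = cycle(staying)  # assumed round-robin policy
    for name, tasks in assigned.items():
        if name in leaving:
            for task in tasks:
                learning[next(targets)].append(task)
    return learning


def hand_over(assigned, learning, ready, leaving):
    """Move the tasks `ready` has finished replaying; return members that may shut down."""
    for task in learning[ready]:
        for name in leaving:
            if task in assigned[name]:
                assigned[name].remove(task)
        assigned[ready].append(task)
    learning[ready] = []
    # A leaving member may shut down once it owns no active task.
    return sorted(name for name in leaving if not assigned[name])


assigned = {"S1": ["T1", "T2"], "S2": ["T3", "T4"], "S3": ["T5"]}
leaving = {"S2"}

# First rebalance: S2 announces it is leaving; learners are planned.
learning = assign_learners_for_leaving(assigned, leaving)
plan = {name: list(tasks) for name, tasks in learning.items()}

# Second rebalance: S3 finishes replay first and takes over T4.
done_after_s3 = hand_over(assigned, learning, "S3", leaving)

# Third rebalance: S1 finishes replay and takes over T3.
done_after_s1 = hand_over(assigned, learning, "S1", leaving)
```

S2 keeps serving T3 until S1 has caught up, and it only becomes eligible for shutdown after the last handover.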
...