Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

We will introduce a new type of stream task called `learner task`, which is a special task that gets assigned to one stream instance to restore a current active task state from another instance. It shares the same semantics as standby task, and the only difference is that when the restoration of learner task is complete, the stream instance will initiate a new JoinGroupRequest to call out rebalance of the new task assignment. The goal of learner task is to delay the task migration when the destination host has not finished or even started replaying the active task. This applies to both scale up and scale down scenarios.

Stop-

...

The-

...

World Effect Recap

As mentioned in motivation section, we also want to mitigate the stop-the-world effect of current global rebalance protocol. A quick recap of current rebalance semantics on KStream when rebalance starts, all members would

...

Next we are going to look at several examples of auto scaling cases

Scale Up

typical scaling scenarios to better understand the algorithm.

Normal Scale Up

The newly joined members will be assigned with learner tasks by the group leader and they will replay the corresponding changelogs on local first. By the end of first round of rebalance, there is no “real task transfer”. When new member finally finishes the replay task, it will re-attempt to join the group to indicate that it is “ready” to take on real active tasks. During second rebalance, the leader will eventually transfer the task ownership.

Code Block
languagetext
titleScale-up stream applications
Cluster has 3 stream instances S1(leader), S2, S3, and they each own some tasks 1 ~ 5
Group stable state: S1[T1, T2], S2[T3, T4], S3[T5]
#First Rebalance 
New member S4 joins the group
S1 performs task assignments:
	S1(assigned: [T1, T2], revoked: [], learning: [])
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [], revoked: [], learning: [T1])

#Second Rebalance 
New member S5 joins the group.
Member S1~S5 join with following metadata: (S4 is not ready yet)
	S1(assigned: [T2], revoked: [T1], learning: []) // T1 revoked because it's "being learned"
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [], revoked: [], learning: [T1])
	S5(assigned: [], revoked: [], learning: [T3])
S1 performs task assignments: 
	S1(assigned: [T1, T2], revoked: [], learning: [])
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [], revoked: [], learning: [T1])
	S5(assigned: [], revoked: [], learning: [T3])


#Third Rebalance 
Member S4 finishes its replay and becomes ready, re-attempt to join the group. S5 is not ready yet.
Member S1~S5 join with following status:(S5 is not ready yet)
	S1(assigned: [T2], revoked: [T1], learning: [])
	S2(assigned: [T4], revoked: [T3], learning: []) // T3 revoked because it's "being learned"
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [], revoked: [], learning: [T1])
	S5(assigned: [], revoked: [], learning: [T3])
S1 performs task assignments:
	S1(assigned: [T2], revoked: [T1], learning: [])
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [T1], revoked: [], learning: [])
	S5(assigned: [], revoked: [], learning: [T3])


#Fourth Rebalance 
Member S5 is ready, re-attempt to join the group. 
Member S1~S5 join with following status:(S5 is not ready yet)
	S1(assigned: [T2], revoked: [], learning: [])
	S2(assigned: [T4], revoked: [T3], learning: []) // T3 revoked because it's "being learned"
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [T1], revoked: [], learning: [])
	S5(assigned: [], revoked: [], learning: [T3])
S1 performs task assignments:
	S1(assigned: [T2], revoked: [], learning: [])
	S2(assigned: [T4], revoked: [T3], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [T1], revoked: [], learning: [])
	S5(assigned: [T3], revoked: [], learning: [])
Now the group reaches balance with 5 members and 5 tasks.

Scaling Up from Empty Group

Normal Scale Down

Code Block
languagetext
titleScale-down stream applications
Group stable state: S1[T1, T2], S2[T3, T4], S3[T5]
Scaling down the application, S2 will be leaving.
S1 performs task assignments:
	S1(assigned: [T1, T2], revoked: [], learning: [T3])
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: [T4])
S3 finishes replay first and trigger another rebalance
S1 performs task assignments:
S1(assigned: [T1, T2], revoked: [], learning: [T3])
S2(assigned: [T3], revoked: [T4], learning: [])
S3(assigned: [T4, T5], revoked: [], learning: [])
S1 finishes replay and trigger rebalance.
S1(assigned: [T1, T2, T3], revoked: [], learning: [])
S2(assigned: [], revoked: [T3], learning: [])
S3(assigned: [T4, T5], revoked: [], learning: [])
Now S2 will shutdown itself since there is no assigned task left.



Note that to make sure the above resource shuffling could happen as expected, we need to have at least three new task status indicator to be provided:

...