...

  • Reduce unnecessary downtime due to task restoration.
  • Improve rebalance performance for stream applications, a.k.a. alleviate the Stop-The-World effect.
  • Provide a better auto-scaling experience for KStream applications, including:
      • Scaling up capacity
      • Scaling down capacity

Proposed Changes

Terminology

...

Next we are going to look at several typical scaling scenarios to better understand the algorithm.

Scaling Scenarios

Scale Up Running Application

The newly joined workers will be assigned learner tasks by the group leader, and they will first replay the corresponding changelogs locally. By the end of the first round of rebalance, no “real task transfer” has happened yet. When a new member finally finishes its replay, it re-attempts to join the group to indicate that it is “ready” to take on real active tasks. During the second rebalance, the leader will then transfer task ownership.
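
Below is a minimal sketch of the worker-side learner lifecycle described above, assuming hypothetical helpers (restoreChangelog, rejoinGroup) that merely stand in for the real Streams internals:

Code Block
languagejava
titleLearner lifecycle (sketch)
import java.util.List;

// Sketch only, not the actual Streams implementation: a joining worker replays the
// changelog for each learner task it was assigned, and only after all of them have
// caught up does it rejoin the group to signal readiness.
public class LearnerLifecycleSketch {

    public static void onAssignmentReceived(List<String> learnerTasks) {
        for (String task : learnerTasks) {
            restoreChangelog(task); // replay the changelog into the local state store
        }
        // All learner tasks have caught up: rejoin so the leader can transfer ownership
        // of the corresponding active tasks in the next rebalance.
        rejoinGroup("LEARNER_READY");
    }

    // Hypothetical placeholders; the real restore and join logic lives in Streams internals.
    private static void restoreChangelog(String task) { }
    private static void rejoinGroup(String joinReason) { }
}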

Code Block
languagetext
titleScale-up
Cluster has 3 stream workers S1(leader), S2, S3, and they each own some tasks 1 ~ 5
Group stable state: S1[T1, T2], S2[T3, T4], S3[T5]

#First Rebalance 
New member S4 joins the group
S1 performs task assignments:
	S1(assigned: [T1, T2], revoked: [], learning: [])
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [], revoked: [], learning: [T1])

#Second Rebalance 
New member S5 joins the group.
Member S1~S5 join with following metadata: (S4 is not ready yet)
	S1(assigned: [T2], revoked: [T1], learning: []) // T1 revoked because it's "being learned"
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [], revoked: [], learning: [T1])
	S5(assigned: [], revoked: [], learning: [T3])
S1 performs task assignments: 
	S1(assigned: [T1, T2], revoked: [], learning: [])
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [], revoked: [], learning: [T1])
	S5(assigned: [], revoked: [], learning: [T3])

#Third Rebalance 
Member S4 finishes its replay, becomes ready, and re-attempts to join the group. S5 is not ready yet.
Member S1~S5 join with following status: (S5 is not ready yet)
	S1(assigned: [T2], revoked: [T1], learning: [])
	S2(assigned: [T4], revoked: [T3], learning: []) // T3 revoked because it's "being learned"
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [], revoked: [], learning: [T1])
	S5(assigned: [], revoked: [], learning: [T3])
S1 performs task assignments:
	S1(assigned: [T2], revoked: [T1], learning: [])
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [T1], revoked: [], learning: [])
	S5(assigned: [], revoked: [], learning: [T3])

#Fourth Rebalance 
Member S5 is ready and re-attempts to join the group. 
Member S1~S5 join with following status: (S5 is now ready)
	S1(assigned: [T2], revoked: [], learning: [])
	S2(assigned: [T4], revoked: [T3], learning: []) // T3 revoked because it's "being learned"
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [T1], revoked: [], learning: [])
	S5(assigned: [], revoked: [], learning: [T3])
S1 performs task assignments:
	S1(assigned: [T2], revoked: [], learning: [])
	S2(assigned: [T4], revoked: [T3], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [T1], revoked: [], learning: [])
	S5(assigned: [T3], revoked: [], learning: [])
Now the group reaches balance with 5 members and 5 tasks.


Scale Up from Empty Group

Scaling up from scratch means all workers are new members. There is no need to implement a learner stage because there is nothing to learn: we don’t even have a changelog topic to start with. We should be able to handle this case by identifying whether the given task is in the active task bucket of any other member; if not, we just transfer the ownership immediately, as in the sketch below.
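
A minimal sketch of that ownership check, assuming an illustrative activeOwners map (task id to member id) rather than any existing Streams structure:

Code Block
languagejava
titleImmediate transfer check (sketch)
import java.util.Map;

// Sketch only: the leader can skip the learner stage for any task that currently has no
// active owner, which covers the "scale up from empty group" case.
public class EmptyGroupAssignmentSketch {

    // activeOwners: taskId -> memberId that currently runs the task actively
    static boolean canTransferImmediately(String taskId, Map<String, String> activeOwners) {
        // No active owner means there is no running state to hand off, so assign directly.
        return !activeOwners.containsKey(taskId);
    }
}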

...

Code Block
languagetext
titleScale-up from scratch
Group empty state: unassigned tasks [T1, T2, T3, T4, T5]

#First Rebalance 
New member S1 joins the group
S1 performs task assignments:
S1(assigned: [T1, T2, T3, T4, T5], revoked: [], learning: []) // T1~5 not previously owned

#Second Rebalance 
New members S2, S3 join the group
S1 performs task assignments:
S1(assigned: [T1, T2, T3, T4, T5], revoked: [], learning: []) 
S2(assigned: [], revoked: [], learning: [T3, T4])
S3(assigned: [], revoked: [], learning: [T5])

#Third Rebalance 
S2 and S3 are ready immediately after the assignment.
Member S1~S3 join with following status:
	S1(assigned: [T1, T2], revoked: [T3, T4, T5], learning: []) 
	S2(assigned: [], revoked: [], learning: [T3, T4])
	S3(assigned: [], revoked: [], learning: [T5])
S1 performs task assignments:
	S1(assigned: [T1, T2], revoked: [T3, T4, T5], learning: []) 
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])

Scale Down Running Application

As we have already discussed around the “learner” logic, when we scale down a stream group it is also favorable to initiate learner tasks before actually shutting down the instances. Although standby tasks could help in this case, they require the user to pre-configure them, which may not have been done when the admin performs the scale-down. The plan is to use a command line tool to tell certain stream members that a shutdown is about to be executed. These informed members will send a join group request with a join reason indicating they are “leaving soon”. During rebalance assignment, the leader will perform the learner assignment only among members without the intention of leaving, and the leaving member will shut itself down once it receives the instruction to revoke all its active tasks (see the sketch below).
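
The following is a rough sketch of the two sides of this protocol, with illustrative names (eligibleForLearnerTasks, shouldShutDown) that are not existing Streams code:

Code Block
languagejava
titleScale-down handling (sketch)
import java.util.Set;

// Sketch only: leader-side filtering of leaving members and member-side shutdown check.
public class ScaleDownSketch {

    // Leader side: members that announced they are leaving never receive learner tasks.
    static boolean eligibleForLearnerTasks(String memberId, Set<String> leavingMembers) {
        return !leavingMembers.contains(memberId);
    }

    // Member side: a leaving member shuts itself down once all its active tasks are revoked.
    static boolean shouldShutDown(boolean announcedLeaving, Set<String> remainingActiveTasks) {
        return announcedLeaving && remainingActiveTasks.isEmpty();
    }
}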

...

Code Block
languagetext
titleScale-down stream applications
Group stable state: S1[T1, T2], S2[T3, T4], S3[T5]
Scaling down the application, S2 will be leaving.

#First Rebalance 
Member S2 joins the group and claims that it is leaving
S1 performs task assignments:
	S1(assigned: [T1, T2], revoked: [], learning: [T3])
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: [T4])

#Second Rebalance 
S3 finishes replay first and triggers another rebalance.
Member S1~S3 join with following status: (S1 is not ready yet)
	S1(assigned: [T1, T2], revoked: [], learning: [T3])
	S2(assigned: [T3], revoked: [T4], learning: []) 
	S3(assigned: [T5], revoked: [], learning: [T4])
S1 performs task assignments:
	S1(assigned: [T1, T2], revoked: [], learning: [T3])
	S2(assigned: [T3], revoked: [T4], learning: [])
	S3(assigned: [T4, T5], revoked: [], learning: [])

#Third Rebalance 
S1 finishes replay and triggers a rebalance.
Member S1~S3 join with following status: 
	S1(assigned: [T1, T2], revoked: [], learning: [T3])
	S2(assigned: [], revoked: [T3], learning: []) 
	S3(assigned: [T4, T5], revoked: [], learning: [])
S1 performs task assignments:
	S1(assigned: [T1, T2, T3], revoked: [], learning: [])
	S2(assigned: [], revoked: [T3], learning: [])
	S3(assigned: [T4, T5], revoked: [], learning: [])

S2 will shut itself down upon the new assignment since it has no assigned tasks left.

Leader Transfer During Scaling 

A leader crash could cause a loss of historical assignment information. For the learner assignment, however, each worker maintains its own assignment status, so when a learner task's id has no corresponding active task running, the transfer will happen immediately. A leader switch in this case is not a big concern. The essence is that we don't rely on leader information to do

...

Code Block
languagetext
titleLeader transfer during scaling
Cluster has 3 stream workers S1(leader), S2, S3, and they own tasks 1 ~ 5
Group stable state: S1[T1, T2], S2[T3, T4], S3[T5]

#First Rebalance 
New member S4 joins the group
S1 performs task assignments:
	S1(assigned: [T1, T2], revoked: [], learning: [])
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [], revoked: [], learning: [T1])

#Second Rebalance
S1 crashes/gets killed before S4 is ready; S2 takes over as leader.
Member S2~S4 join with following status: 
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: []) 
	S4(assigned: [], revoked: [], learning: [T1])
Note that T2 is unassigned, and S4 is learning T1 which has no current active task. We 
could rebalance T1, T2 immediately.	
S2 performs task assignments:
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5, T2], revoked: [], learning: [])
	S4(assigned: [T1], revoked: [], learning: [])
Now the group reaches balance.

Optimizations

Stateful vs Stateless Tasks

For stateless tasks, the ownership transfer should happen immediately, without the need for a learning stage, because there is nothing to restore. We should fall back to the KIP-415 algorithm, where stateless tasks are only revoked during the second rebalance. This feature requires us to add a new tag to a stream task: "isStateful".
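
A small sketch of that routing decision, where "isStateful" is the proposed tag and the surrounding names are illustrative only:

Code Block
languagejava
titleStateful vs stateless routing (sketch)
import java.util.Set;

// Sketch only: stateful tasks go through the learner stage, stateless tasks fall back to
// the KIP-415 style direct transfer.
public class StatefulRoutingSketch {

    static void planTransfer(String taskId, boolean isStateful,
                             Set<String> learnerTransfers, Set<String> directTransfers) {
        if (isStateful) {
            learnerTransfers.add(taskId); // changelog must be replayed before ownership moves
        } else {
            directTransfers.add(taskId);  // nothing to restore, revoke and reassign directly
        }
    }
}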

Eager Rebalance 

Sometimes the restoration times of learner tasks are not equal. When assigned more than one task to replay, a stream worker could request an immediate rebalance as soon as a subset of its learner tasks has finished, in order to speed up load balancing and reduce the resource waste of double task processing, at the cost of global efficiency since many more rebalances are introduced. We could supply the user with a config to decide whether they want to take the eager approach or the stable approach.
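
As an illustration only (the config name below is hypothetical and not finalized in this KIP), such a knob could look like:

Code Block
languagejava
titleEager vs stable rebalance config (sketch)
import java.util.Properties;

public class EagerRebalanceConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Hypothetical config, for illustration only:
        // "eager"  -> rejoin as soon as a subset of learner tasks finishes restoring
        //             (more rebalances, faster load balancing).
        // "stable" -> rejoin only once all learner tasks are ready.
        props.put("learner.rebalance.mode", "eager");
    }
}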

...

Sometimes the end user wants to reach a sweet spot between ongoing task transfer and freeing up streaming resources. So we want to take a similar approach to KIP-415 and introduce a client config to make sure the scale-down is time-bounded. If the time it takes to migrate tasks exceeds this config, the leaving member will shut itself down immediately instead of waiting for the final confirmation, and we could simply transfer the learner tasks to active because they are now the best shot to own the new tasks.
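
A sketch of the time-bounded shutdown decision; the timeout config is hypothetical and its name is not decided here:

Code Block
languagejava
titleTime-bounded scale-down (sketch)
// Sketch only: the leaving member shuts down as soon as either all of its tasks have been
// migrated or the (hypothetical) scale-down timeout elapses.
public class ScaleDownTimeoutSketch {

    static boolean shouldShutDownNow(boolean allTasksMigrated,
                                     long elapsedMs, long scaleDownTimeoutMs) {
        return allTasksMigrated || elapsedMs >= scaleDownTimeoutMs;
    }
}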

Tagging Info Summary

Note that to make sure the above resource shuffling happens as expected, we need at least four new task status indicators to be provided (see the sketch after this list):

  1. Which task is learner task? This could be a tag on standby task as "isLearner";
  2. Which task is being learned? This could be a tag on active task as "isLearned";
  3. Which learner task has become ready? This could be a tag on standby task as "isReady".
  4. Which task is stateful? This could be a tag on the stream task as "isStateful".
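
A minimal sketch of these indicators bundled into one metadata object; the field names mirror the tags above, but this is not an existing Streams class:

Code Block
languagejava
titleTask tagging info (sketch)
// Sketch only: per-task indicators proposed in this KIP.
public class TaskTags {
    boolean isLearner;  // set on a standby task that is learning an active task
    boolean isLearned;  // set on an active task that is currently being learned
    boolean isReady;    // set on a learner task whose restoration has caught up
    boolean isStateful; // set on a task that has local state to restore
}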

Algorithm Walkthrough

The above examples focus more on demonstrating the expected behaviors of KStream incremental rebalancing. We also want to define the new learner algorithm for a holistic view.

...

  1. Keep the existing learner tasks running. We don't want to see a half-way bounce.
  2. If the load is not balanced between hosts, assign learner tasks from heavily loaded hosts to lightly loaded hosts.
  3. As long as the group members and the number of tasks are not changing, there should be a defined balanced state instead of rebalancing forever.
  4. Instances with standby tasks have a higher priority to be chosen for learner task assignment; the standby task will be converted to a learner task immediately.

Assign standby tasks:

For each instance, the total number of tasks it owns should stay within 0.5 ~ 2 times the expected number of tasks, which buffers some capacity in order to avoid imbalance.
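
A quick sketch of that capacity check, assuming the expected number of tasks per instance is simply the total task count divided by the number of instances (an assumption for illustration, not spelled out above):

Code Block
languagejava
titleCapacity bounds (sketch)
// Sketch only: checks the 0.5x ~ 2x capacity rule described above.
public class CapacityBoundsSketch {

    static boolean withinCapacity(int tasksOwned, int totalTasks, int numInstances) {
        double expected = (double) totalTasks / numInstances;
        return tasksOwned >= 0.5 * expected && tasksOwned <= 2 * expected;
    }
}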

...