
...

Terminology

We shall define several new terms to make the walkthrough of the algorithm easier.

...


  • Instance (a.k.a. stream instance): the KStream instance that serves as the container of a set of stream threads. This could be a physical host or a k8s pod. For most of the discussion concerning the "working member", we use stream thread and instance interchangeably, because a stream thread's capacity is essentially determined by the relative size of its instance, not by the thread itself.
  • Learner task: a special standby task that is assigned to one stream instance to restore a currently active task, and that transitions to active when the restoration is complete.

...

As mentioned in the motivation, we also want to mitigate the stop-the-world effect of the current global rebalance protocol. A quick recap of the current rebalance semantics in KStream: when a rebalance starts, all stream threads would:

  1. Join the group with all currently assigned tasks revoked.

  2. Wait until the group assignment finishes to get their assigned tasks and resume working.

  3. Replay the state of the assigned tasks.

  4. Transition to running mode once all replay jobs finish.
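To make the stop-the-world cost concrete, the four steps can be modeled for a single stream thread. This is a minimal Java sketch; `EagerRebalanceSketch` and its fields are hypothetical names, not Kafka Streams internals. It records how many tasks the thread may process at the end of each step, and shows that even a task that stays with the same thread (T2 here) is paused until the whole sequence completes.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class EagerRebalanceSketch {

    // Tasks this thread is currently allowed to process.
    final Set<String> processable = new HashSet<>();
    // Number of processable tasks observed at the end of each of the four steps.
    final List<Integer> processableAfterStep = new ArrayList<>();

    // Models one eager rebalance for a single stream thread.
    void rebalance(Set<String> assigned) {
        // 1. Join the group with all currently assigned tasks revoked.
        processable.clear();
        processableAfterStep.add(processable.size());
        // 2. Wait for the group assignment: still nothing to process.
        processableAfterStep.add(processable.size());
        // 3. Replay the assigned tasks' state: still nothing to process.
        processableAfterStep.add(processable.size());
        // 4. All replays finished: the thread transitions to running mode.
        processable.addAll(assigned);
        processableAfterStep.add(processable.size());
    }

    public static void main(String[] args) {
        EagerRebalanceSketch thread = new EagerRebalanceSketch();
        thread.processable.addAll(Set.of("T1", "T2"));
        // T2 stays with this thread, yet it is paused during steps 1 to 3 as well.
        thread.rebalance(Set.of("T2"));
        System.out.println(thread.processableAfterStep); // [0, 0, 0, 1]
    }
}
```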

...

Scale Up Running Application

The group leader assigns learner tasks to the newly joined stream threads, and they first replay the corresponding changelogs locally. By the end of the first rebalance round there is no “real ownership transfer”. When a new member finishes the replay task, it re-attempts to join the group to indicate that it is “ready” to take on real active tasks. During the second rebalance, the leader then transfers the task ownership.

Example: Scale-up
Cluster has 3 stream threads S1(leader), S2, S3, and they each own some of the tasks T1 ~ T5
Group stable state: S1[T1, T2], S2[T3, T4], S3[T5]

#First Rebalance 
New member S4 joins the group.
S1 performs task assignments:
	S1(assigned: [T1, T2], revoked: [], learning: [])
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [], revoked: [], learning: [T1])

#Second Rebalance 
New member S5 joins the group.
Member S1~S5 join with following metadata: (S4 is not ready yet)
	S1(assigned: [T2], revoked: [T1], learning: []) // T1 revoked because it's "being learned"
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [], revoked: [], learning: [T1])
	S5(assigned: [], revoked: [], learning: [])
S1 performs task assignments: 
	S1(assigned: [T1, T2], revoked: [], learning: [])
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [], revoked: [], learning: [T1])
	S5(assigned: [], revoked: [], learning: [T3])

#Third Rebalance 
Member S4 finishes its replay and becomes ready, re-attempting to join the group.
Member S1~S5 join with following status:(S5 is not ready yet)
	S1(assigned: [T2], revoked: [T1], learning: [])
	S2(assigned: [T4], revoked: [T3], learning: []) // T3 revoked because it's "being learned"
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [], revoked: [], learning: [T1])
	S5(assigned: [], revoked: [], learning: [T3])
S1 performs task assignments:
	S1(assigned: [T2], revoked: [T1], learning: [])
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [T1], revoked: [], learning: [])
	S5(assigned: [], revoked: [], learning: [T3])

#Fourth Rebalance 
Member S5 finishes its replay and becomes ready, re-attempting to join the group.
Member S1~S5 join with following status:
	S1(assigned: [T2], revoked: [], learning: [])
	S2(assigned: [T4], revoked: [T3], learning: []) // T3 revoked because it's "being learned"
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [T1], revoked: [], learning: [])
	S5(assigned: [], revoked: [], learning: [T3])
S1 performs task assignments:
	S1(assigned: [T2], revoked: [], learning: [])
	S2(assigned: [T4], revoked: [T3], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [T1], revoked: [], learning: [])
	S5(assigned: [T3], revoked: [], learning: [])
Now the group reaches balance with 5 members each owning one task.
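The per-round handoff in this walkthrough can be sketched as follows: a learner takes over a task only once it rejoins as ready, at which point the previous owner loses that task; an unready learner keeps learning and the previous owner keeps processing. This is a minimal Java sketch under stated assumptions: the `Member` record and `nextRound` method are hypothetical, readiness is tracked per member (the conservative approach), and picking new learner tasks is left out. The `main` method replays the third rebalance above.

```java
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

public class LearnerHandoffSketch {

    // Hypothetical view the leader has of one member: the previous round's
    // active and learning tasks, plus whether the member rejoined as "ready".
    record Member(Set<String> active, Set<String> learning, boolean ready) {}

    // One rebalance round: a ready learner takes over the tasks it has learned
    // and the previous owner loses them; unready learners keep learning.
    static Map<String, Member> nextRound(Map<String, Member> members) {
        Map<String, SortedSet<String>> active = new TreeMap<>();
        Map<String, SortedSet<String>> learning = new TreeMap<>();
        members.forEach((name, m) -> {
            active.put(name, new TreeSet<>(m.active()));
            learning.put(name, new TreeSet<>(m.learning()));
        });
        members.forEach((name, m) -> {
            if (!m.ready()) {
                return; // not ready: no ownership transfer this round
            }
            for (String task : m.learning()) {
                active.values().forEach(owned -> owned.remove(task)); // revoke from old owner
                active.get(name).add(task);                          // transfer ownership
                learning.get(name).remove(task);
            }
        });
        Map<String, Member> next = new TreeMap<>();
        // Readiness is reset: a member reports "ready" again only after new learning.
        active.forEach((name, owned) ->
            next.put(name, new Member(owned, learning.get(name), false)));
        return next;
    }

    public static void main(String[] args) {
        // State after the second rebalance above; S4 now reports ready.
        Map<String, Member> members = new TreeMap<>(Map.of(
            "S1", new Member(Set.of("T1", "T2"), Set.of(), false),
            "S2", new Member(Set.of("T3", "T4"), Set.of(), false),
            "S3", new Member(Set.of("T5"), Set.of(), false),
            "S4", new Member(Set.of(), Set.of("T1"), true),
            "S5", new Member(Set.of(), Set.of("T3"), false)));
        nextRound(members).forEach((name, m) ->
            System.out.println(name + " assigned=" + m.active() + " learning=" + m.learning()));
    }
}
```

Feeding the result back in with S5 marked ready yields the fourth rebalance, where T3 moves from S2 to S5.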

...

Scaling up from scratch means all stream threads are new members. There is no need to start a learner stage because there is nothing to learn: we don’t even have a changelog topic to start with. We should be able to handle this case by checking whether the given task is in the active task bucket of any other member; if not, we transfer the ownership immediately.
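A minimal sketch of that check in Java, assuming the leader can see which tasks are currently active on any existing member. `DirectTransferSketch`, `plan`, and `Plan` are hypothetical names. Tasks with no active owner are assigned at once; only the others go through a learner stage.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class DirectTransferSketch {

    // Tasks that can be owned right away vs. tasks that need a learner stage first.
    record Plan(List<String> assignNow, List<String> learnFirst) {}

    // A task that no existing member runs as active has nothing to learn,
    // so it can be assigned immediately; otherwise it goes through a learner.
    static Plan plan(List<String> tasks, Map<String, Set<String>> activeByMember) {
        Set<String> currentlyActive = new HashSet<>();
        activeByMember.values().forEach(currentlyActive::addAll);
        List<String> assignNow = new ArrayList<>();
        List<String> learnFirst = new ArrayList<>();
        for (String task : tasks) {
            (currentlyActive.contains(task) ? learnFirst : assignNow).add(task);
        }
        return new Plan(assignNow, learnFirst);
    }

    public static void main(String[] args) {
        // Scaling up from scratch: nobody owns anything yet, so nothing is learned.
        Plan scratch = plan(List.of("T1", "T2"), Map.of());
        System.out.println(scratch.assignNow() + " / " + scratch.learnFirst()); // [T1, T2] / []
        // Running cluster: T1 is already active on S1, T2 is not.
        Plan running = plan(List.of("T1", "T2"), Map.of("S1", Set.of("T1")));
        System.out.println(running.assignNow() + " / " + running.learnFirst()); // [T2] / [T1]
    }
}
```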

After deprecating group.initial.rebalance.delay, we still expect the algorithm to work, because every task assignment during a rebalance adheres to the rule: "if the given task is currently active, it may only be reassigned to stream threads that have declared themselves ready to serve this task."
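The rule can also be phrased as a validation over a proposed assignment, which is one way to reason about why the algorithm keeps working without the initial delay. A minimal Java sketch, assuming the leader knows each task's current active owner and which tasks each member has declared itself ready for; all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class ReadinessRuleSketch {

    // Returns the tasks in a proposed assignment that break the rule:
    // the task is currently active on some member, it is being moved to a
    // different member, and that member has not declared itself ready for it.
    static List<String> violations(Map<String, String> currentOwner,
                                   Map<String, String> proposedOwner,
                                   Map<String, Set<String>> readyTasksByMember) {
        List<String> broken = new ArrayList<>();
        // Iterate in task order so the result is deterministic.
        for (Map.Entry<String, String> e : new TreeMap<>(proposedOwner).entrySet()) {
            String task = e.getKey();
            String newOwner = e.getValue();
            String oldOwner = currentOwner.get(task); // null: task is not active anywhere
            boolean moves = oldOwner != null && !oldOwner.equals(newOwner);
            boolean ready = readyTasksByMember.getOrDefault(newOwner, Set.of()).contains(task);
            if (moves && !ready) {
                broken.add(task);
            }
        }
        return broken;
    }

    public static void main(String[] args) {
        Map<String, String> current = Map.of("T1", "S1", "T3", "S2");
        // T1 moves to a ready learner, T3 moves to an unready one, T6 is brand new.
        Map<String, String> proposed = Map.of("T1", "S4", "T3", "S5", "T6", "S5");
        Map<String, Set<String>> ready = Map.of("S4", Set.of("T1"));
        System.out.println(violations(current, proposed, ready)); // [T3]
    }
}
```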

...

Since incremental rebalancing requires certain historical information from the last round's assignment, the leader stream thread needs to maintain knowledge of:

...

A leader crash could cause the loss of historical assignment information. For learners that are already assigned, however, each stream thread maintains its own assignment status, so when a learner task's id has no corresponding running active task, the transfer happens immediately. A leader switch in this case is therefore not a big concern.

Example: Scale-down stream applications
Cluster has 3 stream threads S1(leader), S2, S3, and they own tasks T1 ~ T5
Group stable state: S1[T1, T2], S2[T3, T4], S3[T5]

#First Rebalance 
New member S4 joins the group
S1 performs task assignments:
	S1(assigned: [T1, T2], revoked: [], learning: [])
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [], revoked: [], learning: [T1])

#Second Rebalance
S1 crashes/gets killed before S4 is ready, and S2 takes over as the leader.
Member S2~S4 join with following status: 
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: []) 
	S4(assigned: [], revoked: [], learning: [T1])
Note that T2 is unassigned, and S4 is learning T1, which has no current active owner. We could rebalance T1 and T2 immediately.
S2 performs task assignments:
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5, T2], revoked: [], learning: [])
	S4(assigned: [T1], revoked: [], learning: [])
Now the group reaches balance.
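Because every member reports its own active and learning tasks when it rejoins, a new leader can recover without the lost history. The Java sketch below reproduces the second rebalance above. It is a sketch under assumptions: orphaned learners are promoted first, and any task that is neither owned nor learned goes to the least-loaded member with ties broken by name; the text does not prescribe this placement rule, and the names are hypothetical.

```java
import java.util.Comparator;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

public class LeaderFailoverSketch {

    // What a member reports about itself when it rejoins under the new leader.
    record Report(Set<String> active, Set<String> learning) {}

    static Map<String, SortedSet<String>> reassign(Set<String> allTasks,
                                                   Map<String, Report> reports) {
        Map<String, SortedSet<String>> result = new TreeMap<>();
        Set<String> owned = new HashSet<>();
        reports.forEach((member, r) -> {
            result.put(member, new TreeSet<>(r.active()));
            owned.addAll(r.active());
        });
        // 1. A learner whose task has no running active owner takes it over at once.
        //    (A learner whose task is still active elsewhere simply keeps learning.)
        new TreeMap<>(reports).forEach((member, r) -> {
            for (String task : r.learning()) {
                if (owned.add(task)) {
                    result.get(member).add(task);
                }
            }
        });
        // 2. Tasks nobody owns or learns go to the least-loaded member, ties by name.
        Comparator<String> leastLoaded = Comparator
            .comparingInt((String m) -> result.get(m).size())
            .thenComparing(Comparator.<String>naturalOrder());
        for (String task : new TreeSet<>(allTasks)) {
            if (owned.add(task)) {
                String target = result.keySet().stream().min(leastLoaded).orElseThrow();
                result.get(target).add(task);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Second rebalance above: S1 is gone, T2 is orphaned, S4 is learning T1.
        Map<String, Report> reports = Map.of(
            "S2", new Report(Set.of("T3", "T4"), Set.of()),
            "S3", new Report(Set.of("T5"), Set.of()),
            "S4", new Report(Set.of(), Set.of("T1")));
        System.out.println(reassign(Set.of("T1", "T2", "T3", "T4", "T5"), reports));
        // {S2=[T3, T4], S3=[T2, T5], S4=[T1]}
    }
}
```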

...

Example: Leader crash before
Cluster has 3 stream threads S1(leader), S2, S3, and they own tasks T1 ~ T5
Group stable state: S1[T1], S2[T2, T3, T4, T5]

#First Rebalance 
New member S4 joins the group, and at the same time S1 crashes.
S2 takes over as the leader, so T1 is now unassigned.
S2 ~ S4 join with following status:
	S2(assigned: [T2, T3, T4, T5], revoked: [], learning: [])
	S3(assigned: [], revoked: [], learning: []) 
	S4(assigned: [], revoked: [], learning: [])
S2 performs task assignments: (no learner assignment since S2 doesn't know S4 is a new member)
	S2(assigned: [T2, T3, T4, T5], revoked: [], learning: [])
	S3(assigned: [T1], revoked: [], learning: [])
	S4(assigned: [], revoked: [], learning: [])

Now the group reaches balance, although the eventual load is skewed.

...

Algorithm incremental-rebalancing

Input	Set of Tasks,
	Set of Instances,
	Set of Stream Threads,

	where each stream thread contains:
		Set of active Tasks,
		Set of standby Tasks,
		the Instance that owns it

Main Function

	Assign active tasks: (if any)
		To instances with learner tasks that indicate "ready"
		To previous owners
		To owners of unready learner tasks
		To instances with standby tasks
		To instances that are not marked "leaving"
		To instances with available resources

	Keep existing learner task assignments unchanged

	Pick new learner tasks out of the most heavily loaded instances

	Assign learner tasks: (if any)
		To newly joined instances with abundant resources
		To instances that are not marked "leaving"
		To instances with corresponding standby tasks
	The prerequisite is that the instance version supports the learner mechanism.

	Assign standby tasks: (if any)
		To instances without matching active tasks
		To previous active task owners after the learner transfer in this round
		To instances with abundant resources
		To instances that are not marked "leaving"
	Depending on the num.standby.replicas config, standby task assignment could take multiple rounds.

Output	Finalized Task Assignment
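To illustrate how an ordered preference list like the "Assign active tasks" step could be evaluated, here is a minimal Java sketch. It assumes the listed targets are tried in order, that the first target with any matching instance wins, and that ties within a target go to the least-loaded instance and then by name; `Instance`, `load`, and `pickActiveOwner` are hypothetical. The `main` method uses the third rebalance of the scale-up example.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;

public class ActiveOwnerSketch {

    // Hypothetical leader-side view of one instance.
    record Instance(String name, Set<String> readyLearners, Set<String> previousActive,
                    Set<String> unreadyLearners, Set<String> standbys,
                    boolean leaving, int load) {}

    // Walks the active-task preferences in the listed order and returns the
    // first tier with a match; within a tier the least-loaded instance wins,
    // ties broken by name.
    static Optional<Instance> pickActiveOwner(String task, List<Instance> instances) {
        List<Predicate<Instance>> tiers = List.of(
            i -> i.readyLearners().contains(task),   // learner that reported "ready"
            i -> i.previousActive().contains(task),  // previous owner
            i -> i.unreadyLearners().contains(task), // learner still restoring
            i -> i.standbys().contains(task),        // holds a standby copy
            i -> !i.leaving(),                       // not being scaled down
            i -> true);                              // anything left: least loaded
        Comparator<Instance> byLoad =
            Comparator.comparingInt(Instance::load).thenComparing(Instance::name);
        for (Predicate<Instance> tier : tiers) {
            Optional<Instance> best = instances.stream().filter(tier).min(byLoad);
            if (best.isPresent()) {
                return best;
            }
        }
        return Optional.empty(); // no instances at all
    }

    public static void main(String[] args) {
        // Third rebalance: S4 is a ready learner of T1, S5 is still learning T3.
        List<Instance> instances = List.of(
            new Instance("S1", Set.of(), Set.of("T1", "T2"), Set.of(), Set.of(), false, 2),
            new Instance("S2", Set.of(), Set.of("T3", "T4"), Set.of(), Set.of(), false, 2),
            new Instance("S3", Set.of(), Set.of("T5"), Set.of(), Set.of(), false, 1),
            new Instance("S4", Set.of("T1"), Set.of(), Set.of(), Set.of(), false, 0),
            new Instance("S5", Set.of(), Set.of(), Set.of("T3"), Set.of(), false, 0));
        for (String task : List.of("T1", "T3")) {
            String owner = pickActiveOwner(task, instances).map(Instance::name).orElse("none");
            System.out.println(task + " -> " + owner);
        }
    }
}
```

With this ordering, T1 goes to S4 because a ready learner outranks the previous owner, while T3 stays with S2 because a previous owner outranks a learner that is still restoring.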

...

Tag Name       Task Type   Explanation
isStateful     both        Indicates whether the given task has a state to restore.
isLearner      standby     Indicates whether the standby task is a learner task.
beingLearned   active      Indicates whether the active task is being learned by another stream thread.
isReady        standby     Indicates whether the standby task is ready to serve as an active task.
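One possible in-memory representation of these tags is a small record per task that also enforces the task-type column. This Java sketch is an assumption about representation only (the table does not define an encoding), and treating only a ready learner as promotable is likewise an assumption.

```java
public class TaskTagsSketch {

    enum TaskType { ACTIVE, STANDBY }

    // Hypothetical per-task tag holder mirroring the table.
    record TaskTags(TaskType type, boolean isStateful, boolean isLearner,
                    boolean beingLearned, boolean isReady) {

        TaskTags {
            // isLearner and isReady are only meaningful on standby tasks.
            if (type == TaskType.ACTIVE && (isLearner || isReady)) {
                throw new IllegalArgumentException("isLearner/isReady apply to standby tasks only");
            }
            // beingLearned is only meaningful on active tasks.
            if (type == TaskType.STANDBY && beingLearned) {
                throw new IllegalArgumentException("beingLearned applies to active tasks only");
            }
        }

        // Assumption: only a learner that reports ready is promoted to active.
        boolean promotable() {
            return type == TaskType.STANDBY && isLearner && isReady;
        }
    }

    public static void main(String[] args) {
        TaskTags learning = new TaskTags(TaskType.STANDBY, true, true, false, false);
        TaskTags ready = new TaskTags(TaskType.STANDBY, true, true, false, true);
        System.out.println(learning.promotable() + " " + ready.promotable()); // false true
    }
}
```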

...

The restoration times of learner tasks are not always equal. When assigned more than one task to replay, a stream thread could request an immediate rebalance as soon as a subset of its learner tasks has finished. This speeds up load balancing and reduces the resources wasted on processing the same task twice, at the cost of global efficiency, because many more rebalances are introduced. We could eventually supply users with a config to decide whether they want to take the eager approach or the stable approach, together with some follow-up benchmark tools for rebalance efficiency. Example:

A stream thread S1 takes two learner tasks T1 and T2, where the restoring time time(T1) < time(T2). Under the eager approach, the stream thread triggers a rebalance as soon as T1 finishes replaying. Under the conservative approach, the stream thread does not rejoin the group until it finishes replaying both T1 and T2.
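The difference between the two approaches comes down to when a thread holding learner tasks asks for a rebalance. A minimal Java sketch with hypothetical names; the `eager` flag stands for the eager/stable choice described above.

```java
import java.util.Set;

public class RejoinPolicySketch {

    // Decides whether a thread holding learner tasks should trigger a rebalance now.
    // `learning`: learner tasks not yet handed over; `finished`: tasks fully replayed.
    // eager == true: rejoin as soon as any learner task is ready.
    // eager == false (conservative): rejoin only once all learner tasks are ready.
    static boolean shouldRejoin(Set<String> learning, Set<String> finished, boolean eager) {
        if (learning.isEmpty()) {
            return false; // nothing to hand over
        }
        boolean anyReady = learning.stream().anyMatch(finished::contains);
        boolean allReady = finished.containsAll(learning);
        return eager ? anyReady : allReady;
    }

    public static void main(String[] args) {
        Set<String> learning = Set.of("T1", "T2");
        // time(T1) < time(T2): only T1 has finished replaying so far.
        Set<String> afterT1 = Set.of("T1");
        System.out.println("eager: " + shouldRejoin(learning, afterT1, true));         // true
        System.out.println("conservative: " + shouldRejoin(learning, afterT1, false)); // false
        System.out.println("conservative, both done: "
            + shouldRejoin(learning, Set.of("T1", "T2"), false));                      // true
    }
}
```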

...

Recall that the original purpose of standby tasks is to mitigate the issue during scaling down. When performing learner assignment, we shall prioritize stream threads that currently have standby tasks matching the learner assignment. The group can then rebalance soon and let the leaving members shut themselves down fairly quickly.
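A minimal Java sketch of that prioritization, assuming the leader ranks candidate threads per learner task; `Candidate` and `rank` are hypothetical, and the secondary ordering by load and then name is an assumption for determinism.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Set;

public class LearnerPlacementSketch {

    // Hypothetical candidate thread for a learner task.
    record Candidate(String name, Set<String> standbys, int load) {}

    // Orders candidates for learning `task`: threads holding a matching standby
    // come first (their replica already tracks the state), then by load, then name.
    static List<String> rank(String task, List<Candidate> candidates) {
        Comparator<Candidate> order = Comparator
            .comparing((Candidate c) -> !c.standbys().contains(task)) // false sorts first
            .thenComparingInt(Candidate::load)
            .thenComparing(Candidate::name);
        return candidates.stream().sorted(order).map(Candidate::name).toList();
    }

    public static void main(String[] args) {
        List<Candidate> candidates = List.of(
            new Candidate("S2", Set.of("T1"), 2),
            new Candidate("S3", Set.of(), 1),
            new Candidate("S4", Set.of(), 0));
        System.out.println(rank("T1", candidates)); // [S2, S4, S3]
    }
}
```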

...

The goal of the first version is to lay the foundation of the learner algorithm for the scaling-up scenario. The leader stream thread will use the previous round's assignment to figure out which instances are new, and learner tasks shall only be assigned to new instances. The reason for implementing only the new-instance logic is a potential edge case that could break the current naive learner assignment: when the number of tasks is much smaller than the total cluster capacity, we could fall into endless resource shuffling. We plan to better address this issue in version 4.0, where we take eventual load balance into consideration. Discussions on marking task weights have been going on for a while, but to me it is still unclear what kind of eventual balance model we are going to implement at the current stage. In conclusion, we want to postpone the finalized design for eventual balance until the last version.
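The first-version rule can be sketched as two steps: derive the new instances by diffing the current members against the previous round's assignment, then hand learner tasks only to those. A minimal Java sketch; the names are hypothetical and the round-robin placement among new instances is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

public class NewInstanceSketch {

    // Instances present now but absent from the previous round's assignment.
    static SortedSet<String> newInstances(Set<String> previousRound, Set<String> current) {
        SortedSet<String> fresh = new TreeSet<>(current);
        fresh.removeAll(previousRound);
        return fresh;
    }

    // First-version rule: learner tasks go only to new instances. Spreading
    // them round-robin in name order is an assumption of this sketch.
    static Map<String, String> assignLearners(List<String> learnerTasks, SortedSet<String> fresh) {
        Map<String, String> ownerByTask = new TreeMap<>();
        List<String> targets = new ArrayList<>(fresh);
        for (int i = 0; i < learnerTasks.size() && !targets.isEmpty(); i++) {
            ownerByTask.put(learnerTasks.get(i), targets.get(i % targets.size()));
        }
        return ownerByTask; // empty when there is no new instance
    }

    public static void main(String[] args) {
        SortedSet<String> fresh = newInstances(Set.of("S1", "S2", "S3"), Set.of("S1", "S2", "S3", "S4"));
        System.out.println(fresh + " " + assignLearners(List.of("T1"), fresh)); // [S4] {T1=S4}
    }
}
```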

...

scale.down.timeout.ms

Default: infinity

Version 2.0

Time in milliseconds after which a stream thread is forcibly terminated once it has been informed that it is being scaled down.
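A minimal sketch of the intended timeout semantics, using a plain `java.lang.Thread` as a stand-in for a stream thread: wait for a graceful exit for up to the timeout, then interrupt. `awaitOrForce` is hypothetical, `Long.MAX_VALUE` models the default of infinity, and interruption is only one possible way to force termination; the text does not specify the mechanism.

```java
public class ScaleDownTimeoutSketch {

    // Waits for `worker` to exit on its own; once timeoutMs (must be positive)
    // has elapsed, it is interrupted. Returns true if termination had to be forced.
    // Assumes the worker honors interrupts.
    static boolean awaitOrForce(Thread worker, long timeoutMs) throws InterruptedException {
        if (timeoutMs == Long.MAX_VALUE) {
            worker.join();          // default "infinity": wait as long as it takes
        } else {
            worker.join(timeoutMs); // bounded graceful wait
        }
        if (!worker.isAlive()) {
            return false;           // exited gracefully
        }
        worker.interrupt();         // force termination
        worker.join();
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        // A worker that would otherwise keep running for a minute.
        Thread stuck = new Thread(() -> {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                // Interrupted: stop work and exit.
            }
        });
        stuck.start();
        System.out.println("forced = " + awaitOrForce(stuck, 100)); // forced = true
    }
}
```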

...

learner.partial.rebalance

Default: true

Version 3.0

If this config is set to true, a new member proactively triggers a rebalance each time it finishes restoring the state of one learner task, until it finishes all of its replaying. Otherwise, the new stream thread batches its ready calls and asks for a single round of rebalance.
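Since both configs are proposals, they would be read as plain string keys rather than existing `StreamsConfig` constants. A minimal Java sketch of reading them with the proposed defaults; the helper names are hypothetical, `Long.MAX_VALUE` stands in for infinity, and the value `30000` is only illustrative.

```java
import java.util.Properties;

public class ProposedConfigSketch {

    // Proposed key, proposed default: true.
    static boolean learnerPartialRebalance(Properties props) {
        return Boolean.parseBoolean(props.getProperty("learner.partial.rebalance", "true"));
    }

    // Proposed key, proposed default: infinity, modeled as Long.MAX_VALUE.
    static long scaleDownTimeoutMs(Properties props) {
        String value = props.getProperty("scale.down.timeout.ms");
        return value == null ? Long.MAX_VALUE : Long.parseLong(value);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        System.out.println(learnerPartialRebalance(props) + " " + scaleDownTimeoutMs(props));
        // Opt into the stable approach: one batched rebalance after all learners are ready.
        props.setProperty("learner.partial.rebalance", "false");
        props.setProperty("scale.down.timeout.ms", "30000");
        System.out.println(learnerPartialRebalance(props) + " " + scaleDownTimeoutMs(props));
    }
}
```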

...