Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  • Worker (A.K.A stream worker):  thread level streaming processor, who actually takes the stream task.
  • Instance (A.K.A stream instance): the KStream instance serving as container of stream workers set. This could suggest a physical host or a k8s pod. We will interleave the definition of worker and instance for the most part of discussion concerning "working member" because the capacity is essentially controlled by the instance relative size, not the worker.
  • Learner task: a special standby task that gets assigned to one stream instance to restore a current active task state from another instance.

...

Learner task shares the same semantics as standby task, which is utilized by the restore consumer to replicate main active task state. When the restoration of learner task is complete, the stream instance will initiate a new JoinGroupRequest to call out rebalance of the new task assignment. The goal of learner task is to delay the task migration when the destination host has not finished or even started replaying the active task. 

Stop-The-World Effect

As mentioned in motivation section, we also want to mitigate the stop-the-world effect of current global rebalance protocol. A quick recap of current rebalance semantics on KStream: when rebalance starts, all workers would

...

Code Block
languagetext
titleScale-down stream applications
Cluster has 3 stream workers S1(leader), S2, S3, and they own tasks T1 ~ T5
Group stable state: S1[T1, T2], S2[T3, T4], S3[T5]

#First Rebalance 
New member S4 joins the group
S1 performs task assignments:
	S1(assigned: [T1, T2], revoked: [], learning: [])
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [], revoked: [], learning: [T1])

#Second Rebalance
S1 crashes/gets killed before S4 is ready, S2 takes over the leader.
Member S2~S4 join with following status: 
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: []) 
	S4(assigned: [], revoked: [], learning: [T1])
Note that T2 is unassigned, and S4 is learning T1 which has no current active task. We 
could rebalance T1, T2 immediately.	
S2 performs task assignments:
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5, T2], revoked: [], learning: [])
	S4(assigned: [T1], revoked: [], learning: [])
Now the group reaches balance.

Optimizations

Stateful vs Stateless Tasks

Leader Transfer Before Scaling 

However, if the leader dies before new instances join, the potential risk is that leader could not differentiate which stream instance is "new", because it relies on the historical information. The final assignment is probably not ideal in this case if we only attempt to assign learner task to new comers in version 1.0. This also motivates us to figure out a better task coordination strategy for load balance in long term.

Code Block
languagetext
titleLeader crash before
Cluster has 3 stream workers S1(leader), S2 and they own tasks T1 ~ T5
Group stable state: S1[T1], S2[T2, T3, T4, T5]

#First Rebalance 
New member S4 joins the group, at the same time S1 crash.
S2 takes over the leader
S2 ~ S4 join with following status
	S2(assigned: [T2, T3, T4, T5], revoked: [], learning: [])
	S3(assigned: [], revoked: [], learning: []) 
	S4(assigned: [], revoked: [], learning: [])
S2 performs task assignments:
	S2(assigned: [T2, T3, T4, T5], revoked: [], learning: [])
	S3(assigned: [T1], revoked: [], learning: [])
	S4(assigned: [], revoked: [], learning: [])

Now the group reaches balance, although the eventual load is skewed.


Optimizations

Stateful vs Stateless Tasks

For stateless tasks the ownership transfer should happen immediately without the need of a learning stage, because there is nothing to restore. We should For stateless tasks the ownership transfer should happen immediately without the need of a learning stage, because there is nothing to restore. We should fallback the algorithm towards KIP-415 where the stateless tasks will only be revoked during second rebalance. This feature requires us to add a new tag towards a stream task, so that when we eventually consider the load balance of the stream applications, this could help us separate out tasks into two buckets and rebalance independently.

...

Code Block
languagesql
Algorithm incremental-rebalancing

Input Set of Tasks,
	  Set of Instances,
      Set of Workers,

      Where each worker contains:
		Set of active Tasks,
		Set of standby Tasks,
		owned by which instance

Main Function
	
	Assign active tasks: (if any)
		To instances with learner tasks that indicates "ready"
		To previous owners
		To unready learner tasks owners
  	 	To instances with standby tasks
		To resource available instances

	Keep existing learner task assignment

 	Pick new learner tasks out of heaviest loaded instances
 
	Assign learner tasks: (if any)
		To new-coming instances with abundant resource
		To instances with corresponding standby tasks

	Assign standby tasks: (if any)
		To instances without matching active tasks
		To previous active task owners after learner transfer in this round
		To resource available instances
		Based on num.standby.task config, this could take multiple rounds

Output Finalized Task Assignment


Also for For the smooth delivery of all the features we have discussed so far, an iteration plan of reaching final algorithm is defined as below:

Version 1.0
Anchor
algorithm_version_1_0
algorithm_version_1_0

Delivery goal: Scale up support, conservative rebalance

...

stream.imbalance.percentage

Default: 0.2

Version 4.0

The tolerance of task imbalance factor between hosts to trigger rebalance.

Implementation Plan

We want to callout Call out this portion because the algorithm we are gonna design is fairly complicated so far. To make sure the delivery is smooth with fundamental changes of KStream internals, I build a separate Google Doc here that could be sharable to outline the step of changes. Feel free to give your feedback on this plan while reviewing the algorithm, because some of the changes are highly coupled with internal changes. Without these details, the algorithm is not making sense.

...