...

We will introduce a new type of stream task called `learner task`, a special task that is assigned to one stream instance to restore the state of a currently active task from another instance. It shares the same semantics as a standby task; the only difference is that when the restoration of a learner task completes, the stream instance initiates a new JoinGroupRequest to trigger a rebalance for the new task assignment. The goal of the learner task is to delay task migration while the destination host has not finished, or even started, replaying the active task. This applies to both scale-up and scale-down scenarios.

Stop-The-World Effect

...

in KStream

As mentioned in the motivation section, we also want to mitigate the stop-the-world effect of the current global rebalance protocol. A quick recap of the current rebalance semantics on KStream: when a rebalance starts, all members would

...

Scaling up from scratch means all instances are new members. There is no need to implement a learner stage because there is nothing to learn: we don’t even have a changelog topic to start with. We should be able to handle this case by checking whether the given task is in the active task bucket of any other member; if not, we transfer the ownership immediately.

After deprecating group.initial.rebalance.delay, we still expect the algorithm to work: multiple rebalances will trigger and share the workload immediately, since no restoration effort is needed. Every task assignment during a rebalance will adhere to the rule "if the given task is currently active, reassignment must happen only to instances that have declared themselves ready to serve this task."
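The ownership rule above can be sketched as a small decision function. This is only an illustration under stated assumptions: the class `TransferRule`, the `Decision` enum, and the two maps describing member state are hypothetical names invented here, not part of the Kafka Streams assignor.

```java
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of the ownership-transfer rule; not the actual Streams assignor. */
final class TransferRule {
    enum Decision { TRANSFER_NOW, KEEP_WITH_OWNER_AND_LEARN }

    /**
     * @param task        task id, e.g. "T3"
     * @param target      member that should eventually own the task
     * @param activeTasks member id -> tasks it currently runs as active
     * @param readyTasks  member id -> tasks it has finished restoring as a learner
     */
    static Decision decide(String task, String target,
                           Map<String, Set<String>> activeTasks,
                           Map<String, Set<String>> readyTasks) {
        // Is the task currently active on some member other than the target?
        boolean activeElsewhere = activeTasks.entrySet().stream()
                .anyMatch(e -> !e.getKey().equals(target) && e.getValue().contains(task));
        if (!activeElsewhere) {
            // Nothing to learn (e.g. scale-up from scratch): hand over immediately.
            return Decision.TRANSFER_NOW;
        }
        // Currently active task: reassign only to a member that declared itself ready.
        boolean targetReady = readyTasks.getOrDefault(target, Set.of()).contains(task);
        return targetReady ? Decision.TRANSFER_NOW : Decision.KEEP_WITH_OWNER_AND_LEARN;
    }
}
```

In the scale-up example that follows, T1 to T5 in the first rebalance take the first branch (not previously owned), while T3 to T5 in the second rebalance stay with S1 until S2 and S3 report ready.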

Scale-up from ground
Group empty state: unassigned tasks [T1, T2, T3, T4, T5]

#First Rebalance 
New member S1 joins the group
S1 performs task assignments:
S1(assigned: [T1, T2, T3, T4, T5], revoked: [], learning: []) // T1~5 not previously owned


#Second Rebalance 
New members S2 and S3 join the group
S1 performs task assignments:
S1(assigned: [T1, T2, T3, T4, T5], revoked: [], learning: []) 
S2(assigned: [], revoked: [], learning: [T3, T4])
S3(assigned: [], revoked: [], learning: [T5])


#Third Rebalance 
S2 and S3 are ready immediately after the assignment.
Member S1~S3 join with following status:
	S1(assigned: [T1, T2], revoked: [T3, T4, T5], learning: []) 
	S2(assigned: [], revoked: [], learning: [T3, T4])
	S3(assigned: [], revoked: [], learning: [T5])
S1 performs task assignments:
	S1(assigned: [T1, T2], revoked: [T3, T4, T5], learning: []) 
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])

...

For stateless tasks, the ownership transfer should happen immediately without a learning stage, because there is nothing to restore. For these tasks the algorithm should fall back to KIP-415 behavior, where stateless tasks are only revoked during the second rebalance. This feature requires us to add a new tag to a stream task: "isStateful".
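As a minimal sketch of how the "isStateful" tag could gate the learner stage, consider the descriptor below. The class `TaskInfo` and the method `needsLearnerStage` are hypothetical names used for illustration only; the proposal itself names only the tag.

```java
/** Hypothetical task descriptor carrying the proposed "isStateful" tag. */
final class TaskInfo {
    final String id;
    final boolean isStateful;

    TaskInfo(String id, boolean isStateful) {
        this.id = id;
        this.isStateful = isStateful;
    }

    /**
     * A learner stage is only useful when there is state to restore
     * and another member currently runs the task as active.
     */
    boolean needsLearnerStage(boolean currentlyActiveElsewhere) {
        return isStateful && currentlyActiveElsewhere;
    }
}
```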

Eager Rebalance 

The restoration times of learner tasks are not always equal. When assigned more than one task to replay, a stream worker could request an immediate rebalance as soon as a subset of its learner tasks has finished. This speeds up load balancing and reduces the resource waste of double task processing, at the cost of global efficiency, since many more rebalances are introduced.

We could give users a config to choose between the eager approach and the stable approach.

Example:

A stream instance S1 takes two learner tasks T1 and T2, where the restoring time satisfies time(T1) < time(T2). Under the eager rebalance approach, the instance will trigger a rebalance immediately when T1 finishes replaying. Under the stable rebalance approach, the instance will not rejoin the group until it has finished replaying both T1 and T2.
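The difference between the two approaches can be sketched as a single predicate. The class `RejoinPolicy`, its method, and its parameters are hypothetical and exist only for this illustration; `eager` stands for the value of the proposed boolean config.

```java
import java.util.Set;

/** Hypothetical sketch of the eager vs. stable rejoin decision. */
final class RejoinPolicy {
    /**
     * @param eager    true for the eager approach, false for the stable approach
     * @param learning learner tasks currently assigned to this instance
     * @param restored tasks whose state has finished replaying
     * @return true when the instance should trigger a rebalance now
     */
    static boolean shouldRejoin(boolean eager, Set<String> learning, Set<String> restored) {
        if (learning.isEmpty()) {
            return false; // nothing is being learned, so there is nothing to announce
        }
        boolean anyRestored = learning.stream().anyMatch(restored::contains);
        boolean allRestored = restored.containsAll(learning);
        // Eager: rejoin as soon as any learner task is ready.
        // Stable: batch readiness and rejoin once, when every learner task is ready.
        return eager ? anyRestored : allRestored;
    }
}
```

With learning = {T1, T2} and only T1 restored, the eager policy returns true while the stable policy returns false; once both are restored, both policies return true.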



Boolean config: learner.partial.ready



Public Interfaces

We will be adding the following new configs:

learn.partial.rebalance

If this config is set to true, a new member will proactively trigger a rebalance each time it finishes restoring the state of one task, until it has finished replaying all of its tasks. Otherwise, the new instance will wait until all of its learner tasks are ready and then ask for a single round of rebalance.


A public interface is any change to the following:

...