...

Code Block
languagesql
Algorithm incremental-rebalancing

Input Set of Tasks,
      Set of Instances,
      Set of Workers,

      Where each worker contains:
          Set of active Tasks,
          Set of standby Tasks,
          owned by which instance

Main Function

    Assign active tasks: (if any)
        To instances with learner tasks that indicate "ready"
        To previous owners
        To owners of unready learner tasks
        To instances with standby tasks
        To instances with available resources

    Keep existing learner tasks' assignment unchanged

    Pick new learner tasks out of the most heavily loaded instances

    Assign learner tasks: (if any)
        To newly joined instances with abundant resources
        To instances with corresponding standby tasks

    Assign standby tasks: (if any)
        To instances without matching active tasks
        To previous active task owners after learner transfer in this round
        To instances with available resources
        Based on num.standby.task config, this could take multiple rounds

Output Finalized Task Assignment
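
The priority order used when assigning an active task can be sketched in Java as follows. This is only an illustration under simplifying assumptions: it places a single task, ignores capacity accounting, and the names `ActiveTaskPlacement`, `Relation`, `Candidate` and `pickOwner` are hypothetical and not part of the proposal.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

/** Hypothetical sketch: choose the owner of one active task by the priority order listed above. */
final class ActiveTaskPlacement {

    /** How an instance relates to the task being placed; declaration order encodes priority. */
    enum Relation {
        READY_LEARNER,      // hosts a learner task for this task that indicates "ready"
        PREVIOUS_OWNER,     // owned the active task in the previous round
        UNREADY_LEARNER,    // hosts a learner task that is not ready yet
        STANDBY_HOLDER,     // hosts a standby task for this task
        RESOURCE_AVAILABLE  // has no relation to the task, only spare resources
    }

    /** An instance paired with its relation to the task (illustrative type only). */
    static final class Candidate {
        private final String instanceId;
        private final Relation relation;

        Candidate(String instanceId, Relation relation) {
            this.instanceId = instanceId;
            this.relation = relation;
        }

        String instanceId() { return instanceId; }
        Relation relation() { return relation; }
    }

    /** Returns the candidate with the highest-priority relation, or empty if there is none. */
    static Optional<Candidate> pickOwner(List<Candidate> candidates) {
        // Enums compare by declaration order, so min() yields the highest priority.
        return candidates.stream().min(Comparator.comparing(Candidate::relation));
    }
}
```

Encoding the priority as enum declaration order keeps the selection to a single comparison; a real assignor would additionally need to track remaining capacity per instance.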

Task Tagging

To enable learner resource shuffling behavior, we need the following task status indicators to be provided:

| Tag Name | Task Type | Explanation |
|---|---|---|
| isStateful | both | Indicate whether given task has a state to restore. |
| isLearner | standby | Indicate whether standby task is a learner task. |
| beingLearned | active | Indicate whether active task is being learned by some other stream worker. |
| isReady | standby | Indicate whether standby task is ready to serve as active task. |
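
As a rough illustration, the four indicators could be carried on a small value class like the one below. The class name `TaskTags`, the `TaskType` enum, the validation rules and the `isReadyLearner` helper are assumptions made for this sketch; the proposal only defines the tag names and the task types they apply to.

```java
/** Hypothetical holder for the four task status indicators listed above. */
final class TaskTags {

    enum TaskType { ACTIVE, STANDBY }

    final TaskType type;
    final boolean isStateful;   // both types: the task has a state to restore
    final boolean isLearner;    // standby only: the standby task is a learner task
    final boolean beingLearned; // active only: another stream worker is learning this task
    final boolean isReady;      // standby only: ready to serve as the active task

    TaskTags(TaskType type, boolean isStateful, boolean isLearner,
             boolean beingLearned, boolean isReady) {
        // Assumption: tags that do not apply to the given task type must be false.
        if (type == TaskType.ACTIVE && (isLearner || isReady)) {
            throw new IllegalArgumentException("isLearner/isReady apply to standby tasks only");
        }
        if (type == TaskType.STANDBY && beingLearned) {
            throw new IllegalArgumentException("beingLearned applies to active tasks only");
        }
        this.type = type;
        this.isStateful = isStateful;
        this.isLearner = isLearner;
        this.beingLearned = beingLearned;
        this.isReady = isReady;
    }

    /** True for a learner task that indicates "ready", the first choice for an active task. */
    boolean isReadyLearner() {
        return type == TaskType.STANDBY && isLearner && isReady;
    }
}
```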

...

Optimizations

Stateful vs Stateless Tasks

...

The goal of the first version is to lay the foundation of the learner algorithm for the scale-up scenario. The leader worker will use the previous round's assignment to figure out which instances are new, and the learner tasks shall only be assigned to them once. We restrict the first version to new instances because of a potential edge case that could break the existing naive learner assignment: when the number of tasks is much smaller than the total cluster capacity, we could fall into endless resource shuffling. We also care more about a smooth transition than about resource balance in stage one. There has been some earlier discussion on assigning weights to different types of tasks. If we aim for task balance too early, we risk over-optimizing. In conclusion, we want to delay the finalized design for eventual balance until the last version.
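
A minimal sketch of how the leader might identify new instances is a set difference between the members of the current round and those of the previous round. The class and method names below are hypothetical, and instances are assumed to be identified by plain string ids.

```java
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper: find instances that joined since the previous round. */
final class NewInstanceDetector {

    /**
     * Returns the ids present in the current round but absent from the previous
     * round's assignment. Neither input set is modified.
     */
    static Set<String> newInstances(Set<String> previousRound, Set<String> currentRound) {
        Set<String> fresh = new TreeSet<>(currentRound); // copy, sorted for stable output
        fresh.removeAll(previousRound);
        return fresh;
    }
}
```

Only the instances returned here would be eligible for learner tasks in version 1.0 under this reading.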

We also don't want to adopt the eager rebalance optimization in version 1.0 due to the explained concerns.

Version 2.0

Delivery goal: Scale down support

...

Delivery goal: Eager rebalance analysis

A detailed analysis and benchmark support need to be built before fully devoting effort to this feature. Intuitively, most applications should be able to tolerate a minor discrepancy in task replaying time, while the cost of extra rebalances and the increased debugging complexity are things we definitely do not favor.

...

Version 4.0, the final version, will take the application's eventual load balance into consideration. If we define a balancing factor x, the total number of tasks each instance owns should be within ±x% of the expected number of tasks (according to relative instance weight), which buffers some capacity in order to avoid imbalance. A stream.imbalance.percentage config will be provided for the user to configure. The smaller this number is set, the stricter the assignment protocol will be.

Example:

A group with 4 instances that have the following capacities: 5, 10, 10, 15, and 80 tasks in total, should expect a perfect balance of:

10, 20, 20, 30 tasks.

If we set the imbalance factor to 20%, then an eventual assignment like

12, 18, 23, 27

should be stable, as each count is within 20% of its expected load.
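
The check in this example can be sketched in Java as below. The class and method names are hypothetical, and the sketch assumes that the tolerance is inclusive and is computed relative to each instance's expected load, which is consistent with the numbers above (12 against an expected 10 at 20%).

```java
/** Hypothetical sketch of the imbalance check described in the example above. */
final class BalanceCheck {

    /** Expected task count per instance, proportional to its capacity (relative weight). */
    static double[] expectedLoads(int[] capacities, int totalTasks) {
        int totalCapacity = 0;
        for (int c : capacities) {
            totalCapacity += c;
        }
        double[] expected = new double[capacities.length];
        for (int i = 0; i < capacities.length; i++) {
            expected[i] = (double) totalTasks * capacities[i] / totalCapacity;
        }
        return expected;
    }

    /**
     * True if every instance's assigned count is within +-imbalancePercentage of its
     * expected load. Assumes both arrays have the same length and the same ordering.
     */
    static boolean isStable(int[] capacities, int[] assigned, double imbalancePercentage) {
        int totalTasks = 0;
        for (int a : assigned) {
            totalTasks += a;
        }
        double[] expected = expectedLoads(capacities, totalTasks);
        for (int i = 0; i < assigned.length; i++) {
            double tolerance = expected[i] * imbalancePercentage / 100.0;
            if (Math.abs(assigned[i] - expected[i]) > tolerance) {
                return false;
            }
        }
        return true;
    }
}
```

With capacities 5, 10, 10, 15 and a factor of 20, the assignment 12, 18, 23, 27 passes this check, while 13, 17, 23, 27 does not, because 13 exceeds the allowed range of 8 to 12 for the first instance.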

We also want to balance the load separately for stateful tasks and stateless tasks, as discussed above. So far version 4.0 still has many unknowns and is slightly beyond the incremental rebalancing scope. Our plan is to keep iterating on the details or bake a separate KIP in the future.

Public Interfaces

We are going to add a new protocol type called "stream".

Code Block
languagejava
titleProtocol Type
ProtocolTypes : {"consumer", "connect", "stream"}


We will also add a number of new configs for users to better apply and customize the scaling change.

...