...

The above examples focus on demonstrating the expected behaviors of KStream incremental rebalancing as an "end picture". Next, we will present a holistic view of the new learner assignment algorithm during each actual rebalance.

The assignment will be broken down in the order of: active, learner and standby tasks.

Code Block
languagesql
Algorithm incremental-rebalancing

Input Set of Tasks,
	  Set of Instances,
      Set of Workers,

      Where each worker contains:
		Set of active Tasks,
		Set of standby Tasks,
		owned by which instance

Main Function
	
	Assign active tasks: (if any)
		To instances with learner tasks that indicate "ready"
		To previous owners
		To unready learner tasks owners
  	 	To instances with standby tasks
		To resource available instances

	Keep existing learner tasks' assignment unchanged

 	Pick new learner tasks out of heaviest loaded instances
 
	Assign learner tasks: (if any)
		To new-coming instances with abundant resource
		To instances with corresponding standby tasks

	Assign standby tasks: (if any)
		To instances without matching active tasks
		To previous active task owners after learner transfer in this round
		To resource available instances
		Based on num.standby.task config, this could take multiple rounds

Output Finalized Task Assignment
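The active-task preference order above can be sketched in Python. This is a minimal illustration, not the actual implementation: the `Instance` fields and the capacity check are simplified assumptions standing in for the real subscription metadata.

```python
from dataclasses import dataclass, field

@dataclass
class Instance:
    """Simplified stand-in for a Streams instance's task bookkeeping."""
    name: str
    ready_learners: set = field(default_factory=set)
    prev_active: set = field(default_factory=set)
    unready_learners: set = field(default_factory=set)
    standby: set = field(default_factory=set)
    capacity: int = 1

    def has_capacity(self):
        return self.capacity > 0

def assign_active(task, instances):
    """Pick an owner for one active task following the stated priority order."""
    preferences = [
        lambda i: task in i.ready_learners,    # learner has caught up on this task
        lambda i: task in i.prev_active,       # previous owner
        lambda i: task in i.unready_learners,  # learner still restoring
        lambda i: task in i.standby,           # holds a standby replica
        lambda i: i.has_capacity(),            # any instance with spare resources
    ]
    for rule in preferences:
        for instance in instances:
            if rule(instance):
                return instance
    return None
```

The point of the ordering is that a "ready" learner outranks even the previous owner, which is what makes the eventual task handoff happen.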

Stream Task Tagging

To enable the learner resource shuffling behavior, the following task status indicators need to be provided:

...

Since we are carrying over more information during rebalance, we should be alert to the metadata size increase. The current hard limit is 1 MB per metadata response, which means that if we add too much information, the new protocol could hit a hard failure. Finding a better encoding scheme for metadata is a common pain point for incremental rebalancing KIPs like 415 and 429. Some thoughts from Guozhang have been started in this JIRA, and we plan to have a separate KIP discussing different encoding technologies to see which one could work.
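To make the 1 MB concern concrete, here is a hypothetical size guard. The JSON encoding and field names are illustrative only; the real protocol uses a compact binary format, but the guard idea is the same.

```python
import json

# Hard limit per metadata response, per the discussion above.
METADATA_LIMIT_BYTES = 1_000_000

def metadata_size(subscription_info: dict) -> int:
    """Encoded size of the (illustrative) subscription metadata in bytes."""
    return len(json.dumps(subscription_info).encode("utf-8"))

def fits_metadata_limit(subscription_info: dict) -> bool:
    """True if the encoded metadata stays under the hard limit."""
    return metadata_size(subscription_info) < METADATA_LIMIT_BYTES
```

The takeaway is that per-task status tags scale linearly with task count, so a verbose encoding can blow past the limit on large topologies.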

...

For the smooth delivery of all the features discussed so far, the iteration plan for reaching the final algorithm is defined below:

Version 1.0

...

The goal of the first version is to realize the foundation of the learner algorithm for the scaling-up scenario. The leader worker will use the previous round's assignment to figure out which instances are new, and learner tasks shall only be assigned to new instances. The reason we only assign learners to new instances is a potential edge case that could break the naive learner assignment: when the number of tasks is much smaller than the total cluster capacity, we could fall into endless resource shuffling. Also, for stage one we care more about a smooth transition than about resource balance. We plan to better address this issue in version 4.0, where we take eventual load balance into consideration. Some discussions have been initiated on marking weights for different types of tasks, but if we aim for task balance too early, we are potentially in the position of over-optimization; it is unclear so far what kind of eventual balance model we are going to implement. In conclusion, we want to postpone the finalized design for eventual balance until the last version.
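The version 1.0 leader logic can be sketched as follows, under simplified assumptions: membership is a list of instance names, and learner tasks are spread round-robin over the newly joined instances only. The function and field names are hypothetical.

```python
def pick_learner_owners(prev_members, curr_members, heavy_tasks):
    """Assign each candidate learner task to a newly joined instance.

    Only instances absent from the previous round's assignment are eligible,
    which avoids the endless-shuffling edge case described above.
    """
    new_instances = sorted(set(curr_members) - set(prev_members))
    if not new_instances:
        return {}  # no newcomers, no learner tasks this round
    return {
        task: new_instances[i % len(new_instances)]
        for i, task in enumerate(sorted(heavy_tasks))
    }
```

When the membership is unchanged, the function assigns nothing, so a stable group never churns learner tasks.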

...

A detailed analysis and benchmark tests need to be built before fully devoting effort to this feature. Intuitively, most applications should be able to tolerate a minor discrepancy in task replaying time, while the cost of extra rebalances and increased debugging complexity are definitely things we are not in favor of.

...

Version 4.0 (Stretch)

Delivery goal: Task state labeling, eventual workload balance

The 4.0 and final version will take the application's eventual load balance into consideration. If we define a balancing factor x, the total number of tasks each instance owns should be within ±x% of the expected number of tasks (according to relative instance weight), which buffers some capacity in order to avoid imbalance. A stream.imbalance.percentage config will be provided for the user. The smaller this number is set to, the stricter the assignment protocol will behave. Example:

Code Block
languagetext
titleEventual balance example
A group with 4 instances that have the following capacities: 5, 10, 10, 15, and 80 tasks shall expect the perfect balance of:

10, 20, 20, 30 tasks.

In case we set the imbalance factor to 20%, then an eventual assignment like

12, 18, 23, 27 should be stable, as none of them is way off the expected load assignment.
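The stability check above can be written down directly. This sketch only assumes what the text states: expected load is proportional to instance capacity, and an assignment is stable when every instance stays within ±x% of its expectation (x being the proposed stream.imbalance.percentage).

```python
def expected_loads(capacities, num_tasks):
    """Per-instance expected task counts, weighted by capacity."""
    total = sum(capacities)
    return [num_tasks * c / total for c in capacities]

def is_stable(assignment, capacities, num_tasks, imbalance_pct):
    """True if every instance is within +-imbalance_pct% of its expectation."""
    x = imbalance_pct / 100.0
    return all(
        exp * (1 - x) <= got <= exp * (1 + x)
        for got, exp in zip(assignment, expected_loads(capacities, num_tasks))
    )
```

Running it against the example: capacities 5, 10, 10, 15 with 80 tasks give expectations 10, 20, 20, 30, and the assignment 12, 18, 23, 27 passes the 20% check.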


Some optimizations, such as balancing the load separately for stateful tasks and stateless tasks as discussed above, could also be applied here. So far version 4.0 still has many unknowns and is slightly beyond the incremental rebalancing scope. Our plan is to keep iterating on the details or bake a separate KIP for the balancing algorithm in the future.
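As a hedged illustration of that stateful/stateless split, one could balance each category independently so heavy stateful tasks do not crowd cheap stateless ones onto one instance. The function is hypothetical and ignores weights entirely.

```python
def split_and_balance(tasks, stateful, instances):
    """Round-robin stateful and stateless tasks across instances separately."""
    assignment = {inst: [] for inst in instances}
    categories = (
        sorted(t for t in tasks if t in stateful),      # stateful first
        sorted(t for t in tasks if t not in stateful),  # then stateless
    )
    for category in categories:
        for i, task in enumerate(category):
            assignment[instances[i % len(instances)]].append(task)
    return assignment
```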

Public Interfaces

...

learner.partial.rebalance

Default : true

Version 3.0

If this config is set to true, the new member will proactively trigger a rebalance each time it finishes restoring one learner task's state, until it eventually finishes all the replaying. Otherwise, the new worker will batch the ready calls and ask for a single round of rebalance.
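The trade-off the config controls can be stated as a one-line model (a simplification: the real trigger depends on restoration progress, not a precomputed count):

```python
def rebalances_triggered(num_learner_tasks, partial_rebalance):
    """How many rebalances a new member causes while warming up its learners."""
    if partial_rebalance:
        return num_learner_tasks  # one rebalance per restored learner task
    return 1 if num_learner_tasks > 0 else 0  # single batched rebalance
```

Partial rebalancing shifts load to the new member sooner, at the cost of more rebalance rounds.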

...

We call out this portion because the algorithm we are designing is fairly complicated. To make sure the delivery is smooth given the fundamental changes to KStream internals, we built a separate Google Doc (shared here) that outlines the steps of the changes. Feel free to give feedback on this plan while reviewing the algorithm, because some of the changes are highly coupled with internal changes; without these details, the algorithm does not make sense.

Compatibility, Deprecation, and Migration Plan

...

This change requires Kafka broker version >= 0.9, where the broker will react with a rebalance when a normal consumer rejoins with changed encoded metadata. The client application needs to update to the earliest version that includes the KIP-429 version 1.0 change.

...

Recommended Upgrade Flow

As mentioned above, a new protocol type shall be created. To ensure a smooth upgrade, we need to make sure the existing job doesn't fail. The workflow for upgrading is as below:

...

In the long term we are proposing a smoother and more elegant upgrade approach than the current one. However, it requires a broker upgrade, which may not be a trivial effort for the end user. So far this workaround could make the upgrade process much easier.

Rejected Alternatives

N/A for the algorithm part. For implementation plan trade-offs, please review the Google Doc.