Status

Current state: Under Discussion

...

A learner task shares the same semantics as a standby task: the restore consumer uses it to replicate the active task's state. When the learner task finishes restoring, the stream instance initiates a new JoinGroupRequest to trigger another rebalance that performs the task transfer. The goal of the learner task is to delay task migration while the destination host has not finished replaying the active task.

Stop-The-World Effect

As mentioned in the motivation section, we also want to mitigate the stop-the-world effect of the current global rebalance protocol. A quick recap of the current rebalance semantics on KStream: when a rebalance starts, all workers would

...

Scaling up from scratch means all workers are new members. There is no need to start a learner stage because there is nothing to learn: we don’t even have a changelog topic to start with. We should be able to handle this case by checking whether the given task is in the active task bucket of any other member; if not, we just transfer the ownership immediately.
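The check described above can be sketched as follows. This is a minimal illustration, not the actual assignor code: the class `LearnerStageCheck`, the method `needsLearnerStage`, and the member-to-active-tasks map are hypothetical names introduced here.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical helper for illustration only; not part of Kafka Streams.
final class LearnerStageCheck {

    /**
     * Returns true when some member other than the target currently owns the
     * task as an active task, so a learner stage is needed before transfer.
     * Returns false when nobody else owns it, in which case ownership can be
     * transferred immediately.
     */
    static boolean needsLearnerStage(String taskId,
                                     String targetMember,
                                     Map<String, Set<String>> activeTasksByMember) {
        for (Map.Entry<String, Set<String>> entry : activeTasksByMember.entrySet()) {
            boolean isOtherMember = !entry.getKey().equals(targetMember);
            if (isOtherMember && entry.getValue().contains(taskId)) {
                return true;
            }
        }
        return false;
    }
}
```

When every member joins with an empty active set, as in a from-scratch scale up, the check returns false for every task and the learner stage is skipped.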

...

Scale Down Running Application

When scaling down a stream group, it is also favorable to initiate learner tasks before actually shutting down the instances. Although standby tasks could help in this case, they require the user to pre-set num.standby.replicas, which may not have been done by the time the administrator performs the scale down. Besides, standby tasks are not guaranteed to be up-to-date. The plan is to use a command line tool to tell certain stream members that a shutdown is about to be executed. These informed members will send a join group request to indicate that they are “leaving soon”. During the assignment phase, the leader will perform the learner assignment among the members that are not leaving. A leaving member will shut itself down once it receives the instruction to revoke all its active tasks.
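A rough sketch of the leader-side step, assuming the leader knows each member's active tasks and the set of members that announced they are leaving. The class and method names are hypothetical, and the round-robin placement is a simplification chosen for brevity; it ignores standby ownership and load.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch; round-robin placement is an assumption, not the proposed policy.
final class ScaleDownLearnerPlan {

    /** Maps each staying member to the learner tasks it should start replaying. */
    static Map<String, List<String>> planLearners(Map<String, List<String>> activeTasksByMember,
                                                  Set<String> leavingMembers) {
        // Staying members, in sorted order so the plan is deterministic.
        List<String> staying = new ArrayList<>();
        for (String member : new TreeSet<>(activeTasksByMember.keySet())) {
            if (!leavingMembers.contains(member)) {
                staying.add(member);
            }
        }
        Map<String, List<String>> plan = new TreeMap<>();
        if (staying.isEmpty()) {
            return plan; // nobody left to learn the tasks
        }
        int next = 0;
        for (String leaving : new TreeSet<>(leavingMembers)) {
            List<String> tasks = activeTasksByMember.getOrDefault(leaving, Collections.emptyList());
            for (String task : tasks) {
                String target = staying.get(next++ % staying.size());
                plan.computeIfAbsent(target, k -> new ArrayList<>()).add(task);
            }
        }
        return plan;
    }
}
```

The leaving members keep their active tasks while the learners replay; revocation only happens in a later rebalance once the learners report ready.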

...

Code Block
language: text
title: Leader crash before
Cluster has 3 stream workers S1(leader), S2, S3 and they own tasks T1 ~ T5
Group stable state: S1[T1], S2[T2, T3, T4, T5], S3[]

#First Rebalance
New member S4 joins the group, at the same time S1 crashes.
S2 takes over as leader, while T1 is now unassigned
S2 ~ S4 join with following status
	S2(assigned: [T2, T3, T4, T5], revoked: [], learning: [])
	S3(assigned: [], revoked: [], learning: []) 
	S4(assigned: [], revoked: [], learning: [])
S2 performs task assignments:
	S2(assigned: [T2, T3, T4, T5], revoked: [], learning: [])
	S3(assigned: [T1], revoked: [], learning: [])
	S4(assigned: [], revoked: [], learning: [])

Now the group reaches balance, although the eventual load is skewed.

...

Code Block
language: sql
Algorithm incremental-rebalancing

Input
	Set of Tasks,
	Set of Instances,
	Set of Workers,

	Where each worker contains:
		Set of active Tasks,
		Set of standby Tasks,
		owned by which instance

Main Function

	Assign active tasks: (if any)
		To instances with learner tasks that indicate "ready"
		To previous owners
		To owners of unready learner tasks
		To instances with standby tasks
		To resource available instances

	Keep existing learner tasks' assignment unchanged

	Pick new learner tasks out of heaviest loaded instances

	Assign learner tasks: (if any)
		To new-coming instances with abundant resource
		To instances with corresponding standby tasks

	Assign standby tasks: (if any)
		To instances without matching active tasks
		To previous active task owners after learner transfer in this round
		To resource available instances
		Based on num.standby.replicas config, standby task assignment could take multiple rounds

Output
	Finalized Task Assignment
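The priority order for active task assignment in the algorithm above could be encoded as an ordered enum, as in the following sketch. All names here (`ActiveTaskPlacement`, `Role`, `Candidate`, `pickOwner`) are hypothetical, and the sketch covers only the choice of an owner for a single task, not the full algorithm.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch of the active-task priority order; not the real assignor.
final class ActiveTaskPlacement {

    // Declared in priority order: earlier constants win.
    enum Role {
        READY_LEARNER,
        PREVIOUS_OWNER,
        UNREADY_LEARNER,
        STANDBY_OWNER,
        RESOURCE_AVAILABLE
    }

    static final class Candidate {
        final String instanceId;
        final Role role;

        Candidate(String instanceId, Role role) {
            this.instanceId = instanceId;
            this.role = role;
        }
    }

    /** Picks the instance whose relationship to the task ranks highest, if any. */
    static Optional<String> pickOwner(List<Candidate> candidates) {
        return candidates.stream()
                .min(Comparator.comparing((Candidate c) -> c.role))
                .map(c -> c.instanceId);
    }
}
```

Relying on enum declaration order keeps the priority list in one place, so reordering the constants is enough to change the policy.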

...

Recall that the original purpose of standby tasks is to mitigate the issue during scaling down. When performing learner assignment, we shall prioritize workers that currently hold standby tasks matching the learner assignment. The group should therefore rebalance fairly soon and let the leaving members shut themselves down quickly.

Scale Down Timeout

Users naturally want to reach a sweet spot between ongoing task transfer and freeing up streaming resources. So we want to take a similar approach to KIP-415 and introduce a client config that makes the scale down time-bounded. If the time taken to migrate tasks exceeds this config, the leader will send out a join group request, forcibly remove the active tasks from the leaving members, and transfer those tasks to the staying members, so that the leaving members shut themselves down immediately after this round of rebalance.
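The time bound could be checked along the lines of the sketch below. The class `ScaleDownDeadline` and its method are hypothetical, and the timeout stands in for the client config mentioned above, whose name the proposal does not fix here.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical sketch; the timeout value would come from the proposed client config.
final class ScaleDownDeadline {
    private final Instant startedAt;
    private final Duration timeout;

    ScaleDownDeadline(Instant startedAt, Duration timeout) {
        this.startedAt = startedAt;
        this.timeout = timeout;
    }

    /**
     * True once the migration has run longer than the configured bound, at
     * which point the leader would trigger a rebalance that force-revokes the
     * active tasks on the leaving members.
     */
    boolean shouldForceRevoke(Instant now) {
        return Duration.between(startedAt, now).compareTo(timeout) > 0;
    }
}
```

Passing the current time in as a parameter, rather than reading the clock inside the method, keeps the check easy to test.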

Trade-offs

More Rebalances

...

Since we are carrying more information during rebalance, we should watch out for the increase in metadata size. So far the hard limit is 1MB per metadata response, which means that if we add too much information, the new protocol could hit a hard failure. This is a common pain point, and it motivates finding a better encoding scheme for metadata if we are promoting incremental rebalancing KIPs like 415 and 429. Guozhang has started sharing some thoughts in this JIRA, and we plan to have a separate KIP that discusses different encoding technologies and determines which one could work.

...

Iteration Plan

For the smooth delivery of all the features discussed so far, the iteration plan is broken down into four stages:

Version 1.0

...

The goal of the first version is to realize the foundation of the learner algorithm for the scaling up scenario. The leader worker will use the previous round's assignment to figure out which instances are new, and the learner tasks shall only be assigned to new instances once. The reason for only implementing the new instances logic is that there is a potential edge case that could break the current naive learner assignment: when the number of tasks is much smaller than the total cluster capacity, we could fall into endless resource shuffling. We plan to better address this issue in version 4.0, where we take eventual load balance into consideration. Discussions on marking weights for different types of tasks have been going on for a while. To me, it is unclear so far what kind of eventual balance model we are going to implement. In conclusion, we want to postpone the finalized design for eventual balance until the last version.
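Identifying new instances from the previous round's assignment amounts to a set difference, sketched below. The class and method names are hypothetical, and the sketch assumes the leader can recover the member ids of the previous round.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch; member ids are plain strings for illustration.
final class NewInstanceDetector {

    /** Members present in the current round that were absent from the previous one. */
    static Set<String> newInstances(Set<String> previousMembers, Set<String> currentMembers) {
        Set<String> result = new TreeSet<>(currentMembers);
        result.removeAll(previousMembers);
        return result;
    }
}
```

Only the members returned here would be eligible for learner tasks in version 1.0.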

...

A detailed analysis and benchmark test need to be built before fully devoting effort to this feature. Intuitively, most applications should be able to tolerate a minor discrepancy in task replaying time, while the cost of extra rebalances and increased debugging complexity is definitely unfavorable.

Version 3.0 builds upon the success of version 1.0 and could be done concurrently with version 2.0. We may choose to adopt or discard this change, depending on the benchmark result.

...

Code Block
language: text
title: Eventual balance example
A group with 4 instances that have the following capacities: 5, 10, 10, 15, and 80 tasks in total, should ideally balance into:

10(5/40), 20(10/40), 20(10/40), 30(15/40) tasks.

If we set the imbalance factor to 20%,
then an eventual assignment like
12, 18, 23, 27 should be stable, as none of them is far off the expected load.
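The arithmetic in this example can be reproduced with the sketch below. It assumes the imbalance factor is read as the maximum relative deviation of an instance's assigned load from its capacity-proportional expected load; that reading, along with the names `EventualBalance`, `expectedLoads` and `isStable`, is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; assumes all capacities are positive.
final class EventualBalance {

    /** Expected load per instance: totalTasks * capacity / totalCapacity. */
    static List<Double> expectedLoads(int[] capacities, int totalTasks) {
        int totalCapacity = 0;
        for (int capacity : capacities) {
            totalCapacity += capacity;
        }
        List<Double> loads = new ArrayList<>();
        for (int capacity : capacities) {
            loads.add((double) totalTasks * capacity / totalCapacity);
        }
        return loads;
    }

    /** Stable when no instance deviates from its expected load by more than the factor. */
    static boolean isStable(int[] capacities, int[] assigned, double imbalanceFactor) {
        int totalTasks = 0;
        for (int count : assigned) {
            totalTasks += count;
        }
        List<Double> expected = expectedLoads(capacities, totalTasks);
        for (int i = 0; i < assigned.length; i++) {
            double deviation = Math.abs(assigned[i] - expected.get(i)) / expected.get(i);
            if (deviation > imbalanceFactor) {
                return false;
            }
        }
        return true;
    }
}
```

With capacities 5, 10, 10, 15 and the assignment 12, 18, 23, 27, the relative deviations are 20%, 10%, 15% and 10%, so the check passes at a factor of 0.2.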

...

Code Block
language: java
title: Protocol Type
ProtocolTypes : {"consumer", "connect", "stream"}


We are also adding new configs so that users can better apply and customize the scaling change.

...

stream.imbalance.percentage

Default: 0.2 (20%)

Version 4.0

The tolerated task imbalance factor between hosts; exceeding it triggers a rebalance.

Implementation Plan

We call out this portion because the algorithm we are going to design is fairly complicated. To make sure the delivery is smooth despite fundamental changes to KStream internals, we have built a separate, shareable Google Doc here that outlines the steps of the changes. Feel free to give your feedback on this plan while reviewing the algorithm, because some of the algorithm requirements are highly coupled with internal architecture reasoning.

Compatibility, Deprecation, and Migration Plan

...

In the long term we are proposing a smoother and more elegant upgrade approach than the current one. However, it requires a broker upgrade, which may not be a trivial effort for the end user. So far, users could choose to take this much easier workaround.

Rejected Alternatives

N/A for the algorithm part. For implementation plan trade-offs, please review the Google Doc.