

Status

Current state: [Under Discussion]

...

A learner task shares the same semantics as a standby task: it is utilized by the restore consumer to replicate the active task's state. When the restoration of a learner task is complete, the stream instance will initiate a new JoinGroupRequest to call out another rebalance to do the task transfer. The goal of the learner task is to delay the task migration while the destination host has not finished, or has not even started, replaying the active task.
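
To make this lifecycle concrete, below is a minimal sketch assuming hypothetical types (LearnerTaskDriver, LearnerTask, GroupMembership) that stand in for real Streams internals: the learner replays the changelog until caught up, then rejoins the group.

Code Block
languagejava
titleLearner task lifecycle (sketch)
// Illustrative sketch only; these are not real Kafka Streams APIs.
final class LearnerTaskDriver {

    interface LearnerTask {
        void restoreBatch();   // replay one batch from the changelog
        boolean caughtUp();    // true once restored up to the changelog end
    }

    interface GroupMembership {
        void rejoin();         // send a new JoinGroupRequest to trigger a rebalance
    }

    private final LearnerTask task;
    private final GroupMembership membership;

    LearnerTaskDriver(LearnerTask task, GroupMembership membership) {
        this.task = task;
        this.membership = membership;
    }

    void runUntilReady() {
        while (!task.caughtUp()) {
            task.restoreBatch();   // replicate the active task's state
        }
        membership.rejoin();       // task transfer happens in the next rebalance
    }
}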

...

Leader crash could cause a loss of historical assignment information. For learners already assigned, however, each worker maintains its own assignment status, so when a learner task's id has no corresponding active task running, the transfer will happen immediately. Leader switch in this case is not a big concern. The essence is that we don't rely on leader information to do the assignment.

Code Block
languagetext
titleLeader crash during scale-up
Cluster has 3 stream workers S1(leader), S2, S3, and they own tasks T1 ~ T5
Group stable state: S1[T1, T2], S2[T3, T4], S3[T5]

#First Rebalance 
New member S4 joins the group
S1 performs task assignments:
	S1(assigned: [T1, T2], revoked: [], learning: [])
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [], revoked: [], learning: [T1])

#Second Rebalance
S1 crashes/gets killed before S4 is ready; S2 takes over as leader.
Member S2~S4 join with following status: 
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: []) 
	S4(assigned: [], revoked: [], learning: [T1])
Note that T2 is unassigned, and S4 is learning T1 which has no current active task. We 
could rebalance T1, T2 immediately.	
S2 performs task assignments:
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5, T2], revoked: [], learning: [])
	S4(assigned: [T1], revoked: [], learning: [])
Now the group reaches balance.

...

However, if the leader dies before new instances join, the potential risk is that the leader could not differentiate which stream instance is "new", because it relies on historical information. For version 1.0, the final assignment is probably not ideal in this case if we only attempt to assign learner tasks to newcomers. This also motivates us to figure out a better task coordination strategy for load balance in the long term.

...

Sometimes the restoration times of learner tasks are not equal. When assigned more than one task to replay, a stream worker could require an immediate rebalance as soon as a subset of its learner tasks finishes, in order to speed up load balancing and reduce the resource waste of double task processing, at the sacrifice of global efficiency by introducing many more rebalances. We could supply users with a config to decide whether they want the eager approach or the stable approach, with some follow-up benchmark tools for rebalance efficiency. Example:

A stream worker S1 takes two learner tasks T1, T2, where restore time time(T1) < time(T2). Under the eager rebalance approach, the worker will call out a rebalance immediately when T1 finishes replaying, while under the conservative approach, the worker will not rejoin the group until it finishes replaying both T1 and T2.
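
A minimal sketch of the two approaches, again with hypothetical names (RejoinPolicy, onLearnerTaskRestored, GroupMembership), is shown below; the worker tracks which learner tasks have finished replaying and decides when to rejoin.

Code Block
languagejava
titleEager vs. conservative rejoin (sketch)
import java.util.Set;

// Illustrative sketch only; not a real Kafka Streams API.
final class RejoinPolicy {

    interface GroupMembership {
        void rejoin();   // send a new JoinGroupRequest to trigger a rebalance
    }

    // Called each time one learner task finishes replaying its state.
    // Eager: rejoin right away so the finished task can be transferred.
    // Conservative: wait until every assigned learner task is restored.
    static void onLearnerTaskRestored(Set<String> assignedLearners,
                                      Set<String> restoredLearners,
                                      boolean eager,
                                      GroupMembership membership) {
        if (eager || restoredLearners.containsAll(assignedLearners)) {
            membership.rejoin();
        }
    }
}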

...

The above examples focus on demonstrating the expected behaviors, the "end picture" of KStream incremental rebalancing. However, we also want a holistic view of the new learner assignment algorithm during each actual rebalance.

We shall assign tasks in the order of: active, learner and standby. The assignment will be broken down into the following steps:

Code Block
languagetext
Algorithm incremental-rebalancing

Input Set of Tasks,
	  Set of Instances,
      Set of Workers,

      Where each worker contains:
		Set of active Tasks,
		Set of standby Tasks,
		owned by which instance

Main Function
	
	Assign active tasks: (if any)
		To instances with learner tasks that indicate "ready"
		To previous owners
		To unready learner tasks owners
  	 	To instances with standby tasks
		To resource available instances

	Keep existing learner tasks' assignment unchanged

 	Pick new learner tasks out of heaviest loaded instances
 
	Assign learner tasks: (if any)
		To new-coming instances with abundant resource
		To instances with corresponding standby tasks

	Assign standby tasks: (if any)
		To instances without matching active tasks
		To previous active task owners after learner transfer in this round
		To resource available instances
		Based on num.standby.task config, this could take multiple rounds

Output Finalized Task Assignment
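
To illustrate the active-task step above, here is a simplified sketch of the placement priority; the Instance interface and its predicate names are hypothetical, and a real implementation would also need to handle capacity limits and ties.

Code Block
languagejava
titleActive task placement priority (sketch)
import java.util.List;
import java.util.Optional;

final class ActiveTaskPlacement {

    // Hypothetical view of a stream instance during assignment.
    interface Instance {
        boolean hasReadyLearner(String taskId);
        boolean previouslyOwned(String taskId);
        boolean hasUnreadyLearner(String taskId);
        boolean hasStandby(String taskId);
        boolean hasSpareCapacity();
    }

    // Mirrors the priority order above: ready learners, previous owners,
    // unready learner owners, standby owners, then any instance with room.
    static Optional<Instance> placeActiveTask(String taskId, List<Instance> instances) {
        return instances.stream().filter(i -> i.hasReadyLearner(taskId)).findFirst()
            .or(() -> instances.stream().filter(i -> i.previouslyOwned(taskId)).findFirst())
            .or(() -> instances.stream().filter(i -> i.hasUnreadyLearner(taskId)).findFirst())
            .or(() -> instances.stream().filter(i -> i.hasStandby(taskId)).findFirst())
            .or(() -> instances.stream().filter(Instance::hasSpareCapacity).findFirst());
    }
}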

...

Version 3.0

Delivery goal: eager rebalance analysis

A detailed analysis and benchmark support need to be built before fully devoting effort to this feature. Intuitively, most applications should be able to tolerate a minor discrepancy in task replaying time, while the cost of extra rebalances and increased debugging complexity are definitely things we are not in favor of.

Version 3.0 builds upon version 1.0's success, and could be done concurrently with version 2.0. We may choose to adopt or discard this change, depending on the benchmark result.

Version 4.0 (Stretch)

Delivery goal: task labeling, eventual workload balance

The 4.0 and final version will take the application's eventual load balance into consideration. If we define a balancing factor x, the total number of tasks each instance owns should be within the range of ±x% of the expected number of tasks, which buffers some capacity in order to avoid imbalance. A stream.imbalance.percentage will be provided for the user to configure. The smaller this number is set to, the more strictly the assignment protocol will behave.
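
As a concrete check of this rule (with a hypothetical helper named withinBalance): given E expected tasks per instance and imbalance percentage x, an instance owning n tasks is balanced iff |n - E| <= E * x / 100.

Code Block
languagejava
titleBalance check (sketch)
final class BalanceCheck {

    // Balanced iff |owned - expected| <= expected * x / 100.
    static boolean withinBalance(int ownedTasks, double expectedTasks, double imbalancePercentage) {
        return Math.abs(ownedTasks - expectedTasks) <= expectedTasks * imbalancePercentage / 100.0;
    }

    public static void main(String[] args) {
        // 20 tasks over 4 instances => 5 expected each; with x = 20,
        // owning 4, 5 or 6 tasks is balanced, while 3 or 7 is not.
        System.out.println(withinBalance(6, 5.0, 20.0));   // true
        System.out.println(withinBalance(7, 5.0, 20.0));   // false
    }
}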

We also want to balance the load separately for stateful tasks and stateless tasks, as discussed above. So far version 4.0 still has many unknowns and is slightly beyond the incremental rebalancing scope. A separate KIP could be initiated to discuss this approach in detail.

Trade-offs

More Rebalances

The new algorithm will invoke many more rebalances than the current protocol, as one could expect. As we have discussed in the overall incremental rebalancing design, it is not always bad to have multiple rebalances when we do it wisely, and after KIP-345 we have a future proposal to avoid scale-up rebalances for static members. The goal is to pre-register the members that are planning to be added. The broker coordinator will augment the member list and wait for all the new members to join the group before rebalancing, since by default a stream application's rebalance timeout is infinity. The conclusion is: it is the server's responsibility to avoid excessive rebalances, and the client's responsibility to make each rebalance more efficient.

Metadata

...

Space vs. Rebalance Efficiency

Since we are carrying over more information during rebalance, we should be alert to the metadata size increase. So far the hard limit is 1MB per metadata response, which means that if we add too much information, the new protocol could hit a hard failure. Finding a better encoding scheme for metadata is a common pain point when promoting incremental rebalancing KIPs like 415 and 429. Some thoughts from Guozhang have been started in this JIRA, and we are planning a separate KIP to discuss different encoding technologies and see which one could work.

...

We are going to add a new protocol type called "stream".

Code Block
languagejava
titleProtocol Type
ProtocolTypes : {"consumer", "connect", "stream"}


We will also be adding the following new configs for users to better apply this change:

stream.rebalancing.mode

Default: incremental

Version 1.0

A setting to help ensure a no-downtime upgrade of an online application.

Options: upgrading, incremental


scale.down.timeout.ms

Default: infinity

Version 2.0

Time in milliseconds to force-terminate the stream worker when it is informed to be scaled down.

...

learner.partial.rebalance

Default: true

Version 3.0

If this config is set to true, a new member will proactively trigger a rebalance each time it finishes restoring one learner task's state, until it eventually finishes replaying all tasks. Otherwise, the new worker will batch the ready calls and ask for a single round of rebalance.
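
Putting the new configs together, a properties file could look like the following; the config names are as proposed above, while the values are illustrative:

Code Block
languagetext
titleExample configuration
# Upgrade/rebalancing mode (options: upgrading, incremental)
stream.rebalancing.mode=incremental
# Force-terminate a scaled-down worker after one minute (default: infinity)
scale.down.timeout.ms=60000
# Rejoin eagerly as each learner task finishes replaying (default: true)
learner.partial.rebalance=true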

...