...

  • Worker (stream worker): the unit of stream processing at the thread level. It is currently equivalent to the stream's main consumer, and we will explain why we want to separate out a new definition rather than reusing the name "stream consumer".
  • Instance (stream instance): the KStream instance serving as a container of stream workers. This could be a physical host or a Kubernetes pod.
  • Learner task: a special task that gets assigned to one stream instance to restore the state of a currently active task from another instance.

Learner Task

...

Introduction

Learner task shares the same semantics as standby task; the only difference is that when the restoration of a learner task is complete, the stream instance will initiate a new JoinGroupRequest to trigger a rebalance towards the new task assignment. The goal of the learner task is to delay task migration when the destination host has not finished, or has not even started, replaying the active task's state. This applies to both scale-up and scale-down scenarios.
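
To make the flow concrete, below is a minimal sketch of the learner life cycle described above; LearnerTask, restoreRemainingRecords() and triggerRejoin() are hypothetical names used for illustration, not existing Streams APIs.

Code Block
languagejava
titleLearner restoration loop (sketch)
// Illustrative sketch only: these interfaces and names are hypothetical.
public final class LearnerTaskLoop {

    interface LearnerTask {
        /** Replays a batch from the changelog; returns true once caught up. */
        boolean restoreRemainingRecords();
    }

    interface GroupMembership {
        /** Sends a new JoinGroupRequest so the leader can re-run the assignment. */
        void triggerRejoin();
    }

    /** Same semantics as a standby task, plus a rejoin once restoration completes. */
    static void runLearner(LearnerTask task, GroupMembership membership) {
        while (!task.restoreRemainingRecords()) {
            // keep replaying the changelog, exactly like a standby task would
        }
        // restoration finished: ask for a rebalance so the active task can migrate here
        membership.triggerRejoin();
    }
}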

...

Code Block
languagetext
titleScale-up
Cluster has 3 stream workers S1(leader), S2, S3, and they each own some of the tasks T1 ~ T5
Group stable state: S1[T1, T2], S2[T3, T4], S3[T5]
#First Rebalance 
New member S4 joins the group
S1 performs task assignments:
	S1(assigned: [T1, T2], revoked: [], learning: [])
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [], revoked: [], learning: [T1])

#Second Rebalance 
New member S5 joins the group.
Member S1~S5 join with following metadata: (S4 is not ready yet)
	S1(assigned: [T2], revoked: [T1], learning: []) // T1 revoked because it's "being learned"
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [], revoked: [], learning: [T1])
	S5(assigned: [], revoked: [], learning: [T3])
S1 performs task assignments: 
	S1(assigned: [T1, T2], revoked: [], learning: [])
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [], revoked: [], learning: [T1])
	S5(assigned: [], revoked: [], learning: [T3])


#Third Rebalance 
Member S4 finishes its replay and becomes ready, re-attempt to join the group. S5 is not ready yet.
Member S1~S5 join with following status:(S5 is not ready yet)
	S1(assigned: [T2], revoked: [T1], learning: [])
	S2(assigned: [T4], revoked: [T3], learning: []) // T3 revoked because it's "being learned"
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [], revoked: [], learning: [T1])
	S5(assigned: [], revoked: [], learning: [T3])
S1 performs task assignments:
	S1(assigned: [T2], revoked: [T1], learning: [])
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [T1], revoked: [], learning: [])
	S5(assigned: [], revoked: [], learning: [T3])


#Fourth Rebalance 
Member S5 is ready, re-attempt to join the group. 
Member S1~S5 join with following status: (all members are ready)
	S1(assigned: [T2], revoked: [], learning: [])
	S2(assigned: [T4], revoked: [T3], learning: []) // T3 revoked because it's "being learned"
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [T1], revoked: [], learning: [])
	S5(assigned: [], revoked: [], learning: [T3])
S1 performs task assignments:
	S1(assigned: [T2], revoked: [], learning: [])
	S2(assigned: [T4], revoked: [T3], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [T1], revoked: [], learning: [])
	S5(assigned: [T3], revoked: [], learning: [])
Now the group reaches balance with 5 members and 5 tasks.

...

Scaling up from scratch means all workers are new members. There is no need to implement a learner stage because there is nothing to learn: we don’t even have a changelog topic to start with. We should be able to handle this case by identifying whether the given task is in the active task bucket of any other member; if not, we just transfer the ownership immediately.

After deprecating group.initial.rebalance.delay, we still expect the algorithm to work because every task assignment during rebalance will adhere to the rule "if a given task is currently active, it must be reassigned only to workers that have declared themselves ready to serve this task."
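
A rough sketch of how the leader could apply both rules above (direct transfer when nobody currently owns the task as active, otherwise only move it to a worker that has declared itself ready); the class, record and parameter names are illustrative, not the actual Streams assignor.

Code Block
languagejava
titleLearner-aware assignment rule (sketch)
// Illustrative sketch of the assignment rule; not the real assignor classes.
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public final class LearnerAwareAssignor {

    /** Per-worker view reported through the rebalance metadata. */
    public record WorkerState(Set<String> activeTasks, Set<String> readyTasks) {}

    /**
     * For each task: if no worker currently runs it as active (e.g. scaling up
     * from scratch), hand it to its proposed owner right away; otherwise only
     * move it once the proposed owner is ready, else keep it where it is.
     */
    public static Map<String, String> assign(Set<String> allTasks,
                                             Map<String, WorkerState> workers,
                                             Map<String, String> proposedOwners) {
        Map<String, String> assignment = new HashMap<>();
        for (String task : allTasks) {
            String currentOwner = workers.entrySet().stream()
                .filter(e -> e.getValue().activeTasks().contains(task))
                .map(Map.Entry::getKey)
                .findFirst()
                .orElse(null);
            String proposed = proposedOwners.get(task);

            if (currentOwner == null) {
                assignment.put(task, proposed);                 // nothing to learn
            } else if (workers.get(proposed).readyTasks().contains(task)) {
                assignment.put(task, proposed);                 // learner caught up
            } else {
                assignment.put(task, currentOwner);             // keep until ready
            }
        }
        return assignment;
    }
}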

...

Sometimes the restoration times of learner tasks are not equal. When assigned more than one task to replay, the stream worker could request an immediate rebalance as soon as a subset of its learning tasks has finished, in order to speed up load balancing and reduce the resource waste of double task processing, at the cost of global efficiency since many more rebalances are introduced. We could supply the user with a config to decide whether they want to take the eager approach or the stable approach.

For example:

A stream worker S1 takes two learner tasks T1 and T2, where the restoration time time(T1) < time(T2). Under the eager rebalance approach, the worker will trigger a rebalance immediately when T1 finishes replaying, while under the stable rebalance approach, the worker will not rejoin the group until it finishes replaying both T1 and T2.
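
A small sketch of that decision, assuming the eager/stable choice is driven by the proposed learn.partial.rebalance flag; the class and method names are made up for illustration.

Code Block
languagejava
titleEager vs. stable rejoin (sketch)
// Hypothetical sketch of the eager vs. stable rejoin decision.
import java.util.HashSet;
import java.util.Set;

public final class LearnerRejoinPolicy {

    private final boolean eager;                 // learn.partial.rebalance
    private final Set<String> pendingLearners;   // e.g. {"T1", "T2"}

    public LearnerRejoinPolicy(boolean eager, Set<String> learnerTasks) {
        this.eager = eager;
        this.pendingLearners = new HashSet<>(learnerTasks);
    }

    /** Called whenever one learner task (e.g. T1) finishes replaying. */
    public boolean shouldRejoinNow(String finishedTask) {
        pendingLearners.remove(finishedTask);
        // Eager: rejoin as soon as any learner is done (T1 here).
        // Stable: wait until every learner (both T1 and T2) is done.
        return eager || pendingLearners.isEmpty();
    }
}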

...

Don’t forget that the original purpose of the standby task is to mitigate this issue during scale down. When performing learner assignment, we shall prioritize workers which currently have standby tasks that match the learner assignment. The group should therefore rebalance fairly soon and let the leaving members shut themselves down quickly.
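
The placement preference could look roughly like the sketch below, which favors a worker that already hosts a matching standby replica and otherwise falls back to the least loaded worker; all names here are hypothetical.

Code Block
languagejava
titleStandby-aware learner placement (sketch)
// Illustrative sketch: prefer a worker that already keeps a standby copy.
import java.util.Comparator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

public final class LearnerPlacement {

    /** Pick the destination worker for one learner task. */
    public static Optional<String> chooseLearnerHost(String task,
                                                     Map<String, Set<String>> standbysByWorker,
                                                     Map<String, Integer> loadByWorker) {
        // Workers that already keep a standby copy can promote it almost immediately.
        Optional<String> withStandby = standbysByWorker.entrySet().stream()
            .filter(e -> e.getValue().contains(task))
            .map(Map.Entry::getKey)
            .min(Comparator.comparing(loadByWorker::get));
        if (withStandby.isPresent()) {
            return withStandby;
        }
        // Otherwise fall back to the least loaded worker and replay from scratch.
        return loadByWorker.entrySet().stream()
            .min(Map.Entry.comparingByValue())
            .map(Map.Entry::getKey);
    }
}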

...

Since we are carrying over more information during rebalance, we should be alert to the metadata size increase. So far the hard limit is 1 MB per metadata response, which means that if we carry over too much information, the new protocol could hit a hard failure. Finding a better encoding scheme for the metadata is a common pain point when promoting incremental rebalancing KIPs like 415 and 429. Some thoughts from Guozhang have been started in this JIRA, and we plan to have a separate KIP discussing different encoding technologies to see which one could work.
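
As a rough illustration of the concern (not a proposed encoding), the sketch below serializes the assigned/revoked/learning lists into a single user-data blob and fails fast when it exceeds the 1 MB cap; the names and the plain string encoding are placeholders.

Code Block
languagejava
titleMetadata size guard (sketch)
// Hypothetical guard against oversized rebalance metadata.
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;

public final class MetadataSizeGuard {

    private static final int METADATA_HARD_LIMIT_BYTES = 1_000_000; // ~1 MB

    /** Encodes assigned / revoked / learning task ids into one user-data blob. */
    public static ByteBuffer encode(List<String> assigned,
                                    List<String> revoked,
                                    List<String> learning) {
        byte[] payload = String.join(",",
                String.join("|", assigned),
                String.join("|", revoked),
                String.join("|", learning))
            .getBytes(StandardCharsets.UTF_8);
        if (payload.length > METADATA_HARD_LIMIT_BYTES) {
            throw new IllegalStateException(
                "Rebalance metadata of " + payload.length + " bytes exceeds the 1 MB limit");
        }
        return ByteBuffer.wrap(payload);
    }
}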

...

learn.partial.rebalance

Default: true

If this config is set to true, a new member will proactively trigger a rebalance each time it finishes restoring one task's state, until it eventually finishes replaying all of its tasks. Otherwise, the new worker will batch up its ready state and ask for a single round of rebalance.
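
A usage sketch, assuming the proposed config is passed through the regular Streams properties; "learn.partial.rebalance" is the name proposed in this KIP and does not yet have a StreamsConfig constant.

Code Block
languagejava
titlelearn.partial.rebalance usage (sketch)
// Usage sketch for the proposed config; the literal key is the KIP's proposed name.
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public final class PartialRebalanceConfigExample {
    public static Properties streamsProperties() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "learner-task-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // true (default): rejoin after each learner task finishes restoring;
        // false: batch all learner tasks into a single rejoin.
        props.put("learn.partial.rebalance", "true");
        return props;
    }
}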

...

scale.down.timeout.ms

Default: infinity

Timeout in milliseconds to force-terminate the stream worker when it is informed that it will be scaled down.
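
One possible way a worker could enforce this timeout is sketched below; the watchdog class is hypothetical and not how Streams shuts down today.

Code Block
languagejava
titlescale.down.timeout.ms enforcement (sketch)
// Illustrative sketch of enforcing scale.down.timeout.ms.
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public final class ScaleDownWatchdog {

    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor();

    /**
     * Called when the worker is informed it will be scaled down. If the graceful
     * hand-off (learner catch-up + rebalance) has not finished within the
     * configured timeout, force-terminate anyway.
     */
    public void onScaleDownSignal(long scaleDownTimeoutMs, Runnable forceTerminate) {
        // scale.down.timeout.ms defaults to infinity, i.e. never force-terminate.
        if (scaleDownTimeoutMs == Long.MAX_VALUE) {
            return;
        }
        scheduler.schedule(forceTerminate, scaleDownTimeoutMs, TimeUnit.MILLISECONDS);
    }
}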

...