Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  • Reduce unnecessary downtime due to task restoration
  • Make rebalance performance better for stream applications, A.K.A alleviating Stop-The-World Effect.

Proposed Changes

...

Terminology

we shall define some terms for easy walkthrough of the algorithm.

...

For KStream, we are going to take a trade-off between “revoking all” and “revoking none” solution: we shall only revoke tasks that are being learned since last round. So when we assign learner tasks to new member, we shall also mark active tasks as "being learned task" on current owners. Every time when a rebalance begins, the task owners will revoke the being learned tasks and join group without affecting other ongoing tasks. This way learned tasks could immediately transfer ownership without attempting for a second round of rebalance. Compared with KIP-415, we are optimizing for fewer rebalances, but increasing the metadata size and sacrificing partial availability of the learner tasks

Algorithm Walkthrough

As we could see, there should be only exactly one learner task after each round of rebalance, and there should be exactly one corresponding active task at the same time



Next we are going to look at several typical scaling scenarios to better understand the algorithm.

...

Code Block
languagetext
titleScale-up
Cluster has 3 stream workers S1(leader), S2, S3, and they each own some tasks 1 ~ 5
Group stable state: S1[T1, T2], S2[T3, T4], S3[T5]

#First Rebalance 
New member S4 joins the group
S1 performs task assignments:
	S1(assigned: [T1, T2], revoked: [], learning: [])
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [], revoked: [], learning: [T1])

#Second Rebalance 
New member S5 joins the group.
Member S1~S5 join with following metadata: (S4 is not ready yet)
	S1(assigned: [T2], revoked: [T1], learning: []) // T1 revoked because it's "being learned"
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [], revoked: [], learning: [T1])
	S5(assigned: [], revoked: [], learning: [T3])
S1 performs task assignments: 
	S1(assigned: [T1, T2], revoked: [], learning: [])
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [], revoked: [], learning: [T1])
	S5(assigned: [], revoked: [], learning: [T3])

#Third Rebalance 
Member S4 finishes its replay and becomes ready, re-attempt to join the group. S5 is not ready yet.
Member S1~S5 join with following status:(S5 is not ready yet)
	S1(assigned: [T2], revoked: [T1], learning: [])
	S2(assigned: [T4], revoked: [T3], learning: []) // T3 revoked because it's "being learned"
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [], revoked: [], learning: [T1])
	S5(assigned: [], revoked: [], learning: [T3])
S1 performs task assignments:
	S1(assigned: [T2], revoked: [T1], learning: [])
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [T1], revoked: [], learning: [])
	S5(assigned: [], revoked: [], learning: [T3])

#Fourth Rebalance 
Member S5 is ready, re-attempt to join the group. 
Member S1~S5 join with following status:(S5 is not ready yet)
	S1(assigned: [T2], revoked: [], learning: [])
	S2(assigned: [T4], revoked: [T3], learning: []) // T3 revoked because it's "being learned"
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [T1], revoked: [], learning: [])
	S5(assigned: [], revoked: [], learning: [T3])
S1 performs task assignments:
	S1(assigned: [T2], revoked: [], learning: [])
	S2(assigned: [T4], revoked: [T3], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [T1], revoked: [], learning: [])
	S5(assigned: [T3], revoked: [], learning: [])
Now the group reaches balance with 5 members and 5 tasks.

...