

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Recently the Kafka community has been promoting cooperative rebalancing to mitigate the pain points of the stop-the-world rebalancing protocol, and an initiative for Kafka Connect has already started as KIP-415. There is already exciting discussion around it, but for Kafka Streams, delayed rebalance is not the complete solution. This KIP tries to customize the cooperative rebalancing approach specifically for the KStream application context, building on the design for KConnect.

Currently Kafka Streams uses the consumer membership protocol to coordinate stream task assignment. When we scale up a stream application, the KStream group will attempt to revoke active tasks and let the newly spun-up hosts take them over. It takes time for a new host to restore its tasks if the assigned ones are stateful, but the current strategy is to reassign tasks as soon as new member join group requests are received, in order to balance application consumption. For state-heavy applications, it is not ideal to give up the tasks immediately once the new player joins the party. Instead we should buffer some time to let the new player accept a fair amount of restoring tasks and finish state reconstruction before officially taking over the active tasks. Ideally, we could achieve a no-downtime transition during cluster scale-up if we take this approach. The same applies to scale-down, where we need to buffer time for migrating tasks from ready-to-shut-down hosts to retained ones.

A better approach is to adopt the design of KIP-415 while letting KStream members explicitly announce state changes and trigger the necessary rebalance to migrate resource ownership once they are fully ready after task restoration.


In short, the primary goals of this KIP are:

  • Reduce unnecessary downtime due to task restoration and global application revocation.
  • Better auto scaling experience for KStream applications, including scale up and scale down.

Proposed Changes


We shall define several new terms for an easy walkthrough of the algorithm.

  • Worker (A.K.A. stream worker): the thread-level stream processor, which actually takes the stream task. It is currently equivalent to the stream main consumer.
  • Instance (A.K.A. stream instance): the KStream instance serving as the container of a set of stream workers. This could be a physical host or a k8s pod.
  • Learner task: a special task that gets assigned to one stream instance to restore a current active task's state from another instance.

Learner Task



A learner task shares the same semantics as a standby task: it is only taken care of by the restore consumer, which replicates the main task state. The difference is that when the restoration of a learner task is complete, the stream instance will initiate a new JoinGroupRequest to call out a rebalance for the new task assignment. The goal of the learner task is to delay task migration while the destination host has not finished, or has not even started, replaying the active task. This applies to both scale-up and scale-down scenarios.

Stop-The-World Effect


As mentioned in the motivation section, we also want to mitigate the stop-the-world effect of the current global rebalance protocol. A quick recap of the current rebalance semantics on KStream: when a rebalance starts, all workers would

  1. Join the group with all currently assigned tasks revoked.

  2. Wait until the group assignment finishes to get assigned tasks.

  3. Replay the state of the assigned tasks.

  4. Once all replay jobs finish, the worker transitions to running mode.

The reason for revoking all ongoing tasks is that we need to guarantee each topic partition is assigned to exactly one consumer at any time. In this way, no topic partition can be re-assigned before it is revoked.


For KStream, we are going to take a trade-off between the “revoking all” and “revoking none” solutions: we shall only revoke tasks that have been learned since the last round. So when we assign learner tasks to a new member, we shall also mark the corresponding active tasks as "being learned" on their current owners. Every time a rebalance begins, the task owners will revoke the being-learned tasks and join the group without affecting other ongoing tasks. Learned tasks can then transfer ownership immediately, without a second round of rebalance. Compared with KIP-415, we are optimizing for fewer rebalances, at the cost of increased metadata size and partial unavailability of the tasks being learned.
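The partial revocation rule can be sketched as a set operation on one worker's tasks. This is a minimal illustration only: the function name `split_on_rebalance` and the plain string task ids are assumptions made for this example, not part of the proposed protocol or of any Kafka API.

```python
def split_on_rebalance(active_tasks, being_learned):
    """Split a worker's active tasks at the start of a rebalance.

    Hypothetical helper: tasks marked "being learned" are revoked,
    everything else keeps running while the worker rejoins the group.
    """
    learned = set(being_learned)
    revoked = [t for t in active_tasks if t in learned]
    kept = [t for t in active_tasks if t not in learned]
    return revoked, kept


# A worker owns T1 and T2, and T1 was marked as being learned last round.
revoked, kept = split_on_rebalance(["T1", "T2"], ["T1"])
print("revoked:", revoked, "kept:", kept)
```

A worker with no being-learned tasks revokes nothing, which is the "revoking none" end of the trade-off.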


For ease of operation, a new tool for scaling down the stream app shall be built. It will have access to the application instances, and ideally could compute which members to scale down while the end user only needs to provide a scale-down percentage. For example, if the current cluster size is 40 and we choose to scale down to 80%, then the script will attempt to inform 8 of the 40 hosts to “prepare leaving” the group.
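The arithmetic in the example above can be sketched as follows. The function name `hosts_to_notify` is hypothetical, and rounding the target size down to a whole number of hosts is an assumption; the proposal does not specify how fractional results should be handled.

```python
def hosts_to_notify(cluster_size, target_percent):
    """Number of hosts that should be told to "prepare leaving".

    Hypothetical helper; the target size is rounded down (an assumption).
    """
    if cluster_size < 0 or not 0 <= target_percent <= 100:
        raise ValueError("cluster_size must be >= 0 and target_percent in [0, 100]")
    target_size = cluster_size * target_percent // 100
    return cluster_size - target_size


leaving = hosts_to_notify(40, 80)
print(leaving)
```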

Code Block: Scale-down stream applications
Group stable state: S1[T1, T2], S2[T3, T4], S3[T5]
Scaling down the application, S2 will be leaving.

#First Rebalance 
Member S2 joins the group and claims that it is leaving
S1 performs task assignments:
	S1(assigned: [T1, T2], revoked: [], learning: [T3])
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: [T4])

#Second Rebalance 
S3 finishes replay first and triggers another rebalance
Members S1~S3 join with the following status (S1 is not ready yet):
	S1(assigned: [T1, T2], revoked: [], learning: [T3])
	S2(assigned: [T3], revoked: [T4], learning: []) 
	S3(assigned: [T5], revoked: [], learning: [T4])
S1 performs task assignments:
	S1(assigned: [T1, T2], revoked: [], learning: [T3])
	S2(assigned: [T3], revoked: [T4], learning: [])
	S3(assigned: [T4, T5], revoked: [], learning: [])

#Third Rebalance 
S1 finishes replay and triggers a rebalance.
Members S1~S3 join with the following status:
	S1(assigned: [T1, T2], revoked: [], learning: [T3])
	S2(assigned: [], revoked: [T3], learning: []) 
	S3(assigned: [T4, T5], revoked: [], learning: [])
S1 performs task assignments:
	S1(assigned: [T1, T2, T3], revoked: [], learning: [])
	S2(assigned: [], revoked: [T3], learning: [])
	S3(assigned: [T4, T5], revoked: [], learning: [])

S2 will shut itself down upon the new assignment since there is no assigned task left.


A leader crash could cause historical assignment information to be lost. For the learner assignment, however, each worker maintains its own assignment status, so when a learner task's id has no active task running, the transfer will happen immediately. A leader switch in this case is not a big concern. The essence is that we don't rely on leader information to do the assignment.

Code Block: Leader crash during scaling
Cluster has 3 stream workers S1(leader), S2, S3, and they own tasks 1 ~ 5
Group stable state: S1[T1, T2], S2[T3, T4], S3[T5]

#First Rebalance 
New member S4 joins the group
S1 performs task assignments:
	S1(assigned: [T1, T2], revoked: [], learning: [])
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [], revoked: [], learning: [T1])

#Second Rebalance
S1 crashes/gets killed before S4 is ready, S2 takes over the leader.
Member S2~S4 join with following status: 
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: []) 
	S4(assigned: [], revoked: [], learning: [T1])
Note that T2 is unassigned, and S4 is learning T1 which has no current active task. We 
could rebalance T1, T2 immediately.	
S2 performs task assignments:
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5, T2], revoked: [], learning: [])
	S4(assigned: [T1], revoked: [], learning: [])
Now the group reaches balance.


Sometimes the restoration times of learner tasks are not equal. When assigned more than one task to replay, the stream worker could request an immediate rebalance as soon as a subset of its learner tasks has finished, in order to speed up load balancing and reduce the resource waste of double task processing, at the cost of global efficiency, since many more rebalances are introduced. We could eventually supply the user with a config to decide whether they want to take the eager approach or the stable approach, along with some follow-up benchmark tools for rebalance efficiency.

For example:

A stream worker S1 takes two learner tasks T1 and T2, where the restoring time satisfies time(T1) < time(T2). Under the eager approach, the worker will call out a rebalance immediately when T1 finishes replaying. Under the conservative (stable) approach, the worker will not rejoin the group until it finishes replaying both T1 and T2.
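The difference between the two approaches comes down to when a worker decides to rejoin the group. The sketch below is illustrative only: `should_rejoin` and its `eager` flag are hypothetical names standing in for the proposed config, which the text does not name.

```python
def should_rejoin(learning, finished, eager):
    """Decide whether a worker should trigger a rebalance.

    Hypothetical helper: `learning` is the worker's learner tasks,
    `finished` the ones whose replay has completed.
    """
    learning = set(learning)
    done = set(finished) & learning
    if not learning:
        return False
    if eager:
        # Eager: rejoin as soon as any learner task is ready.
        return bool(done)
    # Conservative (stable): rejoin only when every learner task is ready.
    return done == learning


# S1 learns T1 and T2; T1 finishes first.
eager_after_t1 = should_rejoin(["T1", "T2"], ["T1"], eager=True)
stable_after_t1 = should_rejoin(["T1", "T2"], ["T1"], eager=False)
stable_after_both = should_rejoin(["T1", "T2"], ["T1", "T2"], eager=False)
print(eager_after_t1, stable_after_t1, stable_after_both)
```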

Standby Task Utilization


Recall that the original purpose of the standby task is to mitigate the issue during scale-down. When performing learner assignment, we shall prioritize workers that currently hold standby tasks matching the learner assignment. The group should therefore rebalance soon and let the leaving members shut themselves down quickly.
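One way to express this prioritization is as a sort key when choosing the host for a learner task. This is a sketch under assumptions: `pick_learner_host` is a hypothetical name, and breaking ties by current load and then by host name is an assumption the proposal does not state.

```python
def pick_learner_host(task, candidates, standby_by_host, load_by_host):
    """Choose the host that should learn `task`.

    Hypothetical helper: hosts already holding a standby copy of the task
    come first; ties are broken by load, then by name (an assumption).
    """
    def key(host):
        has_standby = task in standby_by_host.get(host, ())
        return (not has_standby, load_by_host.get(host, 0), host)

    return min(candidates, key=key)


# S3 carries more load than S1 but already holds a standby copy of T4.
chosen = pick_learner_host(
    "T4",
    candidates=["S1", "S3"],
    standby_by_host={"S3": ["T4"]},
    load_by_host={"S1": 1, "S3": 2},
)
print(chosen)
```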


The above examples focus on demonstrating the expected behaviors of the KStream incremental rebalancing "end picture". However, we also want to have a holistic view of the new learner algorithm.

We shall assign tasks in the order of active, learner and standby. The assignment will be broken down into the following steps:

Assign active tasks:

  1. Assign active tasks to the owners of learner tasks that indicate "ready"
  2. Assign the rest of the active tasks to their previous owners
  3. Assign the rest of the active tasks to the owners of unready learner tasks
  4. Assign the rest of active tasks to resource available hosts
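The four steps above can be sketched as successive passes over the unassigned tasks. This is an illustration, not the proposed implementation: the function name, the dictionary-based inputs, and the use of "fewest assigned tasks" as the meaning of "resource available" are all assumptions.

```python
def assign_active_tasks(tasks, members, ready_learners, previous_owners, unready_learners):
    """Assign active tasks in the priority order listed above.

    Hypothetical helper: each mapping goes from task id to the member that
    is ready-learning, previously owned, or is still learning that task.
    """
    assignment = {member: [] for member in members}
    remaining = list(tasks)
    # Steps 1-3: ready learners, then previous owners, then unready learners.
    for preferred in (ready_learners, previous_owners, unready_learners):
        unassigned = []
        for task in remaining:
            owner = preferred.get(task)
            if owner in assignment:  # skip owners that left the group
                assignment[owner].append(task)
            else:
                unassigned.append(task)
        remaining = unassigned
    # Step 4: leftovers go to the least loaded member (an assumption).
    for task in remaining:
        owner = min(members, key=lambda m: (len(assignment[m]), m))
        assignment[owner].append(task)
    return assignment


# Leader S1 crashed while S4 was still learning T1.
result = assign_active_tasks(
    tasks=["T1", "T2", "T3", "T4", "T5"],
    members=["S2", "S3", "S4"],
    ready_learners={},
    previous_owners={"T1": "S1", "T2": "S1", "T3": "S2", "T4": "S2", "T5": "S3"},
    unready_learners={"T1": "S4"},
)
print(result)
```

With these inputs the sketch reproduces the assignment from the leader crash example: T1 moves to its unready learner S4, and the orphaned T2 lands on a lightly loaded member.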

Assign learner tasks:

  1. Keep current learner task assignments the same. We will not handle half-way bounce, at least in the first version.
  2. If the load is not balanced between hosts, assign learner tasks from heavily loaded hosts to lightly loaded hosts.
  3. As long as the group members and the number of tasks are not changing, there should be a defined balanced stage instead of endless rebalancing.
  4. Instances with standby tasks have higher priority to be chosen as the learner task assignee. The standby task will convert to a learner task immediately.
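Steps 1 to 3 above can be sketched as a loop that hands out learner tasks until the projected load is balanced. This is a rough illustration under assumptions: load is measured as a plain task count, "balanced" means the projected counts differ by at most one, and the standby priority from step 4 is left out. The name `plan_learner_tasks` is hypothetical.

```python
def plan_learner_tasks(active_by_host, current_learning):
    """Plan learner tasks so that the projected load becomes balanced.

    Hypothetical helper: returns host -> learner tasks, existing ones included.
    """
    # Step 1: keep existing learner tasks untouched.
    learning = {h: list(current_learning.get(h, [])) for h in active_by_host}
    being_learned = {t for ts in learning.values() for t in ts}
    # Tasks that are not already on their way to another host.
    movable = {h: [t for t in ts if t not in being_learned]
               for h, ts in active_by_host.items()}
    # Projected load once every learner task has been promoted.
    load = {h: len(movable[h]) + len(learning[h]) for h in active_by_host}
    while True:
        heavy = max(load, key=load.get)
        light = min(load, key=load.get)
        # Step 3: stop once balanced, so a stable group stays stable.
        if load[heavy] - load[light] <= 1 or not movable[heavy]:
            break
        # Step 2: the lightest host learns a task from the heaviest one.
        learning[light].append(movable[heavy].pop(0))
        load[heavy] -= 1
        load[light] += 1
    return learning


group = {"S1": ["T1", "T2"], "S2": ["T3", "T4"], "S3": ["T5"], "S4": []}
first_round = plan_learner_tasks(group, {})
second_round = plan_learner_tasks(group, first_round)
print(first_round)
```

Running the plan again on its own output changes nothing, which is the "defined balanced stage" property from step 3.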


Assign standby tasks:

  1. This will mostly remain the same as the current protocol, in which we just pick resource-abundant hosts to allocate standby tasks.
  2. For tasks that get transferred after their learner tasks finish, we could, as an optimization, assign standby tasks directly to the demoted hosts that held the active tasks in the previous round.

For the smooth delivery of all the features we have discussed so far, the iteration plan for the algorithm is as follows:

Version 1.0

We will prioritize smooth transition over resource balance for stage one. This is because there has been historical discussion on assigning weights to different types of tasks. If we aim for task balance too early, we risk over-optimizing.
