Status

Current state: Under Discussion

Discussion thread: TBD

JIRA:  TBD

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently Kafka Streams uses the consumer membership protocol to coordinate stream task assignment. When we scale up a stream application, the KStream group will attempt to revoke active tasks and let the newly spun-up hosts take them over. If the assigned tasks are stateful, it takes time for the new host to restore them, yet the current strategy reassigns tasks as soon as a new member's join group request is received, in order to balance consumption across the application. For state-heavy applications, it is not ideal to give up the tasks immediately once the new member joins. Instead, we should buffer some time to let the new member accept some restoring tasks, and wait until it is “ready” to take over the active tasks. Ideally, this approach could achieve a no-downtime transition during cluster scale up. The same applies to scale down, where we need to buffer time for migrating tasks from the hosts about to shut down to the retained ones.

Recently the community has been promoting cooperative rebalancing to mitigate the pain points of the stop-the-world rebalancing protocol, and an initiative for Kafka Connect has already started in KIP-415. There is already great discussion around it, but the hard part for KStream is that delayed rebalance is not the ideal solution. The better approach is to adopt some of the design for KConnect in KIP-415, while letting KStream members explicitly announce state changes and trigger the necessary rebalance to migrate resource ownership once they are fully ready after task restoration.

Thus we are proposing a dedicated design specifically for KStream rebalancing in order to holistically smooth the scale up/down experience. The primary goals are threefold:

  • Reduce unnecessary downtime due to task restoration.
  • Make rebalance performance better for stream applications, i.e. alleviate the stop-the-world effect.
  • Provide a better auto scaling experience for KStream applications, including scale up and scale down.

Proposed Changes

Terminology

We shall define some terms to make the algorithm walkthrough easier.

  • Worker (stream worker): the unit of stream processing at the thread level. It is currently equivalent to the stream main consumer, and later sections will explain why we want to separate out a new definition.
  • Instance (stream instance): the KStream instance serving as the container of stream workers. This could be a physical host or a Kubernetes pod.
  • Learner task: a special task that gets assigned to one stream instance to restore the state of a currently active task from another instance.

Learner Task Introduction

A learner task shares the same semantics as a standby task: it is handled only by the restore consumer. The only difference is that when the restoration of a learner task completes, the stream instance will initiate a new JoinGroupRequest to trigger a rebalance that produces the new task assignment. The goal of the learner task is to delay task migration while the destination host has not finished, or not even started, replaying the active task's state. This applies to both scale up and scale down scenarios.

Stop-The-World Effect in KStream

As mentioned in the motivation section, we also want to mitigate the stop-the-world effect of the current global rebalance protocol. A quick recap of the current rebalance semantics on KStream: when a rebalance starts, all members would

  1. Join group with all current assigned tasks revoked.

  2. Wait until group assignment finished to resume the work.

The reason is that we need to guarantee each topic partition is assigned to exactly one consumer at any time, so a topic partition cannot be re-assigned before it is revoked.

For Kafka Connect, we choose to keep all current assigned tasks running and trade off with one more rebalance. The behavior becomes:

  1. Join group with all current active tasks running.

  2. Sync the revoked partitions and stop them (first rebalance).

  3. Rejoin group immediately with only active tasks (second rebalance).

Feel free to take a look at the example in KIP-415 to get a sense of how the algorithm works.

For KStream, we are going to take a middle path between the “revoking all” and “revoking none” solutions: we shall only revoke tasks that have been marked as being learned since the last round. So when we assign learner tasks to a new member, we shall also mark the corresponding active tasks as "being learned" on their current owners. Every time a rebalance begins, the task owners will revoke the being-learned tasks and join the group without affecting their other ongoing tasks. This way, learned tasks can transfer ownership immediately, without a second round of rebalance. Compared with KIP-415, we are optimizing for fewer rebalances, at the cost of larger metadata and partial unavailability of the tasks being learned.
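The revocation rule described above can be sketched roughly as follows. This is only an illustration, not Kafka Streams code: the class `RevocationSketch`, the method `splitOnRebalance`, and the use of plain string task ids are all assumptions made for the example.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical sketch: which tasks a member gives up when a rebalance begins. */
final class RevocationSketch {

    /** Result of splitting a member's active tasks at rebalance start. */
    static final class JoinState {
        final Set<String> retained = new LinkedHashSet<>();
        final Set<String> revoked = new LinkedHashSet<>();
    }

    /**
     * Revoke only the active tasks marked "being learned"; every other
     * active task keeps running through the rebalance.
     */
    static JoinState splitOnRebalance(Set<String> activeTasks, Set<String> beingLearned) {
        JoinState state = new JoinState();
        for (String task : activeTasks) {
            if (beingLearned.contains(task)) {
                state.revoked.add(task);   // ownership may move to the learner
            } else {
                state.retained.add(task);  // unaffected by this rebalance
            }
        }
        return state;
    }
}
```

With S1 owning T1 and T2 and T1 being learned by another member, S1 would join the group with T2 retained and T1 revoked, which matches the join metadata shown in the scale-up walkthrough.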


Next we are going to look at several typical scaling scenarios to better understand the algorithm.

Scaling Scenarios

Scale Up Running Application

The newly joined workers will be assigned learner tasks by the group leader, and they will first replay the corresponding changelogs locally. By the end of the first round of rebalance, there is no “real task transfer”. When a new member finally finishes the replay, it will re-attempt to join the group to indicate that it is “ready” to take on real active tasks. During the second rebalance, the leader will then transfer the task ownership.

Code Block: Scale-up
Cluster has 3 stream workers S1(leader), S2, S3, and they each own some tasks 1 ~ 5
Group stable state: S1[T1, T2], S2[T3, T4], S3[T5]

#First Rebalance 
New member S4 joins the group
S1 performs task assignments:
	S1(assigned: [T1, T2], revoked: [], learning: [])
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [], revoked: [], learning: [T1])

#Second Rebalance 
New member S5 joins the group.
Member S1~S5 join with following metadata: (S4 is not ready yet)
	S1(assigned: [T2], revoked: [T1], learning: []) // T1 revoked because it's "being learned"
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [], revoked: [], learning: [T1])
	S5(assigned: [], revoked: [], learning: []) // new member, nothing assigned yet
S1 performs task assignments: 
	S1(assigned: [T1, T2], revoked: [], learning: [])
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [], revoked: [], learning: [T1])
	S5(assigned: [], revoked: [], learning: [T3])

#Third Rebalance 
Member S4 finishes its replay and becomes ready, then re-attempts to join the group.
Member S1~S5 join with following status: (S5 is not ready yet)
	S1(assigned: [T2], revoked: [T1], learning: [])
	S2(assigned: [T4], revoked: [T3], learning: []) // T3 revoked because it's "being learned"
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [], revoked: [], learning: [T1])
	S5(assigned: [], revoked: [], learning: [T3])
S1 performs task assignments:
	S1(assigned: [T2], revoked: [T1], learning: [])
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [T1], revoked: [], learning: [])
	S5(assigned: [], revoked: [], learning: [T3])

#Fourth Rebalance 
Member S5 is ready and re-attempts to join the group.
Member S1~S5 join with following status:
	S1(assigned: [T2], revoked: [], learning: [])
	S2(assigned: [T4], revoked: [T3], learning: []) // T3 revoked because it's "being learned"
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [T1], revoked: [], learning: [])
	S5(assigned: [], revoked: [], learning: [T3])
S1 performs task assignments:
	S1(assigned: [T2], revoked: [], learning: [])
	S2(assigned: [T4], revoked: [T3], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [T1], revoked: [], learning: [])
	S5(assigned: [T3], revoked: [], learning: [])
Now the group reaches balance with 5 members and 5 tasks.


Scale Up from Empty Group

Scaling up from scratch means all workers are new members. There is no need for a learner stage because there is nothing to learn: we don’t even have a changelog topic to start with. We should be able to handle this case by checking whether the given task is in the active task bucket of any other member; if not, we just transfer the ownership immediately.

After deprecating group.initial.rebalance.delay, we still expect the algorithm to work because every task assignment during rebalance will adhere to the rule "if a given task is currently active, it may only be reassigned to workers that have declared themselves ready to serve this task."
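The reassignment rule quoted above, together with the empty-group shortcut, could be expressed as a small predicate. The sketch below is illustrative only; `TransferRuleSketch`, `mayTransfer`, and its parameters are hypothetical names, and the leader is assumed to know which tasks are currently active and which workers have declared themselves ready for a given task.

```java
import java.util.Set;

/** Hypothetical sketch of the "only ready workers may take over an active task" rule. */
final class TransferRuleSketch {

    /**
     * A task may move to the candidate worker if no member currently runs it
     * as an active task (there is nothing to learn), or if the candidate has
     * declared itself ready to serve it.
     */
    static boolean mayTransfer(String task,
                               String candidate,
                               Set<String> currentlyActiveTasks,
                               Set<String> readyWorkersForTask) {
        if (!currentlyActiveTasks.contains(task)) {
            return true; // e.g. scaling up from an empty group
        }
        return readyWorkersForTask.contains(candidate);
    }
}
```

The first branch covers the scale-up-from-empty case, where ownership is handed out immediately; the second branch is what forces a learner stage for tasks that are already running elsewhere.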

Code Block: Scale-up from ground
Group empty state: unassigned tasks [T1, T2, T3, T4, T5]

#First Rebalance 
New member S1 joins the group
S1 performs task assignments:
	S1(assigned: [T1, T2, T3, T4, T5], revoked: [], learning: []) // T1~5 not previously owned

#Second Rebalance 
New members S2 and S3 join the group
S1 performs task assignments:
	S1(assigned: [T1, T2, T3, T4, T5], revoked: [], learning: [])
	S2(assigned: [], revoked: [], learning: [T3, T4])
	S3(assigned: [], revoked: [], learning: [T5])

#Third Rebalance 
S2 and S3 are ready immediately after the assignment.
Member S1~S3 join with following status:
	S1(assigned: [T1, T2], revoked: [T3, T4, T5], learning: []) 
	S2(assigned: [], revoked: [], learning: [T3, T4])
	S3(assigned: [], revoked: [], learning: [T5])
S1 performs task assignments:
	S1(assigned: [T1, T2], revoked: [T3, T4, T5], learning: []) 
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])

Scale Down Running Application

As discussed for the “learner” logic, when we scale down a stream group it is also favorable to initiate learner tasks before actually shutting down the instances. Although standby tasks could help in this case, they require the user to configure them in advance, so they may not be available when the admin performs the scale down. The plan is to use a command line tool to tell certain stream members that a shutdown is about to be executed. These informed members will send a join group request with a join reason indicating they are “leaving soon”. During the rebalance assignment, the leader will distribute learner tasks among the members that do not intend to leave. A leaving member will shut itself down once it receives the instruction to revoke all of its active tasks.

For ease of operation, a new tool for scaling down the stream app shall be built. It will have access to the application instances and compute which members to scale down, while the end user just needs to provide a target percentage. For example, if the current cluster size is 40 and we choose to scale down to 80%, then the script will attempt to inform 8 of the 40 hosts to “prepare leaving” the group.
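The arithmetic behind the 40-host example might look like the sketch below. The tool does not exist yet, so `ScaleDownSketch` and `membersToInform` are hypothetical names, and rounding the retained count up (so the group never shrinks below the requested percentage) is an assumption rather than something the proposal specifies.

```java
/** Hypothetical sketch of how a scale-down tool could pick the number of leaving members. */
final class ScaleDownSketch {

    /**
     * Number of members to notify when shrinking the group to targetPercent
     * of its current size. The retained count is rounded up (an assumption).
     */
    static int membersToInform(int currentSize, int targetPercent) {
        if (currentSize < 0 || targetPercent < 0 || targetPercent > 100) {
            throw new IllegalArgumentException("size must be >= 0 and percent within [0, 100]");
        }
        int retained = (currentSize * targetPercent + 99) / 100; // integer ceiling
        return currentSize - retained;
    }
}
```

For a group of 40 members and a target of 80%, this keeps 32 members and informs the remaining 8 to prepare to leave.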

Code Block: Scale-down stream applications
Group stable state: S1[T1, T2], S2[T3, T4], S3[T5]
Scaling down the application, S2 will be leaving.

#First Rebalance 
Member S2 joins the group and claims that it is leaving
S1 performs task assignments:
	S1(assigned: [T1, T2], revoked: [], learning: [T3])
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: [T4])

#Second Rebalance 
S3 finishes replay first and triggers another rebalance
Member S1~S3 join with following status:(S1 is not ready yet)
	S1(assigned: [T1, T2], revoked: [], learning: [T3])
	S2(assigned: [T3], revoked: [T4], learning: []) 
	S3(assigned: [T5], revoked: [], learning: [T4])
S1 performs task assignments:
	S1(assigned: [T1, T2], revoked: [], learning: [T3])
	S2(assigned: [T3], revoked: [T4], learning: [])
	S3(assigned: [T4, T5], revoked: [], learning: [])

#Third Rebalance 
S1 finishes replay and triggers a rebalance.
Member S1~S3 join with following status: 
	S1(assigned: [T1, T2], revoked: [], learning: [T3])
	S2(assigned: [], revoked: [T3], learning: []) 
	S3(assigned: [T4, T5], revoked: [], learning: [])
S1 performs task assignments:
	S1(assigned: [T1, T2, T3], revoked: [], learning: [])
	S2(assigned: [], revoked: [T3], learning: [])
	S3(assigned: [T4, T5], revoked: [], learning: [])

S2 will shut itself down upon receiving the new assignment, since it has no assigned task left.

Leader Transfer During Scaling 

A leader crash could cause the loss of historical assignment information. For the learner assignment, however, each worker maintains its own assignment status, so when a learner task's id has no active task running, the transfer will happen immediately. A leader switch in this case is not a big concern. The essence is that we don't rely on leader information to do the assignment.

Code Block: Leader transfer during scale-up
Cluster has 3 stream workers S1(leader), S2, S3, and they own tasks 1 ~ 5
Group stable state: S1[T1, T2], S2[T3, T4], S3[T5]

#First Rebalance 
New member S4 joins the group
S1 performs task assignments:
	S1(assigned: [T1, T2], revoked: [], learning: [])
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [], revoked: [], learning: [T1])

#Second Rebalance
S1 crashes/gets killed before S4 is ready, S2 takes over as leader.
Member S2~S4 join with following status: 
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: []) 
	S4(assigned: [], revoked: [], learning: [T1])
Note that T2 is unassigned, and S4 is learning T1 which has no current active task. We 
could rebalance T1, T2 immediately.	
S2 performs task assignments:
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5, T2], revoked: [], learning: [])
	S4(assigned: [T1], revoked: [], learning: [])
Now the group reaches balance.

Optimizations

Stateful vs Stateless Tasks

For stateless tasks, the ownership transfer should happen immediately without a learning stage, because there is nothing to restore. For these tasks we should fall back to the KIP-415 algorithm, where stateless tasks are only revoked during the second rebalance. This feature requires us to add a new tag to a stream task: "isStateful".

Eager Rebalance 

Sometimes the restoration times of learner tasks are not equal. When assigned more than one task to replay, the stream worker could request an immediate rebalance as soon as a subset of its learner tasks finish, in order to speed up load balancing and reduce the resource waste of double task processing, at the cost of global efficiency since many more rebalances are introduced. We could supply the user with a config to decide whether they want the eager approach or the stable approach.

For example:

A stream worker S1 takes two learner tasks T1 and T2, where the restoring time satisfies time(T1) < time(T2). Under the eager rebalance approach, the worker will trigger a rebalance immediately when T1 finishes replaying. Under the stable rebalance approach, the worker will not rejoin the group until it finishes replaying both T1 and T2.
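The difference between the two approaches can be illustrated with a small decision function. This is a sketch under stated assumptions: `ReadinessSketch` and `shouldRejoin` are hypothetical names, task ids are plain strings, and the check is assumed to be evaluated each time one learner task finishes restoring.

```java
import java.util.Collections;
import java.util.Set;

/** Hypothetical sketch of the eager vs. stable rejoin decision for a learning worker. */
final class ReadinessSketch {

    /**
     * Intended to be evaluated whenever a learner task finishes restoring.
     * Eager: rejoin as soon as at least one learner task is restored.
     * Stable: rejoin only once every learner task is restored.
     */
    static boolean shouldRejoin(boolean eager, Set<String> learnerTasks, Set<String> restoredTasks) {
        if (learnerTasks.isEmpty()) {
            return false; // nothing is being learned, so nothing to announce
        }
        if (eager) {
            return !Collections.disjoint(learnerTasks, restoredTasks);
        }
        return restoredTasks.containsAll(learnerTasks);
    }
}
```

In the T1/T2 example, once only T1 is restored the eager variant asks for a rebalance while the stable variant keeps waiting; both agree once T2 is restored as well.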

Standby Task Utilization During Scale Down

Don’t forget that the original purpose of the standby task is to mitigate issues during scale down. When performing learner assignment, we shall prioritize workers that currently have standby tasks matching the learner assignment. The group should therefore rebalance fairly soon and let the leaving members shut themselves down quickly.

Scale Down Timeout

Sometimes the end user wants to reach a sweet spot between completing the ongoing task transfer and freeing up streaming resources. So we want to take a similar approach to KIP-415 and introduce a client config that makes the scale down time-bounded. If the time taken to migrate tasks exceeds this config, the leaving member will shut itself down immediately instead of waiting for the final confirmation. We could then simply promote the learner tasks to active, because they are the best candidates to own the tasks.

Tagging Info Summary

Note that to make sure the above resource shuffling happens as expected, we need at least four new task status indicators:

  1. Which task is a learner task? This could be a tag on the standby task: "isLearner";
  2. Which task is being learned? This could be a tag on the active task: "isLearned";
  3. Which learner task has become ready? This could be a tag on the standby task: "isReady";
  4. Which task is stateful? This could be a tag on the stream task: "isStateful".
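One way to picture these tags is as flags on a per-task metadata record. The sketch below is not the proposed wire format; `TaskTagsSketch`, `TaskKind`, and `TaskTags` are hypothetical names, and the consistency checks (for instance that "isReady" only makes sense on a learner) are assumptions drawn from the descriptions in the list.

```java
/** Hypothetical sketch of the per-task tags summarized in the list above. */
final class TaskTagsSketch {

    enum TaskKind { ACTIVE, STANDBY }

    static final class TaskTags {
        final String taskId;
        final TaskKind kind;
        final boolean isStateful; // task has state to restore
        final boolean isLearner;  // standby task restoring in order to take over
        final boolean isLearned;  // active task that some learner is copying
        final boolean isReady;    // learner task that finished restoring

        TaskTags(String taskId, TaskKind kind, boolean isStateful,
                 boolean isLearner, boolean isLearned, boolean isReady) {
            if (isLearner && kind != TaskKind.STANDBY) {
                throw new IllegalArgumentException("isLearner is a tag on standby tasks");
            }
            if (isLearned && kind != TaskKind.ACTIVE) {
                throw new IllegalArgumentException("isLearned is a tag on active tasks");
            }
            if (isReady && !isLearner) {
                throw new IllegalArgumentException("isReady only applies to learner tasks");
            }
            this.taskId = taskId;
            this.kind = kind;
            this.isStateful = isStateful;
            this.isLearner = isLearner;
            this.isLearned = isLearned;
            this.isReady = isReady;
        }
    }
}
```

Keeping the flags on the task record, rather than in leader-side state, is in line with the earlier point that the assignment should not depend on information held only by the leader.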

Algorithm Walkthrough

The above examples focus on demonstrating the expected behaviors of KStream incremental rebalancing. We also want to define the new learner algorithm for a holistic view.

We have a set of workers, where each worker contains:

Workers = []

Worker {
  Set<ActiveTask> activeTasks;
  Set<StandbyTask> standbyTasks;
  String instanceId;
}

The assignment will be broken down into the following rounds:

Assign active tasks:

  1. Assign active tasks to the owners of learner tasks that indicate "ready"
  2. Assign the rest of the active tasks to their previous owners
  3. Assign the rest of the active tasks to the owners of unready learner tasks
  4. Assign the rest of the active tasks to hosts with available resources
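The four active-task rounds above can be sketched as a priority lookup per task followed by a least-loaded fallback. This is an illustration, not the proposed implementation: `ActiveAssignmentSketch`, `TaskView`, and `assignActive` are hypothetical names, the leader is assumed to have derived one `TaskView` per task from the join metadata, and "resource available host" is interpreted here as the worker with the fewest active tasks.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the four active-task assignment rounds. */
final class ActiveAssignmentSketch {

    /** Per-task candidates, assumed to be derived from join metadata; any field may be null. */
    static final class TaskView {
        final String readyLearner;
        final String previousOwner;
        final String unreadyLearner;

        TaskView(String readyLearner, String previousOwner, String unreadyLearner) {
            this.readyLearner = readyLearner;
            this.previousOwner = previousOwner;
            this.unreadyLearner = unreadyLearner;
        }
    }

    /** Returns task id -> owning worker for the current group members. */
    static Map<String, String> assignActive(Map<String, TaskView> tasks, List<String> workers) {
        if (workers.isEmpty()) {
            throw new IllegalArgumentException("no workers in the group");
        }
        Map<String, Integer> load = new LinkedHashMap<>();
        for (String w : workers) {
            load.put(w, 0);
        }
        Map<String, String> owners = new LinkedHashMap<>();
        List<String> leftovers = new ArrayList<>();
        // Rounds 1-3: ready learner, then previous owner, then unready learner.
        for (Map.Entry<String, TaskView> e : tasks.entrySet()) {
            TaskView v = e.getValue();
            String owner = firstMember(load, v.readyLearner, v.previousOwner, v.unreadyLearner);
            if (owner == null) {
                leftovers.add(e.getKey());
            } else {
                owners.put(e.getKey(), owner);
                load.merge(owner, 1, Integer::sum);
            }
        }
        // Round 4: remaining tasks go to the least loaded worker (ties: list order).
        for (String task : leftovers) {
            String best = null;
            for (String w : workers) {
                if (best == null || load.get(w) < load.get(best)) {
                    best = w;
                }
            }
            owners.put(task, best);
            load.merge(best, 1, Integer::sum);
        }
        return owners;
    }

    /** First candidate that is non-null and still a member of the group. */
    private static String firstMember(Map<String, Integer> load, String... candidates) {
        for (String c : candidates) {
            if (c != null && load.containsKey(c)) {
                return c;
            }
        }
        return null;
    }
}
```

Fed with the leader-transfer scenario (S1 gone, S4 still learning T1), this sketch hands T1 to S4 and T2 to S3, the same outcome as in that walkthrough.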

Assign learner tasks:

  1. Keep the existing learner tasks running. We don't want to see them bounce halfway.
  2. If the load is not balanced between hosts, assign learner tasks for tasks on heavily loaded hosts to lightly loaded hosts.
  3. As long as the group members and the number of tasks are not changing, the assignment should converge to a defined balanced state instead of rebalancing forever.
  4. Instances with standby tasks have higher priority to be chosen as learner task assignees. The standby task will convert to a learner task immediately.

Assign standby tasks:


For each instance, the total number of tasks it owns should be within 0.5 ~ 2 times the expected number of tasks, which leaves some buffer capacity in order to avoid imbalance.

We could even provide a stream.balancing.factor for the user to configure. The smaller this number is set, the stricter the assignment will be. If the factor is set to r, the number of tasks a host could own is within 1/r ~ r times the expected number of tasks (total tasks / number of hosts), so the default of 2 gives the 0.5 ~ 2 range above.
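A possible reading of the balancing factor is sketched below. The proposal does not spell out the exact formula, so this assumes a factor r allows each host to own between expected / r and expected * r tasks, where expected is total tasks divided by the number of hosts; that assumption reproduces the 0.5 ~ 2 range for a factor of 2. `BalanceSketch` and `isWithinBounds` are hypothetical names.

```java
/** Hypothetical sketch of a per-host balance check driven by a balancing factor. */
final class BalanceSketch {

    /**
     * True if ownedTasks lies within [expected / factor, expected * factor],
     * where expected = totalTasks / hosts. The formula is an assumption.
     */
    static boolean isWithinBounds(int ownedTasks, int totalTasks, int hosts, double factor) {
        if (hosts <= 0 || factor < 1.0) {
            throw new IllegalArgumentException("hosts must be positive and factor at least 1");
        }
        double expected = (double) totalTasks / hosts;
        return ownedTasks >= expected / factor && ownedTasks <= expected * factor;
    }
}
```

With 5 tasks on 5 hosts and a factor of 2, a host owning 1 or 2 tasks counts as balanced, while a host owning 0 or 3 tasks does not, which is the situation that would prompt the leader to hand out learner tasks.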

As we can see, for any given task there should be at most one learner task after each round of rebalance, and exactly one corresponding active task at the same time.


Algorithm Trade-offs

We open a special section to discuss the trade-offs of the new algorithm, because it is important to understand the motivation for the change and to make the proposal more robust.

More rebalances

As one could expect, the new algorithm will invoke many more rebalances than the current protocol. As we have discussed in the overall incremental rebalancing design, having multiple rebalances is not always bad when done wisely, and following KIP-345 we have a future proposal to avoid scale-up rebalances for static members. The goal is to pre-register the members that are planning to be added. The broker coordinator will augment the member list and wait for all the new members to join the group before rebalancing, since by default a stream application’s rebalance timeout is infinity. The conclusion is that it is the server’s responsibility to avoid excessive rebalances, and the client’s responsibility to make each rebalance more efficient.

Metadata size increase

Since we are carrying more information during rebalance, we should be aware of the metadata size increase. So far the hard limit is 1MB per metadata response, which means that if we carry too much information, the new protocol could hit a hard failure. Finding a better encoding scheme for metadata is a common pain point if we are promoting incremental rebalancing KIPs like 415 and 429. Guozhang has started sharing some thoughts in this JIRA, and we plan to have a separate KIP discussing different encoding technologies to see which one could work.

Public Interfaces

We will be adding the following new configs to help users define their customized strategy:

learn.partial.rebalance

Default: true

If this config is set to true, a new member will proactively trigger a rebalance each time it finishes restoring one task's state, until it eventually finishes replaying all tasks. Otherwise, the new worker will batch the ready stage and ask for a single round of rebalance.


scale.down.timeout.ms

Default: infinity

Timeout in milliseconds after which a stream worker that has been informed to scale down is forcibly terminated.


stream.balancing.factor

Default: 2

The tolerance factor for task imbalance between hosts, beyond which a rebalance is triggered.
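As a rough illustration of how an application might set these proposed configs, the sketch below builds a plain `java.util.Properties` object with the defaults listed above. These keys are only proposed in this document and are not existing Kafka Streams configs; `ProposedConfigSketch` is a hypothetical name, and representing the "infinity" default as `Long.MAX_VALUE` is an assumption.

```java
import java.util.Properties;

/** Hypothetical sketch: the proposed configs with their listed defaults. */
final class ProposedConfigSketch {

    static Properties proposedDefaults() {
        Properties props = new Properties();
        // Rejoin after each restored learner task (true) or batch into one round (false).
        props.setProperty("learn.partial.rebalance", "true");
        // "infinity" is modeled as Long.MAX_VALUE here (an assumption).
        props.setProperty("scale.down.timeout.ms", String.valueOf(Long.MAX_VALUE));
        // Tolerated task imbalance between hosts.
        props.setProperty("stream.balancing.factor", "2");
        return props;
    }
}
```

An operator who prefers a bounded scale down could override `scale.down.timeout.ms` with a finite value, for example five minutes expressed in milliseconds.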


Implementation Plan

We want to call out this portion because the algorithm we are going to design is fairly complicated. To make sure the delivery is smooth given the fundamental changes to KStream internals, I have built a separate, shareable Google Doc here that outlines the steps of the change. Feel free to give your feedback on this plan while reviewing the algorithm, because some of the changes are highly coupled with internal changes. Without these details, the algorithm does not make sense.

Compatibility, Deprecation, and Migration Plan

  • Metadata size increase
  • No downtime upgrade due to change of protocolType
  • Stream instance replacement
    • The right approach for a global application instance replacement is
      • Increase the capacity of the current stream job to 2x
      • Mark existing stream instances as leaving
      • Once learner tasks finish on the new hosts, shut down the old ones.

FAQ

Why do we call them stream workers?

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.