...
- Reduce unnecessary downtime due to task restoration and global application revocation.
- Better auto scaling experience for KStream applications.
- Stretch goal: better workload balance across KStream instances.
Background
Consumer Rebalance Protocol: Stop-The-World Effect
As mentioned in the motivation, we also want to mitigate the stop-the-world effect of the current global rebalance protocol. A quick recap of the current rebalance semantics on KStream: when a rebalance starts, all stream threads will:

1. Join the group with all currently assigned tasks revoked.
2. Wait until the group assignment finishes to get the assigned tasks and resume working.
3. Replay the state of the assigned tasks.
4. Once all replay jobs finish, transition to running mode.

The reason for revoking all ongoing tasks is that we need to guarantee each topic partition is assigned to exactly one consumer at any time. This way, no topic partition can be re-assigned before it is revoked.
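The steps above can be sketched as a toy model (plain Python, not Kafka code): under the eager protocol, the set of tasks that stop processing is the entire current assignment, no matter how much of it each member gets back afterwards.

```python
# Toy model of the eager ("stop-the-world") rebalance recapped above:
# every member revokes all of its tasks before rejoining, so every task
# pauses during the rebalance, even ones re-assigned to the same member.

def eager_rebalance(current, new):
    """current/new: dict member -> set of task ids; returns tasks that pause."""
    stopped = set()
    for member, tasks in current.items():
        stopped |= tasks  # everything owned is revoked up front
    # note: `new` is deliberately unused -- the pause set never depends
    # on the outcome of the reassignment.
    return stopped

# Member "a" keeps both of its tasks after the rebalance, yet both still stop.
paused = eager_rebalance({"a": {"t1", "t2"}},
                         {"a": {"t1", "t2"}, "b": set()})
```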
Streams Rebalance Metadata: Remember the PrevTasks
Today Streams embeds a full-fledged Consumer client, which hard-codes a ConsumerCoordinator inside. Streams then injects a StreamsPartitionAssignor through the consumer's pluggable PartitionAssignor interface, and inside the StreamsPartitionAssignor we also have a TaskAssignor interface whose default implementation is the StickyTaskAssignor. Streams' partition assignor logic today sits in the latter two classes. Hence the hierarchy today is:
KafkaConsumer -> ConsumerCoordinator -> StreamsPartitionAssignor -> StickyTaskAssignor
The StreamsPartitionAssignor uses the subscription / assignment metadata byte-array fields to encode the additional information needed for sticky task assignment. More specifically, on subscription:
Subscription => TopicList SubscriptionInfo
  TopicList        => List<String>
  SubscriptionInfo => Bytes

SubscriptionInfo (encoded in version 4) => VersionId LatestSupportVersionId ClientUUID PrevTasks StandbyTasks EndPoint
  VersionId              => Int32
  LatestSupportVersionId => Int32
  ClientUUID             => 128bit
  PrevTasks              => Set<TaskId>
  StandbyTasks           => Set<TaskId>
  EndPoint               => HostInfo
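To make the layout above concrete, here is an illustrative encoder sketch. The exact on-the-wire details are assumptions for illustration only: we model a TaskId as two Int32s (sub-topology group id, partition) and the endpoint as a length-prefixed host string plus an Int32 port; the real Streams serialization may differ.

```python
import struct
import uuid

# Sketch of the version-4 SubscriptionInfo layout (big-endian, as is
# conventional on the Kafka wire). Field order follows the grammar above:
# VersionId, LatestSupportVersionId, ClientUUID, PrevTasks, StandbyTasks, EndPoint.

def encode_task_set(tasks):
    """tasks: set of (group_id, partition) pairs; length-prefixed list of Int32 pairs."""
    out = struct.pack(">i", len(tasks))
    for group_id, partition in sorted(tasks):
        out += struct.pack(">ii", group_id, partition)
    return out

def encode_subscription_info(version, latest_supported, client_uuid,
                             prev_tasks, standby_tasks, host, port):
    buf = struct.pack(">ii", version, latest_supported)  # VersionId, LatestSupportVersionId
    buf += client_uuid.bytes                             # ClientUUID: 128 bits
    buf += encode_task_set(prev_tasks)                   # PrevTasks
    buf += encode_task_set(standby_tasks)                # StandbyTasks
    raw_host = host.encode("utf-8")                      # EndPoint (HostInfo)
    buf += struct.pack(">i", len(raw_host)) + raw_host + struct.pack(">i", port)
    return buf

info = encode_subscription_info(4, 4, uuid.uuid4(),
                                {(0, 0), (0, 1)}, set(), "localhost", 8080)
```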
And on assignment:
Assignment => AssignedPartitions AssignmentInfo
  AssignedPartitions => List<TopicPartition>
  AssignmentInfo     => Bytes

AssignmentInfo (encoded in version 4) => VersionId LatestSupportedVersionId ActiveTasks StandbyTasks PartitionsByHost ErrorCode
  VersionId                => Int32
  LatestSupportedVersionId => Int32
  ActiveTasks              => List<TaskId>
  StandbyTasks             => Map<TaskId, Set<TopicPartition>>
  PartitionsByHost         => Map<HostInfo, Set<TopicPartition>>
  ErrorCode                => Int32
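The map-valued fields are the main structural difference from the subscription side. As a hedged companion sketch (the concrete wire format is an assumption, not the actual Streams code), a map can be written as an Int32 entry count followed by key/value pairs, with a TopicPartition as a length-prefixed topic string plus an Int32 partition:

```python
import struct

# Illustrative encoder for the StandbyTasks field of AssignmentInfo:
# Map<TaskId, Set<TopicPartition>>, with a TaskId modeled as two Int32s.

def encode_topic_partition(tp):
    topic, partition = tp
    raw = topic.encode("utf-8")
    return struct.pack(">i", len(raw)) + raw + struct.pack(">i", partition)

def encode_standby_tasks(standby):
    """standby: dict mapping (group_id, partition) TaskId -> set of (topic, partition)."""
    out = struct.pack(">i", len(standby))            # entry count
    for (group_id, part), tps in sorted(standby.items()):
        out += struct.pack(">ii", group_id, part)    # key: TaskId
        out += struct.pack(">i", len(tps))           # value: set size
        for tp in sorted(tps):
            out += encode_topic_partition(tp)
    return out

blob = encode_standby_tasks({(0, 1): {("topicA", 1)}})
```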
Streams Sticky TaskAssignor: Stickiness over Balance
Streams' StickyTaskAssignor honors stickiness over workload balance. More specifically:

1. First we calculate the number of tasks each host should get on average as its "capacity", by dividing the total number of tasks by the total number of consumers (i.e. num.threads) and then multiplying by the number of consumers that host has.
2. Then for each task:
   - If a client owns it as a PrevTask and that client still has spare capacity, assign it to that client;
   - Otherwise, if a client owns it as a StandbyTask and that client still has spare capacity, assign it to that client.
3. If there are still unassigned tasks after step 2, we loop over them at the per-sub-topology granularity (for workload balance), and for each task find the client with the least load; if there are multiple such clients, prefer the one that previously owned the task as active, then the one that previously owned it as standby, then any client that did not own it at all.

As one can see, we honor stickiness (step 2) over workload balance (step 3).
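A minimal sketch of steps 2 and 3, under simplifying assumptions that are not part of the KIP: each client is a single host with one consumer, "load" is the number of tasks already assigned, capacity is the ceiling of the per-client average, and sub-topology grouping is ignored.

```python
# Hedged sketch of capacity-bounded sticky assignment (not the actual
# StickyTaskAssignor): stickiness first, least-loaded placement second.

def sticky_assign(tasks, clients, prev_tasks, standby_tasks):
    """tasks: list of task ids; clients: list of client ids;
    prev_tasks/standby_tasks: dict client -> set of task ids."""
    capacity = -(-len(tasks) // len(clients))  # ceiling of the average
    assignment = {c: [] for c in clients}
    unassigned = []

    # Step 2: honor stickiness first, subject to capacity.
    for task in tasks:
        owner = next((c for c in clients
                      if task in prev_tasks.get(c, set())
                      and len(assignment[c]) < capacity), None)
        if owner is None:
            owner = next((c for c in clients
                          if task in standby_tasks.get(c, set())
                          and len(assignment[c]) < capacity), None)
        if owner is None:
            unassigned.append(task)
        else:
            assignment[owner].append(task)

    # Step 3: least-loaded client wins, preferring previous active owners,
    # then previous standby owners, then everyone else.
    for task in unassigned:
        def rank(c):
            stickiness = (0 if task in prev_tasks.get(c, set())
                          else 1 if task in standby_tasks.get(c, set())
                          else 2)
            return (len(assignment[c]), stickiness)
        assignment[min(clients, key=rank)].append(task)
    return assignment
```

For example, with tasks t1..t4 over two clients where c1 previously ran t1 and t2 and c2 held t3 as a standby, c1 keeps its two previous tasks and c2 picks up t3 plus the unowned t4.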
Proposed Changes
We want to separate the protocol improvement into the consumer and Streams layers, since Streams today embeds a full-fledged consumer instance that hard-codes the ConsumerCoordinator.
Part I: Incremental Consumer Rebalance Protocol
We will augment the consumer's rebalance protocol as proposed in Incremental Cooperative Rebalancing: Support and Policies, with some tweaks compared to KIP-415. The key idea is that, instead of relying on a single rebalance's synchronization barrier and hence forcing everyone to give up all assigned partitions before joining the group as the new generation, we use consecutive rebalances, where the end of the first rebalance serves as the synchronization barrier. In the first rebalance, we keep all currently assigned tasks running and trade off with one more rebalance. The behavior becomes:

1. Join the group with all current active tasks still running.
2. After the first rebalance, sync the revoked partitions and stop them.
3. Rejoin the group immediately with only the remaining active tasks to trigger a second rebalance.

Feel free to take a look at the KIP-415 example to get a sense of how the algorithm works.
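The consecutive-rebalance flow above can be sketched as a toy model (plain Python, not the consumer implementation): members keep everything they own through the first round and give up only the partitions that must move, so partitions that stay put never stop processing.

```python
# Sketch of the two-round incremental rebalance: round 1 computes what each
# member must give up (the new synchronization barrier), round 2 re-assigns
# only the freed partitions.

def incremental_rebalance(owned, target):
    """owned/target: dict member -> set of partitions."""
    # Round 1: each member revokes only the partitions moving elsewhere.
    revoked = {m: owned[m] - target.get(m, set()) for m in owned}
    stopped = set()
    for parts in revoked.values():
        stopped |= parts
    # Round 2: the revoked partitions are now safe to hand out.
    final = {m: set(target.get(m, set())) for m in set(owned) | set(target)}
    return stopped, final

# Only p3 moves to the new member "b"; p1 and p2 keep processing throughout.
stopped, final = incremental_rebalance(
    {"a": {"p1", "p2", "p3"}},
    {"a": {"p1", "p2"}, "b": {"p3"}})
```

Contrast this with the eager protocol, where the stopped set would have been all three partitions.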
More specifically, we will push the following metadata bytes down to the consumer layer, which will be universally applicable to any consumer:

SubscriptionInfo (encoded in version 4) => VersionId LatestSupportVersionId ClientUUID PrevTasks StandbyTasks EndPoint
  VersionId              => Int32
  LatestSupportVersionId => Int32
  ClientUUID             => 128bit
  PrevTasks              => Set<TaskId>
  StandbyTasks           => Set<TaskId>
  EndPoint               => HostInfo
And on assignment, the metadata bytes (version 4) are:
For KStream, we are going to take a trade-off between the "revoke all" and "revoke none" solutions: we shall only revoke tasks that have been learned since the last round. So when we assign learner tasks to new members, we shall also mark the corresponding active tasks as "being learned" on their current owners. Every time a rebalance begins, the task owners revoke the being-learned tasks and join the group without affecting other ongoing tasks. Learned tasks can then transfer ownership immediately upon readiness, without attempting a second round of rebalance. Compared with KIP-415, we are optimizing for fewer rebalances, but increasing the metadata size and sacrificing partial availability of the learner tasks.
Terminology
We shall define several terms for an easy walkthrough of the algorithm.
- Instance (a.k.a. stream instance): the KStream instance serving as a container for a set of stream threads. This could be a physical host or a Kubernetes pod. A stream thread's capacity is essentially controlled by the relative size of its instance.
- Learner task: a special standby task that gets assigned to a stream instance to restore a current active task, and transitions to active when the restoration is complete.
Learner Task Essential
A learner task shares the same semantics as a standby task: it is utilized by the restore consumer to replicate the active task's state. When the restoration of a learner task is complete, the stream instance initiates a new JoinGroupRequest to trigger another rebalance and perform the task transfer. The goal of the learner task is to delay the task migration until the destination host has finished replaying the active task's state.
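The learner-task life cycle described above can be sketched as a small state machine. All names here are illustrative, not Streams internals: a learner starts out restoring from the changelog like a standby, and only triggers the ownership-transfer rebalance once it has caught up.

```python
# Hedged sketch of the learner-task life cycle: RESTORING -> READY (rejoin
# the group) -> ACTIVE (ownership transferred after the rebalance).

class LearnerTask:
    def __init__(self, task_id, end_offset):
        self.task_id = task_id
        self.end_offset = end_offset  # changelog end offset when learning began
        self.restored = 0
        self.state = "RESTORING"

    def poll_restore(self, num_records):
        """Replay a batch fetched by the restore consumer."""
        self.restored += num_records
        if self.state == "RESTORING" and self.restored >= self.end_offset:
            # Caught up: the instance now sends a new JoinGroupRequest.
            self.state = "READY"
        return self.state

    def on_rebalance_complete(self):
        """Called once the ownership-transfer rebalance has finished."""
        if self.state == "READY":
            self.state = "ACTIVE"
        return self.state

task = LearnerTask("0_1", end_offset=100)
task.poll_restore(60)          # still restoring
task.poll_restore(50)          # caught up: ready to rejoin the group
task.on_rebalance_complete()   # ownership transferred
```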
Next we are going to walk through several typical scaling scenarios and edge cases to better understand the design of this algorithm.
...