...

  • Reduce unnecessary downtime due to task restoration and global application revocation.
  • Better auto scaling experience for KStream applications.
  • Stretch goal: better workload balance across KStream instances.


Background


Consumer Rebalance Protocol: Stop-The-World Effect

As mentioned in the motivation, we also want to mitigate the stop-the-world effect of the current global rebalance protocol. A quick recap of the current rebalance semantics on KStream: when a rebalance starts, all stream threads will

  1. Join the group with all currently assigned tasks revoked.

  2. Wait until the group assignment finishes to get the newly assigned tasks and resume working.

  3. Replay the assigned tasks' state.

  4. Once all replay jobs finish, the stream thread transitions to running mode.

The reason for revoking all ongoing tasks is that we need to guarantee each topic partition is assigned to exactly one consumer at any time. This way, no topic partition can be re-assigned before it is revoked.
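
To make this concrete, here is a minimal sketch of what the eager protocol implies for a consumer-side rebalance listener (the listener and the task helpers are illustrative stand-ins, not actual Streams code): every rebalance surfaces the full current assignment in onPartitionsRevoked, so all tasks stop before any new work begins.

Code Block
import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

// Illustrative only: under the eager protocol, onPartitionsRevoked always
// receives the FULL current assignment, so every active task must stop.
public class EagerRebalanceListener implements ConsumerRebalanceListener {

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Step 1: all owned partitions are revoked, including those that the
        // upcoming assignment will hand right back to this same thread.
        for (TopicPartition tp : partitions) {
            suspendTask(tp); // hypothetical helper
        }
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Steps 2-4: only after the new assignment arrives can tasks be
        // recreated, their state replayed, and the thread go back to RUNNING.
        for (TopicPartition tp : partitions) {
            restoreAndResumeTask(tp); // hypothetical helper
        }
    }

    private void suspendTask(TopicPartition tp) { }
    private void restoreAndResumeTask(TopicPartition tp) { }
}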

Streams Rebalance Metadata: Remember the PrevTasks

Today Streams embeds a full-fledged consumer client, which hard-codes a ConsumerCoordinator inside. Streams then injects a StreamsPartitionAssignor into its pluggable PartitionAssignor interface, and inside the StreamsPartitionAssignor we also have a TaskAssignor interface whose default implementation is the StickyTaskAssignor. Streams' partition assignor logic today sits in the latter two classes. Hence the hierarchy today is:

Code Block
KafkaConsumer -> ConsumerCoordinator -> StreamsPartitionAssignor -> StickyTaskAssignor.


StreamsPartitionAssignor uses the subscription / assignment metadata byte-array fields to encode additional information for sticky partition assignment. More specifically, on subscription:

Code Block
Subscription => TopicList SubscriptionInfo
   TopicList               => List<String>
   SubscriptionInfo        => Bytes


SubscriptionInfo (encoded in version 4) => VersionId LatestSupportedVersionId ClientUUID PrevTasks StandbyTasks EndPoint
   VersionId                => Int32
   LatestSupportedVersionId => Int32
   ClientUUID               => 128bit
   PrevTasks                => Set<TaskId>
   StandbyTasks             => Set<TaskId>
   EndPoint                 => HostInfo


And on assignment: 

Code Block
Assignment => AssignedPartitions AssignmentInfo
   AssignedPartitions      => List<TopicPartition>
   AssignmentInfo          => Bytes


AssignmentInfo (encoded in version 4) => VersionId LatestSupportedVersionId ActiveTasks StandbyTasks PartitionsByHost ErrorCode
   VersionId                => Int32
   LatestSupportedVersionId => Int32
   ActiveTasks              => List<TaskId>
   StandbyTasks             => Map<TaskId, Set<TopicPartition>>
   PartitionsByHost         => Map<HostInfo, Set<TopicPartition>>
   ErrorCode                => Int32
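
For illustration only, here is a minimal sketch of how such subscription bytes could be laid out (this is not the actual Streams serialization code; TaskId is reduced to a (topicGroupId, partition) int pair and the endpoint to a plain host:port string):

Code Block
import java.nio.ByteBuffer;
import java.util.Set;
import java.util.UUID;

// Simplified sketch of encoding the version-4 subscription userData bytes.
public final class SubscriptionInfoSketch {

    public static ByteBuffer encode(int version, int latestSupportedVersion,
                                    UUID clientUUID,
                                    Set<int[]> prevTasks,    // taskId = {topicGroupId, partition}
                                    Set<int[]> standbyTasks,
                                    String endPoint) {      // "host:port"
        byte[] endPointBytes = endPoint.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(
            4 + 4 + 16                                   // two version ints + 128-bit UUID
            + 4 + prevTasks.size() * 8                   // count + 8 bytes per TaskId
            + 4 + standbyTasks.size() * 8
            + 4 + endPointBytes.length);                 // length-prefixed endpoint

        buf.putInt(version).putInt(latestSupportedVersion);
        buf.putLong(clientUUID.getMostSignificantBits());
        buf.putLong(clientUUID.getLeastSignificantBits());
        putTaskIds(buf, prevTasks);
        putTaskIds(buf, standbyTasks);
        buf.putInt(endPointBytes.length).put(endPointBytes);
        buf.flip();
        return buf;
    }

    private static void putTaskIds(ByteBuffer buf, Set<int[]> taskIds) {
        buf.putInt(taskIds.size());
        for (int[] id : taskIds) {
            buf.putInt(id[0]).putInt(id[1]); // topicGroupId + partition
        }
    }
}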


Streams Sticky TaskAssignor: Stickiness over Balance

Streams' StickyTaskAssignor will honor stickiness over workload balance. More specifically:

  1. First we calculate the average number of tasks each client should get as its "capacity", by dividing the total number of tasks by the total number of consumers (i.e. num.threads) and then multiplying by the number of consumers that client has.
  2. Then for each task:
    1. If a client owns it as its PrevTask and that client still has capacity, assign it to that client;
    2. Otherwise, if a client owns it as its StandbyTask and that client still has capacity, assign it to that client.
  3. If there are still unassigned tasks after step 2), we loop over them at the per-sub-topology granularity (for workload balance), and again for each task:
    1. Find the client with the least load; if there are multiple, prefer the one that previously owned the task, then the one that previously owned it as a StandbyTask, then the one that did not own it at all.

As one can see, we honor stickiness (step 2) over workload balance (step 3).
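
To make steps 1-3 concrete, here is a condensed sketch of the capacity computation and the sticky-then-balance loop. The types and helper structure are illustrative, not the actual StickyTaskAssignor code, which also tracks standby ownership and sub-topology grouping:

Code Block
import java.util.*;

// Sketch of stickiness-over-balance assignment. "Client" stands for a Streams
// instance; its capacity = avgTasksPerConsumer * numConsumersOnThatClient.
public final class StickyAssignSketch {

    public static Map<String, List<Integer>> assign(List<Integer> tasks,
                                                    Map<String, Integer> consumersPerClient,
                                                    Map<Integer, String> prevOwner) {
        int totalConsumers = consumersPerClient.values().stream().mapToInt(Integer::intValue).sum();
        double avg = (double) tasks.size() / totalConsumers;

        Map<String, List<Integer>> assignment = new HashMap<>();
        Map<String, Double> capacity = new HashMap<>();
        for (Map.Entry<String, Integer> e : consumersPerClient.entrySet()) {
            assignment.put(e.getKey(), new ArrayList<>());
            capacity.put(e.getKey(), avg * e.getValue()); // step 1: per-client capacity
        }

        List<Integer> unassigned = new ArrayList<>();
        for (int task : tasks) { // step 2: honor stickiness first
            String owner = prevOwner.get(task);
            if (owner != null && assignment.containsKey(owner)
                    && assignment.get(owner).size() < capacity.get(owner)) {
                assignment.get(owner).add(task);
            } else {
                unassigned.add(task);
            }
        }

        for (int task : unassigned) { // step 3: balance the remainder by least relative load
            String leastLoaded = assignment.keySet().stream()
                .min(Comparator.comparingDouble(c -> assignment.get(c).size() / capacity.get(c)))
                .get();
            assignment.get(leastLoaded).add(task);
        }
        return assignment;
    }
}

Note how a task only falls through to the balancing loop once its previous owner is out of capacity, which is exactly why stickiness wins over perfect balance.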



Proposed Changes

We want to separate the protocol improvement into the consumer and Streams layers, since Streams today embeds a full-fledged consumer instance that hard-codes the ConsumerCoordinator.

Part I: Incremental Consumer Rebalance Protocol

We will augment the consumer's rebalance protocol as proposed in Incremental Cooperative Rebalancing: Support and Policies, with some tweaks compared to KIP-415. The key idea is that, instead of relying on a single rebalance's synchronization barrier to rebalance the group, and hence forcing everyone to give up all assigned partitions before joining the new generation, we use consecutive rebalances where the end of the first rebalance serves as the synchronization barrier. In the first rebalance, we choose to keep all currently assigned tasks running and trade off one more rebalance. The behavior becomes:

  1. Join the group with all currently active tasks running.

  2. After the first rebalance, sync the revoked partitions and stop them.

  3. Rejoin the group immediately with only the remaining active tasks to trigger a second rebalance.

Feel free to take a look at the KIP-415 example to get a sense of how the algorithm works.
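
As a reference point for the end state, here is roughly how the incremental behavior surfaces in the plain consumer API. This sketch assumes the CooperativeStickyAssignor strategy that eventually shipped with this protocol in Apache Kafka 2.4, under which onPartitionsRevoked only receives the partitions actually taken away:

Code Block
import java.util.Collection;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class CooperativeExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "incremental-demo");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");
        // Cooperative strategy: members keep owned partitions through the first
        // rebalance; only the diff is revoked, triggering the follow-up rebalance.
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                  "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(java.util.Collections.singletonList("input-topic"),
                new ConsumerRebalanceListener() {
                    @Override
                    public void onPartitionsRevoked(Collection<TopicPartition> revoked) {
                        // Only the incrementally revoked partitions show up here;
                        // everything else keeps processing uninterrupted.
                        System.out.println("Revoked: " + revoked);
                    }
                    @Override
                    public void onPartitionsAssigned(Collection<TopicPartition> assigned) {
                        System.out.println("Newly assigned: " + assigned);
                    }
                });
            consumer.poll(java.time.Duration.ofMillis(1000));
        }
    }
}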


More specifically, we will push the following metadata bytes down to the consumer layer, so that they are universally applicable to any consumer:



Code Block
SubscriptionInfo (encoded in version 4) => VersionId LatestSupportedVersionId ClientUUID PrevTasks StandbyTasks EndPoint
   VersionId                => Int32
   LatestSupportedVersionId => Int32
   ClientUUID               => 128bit
   PrevTasks                => Set<TaskId>
   StandbyTasks             => Set<TaskId>
   EndPoint                 => HostInfo


And on assignment: 

The assignment metadata bytes (version 4) are:

Code Block
AssignmentInfo (encoded in version 4) => VersionId LatestSupportedVersionId ActiveTasks StandbyTasks PartitionsByHost ErrorCode
   VersionId                => Int32
   LatestSupportedVersionId => Int32
   ActiveTasks              => List<TaskId>
   StandbyTasks             => Map<TaskId, Set<TopicPartition>>
   PartitionsByHost         => Map<HostInfo, Set<TopicPartition>>
   ErrorCode                => Int32


For KStream, we are going to take a trade-off between the "revoking all" and "revoking none" solutions: we shall only revoke tasks that are being learned since the last round. So when we assign learner tasks to new members, we shall also mark the corresponding active tasks as "being learned" on their current owners. Every time a rebalance begins, the task owners will revoke the being-learned tasks and join the group without affecting other ongoing tasks. Learned tasks can then immediately transfer ownership upon readiness, without attempting a second round of rebalance. Compared with KIP-415, we are optimizing for fewer rebalances, at the cost of increased metadata size and sacrificing partial availability of the learner tasks.
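
In other words, the revocation decision at each rebalance is just a set intersection between an owner's active tasks and the tasks currently being learned; a minimal sketch (method and type names are hypothetical):

Code Block
import java.util.HashSet;
import java.util.Set;

// Sketch: on rebalance, an owner revokes only the tasks some learner is
// currently restoring, and keeps every other active task running.
public final class LearnerRevocationSketch {
    public static <T> Set<T> tasksToRevoke(Set<T> ownedActiveTasks, Set<T> beingLearnedTasks) {
        Set<T> revoked = new HashSet<>(ownedActiveTasks);
        revoked.retainAll(beingLearnedTasks); // revoke = intersection of owned and being-learned
        return revoked;
    }
}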



Terminology

We shall define several terms for an easy walkthrough of the algorithm.

  • Instance (a.k.a. stream instance): the KStream instance serving as a container for a set of stream threads. This could correspond to a physical host or a k8s pod. The stream threads' capacity is essentially controlled by the relative size of the instance.
  • Learner task: a special standby task that gets assigned to a stream instance to restore a current active task, and transitions to active when the restoration is complete.

Learner Task Essential

A learner task shares the same semantics as a standby task: it is utilized by the restore consumer to replicate the active task's state. When the restoration of a learner task is complete, the stream instance will initiate a new JoinGroupRequest to trigger another rebalance and perform the task transfer. The goal of the learner task is to delay the task migration until the destination host has finished replaying the active task's state.
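
Putting this together, a learner's life-cycle could look like the following sketch (purely illustrative pseudo-Java: Task, GroupMembership, restorationComplete and requestRejoin are hypothetical stand-ins for the restore-consumer loop and the JoinGroupRequest trigger):

Code Block
// Illustrative learner-task loop: replay the active task's changelog like a
// standby task would, then force a rebalance to take over ownership.
public final class LearnerTaskSketch {
    interface Task { boolean restorationComplete(); void restoreNextBatch(); }
    interface GroupMembership { void requestRejoin(); } // triggers a new JoinGroupRequest

    static void runLearner(Task learner, GroupMembership membership) {
        while (!learner.restorationComplete()) {
            learner.restoreNextBatch(); // restore consumer replays changelog records
        }
        // Restoration done: rejoin the group so the next rebalance can transfer
        // the task from its current owner to this instance as an active task.
        membership.requestRejoin();
    }
}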

Next, we will look at several typical scaling scenarios and edge cases to better understand the design of this algorithm.

...