...
- instance balance: The instance with the most tasks has no more than "balance_factor" more tasks than the instance with the least tasks.
- subtopology balance: The subtopology with the most instances executing its tasks has no more than "balance_factor" more instances executing it than the subtopology with the least. That is, each subtopology is partitioned, and each partition of a subtopology is a "task". Ideally, we want to spread the tasks for a subtopology evenly over the cluster, since we observe that workload over the partitions of a subtopology is typically even, whereas workload over different subtopologies may vary dramatically.
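For concreteness, the instance-balance constraint above can be sketched as a simple check. This is a hypothetical illustration, not part of the proposal: the `isBalanced` helper and the reading of "balance_factor" as the maximum allowed spread between the most- and least-loaded instances are assumptions here.

```java
import java.util.Collections;
import java.util.List;

public class BalanceCheck {
    // taskCounts.get(i) is the number of tasks assigned to instance i.
    static boolean isBalanced(List<Integer> taskCounts, int balanceFactor) {
        int max = Collections.max(taskCounts);
        int min = Collections.min(taskCounts);
        // The most-loaded instance may have at most balanceFactor more
        // tasks than the least-loaded instance.
        return max - min <= balanceFactor;
    }

    public static void main(String[] args) {
        System.out.println(isBalanced(List.of(4, 3, 3), 1)); // true: spread of 1
        System.out.println(isBalanced(List.of(6, 2, 3), 1)); // false: spread of 4
    }
}
```

The subtopology-balance constraint is the same check applied per subtopology, counting the instances that execute each subtopology's tasks instead of the tasks per instance.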
State with logging disabled
There is a special case to consider: stateful stores with logging disabled. We have an a priori expectation that stateful tasks are more heavyweight than stateless tasks, so from the perspective of cluster balance, we should consider tasks containing non-logged stores to be stateful (and therefore heavy). On the other hand, if a store is not logged, then there is no notion of being "caught up" or of having a "lag" on it. So, we should consider all instances to be equally caught-up on such stores.
Rather than building this special case into the code, we can simply have the members report their lag on non-logged stores as zero. Then, the task would automatically be considered stateful for assignment, but all instances would be considered equal assignment candidates. Further, because zero is the additive identity, this strategy works seamlessly when a task has both logged and non-logged stores. I.e., the "synthetic" zero-lag for the non-logged store gets added to the non-zero lag for the logged store, and we sensibly report the total lag for the task to just be the lag on the logged store.
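As a sketch of this idea (hypothetical code, not the actual Streams implementation; the `Store` record and `taskLag` helper are invented for illustration), summing a synthetic zero for non-logged stores keeps the task stateful while leaving the reported lag equal to the lag on the logged stores alone:

```java
import java.util.List;

public class TaskLagReport {
    // logged == false means the store has no changelog, hence no real lag.
    record Store(String name, boolean logged, long changelogLag) {}

    static long taskLag(Iterable<Store> stores) {
        long total = 0;
        for (Store s : stores) {
            // Zero is the additive identity: non-logged stores contribute
            // nothing, so a mixed task's lag is just its logged stores' lag.
            total += s.logged() ? s.changelogLag() : 0L;
        }
        return total;
    }

    public static void main(String[] args) {
        long lag = taskLag(List.of(
            new Store("logged-store", true, 42L),
            new Store("non-logged-store", false, 0L)));
        System.out.println(lag); // 42: only the logged store counts
    }
}
```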
Rebalance Metadata
The Consumer rebalance protocol has a pluggable extension point, the PartitionAssignor interface, along with a black-box SubscriptionInfo field. These mechanisms can be used in conjunction to encode extra information from the group members when they join the group, and then make a custom assignment decision. Finally, the PartitionAssignor assignment response contains another black-box field to encode extra information from the leader to the members.
Today, Streams uses the StreamsPartitionAssignor to implement its custom "balanced stickiness" assignment strategy, which is supported by the metadata fields in the subscription and assignment protocols.
More specifically on subscription:
KafkaConsumer:
Subscription => TopicList SubscriptionInfo
   TopicList               => List<String>
   SubscriptionInfo        => Bytes

------------------

StreamsPartitionAssignor:
SubscriptionInfo (encoded in version 4) => VersionId LatestSupportVersionId ClientUUID PrevTasks StandbyTasks EndPoint
   VersionId               => Int32
   LatestSupportVersionId  => Int32
   ClientUUID              => 128bit
   PrevTasks               => Set<TaskId>
   StandbyTasks            => Set<TaskId>
   EndPoint                => HostInfo
...
SubscriptionInfo (encoded in version 5) => VersionId LatestSupportVersionId ClientUUID TaskLags EndPoint
   VersionId               => Int32
   LatestSupportVersionId  => Int32
   ClientUUID              => 128bit
   TaskLags                => Map<TaskId, Int32>   // new change
   EndPoint                => HostInfo
The new field, TaskLags, would encode the lag for every store that the instance hosts, summed to the task level. This subsumes both PrevTasks and StandbyTasks.
We do not need any modifications to AssignmentInfo
at this time.
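To illustrate why TaskLags subsumes the old fields, the previous views could in principle be recovered from the lag map. This is a hypothetical sketch: the lag-0 convention for previously active tasks and the helper names below are assumptions for illustration, not the actual Streams encoding.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class TaskLagsView {
    record TaskId(int subtopology, int partition) {}

    // Tasks the instance previously ran as active: fully caught up, lag 0.
    static Set<TaskId> prevTasks(Map<TaskId, Integer> taskLags) {
        Set<TaskId> prev = new HashSet<>();
        taskLags.forEach((task, lag) -> { if (lag == 0) prev.add(task); });
        return prev;
    }

    // Tasks hosted with some state but a nonzero lag, e.g. standbys still
    // replaying their changelogs.
    static Set<TaskId> standbyTasks(Map<TaskId, Integer> taskLags) {
        Set<TaskId> standby = new HashSet<>();
        taskLags.forEach((task, lag) -> { if (lag > 0) standby.add(task); });
        return standby;
    }

    public static void main(String[] args) {
        Map<TaskId, Integer> lags = Map.of(
            new TaskId(0, 0), 0,      // previously active
            new TaskId(1, 2), 1500);  // standby, still restoring
        System.out.println(prevTasks(lags));
        System.out.println(standbyTasks(lags));
    }
}
```

The lag map carries strictly more information than the two sets did: beyond membership, it tells the leader how far from caught-up each candidate instance is.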
Example Scenarios
Scaling Out
...