...

  • instance balance: The instance with the most tasks has no more than "balance_factor" more tasks than the instance with the fewest tasks.
  • subtopology balance: The subtopology with the most instances executing its tasks has no more than "balance_factor" more instances executing it than the subtopology with the fewest instances executing it. That is, each subtopology is partitioned, and each partition of a subtopology is a "task". Ideally, we want to spread the tasks for a subtopology evenly over the cluster, since we observe that the workload over the partitions of a subtopology is typically even, whereas the workload over different subtopologies may vary dramatically.
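
As an illustration of the two criteria above, here is a minimal sketch of how they could be checked. The class and method names (BalanceCheck, withinBalanceFactor, and so on) and the map-based inputs are hypothetical and are not part of Kafka Streams; the only thing taken from the text is the rule that the largest and smallest counts may differ by at most "balance_factor".

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Map;

// Hypothetical helper for illustration only; not part of Kafka Streams.
final class BalanceCheck {
    private BalanceCheck() {}

    // True if the largest count exceeds the smallest by at most balanceFactor.
    // An empty collection is treated as trivially balanced (an assumption).
    static boolean withinBalanceFactor(Collection<Integer> counts, int balanceFactor) {
        if (counts.isEmpty()) {
            return true;
        }
        return Collections.max(counts) - Collections.min(counts) <= balanceFactor;
    }

    // Instance balance: the map goes from instance id to number of assigned tasks.
    static boolean isInstanceBalanced(Map<String, Integer> taskCountByInstance, int balanceFactor) {
        return withinBalanceFactor(taskCountByInstance.values(), balanceFactor);
    }

    // Subtopology balance: the map goes from subtopology id to the number of
    // distinct instances that execute at least one of its tasks.
    static boolean isSubtopologyBalanced(Map<Integer, Integer> instanceCountBySubtopology, int balanceFactor) {
        return withinBalanceFactor(instanceCountBySubtopology.values(), balanceFactor);
    }
}
```

With a balance factor of 1, task counts of 3 and 2 across two instances would pass the instance check, while 4 and 2 would not.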

State with logging disabled

There is a special case to consider: stateful stores with logging disabled. We have an a priori expectation that stateful tasks are more heavyweight than stateless tasks, so from the perspective of cluster balance, we should consider tasks containing non-logged stores to be stateful (and therefore heavy). On the other hand, if a store is not logged, then there is no notion of being "caught up" or of having a "lag" on it. So, we should consider all instances to be equally caught-up on such stores.

Rather than building this special case into the code, we can simply have the members report their lag on non-logged stores as zero. Then, the task would automatically be considered stateful for assignment, but all instances would be considered equal assignment candidates. Further, because zero is the additive identity, this strategy works seamlessly when a task has both logged and non-logged stores. I.e., the "synthetic" zero-lag for the non-logged store gets added to the non-zero lag for the logged store, and we sensibly report the total lag for the task to just be the lag on the logged store.
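
The zero-lag convention can be sketched as follows. This is only an illustration under stated assumptions: TaskLagSketch, StoreLag, reportedLag, and taskLag are hypothetical names, and lag is modeled as a plain int. The point it shows is that a non-logged store contributes zero to the sum, so the task-level lag equals the lag of its logged stores.

```java
import java.util.List;

// Hypothetical model for illustration only; not part of Kafka Streams.
final class TaskLagSketch {
    private TaskLagSketch() {}

    // One state store within a task.
    static final class StoreLag {
        final boolean loggingEnabled;
        final int changelogLag; // only meaningful when loggingEnabled is true

        StoreLag(boolean loggingEnabled, int changelogLag) {
            this.loggingEnabled = loggingEnabled;
            this.changelogLag = changelogLag;
        }

        // Non-logged stores always report the "synthetic" lag of zero.
        int reportedLag() {
            return loggingEnabled ? changelogLag : 0;
        }
    }

    // Task-level lag is the sum of the reported lags of all its stores.
    static int taskLag(List<StoreLag> stores) {
        int total = 0;
        for (StoreLag store : stores) {
            total += store.reportedLag();
        }
        return total;
    }
}
```

For a task with one logged store lagging by 120 and one non-logged store, taskLag returns 120. A task with only non-logged stores reports 0 on every instance, so all instances are equal candidates for it.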

Rebalance Metadata

The Consumer rebalance protocol has a pluggable extension point, the PartitionAssignor interface, along with a black-box SubscriptionInfo field. These mechanisms can be used in conjunction to encode extra information from the group members when they join the group, and then make a custom assignment decision. Finally, the PartitionAssignor assignment response contains another black-box field to encode extra information from the leader to the members.

Today, Streams uses the StreamsPartitionAssignor to implement its custom "balanced stickiness" assignment strategy, which is supported by the metadata fields in the subscription and assignment protocols.

More specifically on subscription:

Code Block
KafkaConsumer:

Subscription => TopicList SubscriptionInfo
   TopicList               => List<String>
   SubscriptionInfo        => Bytes

------------------

StreamsPartitionAssignor:

SubscriptionInfo (encoded in version 4) => VersionId LatestSupportVersionId ClientUUID PrevTasks StandbyTasks EndPoint

   VersionId               => Int32
   LatestSupportVersionId  => Int32
   ClientUUID              => 128bit
   PrevTasks               => Set<TaskId>
   StandbyTasks            => Set<TaskId>
   EndPoint                => HostInfo

...

Code Block
SubscriptionInfo (encoded in version 5) => VersionId LatestSupportVersionId ClientUUID TaskLags EndPoint

   VersionId               => Int32
   LatestSupportVersionId  => Int32
   ClientUUID              => 128bit
   TaskLags                => Map<TaskId, Int32>    // new change
   EndPoint                => HostInfo

The new field, TaskLags, would encode the lag for every store that the instance hosts, summed to the task level. This subsumes both PrevTasks and StandbyTasks.
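
To make the shape of the TaskLags field concrete, here is a rough sketch of one way a Map<TaskId, Int32> could be written to and read from a byte buffer. The layout is an assumption made for illustration (an Int32 entry count, then three Int32 values per entry), as is modeling TaskId as a pair of ints; the text above does not specify the byte layout, and TaskLagsCodec is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical codec for illustration only; the real wire layout may differ.
final class TaskLagsCodec {
    private TaskLagsCodec() {}

    // Assumed shape of a task id: (subtopology id, partition).
    static final class TaskId {
        final int subtopologyId;
        final int partition;

        TaskId(int subtopologyId, int partition) {
            this.subtopologyId = subtopologyId;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof TaskId)) {
                return false;
            }
            TaskId other = (TaskId) o;
            return subtopologyId == other.subtopologyId && partition == other.partition;
        }

        @Override
        public int hashCode() {
            return Objects.hash(subtopologyId, partition);
        }
    }

    // Writes: entry count, then (subtopologyId, partition, lag) per entry.
    static ByteBuffer encode(Map<TaskId, Integer> taskLags) {
        ByteBuffer buf = ByteBuffer.allocate(4 + taskLags.size() * 12);
        buf.putInt(taskLags.size());
        for (Map.Entry<TaskId, Integer> entry : taskLags.entrySet()) {
            buf.putInt(entry.getKey().subtopologyId);
            buf.putInt(entry.getKey().partition);
            buf.putInt(entry.getValue());
        }
        buf.flip(); // switch the buffer from writing to reading
        return buf;
    }

    // Reads back the layout produced by encode.
    static Map<TaskId, Integer> decode(ByteBuffer buf) {
        int count = buf.getInt();
        Map<TaskId, Integer> taskLags = new LinkedHashMap<>();
        for (int i = 0; i < count; i++) {
            int subtopologyId = buf.getInt();
            int partition = buf.getInt();
            taskLags.put(new TaskId(subtopologyId, partition), buf.getInt());
        }
        return taskLags;
    }
}
```

Under this sketch, a task that the instance previously owned would appear with a lag of zero and a standby would appear with its current lag, which is how a single map can carry the information of both PrevTasks and StandbyTasks.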

We do not need any modifications to AssignmentInfo at this time.

Example Scenarios

Scaling Out

...