...

There are two assignors implemented in Kafka Streams now: StickyTaskAssignor and HighAvailabilityTaskAssignor. HighAvailabilityTaskAssignor was created for KIP-441: Smooth Scaling Out for Kafka Streams, and it is currently the default assignor configured in Kafka Streams. StickyTaskAssignor can also be used by existing customers. We could put the rack aware assignment logic in a separate module which can be used by both StickyTaskAssignor and HighAvailabilityTaskAssignor. Below we mainly discuss how it works with HighAvailabilityTaskAssignor. It works similarly for StickyTaskAssignor.

How HighAvailabilityTaskAssignor (HAAssignor) works

...

While the min-cost flow algorithm can find assignments with minimum CRT cost, if a TopicPartition’s rack changes, the newly computed assignment could change drastically. This can cause task shuffling and new warmups, which involve more network and computation cost. So it’s preferable to keep the new assignment as close to the old assignment as possible.

Option 1

It’s possible to assign costs to edges depending on whether the edge is in the old assignment (based on John’s suggestion). This helps the new assignment overlap with the old assignment, especially when multiple min-cost solutions for CRT exist. The issue with this approach is that it’s impossible to make sure that the following probing rebalance converges without remembering the computed target assignment and the relevant clients/TopicPartition information. This stateful information makes the assignment logic much more complicated.

Option 2

Option 1 makes it challenging to store assignment information across rebalances and to make sure the assignment converges. Alternatively, we can choose to overlap more with the target assignment computed by HAAssignor steps 1 - 2. The idea is that if we always try to overlap as much as possible with HAAssignor’s target assignment, there’s at least a higher chance that tasks won’t be shuffled much when the clients remain the same across rebalances, even if some traffic costs change. In this way, we keep the assignment stateless while still trying to overlap with a fixed assignment as much as possible. Below is the adjusted cost function suggested by John Roesler:

...

TRAFFIC_WEIGHT and OVERLAP_WEIGHT indicate how much we favor minimizing traffic cost compared to maximizing overlap. In the extreme case, we can set OVERLAP_WEIGHT to 0, which means we always compute the assignment with the lowest CRT. We can expose these weights as internal configs. Note that we don't want to use a very high value for the weights because the cost upper bound appears in the algorithm's time complexity O(E^2 * (CUMU)) .
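To make the interaction of the two weights concrete, here is a minimal sketch of a weighted edge cost for placing one task on one client. It is not the exact cost function of this proposal: the class name, the method signature, and the representation of a task as a list of replica-rack sets (one per TopicPartition) are assumptions made for illustration only.

```java
import java.util.List;
import java.util.Set;

/** Illustrative only: weighted edge cost for assigning one task to one client. */
final class EdgeCostSketch {

    /**
     * @param partitionRacks     for each TopicPartition of the task, the racks hosting a replica (assumed model)
     * @param clientRack         rack of the candidate client
     * @param inTargetAssignment whether the HAAssignor target assignment already puts the task on this client
     * @param trafficWeight      plays the role of TRAFFIC_WEIGHT
     * @param overlapWeight      plays the role of OVERLAP_WEIGHT
     */
    static int cost(List<Set<String>> partitionRacks,
                    String clientRack,
                    boolean inTargetAssignment,
                    int trafficWeight,
                    int overlapWeight) {
        int cost = 0;
        for (Set<String> racks : partitionRacks) {
            if (!racks.contains(clientRack)) {
                cost += trafficWeight; // no replica in the client's rack: cross-rack read
            }
        }
        if (!inTargetAssignment) {
            cost += overlapWeight; // penalize diverging from the target assignment
        }
        return cost;
    }
}
```

With `overlapWeight` set to 0 the cost depends only on cross-rack traffic, which corresponds to the extreme case described above.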

Summary

In summary, for active task assignment, we can add a public config such as rack.aware.assignment.strategy with value min_cost or balanced_min_cost to choose algorithm I or algorithm II. We can make algorithm I the default to minimize total CRT cost. For cost computation, we can use Option 2 in III and expose configs to adjust the weights, such as rack.aware.assignment.traffic_cost and rack.aware.assignment.non_overlap_cost. For StickyTaskAssignor, we could run the same algorithm as for active task assignment on stateful active tasks, adjusting traffic_cost and non_overlap_cost to balance stickiness and traffic cost. Note that for StickyTaskAssignor, we would increase NON_OVERLAP_COST if t is not assigned to c in the previous assignment.
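As a small illustration of how these settings might be supplied, the sketch below collects them in a `java.util.Properties` object. The config keys are the names proposed in this summary and may differ in a released Kafka Streams version; the class name and the numeric values 10 and 1 are hypothetical.

```java
import java.util.Properties;

/** Illustrative only: the config names come from the summary above, the values are made up. */
final class RackAwareConfigSketch {

    static Properties rackAwareProps() {
        Properties props = new Properties();
        // Choose algorithm I (min_cost) or algorithm II (balanced_min_cost).
        props.put("rack.aware.assignment.strategy", "min_cost");
        // Hypothetical values: favor low cross-rack traffic over overlap.
        props.put("rack.aware.assignment.traffic_cost", "10");
        props.put("rack.aware.assignment.non_overlap_cost", "1");
        return props;
    }
}
```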

C. Rack awareness assignment for standby tasks

...

Code Block
for (ClientState client : clientStates) {
    for (Task standbyTask : client.standbyTasks) {
        for (ClientState otherClient : clientStates) {
            // Skip the same client and clients that already host this task.
            if (client.equals(otherClient) || otherClient.contains(standbyTask)) {
                continue;
            }
            for (Task otherStandbyTask : otherClient.standbyTasks) {
                if (swapping standbyTask and otherStandbyTask is feasible and has smaller cost) {
                    swap(standbyTask, otherStandbyTask);
                }
            }
        }
    }
}
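The greedy swap above can be sketched as runnable Java under some simplifying assumptions. Everything here is hypothetical and for illustration: tasks are plain strings, the `Client` class stands in for ClientState, and the cost of a task on a client is simplified to 0 if any replica of the task's partitions is in the client's rack and 1 otherwise. A swap is treated as feasible when neither client would end up hosting the same task twice.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Illustrative only: one greedy pass of pairwise standby swaps that lower the total cost. */
final class StandbySwapSketch {

    /** Hypothetical stand-in for ClientState. */
    static final class Client {
        final String rack;
        final Set<String> activeTasks;
        final List<String> standbyTasks;

        Client(String rack, Set<String> activeTasks, List<String> standbyTasks) {
            this.rack = rack;
            this.activeTasks = new HashSet<>(activeTasks);
            this.standbyTasks = new ArrayList<>(standbyTasks);
        }

        boolean contains(String task) {
            return activeTasks.contains(task) || standbyTasks.contains(task);
        }
    }

    /** Simplified cost: 0 if the task has a replica in the client's rack, 1 otherwise. */
    static int cost(Map<String, Set<String>> taskRacks, String task, Client client) {
        return taskRacks.get(task).contains(client.rack) ? 0 : 1;
    }

    static void optimize(List<Client> clients, Map<String, Set<String>> taskRacks) {
        for (Client client : clients) {
            for (int i = 0; i < client.standbyTasks.size(); i++) {
                for (Client other : clients) {
                    // Re-read the task: an earlier swap may have replaced the one at index i.
                    String task = client.standbyTasks.get(i);
                    if (client == other || other.contains(task)) {
                        continue;
                    }
                    for (int j = 0; j < other.standbyTasks.size(); j++) {
                        String otherTask = other.standbyTasks.get(j);
                        if (client.contains(otherTask)) {
                            continue; // infeasible: client would host otherTask twice
                        }
                        int before = cost(taskRacks, task, client) + cost(taskRacks, otherTask, other);
                        int after = cost(taskRacks, task, other) + cost(taskRacks, otherTask, client);
                        if (after < before) {
                            client.standbyTasks.set(i, otherTask);
                            other.standbyTasks.set(j, task);
                            break; // index i now holds a different task; move to the next client
                        }
                    }
                }
            }
        }
    }
}
```

For example, with client A in rack r1 holding standby t2 (replicas only in r2) and client B in rack r2 holding standby t1 (replicas only in r1), one pass swaps the two standbys and brings the simplified total cost from 2 to 0.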

III. Summary

...

      }
      }
}

D. What if both rack information and rack aware tags are present

...

We will implement the new rack aware assignment logic in HighAvailabilityTaskAssignor and StickyTaskAssignor. Rack aware assignment will be used only when

  • client rack information appears in Subscriptions by configuring client.rack on the client side
  • at least one of the TopicPartitions in some tasks has a different cost compared to other TopicPartitions. This is because if all of them have the same cost, there's no point in using rack aware assignment logic to compute min cost, since assigning tasks to any client doesn't make a difference.
  • rack.aware.assignment.enabled config is enabled

If users want to use rack aware assignment, they need to upgrade Kafka Streams to at least the earliest supported version. It's better to stop all the instances and start them all with the latest version and the client.rack config, but it's also OK to do a rolling bounce with the new version. A rolling bounce may cause the assignment to alternate between rack aware and non rack aware assignment, but eventually the assignment will be computed using rack awareness once every instance uses the new version.

...