Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

One drawback of this solution is that the final solution is based on the number of tasks which can be assigned to each client computed from HAAssignor’s step 1 - 2. If every client has an equal number of threads but the number of tasks couldn’t be divided equally, then there’s one client with a different number of tasks. Ideally, this client could be anyone when we find our min cost assignment, but we fix it during our computation, which may result in a slight sub-optimal solution.

Rack awareness assignment for standby tasks

Assigning standby tasks comes with even more constraints:

  1. Standby task shouldn’t be assigned to client which already have the active task
  2. Ideally, standby tasks should be in different rack from corresponding active tasks
  3. Ideally, standby tasks themselves should be in different racks
  4. Ideally, cross rack traffic cost should be minimized
  5. Assignment should be deterministic

There are several options to balance between reliability and efficiency for standby task assignment.

Balance reliability over cost

In this case, we should first optimize constraints 1 - 3. After we get an assignment from ClientTagAwareStandbyTaskAssignor for example, we use two loops to check if we could swap any standby task assignment between different clients which can result in smaller traffic cost. This is a greedy algorithm which may not give the optimal solution.

Since 3.C.I and 3.C.II have different tradeoffs and work better in different workloads etc, we could add an internal configuration to choose one of them at runtime.

How to make the assignment as stable as possible if there are small changes in the CRT cost

While the min-cost flow algorithm can find us assignments with min CRT cost, it’s also possible that if a TopicPartition’s rack changes, the newly computed assignment could drastically change which can cause task shuffling and new warmups. So it’s preferable to make the new assignment as close to the old assignment as possible. 

Option 1

It’s possible to assign costs to edges considering if the edge is in old assignment (based on John’s suggestion). This will help to make the new assignment overlap with old assignment especially when multiple min-cost solutions for CRT exist. The issue with this approach is that it’s impossible to make sure that the following probing rebalance can converge without remembering the computed target assignment and relevant clients/TopicPartition information. This stateful information makes the assignment logic much more complicated.

Option 2

While option 1 poses some challenges to store assignment information across rebalances and make sure the assignment can converge, we can choose to overlap more with target assignment computed by HAAssignor step 1 - 2. The idea is that if we always try to make it overlap as much with HAAssignor’s target assignment, at least there’s a higher chance that tasks won’t be shuffled a lot if the clients remain the same across rebalances. In this way, we can maintain a stateless assignment but at the same time try to overlap with some fixed assignment as much as possible. Below is the adjusted cost function suggested by John Roesler 

Code Block
int getCost(Task t, Client c) {
    final int TRAFFIC_WEIGHT = 2;
    final int OVERLAP_WEIGHT = 1;
    int cost = 0;
    
    // add weights for cross-AZ traffic
    for TopicPartion tp : t.topicPartitions() {
        if (tp has no replica in same rack as c) {
            cost += TRAFFIC_WEIGHT;
        }
    }
    
    // add weights for overlap with current assignment
    if (c is NOT currently target assignment) {
        cost += OVERLAP_WEIGHT;
    }
} 

TRAFFIC_WEIGHT and OVERLAP_WEIGHT  indicate how much we favor minimizing traffic cost compared to maximizing overlap. In the extreme case, we can put OVERLAP_WEIGHT to 0 which means we always compute the lowest CRT assignment. We can expose these weights as internal configs.

Summary

In summary, for active task assignment, we can add internal configs to choose algorithm I or algorithm II. We can make algorithm I as default to minimize total CRT cost. For cost computation, we can use Option 2 in III and expose configs to adjust the weight.

Rack awareness assignment for standby tasks

Assigning standby tasks comes with even more constraints:

  1. Standby task shouldn’t be assigned to client which already have the active task
  2. Ideally, standby tasks should be in different rack from corresponding active tasks
  3. Ideally, standby tasks themselves should be in different racks
  4. Ideally, cross rack traffic cost should be minimized
  5. Assignment should be deterministic

There are several options to balance between reliability and efficiency for standby task assignment.

Balance reliability over cost

In this case, we should first optimize constraints 1 - 3. After we get an assignment from ClientTagAwareStandbyTaskAssignor for example, we use two loops to check if we could swap any standby task assignment between different clients which can result in smaller traffic cost. This is a greedy algorithm which may not give the optimal solution.

Code Block
for (ClientState client : ClientStates) {
    for (Task standby task : client.standbyTasks) {
        for (ClientState otherClient : ClientState) {
            if (client.equals(otherClient) || otherClient.contains(task)) {
                continue;
            }
            for (Task otherStandbyTask : otherClient.standbyTask) {
                if (swap task and otherStandbyTask is feasible and have smaller cost) {
                    swap(standbyTask, otherStandbyTask);
                }
            }
      }
}

Prefer reliability and then find optimal cost

We could also first find a more reliable standby assignment and then if we have a certain number of reliable standby assignments. We switch to prefer to minimize cost. For example, if we would like at least 1 standby to be in a different rack from the active, when we assign the first standby, we can make clients which are in a different rack from the active client have less cost. We can adjust the cost function as below:

Code Block
int getCost(Task t, Client c
Code Block
for (ClientState client : ClientStates) {
    for (Task standby task : client.standbyTasks) {
        for (ClientState otherClient : ClientState) {
            if (client.equals(otherClient) || otherClient.contains(task)) {
                continue;
            }
            for (Task otherStandbyTask : otherClient.standbyTask) {
                if (swap task and otherStandbyTask is feasible and have smaller cost) {
    final int TRAFFIC_WEIGHT = 1;
    final int SAME_RACK_WEIGHT      swap(standbyTask, otherStandbyTask)= 10;
    int cost       = 0;
    }
    // add weights for cross-AZ traffic
   }
 for TopicPartion tp :  }
}

Prefer reliability and then find optimal cost

We could also first find a more reliable standby assignment and then if we have a certain number of reliable standby assignments. We switch to prefer to minimize cost. For example, if we would like at least 1 standby to be in a different rack from the active, when we assign the first standby, we can make clients which are in a different rack from the active client have less cost. So we can adjust the cost in following way:

Code Block
task/client with different rack from active will always have lower cost than task/client with same rack from active
if they have same rack, whichever have lower CRT will have lower cost

We can run the min-cost flow algorithm to assign a number of standby until we feel we can prefer low CRT now. Then we can adjust the cost in following way:

Code Block
task/client with lower CRT will have lower cost
if they have same CRT cost, whichever in different rack from active or other standby tasks will have lower costt.topicPartitions() {
        if (tp has no replica in same rack as c) {
            cost += TRAFFIC_WEIGHT;
        }
    }
    
    // add weights for assignment with different racks
    for (Client c1 which has t assigned) {
        if (c is in same rack as other clients which has t) {
            cost += SAME_RACK_WEIGHT;
        }
    }
} 

Option II is more configurable and may result in lower CRT cost. But it’s more complicatedin lower CRT cost. But it’s more complicated.

Summary

In summary, we can also add configs to choose between algorithm I and algorithm II. We can make algorithm II as default.

What if both rack information and rack aware tags present

KIP-708: Rack awareness for Kafka Streams add rack awareness for Kafka Streams. If both rack aware tags and rack information in subscription are present, we could default to one. Both can be used to compute if two clients are in the same rack. Rack aware tags may have more information since it can support more keys.

...

There are no changes in public interfaces.

Proposed Changes

Describe the new thing you want to do in appropriate detail. This may be fairly extensive and have large subsections of its own. Or it may be a few sentences. Use judgement based on the scope of the change.

Compatibility, Deprecation, and Migration Plan

...