...

  1. A standby task shouldn’t be assigned to a client which already has the active task
  2. Ideally, standby tasks should be in a different rack from the corresponding active tasks
  3. Ideally, standby tasks themselves should be in different racks
  4. Ideally, cross rack traffic cost should be minimized
  5. Assignment should be deterministic

There are several options to balance reliability and efficiency for standby task assignment. While it's possible to come up with very complex min-cost flow algorithms for standby assignment, I think it's OK to use an easy greedy algorithm for standby assignment initially:

I. Balance reliability over cost

...

Code Block
for (ClientState client : clientStates) {
    for (Task standbyTask : client.standbyTasks) {
        for (ClientState otherClient : clientStates) {
            // skip the same client and any client that already has this task
            if (client.equals(otherClient) || otherClient.contains(standbyTask)) {
                continue;
            }
            for (Task otherStandbyTask : otherClient.standbyTasks) {
                // pseudocode condition: the swap must keep the assignment valid and lower the cost
                if (swapping standbyTask and otherStandbyTask is feasible and has smaller cost) {
                    swap(standbyTask, otherStandbyTask);
                }
            }
        }
    }
}
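The greedy pass above can be made concrete with a small runnable sketch. Everything in it is an assumption for illustration: `GreedyStandbySwap`, `Client` and `optimize` are hypothetical names rather than Kafka Streams classes, tasks are plain strings, and the cost function is passed in by the caller. A swap is treated as feasible when neither client would end up hosting the same task twice.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.ToIntBiFunction;

final class GreedyStandbySwap {
    /** Hypothetical stand-in for a Streams client; not the real ClientState. */
    static final class Client {
        final String id;
        final String rack;
        // TreeSet keeps iteration order stable so the pass is deterministic
        final Set<String> activeTasks = new TreeSet<>();
        final Set<String> standbyTasks = new TreeSet<>();

        Client(String id, String rack) {
            this.id = id;
            this.rack = rack;
        }

        boolean hosts(String task) {
            return activeTasks.contains(task) || standbyTasks.contains(task);
        }
    }

    /** Runs one greedy pass over all standby tasks and returns the number of swaps made. */
    static int optimize(List<Client> clients, ToIntBiFunction<String, Client> cost) {
        int swaps = 0;
        for (Client a : clients) {
            nextTask:
            for (String taskA : new ArrayList<>(a.standbyTasks)) { // snapshot: swaps mutate the set
                for (Client b : clients) {
                    if (a == b || b.hosts(taskA)) {
                        continue; // b cannot take a task it already hosts
                    }
                    for (String taskB : new ArrayList<>(b.standbyTasks)) {
                        if (a.hosts(taskB)) {
                            continue; // a already hosts taskB, so the swap is infeasible
                        }
                        int before = cost.applyAsInt(taskA, a) + cost.applyAsInt(taskB, b);
                        int after = cost.applyAsInt(taskA, b) + cost.applyAsInt(taskB, a);
                        if (after < before) {
                            a.standbyTasks.remove(taskA);
                            a.standbyTasks.add(taskB);
                            b.standbyTasks.remove(taskB);
                            b.standbyTasks.add(taskA);
                            swaps++;
                            continue nextTask; // taskA has moved away from a
                        }
                    }
                }
            }
        }
        return swaps;
    }
}
```

For example, with a cost of 1 whenever a standby shares a rack with its active task, two standbys that each sit in their active task's rack are exchanged in a single swap, after which neither does.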

II. Prefer reliability and then find optimal cost

We could also first find a more reliable standby assignment and then, once we have a certain number of reliable standby assignments, switch to minimizing cost. For example, if we would like at least 1 standby to be in a different rack from the active, then when we assign the first standby, we can give clients which are in a different rack from the active client a lower cost. We can adjust the cost function as below:

Code Block
int getCost(Task t, Client c) {
    final int TRAFFIC_WEIGHT = 1;
    final int SAME_RACK_WEIGHT = 10;
    int cost = 0;

    // add weight for cross-AZ traffic
    for (TopicPartition tp : t.topicPartitions()) {
        if (tp has no replica in same rack as c) {
            cost += TRAFFIC_WEIGHT;
        }
    }

    // add weight for each client in the same rack as c which already has t assigned
    for (Client c1 which has t assigned) {
        if (c is in same rack as c1) {
            cost += SAME_RACK_WEIGHT;
        }
    }
    return cost;
}

After we assign the first standby to different racks as much as possible, we adjust the cost function to give more weight to TRAFFIC_WEIGHT so that we prefer low traffic assignments.
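To see how shifting the weights changes the preferred client, here is a minimal runnable sketch of the same cost function with the two weights passed in as parameters. The class name `StandbyCost`, the use of rack names as plain strings and the second-phase weights (10 and 1) are assumptions made for illustration only.

```java
import java.util.Collection;
import java.util.List;
import java.util.Set;

final class StandbyCost {
    /**
     * @param partitionReplicaRacks for each topic partition of the task, the racks holding a replica
     * @param candidateRack         rack of the client being considered for the standby
     * @param racksHostingTask      racks of the clients that already have the task (active or standby)
     */
    static int cost(List<Set<String>> partitionReplicaRacks,
                    String candidateRack,
                    Collection<String> racksHostingTask,
                    int trafficWeight,
                    int sameRackWeight) {
        int cost = 0;
        // cross-rack traffic: no replica of the partition lives in the candidate's rack
        for (Set<String> replicaRacks : partitionReplicaRacks) {
            if (!replicaRacks.contains(candidateRack)) {
                cost += trafficWeight;
            }
        }
        // reliability: another copy of the task already lives in the candidate's rack
        for (String rack : racksHostingTask) {
            if (rack.equals(candidateRack)) {
                cost += sameRackWeight;
            }
        }
        return cost;
    }
}
```

Suppose a task reads two partitions with replicas in {az1, az2} and {az1, az3}, and its active copy runs in az1. With weights (1, 10) a candidate in az1 costs 10 and a candidate in az2 costs 1, so the first standby goes to a different rack. With the weights switched to (10, 1) the same candidates cost 1 and 10, so later standbys favor low traffic.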

Option II is more configurable and may result in lower CRT cost, but it’s more complicated.

III. Summary

In summary, we can also add a configuration such as internal.rack.aware.assignment.standby.strategy with values greedy and balance to choose between algorithm I and algorithm II. We can make algorithm II the default.

D. What if both rack information and rack aware tags are present

KIP-708: Rack aware StandbyTask assignment for Kafka Streams added rack awareness configurations for Kafka Streams with rack.aware.assignment.tags and client.tag. The goal is to use the tags to identify the locations of clients and try to put tasks on clients in different locations as much as possible. This configuration could possibly be a superset of the clients’ rack configuration in the consumer, which is client.rack. There are several options for how to assign standby tasks when both configurations are present.

I. Choose a default one

If both rack aware tags and client rack information in the subscription are present, we could default to using one of them for standby assignment. Both can be used to compute whether two clients are in the same rack. Rack aware tags may carry more information since they can support more keys, so we default to rack aware tags.

II. Enforce client.rack information must be present in rack.aware.assignment.tags

For example, if a client configures consumer rack information as client.rack: az1, then the rack aware tags must contain rack.aware.assignment.tags: rack and client.tag.rack: az1. We can fail the assignment if the above is not satisfied.

III. Check client.rack information must be present in rack.aware.assignment.tags but don’t enforce it

Option II is too strict. We could just check and warn the users in logs if both client.rack and rack.aware.assignment.tags are configured but rack doesn’t exist in rack.aware.assignment.tags, or client.tag.rack doesn’t have the same value as client.rack. However, we continue processing the assignment using option I by defaulting to one of the configurations.
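The check described here might look roughly like the sketch below. It is only an illustration: `RackTagCheck` and `check` are hypothetical names, the configuration values are passed in directly rather than read from a real config object, and the method returns the warning messages instead of logging them so that the behavior is easy to inspect.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class RackTagCheck {
    /**
     * @param clientRack     value of client.rack, or null if not configured
     * @param assignmentTags value of rack.aware.assignment.tags, or null/empty if not configured
     * @param clientTags     client.tag.* entries keyed by tag name (for example "rack")
     * @return warnings to log; the assignment proceeds regardless
     */
    static List<String> check(String clientRack, List<String> assignmentTags, Map<String, String> clientTags) {
        List<String> warnings = new ArrayList<>();
        if (clientRack == null || assignmentTags == null || assignmentTags.isEmpty()) {
            return warnings; // only one of the two configurations is present, nothing to compare
        }
        if (!assignmentTags.contains("rack")) {
            warnings.add("client.rack is set but rack.aware.assignment.tags does not contain 'rack'");
        } else if (!clientRack.equals(clientTags.get("rack"))) {
            warnings.add("client.tag.rack (" + clientTags.get("rack")
                + ") does not match client.rack (" + clientRack + ")");
        }
        return warnings;
    }
}
```

Returning an empty list when either configuration is missing keeps the check non-blocking, which matches the intent of warning without enforcing.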

IV. Summary

I think option III is good enough for us and we can default to the rack.aware.assignment.tags configuration since it can contain more information than client.rack.

E. Assignment for stateless tasks

We can use a min-cost flow algorithm to assign stateless tasks. First, we can find a feasible solution based on task load. Then we can use the cycle canceling algorithm to compute the minimum cost assignment as we do for active tasks. We can reuse the configuration for active tasks to control whether we merely compute the min-cost assignment or also balance tasks for the same sub-topology.

4. Public Interfaces

There will be a few new public configs added:

  • rack.aware.assignment.strategy: this config controls if we should use 4.B.I or 4.B.II to compute min-cost with or without balanced sub-topology
  • rack.aware.assignment.enabled: this config controls if we should enable rack aware assignment
  • rack.aware.assignment.traffic_cost: this config controls the cost we add to cross rack traffic
  • rack.aware.assignment.non_overlap_cost: this config controls the cost we add to non overlap assignment with targeted assignment computed by HAAssignor

Note that the client.rack consumer config needs to be configured by the user to enable rack aware assignment.

5. Compatibility, Deprecation, and Migration Plan

We will implement the new rack aware assignment logic in HighAvailabilityTaskAssignor and use the new assignor only when client rack information appears in Subscriptions and at least one of the TopicPartitions in some tasks has a different cost compared to other TopicPartitions. This is because if all of them have the same cost, there's no point in using rack aware assignment logic to compute the min cost, since assigning tasks to any client doesn't make a difference. If users want to use rack aware assignment, they need to upgrade Kafka Streams to at least the earliest supported version. It's better to stop all the instances and start them all with the latest version with the client.rack config, but it's also OK to do a rolling bounce with the new version. A rolling bounce with the new version may cause the assignment to change between rack aware assignment and non rack aware assignment, but eventually the assignment will be computed using rack awareness after every instance uses the new version.
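One way to read the gating condition is sketched below. This is an interpretation rather than the proposed implementation: `RackAwareGate` and its method are hypothetical names, racks are plain strings, and "different cost" is taken to mean that some topic partition has a replica in the rack of one client but not in the rack of another, so that the choice of client actually changes the traffic cost.

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;

final class RackAwareGate {
    /**
     * @param clientRacks           rack reported by each client in its subscription, if any
     * @param partitionReplicaRacks for each topic partition, the racks that hold a replica
     */
    static boolean shouldUseRackAwareAssignment(Map<String, Optional<String>> clientRacks,
                                                Map<String, Set<String>> partitionReplicaRacks) {
        // every client must report a rack, otherwise fall back to the existing assignment
        if (clientRacks.isEmpty() || clientRacks.values().stream().anyMatch(r -> !r.isPresent())) {
            return false;
        }
        for (Set<String> replicaRacks : partitionReplicaRacks.values()) {
            boolean sawLocal = false;
            boolean sawRemote = false;
            for (Optional<String> rack : clientRacks.values()) {
                if (replicaRacks.contains(rack.get())) {
                    sawLocal = true;
                } else {
                    sawRemote = true;
                }
            }
            if (sawLocal && sawRemote) {
                return true; // placement of this partition's task affects cross-rack traffic
            }
        }
        return false; // every placement costs the same, so rack awareness cannot help
    }
}
```

Under this reading, a cluster where every partition has a replica in every client's rack skips the rack aware logic, as does a group in the middle of a rolling bounce where some clients do not yet report a rack.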

6. Test Plan

  • Unit tests will be added for the min-cost flow graph algorithm and rack aware assignment
  • Existing integration tests will be updated to ensure they work with or without rack configuration, as well as with both client.rack and rack.aware.assignment.tags configurations
  • New integration tests will be added to verify rack aware assignment
  • Existing system tests will be used to verify compatibility with old versions
  • TaskAssignorConvergenceTest will be updated to verify that the new assignor converges and produces no worse an assignment in terms of cross AZ cost
  • Performance tests will be done manually to verify the efficiency of min-cost flow computation with a large number of tasks and clients as well as different numbers of subscribed topic partitions

7. Rejected Alternatives


A. The following algorithm for standby assignment is rejected

Prefer reliability and then find optimal cost

We could also first find a more reliable standby assignment and then, once we have a certain number of reliable standby assignments, switch to minimizing cost. For example, if we would like at least 1 standby to be in a different rack from the active, then when we assign the first standby, we can give clients which are in a different rack from the active client a lower cost. We can adjust the cost function as below:

Code Block
int getCost(Task t, Client c) {
    final int TRAFFIC_WEIGHT = 1;
    final int SAME_RACK_WEIGHT = 10;
    int cost = 0;

    // add weight for cross-AZ traffic
    for (TopicPartition tp : t.topicPartitions()) {
        if (tp has no replica in same rack as c) {
            cost += TRAFFIC_WEIGHT;
        }
    }

    // add weight for each client in the same rack as c which already has t assigned
    for (Client c1 which has t assigned) {
        if (c is in same rack as c1) {
            cost += SAME_RACK_WEIGHT;
        }
    }
    return cost;
}

After we assign the first standby to different racks as much as possible, we adjust the cost function to give more weight to TRAFFIC_WEIGHT so that we prefer low traffic assignments. This option is rejected because it's too complex: we would need to run min-cost flow several times, and it's not clear we could get a valid assignment solution each time.