Table of Contents

...

1. Status

Current state: Under Discussion

Discussion thread: here

JIRA: here

2. Motivation

Kafka TopicPartitions are replicated on the broker side across different racks. KIP-881: Rack-aware Partition Assignment for Kafka Consumers added client rack information to the partition assignment interface, so that the assignment logic can try to assign TopicPartitions to clients in the same rack as much as possible. This avoids cross rack traffic (CRT), which has higher costs in terms of latency and money.

The client rack awareness additions in partition assignment also affect how Kafka Streams can assign its tasks to clients so that cross rack traffic is avoided as much as possible. This design discusses how we can augment the current assignment logic in Kafka Streams to use rack information to minimize cross rack traffic while respecting other constraints.

3. Current Task Assignment Logic

There are two assignors implemented in Kafka Streams now: StickyTaskAssignor and HighAvailabilityTaskAssignor. HighAvailabilityTaskAssignor was created for KIP-441: Smooth Scaling Out for Kafka Streams and is currently the default assignor in Kafka Streams. As a result, this design only augments HighAvailabilityTaskAssignor to account for rack aware assignment. StickyTaskAssignor users who would like to use rack aware assignment should upgrade Kafka Streams to a version in which HighAvailabilityTaskAssignor and rack aware assignment are available.

...

The main advantage of HAAssignor is that it takes the caught-up information of clients into account and tries to assign tasks to more caught-up clients so that those tasks can run first. At the same time, the originally assigned client tries to warm up its state and take over the task in subsequent probing rebalances.

4. Design for rack aware assignment

There are several constraints for rack aware assignment:

...

An ideal solution would find an assignment for active, standby and stateless tasks together that minimizes the total cost while satisfying all the constraints. However, I couldn't find an efficient polynomial-time global optimization algorithm to do this. As a result, I propose local optimizations that still follow the structure of HAAssignor: first, we compute a min-cost assignment for active tasks; then we compute it for standby tasks; and lastly we compute assignments for stateless tasks.

A. Min-cost flow problem

The min-cost flow problem is defined as follows: given a directed graph, each edge has a capacity that limits how much traffic it can carry and a cost per unit of traffic. Each node can have a supply of or a demand for a certain amount of traffic. The problem is to find a maximum flow whose total cost along the edges is minimal.

...

Tasks and clients are graph nodes. Every task is connected to every client by an edge with capacity 1. The cost of an edge is the cost of cross rack traffic between the task and the client. A task is assigned to a client if and only if one unit of flow runs on the edge between them. Our assignment problem then translates to finding the min-cost max flow in this graph, with the constraint that the flow should be balanced between clients.

I. Algorithm

Klein's cycle canceling algorithm can solve the min-cost flow problem in O(E^2CU) time, where C is the max cost and U is the max capacity. In our particular case, U is 1 and C is at most 3 (a task can have at most 3 topics including the changelog topic?). So the algorithm runs in O(E^2) time for our case. Note that E is T * N, where T is the number of tasks and N is the number of clients, because each task can be assigned to any client and so there is an edge from every task to every client. In other words, the time complexity of the algorithm is O(T^2*N^2). Below is a rough sketch of the algorithm:

...

Negative cycles and residual graphs can be read about here. Negative cycles can be found using the Bellman-Ford algorithm. How do we find a feasible flow? It can be found by using steps 1 - 2 of HAAssignor. What's nice about the cycle canceling algorithm is that, since our graph is bipartite, every client node on a negative cycle has both an in edge and an out edge, which means the total flow through each client doesn't change. As a result, each round of cycle canceling reduces the cost without changing the number of tasks assigned to a client. Eventually, when there are no more negative cycles, we have found a min-cost balanced assignment. Below is an example of finding a min cost of 2 to assign tasks t1, t2 to clients c1, c2.


B. Rack awareness assignment algorithm for active tasks

I. Min cost with unbalanced sub-topology

Code Block
1. Compute the cross rack cost of each task/client pair
2. Find an assignment following steps 1 - 2 of HAAssignor
3. Convert the assignment to a residual graph: if a task is assigned to a client, set the edge's capacity to 0 and flow to 1; otherwise, set flow to 0 and capacity to 1. The edge cost is the task/client cost
4. while there's a negative cycle in the current graph:
       find a negative cycle, update flow
5. Get the assignment from the final flow. If there's flow on an edge, the task and client at its two ends are assigned to each other.
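
The steps above can be sketched in Java as follows. This is a minimal sketch under stated assumptions, not the proposed implementation: the class name CycleCancelingSketch, the cost[task][client] matrix and the int[] assignment encoding are hypothetical, each task carries exactly one unit of flow, and the initial assignment stands in for the feasible flow that HAAssignor would produce. Negative cycles are found with Bellman-Ford over the residual edges.

```java
import java.util.Arrays;

final class CycleCancelingSketch {

    /** Sum of cost[t][assign[t]] over all tasks. */
    static int totalCost(int[][] cost, int[] assign) {
        int sum = 0;
        for (int t = 0; t < assign.length; t++) sum += cost[t][assign[t]];
        return sum;
    }

    /**
     * Hypothetical helper: cost[t][c] is the cost of running task t on client c,
     * initial[t] is the client index of task t in the feasible starting assignment.
     * Returns an assignment with the same number of tasks per client and no
     * remaining negative cycle in the residual graph.
     */
    static int[] minimizeCost(int[][] cost, int[] initial) {
        int tasks = cost.length, clients = cost[0].length, nodes = tasks + clients;
        int[] assign = initial.clone();
        while (true) {
            // Bellman-Ford with all distances 0 acts like a virtual source to every node.
            long[] dist = new long[nodes];
            int[] pred = new int[nodes];
            Arrays.fill(pred, -1);
            int last = -1;
            for (int round = 0; round < nodes; round++) {
                last = -1;
                for (int t = 0; t < tasks; t++) {
                    for (int c = 0; c < clients; c++) {
                        // Assigned pair: residual edge client -> task with negated cost.
                        // Unassigned pair: residual edge task -> client with the plain cost.
                        boolean assigned = assign[t] == c;
                        int from = assigned ? tasks + c : t;
                        int to = assigned ? t : tasks + c;
                        int weight = assigned ? -cost[t][c] : cost[t][c];
                        if (dist[from] + weight < dist[to]) {
                            dist[to] = dist[from] + weight;
                            pred[to] = from;
                            last = to;
                        }
                    }
                }
                if (last == -1) break; // no relaxation in this round: no negative cycle
            }
            if (last == -1) return assign;

            // Walk back far enough to be sure we stand on the cycle itself.
            int node = last;
            for (int i = 0; i < nodes; i++) node = pred[node];

            // Push one unit of flow around the cycle: every task -> client edge on
            // the cycle becomes the new assignment of that task.
            int[] next = assign.clone();
            int v = node;
            do {
                int u = pred[v];
                if (v >= tasks) next[u] = v - tasks;
                v = u;
            } while (v != node);
            assign = next;
        }
    }
}
```

For example, with cost {{3, 1}, {1, 3}} and the initial assignment of task 0 to client 0 and task 1 to client 1 (total cost 6), one cycle cancellation swaps the two tasks and yields a total cost of 2, while each client still holds one task.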

...

One caveat is that while the load on each client is similar, this algorithm doesn't guarantee that different partitions of the same sub-topology will be assigned to different clients as much as possible.

II. Min cost with balanced sub-topology

The above algorithm doesn't balance tasks of the same sub-topology across clients, which may lead to processing skew. We could try to balance them by constructing a more complex graph that limits how many tasks of a sub-topology can be assigned to a client.

...

Since 4.B.I and 4.B.II have different tradeoffs and work better for different workloads, we could add an internal configuration to choose between them at runtime.

III. How to make the assignment as stable as possible if there are small changes in the CRT cost

While the min-cost flow algorithm can find assignments with min CRT cost, it's also possible that if a TopicPartition's rack changes, the newly computed assignment changes drastically, which causes task shuffling and new warmups that involve more network and computation cost. So it's preferable to make the new assignment as close to the old assignment as possible.

...

Code Block
int getCost(Task t, Client c) {
    final int TRAFFIC_COST = 10;
    final int NON_OVERLAP_COST = 1;
    int cost = 0;

    // add cost for cross-AZ traffic
    for (TopicPartition tp : t.topicPartitions()) {
        if (tp has no replica in same rack as c) {
            cost += TRAFFIC_COST;
        }
    }

    // add cost for not overlapping with the target assignment
    if (c is NOT the client that t has in the target assignment) {
        cost += NON_OVERLAP_COST;
    }
    return cost;
}

The traffic weight and the non-overlap weight indicate how much we favor minimizing traffic cost compared to maximizing overlap with the target assignment. In the extreme case, we can set the non-overlap weight to 0, which means we always compute the lowest CRT assignment. We can expose these weights as internal configs. Note that we don't want very high values for the weights, because the cost upper bound appears in the algorithm's time complexity O(E^2 * (CU)).
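
To make the tradeoff between the two weights concrete, here is a small runnable variant of the cost function. It is a sketch only: the class name RackCostSketch, the constant names, and the signature (each partition described by the set of racks hosting one of its replicas, plus a flag saying whether the client is the target client) are assumptions made for illustration and are not part of the proposal.

```java
import java.util.List;
import java.util.Set;

final class RackCostSketch {
    // Illustrative weights, matching the values used in the pseudocode above.
    static final int TRAFFIC_COST = 10;
    static final int NON_OVERLAP_COST = 1;

    /**
     * @param partitionRacks for every TopicPartition of the task, the racks that host a replica
     * @param clientRack     rack of the candidate client
     * @param isTargetClient whether this client already holds the task in the target assignment
     */
    static int getCost(List<Set<String>> partitionRacks, String clientRack, boolean isTargetClient) {
        int cost = 0;
        // Each partition without a replica in the client's rack causes cross rack traffic.
        for (Set<String> racks : partitionRacks) {
            if (!racks.contains(clientRack)) {
                cost += TRAFFIC_COST;
            }
        }
        // Moving the task away from its target client is penalized separately.
        if (!isTargetClient) {
            cost += NON_OVERLAP_COST;
        }
        return cost;
    }
}
```

With a task reading two partitions replicated in {az1, az2} and {az2, az3}, the target client in az1 costs 10 while a non-target client in az2 costs 1, so with these weights the assignment would move the task to az2. Lowering the traffic weight relative to the non-overlap weight makes the assignment stickier.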

...

In summary, for active task assignment, we can add public configs such as internal.rack.aware.assignment.strategy with value min_cost or balanced_min_cost to choose algorithm I or algorithm II. We can make algorithm I the default to minimize total CRT cost. For cost computation, we can use Option 2 in III and expose configs to adjust the weights, such as internal.rack.aware.assignment.traffic_weight and internal.rack.aware.assignment.non_overlap_weight.

C. Rack awareness assignment for standby tasks

Assigning standby tasks comes with even more constraints and preferences:

...

There are several options to balance between reliability and efficiency for standby task assignment.

I. Balance reliability over cost

In this case, we should first optimize for constraints 1 - 3. After we get an assignment from ClientTagAwareStandbyTaskAssignor, for example, we use nested loops to check whether swapping any pair of standby tasks between different clients results in a smaller traffic cost. This is a greedy algorithm which may not give the optimal solution.

Code Block
for (ClientState client : clientStates) {
    for (Task standbyTask : client.standbyTasks) {
        for (ClientState otherClient : clientStates) {
            if (client.equals(otherClient) || otherClient.contains(standbyTask)) {
                continue;
            }
            for (Task otherStandbyTask : otherClient.standbyTasks) {
                if (swapping standbyTask and otherStandbyTask is feasible and has smaller cost) {
                    swap(standbyTask, otherStandbyTask);
                }
            }
        }
    }
}
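
A runnable variant of this greedy swap could look like the sketch below. It rests on several assumptions that are not in the proposal: the class name StandbySwapSketch, clients and tasks identified by integer indices, a cost[task][client] matrix, and a feasibility check that only prevents a standby from landing on a client that already hosts the same task. Constraints 1 - 3 are not modeled. Unlike the single pass above, it restarts the search after every swap and stops when no improving swap is left.

```java
import java.util.List;
import java.util.Set;

final class StandbySwapSketch {

    /** Swaps standby tasks between client pairs while a swap lowers the total cost; returns the number of swaps. */
    static int optimize(int[][] cost, List<Set<Integer>> active, List<Set<Integer>> standby) {
        int swaps = 0;
        int[] swap;
        while ((swap = findImprovingSwap(cost, active, standby)) != null) {
            int a = swap[0], taskA = swap[1], b = swap[2], taskB = swap[3];
            standby.get(a).remove(Integer.valueOf(taskA));
            standby.get(b).remove(Integer.valueOf(taskB));
            standby.get(a).add(taskB);
            standby.get(b).add(taskA);
            swaps++;
        }
        return swaps;
    }

    /** Returns {clientA, taskA, clientB, taskB} for the first feasible, cheaper swap, or null if none exists. */
    private static int[] findImprovingSwap(int[][] cost, List<Set<Integer>> active, List<Set<Integer>> standby) {
        for (int a = 0; a < standby.size(); a++) {
            for (int b = a + 1; b < standby.size(); b++) {
                for (int taskA : standby.get(a)) {
                    for (int taskB : standby.get(b)) {
                        boolean feasible = !hosts(active, standby, b, taskA) && !hosts(active, standby, a, taskB);
                        boolean cheaper = cost[taskA][b] + cost[taskB][a] < cost[taskA][a] + cost[taskB][b];
                        if (feasible && cheaper) {
                            return new int[]{a, taskA, b, taskB};
                        }
                    }
                }
            }
        }
        return null;
    }

    /** True if the client already runs the task as an active or a standby. */
    private static boolean hosts(List<Set<Integer>> active, List<Set<Integer>> standby, int client, int task) {
        return active.get(client).contains(task) || standby.get(client).contains(task);
    }
}
```

The loop terminates because every swap strictly lowers the integer total cost. As with the pseudocode, the result is a local optimum and may not be the cheapest feasible standby assignment.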

II. Prefer reliability and then find optimal cost

We could also first find a more reliable standby assignment and then, once we have a certain number of reliable standby assignments, switch to minimizing cost. For example, if we would like at least 1 standby to be in a different rack from the active, then when we assign the first standby we can give a lower cost to clients which are in a different rack from the active client. We can adjust the cost function as below:

...

Option II is more configurable and may result in lower CRT cost, but it's more complicated.

III. Summary

In summary, we can also add a configuration such as internal.rack.aware.assignment.standby.strategy with value greedy or balance to choose between algorithm I and algorithm II. We can make algorithm II the default.

D. What if both rack information and rack aware tags are present

KIP-708: Rack aware StandbyTask assignment for Kafka Streams added rack awareness configurations for Kafka Streams with rack.aware.assignment.tags and client.tag. The goal is to use the tags to identify the locations of clients and to put tasks on clients in different locations as much as possible. This configuration could be a superset of the clients' consumer rack configuration, which is client.rack. There are several options for how to assign standby tasks when both configurations are present.

I. Choose a default one

If both rack aware tags and client rack information in the subscription are present, we could default to using one of them for standby assignment. Both can be used to compute whether two clients are in the same rack. Rack aware tags may carry more information since they can support more keys. So we default to rack aware tags.

II. Enforce that client.rack information is present in rack.aware.assignment.tags

For example, if a client configures consumer rack information as client.rack: az1, then the rack aware tags must contain rack.aware.assignment.tags: rack and client.tag.rack: az1. We can fail the assignment if this is not satisfied.

III. Check that client.rack information is present in rack.aware.assignment.tags but don't enforce it

Option II is too strict. We could just check and warn users in the logs if both client.rack and rack.aware.assignment.tags are configured but rack doesn't exist in rack.aware.assignment.tags or client.tag.rack doesn't have the same value as client.rack. We would then continue processing the assignment using option I by defaulting to one of the configurations.
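
The check described in option III could be sketched as below. The class name RackTagCheckSketch, the method signature and the warning texts are hypothetical, and the tag key rack is taken from the example in option II. The method only collects warnings and never fails, which is what distinguishes option III from option II.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class RackTagCheckSketch {

    /**
     * @param clientRack     value of client.rack, or null if it is not configured
     * @param assignmentTags keys listed in rack.aware.assignment.tags
     * @param clientTags     client.tag.* values keyed by tag name
     * @return warnings to log; empty when the configs agree or when only one of them is set
     */
    static List<String> check(String clientRack, List<String> assignmentTags, Map<String, String> clientTags) {
        List<String> warnings = new ArrayList<>();
        if (clientRack == null || assignmentTags.isEmpty()) {
            return warnings; // nothing to compare
        }
        if (!assignmentTags.contains("rack")) {
            warnings.add("client.rack is set but rack.aware.assignment.tags does not contain 'rack'");
        } else if (!clientRack.equals(clientTags.get("rack"))) {
            warnings.add("client.tag.rack (" + clientTags.get("rack")
                    + ") differs from client.rack (" + clientRack + ")");
        }
        return warnings;
    }
}
```

A caller would log each returned message at warn level and then proceed with the default chosen in option I.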

IV. Summary

I think option III is good enough for us and we can default to the rack.aware.assignment.tags configuration since it can contain more information than client.rack.

E. Assignment for stateless tasks

We can use a min-cost flow algorithm to assign stateless tasks. First, we can find a feasible solution based on task load. Then we can use the cycle canceling algorithm to compute a minimum cost assignment as we do for active tasks. We can reuse the configuration for active tasks to control whether we merely compute the min-cost assignment or also balance tasks of the same sub-topology.

4. Public Interfaces

There will be a few new public configs added:

  • rack.aware.assignment.strategy: this config controls whether we use 4.B.I or 4.B.II, i.e. whether we compute min cost without or with balanced sub-topology
  • rack.aware.assignment.enabled: this config controls whether rack aware assignment is enabled
  • rack.aware.assignment.traffic_cost: this config controls the cost we add for cross rack traffic
  • rack.aware.assignment.non_overlap_cost: this config controls the cost we add when an assignment does not overlap with the target assignment computed by HAAssignor

Note that the client.rack consumer config needs to be configured by the user to enable rack aware assignment.
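
As an illustration of how an application might set these configs, here is a sketch using plain java.util.Properties. The config names are the ones proposed in the list above and may change before release. The class name RackAwareConfigSketch and all values are assumptions: min_cost is one of the strategy values mentioned for active tasks, and 10 and 1 mirror the weights in the getCost pseudocode.

```java
import java.util.Properties;

final class RackAwareConfigSketch {

    /** Builds the rack related settings for one Kafka Streams instance running in the given rack. */
    static Properties rackAwareProps(String rack) {
        Properties props = new Properties();
        // Existing consumer config; required so the client reports its rack.
        props.setProperty("client.rack", rack);
        // Proposed configs from this design; names and values are illustrative.
        props.setProperty("rack.aware.assignment.enabled", "true");
        props.setProperty("rack.aware.assignment.strategy", "min_cost");
        props.setProperty("rack.aware.assignment.traffic_cost", "10");
        props.setProperty("rack.aware.assignment.non_overlap_cost", "1");
        return props;
    }
}
```

These properties would be merged with the rest of the application's Streams configuration before the instance is started.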

5. Compatibility, Deprecation, and Migration Plan

We will implement the new rack aware assignment logic in HighAvailabilityTaskAssignor and use it only when client rack information appears in Subscriptions and at least one of the TopicPartitions in some task has a different cost compared to other TopicPartitions. This is because if all of them have the same cost, there's no point in using rack aware assignment logic to compute min cost, since assigning tasks to any client doesn't make a difference. If users want to use rack aware assignment, they need to upgrade Kafka Streams to at least the earliest supported version. It's better to stop all the instances and start them all with the latest version with the client.rack config, but it's also OK to do a rolling bounce with the new version. A rolling bounce may cause the assignment to alternate between rack aware and non rack aware assignment, but eventually the assignment will be computed using rack awareness once every instance uses the new version.

6. Test Plan

  • Unit tests will be added for the min-cost flow graph algorithm and rack aware assignment
  • Existing integration tests will be updated to ensure they work with or without rack configuration as well as with both client.rack and rack.aware.assignment.tags configurations
  • New integration tests will be added to verify rack aware assignment
  • Existing system tests will be used to verify compatibility with old versions
  • TaskAssignorConvergenceTest will be updated to verify that the new assignor converges and produces no worse assignment in terms of cross AZ cost
  • Performance tests will be done manually to verify the efficiency of the min-cost flow computation with a large number of tasks and clients as well as different numbers of subscribed topic partitions

7. Rejected Alternatives

No rejected alternatives yet.