...

Kafka TopicPartitions are replicated on the broker side to different racks. KIP-881: Rack-aware Partition Assignment for Kafka Consumers added clients' rack information to the partition assignment interface, so that assignment logic can use it to assign TopicPartitions to clients in the same rack as much as possible. We want to place TopicPartitions on clients in the same rack because this avoids cross rack traffic (CRT), which costs more in both latency and money.

The client rack information added to partition assignment also affects how Kafka Streams can assign its tasks to clients, so that cross rack traffic can be avoided as much as possible when partition replicas are not always present in a client's rack. This design discusses how we can augment the current assignment logic in Kafka Streams to use rack information to minimize cross rack traffic while still satisfying other constraints.

...

Kafka Streams currently implements two assignors: StickyTaskAssignor and HighAvailabilityTaskAssignor. HighAvailabilityTaskAssignor was created for KIP-441: Smooth Scaling Out for Kafka Streams and is the current default assignor. As a result, this design only augments HighAvailabilityTaskAssignor to support rack aware assignment. StickyTaskAssignor users who want rack aware assignment should upgrade to a Kafka Streams version in which both HighAvailabilityTaskAssignor and rack aware assignment are available.

...

  1. Assignment should still balance tasks over clients based on task load, even if this causes higher cross rack traffic.
  2. Rack aware assignment should be computed in steps 1-2 for active tasks and steps 3-4 for standby tasks, before we use the caught-up information on clients to move tasks around in steps 5-6. The reason is that the assignment computed in steps 5-6 is temporary, for task warmup, and for this temporary assignment we should aim to minimize task downtime rather than reduce CRT.
  3. The computed assignment must be deterministic so that it converges, which means that subsequent probing rebalances can eventually stop once warm-up tasks are warmed up.
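Constraint 1 leaves "balanced" informal. A minimal sketch, assuming every client has the same capacity, treats an assignment as balanced when the task counts of any two clients differ by at most one. The class and method names are hypothetical, and the real assignor may weigh load differently.

```java
import java.util.Collections;
import java.util.Map;

public class BalanceCheck {
    // Hypothetical balance rule: with equal client capacities, an assignment is
    // balanced when the busiest and idlest clients differ by at most one task.
    static boolean isBalanced(Map<String, Integer> taskCountByClient) {
        if (taskCountByClient.isEmpty()) {
            return true;
        }
        int max = Collections.max(taskCountByClient.values());
        int min = Collections.min(taskCountByClient.values());
        return max - min <= 1;
    }

    public static void main(String[] args) {
        System.out.println(isBalanced(Map.of("c1", 2, "c2", 1, "c3", 2))); // true
        System.out.println(isBalanced(Map.of("c1", 3, "c2", 1)));          // false
    }
}
```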

...

An ideal solution would find, for active, standby and stateless tasks together, one assignment that minimizes the total cost while satisfying all the constraints. However, I couldn't find an efficient polynomial-time global optimization algorithm for this. As a result, I propose local optimizations that keep the structure of HAAssignor: first, we compute a min-cost assignment for active tasks, then for standby tasks, and lastly for stateless tasks.

...

The min-cost flow problem is defined as follows: in a directed graph, each edge has a capacity that limits how much flow it allows and a cost per unit of flow. Each node can have a supply of, or a demand for, flow. The problem is to find a maximum flow whose total cost along the edges is minimized.
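For reference, the definition above corresponds to the standard linear program below. The symbols are introduced here for illustration: $c(u,v)$ is the cost per unit of flow on edge $(u,v)$, $k(u,v)$ its capacity, $f(u,v)$ the flow on it, and $b(v)$ the supply ($b(v) > 0$) or demand ($b(v) < 0$) of node $v$. In the max-flow variant there is a single source $s$ with $b(s) = F$ and a single sink $t$ with $b(t) = -F$; we take the largest feasible $F$ and then minimize the cost.

```latex
\begin{aligned}
\text{minimize} \quad & \sum_{(u,v) \in E} c(u,v)\, f(u,v) \\
\text{subject to} \quad & 0 \le f(u,v) \le k(u,v) && \forall (u,v) \in E \\
& \sum_{w:(v,w) \in E} f(v,w) \;-\; \sum_{w:(w,v) \in E} f(w,v) = b(v) && \forall v \in V
\end{aligned}
```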

Our assignment problem can be translated to a min-cost max flow problem as below.

Tasks and clients are graph nodes. Every task is connected to every client by an edge with capacity 1, whose cost is the cross rack traffic cost of running that task on that client. A task is assigned to a client exactly when one unit of flow goes along the edge between them. Our assignment problem then becomes finding the min-cost max flow in this graph, with the constraint that the flow is balanced between clients.
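The translation can be sketched in Java with the successive shortest path method (Bellman-Ford on the residual graph), which is one standard way to solve min-cost max flow; the design does not prescribe a particular solver. The sketch assumes a source node feeding every task with capacity 1 and a sink node fed by every client, and it expresses the balance constraint by capping each client-to-sink edge at ceil(tasks / clients). All class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MinCostAssignment {
    // Residual edge: target node, remaining capacity, cost per unit of flow, index of the reverse edge.
    static final class Edge {
        final int to, cost, rev;
        int cap;
        Edge(int to, int cap, int cost, int rev) { this.to = to; this.cap = cap; this.cost = cost; this.rev = rev; }
    }

    private final List<List<Edge>> graph = new ArrayList<>();

    MinCostAssignment(int nodes) {
        for (int i = 0; i < nodes; i++) graph.add(new ArrayList<>());
    }

    void addEdge(int from, int to, int cap, int cost) {
        graph.get(from).add(new Edge(to, cap, cost, graph.get(to).size()));
        graph.get(to).add(new Edge(from, 0, -cost, graph.get(from).size() - 1));
    }

    // Successive shortest paths: augment along the cheapest residual s-t path until none is left.
    // Bellman-Ford is used because reverse edges carry negative costs.
    void run(int s, int t) {
        int n = graph.size();
        while (true) {
            int[] dist = new int[n], prevNode = new int[n], prevEdge = new int[n];
            Arrays.fill(dist, Integer.MAX_VALUE);
            dist[s] = 0;
            boolean updated = true;
            for (int iter = 0; iter < n - 1 && updated; iter++) {
                updated = false;
                for (int u = 0; u < n; u++) {
                    if (dist[u] == Integer.MAX_VALUE) continue;
                    for (int i = 0; i < graph.get(u).size(); i++) {
                        Edge e = graph.get(u).get(i);
                        if (e.cap > 0 && dist[u] + e.cost < dist[e.to]) {
                            dist[e.to] = dist[u] + e.cost;
                            prevNode[e.to] = u;
                            prevEdge[e.to] = i;
                            updated = true;
                        }
                    }
                }
            }
            if (dist[t] == Integer.MAX_VALUE) return; // no augmenting path left: the flow is maximal
            int push = Integer.MAX_VALUE;
            for (int v = t; v != s; v = prevNode[v]) push = Math.min(push, graph.get(prevNode[v]).get(prevEdge[v]).cap);
            for (int v = t; v != s; v = prevNode[v]) {
                Edge e = graph.get(prevNode[v]).get(prevEdge[v]);
                e.cap -= push;
                graph.get(v).get(e.rev).cap += push;
            }
        }
    }

    // cost[t][c] = cross rack traffic cost of running task t on client c.
    // Returns assignment[t] = index of the client that task t is assigned to.
    static int[] assign(int[][] cost) {
        int tasks = cost.length, clients = cost[0].length;
        int source = 0, sink = tasks + clients + 1;
        int quota = (tasks + clients - 1) / clients; // ceil(tasks / clients): the balance cap
        MinCostAssignment g = new MinCostAssignment(tasks + clients + 2);
        for (int t = 0; t < tasks; t++) {
            g.addEdge(source, 1 + t, 1, 0);
            for (int c = 0; c < clients; c++) g.addEdge(1 + t, 1 + tasks + c, 1, cost[t][c]);
        }
        for (int c = 0; c < clients; c++) g.addEdge(1 + tasks + c, sink, quota, 0);
        g.run(source, sink);
        int[] assignment = new int[tasks];
        for (int t = 0; t < tasks; t++) {
            for (Edge e : g.graph.get(1 + t)) {
                // a saturated task->client edge carries the task's single unit of flow
                if (e.to > tasks && e.to <= tasks + clients && e.cap == 0) assignment[t] = e.to - 1 - tasks;
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Task 0 is cheaper on client 1 and task 1 is cheaper on client 0.
        System.out.println(Arrays.toString(assign(new int[][] {{3, 0}, {0, 2}}))); // [1, 0]
    }
}
```

Because edges are scanned in a fixed order, the same input always yields the same assignment, which is in the spirit of the determinism constraint; ties between equally cheap assignments are broken by that order rather than by any explicit rule.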

...

While the min-cost flow algorithm finds assignments with minimal CRT cost, if a TopicPartition's rack changes, the newly computed assignment could change drastically. This causes task shuffling and new warmups, which add network and computation cost. So it's preferable to keep the new assignment as close to the old assignment as possible.
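One way to express this preference, sketched below under the assumption that it is folded into the task-to-client edge costs of the flow graph, is to add a small penalty whenever a task would leave the client that owned it in the previous assignment. The weights TRAFFIC_COST and NON_OVERLAP_COST and the method name are hypothetical; their ratio decides how much cross rack traffic we accept in exchange for fewer task movements.

```java
public class StickyCost {
    // Hypothetical weights; only their ratio matters to the min-cost solver.
    static final int TRAFFIC_COST = 10;
    static final int NON_OVERLAP_COST = 1;

    // Edge cost for placing a task on a client: cross rack traffic, plus a penalty
    // when the client is not the task's previous owner (previousOwner may be null
    // for a new task, in which case every client pays the same penalty).
    static int edgeCost(int crossRackPartitions, String client, String previousOwner) {
        int cost = TRAFFIC_COST * crossRackPartitions;
        if (!client.equals(previousOwner)) {
            cost += NON_OVERLAP_COST;
        }
        return cost;
    }

    public static void main(String[] args) {
        System.out.println(edgeCost(0, "c1", "c1")); // 0
        System.out.println(edgeCost(0, "c2", "c1")); // 1
        System.out.println(edgeCost(1, "c1", "c1")); // 10
    }
}
```

With these example weights, avoiding one cross rack partition always outweighs keeping a task in place, while among equally cheap placements the previous owner wins.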

...

Assigning standby tasks comes with even more constraints or preferences:

  1. A standby task shouldn't be assigned to a client that already has the corresponding active task
  2. Ideally, standby tasks should be in a different rack from the corresponding active task
  3. Ideally, standby tasks of the same task should themselves be in different racks
  4. Ideally, cross rack traffic cost should be minimized
  5. Assignment should be deterministic
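Constraints 1, 2, 3 and 5 can be illustrated by ranking the candidate clients for one more standby of a task. This is a minimal sketch, not the proposed algorithm: it assumes the caller knows each client's rack and which clients already host a copy of the task, and it also excludes clients that already host a standby of the same task. Class and method names are hypothetical; CRT cost (constraint 4) is left to the cost function.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class StandbyCandidates {
    // Ranks eligible clients for one more standby of a task:
    // - clients already hosting the active or a standby copy are excluded (constraint 1),
    // - clients in racks with no copy of the task come first (constraints 2 and 3),
    // - ties are broken by client id so the order is deterministic (constraint 5).
    static List<String> rank(Map<String, String> rackByClient, Set<String> clientsWithTask) {
        Set<String> usedRacks = new HashSet<>();
        for (String c : clientsWithTask) usedRacks.add(rackByClient.get(c));
        List<String> candidates = new ArrayList<>();
        for (String c : rackByClient.keySet()) {
            if (!clientsWithTask.contains(c)) candidates.add(c);
        }
        // false < true, so clients in unused racks sort first
        candidates.sort(Comparator
            .comparing((String c) -> usedRacks.contains(rackByClient.get(c)))
            .thenComparing(Comparator.<String>naturalOrder()));
        return candidates;
    }

    public static void main(String[] args) {
        Map<String, String> racks = Map.of("c1", "az1", "c2", "az1", "c3", "az2", "c4", "az3");
        // The active copy of the task lives on c1 (rack az1).
        System.out.println(rank(racks, Set.of("c1"))); // [c3, c4, c2]
    }
}
```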

...

Code Block
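int getCost(Task t, Client c, Collection<Client> clientsWithTask) {
    final int TRAFFIC_WEIGHT = 1;
    final int SAME_RACK_WEIGHT = 10;
    int cost = 0;

    // add weight for cross-rack (cross-AZ) traffic: every TopicPartition of t that has
    // no replica in c's rack must be fetched from another rack.
    // replicaRacks(tp) is a hypothetical helper returning the racks of tp's replicas.
    for (TopicPartition tp : t.topicPartitions()) {
        if (!replicaRacks(tp).contains(c.rack())) {
            cost += TRAFFIC_WEIGHT;
        }
    }

    // add weight for placing t in a rack that already hosts a copy of t;
    // clientsWithTask are the clients that already have t (active or standby) assigned
    for (Client c1 : clientsWithTask) {
        if (c1.rack().equals(c.rack())) {
            cost += SAME_RACK_WEIGHT;
        }
    }
    return cost;
}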

After the first standby of each task has been placed in a different rack wherever possible, we increase TRAFFIC_WEIGHT in the cost function so that the remaining standbys favor low-traffic assignments.

Option II is more configurable and may result in lower CRT cost, but it's more complicated.

...

In summary, we can also add a configuration such as internal.rack.aware.assignment.standby.strategy with values greedy and balance to choose between algorithm I and algorithm II, and make algorithm II the default.
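A minimal sketch of how an assignor could read such a config, assuming the values are the strings greedy and balance and that the assignor receives its configs as a map. The config is only a proposal, and the class, enum and method names are hypothetical.

```java
import java.util.Locale;
import java.util.Map;

public class StandbyStrategyConfig {
    // Proposed internal config from this design; not an existing Kafka Streams config.
    static final String STANDBY_STRATEGY_CONFIG = "internal.rack.aware.assignment.standby.strategy";

    enum StandbyStrategy { GREEDY, BALANCE } // algorithm I, algorithm II

    // Reads the strategy, defaulting to algorithm II (balance) when unset.
    // Unknown values make Enum.valueOf throw IllegalArgumentException.
    static StandbyStrategy standbyStrategy(Map<String, ?> configs) {
        Object value = configs.get(STANDBY_STRATEGY_CONFIG);
        if (value == null) {
            return StandbyStrategy.BALANCE;
        }
        return StandbyStrategy.valueOf(value.toString().trim().toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        System.out.println(standbyStrategy(Map.of()));                                  // BALANCE
        System.out.println(standbyStrategy(Map.of(STANDBY_STRATEGY_CONFIG, "greedy"))); // GREEDY
    }
}
```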

What if both rack information and rack aware tags are present

KIP-708: Rack awareness for Kafka Streams added rack awareness configurations to Kafka Streams: rack.aware.assignment.tags and client.tag.<key>. The goal is to use the tags to identify the locations of clients and to place copies of a task on clients in different locations as much as possible. These configurations can be a superset of the consumer's client rack configuration, client.rack. There are several options for assigning standby tasks when both configurations are present.

...

Choose a default one

If both rack aware tags and client rack information in the subscription are present, we could default to one of them for standby assignment. Both can be used to determine whether two clients are in the same rack. Rack aware tags may carry more information since they support multiple keys, so we default to rack aware tags.
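A minimal sketch of this default, assuming "same rack" under tags means that two clients match on every key listed in rack.aware.assignment.tags; that reading is a simplification of KIP-708's tag semantics. The ClientInfo type, the tag keys in the example, and the method names are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class SameRack {
    // Hypothetical client view: consumer client.rack plus its client.tag.<key> values.
    static final class ClientInfo {
        final String rack;
        final Map<String, String> tags;
        ClientInfo(String rack, Map<String, String> tags) { this.rack = rack; this.tags = tags; }
    }

    // Prefers rack aware tags when configured; otherwise falls back to client.rack.
    static boolean sameRack(ClientInfo a, ClientInfo b, List<String> rackAwareTags) {
        if (!rackAwareTags.isEmpty()) {
            for (String key : rackAwareTags) {
                if (!Objects.equals(a.tags.get(key), b.tags.get(key))) return false;
            }
            return true;
        }
        return a.rack != null && a.rack.equals(b.rack);
    }

    public static void main(String[] args) {
        ClientInfo a = new ClientInfo("az1", Map.of("zone", "az1", "cluster", "k8s-1"));
        ClientInfo b = new ClientInfo("az1", Map.of("zone", "az1", "cluster", "k8s-2"));
        System.out.println(sameRack(a, b, List.of("zone", "cluster"))); // false: tags differ on cluster
        System.out.println(sameRack(a, b, List.of()));                  // true: both have client.rack az1
    }
}
```

The example shows why tags can be the richer source: two clients that share client.rack can still be told apart by an extra tag.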

...

Enforce that client.rack information is present in rack.aware.assignment.tags

For example, if a client configures its consumer rack information as client.rack: az1, then its rack aware tags must contain rack.aware.assignment.tags: rack and client.tag.rack: az1. We can fail the assignment if this is not satisfied.

...

Check that client.rack information is present in rack.aware.assignment.tags, but don't enforce it

Option II is too strict. Instead, we could check and warn users in the logs if both client.rack and rack.aware.assignment.tags are configured but rack doesn't exist in rack.aware.assignment.tags, or client.tag.rack doesn't have the same value as client.rack. Either way, we continue processing the assignment using option I, defaulting to one of the configurations.
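A minimal sketch of the check, assuming the tag key is literally rack as in the example above. It returns the warning instead of throwing, so the caller can log it and carry on with the assignment; option II would throw at the same point. Class and method names are hypothetical; the logger is java.util.logging only to keep the sketch self-contained.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.logging.Logger;

public class RackConfigCheck {
    private static final Logger LOG = Logger.getLogger(RackConfigCheck.class.getName());

    // Returns a warning if client.rack and the rack aware tags disagree, empty otherwise.
    static Optional<String> checkRackConsistency(String clientRack, List<String> rackAwareTags,
                                                 Map<String, String> clientTags) {
        if (clientRack == null || rackAwareTags.isEmpty()) {
            return Optional.empty(); // only one of the two configurations is present
        }
        if (!rackAwareTags.contains("rack")) {
            return Optional.of("client.rack is set but rack.aware.assignment.tags does not contain 'rack'");
        }
        String taggedRack = clientTags.get("rack"); // value of client.tag.rack
        if (!clientRack.equals(taggedRack)) {
            return Optional.of("client.tag.rack (" + taggedRack + ") differs from client.rack (" + clientRack + ")");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Option III: log the mismatch and keep assigning with the default configuration.
        checkRackConsistency("az1", List.of("rack"), Map.of("rack", "az2")).ifPresent(LOG::warning);
    }
}
```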

...

Summary

I think option III is good enough for us, and we can default to the rack.aware.assignment.tags configuration since it can contain more information than client.rack.

Assignment for stateless tasks

...