...

Kafka TopicPartitions are replicated on the broker side to different racks. KIP-881: Rack-aware Partition Assignment for Kafka Consumers added the rack information of clients to the partition assignment interface. The assignment logic can then use this information to assign TopicPartitions to clients in the same rack as much as possible. We want to place TopicPartitions on clients in the same rack because this avoids cross-rack traffic (CRT), which has higher costs in both latency and money.
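To make the CRT goal concrete, here is a minimal sketch that counts how many assigned TopicPartitions would be fetched from another rack. It assumes that partitions, clients and racks are plain strings. The class and method names (CrossRackTraffic, countCrossRackPartitions) are hypothetical and are not Kafka APIs.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper (not a Kafka API): counts assigned TopicPartitions that
// have no replica in the assigned client's rack, i.e. partitions causing CRT.
final class CrossRackTraffic {
    // replicaRacks: partition -> racks hosting a replica of it
    // clientRacks:  client -> rack of that client
    // assignment:   client -> partitions assigned to that client
    static int countCrossRackPartitions(Map<String, Set<String>> replicaRacks,
                                        Map<String, String> clientRacks,
                                        Map<String, List<String>> assignment) {
        int crossRack = 0;
        for (Map.Entry<String, List<String>> entry : assignment.entrySet()) {
            String clientRack = clientRacks.get(entry.getKey());
            for (String partition : entry.getValue()) {
                Set<String> racks = replicaRacks.getOrDefault(partition, Set.of());
                // An unknown client rack, or no replica in the client's rack,
                // is counted as a cross-rack fetch.
                if (clientRack == null || !racks.contains(clientRack)) {
                    crossRack++;
                }
            }
        }
        return crossRack;
    }
}
```

Consider replicas of t-0 in az1/az2 and t-1 in az2/az3, with clients c1 (az1) and c2 (az3). Assigning t-0 to c1 and t-1 to c2 yields 0 cross-rack partitions. Swapping the two assignments yields 2.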

...

There are two assignors implemented in Kafka Streams now: StickyTaskAssignor and HighAvailabilityTaskAssignor. HighAvailabilityTaskAssignor was created for KIP-441: Smooth Scaling Out for Kafka Streams, and it is currently the default assignor in Kafka Streams. As a result, this design only augments HighAvailabilityTaskAssignor to support rack aware assignment. StickyTaskAssignor users who want rack aware assignment should upgrade to a Kafka Streams version in which HighAvailabilityTaskAssignor and rack aware assignment are both available.

...

Code Block
// hasReplicaInRack() and isCurrentTarget() are placeholder lookups into
// cluster metadata and the current assignment.
int getCost(Task t, Client c) {
    final int TRAFFIC_WEIGHT = 210;
    final int OVERLAP_WEIGHT = 1;
    int cost = 0;

    // add weight for each TopicPartition that would be fetched across racks (AZs)
    for (TopicPartition tp : t.topicPartitions()) {
        if (!hasReplicaInRack(tp, c.rack())) {
            cost += TRAFFIC_WEIGHT;
        }
    }

    // add weight if c is not the current target client of t (no overlap)
    if (!isCurrentTarget(t, c)) {
        cost += OVERLAP_WEIGHT;
    }
    return cost;
}

TRAFFIC_WEIGHT and OVERLAP_WEIGHT indicate how much we favor minimizing traffic cost over maximizing overlap with the current assignment. In the extreme case, we can set OVERLAP_WEIGHT to 0, which means we always compute the assignment with the lowest CRT. We can expose these weights as internal configs. We don't want very high weight values, because the upper bound of the cost appears in the algorithm's time complexity, O(E^2 * (CU)).
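The trade-off between the two weights can be checked with a small runnable version of getCost. This sketch assumes that a task is given as the list of replica-rack sets of its TopicPartitions. It also assumes that "current target" is passed in as a boolean. The class name WeightedCost is hypothetical.

```java
import java.util.List;
import java.util.Set;

// Hypothetical runnable variant of the getCost() pseudocode with the weights as parameters.
final class WeightedCost {
    static int cost(List<Set<String>> partitionReplicaRacks, String clientRack,
                    boolean clientIsCurrentTarget, int trafficWeight, int overlapWeight) {
        int cost = 0;
        for (Set<String> racks : partitionReplicaRacks) {
            if (!racks.contains(clientRack)) {
                cost += trafficWeight; // this partition would be fetched cross-rack
            }
        }
        if (!clientIsCurrentTarget) {
            cost += overlapWeight; // moving the task away from its current client
        }
        return cost;
    }
}
```

Take a task with two partitions replicated in az1 and az2. It could stay on its current client in az3 or move to a client in az1. With TRAFFIC_WEIGHT = 210 and OVERLAP_WEIGHT = 1, the costs are 420 and 1, so the task moves to az1. With TRAFFIC_WEIGHT = 1 and OVERLAP_WEIGHT = 5, the costs are 2 and 5, so the task stays put.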

Summary

In summary, for active task assignment, we can add an internal config to choose between algorithm I and algorithm II, such as internal.rack.aware.assignment.strategy with the value min_cost or balanced_min_cost. We can make algorithm I the default to minimize the total CRT cost. For cost computation, we can use Option 2 in III and expose configs to adjust the weights, such as internal.rack.aware.assignment.traffic_weight and internal.rack.aware.assignment.overlap_weight.
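A minimal sketch of how these internal configs could be read, assuming they arrive as a plain string map. It also assumes the strategy config is named internal.rack.aware.assignment.strategy. The config names follow the summary and are not final. The class name and the default weights passed by callers are hypothetical.

```java
import java.util.Locale;
import java.util.Map;

// Hypothetical parsing of the proposed internal rack aware assignment configs.
final class RackAwareAssignmentConfig {
    enum Strategy { MIN_COST, BALANCED_MIN_COST } // algorithm I, algorithm II

    static final String STRATEGY = "internal.rack.aware.assignment.strategy";
    static final String TRAFFIC_WEIGHT = "internal.rack.aware.assignment.traffic_weight";
    static final String OVERLAP_WEIGHT = "internal.rack.aware.assignment.overlap_weight";

    // Algorithm I (min_cost) is used when the config is absent.
    static Strategy strategy(Map<String, String> configs) {
        String value = configs.getOrDefault(STRATEGY, "min_cost");
        switch (value.toLowerCase(Locale.ROOT)) {
            case "min_cost": return Strategy.MIN_COST;
            case "balanced_min_cost": return Strategy.BALANCED_MIN_COST;
            default: throw new IllegalArgumentException("Unknown " + STRATEGY + ": " + value);
        }
    }

    // Weights must be non-negative; 0 disables the corresponding term.
    static int weight(Map<String, String> configs, String key, int defaultValue) {
        String value = configs.get(key);
        int weight = value == null ? defaultValue : Integer.parseInt(value.trim());
        if (weight < 0) {
            throw new IllegalArgumentException(key + " must be non-negative");
        }
        return weight;
    }
}
```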

Rack awareness assignment for standby tasks

...

KIP-708: Rack awareness for Kafka Streams added rack awareness configurations for Kafka Streams: rack.aware.assignment.tags and configs with the client.tag. prefix. The goal is to use the tags to identify the locations of clients and to place tasks on clients in different locations as much as possible. These configurations can be a superset of the consumer's rack configuration, client.rack. There are several options for handling the case where both configurations are present.

I. Option 1

If both rack aware tags and rack information in the subscription are present, we could default to one of them. Both can be used to determine whether two clients are in the same rack. Rack aware tags may carry more information since they can support more keys.

So we default to rack aware tags.
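Option I could be sketched as a same-rack check that prefers the tags and falls back to client.rack. This assumes each client exposes its client.rack value (possibly null) and its client.tag.* values keyed by tag name. It also assumes that two clients match on tags only when every configured tag is present on both and has the same value. SameRackCheck and ClientInfo are hypothetical names.

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch of option I: default to rack aware tags, else use client.rack.
final class SameRackCheck {
    static final class ClientInfo {
        final String clientRack;              // client.rack, may be null
        final Map<String, String> clientTags; // client.tag.<name> values keyed by <name>

        ClientInfo(String clientRack, Map<String, String> clientTags) {
            this.clientRack = clientRack;
            this.clientTags = clientTags;
        }
    }

    static boolean sameRack(List<String> rackAwareAssignmentTags, ClientInfo a, ClientInfo b) {
        if (!rackAwareAssignmentTags.isEmpty()) {
            // Tags take precedence: every configured tag must be present and equal.
            for (String tag : rackAwareAssignmentTags) {
                String value = a.clientTags.get(tag);
                if (value == null || !value.equals(b.clientTags.get(tag))) {
                    return false;
                }
            }
            return true;
        }
        return a.clientRack != null && a.clientRack.equals(b.clientRack);
    }
}
```

Here two clients with client.rack az1 but different cluster tags are treated as different locations when tags are configured. The same two clients are treated as the same rack when tags are not configured.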

II. Enforce that client.rack information is present in rack.aware.assignment.tags

For example, suppose a client configures its consumer rack information as client.rack: az1. Then the rack aware tags must contain rack.aware.assignment.tags: rack and client.tag.rack: az1. We can fail the assignment if this is not satisfied.

III. Check that client.rack information is present in rack.aware.assignment.tags but don’t enforce it

Option II is too strict. We could instead check and log a warning if both client.rack and rack.aware.assignment.tags are configured but they are inconsistent. This happens when rack doesn’t appear in rack.aware.assignment.tags, or when client.tag.rack doesn’t have the same value as client.rack. We would still continue the assignment using option I, defaulting to one of the configurations.
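A minimal sketch of the option III check. It returns a warning message instead of failing, and the caller would log it and continue with option I. The tag name "rack" follows the example in option II and is an assumption. RackConfigCheck and check() are hypothetical names.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of option III: detect inconsistent rack configs, warn, don't fail.
final class RackConfigCheck {
    static Optional<String> check(String clientRack, List<String> rackAwareAssignmentTags,
                                  Map<String, String> clientTags) {
        if (clientRack == null || rackAwareAssignmentTags.isEmpty()) {
            return Optional.empty(); // at most one configuration is present: nothing to compare
        }
        if (!rackAwareAssignmentTags.contains("rack")) {
            return Optional.of("rack.aware.assignment.tags does not contain 'rack' although client.rack="
                    + clientRack + " is set; defaulting to rack.aware.assignment.tags");
        }
        String tagValue = clientTags.get("rack");
        if (!clientRack.equals(tagValue)) {
            return Optional.of("client.tag.rack=" + tagValue + " differs from client.rack="
                    + clientRack + "; defaulting to rack.aware.assignment.tags");
        }
        return Optional.empty();
    }
}
```

Under option II, the same check would throw instead of returning a message. That difference is why option III is the more lenient choice.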

IV. Summary

I think option III is good enough for us, and we can default to the rack.aware.assignment.tags configuration since it can contain more information.

Assignment for stateless tasks

We can use a min-cost flow algorithm to assign stateless tasks. First, we find a feasible solution based on task load. Then we use the cycle canceling algorithm to compute the minimum cost assignment, as we do for active tasks. We can reuse the configuration for active tasks to control whether we only compute the min-cost assignment or also balance tasks of the same sub-topology.
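The two steps above can be sketched as a small min-cost flow. The graph is source -> task (capacity 1) -> client (capacity 1, cost = placement cost) -> sink (capacity = client load limit). Step one pushes a first-fit feasible flow that ignores cost. Step two repeatedly finds negative cycles in the residual graph with Bellman-Ford and cancels them. This covers only the plain min-cost variant (algorithm I), not the balancing variant. The cost matrix, capacities and the class name are hypothetical inputs, and the code is not the Kafka Streams implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: min-cost task-to-client assignment via cycle canceling.
final class CycleCancelingAssignor {
    private final List<int[]> edges = new ArrayList<>(); // {to, residualCapacity, cost}; e ^ 1 is the reverse edge
    private final int nodes;

    private CycleCancelingAssignor(int nodes) { this.nodes = nodes; }

    private int addEdge(int from, int to, int capacity, int cost) {
        edges.add(new int[] {to, capacity, cost});
        edges.add(new int[] {from, 0, -cost});
        return edges.size() - 2;
    }

    private int from(int e) { return edges.get(e ^ 1)[0]; }

    private void push(int e, int amount) {
        edges.get(e)[1] -= amount;
        edges.get(e ^ 1)[1] += amount;
    }

    // Bellman-Ford with all distances starting at 0 (a virtual source). Cancels one
    // negative cycle in the residual graph and returns true, or returns false if none exists.
    private boolean cancelNegativeCycle() {
        long[] dist = new long[nodes];
        int[] parent = new int[nodes];
        Arrays.fill(parent, -1);
        int last = -1;
        for (int i = 0; i < nodes; i++) {
            last = -1;
            for (int e = 0; e < edges.size(); e++) {
                int[] edge = edges.get(e);
                if (edge[1] > 0 && dist[from(e)] + edge[2] < dist[edge[0]]) {
                    dist[edge[0]] = dist[from(e)] + edge[2];
                    parent[edge[0]] = e;
                    last = edge[0];
                }
            }
            if (last == -1) return false;
        }
        int x = last;
        for (int i = 0; i < nodes; i++) x = from(parent[x]); // step back onto the cycle
        int bottleneck = Integer.MAX_VALUE;
        for (int v = x; ; ) {
            bottleneck = Math.min(bottleneck, edges.get(parent[v])[1]);
            v = from(parent[v]);
            if (v == x) break;
        }
        for (int v = x; ; ) {
            int e = parent[v];
            v = from(e);
            push(e, bottleneck);
            if (v == x) break;
        }
        return true;
    }

    // costs[t][c]: cost of placing task t on client c; capacity[c]: max tasks on client c.
    static int[] assign(int[][] costs, int[] capacity) {
        int tasks = costs.length, clients = capacity.length;
        if (Arrays.stream(capacity).sum() < tasks) throw new IllegalArgumentException("not enough capacity");
        int source = tasks + clients, sink = source + 1;
        CycleCancelingAssignor g = new CycleCancelingAssignor(sink + 1);
        int[] sourceEdge = new int[tasks], sinkEdge = new int[clients];
        int[][] taskEdge = new int[tasks][clients];
        for (int t = 0; t < tasks; t++) {
            sourceEdge[t] = g.addEdge(source, t, 1, 0);
            for (int c = 0; c < clients; c++) taskEdge[t][c] = g.addEdge(t, tasks + c, 1, costs[t][c]);
        }
        for (int c = 0; c < clients; c++) sinkEdge[c] = g.addEdge(tasks + c, sink, capacity[c], 0);
        // Step 1: feasible flow based on load only (first client with spare capacity).
        for (int t = 0; t < tasks; t++) {
            int c = 0;
            while (g.edges.get(sinkEdge[c])[1] == 0) c++;
            g.push(sourceEdge[t], 1);
            g.push(taskEdge[t][c], 1);
            g.push(sinkEdge[c], 1);
        }
        // Step 2: each canceled cycle lowers the integer total cost, so this terminates.
        while (g.cancelNegativeCycle()) { /* keep canceling */ }
        int[] assignment = new int[tasks];
        for (int t = 0; t < tasks; t++)
            for (int c = 0; c < clients; c++)
                if (g.edges.get(taskEdge[t][c])[1] == 0) assignment[t] = c; // saturated edge = assigned
        return assignment;
    }
}
```

For costs {{5, 0}, {5, 0}, {0, 5}} and capacities {2, 2}, the first-fit flow places tasks 0 and 1 on client 0 and task 2 on client 1, for a total cost of 15. Cycle canceling then moves to the unique zero-cost assignment {1, 1, 0}.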

Public Interfaces

There are no changes in public interfaces.

Compatibility, Deprecation, and Migration Plan

We will implement the new rack aware assignment logic in HighAvailabilityTaskAssignor. We will use it only when client rack information appears in the Subscriptions and at least one TopicPartition in some task has a different cost from other TopicPartitions. If all of them have the same cost, there is no point in using rack aware assignment logic to compute the min cost, since assigning tasks to any client makes no difference. Users who want rack aware assignment need to upgrade Kafka Streams to at least the first version that supports it. It's better to stop all instances and restart them with the new version and the client.rack config, but a rolling bounce with the new version also works. During a rolling bounce, the assignment may switch back and forth between rack aware and non-rack-aware assignment. Once every instance runs the new version, the assignment will be computed with rack awareness.
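The enabling condition above can be sketched as a small predicate. Two readings here are assumptions: every client must report a rack in its Subscription, and a partition's cost on a client is simply "same rack" versus "cross rack". RackAwareEnablement and useRackAwareAssignment are hypothetical names.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: enable rack aware assignment only if it can change the outcome.
final class RackAwareEnablement {
    // clientRacks: client -> rack reported in its Subscription (null if absent)
    // partitionReplicaRacks: TopicPartition -> racks hosting a replica of it
    static boolean useRackAwareAssignment(Map<String, String> clientRacks,
                                          Map<String, Set<String>> partitionReplicaRacks) {
        if (clientRacks.isEmpty()) return false;
        for (String rack : clientRacks.values()) {
            if (rack == null || rack.isEmpty()) return false; // rack information missing
        }
        boolean sawSameRack = false, sawCrossRack = false;
        for (Set<String> replicaRacks : partitionReplicaRacks.values()) {
            for (String clientRack : clientRacks.values()) {
                if (replicaRacks.contains(clientRack)) sawSameRack = true; else sawCrossRack = true;
                if (sawSameRack && sawCrossRack) return true; // costs differ somewhere
            }
        }
        return false; // every placement has the same cost, so rack awareness cannot help
    }
}
```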

Test Plan

  • Unit tests will be added for the min-cost flow graph algorithm and rack aware assignment
  • Existing integration tests will be updated to ensure they work with or without rack configuration, as well as with both client.rack and rack.aware.assignment.tags configured
  • New integration tests will be added to verify rack aware assignment
  • Existing system tests will be used to verify compatibility with old versions
  • Performance tests will be done manually to verify the efficiency of the min-cost flow computation with a large number of tasks and clients

Rejected Alternatives

No rejected alternatives yet.