Status

Current state: Under Discussion

Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]

JIRA: here [Change the link from KAFKA-1 to your own ticket]

Motivation

Kafka TopicPartitions are replicated on the broker side across different racks. KIP-881 added client rack information to the partition assignment interface so that the assignment logic can use it to assign TopicPartitions to clients in the same rack as much as possible. Placing TopicPartitions on clients in the same rack avoids cross rack traffic (CRT), which costs more in both latency and money.

The client rack awareness additions in partition assignment also affect how Streams can assign its tasks to clients: when a client's rack does not always host a replica of the partitions it reads, the choice of client determines how much cross rack traffic is incurred. This design discusses how we can augment the current assignment logic in Kafka Streams to use rack information to minimize cross rack traffic while respecting other constraints.

Current Task Assignment Logic

There are two assignors implemented in Kafka Streams now: StickyTaskAssignor and HighAvailabilityTaskAssignor. HighAvailabilityTaskAssignor was created for KIP-441 and is currently the default assignor in Kafka Streams. As a result, this design only augments HighAvailabilityTaskAssignor to account for rack aware assignment. StickyTaskAssignor users who would like to use rack aware assignment should upgrade to a Kafka Streams version in which both HighAvailabilityTaskAssignor and rack aware assignment are available.

How HighAvailabilityTaskAssignor (HAAssignor) works

  1. Assign active tasks to clients in a round-robin manner. Tasks and clients are both sorted when this assignment is performed so that the assignment is deterministic.
  2. Balance active tasks among clients so that the task load on clients is balanced (taskLoad = (activeTaskCount + standbyTaskCount) / threadsNum).
  3. Assign standby tasks to clients using ClientTagAwareStandbyTaskAssignor or DefaultStandbyTaskAssignor.
  4. Balance standby tasks among clients based on task load.
  5. Move active tasks around and create warm-up tasks if there are more caught-up clients (for example, a client that previously had this task’s standby or active).
  6. Move standby tasks around if there are more caught-up clients.
  7. Assign stateless tasks.
  8. Return the current assignment. If any tasks were moved in step 5 or 6, schedule a probing rebalance after some timeout, in the hope that the stable assignment from steps 1 and 3 can be achieved later.
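
Steps 1 and 2 rely on two small building blocks: a deterministic round-robin pass and the task load formula. The sketch below illustrates both under simplifying assumptions: tasks and clients are plain strings, and the class and method names (RoundRobinSketch, assignRoundRobin, taskLoad) are hypothetical, not the actual HighAvailabilityTaskAssignor code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of steps 1 - 2; not the real HighAvailabilityTaskAssignor.
final class RoundRobinSketch {
    /** Task load from step 2: (activeTaskCount + standbyTaskCount) / threadsNum. */
    static double taskLoad(int activeTaskCount, int standbyTaskCount, int threadsNum) {
        return (activeTaskCount + standbyTaskCount) / (double) threadsNum;
    }

    /** Step 1: sort tasks and clients, then deal tasks out in turn so the result is deterministic. */
    static Map<String, List<String>> assignRoundRobin(List<String> tasks, List<String> clients) {
        List<String> sortedTasks = new ArrayList<>(tasks);
        List<String> sortedClients = new ArrayList<>(clients);
        Collections.sort(sortedTasks);
        Collections.sort(sortedClients);

        Map<String, List<String>> assignment = new TreeMap<>();
        for (String client : sortedClients) {
            assignment.put(client, new ArrayList<>());
        }
        if (sortedClients.isEmpty()) {
            return assignment; // nothing to assign to
        }
        for (int i = 0; i < sortedTasks.size(); i++) {
            String client = sortedClients.get(i % sortedClients.size());
            assignment.get(client).add(sortedTasks.get(i));
        }
        return assignment;
    }

    public static void main(String[] args) {
        System.out.println(assignRoundRobin(List.of("0_2", "0_0", "0_1"), List.of("c2", "c1")));
        System.out.println(taskLoad(2, 1, 2));
    }
}
```

Because both inputs are sorted before the pass, the same set of tasks and clients always yields the same assignment regardless of input order.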

The main advantage of HAAssignor is that it takes caught-up information on clients into account and tries to assign tasks to more caught-up clients so that those tasks can run first. Meanwhile, the originally assigned client warms up its state and tries to take over the task in the following probing rebalances.

Design for rack aware assignment

There are several constraints for rack aware assignment:

  1. Assignment should still balance tasks over clients based on task load even if this can cause higher cross rack traffic.
  2. Rack aware assignment should be computed in step 1 - 2 for active tasks and step 3 - 4 for standby tasks, before we use the caught-up information on clients to move tasks around in step 5 - 6. The reason is that the assignment computed in step 5 - 6 is temporary and exists only for task warmup. For this temporary assignment, we should prioritize minimizing task downtime over reducing CRT.
  3. The computed assignment should be deterministic, which means that subsequent probing rebalances eventually stop once the warm-up tasks are warmed up.

Now suppose we have a set of tasks T = {t1, t2 … tn} and a set of clients C = {c1, c2 … cm}. We define the cost of assigning task ti to client cj as follows:

int getCost(Task t, Client c) {
    int cost = 0;
    for (TopicPartition tp : t.topicPartitions()) {
        // Each partition without a replica in the client's rack adds one unit of cost.
        if (tp has no replica in same rack as c) {
            cost++;
        }
    }
    return cost;
}

Then our problem becomes finding a mapping M: T -> C such that the total cost ∑Cost[i][j], summed over the pairs where task ti is mapped to client cj, is minimized, with the constraint that the mapping M should be as balanced as possible. For standby tasks, there are additional constraints, for example a standby task shouldn’t be assigned to a client which has the same active task.

An ideal solution would find an assignment for active, standby and stateless tasks together which minimizes the total cost while satisfying all the constraints. However, I couldn’t find an efficient polynomial-time global optimization algorithm to do this. As a result, I propose a local optimization that still follows the structure of HAAssignor: first, we compute the min cost assignment for active tasks, then for standby tasks, and lastly for stateless tasks.
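
To make the cost definition concrete, here is a minimal Java sketch. It assumes a simplified model in which each partition of a task is represented only by the set of racks hosting its replicas; the class and method names (CrossRackCostSketch, getCost, costMatrix) are hypothetical and are not part of Kafka Streams.

```java
import java.util.List;
import java.util.Set;

// Hypothetical model: a task is a list of partitions, and a partition is the set of racks holding its replicas.
final class CrossRackCostSketch {
    /** Number of partitions of the task that have no replica in the client's rack. */
    static int getCost(List<Set<String>> replicaRacksPerPartition, String clientRack) {
        int cost = 0;
        for (Set<String> replicaRacks : replicaRacksPerPartition) {
            if (!replicaRacks.contains(clientRack)) {
                cost++; // reading this partition would cross racks
            }
        }
        return cost;
    }

    /** cost[i][j] = cost of assigning task i to client j. */
    static int[][] costMatrix(List<List<Set<String>>> tasks, List<String> clientRacks) {
        int[][] cost = new int[tasks.size()][clientRacks.size()];
        for (int i = 0; i < tasks.size(); i++) {
            for (int j = 0; j < clientRacks.size(); j++) {
                cost[i][j] = getCost(tasks.get(i), clientRacks.get(j));
            }
        }
        return cost;
    }

    public static void main(String[] args) {
        List<Set<String>> task = List.of(Set.of("r1", "r2"), Set.of("r2", "r3"));
        System.out.println(getCost(task, "r1"));
    }
}
```

The resulting matrix is the only input the later optimization steps need from the rack metadata.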

Min-cost flow problem

The min-cost flow problem is defined as follows: given a directed graph, each edge has a capacity constraint which limits how much traffic it allows. Each edge also has a cost for each traffic unit. Each node could have a supply or demand of certain traffic. The problem is to find a flow of traffic so that the total cost along the edges can be minimized.

Our assignment problem can be translated to a min-cost flow problem as below.

Tasks and clients are graph nodes. Every task is connected to every client by an edge with capacity 1. The cost of an edge is the cost of cross rack traffic between the task and the client. If a task is assigned to a client, there is one unit of flow between them, and vice versa. Our assignment problem is then translated to finding the min-cost flow in this graph, with the constraint that the flow should be balanced between clients.

Algorithm

Klein’s cycle canceling algorithm can solve the min-cost flow problem in O(E^2CU) time where C is max cost and U is max capacity. In our particular case, C is 1 and U is at most 3 (A task can have at most 3 topics including changelog topic?). So the algorithm runs in O(E^2) time for our case. Below is a rough sketch of the algorithm:

find a feasible flow
while there are negative cycles in the residual graph:
    find a negative cycle
    update the flow with the negative cycle's min capacity

More on negative cycles and residual graphs can be read here. Negative cycles can be found using the Bellman-Ford algorithm. How do we find a feasible flow? It can be found by using step 1 - 2 of HAAssignor. What’s even nicer about the cycle canceling algorithm is that, since our graph is bipartite, every client node in a negative cycle has an out edge for each in edge, which means the amount of flow through each client doesn’t change. As a result, each round of cycle canceling reduces the cost without changing the total number of assignments of any client. So eventually, when there are no more negative cycles, we have a min-cost balanced assignment. Below is an example of finding a min cost of 2 to assign tasks t1, t2 to clients c1, c2.


Rack awareness assignment algorithm for active tasks

Min cost with unbalanced sub-topology

1. Compute the cross rack cost of each task/client pair
2. Find an assignment following step 1 - 2 of HAAssignor
3. Convert the assignment to a residual graph: if a task is assigned to a client, set the edge's flow to 1 and its remaining capacity to 0; otherwise set the flow to 0 and the capacity to 1. The edge cost is the task/client cost
4. While there is a negative cycle in the current graph:
       find a negative cycle and update the flow
5. Get the assignment from the final flow. If there is flow on an edge, the task and the client at its two ends are assigned to each other.

As analyzed, the resulting assignment gives each client the same load as what’s computed by step 1 - 2 of HAAssignor. So the assignment is balanced with minimal cost. The computation is also deterministic, as all the steps in this algorithm are deterministic.

One caveat is that while the load on each client is similar, this algorithm doesn’t guarantee that different tasks (partitions) of the same sub-topology will be spread across different clients as much as possible.
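
The steps above can be sketched in Java as follows. This is a minimal illustration, not the proposed implementation: it assumes the cost matrix and the starting assignment are given as plain arrays, detects negative cycles with Bellman-Ford from a virtual source, and uses hypothetical names (CycleCancelingSketch, minimizeCost, totalCost).

```java
import java.util.Arrays;

// Hypothetical sketch: cost[t][c] is the cross rack cost, assign[t] is the client index of task t.
final class CycleCancelingSketch {
    /** Returns an assignment with the same per-client task counts as the input and minimal total cost. */
    static int[] minimizeCost(int[][] cost, int[] initial) {
        int[] assign = initial.clone();
        while (cancelOneNegativeCycle(cost, assign)) {
            // each round strictly lowers the total cost, so the loop terminates
        }
        return assign;
    }

    private static boolean cancelOneNegativeCycle(int[][] cost, int[] assign) {
        int n = cost.length;                 // task nodes:   0 .. n-1
        int m = n == 0 ? 0 : cost[0].length; // client nodes: n .. n+m-1
        int nodes = n + m;
        long[] dist = new long[nodes];       // all 0: a virtual source reaches every node for free
        int[] pred = new int[nodes];
        Arrays.fill(pred, -1);
        int last = -1;
        for (int pass = 0; pass < nodes; pass++) {
            last = -1;
            for (int t = 0; t < n; t++) {
                for (int c = 0; c < m; c++) {
                    // Residual edge: client -> task with cost -w if assigned, else task -> client with cost w.
                    boolean assigned = assign[t] == c;
                    int from = assigned ? n + c : t;
                    int to = assigned ? t : n + c;
                    long w = assigned ? -cost[t][c] : cost[t][c];
                    if (dist[from] + w < dist[to]) {
                        dist[to] = dist[from] + w;
                        pred[to] = from;
                        last = to;
                    }
                }
            }
            if (last == -1) {
                return false; // a full pass without relaxation: no negative cycle
            }
        }
        if (last == -1) {
            return false; // empty graph
        }
        for (int i = 0; i < nodes; i++) {
            last = pred[last]; // step back far enough to land on the cycle
        }
        int cur = last;
        do {
            int from = pred[cur];
            if (from < n) {
                assign[from] = cur - n; // task -> client edge on the cycle: move the task there
            }
            cur = from;
        } while (cur != last);
        return true;
    }

    static int totalCost(int[][] cost, int[] assign) {
        int total = 0;
        for (int t = 0; t < assign.length; t++) {
            total += cost[t][assign[t]];
        }
        return total;
    }

    public static void main(String[] args) {
        int[][] cost = {{1, 0}, {0, 1}};
        System.out.println(Arrays.toString(minimizeCost(cost, new int[] {0, 1})));
    }
}
```

Every canceled cycle alternates between task and client nodes, so each client on it gives up one task and receives one; the per-client counts from the starting assignment are preserved.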

Min cost with balanced sub-topology

The above algorithm doesn't balance tasks of the same sub-topology across clients, which may lead to processing skew. We could balance them by constructing a more complex graph that limits how many tasks of a sub-topology can be assigned to a client.

1. Follow HAAssignor's step 1 - 2 to get a balanced assignment for each client.
2. Suppose the clients have n tasks in total, client i has C[i] tasks assigned in the balanced assignment, and sub-topology j has S[j] tasks (partitions). We can then limit the number of tasks sub-topology j can assign to client i to ceiling(S[j] * C[i] / n).
3. With the above limit, we construct a min-cost flow graph and compute the min-cost assignment, which now also balances sub-topologies.
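
The limit in step 2 is a simple integer ceiling. A minimal sketch, assuming the per-sub-topology and per-client task counts are given as arrays (the class and method names are hypothetical):

```java
import java.util.Arrays;

// Hypothetical helper computing limit[j][i] = ceiling(S[j] * C[i] / n).
final class SubTopologyLimitSketch {
    static int[][] limits(int[] tasksPerSubTopology, int[] tasksPerClient) {
        int n = 0; // total number of tasks over all clients
        for (int c : tasksPerClient) {
            n += c;
        }
        int[][] limit = new int[tasksPerSubTopology.length][tasksPerClient.length];
        if (n == 0) {
            return limit; // no tasks, so every limit stays 0
        }
        for (int j = 0; j < tasksPerSubTopology.length; j++) {
            for (int i = 0; i < tasksPerClient.length; i++) {
                long product = (long) tasksPerSubTopology[j] * tasksPerClient[i];
                limit[j][i] = (int) ((product + n - 1) / n); // integer ceiling of product / n
            }
        }
        return limit;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.deepToString(limits(new int[] {3, 3}, new int[] {1, 2, 3})));
    }
}
```

Using integer arithmetic for the ceiling avoids floating point rounding, which keeps the computation deterministic across runs.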

Graph construction

  1. For each sub-topology, do this:
    1. Create a new set of client nodes, one per client
    2. Add an edge between each task node of the sub-topology and each new client node, with capacity 1 and cost equal to the CRT cost
    3. Add an edge between each new client node and the corresponding original client node, with capacity equal to the limit of tasks this sub-topology can assign to the client and cost 0
  2. Add a sink node and connect every original client node to it, with capacity equal to the task count computed in HAAssignor’s step 1 - 2 and cost 0
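
As a rough illustration of this construction, the sketch below builds the edge list for the graph. Node names are plain strings and the Edge class, the build method, and its parameters are all hypothetical; it only shows which edges exist and with what capacity and cost, and it does not solve the flow problem.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical edge-list builder for the balanced sub-topology graph.
final class BalancedGraphSketch {
    /** A directed edge with a capacity and a per-unit cost. */
    static final class Edge {
        final String from;
        final String to;
        final int capacity;
        final int cost;

        Edge(String from, String to, int capacity, int cost) {
            this.from = from;
            this.to = to;
            this.capacity = capacity;
            this.cost = cost;
        }
    }

    /**
     * @param subTopologyOfTask subTopologyOfTask[t] = index of the sub-topology task t belongs to
     * @param crtCost           crtCost[t][c] = cross rack cost of task t on client c
     * @param limit             limit[s][c] = max tasks of sub-topology s on client c
     * @param tasksPerClient    task count per client from HAAssignor's step 1 - 2
     */
    static List<Edge> build(int[] subTopologyOfTask, int[][] crtCost, int[][] limit, int[] tasksPerClient) {
        List<Edge> edges = new ArrayList<>();
        int clients = tasksPerClient.length;
        // Task -> per-sub-topology copy of each client: capacity 1, cost = CRT cost.
        for (int t = 0; t < subTopologyOfTask.length; t++) {
            int s = subTopologyOfTask[t];
            for (int c = 0; c < clients; c++) {
                edges.add(new Edge("task" + t, "s" + s + "_client" + c, 1, crtCost[t][c]));
            }
        }
        // Per-sub-topology client copy -> original client: capacity = limit, cost 0.
        for (int s = 0; s < limit.length; s++) {
            for (int c = 0; c < clients; c++) {
                edges.add(new Edge("s" + s + "_client" + c, "client" + c, limit[s][c], 0));
            }
        }
        // Original client -> sink: capacity = balanced task count, cost 0.
        for (int c = 0; c < clients; c++) {
            edges.add(new Edge("client" + c, "sink", tasksPerClient[c], 0));
        }
        return edges;
    }
}
```

With T tasks, S sub-topologies and M clients, the graph has T * M + S * M + M edges, so the extra layer adds only S * M edges on top of the unbalanced formulation.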

Solution

  1. Given the above graph, we could use the max-flow algorithm to find a feasible solution first. (Why must a solution exist? Intuitively, if we need to assign C[i] tasks to client i and there are n tasks in total, then sub-topology j should contribute about S[j] * C[i] / n of them. If we take the ceiling of this and use it as the max this sub-topology can contribute to this client, we can find a solution within this bound. But how to prove this mathematically?)
  2. Then we apply the cycle canceling algorithm to minimize the cost
  3. When the algorithm finishes, we have a solution which minimizes the CRT cost while limiting the number of tasks each sub-topology can assign to each client (balancing tasks of the same sub-topology)

Example

Suppose there are 3 clients C1, C2 and C3 with 1, 2 and 3 threads respectively. There are 6 tasks in total across 2 sub-topologies S1 and S2, so each sub-topology has 3 tasks. It’s easy to see that C1, C2 and C3 should have 1, 2 and 3 tasks assigned respectively, since this gives each of them an equal load of 1. The limit of tasks within a sub-topology for each client is then the ceiling of 3*1/6, 3*2/6 and 3*3/6, which is 1, 1 and 2. Then our graph can be constructed as

Based on the cost information, one possible solution could be {s1_1}, {s1_2, s2_1}, {s1_3, s2_2, s2_3}. Note this solution could satisfy all capacity requirements and if this isn’t the optimal solution, cycle canceling would find us one which would also satisfy the capacity requirements which balances sub-topologies. 

One drawback of this solution is that it is based on the number of tasks per client computed from HAAssignor’s step 1 - 2. If every client has an equal number of threads but the tasks can’t be divided equally, then there’s a client with a different number of tasks. Ideally, this could be any client when we look for the min cost assignment, but we fix it during our computation, which may result in a slightly sub-optimal solution.

Rack awareness assignment for standby tasks

Assigning standby tasks comes with even more constraints:

  1. A standby task shouldn’t be assigned to a client which already has the corresponding active task
  2. Ideally, standby tasks should be in a different rack from the corresponding active task
  3. Ideally, the standby tasks of the same task should be in different racks from each other
  4. Ideally, cross rack traffic cost should be minimized
  5. Assignment should be deterministic

There are several options to balance between reliability and efficiency for standby task assignment.

Balance reliability over cost

In this case, we first optimize constraints 1 - 3. After we get an assignment, for example from ClientTagAwareStandbyTaskAssignor, we use nested loops to check whether swapping any two standby tasks between different clients results in a smaller traffic cost. This is a greedy algorithm which may not give the optimal solution.

for (ClientState client : clientStates) {
    for (Task standbyTask : client.standbyTasks) {
        for (ClientState otherClient : clientStates) {
            // Skip the same client and clients that already host this task.
            if (client.equals(otherClient) || otherClient.contains(standbyTask)) {
                continue;
            }
            for (Task otherStandbyTask : otherClient.standbyTasks) {
                if (swapping standbyTask and otherStandbyTask is feasible and has smaller cost) {
                    swap(standbyTask, otherStandbyTask);
                }
            }
        }
    }
}

Prefer reliability and then find optimal cost

We could also first aim for a more reliable standby assignment and then, once a certain number of standbys have been placed reliably, switch to minimizing cost. For example, if we would like at least 1 standby to be in a different rack from the active, then when we assign the first standby we can give clients in a different rack from the active client a lower cost. So we can adjust the cost in the following way:

  • a task/client pair in a different rack from the active always has a lower cost than a task/client pair in the same rack as the active
  • if two pairs are equal in that respect, whichever has the lower CRT has the lower cost


We can run the min-cost flow algorithm with this cost to assign a number of standbys until enough of them are placed reliably and we can switch to preferring low CRT. Then we can adjust the cost in the following way:

  • a task/client pair with lower CRT has a lower cost
  • if two pairs have the same CRT cost, whichever is in a different rack from the active or other standby tasks has the lower cost

Option II (prefer reliability, then find optimal cost) is more configurable and may result in lower CRT cost, but it’s more complicated.
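
One way to encode these two orderings as a single integer edge cost is sketched below. The weights are an assumption made for illustration (the proposal does not specify them), and the names StandbyCostSketch, reliabilityFirst and costFirst are hypothetical.

```java
// Hypothetical cost encodings; the weights are illustrative assumptions, not part of the proposal.
final class StandbyCostSketch {
    /**
     * Reliability first: any client sharing a rack with the active costs more than any client
     * that does not; CRT only breaks ties. maxCrt is an upper bound on the CRT cost of a task.
     */
    static int reliabilityFirst(int crt, boolean sameRackAsActive, int maxCrt) {
        return (sameRackAsActive ? maxCrt + 1 : 0) + crt;
    }

    /**
     * Cost first: lower CRT always wins; sharing a rack with the active or another standby
     * only breaks ties between equal CRT costs.
     */
    static int costFirst(int crt, boolean sharesRack) {
        return 2 * crt + (sharesRack ? 1 : 0);
    }

    public static void main(String[] args) {
        System.out.println(reliabilityFirst(3, false, 3) < reliabilityFirst(0, true, 3));
        System.out.println(costFirst(1, true) < costFirst(2, false));
    }
}
```

Either function can be used as the edge cost in the min-cost flow graph, so switching between the two phases only changes how the cost matrix is filled in.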

What if both rack information and rack aware tags are present

KIP-708: Rack awareness for Kafka Streams adds rack awareness for Kafka Streams. If both rack aware tags and rack information in the subscription are present, we could default to one of them. Both can be used to compute whether two clients are in the same rack. Rack aware tags may carry more information since they can support more keys.

Assignment for stateless tasks

We can use a min-cost flow algorithm to assign stateless tasks. First, we can find a feasible solution based on task load. Then we can use the cycle canceling algorithm to compute minimum cost assignment as we do for active tasks.

Public Interfaces

There are no changes in public interfaces.

Proposed Changes

Describe the new thing you want to do in appropriate detail. This may be fairly extensive and have large subsections of its own. Or it may be a few sentences. Use judgement based on the scope of the change.

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Test Plan

Describe in few sentences how the KIP will be tested. We are mostly interested in system tests (since unit-tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.
