...

  1. For the set of stateless tasks:
    1. First calculate the average number of tasks each thread should get.
    2. For each task (sorted by topic-groupId), if there is an owner of this task from prevTask (no more than one client should be claiming to own it as the owner) that is not exceeding the average number, assign the task to it;
    3. Otherwise, find the host with the largest remaining capacity (defined as the difference between the average number and the number of currently assigned tasks) and assign the task to it (see the stateless-assignment sketch after this list).
  2. For the set of stateful tasks, first consider the active tasks assignment:
    1. First calculate the average number of active tasks each thread should get (so yes, we are still treating all the stateful tasks equally, and no, we are not going to resolve KAFKA-4969 in this KIP).
    2. For each task (sorted by topic-groupId):
      1. Find the host with the smallest gap; if it is not exceeding the average number, assign the task to it (see the active-assignment sketch after this list);
      2. Otherwise, if there are no hosts that owned it before, there is nothing we can do but bite the bullet of the restoration gap, and we just pick the client with the largest remaining capacity and assign the task to it;
      3. Otherwise, it means that we have at least one prev-task owner, but the one with the smallest gap has already exceeded its capacity (see the trade-off heuristics in step 5 below).
    3. First consider the host for its active task: choose the 
  3. We calculate the average number of tasks each host should get as its "capacity", by dividing the total number of tasks by the total number of consumers (i.e. num.threads) and then multiplying by the number of consumers that host has (see the capacity sketch after this list).
  4. Then for each task:
    1. If it has a client who owns it as its PrevTask, and that client still has capacity, assign the task to it;
    2. Otherwise, if it has a client who owns it as its StandbyTask, and that client still has capacity, assign the task to it;
  5. If there are still unassigned tasks after step 2), then we loop over them at the per-sub-topology granularity (for workload balance), and again for each task:
    1. Find the client with the least load, and if there are multiple ones, prefer the one that previously owned it, over the one that previously owned it as a standbyTask, over the one that does not own it at all.
      1. We need to make a call here on the trade-off of workload imbalance vs. restoration gap (some heuristics are applicable in the first version); the switch between the two is also illustrated in the active-assignment sketch after this list:
        1. If we favor reducing restoration latency, we will still assign the task to the host with the smallest gap; but if the standby task number N (used below in step 3) == 0, we'd force-assign a standby task to the new owner candidate – otherwise we do nothing and just rely on step 3) to get us some standby tasks.
        2. Otherwise, we will assign the task to another host following the same logic as 2.b.i) above, but starting with the second smallest gap.
  6. Then we consider the standby assignment for stateful tasks (assuming num.replicas = N)
    1. First calculate the average number of standby tasks each thread should get.
    2. For each task (sorted by topic-groupId), with i ranging from 1 to N (see the standby-assignment sketch after this list):
      1. Find the i-th host with the smallest gap, excluding the active owner and the 1..(i-1)-th standby owners; if it is not exceeding the average number, assign the standby task to it;
      2. Otherwise, go to the next one with the smallest gap and go back to 3.b.i) above; once there are no hosts left that owned it before, we just pick the client with the largest remaining capacity and assign to it.
      3. If we run out of hosts before i == N, it means we have already assigned a standby task to every host, i.e. N > num.hosts; we will throw an exception and fail.
  7. Note that since the tasks are all sorted on topic-groupId, e.g. 1-1, 1-2, 1-3, ... 2-3, we are effectively trying to get per-sub-topology workload balance already. Also, for the tie-breakers in steps 1.c) and 2.b.ii) above, we will define the winner as the one that has the smallest number of tasks assigned to it from the same topic-groupId, to further achieve per-sub-topology workload balance on a best-effort basis.
  8. Whenever we've decided to favor reducing restoration latency in step 2.b.iii.1) above, we have introduced workload imbalance, and we'd want to get out of this state by re-triggering a rebalance later so that the assignor can check whether some standby owner can now take over the task. To do that, we will add a new type of error code named "imbalanced-assignment" in the ErrorCode field of the AssignmentInfo, and when 2.b.iii.1) happens we will set this error code for all the members that own a standby task for the task that triggered 2.b.iii.1) – there must be at least one of them. Upon receiving this error code, the thread will keep track of the progress of all its owned standby tasks, and then trigger another rebalance when the gap on all of them is close to zero (a sketch of such a watcher is given after this list).
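
To make the capacity notion used above concrete, below is a minimal Java sketch of the per-host capacity computation described in steps 1.a) and 3). The class and method names are hypothetical, and the ceiling rounding of the average is an assumption not pinned down by the text above.

```java
// A minimal sketch of the capacity computation from steps 1.a) and 3) above.
// All names are hypothetical; the ceiling rounding is an assumption.
public final class CapacitySketch {

    // Average number of tasks per thread, rounded up so that per-thread
    // capacities always sum to at least the total number of tasks.
    public static int averageTasksPerThread(final int totalTasks, final int totalThreads) {
        return (totalTasks + totalThreads - 1) / totalThreads;
    }

    // Per-host capacity: the per-thread average multiplied by the number of
    // consumers (threads) running on that host, as described in step 3) above.
    public static int hostCapacity(final int totalTasks, final int totalThreads, final int threadsOnHost) {
        return averageTasksPerThread(totalTasks, totalThreads) * threadsOnHost;
    }

    // Remaining capacity, as used in steps 1.c) and 4) above: capacity minus
    // the number of tasks currently assigned to the host.
    public static int remainingCapacity(final int capacity, final int currentlyAssigned) {
        return capacity - currentlyAssigned;
    }
}
```

For example, 10 tasks over 4 threads gives a per-thread average of 3, so a host running 2 of those threads gets a capacity of 6.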
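
The stateless assignment in step 1) can then be sketched as follows. The types are deliberately simplified (tasks and clients are plain strings, each client stands for a single thread), prevTask ownership is assumed to only reference clients that are still alive, and this is illustrative only rather than the actual StreamsPartitionAssignor code.

```java
import java.util.*;

// A minimal sketch of the stateless task assignment in step 1) above.
public final class StatelessAssignmentSketch {

    public static Map<String, List<String>> assign(final List<String> sortedTasks,      // sorted by topic-groupId
                                                   final Map<String, String> prevOwner, // task -> previous owner, if any
                                                   final List<String> clients) {
        // step 1.a): the ceil'd average number of tasks per client
        final int avg = (sortedTasks.size() + clients.size() - 1) / clients.size();
        final Map<String, List<String>> assignment = new HashMap<>();
        clients.forEach(c -> assignment.put(c, new ArrayList<>()));

        for (final String task : sortedTasks) {
            final String owner = prevOwner.get(task);
            // step 1.b): stick with the previous owner if it has not yet exceeded the average
            if (owner != null && assignment.get(owner).size() < avg) {
                assignment.get(owner).add(task);
                continue;
            }
            // step 1.c): otherwise pick the client with the largest remaining capacity,
            // i.e. the one with the fewest tasks assigned so far
            String best = clients.get(0);
            for (final String client : clients) {
                if (assignment.get(client).size() < assignment.get(best).size()) {
                    best = client;
                }
            }
            assignment.get(best).add(task);
        }
        return assignment;
    }
}
```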
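
Similarly, here is a sketch of the stateful active-task assignment in step 2), including the trade-off switch from step 5) as the favorRestorationLatency flag. The gap map is assumed to be pre-computed per client and task, and the follow-up actions of step 5) (forcing a standby, setting the error code) are omitted; this is a simplification, not the actual assignor.

```java
import java.util.*;

// A minimal sketch of the stateful active-task assignment in steps 2) and 5) above.
// "Gap" stands for the remaining restoration gap of a client's local state for a task.
public final class StatefulActiveAssignmentSketch {

    public static Map<String, List<String>> assign(final List<String> sortedTasks,
                                                   final Map<String, Map<String, Long>> gapPerClient, // client -> (task -> gap)
                                                   final List<String> clients,
                                                   final boolean favorRestorationLatency) {
        // step 2.a): the ceil'd average number of active tasks per client
        final int avg = (sortedTasks.size() + clients.size() - 1) / clients.size();
        final Map<String, List<String>> assignment = new HashMap<>();
        clients.forEach(c -> assignment.put(c, new ArrayList<>()));

        for (final String task : sortedTasks) {
            // step 2.b.i): candidates that have some state for the task, sorted by ascending gap
            final List<String> byGap = new ArrayList<>();
            for (final String client : clients) {
                if (gapPerClient.getOrDefault(client, Collections.emptyMap()).containsKey(task)) {
                    byGap.add(client);
                }
            }
            byGap.sort(Comparator.comparingLong(c -> gapPerClient.get(c).get(task)));

            String chosen = null;
            for (final String candidate : byGap) {
                if (assignment.get(candidate).size() < avg) {
                    chosen = candidate;    // smallest gap that still has spare capacity
                    break;
                }
                if (favorRestorationLatency) {
                    chosen = candidate;    // step 5 heuristic a): accept imbalance to avoid restoration
                    break;
                }
                // step 5 heuristic b): fall through to the next-smallest gap
            }
            if (chosen == null) {
                // step 2.b.ii): no usable previous owner; bite the restoration bullet and pick
                // the client with the largest remaining capacity (fewest tasks assigned so far)
                for (final String client : clients) {
                    if (chosen == null || assignment.get(client).size() < assignment.get(chosen).size()) {
                        chosen = client;
                    }
                }
            }
            assignment.get(chosen).add(task);
        }
        return assignment;
    }
}
```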
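
The standby assignment in step 6) can be sketched under the same simplifications. The up-front check on N versus the number of hosts stands in for the "run out of hosts" failure in step 6.c); again, all names and types are hypothetical.

```java
import java.util.*;

// A minimal sketch of the standby-task assignment in step 6) above, for num.replicas = N.
public final class StandbyAssignmentSketch {

    public static Map<String, List<String>> assignStandbys(final List<String> sortedStatefulTasks,
                                                           final Map<String, String> activeOwner,             // task -> active owner
                                                           final Map<String, Map<String, Long>> gapPerClient, // client -> (task -> gap)
                                                           final List<String> clients,
                                                           final int numReplicas) {
        // step 6.c): N standbys plus the active owner need at least N + 1 distinct hosts
        if (numReplicas >= clients.size()) {
            throw new IllegalStateException("num.replicas = " + numReplicas
                + " requires more hosts than the " + clients.size() + " available");
        }
        // step 6.a): the ceil'd average number of standby tasks per client
        final int avg = (sortedStatefulTasks.size() * numReplicas + clients.size() - 1) / clients.size();
        final Map<String, List<String>> standbys = new HashMap<>();
        clients.forEach(c -> standbys.put(c, new ArrayList<>()));

        for (final String task : sortedStatefulTasks) {
            final Set<String> excluded = new HashSet<>();
            excluded.add(activeOwner.get(task));            // never co-locate a standby with its active owner

            for (int i = 1; i <= numReplicas; i++) {        // step 6.b): place the i-th standby
                // previous owners of the task, minus the already-excluded ones, sorted by ascending gap
                final List<String> byGap = new ArrayList<>();
                for (final String client : clients) {
                    if (!excluded.contains(client)
                            && gapPerClient.getOrDefault(client, Collections.emptyMap()).containsKey(task)) {
                        byGap.add(client);
                    }
                }
                byGap.sort(Comparator.comparingLong(c -> gapPerClient.get(c).get(task)));

                String chosen = null;
                for (final String candidate : byGap) {
                    if (standbys.get(candidate).size() < avg) {   // step 6.b.i): smallest gap with spare capacity
                        chosen = candidate;
                        break;
                    }
                }
                if (chosen == null) {
                    // step 6.b.ii): no usable previous owner; pick the largest remaining capacity
                    for (final String client : clients) {
                        if (excluded.contains(client)) {
                            continue;
                        }
                        if (chosen == null || standbys.get(client).size() < standbys.get(chosen).size()) {
                            chosen = client;
                        }
                    }
                }
                standbys.get(chosen).add(task);
                excluded.add(chosen);
            }
        }
        return standbys;
    }
}
```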
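
Finally, here is a sketch of how a stream thread could react to the proposed "imbalanced-assignment" error code from step 8): keep watching the restoration gap of all its standby tasks and request a new rebalance once they have all (nearly) caught up. The error-code value, the gap supplier, the threshold, and the rebalance trigger are all hypothetical placeholders rather than existing Streams APIs.

```java
import java.util.Set;
import java.util.function.ToLongFunction;

// A minimal sketch of the standby "watcher" behavior described in step 8) above.
public final class ImbalancedAssignmentWatcher {

    // Hypothetical numeric value for the proposed "imbalanced-assignment" error code.
    public static final int IMBALANCED_ASSIGNMENT = 4;

    // "Close to zero" restoration-gap threshold, in records; a tuning assumption.
    private static final long CLOSE_TO_ZERO = 1_000L;

    private final Set<String> standbyTasks;             // the thread's currently owned standby tasks
    private final ToLongFunction<String> remainingGap;  // task -> remaining restoration gap
    private final Runnable triggerRebalance;            // e.g. a wrapper around Consumer#enforceRebalance

    public ImbalancedAssignmentWatcher(final Set<String> standbyTasks,
                                       final ToLongFunction<String> remainingGap,
                                       final Runnable triggerRebalance) {
        this.standbyTasks = standbyTasks;
        this.remainingGap = remainingGap;
        this.triggerRebalance = triggerRebalance;
    }

    // Called periodically from the stream thread's main loop after it received the
    // imbalanced-assignment error code in its AssignmentInfo; returns true once a
    // rebalance has been requested because all standby tasks have (nearly) caught up.
    public boolean maybeTriggerRebalance() {
        for (final String task : standbyTasks) {
            if (remainingGap.applyAsLong(task) > CLOSE_TO_ZERO) {
                return false;                           // still catching up
            }
        }
        triggerRebalance.run();
        return true;
    }
}
```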


NOTE that step 5) above indeed loses the specific information about which tasks should be on the "watch list", and hence the thread just needs to watch all of its standby tasks. We can, of course, inject new fields into the AssignmentInfo encoding to explicitly add those "watch-list" standby tasks. Personally I'm a bit reluctant to add them since they seem too specific and would make the Streams assignor protocol not generalizable enough, but I can be convinced if there are strong motivations for the latter approach.

Please also compare this idea with the original algorithm below in "Assignment Algorithm" and let me know your thoughts.


----------------------------------------------------------------------------------------------------------------------------

OLD VERSION OF THE KIP, YET TO BE CLEANED UP

As one can see, we honor stickiness (step 2) over workload balance (step 3).

Terminology

We shall define several terms for an easy walkthrough of the algorithm.

...