...

JIRA: KAFKA-8019, KAFKA-7149, KAFKA-6145, KAFKA-6039, KAFKA-4696

...

We omit the common scenario descriptions here since they are already covered in KIP-415, which is very similar to this KIP except for the scheduledDelay described above.


NOTE that for this new algorithm to be effective in reducing rebalance costs, it really requires the plug-in assignor to be "sticky" in some way, so that the diff between the newly-assigned partitions and the previously-assigned partitions stays small and only a small subset of the total partitions needs to be revoked / migrated at each rebalance in practice; otherwise we are just paying for more rebalances with little benefit. We will discuss how the sticky StreamsAssignor would be updated accordingly in Part II.
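To make that diff concrete, here is a minimal sketch in plain Java (with illustrative names, not actual Kafka code) of the two partition sets a cooperative rebalance actually has to touch; the stickier the plug-in assignor, the smaller both sets are:

Code Block
import java.util.HashSet;
import java.util.Set;

public class RebalanceDiffSketch {
    // Partitions the member must give up: owned before, but not in the new assignment.
    public static <T> Set<T> toRevoke(Set<T> previouslyOwned, Set<T> newlyAssigned) {
        Set<T> revoked = new HashSet<>(previouslyOwned);
        revoked.removeAll(newlyAssigned);
        return revoked;
    }

    // Partitions the member newly gains: in the new assignment, but not owned before.
    public static <T> Set<T> toMigrateIn(Set<T> previouslyOwned, Set<T> newlyAssigned) {
        Set<T> added = new HashSet<>(newlyAssigned);
        added.removeAll(previouslyOwned);
        return added;
    }
}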


Compatibility and Upgrade Path

...

Now the second part of this KIP is about Streams' PartitionAssignor implementation on top of the consumer rebalance protocol. The main goal is to tweak the stickiness / workload-balance trade-off so that we can reduce a stateful task's restoration time (and hence the unavailability gap) when migrating it from an existing active host to a "new host": here we use this term for a host that has no previously restored state for this task and hence needs to restore from scratch, or, more generally, a host whose locally maintained state is far behind the actual state's progress and hence would need to restore for a long time.

Remember the difference between the eager and the (new) cooperative consumer rebalance protocol: in "eager" mode we always revoke everything before joining the group; in "cooperative" mode we revoke nothing before joining the group, but may revoke some partitions after joining the group as indicated by the leader. The native consumer assignor would let consumer members revoke those partitions immediately, based on Intersection(total-partitions, assigned-partitions).

In Streams, however, we may also want to defer the revocation if the intended new owner of the partition is "not ready", i.e. if the stateful task's restoration time (and hence the unavailability gap) when migrating it to this new owner would be long because it has no previously restored state for this task and would need to restore from scratch. More generally, we can extend this term to hosts that may have some local stores for the migrating task but are far behind the actual state's latest snapshot, and hence would still need to restore for a long time.
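The following is a minimal sketch of that leader-side deferral decision, assuming a hypothetical per-client gap lookup and an acceptable-lag threshold; both names are illustrative and not part of the actual StreamsPartitionAssignor:

Code Block
import java.util.Map;

public class DeferredRevocationSketch {
    // Illustrative threshold: how far behind a standby may lag before we consider
    // the intended new owner "not ready" to take over the active task.
    private static final long ACCEPTABLE_RESTORE_GAP = 10_000L;

    /**
     * Decide whether the task should be moved to the intended new owner in this
     * rebalance, or kept on the current owner while the new owner's standby catches up.
     *
     * @param reportedGaps restore gaps per client, as reported in their subscriptions
     * @param newOwner     the client the assignor would like to own the task next
     */
    public static boolean migrateNow(Map<String, Long> reportedGaps, String newOwner) {
        // A client that did not report the task at all would have to restore the whole log.
        long gap = reportedGaps.getOrDefault(newOwner, Long.MAX_VALUE);
        return gap <= ACCEPTABLE_RESTORE_GAP;
    }
}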


Streams SubscriptionInfo Update

The idea to resolve this is to "delay" the revocation from the current owner: let the new owner first close the gap in its state update progress, and only then revoke the task from the old owner and reassign it to the new owner. However, this cannot easily be done with a fixed "scheduled delay", since it really depends on the progress of the state store restoration on the new owner. To do that, we need to let consumers report their current standby tasks' "progress" when joining the group (some related information can be found in KAFKA-4696). More specifically, assume that we have already done KAFKA-7149, which refactors the existing AssignmentInfo format to the following to reduce the message size:

Code Block
AssignmentInfo (encoded in version 5) => VersionId, LatestSupportedVersionId, ActiveTasks, StandbyTasks, PartitionsByHost, ErrorCode

   VersionId                => Int32
   LatestSupportedVersionId => Int32
   ActiveTasks              => List<TaskId>
   StandbyTasks             => List<TaskId>
   PartitionsByHost         => Map<HostInfo, Set<TaskId>>
   ErrorCode                => Int32


We can refactor the SubscriptionInfo format as well to encode the "progress" factor:

Code Block
SubscriptionInfo (encoded in version 5) => VersionId, LatestSupportedVersionId, ClientUUID, PrevTasks, EndPoint

   VersionId                => Int32
   LatestSupportedVersionId => Int32
   ClientUUID               => 128bit
   PrevTasks                => Map<TaskId, Int32>    // new change: value is the task's restoration gap
   EndPoint                 => HostInfo
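For illustration only, here is a simplified sketch of how such a per-task gap map could be serialized into the subscription userData; the flat layout, the integer task id, and the class name are assumptions made for the sketch and not the actual SubscriptionInfo encoding (which, among other things, also carries the client UUID and endpoint):

Code Block
import java.nio.ByteBuffer;
import java.util.Map;

public class SubscriptionInfoSketch {
    // Sketch only: version header followed by the per-task gap map.
    public static ByteBuffer encodePrevTasks(int version, int latestSupportedVersion,
                                             Map<Integer, Integer> gapByTaskId) {
        ByteBuffer buf = ByteBuffer.allocate(4 + 4 + 4 + gapByTaskId.size() * 8);
        buf.putInt(version);
        buf.putInt(latestSupportedVersion);
        buf.putInt(gapByTaskId.size());
        for (Map.Entry<Integer, Integer> entry : gapByTaskId.entrySet()) {
            buf.putInt(entry.getKey());    // task id, flattened to a single int here
            buf.putInt(entry.getValue());  // gap; -1 marks a stateless task
        }
        buf.flip();
        return buf;
    }
}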


More specifically, we will associate each standby task with an Int32 value indicating its gap relative to the current active task's state snapshot. This gap is computed as the sum of (log_end_offset - restored_offset) over all of the task's stores.

Also, we will no longer distinguish between previous-active tasks and previous-standby tasks, since prev-active tasks are just a special type of prev-tasks whose gap is zero. A task that is not in the prev-tasks list indicates "I do not have this task's state at all", and hence its gap is simply the whole log.

For stateless tasks there is no state at all, so we will use a sentinel value (-1) in the prevTasks map to indicate that a task is stateless, and only the host of the active task would include such a task in its prev-tasks map.

In addition, when a Streams app is starting up, before joining the group it will query the log-end-offsets for all the local state stores in its state directory to calculate the gaps; after that, the app can maintain the gaps dynamically for all its standby tasks (again, an active task's gap is just 0).
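As a rough sketch of that gap bookkeeping, assuming hypothetical per-store maps of changelog end offsets and locally restored (checkpointed) offsets; in the real code these would come from the changelog topics and the checkpoint files in the state directory:

Code Block
import java.util.Map;

public class TaskGapCalculator {
    // Sentinel used in the prevTasks map for stateless tasks.
    public static final int STATELESS_SENTINEL = -1;

    /**
     * Gap of a single task: sum of (log_end_offset - restored_offset) over all of the
     * task's stores. Active tasks are fully caught up, so their gap is 0 by definition.
     */
    public static int taskGap(Map<String, Long> endOffsetByStore,
                              Map<String, Long> restoredOffsetByStore,
                              boolean stateless) {
        if (stateless) {
            return STATELESS_SENTINEL;
        }
        long gap = 0L;
        for (Map.Entry<String, Long> entry : endOffsetByStore.entrySet()) {
            long restored = restoredOffsetByStore.getOrDefault(entry.getKey(), 0L);
            gap += Math.max(0L, entry.getValue() - restored);
        }
        // The subscription encodes the gap as an Int32, so clamp it for this sketch.
        return (int) Math.min(gap, Integer.MAX_VALUE);
    }
}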


StreamsPartitionAssignor Logic Update

We will then modify our sticky assignor logic. There are two things to keep in mind: 1) there is no semantic difference between prev-active and prev-standby stateful tasks anymore, and 2) the assignor should be aware of which tasks are stateful and which are stateless, which can easily be inferred from its embedded topology builder. The goal is to assign the sets of stateless and stateful tasks independently, trying to achieve workload balance while honoring stickiness (here "stickiness" is interpreted based on the gap value alone). For stateless tasks, the assignor will not assign any standby tasks either (KAFKA-4696).


More specifically (a condensed code sketch of the stateful-task decision follows the list below):


  1. For the set of stateless tasks:
    1. First calculate the average number of tasks each thread should get.
    2. For each task, if there is an owner of this task from prevTasks (no more than one client should claim to own it) that is not exceeding the average number, assign the task to it;
    3. Otherwise, find the host with the largest remaining capacity (defined as the difference between the average number and the number of currently assigned tasks) and assign the task to it.
  2. For the set of stateful tasks, first consider the active tasks assignment:
    1. First calculate the average number of active tasks each thread should get (so yes, we are still treating all stateful tasks equally, and no, we are not going to resolve KAFKA-4969 in this KIP).
    2. For each task:
      1. Find the host with the smallest gap; if it is not exceeding the average number, assign the task to it;
      2. Otherwise, if no host has owned it before, there is nothing we can do but bite the bullet of the restoration gap: just pick the client with the largest remaining capacity and assign the task to it;
      3. Otherwise, it means we have at least one prev-task owner, but the one with the smallest gap has already exceeded its capacity.
    3. First consider the host for its active task: choose the 
  3. We calculate the average num.tasks each host should get as its "capacity", by dividing the total number of tasks by the total number of consumers (i.e. num.threads) and then multiplying by the number of consumers that host has.
  4. Then for each task:
    1. If it has a client that owns it as its PrevTask, and that client still has capacity, assign the task to it;
    2. Otherwise, if it has a client that owns it as its StandbyTask, and that client still has capacity, assign the task to it;
  5. If there are still unassigned tasks after step 4), then we loop over them at per-sub-topology granularity (for workload balance), and again for each task:
    1. Find the client with the least load; if there are multiple, prefer the one that previously owned the task as an active task, over the one that previously owned it as a standby task, over the one that did not own it at all.

As one can see, we honor stickiness (step 4) over workload balance (step 5).
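As promised above, here is a condensed sketch of the gap-aware stickiness rule for a single stateful active task, under the simplifying assumptions that each client reports one remaining-capacity number and that the gaps have already been collected from the subscriptions; the class and method names are illustrative only:

Code Block
import java.util.Map;

public class GapAwareAssignmentSketch {

    /**
     * Pick the owner of one stateful active task:
     * - prefer the client with the smallest reported gap that still has spare capacity;
     * - otherwise (nobody reported the task, or every reporting client is already full),
     *   fall back to the client with the largest remaining capacity and accept the
     *   restoration gap; the finer trade-off for that last case is still open above.
     */
    public static String chooseOwner(Map<String, Integer> gapByClient,
                                     Map<String, Integer> remainingCapacity) {
        String best = null;
        int bestGap = Integer.MAX_VALUE;
        for (Map.Entry<String, Integer> entry : gapByClient.entrySet()) {
            boolean hasCapacity = remainingCapacity.getOrDefault(entry.getKey(), 0) > 0;
            if (hasCapacity && entry.getValue() < bestGap) {
                best = entry.getKey();
                bestGap = entry.getValue();
            }
        }
        if (best != null) {
            return best;
        }
        String fallback = null;
        int largestCapacity = Integer.MIN_VALUE;
        for (Map.Entry<String, Integer> entry : remainingCapacity.entrySet()) {
            if (entry.getValue() > largestCapacity) {
                fallback = entry.getKey();
                largestCapacity = entry.getValue();
            }
        }
        return fallback;
    }
}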


Terminology

We shall define several terms for an easier walkthrough of the algorithm.

...