...

This KIP follows on KIP-429 to improve the Streams scale-out behavior.

Recently, the Kafka community has been promoting cooperative rebalancing to mitigate the pain points of the stop-the-world rebalancing protocol, and an initiative for Kafka Connect has already started as KIP-415. There have been exciting discussions around it, but for Kafka Streams, delayed rebalancing alone is not a complete solution. This KIP customizes the cooperative rebalancing approach specifically for the KStream application context, building on the designs for Connect and the consumer.

While KIP-429 is focused on decreasing the amount of time that progress is blocked on the rebalance itself, this KIP addresses a second phase of stoppage that happens today: upon receiving an assignment of a stateful task, Streams has to catch that task up to the head of its changelog before it can begin processing it.

Currently, the Streams task assignment doesn't take into account how much work it will take to catch up a task, which leads to unfortunate cases in which, after a rebalance, some task can get hung up for hours while it rebuilds its state stores. The high-level goal of this KIP is to allow the prior owner of such a task to keep it, even if the assignment is now unbalanced, until the new owner gets caught up, and then to change ownership after the catch-up phase.

Currently, Kafka Streams uses the consumer membership protocol to coordinate stream task assignment. When we scale up the stream application, the KStream group will attempt to revoke active tasks and let the newly spun-up hosts take them over. New hosts need to restore the assigned tasks' state before transitioning to "running". For state-heavy applications, it is not ideal to give up the tasks immediately once the new player joins the party; instead, we should buffer some time to let the new player accept a fair amount of restoring tasks and finish state reconstruction first, before officially taking over the active tasks. Ideally, we could achieve a zero-downtime transition during cluster scaling.
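The handover idea above can be sketched as a simple decision rule. This is a minimal illustration, not the actual Streams assignor: the function name, the lag threshold, and the way lag is reported are all hypothetical.

```python
# Hypothetical sketch: keep an active task on its prior owner until the
# intended new owner's standby copy has caught up to (near) the changelog
# head. Names and the threshold are illustrative, not the Streams API.

ACCEPTABLE_LAG = 10_000  # illustrative threshold, in changelog records

def assign_active(task, prior_owner, new_owner, standby_lag):
    """Decide which instance runs `task` as active in this rebalance.

    standby_lag: how far the new owner's standby copy is behind the
    changelog head, or None if it has no standby copy yet.
    """
    if standby_lag is not None and standby_lag <= ACCEPTABLE_LAG:
        # The new owner is (nearly) caught up: safe to transfer ownership.
        return new_owner
    # Otherwise keep the task where its state already lives, even though
    # the assignment is temporarily unbalanced, and let the new owner
    # warm up a standby copy in the background.
    return prior_owner

assert assign_active("0_8", "A", "B", standby_lag=None) == "A"
assert assign_active("0_8", "A", "B", standby_lag=500_000) == "A"
assert assign_active("0_8", "A", "B", standby_lag=42) == "B"
```

The key design point is that imbalance is tolerated temporarily: ownership only moves once the move is cheap.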

In short, the goals of this KIP are:

...

  1. Join the group with all currently assigned tasks revoked.

  2. Wait until the group assignment finishes to get the assigned tasks, then resume working.

  3. Replay the assigned tasks' states.

  4. Once all replay jobs finish, the stream thread transitions to running mode.

After KIP-429, this changes to:

  1. Join the group, optimistically keeping all assigned tasks
  2. Wait until the group assignment finishes to get the tasks to revoke
  3. Once the revocations are processed, get the newly assigned tasks
  4. Replay the assigned tasks' states
  5. Once all replay jobs finish, the stream thread transitions to running mode

If you want to know more details at the protocol level, feel free to check out KIP-429.

Constraints and Cost Function

While doing the assignment, we respect the following constraint:

  • Only one copy of a particular task can be hosted on a particular instance. That is, a particular task, like `0_8`, would have an "active" (i.e., primary) copy and some number of "standby" copies (i.e., replicas). A single instance (not thread) can host only one of those: either the active task or one of the standby tasks. We can't assign both an active and a standby task with the same taskId to the same instance.
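The constraint can be captured as a simple validity check over a candidate assignment. The data shapes below are illustrative, not the actual assignor's representation.

```python
# Sketch of the placement constraint: for a given taskId, an instance may
# host the active copy or one standby copy, never more than one copy.
# The dict-based representation here is illustrative only.

def valid_assignment(active, standbys):
    """active: dict task_id -> instance hosting the active copy.
    standbys: dict task_id -> list of instances hosting standby copies."""
    for task_id, instance in active.items():
        replicas = standbys.get(task_id, [])
        # Standby replicas of one task must land on distinct instances...
        if len(replicas) != len(set(replicas)):
            return False
        # ...and none may share an instance with the active copy.
        if instance in replicas:
            return False
    return True

assert valid_assignment({"0_8": "A"}, {"0_8": ["B", "C"]})
assert not valid_assignment({"0_8": "A"}, {"0_8": ["A"]})       # active + standby co-located
assert not valid_assignment({"0_8": "A"}, {"0_8": ["B", "B"]})  # two standbys on one instance
```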

We also try to minimize the following cost function:

  • data-parallel workload balance. We assume that all partitions of a subtopology require roughly uniform effort. Therefore, we try to spread the partitions of each subtopology across the cluster. For example, given two subtopologies X and Y, each with three partitions 0, 1, and 2, and three nodes A, B, and C, we'd prefer to allocate the tasks like (A: <X_0, Y_0>, B: <X_1, Y_1>, C: <X_2, Y_2>) instead of (e.g.) (A: <X_0, X_1>, B: <X_2, Y_0>, C: <Y_1, Y_2>).
  • even distribution of work. We try to distribute tasks evenly across the cluster, instead of letting one thread have more tasks than another.
  • replay time. We try to minimize the time it takes to replay a newly assigned task, which blocks further progress on that task. Note, although this is the focus of this KIP, this consideration was already part of Streams's assignment algorithm, as it's the reason we currently favor stickiness.

Note, this is a cost minimization problem, since it may not be possible to fully satisfy all three components of the cost function.
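To make the minimization concrete, the three components can be combined into a single score. The weights and formulas below are illustrative assumptions, not the KIP's actual cost function.

```python
# Hypothetical scoring of a candidate assignment against the three cost
# components above. Weights and formulas are illustrative, not from the KIP.

def cost(assignment, lags, w_skew=1.0, w_spread=1.0, w_replay=1.0):
    """assignment: dict instance -> list of task ids like '0_8'
    (subtopology_partition). lags: dict task id -> changelog records
    still to replay on its assigned instance."""
    counts = [len(tasks) for tasks in assignment.values()]
    # Even distribution: gap between most- and least-loaded instance.
    skew = max(counts) - min(counts)
    # Data parallelism: penalize co-locating partitions of one subtopology.
    spread_penalty = sum(
        len(tasks) - len({t.split("_")[0] for t in tasks})
        for tasks in assignment.values()
    )
    # Replay time: total records that must be restored before processing.
    replay = sum(lags.get(t, 0) for tasks in assignment.values() for t in tasks)
    return w_skew * skew + w_spread * spread_penalty + w_replay * replay

balanced = {"A": ["0_0", "1_0"], "B": ["0_1", "1_1"], "C": ["0_2", "1_2"]}
clumped  = {"A": ["0_0", "0_1"], "B": ["0_2", "1_0"], "C": ["1_1", "1_2"]}
assert cost(balanced, lags={}) < cost(clumped, lags={})
```

Because the components can conflict (e.g., stickiness minimizes replay but may worsen balance), the assignor can only minimize the weighted total, not zero out every term.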

Streams Rebalance Metadata: Remember the PrevTasks

...