...

  1. For every consumer: before sending the join-group request, do NOT revoke any partitions; instead just encode the current assigned partitions as part of the Subscription.
  2. For the leader: after receiving the subscription topics, as well as the assigned-partitions, do the following:
    1. Call the registered assignor of the selected protocol to generate the assignment; let's call it newly-assigned-partitions.
    2. Segment the total-partitions set (the set of partitions inferred from the newly-assigned-partitions) into two mutually exclusive subsets: Intersection(total-partitions, assigned-partitions), and Minus(total-partitions, assigned-partitions).
      Note that the latter may be non-empty, because a partition may have been revoked in a previous rebalance and hence not be in any member's assigned partitions, or it may be a newly created partition due to add-partitions. Let's call the former prev-assigned-partitions and the latter not-assigned-partitions.
    3. For not-assigned-partitions, we can encode the owner from the newly-assigned-partitions directly, since we know no one owned them before, either because they were revoked or because they were newly created.
    4. For prev-assigned-partitions, check whether the owner has changed; if it has, encode the partition in the revoked-partitions of the old owner, but do NOT encode it in the assigned-partitions of the new owner.
  3. For every consumer: after receiving the sync-group response, do the following:
    1. Check that the newly assigned-partitions is a superset of Minus(assigned-partitions, revoked-partitions). This is because under cooperative rebalance, no partition should be migrated directly to a new owner without being revoked first.
    2. Check the error code as well, and depending on it, move forward to step 3 below, or trigger a rebalance immediately (for incompatible members, see below), or fail immediately if it is fatal.
    3. Update the newly assigned-partitions, and for the newly added partitions, call the rebalance-listener; this is the same as the current logic.
    4. If revoked-partitions is not empty, remove those partitions by calling the rebalance-listener, and then immediately send another join-group request with the updated assigned partitions.
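
The leader-side and consumer-side steps above can be sketched as follows. This is only an illustrative sketch, not the actual consumer coordinator code: the function names `adjust_assignment` and `validate_sync_response`, the dict-of-sets data layout, and the member and partition names are all assumptions made for the example.

```python
def adjust_assignment(newly_assigned, currently_owned):
    """Leader side (hypothetical helper).

    newly_assigned:  member -> set of partitions produced by the assignor
    currently_owned: member -> set of partitions reported in the Subscription
    Returns (assigned, revoked), both keyed by member.
    """
    # Who owns each partition right now, according to the Subscriptions.
    owner_of = {p: m for m, parts in currently_owned.items() for p in parts}
    members = set(newly_assigned) | set(currently_owned)
    assigned = {m: set() for m in members}
    revoked = {m: set() for m in members}

    for new_owner, parts in newly_assigned.items():
        for p in parts:
            old_owner = owner_of.get(p)
            if old_owner is None:
                # not-assigned-partitions: nobody owns it, hand it out directly.
                assigned[new_owner].add(p)
            elif old_owner == new_owner:
                # prev-assigned-partitions with an unchanged owner.
                assigned[new_owner].add(p)
            else:
                # Owner changed: only tell the old owner to revoke it.
                # The new owner gets it in a later rebalance round.
                revoked[old_owner].add(p)
    # Note: owned partitions that are absent from the new assignment are not
    # covered by the steps described here and are left untouched in this sketch.
    return assigned, revoked


def validate_sync_response(owned, assigned, revoked):
    """Consumer side: the new assignment must contain everything the
    consumer still owns after giving up the revoked partitions."""
    return assigned >= (owned - revoked)


# Example: c2 joins the group, and partition t-4 has just been created.
currently_owned = {"c1": {"t-0", "t-1", "t-2", "t-3"}, "c2": set()}
newly_assigned = {"c1": {"t-0", "t-1"}, "c2": {"t-2", "t-3", "t-4"}}
assigned, revoked = adjust_assignment(newly_assigned, currently_owned)
```

In this example c1 keeps t-0 and t-1 and is asked to revoke t-2 and t-3, while c2 initially receives only the unowned t-4. The revoked partitions reach c2 only after c1 has given them up and rejoined.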


No changes are required on the broker side, since this logic change is completely contained within the consumer protocol / coordinator implementation itself; to brokers, these rebalances look the same as those of previous versions.

...

To enable the learner resource shuffling behavior, the following task status indicators need to be provided:

| Tag Name | Task Type | Explanation |
|---|---|---|
| isStateful | both | Indicates whether the given task has state to restore. |
| isLearner | standby | Indicates whether the standby task is a learner task. |
| beingLearned | active | Indicates whether the active task is being learned by some other stream thread. |
| isReady | standby | Indicates whether the standby task is ready to serve as an active task. |
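
As a rough illustration of how these indicators relate to task types, the sketch below models them as a small record that rejects tags on the wrong task type. The class name `TaskStatus`, the snake_case field names, and the validation rules are assumptions derived only from the "Task Type" column above, not from any existing Streams class.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TaskStatus:
    """Hypothetical container for the task status indicators."""
    task_type: str               # "active" or "standby"
    is_stateful: bool = False    # applies to both task types
    is_learner: bool = False     # standby only
    being_learned: bool = False  # active only
    is_ready: bool = False       # standby only

    def __post_init__(self):
        if self.task_type not in ("active", "standby"):
            raise ValueError(f"unknown task type: {self.task_type}")
        if self.task_type == "active" and (self.is_learner or self.is_ready):
            raise ValueError("isLearner and isReady apply to standby tasks only")
        if self.task_type == "standby" and self.being_learned:
            raise ValueError("beingLearned applies to active tasks only")


# A stateful active task that some other stream thread is currently learning.
active = TaskStatus("active", is_stateful=True, being_learned=True)
# The matching learner task, which has finished restoring its state.
learner = TaskStatus("standby", is_stateful=True, is_learner=True, is_ready=True)
```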

Optimizations

Stateful vs Stateless Tasks

...


We are also adding new configs so that users can better apply and customize the scaling changes.

stream.rebalancing.mode

Default: incremental

Version 1.0

This setting helps ensure a no-downtime upgrade of an online application.

Options: upgrading, incremental


scale.down.timeout.ms

Default: infinity

Version 2.0

Time in milliseconds after which a stream thread is forcibly terminated once it has been informed that it is being scaled down.


learner.partial.rebalance

Default: true

Version 3.0

If this config is set to true, a new member proactively triggers a rebalance each time it finishes restoring the state of one learner task, until it has finished all the replaying. Otherwise, the new stream thread batches the ready calls and asks for a single round of rebalance.
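
The difference between the two settings can be illustrated with a toy model that lists which ready tasks are announced in each triggered rebalance. The function name `rebalance_triggers` and the task ids are hypothetical, and the sketch ignores timing and failures entirely.

```python
def rebalance_triggers(restore_order, partial_rebalance):
    """Return one list of ready learner tasks per rebalance the new member
    would trigger, given the order in which its learner tasks finish restoring.
    """
    if partial_rebalance:
        # learner.partial.rebalance = true: one rebalance per restored task.
        return [[task] for task in restore_order]
    # learner.partial.rebalance = false: a single rebalance once all are ready.
    return [list(restore_order)] if restore_order else []


finished = ["0_1", "0_2", "1_0"]  # hypothetical task ids, in restore order
per_task = rebalance_triggers(finished, partial_rebalance=True)
batched = rebalance_triggers(finished, partial_rebalance=False)
```

With three learner tasks, the partial mode triggers three rebalances, each handing over one task as soon as it is ready, while the batched mode triggers one rebalance after the last task finishes.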


stream.imbalance.percentage

Default: 0.2 (20%)

Version 4.0

The tolerated task imbalance factor between hosts; an imbalance above this value triggers a rebalance.
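
The text does not define how the imbalance factor is computed. Purely as an illustration, the sketch below assumes it is the relative gap between the most and least loaded hosts, (max - min) / max of the per-host task counts; both this formula and the name `should_rebalance` are assumptions, not part of the proposal.

```python
def should_rebalance(tasks_per_host, imbalance_percentage=0.2):
    """Return True when the assumed imbalance factor exceeds the tolerance.

    tasks_per_host: host -> number of tasks currently assigned to it.
    The factor (max - min) / max is an assumption made for this example.
    """
    if not tasks_per_host:
        return False
    most = max(tasks_per_host.values())
    least = min(tasks_per_host.values())
    if most == 0:
        return False  # no tasks anywhere, nothing to balance
    return (most - least) / most > imbalance_percentage


skewed = should_rebalance({"host-a": 10, "host-b": 7})  # factor 0.3
even = should_rebalance({"host-a": 10, "host-b": 9})    # factor 0.1
```

Under this assumed definition and the default tolerance of 0.2, the first call reports that a rebalance is warranted and the second does not.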

Implementation Plan

To make sure the delivery goes smoothly despite the fundamental changes to KStream internals, we have written a separate, shareable Google Doc here that outlines the steps of the changes. Feel free to give feedback on this plan while reviewing the algorithm, because some of the algorithm's requirements are tightly coupled with the reasoning about the internal architecture.

...