...

  1. For every consumer: before sending the join-group request, change the behavior as follows based on the join-group triggering event:
    1. If the subscription has changed, or topic metadata has changed: do NOT revoke any partitions or call the rebalance-listener; instead just encode the currently assigned partitions as part of the Subscription.
    2. If REBALANCE_IN_PROGRESS is received from a heartbeat response or commit response: same as item 1 above.
    3. If MEMBER_ID_REQUIRED is received from a join-group request: same as item 1 above.
    4. If UNKNOWN_MEMBER_ID or ILLEGAL_GENERATION is received from a join-group / sync-group / commit / heartbeat response: reset the generation / clear the member-id correspondingly, "emigrate" the partitions with the rebalance listener, and then re-join the group with empty assigned partitions.
    5. If the assignment received from the previous rebalance's sync-group response contains error code NEEDS_REJOIN: revoke partitions as required before sending the join-group request with the newly formed assigned partitions.
  2. For the leader: after getting the received subscription topics, as well as the assigned-partitions, do the following:
    1. Call the registered assignor of the selected protocol to generate the assignment; let's call it newly-assigned-partitions.
    2. Take the cluster metadata and infer all the topic partitions that are targeted for assignment; let's call it total-partitions. Together with the assigned-partitions reported by the members, this yields three disjoint sets:
      1. Intersection(total-partitions, assigned-partitions). These are partitions that are still owned by some members, and some of them may now be allocated to new members. Let's call it prev-assigned-partitions.
      2. Minus(total-partitions, assigned-partitions). These are partitions that are not previously owned by anyone. This set is non-empty when a partition's previous owner is on the older version and hence already revoked it before joining, or when a partition was revoked in a previous rebalance by a new-versioned member and hence is not in any assigned partitions, or when it is a newly created partition due to add-partitions. Let's call it not-assigned-partitions.
      3. Minus(assigned-partitions, total-partitions). These are partitions that are not known from the cluster metadata but are claimed to be owned by the members. It is non-empty if some topics are deleted, or if the leader's metadata is not refreshed, or if the previous leader has created some topics in its assignor (consider the Streams case). Let's call it unknown-partitions.
    3. For not-assigned-partitions, it is safe to move them to the new member immediately since we know no one owned them before, and hence we can encode the owner from the newly-assigned-partitions directly.
    4. For unknown-partitions, it is also safe to do nothing except give them back to whoever claimed to be their owner, by encoding them directly as well. If this is due to a topic metadata update, then a later rebalance will be triggered.
    5. For prev-assigned-partitions, check if the owner has changed; if yes, encode the partition to the old owner in revoked-partitions but do NOT encode it in the assigned-partitions of the new owner, and set the error-code to NEEDS_REJOIN for all the members (this is for fast re-join and rebalance).
  3. For every consumer: after receiving the sync-group response, do the following:
    1. Check that the newly assigned-partitions is a superset of Minus(assigned-partitions, revoked-partitions). This is because under cooperative rebalance, no partitions should be migrated directly before being revoked first.
      If this is not true, it means that either there are bugs or incompatible members are forming the group (for incompatible members, see below), and hence the consumer should log a fatal error and shut down gracefully.
    2. Update the newly assigned-partitions, and for the newly added partitions, call the rebalance-listener (this is the same as the current logic).
    3. If revoked partitions is not empty, remove those partitions by calling the rebalance-listener.
    4. Check the error code:
      1. If it is NONE, complete.
      2. If it is NEEDS_REJOIN, immediately send another join-group request with the updated assigned partitions, following step 1, item 5 above.
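
The leader-side bookkeeping in step 2 and the consumer-side check in step 3 can be sketched as follows. This is only an illustrative sketch, not the actual consumer implementation: the function names (`classify`, `encode`, `valid_for_member`), the dictionary layout of an assignment, and the string partition ids are all hypothetical, and it assumes the assignor only hands out partitions from total-partitions.

```python
from typing import Dict, Set


def classify(total: Set[str], owned: Dict[str, Set[str]]):
    """Split partitions into prev-assigned, not-assigned and unknown."""
    assigned = set().union(*owned.values())
    return total & assigned, total - assigned, assigned - total


def encode(total: Set[str], owned: Dict[str, Set[str]],
           new_assignment: Dict[str, Set[str]]) -> Dict[str, dict]:
    """Leader side: build each member's assigned/revoked sets and error code."""
    prev_assigned, _not_assigned, unknown = classify(total, owned)
    old_owner = {p: m for m, parts in owned.items() for p in parts}
    out = {m: {"assigned": set(), "revoked": set(), "error": "NONE"}
           for m in owned}
    owner_changed = False
    for member, parts in new_assignment.items():
        for p in parts:
            if p in prev_assigned and old_owner[p] != member:
                # Owner changed: revoke from the old owner, but do not
                # hand the partition to the new owner in this round.
                out[old_owner[p]]["revoked"].add(p)
                owner_changed = True
            else:
                # Not previously owned, or owner unchanged: encode directly.
                out[member]["assigned"].add(p)
    for p in unknown:
        # Give unknown partitions back to whoever claimed them.
        out[old_owner[p]]["assigned"].add(p)
    if owner_changed:
        for entry in out.values():
            entry["error"] = "NEEDS_REJOIN"
    return out


def valid_for_member(owned: Set[str], assigned: Set[str],
                     revoked: Set[str]) -> bool:
    """Consumer side: nothing may disappear without being revoked first."""
    return assigned >= (owned - revoked)


# Hypothetical group: c1 owns three live partitions plus one from a deleted
# topic; c2 has just joined and owns nothing.
total = {"t-0", "t-1", "t-2", "t-3"}
owned = {"c1": {"t-0", "t-1", "t-2", "gone-0"}, "c2": set()}
new_assignment = {"c1": {"t-0", "t-1"}, "c2": {"t-2", "t-3"}}
result = encode(total, owned, new_assignment)
```

In this example `t-2` moves from c1 to c2, so it is only revoked from c1 in this round, and c2 picks it up in the follow-up rebalance triggered by NEEDS_REJOIN.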

...

When the config value is "eager", the consumer still uses V0 of the consumer protocol as well as the existing rebalance behavior; if the config value is "cooperative", the consumer uses the new V1 protocol as well as the new algorithm. Note the difference with KIP-415 here: since on the consumer we do not have the luxury of leveraging the list of assignors to register multiple protocols and letting the leader auto-switch to new versions, we need two rolling bounces instead of one to complete the upgrade, whereas Connect only needs one rolling bounce (details below).

We'd update the above algorithm on the leader (i.e. bullet point 2) so that it first checks the subscription versions of all the members:

  1. If all members are on V1, then follow the new algorithm.
  2. If all members are on V0, then follow the old algorithm.
  3. If there is no consensus, it means we are in a second rebalance; in that case do the following:
    1. For those members on V1, send the assignment in V1 as well, by just giving back their existing assigned-partitions from the subscription metadata and leaving the revoked-partitions empty.
    2. For the rest of the un-assigned partitions, we know that they are either newly created and hence have not been given to any host, or were previously owned by members on V0 and hence were revoked before those members joined the group.
      We can then do the normal assignment based on those partitions by shrinking the cluster metadata that is passed to the assignor.
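
The version check above could look roughly like the sketch below. It is an illustration under assumptions rather than the proposed implementation: `pick_algorithm`, `mixed_assignment` and the `round_robin` stand-in assignor are hypothetical names, and the sketch assumes the shrunken partition set is offered to all members, which the text above does not spell out.

```python
from typing import Callable, Dict, List, Set


def pick_algorithm(versions: Dict[str, int]) -> str:
    """Decide which assignment path the leader takes."""
    distinct = set(versions.values())
    if distinct == {1}:
        return "new"    # all members on V1
    if distinct == {0}:
        return "old"    # all members on V0
    return "mixed"      # no consensus


def round_robin(members: List[str], partitions: List[str]) -> Dict[str, Set[str]]:
    """Hypothetical stand-in for the registered assignor."""
    out: Dict[str, Set[str]] = {m: set() for m in members}
    for i, p in enumerate(partitions):
        out[members[i % len(members)]].add(p)
    return out


def mixed_assignment(versions: Dict[str, int], owned: Dict[str, Set[str]],
                     total: Set[str],
                     assignor: Callable = round_robin) -> Dict[str, dict]:
    """No-consensus case: V1 members keep what they own, the rest is assigned."""
    out = {m: {"assigned": set(), "revoked": set()} for m in versions}
    kept: Set[str] = set()
    for member, version in versions.items():
        if version == 1:
            # Give back the existing assigned-partitions, revoke nothing.
            out[member]["assigned"] |= owned.get(member, set())
            kept |= owned.get(member, set())
    # Shrink the metadata handed to the assignor to the un-assigned partitions.
    remaining = sorted(total - kept)
    for member, parts in assignor(sorted(versions), remaining).items():
        out[member]["assigned"] |= parts
    return out


# Hypothetical group during the second bounce: c1 is on V1, c2 still on V0.
versions = {"c1": 1, "c2": 0}
owned = {"c1": {"t-0", "t-1"}, "c2": set()}
total = {"t-0", "t-1", "t-2", "t-3"}
mode = pick_algorithm(versions)
assignment = mixed_assignment(versions, owned, total)
```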

The key point behind these two rolling bounces and the additional check is that we want to avoid the situation where the leader is on old byte-code and only recognizes V0 but, due to compatibility, is still able to deserialize V1 protocol data from newer-versioned members, and hence just goes ahead with the assignment even though the new-versioned members did not revoke their partitions before joining the group.

As for the upgrade path, we would require users to do two rolling bounces, where:

  1. In the first rolling bounce, keep rebalance.protocol as "eager" (there is no need to change anything manually, since it is the default value).
  2. After the first rolling bounce is completely done, do a second rolling bounce in which rebalance.protocol is updated to "cooperative". The above logic makes sure that the group eventually switches to the new algorithm once everyone is sending the join-group request with V1.
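
To make the second rolling bounce concrete, the small sketch below tracks which subscription versions the leader would see as members are restarted one at a time. It is a toy model under assumptions: the member names and the `protocol_version` helper are hypothetical, and only the rebalance.protocol values "eager" and "cooperative" come from the text above.

```python
def protocol_version(rebalance_protocol: str) -> int:
    """Map the rebalance.protocol value to the protocol version it implies."""
    return {"eager": 0, "cooperative": 1}[rebalance_protocol]


# State after the first rolling bounce: new byte-code everywhere,
# rebalance.protocol still at its default "eager".
members = {"c1": "eager", "c2": "eager", "c3": "eager"}

# Second rolling bounce: switch one member at a time to "cooperative"
# and record the set of versions present in the group after each restart.
versions_seen = []
for member in list(members):
    members[member] = "cooperative"
    versions_seen.append(sorted({protocol_version(p) for p in members.values()}))
```

While the bounce is in progress the group contains both V0 and V1 members, which is the no-consensus case handled by the leader; only after the last member is restarted does the leader see V1 from everyone.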

No changes are needed to V1's new assignment algorithm: if users follow the right procedure, then during the first rolling bounce everyone still follows the V0 rebalance protocol. During the second rolling bounce, the leader should always be on the newer code, so it can recognize both V1 and V0 members (while some have not yet been bounced with the config change). It is safe to just follow the above algorithm: since V0 members have revoked everything and hence no longer have any pre-assigned partitions in their subscription information, those partitions can be moved to other members immediately based on the assignor's output.


Note that this proposal depends on users' correct behavior, namely that everyone should eventually be on the same "rebalance protocol"; otherwise we would stay in the no-consensus case (case 3 above) forever. In addition, this approach assumes that the leader is V1-aware whenever a V1 subscription is received: if users follow the upgrade path above, this should be the case, but if they do not follow the guidance it may cause undefined behavior, since an old-versioned leader may just proceed with the V0 "eager" assignment while some of the members are actually on V1.

...