...

  1. For every consumer: before sending the join-group request, change the behavior as follows based on the join-group triggering event:
    1. If the subscription has changed: revoke all partitions by calling onPartitionsRevoked, and send the join-group request with empty owned partitions in the Subscription.
    2. If topic metadata has changed: do NOT revoke any partitions; instead, call onPartitionsLost on those owned partitions that no longer exist, encode the remaining currently assigned partitions as part of the Subscription, and, if the consumer is the leader, send the join-group request.
    3. If REBALANCE_IN_PROGRESS is received in a heartbeat or commit response: same as the subscription-change case above.
    4. If MEMBER_ID_REQUIRED is received in response to the join-group request: same as the subscription-change case above.
    5. If UNKNOWN_MEMBER_ID or ILLEGAL_GENERATION is received in a join-group / sync-group / commit / heartbeat response: reset the generation / clear the member-id correspondingly, call the rebalance listener's onPartitionsLost for all owned partitions, and then re-join the group with empty assigned partitions.
    6. If the assignment received in the previous rebalance's sync-group response contains the error code NEEDS_REJOIN, revoke partitions as required before sending the join-group request with the newly formed assigned partitions.
  2. For the leader: after receiving the members' subscribed topics as well as their currently assigned partitions, do the following:
    1. Collect the partitions that are claimed as currently owned in the subscriptions; let's call them owned-partitions.
    2. Call the registered assignor of the selected protocol, passing in the cluster metadata, and get the returned assignment; let's call the returned assignment assigned-partitions. Note that this set could be different from owned-partitions.
    3. Compare owned-partitions with assigned-partitions and generate three mutually exclusive subsets:
      1. Intersection(owned-partitions, assigned-partitions). These are partitions that are still owned by some members, and some of them may now be allocated to new members. Let's call it maybe-revoking-partitions.
      2. Minus(assigned-partitions, owned-partitions). These are partitions that were not previously owned by anyone. This set is non-empty when a partition's previous owner is on the older version and hence already revoked it before joining, when a partition was revoked in the previous rebalance by a new-versioned member and hence is not in any assigned partitions, or when it is a newly created partition due to add-partitions. Let's call it ready-to-migrate-partitions.
      3. Minus(owned-partitions, assigned-partitions). These are partitions that do not exist in assigned-partitions but are claimed to be owned by some members. This set is non-empty if some topics are deleted, if the leader's metadata is stale (and hence the generated assignment does not include those topics), or if the previous leader created some topics in its assignor that do not exist in the cluster yet (consider the Streams case). Let's call it unknown-but-owned-partitions.

    4. For ready-to-migrate-partitions, it is safe to move them to their new owners immediately, since we know no one owned them before; hence we can encode them in the newly assigned partitions directly.
    5. For unknown-but-owned-partitions, it is also safe to simply give them back to whichever members claimed to own them, again by encoding them directly. If this is due to a topic metadata update, a later rebalance will be triggered anyway.
    6. For maybe-revoking-partitions, check whether the owner has changed. If so, exclude them from the new owner's assigned partitions and instead set the error code to NEEDS_REJOIN for all members (this enables a fast re-join and rebalance). The old owner will realize it no longer owns them, revoke them, and then re-join.
  3. For every consumer: after receiving the sync-group response, do the following:
    1. Calculate the newly-added-partitions as Minus(assigned-partitions, owned-partitions) and the revoked-partitions as Minus(owned-partitions, assigned-partitions).
    2. Update the assigned-partitions list.
    3. For those newly-added-partitions, call the rebalance listener's onPartitionsAssigned.
    4. For those revoked-partitions, call the rebalance listener's onPartitionsRevoked.
    5. Check the error code:
      1. If it is NONE, complete.
      2. If it is NEEDS_REJOIN, immediately send another join-group request with the updated assigned partitions, following the NEEDS_REJOIN case of step 1 above.
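The pre-join handling in step 1 can be sketched in Python as below. This is a minimal, hypothetical model, not the consumer client's code: `Trigger`, `ConsumerState` and `prepare_join` are illustrative names, partitions are `(topic, partition)` tuples, and the rebalance-listener calls are recorded in sets instead of being invoked. It assumes the metadata-change case keeps surviving partitions and reports only partitions of deleted topics as lost, and it reads "correspondingly" as clearing the member id on UNKNOWN_MEMBER_ID and resetting the generation on ILLEGAL_GENERATION.

```python
from dataclasses import dataclass, field
from enum import Enum, auto
from typing import Optional, Set, Tuple

Partition = Tuple[str, int]  # (topic, partition number)


class Trigger(Enum):
    SUBSCRIPTION_CHANGED = auto()
    METADATA_CHANGED = auto()
    REBALANCE_IN_PROGRESS = auto()  # from a heartbeat or commit response
    MEMBER_ID_REQUIRED = auto()     # in response to the join-group request
    UNKNOWN_MEMBER_ID = auto()
    ILLEGAL_GENERATION = auto()
    NEEDS_REJOIN = auto()           # error code in the previous sync-group response


@dataclass
class ConsumerState:
    owned: Set[Partition]
    member_id: str
    generation: int
    is_leader: bool = False
    # Stand-ins for the rebalance listener: what would be passed to
    # onPartitionsRevoked / onPartitionsLost (hypothetical, for illustration).
    revoked: Set[Partition] = field(default_factory=set)
    lost: Set[Partition] = field(default_factory=set)


def prepare_join(state: ConsumerState, trigger: Trigger, live_topics: Set[str],
                 to_revoke: Optional[Set[Partition]] = None) -> Optional[Set[Partition]]:
    """Return the owned partitions to encode in the Subscription,
    or None when no join-group request should be sent."""
    if trigger in (Trigger.SUBSCRIPTION_CHANGED, Trigger.REBALANCE_IN_PROGRESS,
                   Trigger.MEMBER_ID_REQUIRED):
        # Revoke everything and join with empty owned partitions.
        state.revoked |= state.owned
        state.owned = set()
        return set()
    if trigger is Trigger.METADATA_CHANGED:
        # Keep existing partitions; only partitions of vanished topics are lost.
        gone = {p for p in state.owned if p[0] not in live_topics}
        state.lost |= gone
        state.owned -= gone
        # Only the leader sends a join-group request for a metadata change.
        return set(state.owned) if state.is_leader else None
    if trigger in (Trigger.UNKNOWN_MEMBER_ID, Trigger.ILLEGAL_GENERATION):
        if trigger is Trigger.UNKNOWN_MEMBER_ID:
            state.member_id = ""   # clear the member id
        else:
            state.generation = -1  # reset the generation
        state.lost |= state.owned
        state.owned = set()
        return set()
    # NEEDS_REJOIN: revoke only the partitions that must go, keep the rest.
    doomed = (to_revoke or set()) & state.owned
    state.revoked |= doomed
    state.owned -= doomed
    return set(state.owned)
```

Returning `None` rather than an empty set keeps "do not send a join-group request" distinct from "send one with no owned partitions".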
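The leader-side classification in step 2 can be sketched as follows, assuming each partition is claimed by at most one member and that the assignor's output (assigned-partitions) is already available as a member-to-partitions map. `adjust_assignment` is a hypothetical helper: it returns the assignment to put in the sync-group response together with a flag saying whether NEEDS_REJOIN should be set for all members.

```python
from typing import Dict, Set, Tuple

Partition = Tuple[str, int]           # (topic, partition number)
Assignment = Dict[str, Set[Partition]]  # member id -> partitions


def adjust_assignment(owned: Assignment, assigned: Assignment) -> Tuple[Assignment, bool]:
    """Return (assignment to send, needs_rejoin) per the three-subset rule."""
    # Assumes no partition is claimed by more than one member.
    owner_of = {p: m for m, ps in owned.items() for p in ps}
    owned_all = set(owner_of)
    assigned_all = {p for ps in assigned.values() for p in ps}

    maybe_revoking = owned_all & assigned_all
    ready_to_migrate = assigned_all - owned_all
    unknown_but_owned = owned_all - assigned_all

    result: Assignment = {m: set() for m in set(owned) | set(assigned)}
    needs_rejoin = False
    for member, ps in assigned.items():
        for p in ps:
            if p in ready_to_migrate:
                result[member].add(p)   # nobody owned it: hand it over now
            elif p in maybe_revoking and owner_of[p] == member:
                result[member].add(p)   # owner unchanged: keep it
            else:
                needs_rejoin = True     # owner changed: withhold until revoked
    for p in unknown_but_owned:
        result[owner_of[p]].add(p)      # give it back to the claimed owner
    return result, needs_rejoin
```

A partition whose owner changes appears in neither member's list for this round: the old owner's assignor output no longer contains it, so that member revokes it, and the NEEDS_REJOIN flag lets the next round hand it to the new owner.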
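On the consumer side, step 3 reduces to two set differences. In this hypothetical sketch, `on_sync_group` takes the previously owned and newly assigned partitions plus the error code as a plain string, appends the listener calls it would make to `events` (in the order step 3 lists them), and returns the updated owned set together with whether to re-join immediately.

```python
from typing import List, Set, Tuple

Partition = Tuple[str, int]               # (topic, partition number)
ListenerCall = Tuple[str, Set[Partition]]  # (callback name, partitions)


def on_sync_group(owned: Set[Partition], assigned: Set[Partition], error: str,
                  events: List[ListenerCall]) -> Tuple[Set[Partition], bool]:
    """Apply a sync-group assignment; return (new owned set, re-join now?)."""
    newly_added = assigned - owned  # step 3.1
    revoked = owned - assigned      # step 3.1
    new_owned = set(assigned)       # step 3.2: update the assigned-partitions list
    events.append(("onPartitionsAssigned", newly_added))  # step 3.3
    events.append(("onPartitionsRevoked", revoked))       # step 3.4
    if error == "NONE":
        return new_owned, False     # rebalance complete
    if error == "NEEDS_REJOIN":
        return new_owned, True      # re-join immediately with new_owned
    raise ValueError(f"unexpected error code: {error}")
```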

...

Note that this proposal depends on users behaving correctly, i.e. everyone should eventually be on the same "rebalance protocol"; otherwise we would fall into case 3.b) forever. In addition, this approach assumes that everyone is still using the "eager" protocol while upgrading to the new version; otherwise, the leader may be only V0-aware and hence not know that some newer-versioned members did not revoke all their partitions, so it may re-assign those partitions to other members. Finally, this approach assumes that the leader is V1-aware whenever any V1 subscription is received: again, if users follow the upgrade path above this should hold, but if they do not, the behavior is undefined, since an old-versioned leader may simply proceed with the V0 "eager" assignment while some of the members are actually on V1.

...

JoinGroupRequest (v5) => GroupId SessionTimeout RebalanceTimeout MemberId ProtocolType Protocols ProtocolVersion
   GroupId                 => String
   SessionTimeout          => Int32
   RebalanceTimeout        => Int32
   MemberId                => String
   ProtocolType            => String
   Protocols               => List<Protocol>
   ProtocolVersion         => Int32                   // new field

Protocol (v0) => ProtocolName ProtocolMetadata
   ProtocolName            => String
   ProtocolMetadata        => Bytes (consumer protocol encoded bytes)
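To make the proposed schema concrete, the sketch below serializes the request body in the field order shown above. It assumes Kafka's classic (non-flexible) primitive encodings: big-endian INT32, STRING as an INT16 length followed by UTF-8 bytes, BYTES as an INT32 length followed by the raw bytes, and arrays as an INT32 element count. It covers only the body (not the request header) and mirrors this proposal rather than any released broker version; the class and helper names are illustrative.

```python
import struct
from dataclasses import dataclass
from typing import List


def enc_string(s: str) -> bytes:
    """STRING: INT16 length + UTF-8 bytes."""
    b = s.encode("utf-8")
    return struct.pack(">h", len(b)) + b


def enc_bytes(b: bytes) -> bytes:
    """BYTES: INT32 length + raw bytes."""
    return struct.pack(">i", len(b)) + b


@dataclass
class Protocol:
    name: str
    metadata: bytes  # consumer-protocol-encoded Subscription


@dataclass
class JoinGroupRequest:
    group_id: str
    session_timeout: int
    rebalance_timeout: int
    member_id: str
    protocol_type: str
    protocols: List[Protocol]
    protocol_version: int  # the new field proposed above

    def encode(self) -> bytes:
        out = enc_string(self.group_id)
        out += struct.pack(">ii", self.session_timeout, self.rebalance_timeout)
        out += enc_string(self.member_id) + enc_string(self.protocol_type)
        out += struct.pack(">i", len(self.protocols))  # array element count
        for p in self.protocols:
            out += enc_string(p.name) + enc_bytes(p.metadata)
        out += struct.pack(">i", self.protocol_version)  # appended last
        return out
```

Because ProtocolVersion is appended after the existing fields, an older decoder that stops after Protocols still reads the same prefix; only a reader of the new version consumes the trailing INT32.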

...

As mentioned above, a new protocol type will be created. To ensure a smooth upgrade, we need to make sure existing jobs won't fail. The procedure is as follows:

  • Set the `stream.rebalancing.mode` to `upgrading`, which will force the stream application to stay with protocol type "consumer".
  • Perform a rolling restart of the stream application; the change is applied automatically. This is safe because we are not changing the protocol type.

...

  • Do not override the `rebalance.protocol` config when doing the first rolling bounce to upgrade the version; it will then default to "eager", and hence consumers still revoke all partitions upon prepare-rebalance.
  • Do another rolling bounce with the `rebalance.protocol` config set to `cooperative`.
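The two-bounce upgrade above can be expressed as a pair of per-bounce configurations. This is an illustrative sketch only: `rebalance.protocol` is the config name proposed in this document, `upgrade_plan`, `effective_protocol` and `all_cooperative` are hypothetical helpers, and the default of "eager" when the config is not overridden follows the first bullet.

```python
from typing import Dict, List

DEFAULT_PROTOCOL = "eager"  # behavior when rebalance.protocol is not overridden


def effective_protocol(config: Dict[str, str]) -> str:
    """Rebalance protocol a member will use given its config."""
    return config.get("rebalance.protocol", DEFAULT_PROTOCOL)


def upgrade_plan(base: Dict[str, str]) -> List[Dict[str, str]]:
    """Configs for the two rolling bounces (hypothetical helper)."""
    # Bounce 1: upgrade the binaries but leave rebalance.protocol unset.
    first = {k: v for k, v in base.items() if k != "rebalance.protocol"}
    # Bounce 2: opt in to the cooperative protocol.
    second = {**first, "rebalance.protocol": "cooperative"}
    return [first, second]


def all_cooperative(member_configs: List[Dict[str, str]]) -> bool:
    """True once every member has been bounced with the cooperative setting."""
    return all(effective_protocol(c) == "cooperative" for c in member_configs)
```

While a second bounce is in progress the group is mixed, which is exactly the window in which every member must still be safe under "eager" semantics.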

Rejected Alternatives

N/A for the algorithm part. For the implementation-plan trade-offs, please review the doc in the implementation plan.