...

KafkaConsumer:

Subscription => TopicList UserData AssignedPartitions
   TopicList               => List<String>
   UserData                => Bytes   
   AssignedPartitions      => List<String, List<Int32>>   // new field


Assignment => AssignedPartitions UserData RevokedPartitions ErrorCode
   AssignedPartitions      => List<String, List<Int32>>
   UserData                => Bytes
   RevokedPartitions       => List<String, List<Int32>>   // new field
   ErrorCode               => Int16                       // new field
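
To make the two new fields concrete, here is a minimal in-memory sketch of the schema above. It is not the Kafka wire format: the class and field names are simply Python renderings of the schema, and the default error code of 0 standing for NONE is an assumption made for illustration.

```python
from dataclasses import dataclass, field
from typing import Dict, List

# Hypothetical in-memory mirror of the schema; List<String, List<Int32>>
# is modeled as a mapping from topic name to partition ids.
PartitionsByTopic = Dict[str, List[int]]


@dataclass
class Subscription:
    topic_list: List[str]
    user_data: bytes = b""
    # new field: partitions the member currently owns
    assigned_partitions: PartitionsByTopic = field(default_factory=dict)


@dataclass
class Assignment:
    assigned_partitions: PartitionsByTopic = field(default_factory=dict)
    user_data: bytes = b""
    # new field: partitions the member must give up
    revoked_partitions: PartitionsByTopic = field(default_factory=dict)
    # new field (Int16); 0 is assumed here to mean NONE
    error_code: int = 0


# A member that already owns orders-0 and orders-1 reports them when re-joining.
sub = Subscription(topic_list=["orders"], assigned_partitions={"orders": [0, 1]})
```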

...

  1. For every consumer: before sending the join-group request, change the behavior as follows based on the join-group triggering event:
    1. If subscription has changed, or topic metadata has changed: do NOT revoke any partitions or call rebalance-listener; instead just encode the current assigned partitions as part of the Subscription.
    2. If received REBALANCE_IN_PROGRESS from heartbeat response or commit response: same as 1.1 above.
    3. If received MEMBER_ID_REQUIRED from join-group request: same as 1.1 above.
    4. If received UNKNOWN_MEMBER_ID or ILLEGAL_GENERATION from join-group / sync-group / commit / heartbeat response: reset the generation / clear the member-id correspondingly, call the rebalance listener's onPartitionsEmigrated for all the partitions, and then re-join the group with an empty assigned-partitions list.
    5. If the assignment received in the previous rebalance's sync-group response contains error code NEEDS_REJOIN: revoke partitions as required before sending the join-group request with the newly formed assigned partitions.
  2. For the leader: after getting the received subscription topics, as well as the assigned-partitions, do the following:
    1. Collect the partitions that are claimed as currently owned from the subscriptions; let's call it owned-partitions.
    2. Call the registered assignor of the selected protocol, passing in the cluster metadata, and get the returned assignment; let's call the returned assignment assigned-partitions. Note that this set could be different from owned-partitions.
    3. Compare the owned-partitions with assigned-partitions and generate three exclusive sub-sets:
      1. Intersection(owned-partitions, assigned-partitions). These are partitions that are still owned by some members, and some of them may now be allocated to new members. Let's call it maybe-revoking-partitions.
      2. Minus(assigned-partitions, owned-partitions). These are partitions that were not previously owned by anyone. This set is non-empty when a partition's previous owner is on an older version and hence revoked it already before joining, or a partition was revoked in the previous rebalance by a new-versioned member and hence is not in any assigned partitions, or it is a newly created partition due to add-partitions. Let's call it ready-to-migrate-partitions.
      3. Minus(owned-partitions, assigned-partitions). These are partitions that do not exist in assigned-partitions, but are claimed to be owned by the members. This set is non-empty if some topics are deleted, or if the leader's metadata is stale (and hence the generated assignment does not have those topics), or if the previous leader has created some topics in its assignor that are not in the cluster yet (consider the Streams case). Let's call it unknown-but-owned-partitions.

    4. For ready-to-migrate-partitions, it is safe to move them to the new member immediately since we know no one owned them before, and hence we can encode the owner from the newly assigned-partitions directly.
    5. For unknown-but-owned-partitions, it is also safe to just give them back to whoever claimed to be their owner by encoding them directly as well. If this is due to a topic metadata update, then a later rebalance will be triggered anyway.
    6. For maybe-revoking-partitions, check if the owner has changed. If yes, encode the partition to the old owner in revoked-partitions but do NOT encode it in the new owner's assigned-partitions list (that is, exclude it from that list); instead set the error code to NEEDS_REJOIN for all the members (this is for fast re-join and rebalance). The old owner will realize it does not own the partition any more, revoke it and then rejoin.
  3. For every consumer: after receiving the sync-group response, do the following:
    1. Calculate the newly-added-partitions as Minus(assigned-partitions, owned-partitions) and the revoked-partitions as Minus(owned-partitions, assigned-partitions). Under cooperative rebalance, no partition should be migrated directly to a new owner before being revoked first.
      If this does not hold, it means either bugs or incompatible members are forming the group (for incompatible members, see below), and hence the consumer should log a fatal error and shut down gracefully.
    2. Update the assigned-partitions list.
    3. For those newly-added-partitions, call the rebalance listener's onPartitionsAssigned.
    4. For those revoked-partitions, call the rebalance listener's onPartitionsRevoked.
    5. Check the error code:
      1. If it is NONE, complete.
      2. If it is NEEDS_REJOIN, immediately send another join-group request with the updated assigned partitions following step 1.5 above.
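
The leader-side logic in step 2 can be sketched roughly as follows. This is only an illustration of the three sub-sets and how each is handled, not the actual coordinator code: the function name `leader_adjust`, the `(topic, partition)` tuples, the partition-to-member maps and the numeric error codes are all assumptions of this sketch.

```python
from typing import Dict, Set, Tuple

Partition = Tuple[str, int]        # (topic, partition id)
Ownership = Dict[Partition, str]   # partition -> member id

NONE, NEEDS_REJOIN = 0, 1          # hypothetical numeric codes


def leader_adjust(owned: Ownership, assigned: Ownership):
    """Return (final assignment, revoked-partitions per old owner, error code)."""
    maybe_revoking = owned.keys() & assigned.keys()
    ready_to_migrate = assigned.keys() - owned.keys()
    unknown_but_owned = owned.keys() - assigned.keys()

    final: Ownership = {}
    revoked: Dict[str, Set[Partition]] = {}
    error = NONE

    for p in ready_to_migrate:       # nobody owned these: hand over directly
        final[p] = assigned[p]
    for p in unknown_but_owned:      # give back to whoever claims them
        final[p] = owned[p]
    for p in maybe_revoking:
        if owned[p] == assigned[p]:  # owner unchanged: keep as is
            final[p] = owned[p]
        else:                        # owner changed: revoke now, assign on rejoin
            revoked.setdefault(owned[p], set()).add(p)
            error = NEEDS_REJOIN
    return final, revoked, error


# Member A owns t-0, t-1 and a partition of a deleted topic; the assignor
# wants to move t-1 to B and also gives B the newly created t-2.
owned = {("t", 0): "A", ("t", 1): "A", ("gone", 0): "A"}
assigned = {("t", 0): "A", ("t", 1): "B", ("t", 2): "B"}
final, revoked, error = leader_adjust(owned, assigned)
```

In this example t-1 ends up in nobody's assigned-partitions for the current generation: A is told to revoke it, and B only receives it in the follow-up rebalance triggered by NEEDS_REJOIN.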

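Step 3, the consumer's handling of the sync-group response, can be sketched as follows. The function `on_sync_group`, its callback parameters and the numeric error codes are hypothetical names for illustration; the callbacks stand in for the rebalance listener's onPartitionsAssigned and onPartitionsRevoked. Skipping the callbacks for empty sets is a choice of this sketch, not something the text above prescribes.

```python
from typing import Callable, List, Set, Tuple

Partition = Tuple[str, int]   # (topic, partition id)
NONE, NEEDS_REJOIN = 0, 1     # hypothetical numeric codes


def on_sync_group(owned: Set[Partition],
                  assigned: Set[Partition],
                  error_code: int,
                  on_assigned: Callable[[Set[Partition]], None],
                  on_revoked: Callable[[Set[Partition]], None]):
    """Return (new owned set, whether to rejoin immediately)."""
    added = assigned - owned      # Minus(assigned-partitions, owned-partitions)
    revoked = owned - assigned    # Minus(owned-partitions, assigned-partitions)
    new_owned = set(assigned)     # update the assigned-partitions list
    if added:
        on_assigned(added)
    if revoked:
        on_revoked(revoked)
    return new_owned, error_code == NEEDS_REJOIN


# The member owned t-0 and t-1; the new assignment keeps t-0, adds t-2,
# and asks for a fast rejoin.
events: List[tuple] = []
new_owned, rejoin = on_sync_group(
    owned={("t", 0), ("t", 1)},
    assigned={("t", 0), ("t", 2)},
    error_code=NEEDS_REJOIN,
    on_assigned=lambda ps: events.append(("assigned", sorted(ps))),
    on_revoked=lambda ps: events.append(("revoked", sorted(ps))),
)
```

When `rejoin` is true, the member would send another join-group request carrying `new_owned` as its assigned partitions.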

Below is a more illustrative view of the different sets of partitions and their assignment logic:

...