...

Code Block
KafkaConsumer:

Subscription => TopicList UserData AssignedPartitions
   TopicList               => List<String>
   UserData                => Bytes   
   AssignedPartitions      => List<String, List<Int32>>   // new field


Assignment => AssignedPartitions UserData ErrorCode
   AssignedPartitions      => List<String, List<Int32>>
   UserData                => Bytes
   ErrorCode               => Int16                       // new field

Note that it is compatible to inject additional fields after the assignor-specific SubscriptionInfo / AssignmentInfo bytes: on serialization we first call the assignor to encode its info bytes, and then allocate a larger buffer to append the consumer-specific bytes; with the new protocol, we just need to write some fields before, and some fields (i.e. the new fields) after, the assignor-specific info bytes, and do the reverse on deserialization. So adding fields after the assignor bytes is still naturally compatible with plug-in assignors. However, there are indeed some compatibility challenges for the consumer protocol upgrade itself, which we will tackle below.
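Below is a minimal sketch of this serialization ordering, assuming a simplified length-prefixed layout rather than Kafka's actual wire format; the class and field names are illustrative only. The point is that the assignor-provided bytes stay an opaque, length-prefixed blob, and an old reader that stops right after that blob still parses the message:

Code Block
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;

final class SubscriptionEncodingSketch {

    // Encodes: topic list, then the opaque assignor user data, then the newly
    // appended consumer-level field. Old readers stop after the user data.
    static ByteBuffer encode(List<String> topics,
                             byte[] assignorUserData,       // opaque blob from the assignor
                             byte[] ownedPartitionsField) { // the newly appended field
        int size = 4;                                       // topic count
        for (String t : topics)
            size += 4 + t.getBytes(StandardCharsets.UTF_8).length;
        size += 4 + assignorUserData.length;                // length-prefixed blob
        size += 4 + ownedPartitionsField.length;            // appended new field

        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putInt(topics.size());
        for (String t : topics) {
            byte[] b = t.getBytes(StandardCharsets.UTF_8);
            buf.putInt(b.length);
            buf.put(b);
        }
        buf.putInt(assignorUserData.length);
        buf.put(assignorUserData);                          // assignor bytes stay untouched
        buf.putInt(ownedPartitionsField.length);
        buf.put(ownedPartitionsField);                      // old readers never reach this
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer buf = encode(List.of("topicA"), new byte[]{1, 2}, new byte[]{3, 4});
        System.out.println("encoded " + buf.remaining() + " bytes");
    }
}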

As for the newly added error code, here are its possible values (we will talk about their usage later in this document):

Code Block
NONE:        normal
NEED_REJOIN: instructs the consumer to complete the revocation (if any) and re-join the group immediately


In addition, we want to resolve a long-standing issue: when a consumer has been kicked out of the group, it no longer owns the partitions, so the `commit` call is doomed to fail. To distinguish this case from the normal case, where consumers are likely still within the group but just trying to re-join, we introduce a new API into the consumer rebalance listener:

...


For users implementing this rebalance listener, code changes are not strictly necessary if they do not need different logic for the lost case; but they would still need to recompile their implementation class.
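As a rough illustration, a listener taking advantage of the new callback might look as follows; this sketch assumes onPartitionsLost keeps the same signature as the existing callbacks, and the two helper methods are hypothetical placeholders:

Code Block
import java.util.Collection;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

public class CommittingRebalanceListener implements ConsumerRebalanceListener {

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Still a group member: committing offsets for the revoked partitions is safe.
        commitOffsetsFor(partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // (Re-)initialize any per-partition state here.
    }

    @Override
    public void onPartitionsLost(Collection<TopicPartition> partitions) {
        // Kicked out of the group: a commit would be doomed to fail, so only
        // clean up local state and do NOT attempt to commit.
        dropLocalStateFor(partitions);
    }

    private void commitOffsetsFor(Collection<TopicPartition> partitions) { /* hypothetical */ }

    private void dropLocalStateFor(Collection<TopicPartition> partitions) { /* hypothetical */ }
}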

...

  1. For every consumer: before sending the join-group request, change the behavior as follows based on the join-group triggering event:
    1. If subscription has changed: revoke all partitions that are no longer of subscription interest by calling onPartitionsRevoked, and send the join-group request with whatever is left of the owned partitions in the Subscription.
    2. If topic metadata has changed: call onPartitionsLost on those owned-but-no-longer-existing partitions; and if the consumer is the leader, send the join-group request.
    3. If received REBALANCE_IN_PROGRESS from a heartbeat response / commit response: re-join the group with all the currently owned partitions as assigned partitions.
    4. If received UNKNOWN_MEMBER_ID or ILLEGAL_GENERATION from a join-group / sync-group / commit / heartbeat response: reset the generation / clear the member-id correspondingly, call the rebalance listener's onPartitionsLost for all the owned partitions, and then re-join the group with empty assigned partitions.
    5. If received MEMBER_ID_REQUIRED from the join-group response: set the member id, and then re-send the join-group request (at this moment the owned partitions should be empty).
    6. If the assignment received from the previous rebalance's sync-group response contains the error code NEED_REJOIN: call onPartitionsRevoked as required before sending the join-group request with the newly formed assigned partitions (see 3.c below).
  2. For the leader: after getting the received subscription topics, as well as the assigned-partitions, do the following:
    1. Collect the partitions that are claimed as currently owned from the subscriptions; let's call it owned-partitions.
    2. Call the registered assignor of the selected protocol, passing in the cluster metadata, and get the returned assignment; let's call the returned assignment assigned-partitions. Note that this set could be different from owned-partitions.
    3. Compare the owned-partitions with assigned-partitions and generate three exclusive sub-sets (see the sketch after this list for the set computation):
      1. Intersection(owned-partitions, assigned-partitions). These are partitions that are still owned by some members, though some of them may now be allocated to new members. Let's call it maybe-revoking-partitions.
      2. Minus(assigned-partitions, owned-partitions). These are partitions that are not previously owned by anyone. This set is non-empty when a previous owner is on the older version and hence has already revoked them before joining, or when a partition was revoked in a previous rebalance by a new-versioned member and hence is not in any assigned partitions, or when it is a newly created partition due to add-partitions. Let's call it ready-to-migrate-partitions.
      3. Minus(owned-partitions, assigned-partitions). These are partitions that do not exist in the assigned partitions, but are claimed to be owned by some members. This set is non-empty if some topics are deleted, or if the leader's metadata is stale (and hence the generated assignment does not have those topics), or if the previous leader has created some topics in its assignor that are not in the cluster yet (consider the Streams case). Let's call it unknown-but-owned-partitions.
    4. For maybe-revoking-partitions, check if the owner has changed. If yes, exclude them from the assigned-partitions list to the new owner. The old owner will realize it does not own them any more, revoke them, and then trigger another rebalance for these partitions to finally be reassigned.
    5. For ready-to-migrate-partitions, it is safe to move them to the new members immediately since we know no one owned them before, and hence we can encode the owners from the newly-assigned-partitions directly.
    6. For unknown-but-owned-partitions, it is also safe to just give them back to whoever claimed to be their owners, by encoding them directly as well. If this is due to a topic metadata update, then a later rebalance will be triggered anyway.
  3. For every consumer: after receiving the sync-group response, do the following:
    1. Calculate the newly-added-partitions as Minus(assigned-partitions, owned-partitions) and the revoked-partitions as Minus(owned-partitions, assigned-partitions).
    2. Update the assigned-partitions list.
    3. If the set of revoked-partitions is non-empty, call the rebalance listener's onPartitionsRevoked and rejoin to trigger another rebalance.
    4. For those newly-added-partitions, call the rebalance listener's onPartitionsAssigned (even if empty).
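As referenced in step 2.c above, here is a small, self-contained sketch of the three-way set computation the leader performs; partitions are represented as plain strings (e.g. "t-0") for brevity, and all names are illustrative rather than taken from the codebase:

Code Block
import java.util.HashSet;
import java.util.Set;

final class PartitionSetsSketch {

    // Intersection(owned, assigned): partitions still owned by some member.
    static Set<String> intersection(Set<String> owned, Set<String> assigned) {
        Set<String> result = new HashSet<>(owned);
        result.retainAll(assigned);
        return result;
    }

    // Minus(left, right): partitions in left but not in right.
    static Set<String> minus(Set<String> left, Set<String> right) {
        Set<String> result = new HashSet<>(left);
        result.removeAll(right);
        return result;
    }

    public static void main(String[] args) {
        Set<String> owned = Set.of("t-0", "t-1", "t-2");
        Set<String> assigned = Set.of("t-1", "t-2", "t-3");

        Set<String> maybeRevoking = intersection(owned, assigned); // {t-1, t-2}
        Set<String> readyToMigrate = minus(assigned, owned);       // {t-3}
        Set<String> unknownButOwned = minus(owned, assigned);      // {t-0}

        System.out.println("maybe-revoking-partitions:    " + maybeRevoking);
        System.out.println("ready-to-migrate-partitions:  " + readyToMigrate);
        System.out.println("unknown-but-owned-partitions: " + unknownButOwned);
    }
}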


Below is a more illustrative view of the different sets of partitions and their assignment logic:

...

NOTE that for this new algorithm to be effective in reducing rebalance costs, it really expects the plug-in assignor to be "sticky" in some way, such that the diff between the newly-assigned-partitions and the existing-assigned-partitions is small, and hence only a small subset of the total number of partitions needs to be revoked / migrated at each rebalance in practice – otherwise, we are just paying more rebalances for little benefit. We will talk about how the sticky StreamsPartitionAssignor would be updated accordingly in Part II.

...

To give a concrete example, suppose your current assigned partitions are {1,2} and the newly assigned partitions are {2,3}. The consumer will call onPartitionsRevoked(1) and then onPartitionsAssigned(3). Suppose the former fails with an exception: ConsumerCoordinator would still proceed to complete the latter callback (and assume the latter callback succeeds). If users decide to retry, the assignment is still considered as having successfully changed to {2,3} – i.e. we consider all of the effects the user indicated in the callbacks to have taken place.
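A compact sketch of this error-handling contract, with partitions simplified to ints and a stand-in listener interface (all names are hypothetical, not the actual ConsumerCoordinator implementation):

Code Block
import java.util.HashSet;
import java.util.Set;

final class CallbackOrderingSketch {

    interface Listener {
        void onPartitionsRevoked(Set<Integer> partitions);
        void onPartitionsAssigned(Set<Integer> partitions);
    }

    // With owned = {1, 2} and assigned = {2, 3}: revoke {1}, then assign {3}.
    static Set<Integer> applyAssignment(Set<Integer> owned,
                                        Set<Integer> assigned,
                                        Listener listener) {
        Set<Integer> revoked = minus(owned, assigned);
        Set<Integer> added = minus(assigned, owned);

        RuntimeException revokeError = null;
        try {
            listener.onPartitionsRevoked(revoked);
        } catch (RuntimeException e) {
            revokeError = e;                    // remember it, but keep going
        }
        listener.onPartitionsAssigned(added);   // still runs if revocation failed
        if (revokeError != null) {
            // Surface the earlier failure only after ownership is already
            // considered changed to the new assignment.
            throw revokeError;
        }
        return assigned;
    }

    static Set<Integer> minus(Set<Integer> left, Set<Integer> right) {
        Set<Integer> result = new HashSet<>(left);
        result.removeAll(right);
        return result;
    }
}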

...

We have added a new out-of-the-box assignor that leverages the Subscription's built-in ownedPartitions. Consumer groups plugging in the new "cooperative-sticky" assignor will follow the incremental cooperative rebalancing protocol. A specific upgrade path is required for users wishing to do a rolling upgrade to the new cooperative assignor, as described in the compatibility section below.
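For a plain consumer, opting into the cooperative protocol is a one-line config change; here is a minimal sketch, assuming kafka-clients 2.4+ on the classpath, with placeholder bootstrap servers and group id:

Code Block
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class CooperativeConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Plug in the new cooperative assignor to follow the incremental protocol.
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                  CooperativeStickyAssignor.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // subscribe / poll as usual
        }
    }
}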

Note that the CooperativeStickyAssignor is for use by plain consumer clients – the existing StreamsPartitionAssignor has simply been modified to support cooperative rebalancing, so Streams users should not try to plug in the CooperativeStickyAssignor (or any other). The upgrade path for Streams also differs slightly from that of the clients' CooperativeStickyAssignor.

Compatibility and Upgrade Path

...


From the user's perspective, the upgrade path to the new protocol is the same as switching to a new assignor. For example, assume the current version of the Kafka consumer is 2.2 and the "range" assignor is specified in the config. The upgrade path would be:

...

  • Have a first rolling bounce to replace the byte code (i.e. swap the jars), and set the assignors to "range, cooperative-sticky". At this stage, the new-versioned byte code will still choose EAGER as the protocol and send both assignors in its join-group request. Since there is at least one member who has not been bounced yet and therefore will only send "range", the "range" assignor will be selected to assign partitions while everyone follows the EAGER protocol. This rolling bounce is safe.
  • Have a second rolling bounce to remove the "range" assignor, i.e. leave only the "cooperative-sticky" assignor in the config. At this stage, whoever has been bounced will choose the COOPERATIVE protocol and not revoke partitions, while those not yet bounced will still go with EAGER and revoke everything. However, the "cooperative-sticky" assignor will be chosen, since at least one member who has already been bounced no longer has "range". The "cooperative-sticky" assignor works even when some members are on EAGER and some on COOPERATIVE: it is fine as long as the leader can recognize them and make assignment choices accordingly; the EAGER members have revoked everything and hence no longer have any pre-assigned partitions in their subscription information, so it is safe to move those partitions to other members immediately based on the assignor's output.
  • The key point behind these two rolling bounces is that we want to avoid the situation where the leader is on the old byte code and only recognizes "eager", but, due to compatibility, would still be able to deserialize the new protocol data from newer-versioned members, and hence would just go ahead and do the assignment while the new-versioned members have not revoked their partitions before joining the group. Note the difference with KIP-415 here: on the consumer side we do not have the luxury of leveraging a list of built-in assignors, since assignors are user-customizable and hence a black box to the consumer coordinator, so we need two rolling bounces instead of one to complete the upgrade, whereas Connect only needs one rolling bounce.
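As a sketch, the two stages above might look as follows in the consumer config, using the assignors' full class names (the property values here are illustrative):

Code Block
# First rolling bounce: list both assignors; "range" is still selected as long
# as any not-yet-bounced member only advertises it.
partition.assignment.strategy=org.apache.kafka.clients.consumer.RangeAssignor,org.apache.kafka.clients.consumer.CooperativeStickyAssignor

# Second rolling bounce: remove "range", leaving only the cooperative assignor.
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor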

    ...