Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Recently Kafka community is promoting cooperative rebalancing to mitigate the pain points in the stop-the-world rebalancing protocol and an initiation for Kafka Connect already started as KIP-415.

This KIP is trying to customize the incremental rebalancing approach for Kafka consumer client, which will be beneficial for heavy-stateful consumers such as Kafka Streams applications.

In short, the goals of this KIP are:

...


For users implementing this rebalance listener, they would not need to make code changes necessarily if they do not need to instantiate different logic; but they'd still need to recompile their implementation class. The semantics of these callbacks do differ in the new cooperative protocol however, so you should review your implementation to make sure there are no logical changes needed. For details, see ConsumerRebalanceListener and ConsumerPartitionAssignor Semantics below.

Note that adding new fields would increase the size of the request, especially in cases like Streams where user metadata has been heavily encoded wither assignor-specific metadata. We are working on 

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-7149
 with compression / reformation to reduce the user metadata footprint, and with that we believe adding this new field would not be pushing the size exceeding the message size limit.

...

  1. For every consumer: before sending the join-group request, change the behavior as follows based on the join-group triggering event:
    1. If subscription has changed: revoke all partitions who are not of subscription interest by calling onPartitionsRevoked, send join-group request with whatever left in the owned partitions in Subscription.
    2. If topic metadata has changed: call onPartitionsLost on those owned-but-no-longer-exist partitions; and if the consumer is the leader, send join-group request.
    3. If received REBALANCE_IN_PROGRESS from heartbeat response / commit response: re-join group with all the currently owned partitions as assigned partitions.
    4. If received UNKNOWN_MEMBER_ID or ILLEGAL_GENERATION from join-group / sync-group / commit / heartbeat response: reset generation / clear member-id correspondingly, call rebalance listener's onPartitionsLost for all the partition and then re-join group with empty assigned partition.
    5. If received MEMBER_ID_REQUIRED from join-group request: set the member id, and then re-send join-group (at this moment the owned partitions should be empty).
  2. For the leader: after getting the received subscription topics, as well as the assigned-partitions, do the following:
    1. Collect the partitions that are claimed as currently owned from the subscriptions; let's call it owned-partitions.
    2. Call the registered assignor of the selected protocol, passing in the cluster metadata and get the returned assignment; let's call the returned assignment assigned-partitions. Note the this set could be different from owned-partitions.
    3. Compare the owned-partitions with assigned-partitions and generate three exclusive sub-sets:
      1. Intersection(owned-partitions, assigned-partitions). These are partitions that are still owned by some members, and some of them may be now allocated for new members. Let's call it maybe-revoking-partitions.
      2. Minus(assigned-partitions, owned-partitions). These are partitions that are not previously owned by any one. This set is non-empty when its previous owner is on older version and hence revoked them already before joining, or a partition is revoked in previous rebalance by the new versioned member and hence not in any assigned partitions, or it is a newly created partition due to add-partitions. Let's call it ready-to-migrate-partitions.
      3. Minus(owned-partitions, assigned-partitions). These are partitions that does not exist in assigned partitions, but are claimed to be owned by the members. It is non-empty if some topics are deleted, or if the leader's metadata is stale (and hence the generated assignment does not have those topics), or if the previous leader has created some topics in its assignor that are not in the cluster yet (consider the Streams case). Let's call it unknown-but-owned-partitions.
    4. For maybe-revoking-partitions, check if the owner has changed. If yes, exclude them from the assigned-partitions list to the new owner. The old owner will realize it does not own it any more, revoke it and then trigger another rebalance for these partitions to finally be reassigned
    5. For ready-to-migrate-partitions, it is safe to move them to the new member immediately since we know no one owns it before, and hence we can encode the owner from the newly-assigned-partitions directly.
    6. For unknown-but-owned-partitions, it is also safe to just give them back to whoever claimed to be their owners by encoding them directly as well. If this is due to topic metadata update, then a later rebalance will be triggered anyways.
  3. For every consumer: after received the sync-group response, do the following:
    1. Calculate the newly-added-partitions as Minus(assigned-partitions, owned-partitions) and the revoked-partitions as Minus(owned-partitions, assigned-partitions).
    2. Update the assigned-partitions list.
    3. If the set of revoked-partitions is non-empty, call the rebalance listener's onPartitionsRevoked and rejoin to trigger another rebalance.
    4. For those newly-added-partitions, call the rebalance listener's onPartitionsAssigned (even if empty).

...

  1. listener callback latency
    1. partitions-revoked-latency-avg
    2. partitions-revoked-latency-max
    3. partitions-assigned-latency-avg
    4. partitions-assigned-latency-max
    5. partitions-lost-latency-avg
    6. partitions-lost-latency-max
  2. rebalance rate and latency (# rebalances per day, and latency including the callback time as well)
    1. rebalance-rate-per-hour
    2. rebalance-total
    3. rebalance-latency-avg
    4. rebalance-latency-max
    5. rebalance-latency-total
    6. failed-rebalance-rate-per-hour
    7. failed-rebalance-total
  3. last-rebalance-seconds-ago (dynamic gauge)

...


CooperativeStickyAssignor and custom COOPERATIVE Assignors

Since we've already encoded the assigned partitions at the consumer protocol layer, for consumer's sticky partitioner we are effectively duplicating this data at both consumer protocol and assignor's user data. Similarly we have a StreamsPartitionAssignor which is sticky as well but relying on its own user data to do it. We have added a new out-of-the-box assignor for users that leverages the Subscription's built-in ownedPartitions. Consumer groups plugging in the new "cooperative-sticky" assignor will follow the incremental cooperative rebalancing protocol. A specific upgrade path is required for users wishing to do a rolling upgrade to the new cooperative assignor, as described in the compatibility section below. 

Users may also wish to implement their own custom assignor, or are already doing so, and want to use the new cooperative protocol. Any assignor that returns COOPERATIVE among the list in #supportedProtocols indicates to the ConsumerCoordinator that it should use the cooperative protocol, and must follow specific assignment logic. First, the assignor should try and be as "sticky" as possible, meaning it should assign partitions back to their previous owner as much as possible. The assignor can leverage the new ownedPartitions field that the Subscription has been augmented with in order to determine the previous assignment. Note that "stickiness" is important for the cooperative protocol to be effective, as in the limit that the new assignment is totally different than the previous one then the cooperative protocol just reduces to the old eager protocol as each member will have to completely revoke all partitions and get a whole new assignment. In addition, any time a partition has to be revoked it will trigger a follow up rebalance, so the assignor should seek to minimize partition movement. Second, in order to ensure safe resource management and clear ownership, the assignor must make sure a partition is revoked by its previous owner before it can be assigned to a new one. Practically speaking, this means that the assignor should generate its "intended" assignment and then check against the previous assignment to see if any partitions are being revoked (that is, in the ownedPartitions but not in the new assignment for a given consumer). If that is the case, that partition should be removed from the new assignment for that round, and wait until it has been revoked so that it can be assigned to its final owner in the second rebalance. See the CooperativeStickyAssignor implementation for an example.Note that the CooperativeStickyAssignor is for use by plain consumer clients – the existing StreamsPartitionAssignor has simply been modified to support cooperative so users should not try .

Note that the CooperativeStickyAssignor is for use by plain consumer clients – the existing StreamsPartitionAssignor has simply been modified to support cooperative so users should not try to plug in the CooperativeStickyAssignor (or any other). The upgrade path for Streams differs slightly from that of the clients CooperativeStickyAssignor as well. See the section on Streams below for details.

ConsumerRebalanceListener and ConsumerPartitionAssignor Semantics

If you do choose to plug in a cooperative assignor and have also implemented a custom ConsumerRebalanceListener, you should be aware of how the semantics and ordering of these callbacks has changed. In the eager protocol, the timeline of a rebalance is always exactly as follows:

      0. Listener#onPartitionsLost: if the member has missed a rebalance and fallen out of the group, this new callback will be invoked on the set of all owned partitions (unless empty). The member will then rejoin the group.

  1. Listener#onPartitionsRevoked: called on the full set of assigned partitions
  2. Assignor#subscriptionUserdata: called when sending the JoinGroup request
  3. Assignor#assign: called only for group leader
  4. Assignor#onAssignment: invoked after receiving the new assignment
  5. Listener#onPartitionsAssigned: called on the full set of assigned partitions (may have overlap with the partitions passed to #onPartitionsRevoked

In the cooperative protocol, the timeline is less exact as some methods may or may not be called, at different times, and on different sets of partitions. This will instead look something like the following

       0. Listener#onPartitionsLost: if the member has missed a rebalance and fallen out of the group, this new callback will be invoked on the set of all owned partitions (unless empty). The member will then rejoin the group.

  1. Listener#onPartitionsRevoked: if the topic metadata has changed such that some owned partitions are no longer in our subscription or don't exist, this callback will be invoked on that subset. If there are no partitions to revoke for those reasons, this callback will not be invoked at this point (note that this will likely be the case in a typical rebalance due to membership changes, eg scaling in/out, member crashes/restarts, etc)
  2. Assignor#subscriptionUserdata: called when sending the JoinGroup request
  3. Assignor#assign: called only for group leader. Note that the #assign method will now have access to the ownedPartitions for each group member (minus any partitions lost/revoked in step 0. or 1.)
  4. Listener#onPartitionsRevoked: this will be called on the subset of previously owned partitions that are intended to be reassigned to another consumer. If this subset is empty, this will not be invoked at all. If this is invoked, it means that a followup rebalance will be triggered so that the revoked partitions can be given to their final intended owner.
  5. Assignor#onAssignment: invoked after receiving the new assignment (will always be after any #onPartitionsRevoked calls, and before #onPartitionsAssigned).
  6. Listener#onPartitionsAssigned: called on the subset of assigned partitions that were not previously owned before this rebalance. There should be no overlap with the revoked partitions (if any). This will always  be called, even if there are no new partitions being assigned to a given member.

The italics indicate a callback that may not be called at all during a rebalance. Take note in particular that it is possible for #onPartitionsRevoked to never be invoked at all during a rebalance, and should not be relied on to signal that a rebalance has started. The #onPartitionsAssigned callback will however always be called, and can therefore be used to signal to your app that a rebalance has just completed to plug in the CooperativeStickyAssignor (or any other). The upgrade path for Streams differs slightly from that of the clients CooperativeStickyAssignor as well. See the section on Streams below for details.


Compatibility and Upgrade Path

...

With the existing built-in Assignor implementations, they will be updated accordingly:


Highest VersionSupported StrategyNotes
RangeAssignor0EagerCurrent default value.
RoundRobinAssignor0Eager
StickyAssignor0Eager
CooperativeStickyAssignor0Eager, CooperativeTo be default value in 3.0
StreamsAssignor4Eager, Cooperative

The reason we make "range" and "round-robin" to not support cooperative rebalance is that, this protocol implicitly relies on the assignor to be somewhat sticky to make benefits by trading an extra rebalance. However, for these two assignors, they would not be sticky (although sometimes range may luckily reassign partitions back to old owners, it is not best-effort) and hence we've decided to not make them be selected for cooperative protocol. The existing StickyAssignor was not made to support Cooperative to ensure users follow the smooth upgrade path outlined below, and avoid running into trouble if they already use the StickyAssignor and blindly upgrade.

...

The specific upgrade path is described below. Note that this will be different depending on whether you have a plain consumer app or a Streams app, and make sure to follow the appropriate one.

...

From the user's perspective, the upgrade path of leveraging new protocols is just the same as switching to a new assignor. For example, assuming the current version of Kafka consumer is 2.2 and "range" assignor is specified in the config. The upgrade path would be:

...

  • The first rolling bounce is to replace the byte code (i.e. swap the jars): set the assignors to "range, cooperative-sticky" (or round-robin/sticky if you are using a different assignor already). At this stage, the new versioned byte code will still choose EAGER as the protocol and then sends both assignors in their join-group request, since there are at least one member who's not bounced yet and therefor will only send with "range", "range" assignor will be selected to assign partitions while everyone is following the EAGER protocol. This rolling bounce is safe.
  • The second rolling bounce is to remove the "range" (or round-robin/sticky) assignor, i.e. only leave the "cooperative-sticky" assignor in the config. At this stage, whoever have been bounced will then choose COOPERATIVE protocol and not revoke partitions while others not-yet-bounced will still go with EAGER and revoke everything. However the "cooperative-sticky" assignor will be chosen since at least one member who's already bounced will not have "range" any more. The "cooperative-sticky" assignor works even when there are some members in EAGER and some members in COOPERATIVE: it is fine as long as the leader can recognize them and make assignment choice accordingly, and for EAGER members, they've revoked everything and hence did not have any pre-assigned-partitions anymore in the subscription information, hence it is safe just to move those partitions to other members immediately based on the assignor's output.
  • The key point behind this two rolling bounce is that, we want to avoid the situation where leader is on old byte-code and only recognize "eager", but due to compatibility would still be able to deserialize the new protocol data from newer versioned members, and hence just go ahead and do the assignment while new versioned members did not revoke their partitions before joining the group. Note the difference with KIP-415 here: since on consumer we do not have the luxury to leverage on list of built-in assignors since it is user-customizable and hence would be black box to the consumer coordinator, we'd need two rolling bounces instead of one rolling bounce to complete the upgrade, whereas Connect only need one rolling bounce.

    ...

  • The first rolling bounce is to replace the byte code (i.e. swap the jars): set the UPGRADE_FROM config to 2.3 (or whatever version you are upgrading from) and then bounce each instance to upgrade it to 2.4. The UPGRADE_FROM config will turn off cooperative rebalancing in the cluster until everyone is on the new byte code, and we can be sure that the leader will be able to safely complete a rebalance. 
  • The second rolling bounce is to remove the UPGRADE_FROM config: simply remove this and bounce each instance for it to begin using the cooperative protocol. Note that unlike plain consumer apps, this means you will have some members on COOPERATIVE while others may still be on EAGER – as long as everyone is on version 2.4 or later, this is safe as the Streams assignor knows how to handle the assignment with either protocol in use. 
  • ...

    This change requires Kafka broker version >= 0.9, where broker will react will react with a rebalance when a normal consumer rejoin consumer rejoin the encoded metadata. Client application needs to update to the earliest version which includes KIP-429 version 1.0 change.

    ...

    The existing built-in Assignor implementations will then be updated to:


    Highest VersionSupported StrategyNotes
    RangeAssignor0EagerCurrent default.
    RoundRobinAssignor0Eager
    StickAssignor (old)0Eager
    StickAssignor (new)0CooperativeWill be new default in 3.0
    StreamsAssignor (old)4Eager
    StreamsAssignor (new)4Cooperative


    Although it makes the upgrade path simpler since we would no longer need the "rebalance.protocol" config on consumer anymore, while just encoding multiple assignors during the first rolling bounce of the upgrade path, it requires duplicated assignor class (of course, the new class could just be extending from the old one and there's not much LOC duplicated) which is a bit cumbersome.

    ...