...

If you do choose to plug in a cooperative assignor and have also implemented a custom ConsumerRebalanceListener, you should be aware of how the semantics and ordering of these callbacks have changed. In the eager protocol, the timeline of a rebalance is always exactly as follows (a minimal listener sketch appears after the list):

  1. Listener#onPartitionsLost: if the member has missed a rebalance and fallen out of the group, this new callback will be invoked on the set of all owned partitions (unless empty). The member will then rejoin the group.
  2. Listener#onPartitionsRevoked: called on the full set of assigned partitions
  3. Assignor#subscriptionUserdata: called when sending the JoinGroup request
  4. Assignor#assign: called only for group leader
  5. Assignor#onAssignment: invoked after receiving the new assignment
  6. Listener#onPartitionsAssigned: called on the full set of assigned partitions (may have overlap with the partitions passed to #onPartitionsRevoked).
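
For reference, a minimal listener written against these eager semantics might look like the following sketch (the class name and the comments about what to do in each callback are illustrative, not mandated by the KIP):

Code Block
languagejava
    import java.util.Collection;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.common.TopicPartition;

    // Sketch: under EAGER, every rebalance revokes the full assignment and
    // then hands back the full new assignment.
    public class EagerListener implements ConsumerRebalanceListener {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // EAGER: 'partitions' is the full set of owned partitions, every time.
            // A typical place to commit offsets or flush state for everything owned.
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // EAGER: 'partitions' is the full new assignment, which may overlap
            // with what was just revoked.
        }

        @Override
        public void onPartitionsLost(Collection<TopicPartition> partitions) {
            // Invoked instead of onPartitionsRevoked when the member has fallen
            // out of the group; these partitions may already be owned by another
            // member, so do not commit offsets for them here.
        }
    }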

In the cooperative protocol, the timeline is less exact, as some methods may or may not be called, at different times, and on different sets of partitions. It will instead look something like the following (a cooperative-aware listener sketch appears after the list):

  1. Listener#onPartitionsLost: if the member has missed a rebalance and fallen out of the group, this new callback will be invoked on the set of all owned partitions (unless empty). The member will then rejoin the group.
  2. Listener#onPartitionsRevoked: if the topic metadata has changed such that some owned partitions are no longer in our subscription or no longer exist, this callback will be invoked on that subset. If there are no partitions to revoke for those reasons, this callback will not be invoked at this point (note that this will likely be the case in a typical rebalance due to membership changes, e.g. scaling in/out, member crashes/restarts, etc.).
  3. Assignor#subscriptionUserdata: called when sending the JoinGroup request
  4. Assignor#assign: called only for the group leader. Note that the #assign method will now have access to the ownedPartitions of each group member (minus any partitions lost or revoked in steps 1 and 2).
  5. Listener#onPartitionsRevoked: this will be called on the subset of previously owned partitions that are intended to be reassigned to another consumer. If this subset is empty, this will not be invoked at all. If this is invoked, it means that a follow-up rebalance will be triggered so that the revoked partitions can be given to their final intended owner.
  6. Assignor#onAssignment: invoked after receiving the new assignment (will always be after any #onPartitionsRevoked calls, and before #onPartitionsAssigned).
  7. Listener#onPartitionsAssigned: called on the subset of assigned partitions that were not previously owned before this rebalance. There should be no overlap with the revoked partitions (if any). This will always be called, even if there are no new partitions being assigned to a given member.
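
A listener that is correct under these cooperative semantics must treat the callback arguments as incremental deltas rather than full snapshots. A minimal sketch (the class name and the owned-set bookkeeping are illustrative):

Code Block
languagejava
    import java.util.Collection;
    import java.util.HashSet;
    import java.util.Set;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.common.TopicPartition;

    // Sketch: track ownership incrementally, since COOPERATIVE callbacks only
    // report the partitions that actually changed hands.
    public class CooperativeAwareListener implements ConsumerRebalanceListener {
        private final Set<TopicPartition> owned = new HashSet<>();

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> revoked) {
            // Only the subset being taken away; may not be invoked at all
            // if nothing needs to move.
            owned.removeAll(revoked);
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> added) {
            // Only the newly added partitions; always invoked, possibly with
            // an empty collection.
            owned.addAll(added);
        }

        @Override
        public void onPartitionsLost(Collection<TopicPartition> lost) {
            // Owned partitions were lost without a clean revocation.
            owned.removeAll(lost);
        }
    }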

...

The ConsumerCoordinator layer, on the other hand, will select which protocol to use based on the assignors specified in its configs, as follows:

...
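
To make the selection rule concrete, here is a hedged sketch of how it can be computed (this mirrors the behavior described above; it is not the actual ConsumerCoordinator source):

Code Block
languagejava
    import java.util.Collections;
    import java.util.EnumSet;
    import java.util.List;
    import java.util.Set;
    import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
    import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.RebalanceProtocol;

    // Sketch: keep only the protocols supported by every configured assignor,
    // then pick the most advanced of them.
    public class ProtocolSelection {
        static RebalanceProtocol select(List<ConsumerPartitionAssignor> assignors) {
            Set<RebalanceProtocol> common = EnumSet.copyOf(assignors.get(0).supportedProtocols());
            for (ConsumerPartitionAssignor assignor : assignors)
                common.retainAll(assignor.supportedProtocols());
            if (common.isEmpty())
                throw new IllegalArgumentException("Assignors share no common rebalance protocol");
            return Collections.max(common); // COOPERATIVE wins over EAGER if both are common
        }
    }

This is why listing both "cooperative-sticky" and "range" (as in the upgrade path below) keeps a member on EAGER: "range" only supports EAGER.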

The specific upgrade path is described below. Note that it differs depending on whether you have a plain consumer app or a Streams app, so make sure to follow the appropriate one.

...

From the user's perspective, the upgrade path for leveraging the new protocol is similar to switching to a new assignor. For example, assume the current version of the Kafka consumer is 2.2 and the "range" assignor is specified in the config (or no assignor is configured, which is equivalent, since the RangeAssignor is the default below 3.0). The upgrade path would be:

  1. The first rolling bounce is to replace the byte code (i.e. swap the jars) and introduce the cooperative assignor: set the assignors to "cooperative-sticky, range" (or round-robin/sticky/etc if you are using a different assignor already). At this stage, the newly versioned byte code will send both assignors in its join-group request, but will still choose EAGER as the protocol, since it is still configured with the "range" assignor and the selected rebalancing protocol must be supported by all assignors in the list. The "range" assignor will be selected to assign partitions while everyone is following the EAGER protocol. This rolling bounce is safe (see the config sketch after this list).
  2. The second rolling bounce is to remove the "range" (or round-robin/sticky/etc) assignor, i.e. leave only the "cooperative-sticky" assignor in the config. At this stage, whoever has been bounced will choose the COOPERATIVE protocol and not revoke partitions, while the not-yet-bounced members will still go with EAGER and revoke everything. The "cooperative-sticky" assignor will be chosen, since at least one member that has already been bounced no longer supports "range". The "cooperative-sticky" assignor works even when some members are on EAGER and some are on COOPERATIVE: it is fine as long as the leader can recognize them and make assignment choices accordingly. The EAGER members have revoked everything and hence no longer report any pre-assigned partitions in their subscription information, so it is safe to move those partitions to other members immediately based on the assignor's output.
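
A sketch of the corresponding consumer configs for the two bounces (using the fully qualified names of the built-in assignors; adapt if you use a custom one):

Code Block
languagejava
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    Properties props = new Properties();
    // ... other consumer configs ...

    // First rolling bounce: list both assignors. Keeping "range" in the list
    // pins the protocol to EAGER, since it must be supported by all assignors.
    props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
              "org.apache.kafka.clients.consumer.CooperativeStickyAssignor," +
              "org.apache.kafka.clients.consumer.RangeAssignor");

    // Second rolling bounce: drop "range" so bounced members can select COOPERATIVE.
    props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
              "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");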

The key point behind these two rolling bounces is that we want to avoid the situation where the leader is on the old byte code and only recognizes EAGER, but, due to compatibility, would still be able to deserialize the new protocol data from newer versioned members, and hence would just go ahead and do the assignment even though the new versioned members had not revoked their partitions before joining the group. Note the difference with KIP-415 here: since on the consumer side we do not have the luxury of relying on a list of built-in assignors (the assignor is user-customizable and hence a black box to the consumer coordinator), we need two rolling bounces instead of one to complete the upgrade, whereas Connect needs only one rolling bounce.

...

  • The first rolling bounce is to replace the byte code (i.e. swap the jars): set the UPGRADE_FROM config to 2.3 (or whatever version you are upgrading from) and then bounce each instance to upgrade it to 2.4. The UPGRADE_FROM config will turn off cooperative rebalancing in the cluster until everyone is on the new byte code, and we can be sure that the leader will be able to safely complete a rebalance (see the config sketch after this list).
  • The second rolling bounce is to remove the UPGRADE_FROM config: simply remove this and bounce each instance for it to begin using the cooperative protocol. Note that unlike plain consumer apps, this means you will have some members on COOPERATIVE while others may still be on EAGER – as long as everyone is on version 2.4 or later, this is safe as the Streams assignor knows how to handle the assignment with either protocol in use. 
  • ...
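
    A sketch of the Streams configs for the two bounces described above, assuming an upgrade from 2.3 (substitute the version you are actually upgrading from):

    Code Block
    languagejava
    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    // First rolling bounce: pin the old rebalancing behavior while old and
    // new byte code coexist in the group.
    props.put(StreamsConfig.UPGRADE_FROM_CONFIG, "2.3");

    // Second rolling bounce: remove the config so that cooperative
    // rebalancing takes effect.
    props.remove(StreamsConfig.UPGRADE_FROM_CONFIG);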

    These changes should all be fairly transparent to Streams apps, as there are no semantic changes, only improved rebalancing performance. However, users of Interactive Queries (IQ) or those implementing a StateListener will notice that Streams spends less time in the REBALANCING state, as we will not transition to it until the end of the rebalance. This means all owned stores will remain open for IQ while the rebalance is in progress, and Streams will continue to restore active tasks if there are any that are not yet running, and will process standbys if there aren't.

    ...

    Allow Consumer to Return Records in Rebalance

    As summarized in KAFKA-8421,

    ...

    a further optimization would be to allow consumers to still return messages

    ...

    that belong to its owned partitions even when it is within a rebalance.

    In order to do this, we'd need to allow the consumer#commit API to throw RebalanceInProgressException if it is committing offsets while a rebalance is in progress (a usage sketch follows the Javadoc excerpt below).

    Code Block
    languagejava
        /**
         * ...
         *
         * @throws org.apache.kafka.common.errors.RebalanceInProgressException if the consumer instance is in the middle of a rebalance
         *            so it is not yet determined which partitions would be assigned to the consumer. In such cases you can first
         *            complete the rebalance by calling {@link #poll(Duration)} and commit can be reconsidered afterwards.
         *            NOTE when you reconsider committing after the rebalance, the assigned partitions may have changed,
         *            and also for those partitions that are still assigned their fetch positions may have changed too
         *            if more records are returned from the {@link #poll(Duration)} call.
         * ...
         */
        @Override
        public void commitSync() {
            commitSync(Duration.ofMillis(defaultApiTimeoutMs));
        }
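
    A hedged usage sketch of how an application might handle this exception (the retry policy is illustrative; 'consumer' is an already-constructed KafkaConsumer):

    Code Block
    languagejava
    import java.time.Duration;
    import org.apache.kafka.common.errors.RebalanceInProgressException;

    try {
        consumer.commitSync();
    } catch (RebalanceInProgressException e) {
        // A rebalance is in progress: let it complete, then reconsider the commit.
        // In real code, process or buffer the records returned by this poll before
        // committing, and remember that the assignment and the fetch positions of
        // the retained partitions may have changed.
        consumer.poll(Duration.ofMillis(100));
        consumer.commitSync();
    }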

    With this optimization (implemented in 2.5.0) consumer groups can continue to process some records even while a rebalance is in progress. This means that in addition to processing standby and restoring tasks during a rebalance, Streams apps will be able to make progress on running active tasks.

    Looking into the Future: Heartbeat Communicated Protocol

    ...

    The right way to downgrade is to perform a first rolling bounce of the instances in which you add back the RangeAssignor (or whichever assignor you wish to use), and then a second rolling bounce in which you remove the CooperativeStickyAssignor and also downgrade the consumers to the old byte code. It's essentially the same as the upgrade path, but in reverse.
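
    The configs mirror the upgrade sketch shown earlier, in reverse (assuming the built-in assignors):

    Code Block
    languagejava
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    Properties props = new Properties();
    // First rolling bounce: add back the eager assignor alongside cooperative-sticky.
    props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
              "org.apache.kafka.clients.consumer.CooperativeStickyAssignor," +
              "org.apache.kafka.clients.consumer.RangeAssignor");

    // Second rolling bounce: leave only the eager assignor, and swap the old jars back in.
    props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
              "org.apache.kafka.clients.consumer.RangeAssignor");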


    Public Interface

    This is a quick summary of what we would change in the public interface exposed to the user.

    ...