Since the assigned partitions are already encoded at the consumer protocol layer, the consumer's sticky assignor effectively duplicates this data in both the consumer protocol and the assignor's user data. Similarly, the StreamsPartitionAssignor is sticky as well, but relies on its own user data to achieve this. We have added a new out-of-the-box assignor for users that leverages the Subscription's built-in ownedPartitions. Consumer groups that plug in the new "cooperative-sticky" assignor will follow the incremental cooperative rebalancing protocol. Users who wish to do a rolling upgrade to the new cooperative assignor must follow a specific upgrade path, as described in the compatibility section below.

Users may also wish to implement their own custom assignor, or may already be doing so, and want to use the new cooperative protocol. Any assignor that returns COOPERATIVE in the list from #supportedProtocols indicates to the ConsumerCoordinator that it should use the cooperative protocol, and it must then follow specific assignment logic.

First, the assignor should be as "sticky" as possible, meaning it should assign partitions back to their previous owners wherever it can. To determine the previous assignment, the assignor can leverage the new ownedPartitions field that has been added to the Subscription. Stickiness is important for the cooperative protocol to be effective: in the limit where the new assignment is totally different from the previous one, the cooperative protocol reduces to the old eager protocol, since each member has to revoke all of its partitions and receive a whole new assignment. In addition, any time a partition has to be revoked, a follow-up rebalance is triggered, so the assignor should seek to minimize partition movement.

Second, to ensure safe resource management and clear ownership, the assignor must make sure a partition is revoked by its previous owner before it is assigned to a new one. Practically speaking, the assignor should generate its "intended" assignment and then check it against the previous assignment to see whether any partitions are being revoked (that is, they appear in a consumer's ownedPartitions but not in its new assignment). Any such partition should be removed from the new assignment for that round; once it has been revoked, it can be assigned to its final owner in the second rebalance. See the CooperativeStickyAssignor implementation for an example.
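The "generate the intended assignment, then hold back partitions that still have another owner" step can be sketched roughly as follows. This is a minimal illustration, not the CooperativeStickyAssignor code: the class CooperativeAdjustmentSketch and its adjust method are hypothetical, and partitions are modelled as plain strings instead of TopicPartition so that the sketch runs without Kafka on the classpath.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of the Kafka API; partitions are plain strings here.
final class CooperativeAdjustmentSketch {

    /**
     * @param owned    memberId -> partitions the member reported as ownedPartitions
     * @param intended memberId -> partitions the assignor would ideally hand out
     * @return the assignment that is safe to send out in this rebalance round
     */
    static Map<String, List<String>> adjust(Map<String, List<String>> owned,
                                            Map<String, List<String>> intended) {
        // Record the current owner of every partition that is still owned.
        Map<String, String> currentOwner = new HashMap<>();
        owned.forEach((member, partitions) ->
                partitions.forEach(p -> currentOwner.put(p, member)));

        Map<String, List<String>> result = new TreeMap<>();
        intended.forEach((member, partitions) -> {
            List<String> granted = new ArrayList<>();
            for (String p : partitions) {
                String owner = currentOwner.get(p);
                // Grant a partition only if nobody owns it or this member already owns it.
                // A partition owned by someone else is withheld until it has been revoked.
                if (owner == null || owner.equals(member)) {
                    granted.add(p);
                }
            }
            result.put(member, granted);
        });
        return result;
    }
}
```

In this sketch, the previous owner sees that a partition is missing from its new assignment and revokes it. On the follow-up rebalance the partition no longer appears in anyone's owned set, so the same call hands it to its intended owner.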

Note that the CooperativeStickyAssignor is intended for plain consumer clients. The existing StreamsPartitionAssignor has simply been modified to support the cooperative protocol, so Streams users should not try to plug in the CooperativeStickyAssignor (or any other assignor). The upgrade path for Streams also differs slightly from that of the clients' CooperativeStickyAssignor. See the section on Streams below for details.

Compatibility and Upgrade Path

Since we are modifying the consumer protocol as described above, we need to design the upgrade path so that consumers can upgrade to the new rebalance protocol in an online manner. In fact, the trickiest part of this KIP is how to support a safe online upgrade path: even if users make mistakes and do not strictly follow the instructions, we can pause the consumer from proceeding, so that users eventually notice the problem (for example, by seeing consumer lag and investigating the logs) rather than falling into undefined behavior or, even worse, having some partitions owned by more than one member.

Note that since we are only appending additional fields to the end of the consumer protocol, the new protocol remains compatible with the old version. That is, an old-versioned consumer can still deserialize newer-versioned protocol data (this holds as long as new fields are only appended at the end).
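To make the append-only argument concrete, here is a small sketch using java.nio.ByteBuffer. The byte layout is a simplified assumption made up for illustration and is not the real consumer protocol wire format; AppendOnlySketch and its methods are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical layout (NOT the real consumer protocol wire format):
// [version:short][topicCount:int][topics...], and for V1 only: [ownedCount:int][owned...]
final class AppendOnlySketch {

    static void putString(ByteBuffer buf, String s) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        buf.putShort((short) bytes.length);
        buf.put(bytes);
    }

    static String getString(ByteBuffer buf) {
        byte[] bytes = new byte[buf.getShort()];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // A V1 writer: the new owned-partitions field is appended after the V0 fields.
    static ByteBuffer writeV1(List<String> topics, List<String> owned) {
        ByteBuffer buf = ByteBuffer.allocate(1024);
        buf.putShort((short) 1);
        buf.putInt(topics.size());
        topics.forEach(t -> putString(buf, t));
        buf.putInt(owned.size());
        owned.forEach(p -> putString(buf, p));
        buf.flip();
        return buf;
    }

    // A V0-era reader: it reads only the fields it knows and ignores trailing bytes.
    static List<String> readTopicsAsV0(ByteBuffer buf) {
        buf.getShort(); // version, not inspected by this simplified old reader
        int count = buf.getInt();
        List<String> topics = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            topics.add(getString(buf));
        }
        return topics;
    }
}
```

The old reader returns the topic list correctly and simply leaves the appended bytes unread, which is exactly why it cannot tell that the sender is on the newer version.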

However, when consumers on V1 join the group, there is a key behavioral difference: they do NOT revoke their partitions, and hence it is not safe to reassign any of their partitions as is done in the current (V0) assignment logic. That means the leader can only proceed with the assignment when it knows whether the members are on V0 or V1.

Another thing to keep in mind is that if the leader itself is still on the older version, it can still deserialize the V1 subscription protocol as V0 by ignoring the additional fields. It may therefore "think" everyone is still on V0 while some members are actually on the newer version.

The key idea to resolve this is to let assignor implementors themselves indicate to the ConsumerCoordinator whether they are compatible with the protocol, and then to rely on multi-assignor protocols for a safe upgrade path: users need to keep two assignors while switching the rebalance protocol, and afterwards they can use another rolling bounce to remove the old-versioned protocol.
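The multi-assignor mechanism relies on the group only ever using an assignor that every member has listed. The sketch below is a simplified assumption about how such a choice could be made (intersect the members' lists, then let each member vote for its most preferred common candidate); it is not the broker's actual implementation, and AssignorSelectionSketch is a hypothetical name.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of choosing one assignor for the whole group.
final class AssignorSelectionSketch {

    /** Each inner list is one member's assignors, in that member's preference order. */
    static String select(Collection<List<String>> memberAssignors) {
        // Step 1: keep only assignors that every member supports.
        Set<String> candidates = null;
        for (List<String> assignors : memberAssignors) {
            if (candidates == null) {
                candidates = new LinkedHashSet<>(assignors);
            } else {
                candidates.retainAll(assignors);
            }
        }
        if (candidates == null || candidates.isEmpty()) {
            throw new IllegalStateException("No assignor is supported by every member");
        }
        // Step 2: each member votes for its first preference among the candidates.
        Map<String, Integer> votes = new LinkedHashMap<>();
        for (List<String> assignors : memberAssignors) {
            for (String name : assignors) {
                if (candidates.contains(name)) {
                    votes.merge(name, 1, Integer::sum);
                    break;
                }
            }
        }
        return Collections.max(votes.entrySet(), Map.Entry.comparingByValue()).getKey();
    }
}
```

With one member listing only "range" and another listing "range, cooperative-sticky", the only common candidate is "range". Once a member lists only "cooperative-sticky", that becomes the only common candidate, which matches the two-bounce reasoning in this section.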


More specifically, we will introduce the new public API ConsumerPartitionAssignor class and its #Subscription / #Assignment classes as follows. The old classes were mistakenly placed in the `internal` package, so we use this KIP to deprecate them in favor of the new ones, along with the augmented methods:

Code Block
languagejava
ConsumerGroupMetadata {
    public String groupId();

    public int generationId();

    public String memberId();

    public Optional<String> groupInstanceId();
}

public interface ConsumerPartitionAssignor {

    /**
     * Return serialized data that will be included in the {@link Subscription} sent to the leader
     * and can be leveraged in {@link #assign(Cluster, GroupSubscription)} (e.g. local host/rack information)
     *
     * @param topics Topics subscribed to through {@link org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(java.util.Collection)}
     *               and variants
     * @return nullable subscription user data
     */
    default ByteBuffer subscriptionUserData(Set<String> topics) {
        return null;
    }

    /**
     * Perform the group assignment given the member subscriptions and current cluster metadata.
     * @param metadata Current topic/broker metadata known by consumer
     * @param groupSubscription Subscriptions from all members including metadata provided through {@link #subscriptionUserData(Set)}
     * @return A map from the members to their respective assignments. This should have one entry
     *         for each member in the input subscription map.
     */
    GroupAssignment assign(Cluster metadata, GroupSubscription groupSubscription);

    /**
     * Callback which is invoked when a group member receives its assignment from the leader.
     * @param assignment The local member's assignment as provided by the leader in {@link #assign(Cluster, GroupSubscription)}
     * @param metadata Additional metadata on the consumer (optional)
     */
    default void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) {
    }

    /**
     * Indicate which rebalance protocol this assignor works with;
     * By default it should always work with {@link RebalanceProtocol#EAGER}.
     */
    default List<RebalanceProtocol> supportedProtocols() {
        return Collections.singletonList(RebalanceProtocol.EAGER);
    }

    /**
     * Return the version of the assignor which indicates how the user metadata encodings
     * and the assignment algorithm gets evolved.
     */
    default short version() {
        return (short) 0;
    }

    /**
     * Unique name for this assignor (e.g. "range" or "roundrobin" or "sticky"). Note, this is not required
     * to be the same as the class name specified in {@link ConsumerConfig#PARTITION_ASSIGNMENT_STRATEGY_CONFIG}
     * @return non-null unique name
     */
    String name();

    final class Subscription {
        public List<String> topics();

        public ByteBuffer userData();

        public List<TopicPartition> ownedPartitions();

        public void setGroupInstanceId(Optional<String> groupInstanceId);

        public Optional<String> groupInstanceId();
    }

    final class Assignment {
        public List<TopicPartition> partitions();

        public ByteBuffer userData();
    }

    final class GroupSubscription {
        public GroupSubscription(Map<String, Subscription> subscriptions);

        public Map<String, Subscription> groupSubscription();
    }

    final class GroupAssignment {
        public GroupAssignment(Map<String, Assignment> assignments);

        public Map<String, Assignment> groupAssignment();
    }

    /**
     * The rebalance protocol defines partition assignment and revocation semantics. The purpose is to establish a
     * consistent set of rules that all consumers in a group follow in order to transfer ownership of a partition.
     * {@link ConsumerPartitionAssignor} implementors can claim supporting one or more rebalance protocols via the
     * {@link ConsumerPartitionAssignor#supportedProtocols()}, and it is their responsibility to respect the rules
     * of those protocols in their {@link ConsumerPartitionAssignor#assign(Cluster, GroupSubscription)} implementations.
     * Failures to follow the rules of the supported protocols would lead to runtime error or undefined behavior.
     *
     * The {@link RebalanceProtocol#EAGER} rebalance protocol requires a consumer to always revoke all its owned
     * partitions before participating in a rebalance event. It therefore allows a complete reshuffling of the assignment.
     *
     * {@link RebalanceProtocol#COOPERATIVE} rebalance protocol allows a consumer to retain its currently owned
     * partitions before participating in a rebalance event. The assignor should not reassign any owned partitions
     * immediately, but instead may indicate consumers the need for partition revocation so that the revoked
     * partitions can be reassigned to other consumers in the next rebalance event. This is designed for sticky assignment
     * logic which attempts to minimize partition reassignment with cooperative adjustments.
     */
    enum RebalanceProtocol {
        EAGER((byte) 0), COOPERATIVE((byte) 1);

        private final byte id;

        RebalanceProtocol(byte id) {
            this.id = id;
        }

        public byte id() {
            return id;
        }

        public static RebalanceProtocol forId(byte id) {
            switch (id) {
                case 0:
                    return EAGER;
                case 1:
                    return COOPERATIVE;
                default:
                    throw new IllegalArgumentException("Unknown rebalance protocol id: " + id);
            }
        }
    }
}


Note the semantic differences between the fields added above:

  1. The assignor version indicates, within the same assignor series, when the encoded metadata and algorithm have changed. It is assumed that a newer-versioned assignor is compatible with older versions, i.e. it is able to deserialize the metadata and adjust its assignment logic to cope with older-versioned members. The version will be used in the JoinGroup request so that the broker-side coordinator can select the member with the highest version to be the leader (see below for details). It does not need to be used in the upcoming release, but it can become useful in the future once brokers have also been upgraded to support the augmented JoinGroup request.
  2. The assignor's preferred protocol indicates the rebalance protocol it works with. Note that the same assignor cannot change this preferred protocol value in higher versions. The ConsumerCoordinator will read this value and use it to decide which rebalance logic to use (e.g. the old one, or the new process proposed in this KIP).
  3. The subscription / assignment version will be aligned with the assignor version. It will not be exposed to users via public APIs, since it is only used internally by the ConsumerCoordinator. Upon serialization / deserialization, the version of the subscription / assignment is encoded / decoded first, and the follow-up serde logic can then be selected accordingly.


The existing built-in assignor implementations will be updated accordingly:


Assignor                  | Highest Version | Supported Strategy | Notes
RangeAssignor             | 0               | Eager              | Current default value.
RoundRobinAssignor        | 0               | Eager              |
StickyAssignor            | 0               | Eager              |
CooperativeStickyAssignor | 0               | Eager, Cooperative | To be default value in 3.0
StreamsAssignor           | 4               | Eager, Cooperative |

The reason "range" and "round-robin" do not support cooperative rebalancing is that this protocol implicitly relies on the assignor being somewhat sticky in order to benefit from trading an extra rebalance. These two assignors are not sticky (range may sometimes happen to reassign partitions back to their old owners, but this is not best-effort), and hence we have decided not to make them selectable for the cooperative protocol. The existing StickyAssignor was not changed to support COOPERATIVE, to ensure that users follow the smooth upgrade path outlined below and do not run into trouble if they already use the StickyAssignor and blindly upgrade.


The ConsumerCoordinator layer, on the other hand, will select which protocol to use based on the assignors specified in its configs, as follows:

  • Only consider protocols that are supported by all the assignors. If no protocol is supported by all the assignors, throw an exception at startup.
  • If multiple protocols are commonly supported, select the one with the highest id (the id number indicates how advanced the protocol is, and we always want to select the most advanced one).
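The two selection rules above can be sketched as a set intersection followed by a maximum over the protocol id. This is an illustration only: ProtocolSelectionSketch and its nested Protocol enum are hypothetical stand-ins that mirror the ids of the RebalanceProtocol enum shown earlier, so that the example runs without Kafka on the classpath.

```java
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.List;

// Hypothetical stand-in for the ConsumerCoordinator's protocol choice.
final class ProtocolSelectionSketch {

    // Mirrors the ids of RebalanceProtocol: EAGER = 0, COOPERATIVE = 1.
    enum Protocol {
        EAGER((byte) 0), COOPERATIVE((byte) 1);

        final byte id;

        Protocol(byte id) {
            this.id = id;
        }
    }

    /** Each inner list holds the protocols one configured assignor supports. */
    static Protocol select(List<List<Protocol>> supportedPerAssignor) {
        if (supportedPerAssignor.isEmpty()) {
            throw new IllegalArgumentException("At least one assignor must be configured");
        }
        // Rule 1: only protocols supported by all assignors are considered.
        EnumSet<Protocol> common = EnumSet.allOf(Protocol.class);
        for (List<Protocol> supported : supportedPerAssignor) {
            common.retainAll(supported);
        }
        if (common.isEmpty()) {
            throw new IllegalArgumentException("No rebalance protocol is supported by all assignors");
        }
        // Rule 2: among the common protocols, pick the one with the highest id.
        return Collections.max(common, Comparator.comparingInt((Protocol p) -> p.id));
    }
}
```

For example, configuring an EAGER-only assignor together with one that supports both protocols yields EAGER, while configuring only an assignor that supports both yields COOPERATIVE.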

The specific upgrade path is described below. Note that it differs depending on whether you have a plain consumer app or a Streams app, so make sure to follow the appropriate one.

Consumer

From the user's perspective, the upgrade path for leveraging the new protocol is the same as switching to a new assignor. For example, assume the current version of the Kafka consumer is 2.2 and the "range" assignor is specified in the config. The upgrade path would be:

  1. The first rolling bounce replaces the byte code (i.e. swaps the jars): set the assignors to "range, cooperative-sticky" (or round-robin/sticky if you are already using a different assignor). At this stage, the new-versioned byte code will still choose EAGER as the protocol and will send both assignors in its join-group request. Since at least one member has not been bounced yet and therefore only sends "range", the "range" assignor will be selected to assign partitions while everyone follows the EAGER protocol. This rolling bounce is safe.
  2. The second rolling bounce removes the "range" (or round-robin/sticky) assignor, i.e. leaves only the "cooperative-sticky" assignor in the config. At this stage, members that have been bounced will choose the COOPERATIVE protocol and not revoke partitions, while members not yet bounced will still use EAGER and revoke everything. However, the "cooperative-sticky" assignor will be chosen, since at least one already-bounced member no longer lists "range". The "cooperative-sticky" assignor works even when some members are on EAGER and some are on COOPERATIVE: this is fine as long as the leader can recognize them and make assignment choices accordingly. EAGER members have revoked everything and hence no longer report any owned partitions in their subscription information, so it is safe to move those partitions to other members immediately based on the assignor's output.
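As a rough illustration of the two bounces, the consumer configuration might change as sketched below. The `partition.assignment.strategy` key and the two assignor class names are the standard consumer ones; the class ConsumerUpgradeConfigSketch and its methods are hypothetical and only build java.util.Properties objects, so the sketch runs without Kafka on the classpath.

```java
import java.util.Properties;

// Hypothetical helper showing the assignor config used during each rolling bounce.
final class ConsumerUpgradeConfigSketch {

    static final String STRATEGY_KEY = "partition.assignment.strategy";
    static final String RANGE = "org.apache.kafka.clients.consumer.RangeAssignor";
    static final String COOPERATIVE_STICKY =
            "org.apache.kafka.clients.consumer.CooperativeStickyAssignor";

    // First bounce: new jars, both assignors listed, so the group stays on EAGER.
    static Properties firstBounce() {
        Properties props = new Properties();
        props.setProperty(STRATEGY_KEY, RANGE + "," + COOPERATIVE_STICKY);
        return props;
    }

    // Second bounce: drop the old assignor, leaving only cooperative-sticky.
    static Properties secondBounce() {
        Properties props = new Properties();
        props.setProperty(STRATEGY_KEY, COOPERATIVE_STICKY);
        return props;
    }
}
```

A real application would merge these entries into its existing consumer properties (bootstrap servers, group id, deserializers) before constructing the consumer.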

The key point behind these two rolling bounces is that we want to avoid the situation where the leader is on old byte code and only recognizes "eager", but due to compatibility is still able to deserialize the new protocol data from newer-versioned members, and hence goes ahead with the assignment even though the new-versioned members did not revoke their partitions before joining the group. Note the difference from KIP-415 here: on the consumer we do not have the luxury of relying on a list of built-in assignors, since the assignor is user-customizable and hence a black box to the consumer coordinator. We therefore need two rolling bounces to complete the upgrade, whereas Connect only needs one.

Streams

Streams embeds its own assignor, which determines the supported protocol. In 2.4 it will enable cooperative rebalancing by default, so a specific upgrade path must be followed to safely upgrade to 2.4+, since assignors on earlier versions do not know how to handle a cooperative rebalance safely (or even what it is). To do so, you must perform two rolling bounces as follows:

  1. The first rolling bounce replaces the byte code (i.e. swaps the jars): set the UPGRADE_FROM config to 2.3 (or whatever version you are upgrading from) and then bounce each instance to upgrade it to 2.4. The UPGRADE_FROM config turns off cooperative rebalancing in the cluster until everyone is on the new byte code, so we can be sure that the leader will be able to safely complete a rebalance.
  2. The second rolling bounce removes the UPGRADE_FROM config: simply remove it and bounce each instance so that it begins using the cooperative protocol. Note that, unlike with plain consumer apps, this means some members will be on COOPERATIVE while others may still be on EAGER. As long as everyone is on version 2.4 or later this is safe, because the Streams assignor knows how to handle the assignment with either protocol in use.

Note that as long as the UPGRADE_FROM parameter is set to 2.3 or lower, Streams will stay on EAGER. If for some reason you decide you would like to stay on eager, or return to it after switching to cooperative, you can simply set or leave the UPGRADE_FROM parameter in place. If you intend to use this config to stay on eager even after upgrading, it is recommended that you set it to a version in the range 2.0 - 2.3, as setting it to a version earlier than 2.0 will force Streams to remain stuck on an earlier metadata version.
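The Streams procedure amounts to adding and then removing a single config entry. The sketch below assumes the UPGRADE_FROM config corresponds to the `upgrade.from` key of the Streams configuration; the class StreamsUpgradeConfigSketch and its methods are hypothetical and operate on plain java.util.Properties.

```java
import java.util.Properties;

// Hypothetical helper showing the Streams config used during each rolling bounce.
final class StreamsUpgradeConfigSketch {

    // Assumed key behind the UPGRADE_FROM config.
    static final String UPGRADE_FROM_KEY = "upgrade.from";

    // First bounce: new jars with upgrade.from set, so Streams stays on EAGER.
    static Properties firstBounce(Properties base, String previousVersion) {
        Properties props = new Properties();
        props.putAll(base);
        props.setProperty(UPGRADE_FROM_KEY, previousVersion);
        return props;
    }

    // Second bounce: remove upgrade.from so the instance starts using COOPERATIVE.
    static Properties secondBounce(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        props.remove(UPGRADE_FROM_KEY);
        return props;
    }
}
```

Leaving the entry in place after the first bounce, as described above, keeps the application on the eager protocol.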

...

Updated Assignor Logic

Although Subscription now has an ownedPartitions field, PartitionAssignor#subscription() is not required to fill it, since this field will always be filled in by the ConsumerCoordinator. We could, in fact, let this function return only the serialized user data bytes, and symmetrically let #onAssignment take only the serialized user data bytes as well. However, this is somewhat out of the scope of this KIP and is therefore omitted here.

The only logic that can (optionally) be updated is the #assign() function, where a user-provided implementation can leverage the "ownedPartitions" field to be somewhat sticky.

Looking into the Future: Heartbeat Communicated Protocol

...

  1. Introducing the new ConsumerPartitionAssignor interface and deprecating the old PartitionAssignor interface (for details, see Consumer Protocol).
  2. Augmenting the existing ConsumerRebalanceListener interface with the new onPartitionsLost function (for details, see Consumer Protocol).
  3. Adding a list of new metrics to the consumer instance that reflect rebalance events (for details, see Consumer Metrics).
  4. Augmenting the JoinGroupRequest protocol with the protocol version (for details, see Looking into the Future: Assignor Version).


Compatibility, Deprecation, and Migration Plan

...