...

The key idea to resolve this is to let assignor implementors themselves indicate to the ConsumerCoordinator whether they are compatible with the protocol, and then to rely on multi-assignor protocols for a safe upgrade path: users need to keep two assignors when switching the rebalance protocol, and after that they can use another rolling bounce to remove the old versioned protocol.

...

Code Block
languagejava
public enum RebalanceProtocol {
    EAGER((byte) 0), COOPERATIVE((byte) 1);

    private final byte id;

    RebalanceProtocol(byte id) {
        this.id = id;
    }

    public byte id() {
        return id;
    }

    public static RebalanceProtocol forId(byte id) {
        switch (id) {
            case 0:
                return EAGER;
            case 1:
                return COOPERATIVE;
            default:
                throw new IllegalArgumentException("Unknown rebalance protocol id: " + id);
        }
    }
}

interface PartitionAssignor {

    // existing interfaces

    short version();                                 // new API, the version of the assignor, which indicates user metadata / algorithmic differences.

    String name();

    List<RebalanceProtocol> supportedProtocols();    // new API, indicates which rebalance protocols the assignor works with.

    class Subscription {
        public List<String> topics();

        public List<TopicPartition> ownedPartitions();   // new API, on older versions should always be empty

        public ByteBuffer userData();
    }

    class Assignment {
        public List<TopicPartition> partitions();

        public ConsumerProtocol.Errors error();          // new API, on older versions should always be NONE

        public ByteBuffer userData();
    }
}

...

  1. The assignor version indicates, within the same assignor series, when its encoded metadata or algorithm has changed. It is assumed that a newer versioned assignor is compatible with older versions, i.e. it is able to deserialize their metadata and adjust its assignment logic to cope with older versioned members. It will be used in the JoinGroup request so that the broker-side coordinator can select the member with the highest version to be the leader (see details below). For the upcoming release it does not need to be used, but it can be useful in the future once brokers have also been upgraded to support the augmented JoinGroup request.
  2. The assignor preferred protocol indicates the rebalance protocol it would work with. Note that the same assignor cannot change this preferred protocol value in higher versions. The ConsumerCoordinator will read this value and use it to decide which rebalance logic to use (e.g. the old one, or the process newly proposed in this KIP).
  3. The subscription / assignment version will be aligned with the assignor version, and it will not be exposed via public APIs to users since it is only used internally by the ConsumerCoordinator. Upon serialization / deserialization, the version of the subscription / assignment will be encoded / decoded first, and the follow-up serde logic can then be selected accordingly.
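The version-first encoding described in the last point can be sketched roughly as follows. This is only an illustration, not the actual ConsumerProtocol wire format: the class name, the byte layout, the string form of owned partitions, and the constant OWNED_PARTITIONS_MIN_VERSION (the first version assumed to carry owned partitions) are all assumptions made for this example.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class VersionedSubscriptionSerde {

    // Assumption for this sketch: owned partitions are only encoded from this version onwards.
    static final short OWNED_PARTITIONS_MIN_VERSION = 1;

    static final class Subscription {
        final short version;
        final List<String> topics;
        final List<String> ownedPartitions; // simplified to "topic-partition" strings

        Subscription(short version, List<String> topics, List<String> ownedPartitions) {
            this.version = version;
            this.topics = topics;
            this.ownedPartitions = ownedPartitions;
        }
    }

    static ByteBuffer serialize(Subscription subscription) {
        ByteBuffer buffer = ByteBuffer.allocate(4096);
        buffer.putShort(subscription.version); // the version is always encoded first
        writeStrings(buffer, subscription.topics);
        if (subscription.version >= OWNED_PARTITIONS_MIN_VERSION) {
            writeStrings(buffer, subscription.ownedPartitions);
        }
        buffer.flip();
        return buffer;
    }

    static Subscription deserialize(ByteBuffer buffer) {
        short version = buffer.getShort(); // decode the version, then pick the matching logic
        List<String> topics = readStrings(buffer);
        List<String> owned = Collections.<String>emptyList(); // older versions: always empty
        if (version >= OWNED_PARTITIONS_MIN_VERSION) {
            owned = readStrings(buffer);
        }
        return new Subscription(version, topics, owned);
    }

    private static void writeStrings(ByteBuffer buffer, List<String> values) {
        buffer.putInt(values.size());
        for (String value : values) {
            byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
            buffer.putShort((short) bytes.length);
            buffer.put(bytes);
        }
    }

    private static List<String> readStrings(ByteBuffer buffer) {
        int count = buffer.getInt();
        List<String> values = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            byte[] bytes = new byte[buffer.getShort()];
            buffer.get(bytes);
            values.add(new String(bytes, StandardCharsets.UTF_8));
        }
        return values;
    }

    public static void main(String[] args) {
        Subscription old = new Subscription((short) 0, Arrays.asList("orders"), Arrays.asList("orders-0"));
        Subscription current = new Subscription((short) 1, Arrays.asList("orders"), Arrays.asList("orders-0"));
        System.out.println(deserialize(serialize(old)).ownedPartitions);
        System.out.println(deserialize(serialize(current)).ownedPartitions);
    }
}
```

In this sketch a version 0 subscription round-trips with an empty owned-partition list even if the sender populated it, which matches the "on older versions should always be empty" note in the interface above.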

...


Assignor            Highest Version  Supported Strategy  Notes
RangeAssignor       0                Eager               Current default value.
RoundRobinAssignor  0                Eager
StickyAssignor      0                Eager, Cooperative  To be default value in 3.0
StreamsAssignor     4                Eager, Cooperative

The reason we make "range" and "round-robin" not support cooperative rebalance is that this protocol implicitly relies on the assignor being somewhat sticky in order to benefit from trading an extra rebalance. These two assignors, however, are not sticky (although range may sometimes luckily reassign partitions back to their old owners, it is not best-effort), and hence we've decided not to make them selectable for the cooperative protocol.

In addition, we introduce the following new config to Consumer:

Code Block
languagejava
"rebalance.strategy":

type: Enum 
values: {eager, compatible, cooperative}
default: eager

When the config value is "eager", the consumer would still execute the old rebalance behavior (i.e. revoke everything before joining the group); if the config value is "cooperative", the consumer will use the new protocol. However, it needs to make sure that the instantiated assignor can work with the protocol it chose, by calling PartitionAssignor#supportedProtocols and using the corresponding name of the selected protocol (if it is supported). When the config value is "compatible", the logic is a bit more complicated: 1) it will still use the "eager" consumer coordinator behavior, but 2) it will try to encode two assignor protocols in the subscription if the assignor supports it (i.e. its supportedProtocols has both).


The ConsumerCoordinator layer, on the other hand, will select which protocol to use based on the assignors specified in its configs, as follows:

  • Only consider protocols that are supported by all the assignors. If no protocol is commonly supported across all the assignors, throw an exception at startup time.
  • If multiple protocols are commonly supported, select the one with the highest id (i.e. the id number indicates how advanced the protocol is, and we always want to select the most advanced one).
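The two selection rules above can be sketched as a small helper. This is a minimal illustration, not the actual ConsumerCoordinator code: the class name and the selectProtocol helper are hypothetical, and each inner list stands in for the result of one configured assignor's supportedProtocols().

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

public class ProtocolSelectionSketch {

    // Mirrors the RebalanceProtocol enum proposed in this KIP.
    enum RebalanceProtocol {
        EAGER((byte) 0), COOPERATIVE((byte) 1);

        private final byte id;

        RebalanceProtocol(byte id) {
            this.id = id;
        }

        byte id() {
            return id;
        }
    }

    // Hypothetical helper: one inner list per configured assignor.
    static RebalanceProtocol selectProtocol(List<List<RebalanceProtocol>> supportedPerAssignor) {
        // Rule 1: only keep protocols that every assignor supports.
        Set<RebalanceProtocol> common = EnumSet.allOf(RebalanceProtocol.class);
        for (List<RebalanceProtocol> supported : supportedPerAssignor) {
            common.retainAll(supported);
        }
        if (common.isEmpty()) {
            throw new IllegalArgumentException(
                "No rebalance protocol is supported by all configured assignors");
        }
        // Rule 2: among the common protocols, pick the one with the highest id.
        RebalanceProtocol selected = null;
        for (RebalanceProtocol protocol : common) {
            if (selected == null || protocol.id() > selected.id()) {
                selected = protocol;
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        List<RebalanceProtocol> range = Arrays.asList(RebalanceProtocol.EAGER);
        List<RebalanceProtocol> sticky =
            Arrays.asList(RebalanceProtocol.EAGER, RebalanceProtocol.COOPERATIVE);

        // "range, sticky" only has EAGER in common.
        System.out.println(selectProtocol(Arrays.asList(range, sticky)));
        // "sticky" alone allows the more advanced protocol.
        System.out.println(selectProtocol(Collections.singletonList(sticky)));
    }
}
```

With both "range" and "sticky" configured the sketch selects EAGER, and with only "sticky" it selects COOPERATIVE.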


From the user's perspective, the upgrade path for leveraging new protocols is just the same as switching to a new assignor. For example, assume the current version of the Kafka consumer is 2.2 and the "range" assignor is specified in the config. The upgrade path would be:

  1. Having a first rolling bounce to replace the byte code (i.e. swap the jars); set the assignors to "range, sticky". At this stage, the new versioned byte code will still choose EAGER as the protocol (e.g. still revoking before sending JoinGroup) and send both assignors in the join-group request. Since there is at least one member who has not been bounced yet and therefore only sends "range", the "range" assignor will be selected to assign partitions while everyone is following the EAGER protocol. After the last member is bounced, the new assignor that supports "cooperative" may be chosen, but everyone will still follow the EAGER behavior. This rolling bounce is safe.
  2. Having a second rolling bounce to remove the "range" assignor, i.e. only leave the "sticky" assignor in the config. At this stage, whoever has been bounced will choose the COOPERATIVE protocol and not revoke partitions, while the not-yet-bounced members will still go with EAGER and revoke everything. However, the "sticky" assignor will be chosen, since at least one already-bounced member no longer has "range". The "sticky" assignor works even though some members are in EAGER and some are in COOPERATIVE: it is fine as long as the leader can recognize them and make assignment choices accordingly. EAGER members have revoked everything and hence no longer have any pre-assigned partitions in the subscription information, so it is safe to move those partitions to other members immediately based on the assignor's output.

The key point behind these two rolling bounces is that we want to avoid the situation where the leader is on old byte code and only recognizes "eager", but due to compatibility is still able to deserialize the new protocol data from newer versioned members, and hence goes ahead and does the assignment while the new versioned members have not revoked their partitions before joining the group. Note the difference with KIP-415 here: on the consumer we do not have the luxury of leveraging a list of built-in assignors, since assignors are user-customizable and hence a black box to the consumer coordinator. We therefore need two rolling bounces instead of one to complete the upgrade, whereas Connect only needs one rolling bounce.
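To make the two-bounce reasoning concrete, here is a small simulation sketch. It is not the real group coordinator logic: the class, the SUPPORTED table, and both helpers are hypothetical, and the group-level selection is simplified to "the first assignor, in the first member's preference order, that every member lists".

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class RollingBounceSketch {

    enum RebalanceProtocol { EAGER, COOPERATIVE } // declared in ascending id order

    // Assumed supported protocols per assignor name, following the table in this KIP.
    static final Map<String, List<RebalanceProtocol>> SUPPORTED = new HashMap<>();
    static {
        SUPPORTED.put("range", Arrays.asList(RebalanceProtocol.EAGER));
        SUPPORTED.put("sticky",
            Arrays.asList(RebalanceProtocol.EAGER, RebalanceProtocol.COOPERATIVE));
    }

    // Protocol a single member follows locally before sending its join-group request.
    static RebalanceProtocol memberProtocol(boolean newByteCode, List<String> assignors) {
        if (!newByteCode) {
            return RebalanceProtocol.EAGER; // old byte code only knows EAGER
        }
        Set<RebalanceProtocol> common = EnumSet.allOf(RebalanceProtocol.class);
        for (String assignor : assignors) {
            common.retainAll(SUPPORTED.get(assignor));
        }
        return Collections.max(common); // most advanced commonly supported protocol
    }

    // Simplified group-level choice: first assignor of member 0 that all members list.
    static String groupAssignor(List<List<String>> memberAssignors) {
        for (String candidate : memberAssignors.get(0)) {
            boolean supportedByAll = true;
            for (List<String> assignors : memberAssignors) {
                supportedByAll &= assignors.contains(candidate);
            }
            if (supportedByAll) {
                return candidate;
            }
        }
        throw new IllegalStateException("No assignor is supported by all members");
    }

    public static void main(String[] args) {
        List<String> oldConfig = Arrays.asList("range");
        List<String> firstBounce = Arrays.asList("range", "sticky");
        List<String> secondBounce = Arrays.asList("sticky");

        // Middle of the first bounce: one member bounced, one still on old byte code.
        System.out.println(memberProtocol(true, firstBounce));
        System.out.println(groupAssignor(Arrays.asList(firstBounce, oldConfig)));

        // Middle of the second bounce: one member on "sticky" only, one on "range, sticky".
        System.out.println(memberProtocol(true, secondBounce));
        System.out.println(memberProtocol(true, firstBounce));
        System.out.println(groupAssignor(Arrays.asList(secondBounce, firstBounce)));
    }
}
```

During the first bounce the sketch yields EAGER with "range" selected; during the second bounce it yields a mix of COOPERATIVE and EAGER members with "sticky" selected, which is the mixed state the "sticky" assignor has to tolerate.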


Updated Assignor Logic

Although Subscription now has an ownedPartitions field, PartitionAssignor#subscription() is not required to fill it, since this field will always be filled in by the ConsumerCoordinator. We could, in fact, let this function return only the serialized user data bytes, and symmetrically let #onAssignment take only the serialized user data bytes as well. But this is a bit out of the scope of this KIP and hence I'd omit it here.

The only logic that can (optionally) be updated is the #assign() function, where a user-instantiated implementation can leverage the "ownedPartitions" field to be a bit sticky.
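As an illustration of what "a bit sticky" could look like, the following sketch lets each member keep the partitions it reported as owned and then hands the remaining partitions to whichever member currently has the fewest. It is a simplified stand-in, not the StickyAssignor algorithm: the class and method are hypothetical, partitions are plain strings, and it makes no attempt to rebalance partitions that are already owned.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class StickyAssignSketch {

    // ownedByMember: member id -> partitions taken from Subscription#ownedPartitions.
    static Map<String, List<String>> assign(List<String> allPartitions,
                                            Map<String, List<String>> ownedByMember) {
        Map<String, List<String>> sortedOwned = new TreeMap<>(ownedByMember);
        Map<String, List<String>> assignment = new TreeMap<>();
        Set<String> unassigned = new LinkedHashSet<>(allPartitions);

        // Pass 1: every member keeps the partitions it already owns, if they still exist
        // and have not been claimed by an earlier member.
        for (Map.Entry<String, List<String>> entry : sortedOwned.entrySet()) {
            List<String> kept = new ArrayList<>();
            for (String partition : entry.getValue()) {
                if (unassigned.remove(partition)) {
                    kept.add(partition);
                }
            }
            assignment.put(entry.getKey(), kept);
        }
        if (assignment.isEmpty()) {
            return assignment; // no members, nothing to assign
        }

        // Pass 2: give each remaining partition to the member with the fewest partitions.
        for (String partition : unassigned) {
            String target = null;
            for (String member : assignment.keySet()) {
                if (target == null
                        || assignment.get(member).size() < assignment.get(target).size()) {
                    target = member;
                }
            }
            assignment.get(target).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<String, List<String>> owned = new TreeMap<>();
        owned.put("A", Arrays.asList("t-0", "t-1"));
        owned.put("B", Collections.<String>emptyList()); // e.g. an EAGER member that revoked everything

        // prints {A=[t-0, t-1], B=[t-2, t-3]}
        System.out.println(assign(Arrays.asList("t-0", "t-1", "t-2", "t-3"), owned));
    }
}
```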


Looking into the Future: Heartbeat Communicated Protocol

Note that the above upgrade path relies on the fact that COOPERATIVE and EAGER members can work together within the same generation as long as the leader recognizes both; this, however, may not hold moving forward if we add a third rebalance protocol. One idea to resolve this in the future is that, instead of letting members decide "locally" which protocol to use before sending the join-group request, we use the Heartbeat request / response to piggy-back the communication of the group's supported protocols and let members rely on that "global" information to make decisions. More specifically:

  • On Heartbeat Request, we will add an additional field holding the list of protocols that this member supports.
  • On Heartbeat Response, we will add an additional field holding a single protocol, indicating which one to use if the error code suggests re-joining the group.

Upon receiving a heartbeat request, if the indicated supported protocols do not contain the one the broker has decided to use for the upcoming rebalance, the broker replies with a fatal error.
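The broker-side check could look roughly like the sketch below. None of these types exist in Kafka: HeartbeatRequest, HeartbeatResponse, their fields, and handleHeartbeat are hypothetical stand-ins for the augmented heartbeat described above, with all existing heartbeat fields omitted.

```java
import java.util.Arrays;
import java.util.List;

public class HeartbeatProtocolSketch {

    enum RebalanceProtocol { EAGER, COOPERATIVE }

    // Hypothetical request: only the proposed additional field is modeled.
    static final class HeartbeatRequest {
        final List<RebalanceProtocol> supportedProtocols;

        HeartbeatRequest(List<RebalanceProtocol> supportedProtocols) {
            this.supportedProtocols = supportedProtocols;
        }
    }

    // Hypothetical response: a fatal-error flag plus the proposed single-protocol field.
    static final class HeartbeatResponse {
        final boolean fatalError;
        final RebalanceProtocol protocolToUse; // null when fatalError is true

        HeartbeatResponse(boolean fatalError, RebalanceProtocol protocolToUse) {
            this.fatalError = fatalError;
            this.protocolToUse = protocolToUse;
        }
    }

    // decidedProtocol is what the broker has decided to use for the upcoming rebalance.
    static HeartbeatResponse handleHeartbeat(HeartbeatRequest request,
                                             RebalanceProtocol decidedProtocol) {
        if (!request.supportedProtocols.contains(decidedProtocol)) {
            return new HeartbeatResponse(true, null); // member cannot follow the decision
        }
        return new HeartbeatResponse(false, decidedProtocol);
    }

    public static void main(String[] args) {
        HeartbeatRequest eagerOnly = new HeartbeatRequest(Arrays.asList(RebalanceProtocol.EAGER));
        HeartbeatRequest both = new HeartbeatRequest(
            Arrays.asList(RebalanceProtocol.EAGER, RebalanceProtocol.COOPERATIVE));

        System.out.println(handleHeartbeat(eagerOnly, RebalanceProtocol.COOPERATIVE).fatalError);
        System.out.println(handleHeartbeat(both, RebalanceProtocol.COOPERATIVE).protocolToUse);
    }
}
```

In this sketch a member that only supports EAGER receives a fatal error once the broker has settled on COOPERATIVE, while a member that supports both is told which protocol to use when it re-joins.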


Looking into the Future: Assignor Version

One semi-orthogonal issue with the current assignor implementation is that the same assignor may evolve its encoded user metadata byte format; when that happens, a leader that does not recognize the new format will crash. For example, today StreamsAssignor has evolved to version 4 of its user metadata bytes, and to cope with old versioned leaders we have introduced the "version probing" feature, which requires additional rebalances.

To resolve this issue, I'd propose to modify the JoinGroup protocol in this KIP as well to read the `protocol version` from the PartitionAssignor.

...