...

```java
import java.nio.ByteBuffer;
import java.util.List;

import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.TopicPartition;

public enum RebalanceProtocol {
    EAGER((byte) 0), COOPERATIVE((byte) 1);

    private final byte id;

    RebalanceProtocol(byte id) {
        this.id = id;
    }

    public byte id() {
        return id;
    }

    // Maps a serialized protocol id back to its enum constant.
    public static RebalanceProtocol forId(byte id) {
        switch (id) {
            case 0:
                return EAGER;
            case 1:
                return COOPERATIVE;
            default:
                throw new IllegalArgumentException("Unknown rebalance protocol id: " + id);
        }
    }
}

interface PartitionAssignor {

    // existing interfaces

    short version();                                         // new API: the assignor's version, indicating user metadata / algorithmic differences.

    String name();

    List<RebalanceProtocol> supportedProtocols();            // new API: the rebalance protocols this assignor works with;
                                                             // each protocol is associated with the assignor's unique name.

    class Subscription {                                     // accessor signatures only; fields and bodies omitted
        public List<String> topics();

        public List<TopicPartition> ownedPartitions();       // new API: always empty for subscriptions older than version 1

        public ByteBuffer userData();
    }

    class Assignment {                                       // accessor signatures only; fields and bodies omitted
        public List<TopicPartition> partitions();

        public ConsumerProtocol.Errors error();              // new API: always NONE for assignments older than version 1

        public ByteBuffer userData();
    }
}
```
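The upgrade path relies on a simple rule: a consumer uses COOPERATIVE only when every configured assignor supports it, and falls back to EAGER otherwise. The following is a minimal sketch of that rule under stated assumptions: `ProtocolAwareAssignor`, `ProtocolSelector` and the `assignor(...)` helper are hypothetical names, and the enum is trimmed to its two constants so the snippet compiles on its own.

```java
import java.util.EnumSet;
import java.util.List;

// Trimmed copy of RebalanceProtocol (ids omitted) so the sketch compiles on its own.
enum RebalanceProtocol { EAGER, COOPERATIVE }

// Hypothetical minimal view of an assignor: just its name and supportedProtocols().
interface ProtocolAwareAssignor {
    String name();

    List<RebalanceProtocol> supportedProtocols();
}

final class ProtocolSelector {
    private ProtocolSelector() {}

    // COOPERATIVE only if every configured assignor supports it; EAGER if all support EAGER.
    static RebalanceProtocol choose(List<ProtocolAwareAssignor> assignors) {
        if (assignors.isEmpty()) {
            throw new IllegalArgumentException("at least one assignor must be configured");
        }
        EnumSet<RebalanceProtocol> common = EnumSet.allOf(RebalanceProtocol.class);
        for (ProtocolAwareAssignor assignor : assignors) {
            common.retainAll(assignor.supportedProtocols()); // keep only protocols this assignor also supports
        }
        if (common.contains(RebalanceProtocol.COOPERATIVE)) {
            return RebalanceProtocol.COOPERATIVE;
        }
        if (common.contains(RebalanceProtocol.EAGER)) {
            return RebalanceProtocol.EAGER;
        }
        throw new IllegalStateException("configured assignors share no rebalance protocol");
    }

    // Hypothetical helper: builds an assignor that only reports a name and its protocols.
    static ProtocolAwareAssignor assignor(String name, RebalanceProtocol... protocols) {
        List<RebalanceProtocol> supported = List.of(protocols);
        return new ProtocolAwareAssignor() {
            @Override
            public String name() {
                return name;
            }

            @Override
            public List<RebalanceProtocol> supportedProtocols() {
                return supported;
            }
        };
    }
}
```

With "range" (EAGER only) and "sticky" (EAGER and COOPERATIVE) both configured, `choose` returns EAGER; with "sticky" alone it returns COOPERATIVE.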

...

The existing built-in assignor implementations will be updated accordingly:


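| Assignor           | Highest Version | Supported Protocols | Notes                        |
|--------------------|-----------------|---------------------|------------------------------|
| RangeAssignor      | 0               | Eager               | Current default value.       |
| RoundRobinAssignor | 0               | Eager               |                              |
| StickyAssignor     | 0               | Eager, Cooperative  | To become the default in 3.0 |
| StreamsAssignor    | 4               | Eager, Cooperative  |                              |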

We do not make "range" and "round-robin" support cooperative rebalancing because the protocol implicitly relies on the assignor being somewhat sticky: it trades an extra rebalance for the benefit of not revoking partitions that stay with the same owner. Neither of these two assignors is sticky (range may sometimes happen to reassign partitions back to their old owners, but it makes no best-effort attempt to do so), so we have decided not to allow them to be selected for the cooperative protocol.
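Why stickiness matters can be made concrete with a small sketch, assuming each member compares the partitions it currently owns with its new assignment: anything owned but no longer assigned must be revoked, and under the cooperative protocol those partitions only reach their new owner in a follow-up rebalance. `CooperativeRevocation` is a hypothetical helper, and partitions are plain strings such as "t-0" standing in for `TopicPartition`.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class CooperativeRevocation {
    private CooperativeRevocation() {}

    // Partitions a member owns now but is no longer assigned: it must revoke these,
    // and under COOPERATIVE they are handed over only in a follow-up rebalance.
    static Set<String> toRevoke(List<String> owned, List<String> assigned) {
        Set<String> revoked = new LinkedHashSet<>(owned);
        revoked.removeAll(assigned);
        return revoked;
    }

    // Total number of previously owned partitions that must be revoked across the group.
    static int totalRevoked(Map<String, List<String>> owned, Map<String, List<String>> assigned) {
        int total = 0;
        for (Map.Entry<String, List<String>> member : owned.entrySet()) {
            List<String> newAssignment = assigned.getOrDefault(member.getKey(), List.of());
            total += toRevoke(member.getValue(), newAssignment).size();
        }
        return total;
    }
}
```

For example, if c1 owns t-0..t-2, c2 owns t-3..t-5 and a third member c3 joins, a sticky outcome such as c1=[t-0, t-1], c2=[t-3, t-4], c3=[t-2, t-5] revokes 2 partitions, while a round-robin style outcome c1=[t-0, t-3], c2=[t-1, t-4], c3=[t-2, t-5] revokes 4.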

...


From the user's perspective, the upgrade path for adopting the new protocol is the same as switching to a new assignor. For example, assume the consumer is currently on Kafka 2.2 with the "range" assignor specified in the config. The upgrade path would be:

...

  • Do a first rolling bounce to replace the byte code (i.e. swap the jars) and set the assignors to "range, sticky". At this stage, the new byte code still chooses EAGER as the protocol and sends both assignors in its join-group request. Since at least one member has not been bounced yet and therefore sends only "range", the "range" assignor will be selected to assign partitions, and everyone follows the EAGER protocol. This rolling bounce is safe.
  • Do a second rolling bounce to remove the "range" assignor, i.e. leave only the "sticky" assignor in the config. At this stage, members that have been bounced choose the COOPERATIVE protocol and do not revoke their partitions, while members not yet bounced still follow EAGER and revoke everything. The "sticky" assignor will nevertheless be chosen, since at least one already-bounced member no longer lists "range". The "sticky" assignor works even when some members are on EAGER and others on COOPERATIVE, as long as the leader can recognize them and make its assignment choices accordingly: the EAGER members have already revoked everything and therefore report no previously owned partitions in their subscriptions, so it is safe to move those partitions to other members immediately based on the assignor's output.
  • The key point behind these two rolling bounces is to avoid the situation where the leader is still on old byte code and only recognizes "eager", yet, for compatibility, can still deserialize the new protocol data from newer members, and therefore goes ahead with the assignment even though those newer members did not revoke their partitions before joining the group. Note the difference from KIP-415 here: Connect needs only one rolling bounce, but on the consumer we cannot rely on a list of built-in assignors, because assignors are user-customizable and hence a black box to the consumer coordinator. We therefore need two rolling bounces instead of one to complete the upgrade.
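Which assignor wins at each stage of the two bounces can be simulated with a sketch of the group coordinator's selection, assuming its usual rule: only assignors listed by every member are candidates, each member votes for its most preferred candidate, and the candidate with the most votes wins. `GroupAssignorSelection` is a hypothetical name, and the tie-breaking here (earliest candidate) is a simplification.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class GroupAssignorSelection {
    private GroupAssignorSelection() {}

    // memberAssignors: member id -> assignor names in that member's order of preference.
    static String select(Map<String, List<String>> memberAssignors) {
        if (memberAssignors.isEmpty()) {
            throw new IllegalStateException("cannot select an assignor for an empty group");
        }
        List<List<String>> lists = new ArrayList<>(memberAssignors.values());
        // Candidates: assignors listed by every member.
        Set<String> candidates = new LinkedHashSet<>(lists.get(0));
        for (List<String> assignors : lists) {
            candidates.retainAll(assignors);
        }
        if (candidates.isEmpty()) {
            throw new IllegalStateException("no assignor is supported by all members");
        }
        // Each member votes for its most preferred candidate.
        Map<String, Integer> votes = new HashMap<>();
        for (List<String> assignors : lists) {
            for (String assignor : assignors) {
                if (candidates.contains(assignor)) {
                    votes.merge(assignor, 1, Integer::sum);
                    break;
                }
            }
        }
        // Most votes wins; ties go to the earliest candidate (a simplification).
        String winner = null;
        int best = -1;
        for (String candidate : candidates) {
            int count = votes.getOrDefault(candidate, 0);
            if (count > best) {
                winner = candidate;
                best = count;
            }
        }
        return winner;
    }
}
```

During the first bounce (one member on ["range"], another on ["range", "sticky"]) and after it completes, "range" wins; during the second bounce (["range", "sticky"] alongside ["sticky"]) only "sticky" is common, so it wins. In consumer configuration terms, and assuming the built-in classes, the first bounce would set `partition.assignment.strategy` to `org.apache.kafka.clients.consumer.RangeAssignor,org.apache.kafka.clients.consumer.StickyAssignor` and the second to `org.apache.kafka.clients.consumer.StickyAssignor` alone.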

    ...

    The existing built-in Assignor implementations will then be updated to:


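    | Assignor              | Highest Version | Supported Protocols | Notes                           |
    |-----------------------|-----------------|---------------------|---------------------------------|
    | RangeAssignor         | 0               | Eager               | Current default.                |
    | RoundRobinAssignor    | 0               | Eager               |                                 |
    | StickyAssignor (old)  | 0               | Eager               |                                 |
    | StickyAssignor (new)  | 0               | Cooperative         | Will become the default in 3.0. |
    | StreamsAssignor (old) | 4               | Eager               |                                 |
    | StreamsAssignor (new) | 4               | Cooperative         |                                 |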


    Although this makes the upgrade path simpler, since the consumer would no longer need the "rebalance.protocol" config and would just encode multiple assignors during the first rolling bounce, it requires duplicating each assignor class that should work with both protocols (the new class could simply extend the old one, so not many lines of code would be duplicated), which is a bit cumbersome.
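    The duplicated-class approach could look roughly like the sketch below, assuming the new class extends the old one and overrides only its name and supported protocol. `StickyAssignor` here is a simplified stand-in rather than the real Kafka class, and the `CooperativeStickyAssignor` name and its "cooperative-sticky" assignor name are hypothetical.

```java
import java.util.List;

// Trimmed copy of RebalanceProtocol so the sketch compiles on its own.
enum RebalanceProtocol { EAGER, COOPERATIVE }

// Stand-in for the existing, eager-only sticky assignor; the real class also
// implements the assignment logic, which the subclass below would inherit unchanged.
class StickyAssignor {
    public String name() {
        return "sticky";
    }

    public List<RebalanceProtocol> supportedProtocols() {
        return List.of(RebalanceProtocol.EAGER);
    }
}

// Hypothetical duplicated class under this alternative: same algorithm,
// registered under a different name and advertising only COOPERATIVE.
class CooperativeStickyAssignor extends StickyAssignor {
    @Override
    public String name() {
        return "cooperative-sticky"; // hypothetical name
    }

    @Override
    public List<RebalanceProtocol> supportedProtocols() {
        return List.of(RebalanceProtocol.COOPERATIVE);
    }
}
```

    Only a handful of lines are duplicated, but every assignor that should support both protocols needs such a twin class, which is the cumbersome part.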

    ...