...

Code Block
languagejava
public enum RebalanceStrategy {
    EAGER((byte) 0), COOPERATIVE((byte) 1);

    private final byte id;

    RebalanceStrategy(byte id) {
        this.id = id;
    }

    public byte id() {
        return id;
    }

    public static RebalanceStrategy forId(byte id) {
        switch (id) {
            case 0:
                return EAGER;
            case 1:
                return COOPERATIVE;
            default:
                throw new IllegalArgumentException("Unknown strategy id: " + id);
        }
    }
}

class PartitionAssignorProtocol {
    String name;                // same as the deprecated PartitionAssignor#name()

    RebalanceStrategy supportedStrategy;
}

interface PartitionAssignor {

    // existing interfaces

    short version();             // new API, the version of the assignor, which indicates a difference in user metadata or algorithm.

    @Deprecated                  // use supportedProtocols instead
    String name();

    List<PartitionAssignorProtocol> supportedProtocols();    // new API, tells the ConsumerCoordinator which rebalance strategies the assignor works with,
                                                             // and associates each one with a unique name of the assignor.

    class Subscription {
        public Short version();  // new API, and current version == 2

        public List<String> topics();

        public List<TopicPartition> ownedPartitions();  // new API, if version < 2 should always be empty

        public ByteBuffer userData();
    }

    class Assignment {
        public Short version();  // new API, same as above

        public List<TopicPartition> partitions();

        public ConsumerProtocol.Errors error();   // new API, if version < 2 should always be NONE

        public ByteBuffer userData();
    }
}

...

The existing built-in Assignor implementations will be updated accordingly:


Assignor              Highest Version   Supported Strategy    Notes
RangeAssignor         0                 Eager                 Current default value.
RoundRobinAssignor    0                 Eager
StickyAssignor        0                 Eager, Cooperative    To be default value in 3.0
StreamsAssignor       4                 Eager, Cooperative

...

Code Block
languagejava
"rebalance.protocol":


type: Enum 
values: {eager, compatible, cooperative}
default: eager


When the config value is "eager", the consumer still executes the old rebalance behavior (i.e. revoke everything before joining the group); when it is "cooperative", the consumer uses the new protocol. In either case the consumer needs to make sure that the instantiated assignor can work with the chosen protocol, by calling PartitionAssignor#supportedProtocols and using the assignor name associated with that protocol (if it is supported). When the config value is "compatible", the logic is a bit more complicated: 1) the consumer still uses the "eager" consumer coordinator behavior, but 2) it tries to encode two assignor protocols in the subscription if the assignor supports it (i.e. its supportedProtocols contain both).
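The three config values can be sketched as a small decision function. This is a minimal illustration, not the proposed implementation: the class `RebalanceConfigSketch`, the `Plan` holder and the `plan` method are hypothetical names, and failing fast when the assignor lacks the required strategy is an assumption (the text only says the consumer "needs to make sure" the assignor is compatible).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

public class RebalanceConfigSketch {

    public enum Strategy { EAGER, COOPERATIVE }

    /** Hypothetical holder: how the coordinator behaves and what it advertises. */
    public static final class Plan {
        public final Strategy coordinatorBehavior;
        public final List<Strategy> advertised;

        Plan(Strategy coordinatorBehavior, List<Strategy> advertised) {
            this.coordinatorBehavior = coordinatorBehavior;
            this.advertised = advertised;
        }
    }

    /** Maps a "rebalance.protocol" value and the assignor's supported strategies to a plan. */
    public static Plan plan(String rebalanceProtocol, Set<Strategy> supported) {
        switch (rebalanceProtocol) {
            case "eager":
                require(supported, Strategy.EAGER);
                return new Plan(Strategy.EAGER, Collections.singletonList(Strategy.EAGER));
            case "cooperative":
                require(supported, Strategy.COOPERATIVE);
                return new Plan(Strategy.COOPERATIVE, Collections.singletonList(Strategy.COOPERATIVE));
            case "compatible":
                // Behave eagerly, but advertise both strategies when the assignor supports both.
                require(supported, Strategy.EAGER);
                List<Strategy> advertised = new ArrayList<>();
                advertised.add(Strategy.EAGER);
                if (supported.contains(Strategy.COOPERATIVE)) {
                    advertised.add(Strategy.COOPERATIVE);
                }
                return new Plan(Strategy.EAGER, advertised);
            default:
                throw new IllegalArgumentException("Unknown rebalance.protocol: " + rebalanceProtocol);
        }
    }

    // Assumption: an assignor that lacks the needed strategy is a configuration error.
    private static void require(Set<Strategy> supported, Strategy needed) {
        if (!supported.contains(needed)) {
            throw new IllegalStateException("Assignor does not support " + needed);
        }
    }

    public static void main(String[] args) {
        Plan p = plan("compatible", EnumSet.allOf(Strategy.class));
        System.out.println(p.coordinatorBehavior + " " + p.advertised); // EAGER [EAGER, COOPERATIVE]
    }
}
```

With an assignor that supports only EAGER, "compatible" in this sketch degrades to advertising EAGER alone, which matches the "if the assignor supports it" wording above.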

To take advantage of cooperative rebalancing when upgrading from 2.2 or older to a newer version, the user would proceed as follows (assuming the assignor was "range" before the upgrade):

  1. Do a rolling bounce to replace the byte code (i.e. swap the jars), setting the rebalance.protocol config to "compatible" and the assignor to "sticky".
  2. Do another rolling bounce to change rebalance.protocol to "cooperative".
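The two bounces differ only in configuration, which can be sketched as two property sets. This is a hedged illustration: `UpgradeConfigSketch` and its methods are hypothetical names, "rebalance.protocol" is the config proposed here, and "sticky" is the shorthand used in the steps above (the existing `partition.assignment.strategy` consumer config normally takes an assignor class name rather than this shorthand).

```java
import java.util.Properties;

public class UpgradeConfigSketch {

    /** Consumer overrides for the first rolling bounce (new jars, still eager behavior). */
    public static Properties firstBounce() {
        Properties props = new Properties();
        props.setProperty("rebalance.protocol", "compatible");
        // Shorthand from the text; a real deployment would name the sticky assignor class.
        props.setProperty("partition.assignment.strategy", "sticky");
        return props;
    }

    /** Consumer overrides for the second rolling bounce (switch to cooperative). */
    public static Properties secondBounce() {
        Properties props = firstBounce();
        props.setProperty("rebalance.protocol", "cooperative");
        return props;
    }

    public static void main(String[] args) {
        System.out.println("bounce 1: " + firstBounce());
        System.out.println("bounce 2: " + secondBounce());
    }
}
```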

The key point behind the two rolling bounces and the additional check is to avoid the situation where the leader is on old byte code and only recognizes "eager", yet, thanks to compatibility, can still deserialize the new protocol data from newer-versioned members. Such a leader would go ahead and do the assignment even though the newer-versioned members did not revoke their partitions before joining the group. Note the difference with KIP-415 here: on the consumer we cannot rely on a list of built-in assignors, because assignors are user-customizable and therefore a black box to the consumer coordinator. We therefore need two rolling bounces to complete the upgrade, whereas Connect needs only one (details below).


In the first rolling bounce we still need the new consumer byte code to follow the "eager" protocol, since some not-yet-bounced consumers will still follow "eager" (e.g. still revoking before sending JoinGroup). During this process, the old assignor that supports "eager" will always be chosen. After the last member is bounced, the new assignor that supports "cooperative" may be chosen, but everyone will still follow the "eager" behavior.

...

Code Block
JoinGroupRequest (v5) => GroupId SessionTimeout RebalanceTimeout MemberId ProtocolType Protocols ProtocolVersion
   GroupId                 => String
   SessionTimeout          => Int32
   RebalanceTimeout        => Int32
   MemberId                => String
   ProtocolType            => String
   Protocols               => List<Protocol>
   ProtocolVersion         => Int32                   // new field

Protocol (v0) => ProtocolName ProtocolMetadata
   ProtocolName            => String
   ProtocolMetadata        => Bytes (consumer protocol encoded bytes)


Then, on the broker side, when choosing the leader, the broker will pick the member with the highest protocol version instead of picking on a "first come, first served" basis.
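The leader-selection rule can be sketched as a simple maximum over the joined members. This is only an illustration of the rule as stated: `LeaderSelectionSketch` and `pickLeader` are hypothetical names, the map from member id to protocol version stands in for whatever the broker tracks, and breaking ties in favor of the earliest joiner is an assumption.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LeaderSelectionSketch {

    /**
     * Returns the member id with the highest protocol version.
     * Ties go to the member seen first in iteration order (assumed to be join order).
     */
    public static String pickLeader(Map<String, Integer> versionByMember) {
        String leader = null;
        int best = Integer.MIN_VALUE;
        for (Map.Entry<String, Integer> entry : versionByMember.entrySet()) {
            if (leader == null || entry.getValue() > best) {
                leader = entry.getKey();
                best = entry.getValue();
            }
        }
        if (leader == null) {
            throw new IllegalArgumentException("No members in the group");
        }
        return leader;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order, standing in for join order.
        Map<String, Integer> members = new LinkedHashMap<>();
        members.put("member-a", 1);
        members.put("member-b", 2);
        members.put("member-c", 2);
        System.out.println(pickLeader(members)); // member-b
    }
}
```

Under "first come, first served", member-a would have become leader here even though it runs the older protocol version.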

...

Code Block
languagejava
interface PartitionAssignor {

    // existing interfaces

    short version();             // new API, the version of the assignor, which indicates a difference in user metadata or algorithm.

    String name();

    PartitionAssignorProtocol supportedProtocol();    // new API, tells the ConsumerCoordinator which rebalance strategy the assignor works with.
}


The existing built-in Assignor implementations will then be updated to:


Assignor                 Highest Version   Supported Strategy   Notes
RangeAssignor            0                 Eager                Current default.
RoundRobinAssignor       0                 Eager
StickyAssignor (old)     0                 Eager
StickyAssignor (new)     0                 Cooperative          Will be new default in 3.0
StreamsAssignor (old)    4                 Eager
StreamsAssignor (new)    4                 Cooperative

...