...
```java
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.kafka.common.TopicPartition;

public enum RebalanceStrategy {
    EAGER((byte) 0), COOPERATIVE((byte) 1);

    private final byte id;

    RebalanceStrategy(byte id) {
        this.id = id;
    }

    public byte id() {
        return id;
    }

    public static RebalanceStrategy forId(byte id) {
        switch (id) {
            case 0:
                return EAGER;
            case 1:
                return COOPERATIVE;
            default:
                throw new IllegalArgumentException("Unknown strategy id: " + id);
        }
    }
}

class PartitionAssignorProtocol {
    String name;                          // same as the deprecated PartitionAssignor#name()
    RebalanceStrategy supportedStrategy;
}

interface PartitionAssignor {

    // existing interfaces

    short version();  // new API, the version of the assignor, which indicates the user metadata / algorithmic difference.

    @Deprecated       // use supportedProtocols() instead
    String name();

    List<PartitionAssignorProtocol> supportedProtocols();  // new API, tells the ConsumerCoordinator which rebalance strategies it works with,
                                                           // and associates each strategy with a unique name of the assignor.

    class Subscription {
        public Short version();                          // new API, and current version == 2
        public List<String> topics();
        public List<TopicPartition> ownedPartitions();   // new API, if version < 2 should always be empty
        public ByteBuffer userData();
    }

    class Assignment {
        public Short version();                          // new API, same as above
        public List<TopicPartition> partitions();
        public ConsumerProtocol.Errors error();          // new API, if version < 2 should always be NONE
        public ByteBuffer userData();
    }
}
```
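The version-gating rule in the `Subscription` comments (owned partitions must be empty when the version is below 2) can be illustrated with a small stand-alone sketch. The `Subscription` and `TopicPartition` records below are simplified, hypothetical stand-ins rather than the Kafka classes, and enforcing the rule in a constructor is only one possible design.

```java
import java.nio.ByteBuffer;
import java.util.List;

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
record TopicPartition(String topic, int partition) {}

// Hypothetical, simplified Subscription that enforces the version rule above.
record Subscription(short version, List<String> topics,
                    List<TopicPartition> ownedPartitions, ByteBuffer userData) {

    Subscription {
        // Defensive, immutable copies (List.copyOf rejects null elements).
        topics = List.copyOf(topics);
        ownedPartitions = List.copyOf(ownedPartitions);
        // Per the API comment, versions below 2 never carry owned partitions.
        if (version < 2 && !ownedPartitions.isEmpty()) {
            throw new IllegalArgumentException(
                    "ownedPartitions must be empty for subscription version " + version);
        }
    }
}
```

Rejecting the inconsistent combination up front means an older-format subscription can never silently claim partition ownership.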
...
The existing built-in assignor implementations will be updated accordingly:
Assignor | Highest Version | Supported Strategy | Notes
---|---|---|---
RangeAssignor | 0 | Eager | Current default value.
RoundRobinAssignor | 0 | Eager |
StickyAssignor | 0 | Eager, Cooperative | To become the default value in 3.0
StreamsAssignor | 4 | Eager, Cooperative |
...
```
"rebalance.protocol":
    type: Enum
    values: {eager, compatible, cooperative}
    default: eager
```
When the config value is "eager", the consumer still executes the old rebalance behavior (i.e. it revokes everything before joining the group); if the config value is "cooperative", the consumer uses the new protocol. However, the consumer needs to make sure that the instantiated assignor can work with whichever protocol it chose, by calling PartitionAssignor#supportedProtocols and using the name associated with the selected protocol (if it is supported). When the config value is "compatible", the logic is a bit more involved: 1) the consumer still uses the "eager" consumer coordinator behavior, but 2) it encodes two assignor protocols in the subscription if the assignor supports both (i.e. its supportedProtocols contains both strategies).
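A minimal sketch of the name selection described above, assuming each assignor exposes its protocol entries as a list. `AssignorProtocol`, `ProtocolSelection`, and the choice to list the cooperative name ahead of the eager one in "compatible" mode are illustrative assumptions, not part of the proposal.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Optional;

// Mirrors the proposed RebalanceStrategy enum (ids omitted for brevity).
enum RebalanceStrategy { EAGER, COOPERATIVE }

// Hypothetical mirror of PartitionAssignorProtocol: an assignor name paired with one strategy.
record AssignorProtocol(String name, RebalanceStrategy supportedStrategy) {}

final class ProtocolSelection {
    private ProtocolSelection() {}

    // Names a member would encode in its subscription for a given
    // "rebalance.protocol" value. Illustrative only, not Kafka's implementation.
    static List<String> namesToEncode(String rebalanceProtocol, List<AssignorProtocol> supported) {
        switch (rebalanceProtocol.toLowerCase(Locale.ROOT)) {
            case "eager":
                return List.of(requireName(RebalanceStrategy.EAGER, supported));
            case "cooperative":
                return List.of(requireName(RebalanceStrategy.COOPERATIVE, supported));
            case "compatible": {
                // The coordinator still behaves eagerly; the cooperative name is only
                // advertised when the assignor supports it (listing it first is an assumption).
                List<String> names = new ArrayList<>();
                findName(RebalanceStrategy.COOPERATIVE, supported).ifPresent(names::add);
                names.add(requireName(RebalanceStrategy.EAGER, supported));
                return names;
            }
            default:
                throw new IllegalArgumentException("Unknown rebalance.protocol: " + rebalanceProtocol);
        }
    }

    static Optional<String> findName(RebalanceStrategy strategy, List<AssignorProtocol> supported) {
        return supported.stream()
                .filter(p -> p.supportedStrategy() == strategy)
                .map(AssignorProtocol::name)
                .findFirst();
    }

    // Fails fast when the configured assignor cannot work with the chosen strategy.
    static String requireName(RebalanceStrategy strategy, List<AssignorProtocol> supported) {
        return findName(strategy, supported).orElseThrow(() ->
                new IllegalStateException("Assignor does not support " + strategy));
    }
}
```

With a hypothetical assignor advertising `"sticky"` (eager) and `"cooperative-sticky"` (cooperative), "compatible" yields both names, while an eager-only assignor such as range yields just its eager name; asking for "cooperative" with an eager-only assignor fails fast.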
The upgrade path for users who want to leverage this cooperative rebalance when moving from 2.2 or older to a newer version would be (assuming the "range" assignor was used before the upgrade):
- First rolling bounce: replace the byte code (i.e. swap the jars), set the rebalance.protocol config to "compatible", and set the assignor to "sticky".
- Second rolling bounce: change the rebalance.protocol config to "cooperative".
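The two rolling bounces above amount to two consumer configuration changes. A sketch using `java.util.Properties`: `partition.assignment.strategy` is the existing consumer config (taking assignor class names), while `rebalance.protocol` is the config proposed here; the `CooperativeUpgrade` helper is hypothetical.

```java
import java.util.Properties;

final class CooperativeUpgrade {
    // Existing consumer config key.
    static final String ASSIGNMENT_STRATEGY = "partition.assignment.strategy";
    // Config proposed in this KIP (assumption: not available in released clients).
    static final String REBALANCE_PROTOCOL = "rebalance.protocol";
    static final String STICKY = "org.apache.kafka.clients.consumer.StickyAssignor";

    private CooperativeUpgrade() {}

    // First rolling bounce: new jars, "compatible" protocol, sticky assignor.
    static Properties firstBounce(Properties current) {
        Properties p = new Properties();
        p.putAll(current); // copy so the caller's config is left untouched
        p.setProperty(REBALANCE_PROTOCOL, "compatible");
        p.setProperty(ASSIGNMENT_STRATEGY, STICKY);
        return p;
    }

    // Second rolling bounce: only the protocol switches to "cooperative".
    static Properties secondBounce(Properties afterFirst) {
        Properties p = new Properties();
        p.putAll(afterFirst);
        p.setProperty(REBALANCE_PROTOCOL, "cooperative");
        return p;
    }
}
```

Each step returns a fresh copy, so every member in a bounce can be restarted from the same previous configuration.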
The key point behind these two rolling bounces and the additional check is to avoid the following situation: the leader is still on the old byte code and only recognizes "eager", but thanks to compatibility it can still deserialize the new protocol data from newer-version members, and so it goes ahead with the assignment even though those newer members did not revoke their partitions before joining the group. Note the difference from KIP-415 here: on the consumer side we cannot rely on a list of built-in assignors, since assignors are user-customizable and hence a black box to the consumer coordinator. We therefore need two rolling bounces instead of one to complete the upgrade, whereas Connect only needs one rolling bounce (details below).
In the first rolling bounce, the new consumer byte code still needs to follow the "eager" protocol, since some not-yet-bounced consumers will still follow "eager" (e.g. still revoking before sending JoinGroup). During this process, the old assignor which supports "eager" will always be chosen. After the last member is bounced, the new assignor that supports "cooperative" may be chosen, but every member will still follow the "eager" behavior.
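Why the eager protocol keeps winning while old members remain can be seen from how a group settles on one protocol: only names that every member advertises are candidates, and each member votes for its most preferred candidate. The sketch below is a simplified model of that selection; the tie-breaking rule, the `GroupProtocolVote` class, and the assignor names are assumptions. Old members advertise only an eager `"sticky"` protocol, while bounced members in "compatible" mode advertise `"cooperative-sticky"` ahead of `"sticky"`.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class GroupProtocolVote {
    private GroupProtocolVote() {}

    // Each inner list holds one member's protocol names in preference order.
    static String select(List<List<String>> memberPreferences) {
        if (memberPreferences.isEmpty()) {
            throw new IllegalArgumentException("group has no members");
        }
        // Candidates: protocol names supported by every member.
        Set<String> candidates = new LinkedHashSet<>(memberPreferences.get(0));
        for (List<String> prefs : memberPreferences) {
            candidates.retainAll(prefs);
        }
        if (candidates.isEmpty()) {
            throw new IllegalStateException("no protocol supported by all members");
        }
        // Each member votes for its most preferred candidate.
        Map<String, Integer> votes = new HashMap<>();
        for (List<String> prefs : memberPreferences) {
            for (String name : prefs) {
                if (candidates.contains(name)) {
                    votes.merge(name, 1, Integer::sum);
                    break;
                }
            }
        }
        // Highest vote count wins; ties go to the first member's preference order.
        String winner = null;
        int best = -1;
        for (String candidate : candidates) {
            int count = votes.getOrDefault(candidate, 0);
            if (count > best) {
                best = count;
                winner = candidate;
            }
        }
        return winner;
    }
}
```

A mixed group such as `[["sticky"], ["cooperative-sticky", "sticky"]]` can only agree on `"sticky"`. Once every member is bounced, `"cooperative-sticky"` wins the vote, yet in this model the members still run it eagerly until the second bounce switches the config.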
...
```
JoinGroupRequest (v5) => GroupId SessionTimeout RebalanceTimeout MemberId ProtocolType Protocols ProtocolVersion
  GroupId => String
  SessionTimeout => Int32
  RebalanceTimeout => Int32
  MemberId => String
  ProtocolType => String
  Protocols => List<Protocol>
  Protocol (v0) => ProtocolName ProtocolMetadata
    ProtocolName => String
    ProtocolMetadata => Bytes (consumer protocol encoded bytes)
  ProtocolVersion => Int32   // new field
```
On the broker side, when choosing the leader, the coordinator then picks the member with the highest protocol version instead of the first member to join ("first come, first served").
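A minimal sketch of this leader choice, assuming members are kept in join order and that ties on `ProtocolVersion` fall back to the earliest joiner (the tie-breaking rule is an assumption; `JoinedMember` and `LeaderElection` are hypothetical names).

```java
import java.util.List;

// Hypothetical view of a joined member: its id and the ProtocolVersion it sent.
record JoinedMember(String memberId, int protocolVersion) {}

final class LeaderElection {
    private LeaderElection() {}

    // Picks the member with the highest protocol version; the list is assumed
    // to be in join order, so the earliest joiner wins among equal versions.
    static String chooseLeader(List<JoinedMember> membersInJoinOrder) {
        JoinedMember leader = null;
        for (JoinedMember m : membersInJoinOrder) {
            // Strictly greater: an equal version never displaces an earlier member.
            if (leader == null || m.protocolVersion() > leader.protocolVersion()) {
                leader = m;
            }
        }
        if (leader == null) {
            throw new IllegalArgumentException("group has no members");
        }
        return leader.memberId();
    }
}
```

When all members send the same version this degenerates to the old "first come, first served" choice, so the change only matters in mixed-version groups.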
...
```java
interface PartitionAssignor {

    // existing interfaces

    short version();  // new API, the version of the assignor, which indicates the user metadata / algorithmic difference.

    String name();

    PartitionAssignorProtocol supportedProtocol();  // new API, tells the ConsumerCoordinator the rebalance strategy it works with.
}
```
The existing built-in Assignor implementations will then be updated to:
Assignor | Highest Version | Supported Strategy | Notes
---|---|---|---
RangeAssignor | 0 | Eager | Current default.
RoundRobinAssignor | 0 | Eager |
StickyAssignor (old) | 0 | Eager |
StickyAssignor (new) | 0 | Cooperative | Will be the new default in 3.0
StreamsAssignor (old) | 4 | Eager |
StreamsAssignor (new) | 4 | Cooperative |
...