Versions Compared


...

Since we've already encoded the assigned partitions at the consumer protocol layer, for the consumer's sticky assignor we are effectively duplicating this data in both the consumer protocol and the assignor's user data.

Along with this proposal, we will add a new StickyAssignor and deprecate the existing one; unfortunately we cannot simply update the existing StickyAssignor due to compatibility issues (as described below). Similarly, we need a new StreamsPartitionAssignor that supports the new rebalance protocol.


Similarly, we have a StreamsPartitionAssignor which is also sticky but relies on its own user data to achieve this. We can bump up their versions while simplifying the user data and leveraging Subscription#ownedPartitions instead (details about upgrade compatibility below).


Compatibility and Upgrade Path

Since we are modifying the consumer protocol as above, we need to design the upgrade path to enable consumers to upgrade to the new rebalance protocol in an online manner. In fact, the trickiest part of this KIP is how to support a safe online upgrade path, such that even if users make mistakes and do not strictly follow the instructions, we can pause the consumer from proceeding, so that users will eventually notice (e.g. by seeing consumer lag and investigating logs), rather than letting them fall into undefined behavior or, even worse, having some partitions owned by more than one member.

...

More specifically, the PartitionAssignor interface and its nested Subscription / Assignment classes will be augmented as follows:

Code Block
languagejava
public enum RebalanceProtocol {
    EAGER((byte) 0), COOPERATIVE((byte) 1);

    private final byte id;

    RebalanceProtocol(byte id) {
        this.id = id;
    }

    public byte id() {
        return id;
    }

    public static RebalanceProtocol forId(byte id) {
        switch (id) {
            case 0:
                return EAGER;
            case 1:
                return COOPERATIVE;
            default:
                throw new IllegalArgumentException("Unknown rebalance protocol id " + id);
        }
    }
}

interface PartitionAssignor {

    // existing interfaces

    short version();  // new API, the version of the assignor which indicates the user metadata / algorithmic difference.

    RebalanceProtocol preferredProtocol();  // new API, indicates the ConsumerCoordinator protocol it would work with.

    class Subscription {
        public Short version();  // new API, and current version == 2

        public List<String> topics();

        public List<TopicPartition> ownedPartitions();  // new API, if version < 2 should always be empty

        public ByteBuffer userData();
    }

    class Assignment {
        public Short version();  // new API, same as above

        public List<TopicPartition> partitions();

        public ConsumerProtocol.Errors error();  // new API, if version < 2 should always be NONE

        public ByteBuffer userData();
    }
}

Note the semantic differences between the fields added above and their getters:

  1. The assignor version indicates, within the same assignor series, when its encoded metadata or algorithm has changed. A newer-versioned assignor is assumed to be compatible with older versions, i.e. it is able to deserialize the metadata and adjust its assignment logic to cope with older-versioned members. It will be used in the JoinGroup request so that the broker-side coordinator can select the member with the highest version as the leader (see details below). For the upcoming release it does not need to be used, but in the future it can be useful once brokers have also been upgraded to support the augmented JoinGroup request.
  2. The assignor's preferred protocol indicates the rebalance protocol it would work with. Note that the same assignor cannot change this preferred protocol value in higher versions. The ConsumerCoordinator will get this information and use it to decide which rebalance logic (e.g. the old one, or the newly proposed process in this KIP) to use.
  3. The subscription / assignment version will be aligned with the assignor version; it is just a helper that makes it easy for the assignor implementation itself to support backward compatibility. Upon serialization / deserialization, the version of the subscription / assignment will be encoded / decoded first, and the follow-up serde logic can then be selected accordingly.

The existing built-in assignor implementations will be updated accordingly:

...

Therefore, when upgrading, we need the new consumer byte-code to first still follow the old versioned protocol for both the metadata encoding and the behavior (e.g. still revoking before sending JoinGroup). After everyone has upgraded to the new byte-code, we can allow them to start rebalancing with the new versioned protocol. Note that during the later rebalance, consumers may still send join-group requests with the old version (the key here is that they are all new-version aware), in which case the consumer leader can freely adjust its logic based on the aggregated versions. More specifically, we introduce the following new config to the Consumer:

When the config value is "eager", the consumer would still use V0 of the consumer protocol as well as the old rebalance behavior; if the config value is "cooperative", the consumer will use the new V1 protocol as well as the new algorithm. Note the difference from KIP-415 here: since on the consumer we cannot leverage a list of assignors to register multiple protocols and let the leader auto-switch to new versions, we need two rolling bounces instead of one to complete the upgrade, whereas Connect only needs one rolling bounce (details below).

The key point behind the two rolling bounces and the additional check is that we want to avoid the situation where the leader is on the old byte-code and only recognizes V0, but, due to compatibility, is still able to deserialize V1 protocol data from newer-versioned members, and hence just goes ahead with the assignment even though those newer-versioned members did not revoke their partitions before joining the group.

As for the upgrade path, we would require users to do two rolling bounces, where:

  1. In the first rolling bounce, keep rebalance.protocol as "eager" (nothing needs to be changed manually, since it is the default value).
  2. After the first rolling bounce is completely done, do a second rolling bounce in which rebalance.protocol is updated to "cooperative". The above logic will make sure that eventually everyone is sending the join-group request with V1.

No changes are needed to V1's new assignment algorithm: if users follow the right procedure, then during the first rolling bounce everyone still follows the V0 rebalance protocol. During the second rolling bounce, the leader should always be on the newer code, so it can recognize both V1 and V0 members (since some have not yet been bounced with the config change). It is safe to just follow the above algorithm, since V0 members have revoked everything and hence no longer have any pre-assigned partitions in their subscription information, so their former partitions can be moved to other members immediately based on the assignor's output.
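The safety argument above can be made concrete with a small sketch. It assumes (our assumption, since the algorithm itself is elided here) that the leader never hands a partition to a new owner while another member still lists it in ownedPartitions. Such partitions are withheld until the old owner revokes them and a follow-up rebalance happens. V0/eager members report no owned partitions, so anything they held can move immediately. `CooperativeAssignmentFilter` and `assignThisRound` are hypothetical names, and partitions are plain strings for brevity.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper illustrating the "withhold partitions still owned by someone else" rule.
final class CooperativeAssignmentFilter {
    // targetAssignment: what the assignor would like each member to own.
    // ownedPartitions: what each member reported in its Subscription (empty for eager / V0 members).
    // Returns the assignment for this round: a partition still owned by a *different* member is
    // withheld, so no partition is ever owned by two members at the same time.
    static Map<String, List<String>> assignThisRound(Map<String, List<String>> targetAssignment,
                                                     Map<String, List<String>> ownedPartitions) {
        Map<String, String> currentOwner = new HashMap<>();
        ownedPartitions.forEach((member, partitions) ->
                partitions.forEach(p -> currentOwner.put(p, member)));

        Map<String, List<String>> result = new TreeMap<>();
        targetAssignment.forEach((member, partitions) -> {
            List<String> granted = new ArrayList<>();
            for (String p : partitions) {
                String owner = currentOwner.get(p);
                if (owner == null || owner.equals(member)) {
                    granted.add(p); // unowned, or already owned by this member: safe to assign now
                }
                // otherwise withheld: the current owner revokes it, and a follow-up rebalance hands it over
            }
            result.put(member, granted);
        });
        return result;
    }
}
```

For example, if A still owns p1 but the target moves it to B, B receives only its other partitions in this round and gets p1 in the follow-up rebalance, after A has revoked it.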

Note that this proposal depends on users behaving correctly: when upgrading to the new version, everyone must still be using the "eager" protocol. Otherwise the leader may be only V0-aware, not know that some newer-versioned members did not revoke all their partitions, and hence reassign those partitions to other members. In addition, this approach assumes that the leader is V1-aware whenever some V1 subscription is received. Again, if users follow the upgrade path above this should be the case, but if they do not, it may cause undefined behavior, since an old-versioned leader may just proceed with the V0 "eager" assignment while some of the members are actually on V1.

Looking into the Future

The above upgrade path is admittedly not ideal, and to avoid such a case happening again, I'd propose to modify the JoinGroup protocol in this KIP as well:

Code Block
JoinGroupRequest (v5) => GroupId SessionTimeout RebalanceTimeout MemberId ProtocolType Protocols ProtocolVersion
   GroupId                 => String
   SessionTimeout          => Int32
   RebalanceTimeout        => Int32
   MemberId                => String
   ProtocolType            => String
   Protocols               => List<Protocol>
   ProtocolVersion         => Int32                   // new field

Protocol (v0) => ProtocolName ProtocolMetadata
   ProtocolName            => String
   ProtocolMetadata        => Bytes (consumer protocol encoded bytes)

...

interface PartitionAssignor {

    // existing interfaces

    short version();             // new API, the version of the assignor which indicates the user metadata / algorithmic difference.

    @Deprecated                  // use supportedProtocols() instead
    String name();

    Map<RebalanceProtocol, String> supportedProtocols();    // new API, indicates the ConsumerCoordinator protocols it would work with,
                                                            // and associates each protocol with a unique name of the assignor.

    class Subscription {
        public Short version();  // new API, and current version == 2

        public List<String> topics();

        public List<TopicPartition> ownedPartitions();  // new API, if version < 2 should always be empty

        public ByteBuffer userData();
    }

    class Assignment {
        public Short version();  // new API, same as above

        public List<TopicPartition> partitions();

        public ConsumerProtocol.Errors error();   // new API, if version < 2 should always be NONE

        public ByteBuffer userData();
    }
}


Note the semantic differences between the fields added above:

  1. The assignor version indicates, within the same assignor series, when its encoded metadata or algorithm has changed. A newer-versioned assignor is assumed to be compatible with older versions, i.e. it is able to deserialize the metadata and adjust its assignment logic to cope with older-versioned members. It will be used in the JoinGroup request so that the broker-side coordinator can select the member with the highest version as the leader (see details below). For the upcoming release it does not need to be used, but in the future it can be useful once brokers have also been upgraded to support the augmented JoinGroup request.
  2. The assignor's supported protocols indicate the rebalance protocols it would work with. Note that the same assignor cannot change these values in higher versions. The ConsumerCoordinator will get this information and use it to decide which rebalance logic (e.g. the old one, or the newly proposed process in this KIP) to use.
  3. The subscription / assignment version will be aligned with the assignor version; it is exposed through a helper getter to help the assignor implementation support backward compatibility. Upon serialization / deserialization, the version of the subscription / assignment will be encoded / decoded first, and the follow-up serde logic can then be selected accordingly.
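To illustrate "encode / decode the version first, then pick the serde logic", here is a minimal sketch built on `java.nio.ByteBuffer`. The byte layout, the class `VersionedSubscriptionCodec` and its methods are hypothetical and do not reproduce Kafka's actual ConsumerProtocol format. Partitions are plain "topic-partition" strings, and user data is omitted for brevity.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified codec for illustration only; NOT Kafka's ConsumerProtocol wire format.
final class VersionedSubscriptionCodec {
    // ownedPartitions is only serialized from this version on (matches "if version < 2 should always be empty").
    static final short OWNED_PARTITIONS_VERSION = 2;

    static final class Subscription {
        final short version;
        final List<String> topics;
        final List<String> ownedPartitions; // decode() always yields an empty list for version < 2

        Subscription(short version, List<String> topics, List<String> ownedPartitions) {
            this.version = version;
            this.topics = topics;
            this.ownedPartitions = ownedPartitions;
        }
    }

    static ByteBuffer encode(Subscription s) {
        boolean withOwned = s.version >= OWNED_PARTITIONS_VERSION;
        int size = Short.BYTES + sizeOf(s.topics) + (withOwned ? sizeOf(s.ownedPartitions) : 0);
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putShort(s.version); // the version always goes first
        writeStrings(buf, s.topics);
        if (withOwned) {
            writeStrings(buf, s.ownedPartitions);
        }
        buf.flip();
        return buf;
    }

    static Subscription decode(ByteBuffer buf) {
        short version = buf.getShort(); // decode the version first...
        List<String> topics = readStrings(buf);
        // ...then select the follow-up serde logic based on it.
        List<String> owned = version >= OWNED_PARTITIONS_VERSION ? readStrings(buf) : new ArrayList<String>();
        return new Subscription(version, topics, owned);
    }

    private static int sizeOf(List<String> strings) {
        int size = Integer.BYTES; // element count
        for (String s : strings) {
            size += Short.BYTES + s.getBytes(StandardCharsets.UTF_8).length;
        }
        return size;
    }

    private static void writeStrings(ByteBuffer buf, List<String> strings) {
        buf.putInt(strings.size());
        for (String s : strings) {
            byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
            buf.putShort((short) bytes.length); // length prefix, read back as unsigned
            buf.put(bytes);
        }
    }

    private static List<String> readStrings(ByteBuffer buf) {
        int n = buf.getInt();
        List<String> out = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            byte[] bytes = new byte[Short.toUnsignedInt(buf.getShort())];
            buf.get(bytes);
            out.add(new String(bytes, StandardCharsets.UTF_8));
        }
        return out;
    }
}
```

Encoding a version-1 subscription simply skips the owned partitions, so a newer reader decoding it gets an empty list, which is the backward-compatible behavior described in item 3.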

The existing built-in assignor implementations will be updated accordingly:


Assignor             Highest Version   Supported Protocols    Notes
RangeAssignor        0                 Eager                  Current default value.
RoundRobinAssignor   0                 Eager
StickyAssignor       0                 Eager, Cooperative     To be default value in 3.0.
StreamsAssignor      4                 Eager, Cooperative

The reason we make "range" and "round-robin" not support cooperative rebalancing is that this protocol implicitly relies on the assignor being somewhat sticky in order to benefit from trading an extra rebalance. However, these two assignors are not sticky even on a best-effort basis, and hence we've decided not to make them eligible for the cooperative protocol.


In addition, we introduce the following new config to the Consumer:

Code Block
languagejava
"rebalance.protocol":


type: Enum 
values: {eager, compatible, cooperative}
default: eager


When the config value is "eager", the consumer will still execute the old rebalance behavior (i.e. revoke everything before joining the group); if the config value is "cooperative", the consumer will use the new protocol. In either case, it needs to make sure that the instantiated assignor can work with the chosen protocol, by calling PartitionAssignor#supportedProtocols and using the assignor name associated with the selected protocol (if it is supported). When the config value is "compatible", the logic is a bit more complicated: 1) the consumer will still use the "eager" consumer coordinator behavior, but 2) it will try to encode two assignor protocols in the subscription if the assignor supports both (i.e. its supportedProtocols contains both).
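A minimal sketch of how a consumer might turn the proposed rebalance.protocol value into the assignor names it encodes. It assumes the assignor exposes supportedProtocols() as a map from protocol to name. `RebalanceProtocolConfig`, `protocolNamesToEncode`, `revokesAllBeforeJoining` and the example names are hypothetical, and so is the order in which "compatible" lists the two names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the consumer-side handling of the proposed "rebalance.protocol" config.
final class RebalanceProtocolConfig {
    enum RebalanceProtocol { EAGER, COOPERATIVE }

    // The three values of the proposed config.
    enum Mode { EAGER, COMPATIBLE, COOPERATIVE }

    // "eager" and "compatible" both keep the old behavior of revoking everything before joining.
    static boolean revokesAllBeforeJoining(Mode mode) {
        return mode != Mode.COOPERATIVE;
    }

    // supportedProtocols mirrors PartitionAssignor#supportedProtocols(): protocol -> unique assignor name.
    // Returns the assignor names to encode in the JoinGroup subscription, most preferred first.
    static List<String> protocolNamesToEncode(Mode mode, Map<RebalanceProtocol, String> supportedProtocols) {
        List<String> names = new ArrayList<>();
        switch (mode) {
            case EAGER:
                names.add(require(supportedProtocols, RebalanceProtocol.EAGER));
                break;
            case COOPERATIVE:
                names.add(require(supportedProtocols, RebalanceProtocol.COOPERATIVE));
                break;
            case COMPATIBLE:
                // Advertise both names if the assignor supports both; listing the
                // cooperative name first is an assumption of this sketch.
                if (supportedProtocols.containsKey(RebalanceProtocol.COOPERATIVE)) {
                    names.add(supportedProtocols.get(RebalanceProtocol.COOPERATIVE));
                }
                names.add(require(supportedProtocols, RebalanceProtocol.EAGER));
                break;
        }
        return names;
    }

    private static String require(Map<RebalanceProtocol, String> supported, RebalanceProtocol protocol) {
        String name = supported.get(protocol);
        if (name == null) {
            // An assignor that cannot serve the configured protocol fails at startup.
            throw new IllegalArgumentException("Assignor does not support " + protocol);
        }
        return name;
    }
}
```

With an assignor advertising {EAGER: "sticky", COOPERATIVE: "cooperative-sticky"} (hypothetical names), "compatible" encodes both names while still revoking everything before joining. "cooperative" paired with a range-only assignor fails immediately.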

To leverage cooperative rebalancing when upgrading from 2.2 or older to a newer version, users would do the following (assuming the assignor was "range" before the upgrade):

  1. Do a rolling bounce to replace the byte code (i.e. swap the jars), setting the rebalance.protocol config to "compatible" and also setting the assignors to "sticky".
  2. Do another rolling bounce to change rebalance.protocol to "cooperative".

The key point behind the two rolling bounces and the additional check is that we want to avoid the situation where the leader is on the old byte-code and only recognizes "eager", but, due to compatibility, is still able to deserialize the new protocol data from newer-versioned members, and hence just goes ahead with the assignment even though those newer-versioned members did not revoke their partitions before joining the group. Note the difference from KIP-415 here: since on the consumer we cannot rely on a list of built-in assignors (assignors are user-customizable and hence a black box to the consumer coordinator), we need two rolling bounces instead of one to complete the upgrade, whereas Connect only needs one rolling bounce (details below).


In the first rolling bounce, we still need the new consumer byte-code to follow the "eager" protocol, since some not-yet-bounced consumers will follow "eager" (e.g. still revoking before sending JoinGroup). During this process, the old assignor that supports "eager" will always be chosen. After the last member is bounced, the new assignor that supports "cooperative" may be chosen, but everyone will still follow the "eager" behavior.

After everyone has upgraded to the new byte-code, in the second rolling bounce, each newly bounced member will always execute the "cooperative" behavior and encode a single assignor name, the one associated with "cooperative". This is because we know that whoever is chosen as the leader will be able to read the subscription information and make the correct assignment. Those not yet bounced will still follow the "eager" behavior but encode both assignors, and hence the assignor associated with "cooperative" will be selected. It is fine to have some members following the eager behavior while others follow the cooperative one, as long as the leader can recognize them and make assignment choices accordingly: "eager" members have revoked everything and hence no longer report any pre-assigned partitions in their subscription information, so it is safe to move their former partitions to other members immediately based on the assignor's output.
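Why the mixed group converges on the cooperative assignor can be seen from how a group protocol is chosen. The sketch below is loosely modeled on our understanding of the broker-side group coordinator: only names supported by every member are candidates, and each member votes for its most preferred candidate. `GroupProtocolSelection` is a hypothetical name, the deterministic tie-break is a simplification, and the real coordinator reports a missing consensus through its own error handling rather than an exception.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of group protocol selection by consensus.
final class GroupProtocolSelection {
    // memberProtocols: each member's assignor names, in order of preference.
    static String selectProtocol(Map<String, List<String>> memberProtocols) {
        // Candidates: names that every member supports.
        Set<String> candidates = null;
        for (List<String> protocols : memberProtocols.values()) {
            if (candidates == null) {
                candidates = new LinkedHashSet<>(protocols);
            } else {
                candidates.retainAll(new HashSet<>(protocols));
            }
        }
        if (candidates == null || candidates.isEmpty()) {
            // No consensus: in the KIP this surfaces as members being rejected with a fatal error.
            throw new IllegalStateException("No protocol is supported by all members");
        }
        // Each member votes for its most preferred candidate.
        Map<String, Integer> votes = new HashMap<>();
        for (List<String> protocols : memberProtocols.values()) {
            for (String p : protocols) {
                if (candidates.contains(p)) {
                    votes.merge(p, 1, Integer::sum);
                    break;
                }
            }
        }
        // Most votes wins; ties are broken by name only to keep this sketch deterministic.
        String winner = null;
        int winnerVotes = -1;
        for (Map.Entry<String, Integer> e : votes.entrySet()) {
            int v = e.getValue();
            if (v > winnerVotes || (v == winnerVotes && e.getKey().compareTo(winner) < 0)) {
                winner = e.getKey();
                winnerVotes = v;
            }
        }
        return winner;
    }
}
```

During the second rolling bounce, a bounced member lists only the cooperative name, so it is the only candidate. A member that lists only an "eager" name in a fully cooperative group leaves no candidate at all, which is the failure mode described in the edge cases.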

Updated Assignor Logic

Although Subscription now has an ownedPartitions field, PartitionAssignor#subscription() would not be required to fill it, since the ConsumerCoordinator always fills it. In fact, we could let this function return only the serialized user data bytes, and symmetrically let #onAssignment take only the serialized user data bytes. But this is somewhat out of the scope of this KIP, so I'd omit it here.

The only logic that can (optionally) be updated is the #assign() function, where a user-instantiated implementation can leverage the "ownedPartitions" field to be a bit sticky.
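As one example of using ownedPartitions to be sticky, here is a minimal sketch of a balanced assignment that keeps as many previously owned partitions as balance allows. It is a hypothetical helper, not the PartitionAssignor#assign signature. Partitions are plain strings, and every member is assumed to be subscribed to every partition.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sticky-assignment helper; assumes all members subscribe to all partitions.
final class StickyAssignSketch {
    // partitions: all partitions to assign; owned: each member's Subscription#ownedPartitions.
    // Result sizes differ by at most one, and owned partitions are kept whenever balance allows.
    static Map<String, List<String>> assign(List<String> partitions, Map<String, List<String>> owned) {
        Map<String, List<String>> result = new TreeMap<>();
        if (owned.isEmpty()) {
            return result;
        }
        List<String> members = new ArrayList<>(owned.keySet());
        Collections.sort(members);
        for (String m : members) {
            result.put(m, new ArrayList<>());
        }
        int minQuota = partitions.size() / members.size();
        int extraSlots = partitions.size() % members.size(); // members allowed to hold minQuota + 1
        Set<String> unassigned = new TreeSet<>(partitions);

        // Pass 0: each member keeps up to minQuota of its owned partitions that still exist and are unclaimed.
        // Pass 1: members with leftovers may keep one more while extra slots remain.
        for (int pass = 0; pass < 2; pass++) {
            for (String m : members) {
                List<String> mine = result.get(m);
                int limit = minQuota + pass;
                for (String p : owned.get(m)) {
                    if (mine.size() >= limit || (pass == 1 && extraSlots == 0)) {
                        break;
                    }
                    if (unassigned.remove(p)) {
                        mine.add(p);
                        if (pass == 1) {
                            extraSlots--;
                        }
                    }
                }
            }
        }

        // Hand each remaining partition to the least-loaded member (ties go to the smallest member id).
        for (String p : unassigned) {
            String target = members.get(0);
            for (String m : members) {
                if (result.get(m).size() < result.get(target).size()) {
                    target = m;
                }
            }
            result.get(target).add(p);
        }
        return result;
    }
}
```

Capping the keep phase at minQuota, plus at most `extraSlots` extra partitions, is what guarantees the final sizes stay within one of each other. Only the partitions that must move for balance are moved.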


Looking into the Future

The above upgrade path is admittedly not ideal, and to avoid such a case happening again, I'd propose to also modify the JoinGroup protocol in this KIP so that it carries the `protocol version` from the PartitionAssignor.

Code Block
JoinGroupRequest (v5) => GroupId SessionTimeout RebalanceTimeout MemberId ProtocolType Protocols ProtocolVersion
   GroupId                 => String
   SessionTimeout          => Int32
   RebalanceTimeout        => Int32
   MemberId                => String
   ProtocolType            => String
   Protocols               => List<Protocol>
   ProtocolVersion         => Int32                   // new field

Protocol (v0) => ProtocolName ProtocolMetadata
   ProtocolName            => String
   ProtocolMetadata        => Bytes (consumer protocol encoded bytes)


Then, on the broker side, when choosing the leader, the coordinator will pick the member with the highest protocol version instead of picking on a "first come, first served" basis.
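A minimal sketch of that leader-selection rule, assuming the coordinator sees join requests in arrival order. `LeaderSelection` and `JoinRequest` are hypothetical names, and keeping the earliest joiner among equal versions (the old first-come, first-served rule) is our assumption about tie-breaking.

```java
import java.util.List;

// Hypothetical model of choosing the group leader by the proposed ProtocolVersion field.
final class LeaderSelection {
    static final class JoinRequest {
        final String memberId;
        final int protocolVersion; // the proposed JoinGroupRequest (v5) ProtocolVersion field

        JoinRequest(String memberId, int protocolVersion) {
            this.memberId = memberId;
            this.protocolVersion = protocolVersion;
        }
    }

    // joinOrder: join requests in the order the coordinator received them.
    // Highest ProtocolVersion wins; among equal versions, the earliest joiner is kept.
    static String selectLeader(List<JoinRequest> joinOrder) {
        JoinRequest leader = null;
        for (JoinRequest r : joinOrder) {
            if (leader == null || r.protocolVersion > leader.protocolVersion) {
                leader = r;
            }
        }
        if (leader == null) {
            throw new IllegalArgumentException("No members joined");
        }
        return leader.memberId;
    }
}
```

Even if an old-versioned member happens to join first, a single newer-versioned member is enough to become leader. This is what allows a one-rolling-bounce upgrade when the rebalance semantics do not change.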

Although this change will not benefit the upgrade path at this time, in the future, if we need to upgrade the assignor again, as long as the upgrade does not change the rebalance semantics (as this KIP does, from "eager" to "cooperative"), we can use one rolling bounce instead, since as soon as there is one member on the newer version, that consumer will be picked as the leader.

For example, this can also help save the "version probing" cost in Streams. Suppose we augment the join group schema with `protocol version` in Kafka 2.3. Then, with both brokers and clients on 2.3+, on the first rolling bounce where the subscription and assignment schema and / or user metadata has changed, this protocol version will be bumped. On the broker side, after receiving all members' join-group requests, the coordinator will select the member with the highest protocol version as the leader (this assumes a higher-versioned protocol is always backward compatible, i.e. it can recognize lower-versioned protocols as well). The leader can then decide, based on the subscription information it received and deserialized, how to assign partitions and how to encode the assignment so that everyone can understand it. With this, in Streams for example, no version probing would be needed, since we are guaranteed that the leader knows everyone's version (again assuming that a higher-versioned protocol is always backward compatible) and hence can successfully do the assignment in that round.


Edge Cases Discussion

This proposal depends on users behaving correctly: when upgrading to the new version, everyone is still using the "eager" protocol. But if users make mistakes, the group still will not fall into undefined behavior, as the assignor mechanism guarantees that we must form a consensus on the protocol names, and whoever does not support the chosen one will be kicked out of the group, so users will be notified about the misconfiguration.

There are a few edge cases that illustrate this:

Changing protocol without changing assignor

If a user mistakenly configures rebalance.protocol to "cooperative" but does not change the assignor, then: if the assignor does not support "cooperative", the consumer will simply fail at startup; if the assignor does support "cooperative", it will still conflict with the other members who are still on "eager", i.e. no consensus assignor can be selected and members will be kicked out with a fatal error.

Downgrading and Old-Versioned New Member

If a consumer is downgraded incorrectly after the above upgrade path is complete (i.e. the old jar is swapped back in without changing any configs), it is treated as first leaving the group and then rejoining as a new member with "eager". The same situation arises when a new member with "eager" joins (probably mistakenly) a group in which everyone else has switched to "cooperative".

At this point, no consensus protocol can be chosen once this member joins, and hence either this member or everyone else will be kicked out of the group with a fatal error.

The right way to downgrade is to first do a rolling bounce of the instances while setting "rebalance.protocol" to "compatible", and then a second rolling bounce to set the rebalance protocol to "eager" if necessary.


Compatibility, Deprecation, and Migration Plan

Minimum Version Requirement

This change requires Kafka broker version >= 0.9, where the broker reacts with a rebalance when a normal consumer rejoins with changed encoded metadata. Client applications need to upgrade to at least the earliest version that includes the KIP-429 version 1.0 change.

Recommended Upgrade Procedure

As mentioned above, a new protocol type shall be created. To ensure a smooth upgrade, we need to make sure existing jobs won't fail. The procedure is described above.

Rejected Alternatives

Another solution we discussed is to make each assignor support only one protocol, i.e.:

Code Block
languagejava
interface PartitionAssignor {

    // existing interfaces

    short version();             // new API, the version of the assignor which indicates the user metadata / algorithmic difference.

    String name();

    RebalanceProtocol supportedProtocol();    // new API, indicates the ConsumerCoordinator protocol it would work with.
}


The existing built-in Assignor implementations will then be updated to:


Assignor                Highest Version   Preferred Protocol   Notes
RangeAssignor           0                 Eager                Current default.
RoundRobinAssignor      0                 Eager
StickyAssignor (old)    0                 Eager
StickyAssignor (new)    0                 Cooperative          Will be new default in 3.0.
StreamsAssignor (old)   4                 Eager
StreamsAssignor (new)   4                 Cooperative


Although this makes the upgrade path simpler, since we would no longer need the "rebalance.protocol" config on the consumer and would just encode multiple assignors during the first rolling bounce, it requires duplicated assignor classes (of course, the new class could just extend the old one, so not much code would be duplicated), which is a bit cumbersome.
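To show how little duplication the rejected alternative would need, here is a minimal sketch in which the "new" cooperative assignor only overrides its identity and inherits all assignment logic. The interface is trimmed to the methods from the rejected API above. `SingleProtocolAssignor`, `EagerStickyAssignor`, `CooperativeStickyAssignor` and the name strings are hypothetical.

```java
// Hypothetical, trimmed-down types from the rejected "one protocol per assignor" alternative.
enum RebalanceProtocol { EAGER, COOPERATIVE }

interface SingleProtocolAssignor {
    short version();

    String name();

    RebalanceProtocol supportedProtocol();
}

class EagerStickyAssignor implements SingleProtocolAssignor {
    public short version() { return 0; }

    public String name() { return "sticky"; }

    public RebalanceProtocol supportedProtocol() { return RebalanceProtocol.EAGER; }

    // ... the shared sticky assignment logic would live here ...
}

// The "new" class only changes its identity; all assignment logic is inherited.
class CooperativeStickyAssignor extends EagerStickyAssignor {
    @Override
    public String name() { return "cooperative-sticky"; }

    @Override
    public RebalanceProtocol supportedProtocol() { return RebalanceProtocol.COOPERATIVE; }
}
```

The cost is mostly in the API surface rather than in lines of code: every sticky assignor would need such a twin class, which is the cumbersomeness mentioned above.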

Although this change will not benefit the upgrade path at this time, in the future, if we need to change the rebalance protocol again, as long as we are not changing the rebalance semantics as we did in this KIP, we can use one rolling bounce instead, since as soon as there is one member on the newer version, that consumer will be picked.


Edge Cases Discussion

There are a few edge cases worth mentioning here:

Downgrading and Old-Versioned New Member

If a consumer is downgraded after the above upgrade path is complete, it is treated as first leaving the group and then rejoining as a new member with the old V0. The same situation arises when a new member with the old version V0 joins (probably mistakenly) a group that has been completely upgraded to V2.

At this point, everyone else will still get their existing assigned partitions and the newcomer would not get anything. However, if another member leaves the group as well, its partitions would not be assigned to anyone due to logic 3) above.

Old-Versioned Member Become Leader

Since the group coordinator selects new leaders from among the existing members, even if the leader fails after the group has successfully upgraded, the new leader should still be V1-aware, and new V0 members joining within the same generation should not be selected. And with the protocol version, we should be guaranteed to always prefer a higher-versioned member as the leader.

Compatibility, Deprecation, and Migration Plan

Minimum Version Requirement

This change requires Kafka broker version >= 0.9, where the broker reacts with a rebalance when a normal consumer rejoins with changed encoded metadata. Client applications need to upgrade to at least the earliest version that includes the KIP-429 version 1.0 change.

Recommended Upgrade Procedure

As mentioned above, a new protocol type shall be created. To ensure a smooth upgrade, we need to make sure existing jobs won't fail. The procedure is as follows:

  • Do not override the `rebalance.protocol` config during the first rolling bounce that upgrades the version; it then stays "eager", so consumers still revoke all partitions upon prepare-rebalance.
  • Do another rolling bounce with the `rebalance.protocol` config set to `cooperative`.

Rejected Alternatives

N/A for the algorithm part. For the implementation plan trade-offs, please review the implementation plan doc.