...

The key point behind the two rolling bounces and the additional check is that we want to avoid the following situation: the leader is on old bytecode and only recognizes V0, but due to compatibility it would still be able to deserialize V1 protocol data from newer-versioned members, and hence would just go ahead and do the assignment even though the new-versioned members did not revoke their partitions before joining the group. So under case 3), those members on V0 will have a small window where no partitions are assigned to them, and some partitions will not be assigned to anyone either. Hence, we will add a new metric to ConsumerCoordinatorMetrics, which the client will record upon receiving an assignment with the error code:

Code Block
"num.incompatible.rebalance": "total number of rebalances that have failed due to incompatible members joining group at the same time."


group: "consumer-coordinator-metrics"
tags: client-id=([-.\w]+)
type: Count

Upon receiving an assignment associated with the error code, the client would record the above metric, back off, and re-send the JoinGroup request.
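A minimal sketch of this client-side reaction follows. The class and method names, the numeric value of the error code, and the capped exponential backoff policy are assumptions made for illustration (the text above only says the client backs off); an `AtomicLong` stands in for the `num.incompatible.rebalance` Count metric instead of Kafka's metrics library.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: names, the error-code value and the backoff policy are assumptions.
class IncompatibleRebalanceHandler {
    static final short NONE = 0;
    // Hypothetical code meaning "incompatible members joined the group at the same time".
    static final short INCOMPATIBLE_REBALANCE = 1;

    // Stand-in for the "num.incompatible.rebalance" Count metric.
    private final AtomicLong numIncompatibleRebalance = new AtomicLong();
    private final long baseBackoffMs;
    private final long maxBackoffMs;
    private int consecutiveFailures = 0;

    IncompatibleRebalanceHandler(long baseBackoffMs, long maxBackoffMs) {
        this.baseBackoffMs = baseBackoffMs;
        this.maxBackoffMs = maxBackoffMs;
    }

    /**
     * Called when an assignment arrives. Returns -1 if the assignment can be used, otherwise
     * the number of milliseconds to back off before re-sending the JoinGroup request.
     */
    long onAssignment(short errorCode) {
        if (errorCode != INCOMPATIBLE_REBALANCE) {
            consecutiveFailures = 0; // not the incompatible-rebalance code: reset the backoff
            return -1;
        }
        numIncompatibleRebalance.incrementAndGet(); // record the metric
        // Capped exponential backoff: base, 2*base, 4*base, ... up to maxBackoffMs.
        long backoff = Math.min(maxBackoffMs, baseBackoffMs << Math.min(consecutiveFailures, 20));
        consecutiveFailures++;
        return backoff;
    }

    long incompatibleRebalanceCount() {
        return numIncompatibleRebalance.get();
    }

    public static void main(String[] args) {
        IncompatibleRebalanceHandler handler = new IncompatibleRebalanceHandler(100, 1_000);
        System.out.println(handler.onAssignment(INCOMPATIBLE_REBALANCE)); // 100
        System.out.println(handler.onAssignment(INCOMPATIBLE_REBALANCE)); // 200
        System.out.println(handler.onAssignment(NONE));                   // -1
        System.out.println(handler.incompatibleRebalanceCount());         // 2
    }
}
```

Capping the backoff keeps a member that repeatedly hits the incompatible case from waiting unboundedly before it rejoins.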

As for the upgrade path, we would require users to do two rolling bounces, where:

  1. In the first rolling bounce, keep rebalance.protocol as "eager" (there is no need to change anything manually, since it is the default value).
  2. After the first rolling bounce is completely done, do a second rolling bounce in which rebalance.protocol is updated to "cooperative". The above logic will make sure that eventually everyone is sending the join-group request with V1.
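For concreteness, the consumer configuration used in each bounce could look like the sketch below. Only `rebalance.protocol` and its two values come from this proposal (it is not necessarily a setting of any released client); the `bootstrap.servers` and `group.id` values are placeholders, and setting "eager" explicitly in the first bounce is optional since it is the default.

```java
import java.util.Properties;

// Sketch of per-bounce consumer configs; only "rebalance.protocol" comes from this proposal.
class UpgradeConfigs {
    private static Properties base() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "my-group");                // placeholder
        return props;
    }

    // First rolling bounce: deploy the new bytecode, protocol stays "eager"
    // (the default; set explicitly here only for clarity).
    static Properties firstBounce() {
        Properties props = base();
        props.setProperty("rebalance.protocol", "eager");
        return props;
    }

    // Second rolling bounce: start only after every member has completed the first bounce.
    static Properties secondBounce() {
        Properties props = base();
        props.setProperty("rebalance.protocol", "cooperative");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(firstBounce().getProperty("rebalance.protocol"));  // eager
        System.out.println(secondBounce().getProperty("rebalance.protocol")); // cooperative
    }
}
```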

...


Note that this proposal depends on users behaving correctly, i.e. everyone should eventually be on the same "rebalance protocol"; otherwise we would fall into case 3.b) forever. In addition, this approach assumes that the leader is V1-aware whenever some V1 subscription is received: again, if users follow the upgrade path above, this should be the case, but if users do not follow the guidance, it may cause undefined behavior, since an old-versioned leader may just proceed with the V0 "eager" assignment while some of the members are actually on V1.
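The assumption above can be checked with a toy model of the upgrade, sketched below. It assumes (hypothetically) that a member sends a V1 subscription only when it runs the new bytecode and is configured with rebalance.protocol set to "cooperative". Since any member may end up as leader, "the leader is V1-aware" is modeled as "every member runs the new bytecode". The model restarts members one at a time and checks that invariant after every restart.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the upgrade path; the Member fields and the V1 rule are assumptions.
class RollingBounceModel {
    static final class Member {
        boolean newBytecode;  // false: old bytecode that only recognizes V0
        boolean cooperative;  // true: rebalance.protocol = "cooperative"

        // Assumed rule: V1 is sent only by new bytecode configured as cooperative.
        int subscriptionVersion() {
            return (newBytecode && cooperative) ? 1 : 0;
        }
    }

    // "Leader is V1-aware whenever a V1 subscription is received": since any member may
    // become leader, require every member to run the new bytecode once anyone sends V1.
    static boolean leaderIsV1Aware(List<Member> group) {
        boolean anyV1 = group.stream().anyMatch(m -> m.subscriptionVersion() == 1);
        boolean allNewBytecode = group.stream().allMatch(m -> m.newBytecode);
        return !anyV1 || allNewBytecode;
    }

    private static List<Member> freshGroup(int size) {
        List<Member> group = new ArrayList<>();
        for (int i = 0; i < size; i++) {
            group.add(new Member());
        }
        return group;
    }

    // Recommended path: two rolling bounces. Returns true if the invariant held after
    // every restart and everyone ends up sending V1.
    static boolean twoBounces(int size) {
        List<Member> group = freshGroup(size);
        for (Member m : group) {          // first bounce: new bytecode, still "eager"
            m.newBytecode = true;
            if (!leaderIsV1Aware(group)) return false;
        }
        for (Member m : group) {          // second bounce: switch to "cooperative"
            m.cooperative = true;
            if (!leaderIsV1Aware(group)) return false;
        }
        return group.stream().allMatch(m -> m.subscriptionVersion() == 1);
    }

    // Not following the guidance: bytecode and config change in a single bounce.
    // Returns true only if the invariant held after every restart.
    static boolean singleBounce(int size) {
        List<Member> group = freshGroup(size);
        for (Member m : group) {
            m.newBytecode = true;
            m.cooperative = true;
            if (!leaderIsV1Aware(group)) return false;
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(twoBounces(3));   // true
        System.out.println(singleBounce(3)); // false
    }
}
```

With the two-bounce path the invariant holds throughout, whereas a single bounce that changes bytecode and configuration together violates it as soon as the first restarted member sends V1 while the others still run old bytecode.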

Looking into the Future

The above upgrade path is admittedly not ideal, and to avoid such a situation happening again, I'd propose to modify the JoinGroup protocol in this KIP as well:

Code Block
JoinGroupRequest (v5) => GroupId SessionTimeout RebalanceTimeout MemberId ProtocolType Protocols ProtocolVersion
   GroupId                 => String
   SessionTimeout          => Int32
   RebalanceTimeout        => Int32
   MemberId                => String
   ProtocolType            => String
   Protocols               => List<Protocol>
   ProtocolVersion         => Int32                   // new field

Protocol (v0) => ProtocolName ProtocolMetadata
   ProtocolName            => String
   ProtocolMetadata        => Bytes (consumer protocol encoded bytes)


Then, on the broker side, the coordinator will pick the member with the highest protocol version as the leader, instead of picking on a "first come, first served" basis.
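A sketch of the proposed leader choice, assuming each joining member carries the new `ProtocolVersion` field. Breaking ties by join order (so that, among equally versioned members, the first one to join still wins) is an assumption of this sketch, as are the record and method names.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Sketch of the proposed leader choice; names and the join-order tie-break are assumptions.
class LeaderSelector {
    record JoiningMember(String memberId, int protocolVersion, long joinOrder) {}

    // Highest ProtocolVersion wins; among equal versions, the earliest joiner wins.
    static Optional<String> selectLeader(List<JoiningMember> members) {
        return members.stream()
                .max(Comparator.comparingInt(JoiningMember::protocolVersion)
                        .thenComparing(Comparator.comparingLong(JoiningMember::joinOrder).reversed()))
                .map(JoiningMember::memberId);
    }

    public static void main(String[] args) {
        List<JoiningMember> members = List.of(
                new JoiningMember("a", 0, 1),
                new JoiningMember("b", 1, 2),
                new JoiningMember("c", 1, 3));
        System.out.println(selectLeader(members).orElse("none")); // b
        // A single newer member is picked even though it joined last.
        System.out.println(selectLeader(List.of(
                new JoiningMember("x", 0, 1),
                new JoiningMember("y", 2, 9))).orElse("none"));    // y
    }
}
```

The second call shows why one rolling bounce could suffice for a future protocol change: as soon as one member on the newer version joins, it becomes the leader regardless of arrival order.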

Again, this change will not benefit the upgrade path this time, but if we need to change the rebalance protocol again in the future, a single rolling bounce will suffice: as long as there is one member on the newer version, that consumer will be picked as the leader. This can also help save the "version probing" cost in Streams.


Edge Cases Discussion

There are a few edge cases worth mentioning here:

Downgrading and Old-Versioned New Member

If a consumer is downgraded after the above upgrade path is complete, it is treated as first leaving the group and then rejoining the group as a new member with the old V0. The same situation arises when a new member on the old version V0 joins (probably by mistake) a group that has been completely upgraded to V1. At that moment, everyone else will still get their existing assigned partitions and the newcomer will not get anything. However, if another member leaves the group as well, its partitions will not be assigned to anyone due to the logic in 3) above. We will rely on the above consumer-side metric so that users are notified in time.

Old-Versioned Member Becoming Leader

Since the group coordinator selects a new leader from among the existing members, even if the leader fails after the group has been successfully upgraded, the new leader should still be V1-aware, and new V0 members joining within the same generation should not be selected. And with the new protocol version field, we should always be guaranteed to prefer a higher-versioned member as the leader.
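The failover rule above can be sketched as follows. Tracking the generation in which each member joined, and treating only members that joined before the current generation as existing members, is an assumption of this sketch; the record and method names are hypothetical.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Sketch of leader failover; the joinedGeneration bookkeeping and the names are assumptions.
class LeaderFailover {
    record Member(String memberId, int protocolVersion, int joinedGeneration) {}

    // Only members that existed before the current generation are candidates;
    // among them, the highest ProtocolVersion is preferred.
    static Optional<String> newLeader(List<Member> members, int currentGeneration) {
        return members.stream()
                .filter(m -> m.joinedGeneration() < currentGeneration)
                .max(Comparator.comparingInt(Member::protocolVersion))
                .map(Member::memberId);
    }

    public static void main(String[] args) {
        List<Member> members = List.of(
                new Member("upgraded", 1, 3),
                new Member("late-v0", 0, 5)); // V0 member joining in the current generation
        System.out.println(newLeader(members, 5).orElse("none")); // upgraded
        System.out.println(newLeader(List.of(new Member("late-v0", 0, 5)), 5).orElse("none")); // none
    }
}
```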


Compatibility, Deprecation, and Migration Plan

...