...

KafkaConsumer:

Subscription => TopicList SubscriptionInfo AssignedPartitions
   TopicList               => List<String>
   SubscriptionInfo        => Bytes
   AssignedPartitions      => List<TopicPartition>     // new field


Assignment => AssignedPartitions AssignmentInfo RevokedPartitions ErrorCode
   AssignedPartitions      => List<TopicPartition>
   AssignmentInfo          => Bytes
   RevokedPartitions       => List<TopicPartition>     // new field
   ErrorCode               => Int16                    // new field


Note that it is compatible to inject additional fields after the assignor-specific SubscriptionInfo / AssignmentInfo bytes. On serialization we first call the assignor to encode its info bytes, then allocate a larger buffer and write the consumer-specific fields around them: the existing fields go before the assignor-specific info bytes and the new fields go after them. Deserialization does the reverse. Adding fields after the assignor bytes is therefore naturally compatible with pluggable assignors. However, there are some compatibility challenges for the consumer protocol upgrade itself, which we tackle below.
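To make the layout argument concrete, here is a minimal Java sketch of a V1 subscription encoder together with a V0 and a V1 decoder. The byte layout (int32 counts, int16-prefixed UTF-8 strings, -1 for null bytes) and every name in it (`SubscriptionCodec`, `encodeV1`, `decodeV0`, `decodeV1`) are hypothetical and do not reproduce Kafka's actual `ConsumerProtocol` wire format. The only point it illustrates is that the assignor bytes are treated as opaque and the new AssignedPartitions field is written after them, so a reader that stops after SubscriptionInfo still parses V1 data.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified layout (NOT Kafka's real ConsumerProtocol format):
//   TopicList          : int32 count, then each topic as int16 length + UTF-8 bytes
//   SubscriptionInfo   : int32 length (-1 means null) + opaque assignor bytes
//   AssignedPartitions : (V1 only, appended last) int32 count, then topic + int32 partition
class SubscriptionCodec {

    record TopicPartition(String topic, int partition) {}

    // ownedPartitions == null means the reader did not see the V1 field.
    record Subscription(List<String> topics, byte[] info, List<TopicPartition> ownedPartitions) {}

    private static int stringSize(String s) {
        return 2 + s.getBytes(StandardCharsets.UTF_8).length;
    }

    private static void writeString(ByteBuffer buf, String s) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        buf.putShort((short) bytes.length);
        buf.put(bytes);
    }

    private static String readString(ByteBuffer buf) {
        byte[] bytes = new byte[buf.getShort()];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // V1 encoding: existing fields, then the assignor's bytes untouched,
    // then the new field appended after them.
    static byte[] encodeV1(List<String> topics, byte[] info, List<TopicPartition> owned) {
        int size = 4 + 4 + (info == null ? 0 : info.length) + 4;
        for (String t : topics) size += stringSize(t);
        for (TopicPartition tp : owned) size += stringSize(tp.topic()) + 4;

        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putInt(topics.size());
        for (String t : topics) writeString(buf, t);
        if (info == null) {
            buf.putInt(-1);
        } else {
            buf.putInt(info.length);
            buf.put(info);
        }
        buf.putInt(owned.size());
        for (TopicPartition tp : owned) {
            writeString(buf, tp.topic());
            buf.putInt(tp.partition());
        }
        return buf.array();
    }

    // Fields shared by V0 and V1: TopicList and SubscriptionInfo.
    private static Subscription readCommon(ByteBuffer buf) {
        int count = buf.getInt();
        List<String> topics = new ArrayList<>();
        for (int i = 0; i < count; i++) topics.add(readString(buf));
        int len = buf.getInt();
        byte[] info = null;
        if (len >= 0) {
            info = new byte[len];
            buf.get(info);
        }
        return new Subscription(topics, info, null);
    }

    // A V0 reader stops after SubscriptionInfo and ignores any trailing bytes.
    static Subscription decodeV0(byte[] data) {
        return readCommon(ByteBuffer.wrap(data));
    }

    // A V1 reader also parses AssignedPartitions when bytes remain.
    static Subscription decodeV1(byte[] data) {
        ByteBuffer buf = ByteBuffer.wrap(data);
        Subscription common = readCommon(buf);
        if (!buf.hasRemaining()) return common; // data came from a V0 encoder
        int count = buf.getInt();
        List<TopicPartition> owned = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            String topic = readString(buf);
            owned.add(new TopicPartition(topic, buf.getInt()));
        }
        return new Subscription(common.topics(), common.info(), owned);
    }

    public static void main(String[] args) {
        byte[] v1 = encodeV1(List.of("orders"), new byte[] {1, 2, 3},
                List.of(new TopicPartition("orders", 0)));
        System.out.println(decodeV0(v1).topics());          // [orders]
        System.out.println(decodeV0(v1).ownedPartitions()); // null
        System.out.println(decodeV1(v1).ownedPartitions()); // [TopicPartition[topic=orders, partition=0]]
    }
}
```

Note that the V0 reader silently drops the appended field, so from the decoded result alone it cannot tell that the sender was on V1.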


Consumer Coordinator Algorithm

...

Note that since we are injecting additional fields at the end of the consumer protocol, the new protocol is still compatible with the old version. That means an old-versioned consumer can still deserialize newer-versioned protocol data; this holds as long as we only append new fields at the end.

However, when V1 consumers join the group there is a key behavioral difference: they do NOT revoke their partitions, so it is not safe to reassign any of their partitions the way the current (V0) assignment logic does. That means the leader can only proceed with the assignment once it knows, for every member, whether that member is on V0 or V1.
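A minimal sketch of the safety check this implies, assuming the leader knows each member's version and the partitions each member currently owns. `ReassignmentSafety`, `unsafeMoves`, and the string-encoded partitions (e.g. `"orders-0"`) are hypothetical illustrations rather than Kafka APIs, and the sketch does not reproduce the full assignment algorithm: it only flags partitions that a proposed assignment would hand to a different member while their V1 owner still holds them.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper (not a Kafka API); partitions are plain strings such as "orders-0".
class ReassignmentSafety {

    // Returns the partitions that the proposed assignment gives to a member other
    // than their current owner while that owner is a V1 member. A V0 owner has
    // already revoked everything before JoinGroup, so moving its partitions is safe;
    // a V1 owner still holds them, so the move could leave two consumers owning
    // the same partition at once.
    static Set<String> unsafeMoves(Map<String, Integer> memberVersion,
                                   Map<String, Set<String>> currentlyOwned,
                                   Map<String, Set<String>> proposed) {
        // Index: partition -> current owner, restricted to V1 owners.
        Map<String, String> v1Owner = new HashMap<>();
        for (Map.Entry<String, Set<String>> e : currentlyOwned.entrySet()) {
            if (memberVersion.getOrDefault(e.getKey(), 0) >= 1) {
                for (String tp : e.getValue()) v1Owner.put(tp, e.getKey());
            }
        }
        Set<String> unsafe = new HashSet<>();
        for (Map.Entry<String, Set<String>> e : proposed.entrySet()) {
            for (String tp : e.getValue()) {
                String owner = v1Owner.get(tp);
                if (owner != null && !owner.equals(e.getKey())) unsafe.add(tp);
            }
        }
        return unsafe;
    }

    public static void main(String[] args) {
        Map<String, Integer> versions = Map.of("A", 1, "B", 0);
        Map<String, Set<String>> owned = Map.of("A", Set.of("orders-0"), "B", Set.of("orders-1"));
        // B is V0 and already revoked orders-1; A is V1 and still holds orders-0.
        Map<String, Set<String>> proposed = Map.of("A", Set.of("orders-1"), "B", Set.of("orders-0"));
        System.out.println(unsafeMoves(versions, owned, proposed)); // [orders-0]
    }
}
```

If every member is on V0 the check never fires, which matches the current (V0) logic where any partition can be moved freely.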

Another thing to keep in mind is that if the leader itself is still on the older version, it can still deserialize a V1 subscription as V0 by ignoring the additional fields. Hence it may "think" everyone is still on V0 while some members are actually on the newer version.


Therefore, when upgrading, the new consumer byte-code must at first still follow the old-versioned protocol, both for metadata encoding and for behavior (e.g. still revoking partitions before sending JoinGroup). Only after everyone has upgraded to the new byte-code can consumers start rebalancing with the new versioned protocol. Note that during the later rebalance, consumers may still send JoinGroup requests with the old version; the key is that they are all new-version aware, so the consumer leader can freely adjust its logic based on the aggregated versions. More specifically, we introduce the following new config to Consumer:

...
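The upgrade rule can be expressed as a small invariant over the group, sketched below under stated assumptions. Each member is modeled by two hypothetical flags: whether it runs V1-aware byte-code, and whether it has switched to sending V1 subscriptions (the switch that the consumer config introduced above would control; its real name and values are not modeled here). Because any member may become the leader, the group is only safe if no V1 subscription is sent while some member still runs old byte-code. `UpgradePathCheck` and `Member` are illustrative names only.

```java
import java.util.List;

class UpgradePathCheck {

    // Hypothetical model of one group member during the upgrade.
    //   newByteCode : the member runs V1-aware byte-code
    //   sendsV1     : the member encodes V1 subscriptions and no longer
    //                 revokes its partitions before sending JoinGroup
    record Member(boolean newByteCode, boolean sendsV1) {
        Member {
            if (sendsV1 && !newByteCode) {
                throw new IllegalArgumentException("old byte-code cannot send V1");
            }
        }
    }

    // Any member may be elected leader, so a V1 subscription is only safe
    // once no member is left on the old byte-code.
    static boolean safe(List<Member> group) {
        boolean anyOldByteCode = group.stream().anyMatch(m -> !m.newByteCode());
        boolean anyV1 = group.stream().anyMatch(Member::sendsV1);
        return !(anyOldByteCode && anyV1);
    }

    public static void main(String[] args) {
        Member old = new Member(false, false);
        Member upgraded = new Member(true, false);  // phase 1: new code, still V0 protocol
        Member switched = new Member(true, true);   // phase 2: new code, V1 protocol
        System.out.println(safe(List.of(old, upgraded)));      // true
        System.out.println(safe(List.of(upgraded, switched))); // true
        System.out.println(safe(List.of(old, switched)));      // false
    }
}
```

Upgrading in two phases keeps the group on the safe side of this check at every step: first every member moves to the new byte-code while still speaking V0, and only then do members switch to V1.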

Note that this proposal depends on users behaving correctly: everyone should eventually be on the same "rebalance protocol", otherwise we would fall into case 3.b) forever, where some partitions would not be assigned to anyone. In addition, this approach assumes that the leader is V1-aware whenever any V1 subscription is received. Again, if users follow the upgrade path above this should be the case; if they do not follow the guidance, the behavior is undefined, since an old-versioned leader may just proceed with the V0 "eager" assignment while some of the members are actually on V1.


There are a few edge cases worth mentioning here:

...

Since the group coordinator selects the new leader from among the existing members, even if the leader fails after the group has successfully upgraded, the new leader should still be V1-aware, and new V0 members joining within the same generation should not be selected at all. So this should not be an issue.


-------------------------------

...