...

KafkaConsumer:
 
Subscription => TopicList UserData OwnedPartitions
   TopicList               => List<String>
   UserData                => Bytes  
   OwnedPartitions         => List<String, List<Int32>>

 


We'll update the consumer protocol for Subscription by adding a new field, "generation", at the end and bumping the version to V2.


KafkaConsumer:
 
Subscription => TopicList UserData OwnedPartitions Generation
   TopicList               => List<String>
   UserData                => Bytes  
   OwnedPartitions         => List<String, List<Int32>>
   Generation              => Int32   <--- new field
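The V2 layout above can be sketched as a byte-level encoder. This is a minimal illustration, not Kafka's actual ConsumerProtocol code: the class name SubscriptionV2Codec, the length prefixes, and the fixed buffer size are all assumptions made to keep the example short.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

// Hypothetical codec for illustration only; the wire details are assumed.
final class SubscriptionV2Codec {
    static final short VERSION = 2;

    static void putString(ByteBuffer buf, String s) {
        byte[] b = s.getBytes(StandardCharsets.UTF_8);
        buf.putShort((short) b.length);
        buf.put(b);
    }

    static String getString(ByteBuffer buf) {
        byte[] b = new byte[buf.getShort()];
        buf.get(b);
        return new String(b, StandardCharsets.UTF_8);
    }

    // Writes TopicList, UserData, OwnedPartitions, then Generation last.
    static ByteBuffer encode(List<String> topics, byte[] userData,
                             Map<String, List<Integer>> owned, int generation) {
        ByteBuffer buf = ByteBuffer.allocate(1024); // fixed size keeps the sketch short
        buf.putShort(VERSION);
        buf.putInt(topics.size());
        for (String t : topics) putString(buf, t);
        buf.putInt(userData.length);
        buf.put(userData);
        buf.putInt(owned.size());
        for (Map.Entry<String, List<Integer>> e : owned.entrySet()) {
            putString(buf, e.getKey());
            buf.putInt(e.getValue().size());
            for (int p : e.getValue()) buf.putInt(p);
        }
        buf.putInt(generation); // the new trailing field
        buf.flip();
        return buf;
    }

    // Skips the older fields and returns the trailing Generation value.
    static int decodeGeneration(ByteBuffer source) {
        ByteBuffer buf = source.duplicate();
        buf.getShort(); // version
        int topicCount = buf.getInt();
        for (int i = 0; i < topicCount; i++) getString(buf);
        int userLen = buf.getInt();
        buf.position(buf.position() + userLen);
        int ownedCount = buf.getInt();
        for (int i = 0; i < ownedCount; i++) {
            getString(buf);
            int partitionCount = buf.getInt();
            buf.position(buf.position() + 4 * partitionCount);
        }
        return buf.getInt();
    }
}
```

Because Generation is written last, every earlier field keeps the position it had before the version bump.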




A "generation" field will also be added to the Subscription class in ConsumerPartitionAssignor.

Proposed Changes


So, during the joinGroup request, we'll include the "generation" data in the protocol set and send it to the group coordinator. Later, when the consumer leader receives the subscription info from all consumers, it will perform the assignment based on the "ownedPartitions" and "generation" info. After the assignment, we can also use the "ownedPartitions" and "generation" info to validate the assignments.
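The text does not spell out how the leader combines the two fields. One plausible reading, shown here purely as an assumption, is that when two members claim the same partition, the claim carrying the higher generation is the more recent one and wins. The names OwnershipResolver, Member, and resolveOwners are hypothetical and not part of Kafka.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper; the "higher generation wins" rule is an assumption.
final class OwnershipResolver {
    // Simplified view of one member's subscription as seen by the leader.
    static final class Member {
        final String memberId;
        final List<String> ownedPartitions;
        final int generation;

        Member(String memberId, List<String> ownedPartitions, int generation) {
            this.memberId = memberId;
            this.ownedPartitions = ownedPartitions;
            this.generation = generation;
        }
    }

    // Maps each claimed partition to the claimant with the highest generation.
    // On a tie, the member seen first keeps the claim.
    static Map<String, String> resolveOwners(List<Member> members) {
        Map<String, Member> best = new HashMap<>();
        for (Member m : members) {
            for (String partition : m.ownedPartitions) {
                best.merge(partition, m,
                        (current, candidate) ->
                                candidate.generation > current.generation ? candidate : current);
            }
        }
        Map<String, String> owners = new HashMap<>();
        best.forEach((partition, m) -> owners.put(partition, m.memberId));
        return owners;
    }
}
```

For example, if member A claims t-0 and t-1 at generation 3 while member B claims t-1 at generation 5, this sketch credits t-0 to A and t-1 to B.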

...

In the AbstractPartitionAssignor, we would have a validateSubscription function that takes the ownedPartitions across all members and checks that they do not overlap. It needs to be called by all assignors; a customized assignor is responsible for calling it itself.
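The overlap check on ownedPartitions could look roughly like the following. This is a sketch under assumptions: the signature of validateSubscription is not given in the text, so the class OwnedPartitionsValidator, the method findOverlaps, and the use of plain strings for partitions are all hypothetical.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical validator; not the actual AbstractPartitionAssignor code.
final class OwnedPartitionsValidator {
    // Returns the partitions claimed by more than one member.
    // An empty result means the ownedPartitions do not overlap.
    static Set<String> findOverlaps(Map<String, List<String>> ownedByMember) {
        Map<String, String> firstOwner = new HashMap<>();
        Set<String> overlaps = new LinkedHashSet<>();
        for (Map.Entry<String, List<String>> entry : ownedByMember.entrySet()) {
            for (String partition : entry.getValue()) {
                String previous = firstOwner.putIfAbsent(partition, entry.getKey());
                if (previous != null && !previous.equals(entry.getKey())) {
                    overlaps.add(partition);
                }
            }
        }
        return overlaps;
    }
}
```

An assignor could call such a check before computing the assignment and decide how to react when the returned set is not empty.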


Compatibility, Deprecation, and Migration Plan

It is compatible to inject additional fields after the assignor-specific SubscriptionInfo bytes. On serialization, we first call the assignor to encode its info bytes, then re-allocate a larger buffer to append the consumer-specific bytes. With the new protocol, we just need to write some existing fields before, and some new fields after, the assignor-specific info bytes, and do the reverse on deserialization. So adding fields after the assignor bytes remains naturally compatible with plug-in assignors.
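The compatibility argument can be illustrated with a deliberately simplified framing of version, assignor bytes, and a trailing generation. Everything here is an assumption made for illustration: the class TrailingFieldCompat, the framing itself, and the use of -1 as the "absent" value.

```java
import java.nio.ByteBuffer;

// Hypothetical, simplified framing: [version][userData length][userData][generation].
final class TrailingFieldCompat {
    // The assignor has already produced opaque bytes; the consumer copies them
    // into a larger buffer and appends its own field after them.
    static ByteBuffer wrapV2(byte[] assignorBytes, int generation) {
        ByteBuffer buf = ByteBuffer.allocate(2 + 4 + assignorBytes.length + 4);
        buf.putShort((short) 2);
        buf.putInt(assignorBytes.length);
        buf.put(assignorBytes);
        buf.putInt(generation);
        buf.flip();
        return buf;
    }

    // A reader that predates the new field: it stops after the assignor bytes
    // and never touches whatever follows them.
    static byte[] readUserDataOldStyle(ByteBuffer source) {
        ByteBuffer in = source.duplicate();
        in.getShort(); // version, ignored by this reader
        byte[] userData = new byte[in.getInt()];
        in.get(userData);
        return userData;
    }

    // A newer reader: reads the trailing field only when the version says it exists.
    // Returning -1 for "absent" is an assumption of this sketch.
    static int readGeneration(ByteBuffer source) {
        ByteBuffer in = source.duplicate();
        short version = in.getShort();
        int length = in.getInt();
        in.position(in.position() + length);
        return (version >= 2 && in.remaining() >= 4) ? in.getInt() : -1;
    }
}
```

The assignor bytes come back unchanged from the old-style reader, which is why a plug-in assignor does not need to know that a field was appended after its data.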

Rejected Alternatives

1. Adding a new generation method in the ConsumerPartitionAssignor interface, for the assignor to get the generation info.

--> This will work for the assignor only. However, in ConsumerCoordinator, after the cooperative assignor completes its assignment, there is a validation phase that checks whether the cooperative assignor revokes partitions before assigning them to other consumers. That validation phase also needs the "generation" info. If the generation info is only available inside the assignor, the validation phase can't use it.



2. On the broker side, the group coordinator would check the "generation" upon Join-Group: if it is a sentinel value (e.g. -1), assume it is a new member that has never been in the group, and hence always set the current generation; if it is not the sentinel value and is stale, return the ILLEGAL_GENERATION error directly. Upon getting such an error, the member should not clear its member ID if it has one, but should only reset the generation to null and clean up the ownedPartitions before re-joining.

--> We don't deserialize the subscription data on the broker side right now. We might need to add a "generation" field to the join group request to achieve that, which is out of scope for this KIP.