...

KafkaConsumer:
 
Subscription => TopicList UserData OwnedPartitions
   TopicList               => List<String>
   UserData                => Bytes  
   OwnedPartitions         => List<String, List<Int32>>

 
Assignment => AssignedPartitions UserData
   AssignedPartitions      => List<String, List<Int32>>
   UserData                => Bytes


Proposed Changes

Update the consumer protocol for Subscription by adding a new field, "generation", at the end.

...

KafkaConsumer:
 
Subscription => TopicList UserData OwnedPartitions Generation
   TopicList               => List<String>
   UserData                => Bytes  
   OwnedPartitions         => List<String, List<Int32>>
   Generation              => Int32   <--- new field
 
Assignment => AssignedPartitions UserData
   AssignedPartitions      => List<String, List<Int32>>
   UserData                => Bytes
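As a rough illustration of the new field's semantics, here is a hypothetical Java model of the two messages. The class and record names are illustrative only (this is not Kafka's generated protocol code); the main point it shows is that a sender which does not know its generation falls back to the default of -1, while Assignment is unchanged.

```java
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of the consumer protocol messages above.
// NOT Kafka's generated protocol classes; all names are illustrative.
final class ConsumerProtocolModel {

    // Sentinel used when a subscription carries no generation information.
    static final int DEFAULT_GENERATION = -1;

    // Subscription => TopicList UserData OwnedPartitions Generation
    record Subscription(List<String> topics,
                        byte[] userData,
                        Map<String, List<Integer>> ownedPartitions,
                        int generation) {

        Subscription {
            // Defensive copies of the collections (userData may be null).
            topics = List.copyOf(topics);
            ownedPartitions = Map.copyOf(ownedPartitions);
        }

        // Senders that do not know their generation get the default of -1.
        Subscription(List<String> topics, byte[] userData,
                     Map<String, List<Integer>> ownedPartitions) {
            this(topics, userData, ownedPartitions, DEFAULT_GENERATION);
        }

        boolean hasGeneration() {
            return generation != DEFAULT_GENERATION;
        }
    }

    // Assignment => AssignedPartitions UserData (unchanged by this proposal)
    record Assignment(Map<String, List<Integer>> assignedPartitions, byte[] userData) {
        Assignment {
            assignedPartitions = Map.copyOf(assignedPartitions);
        }
    }
}
```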



So, during the JoinGroup request, we'll include the "generation" data in the protocol set sent to the group coordinator. Later, when the consumer leader receives the subscription info from all consumers, it'll compute the assignment based on the "ownedPartitions" and "generation" info. After the assignment, we can also leverage the "ownedPartitions" and "generation" info to validate it.
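One way the leader could use the two fields is sketched below, under stated assumptions: partitions are simplified to strings such as "t-0", and `OwnedPartitionResolver`, `MemberClaim`, `resolveOwners`, and `isValidCooperativeAssignment` are hypothetical names, not existing Kafka APIs. For each partition, the claim made at the highest generation wins and older claims are dropped as stale; two claims at the same generation are treated as ambiguous. The validation step checks that no partition is assigned twice and, following the cooperative rebalancing rule, that a partition still owned by one member is not handed directly to another in the same round.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical leader-side helpers; not Kafka's actual assignor code.
final class OwnedPartitionResolver {

    // One member's claim, as read from its Subscription.
    record MemberClaim(String memberId, int generation, List<String> ownedPartitions) {}

    // For each partition, keep only the claim made at the highest generation.
    // Claims tied at the same generation are ambiguous, so the partition is
    // left unowned and the assignor may place it freely.
    static Map<String, String> resolveOwners(List<MemberClaim> claims) {
        Map<String, String> owner = new HashMap<>();
        Map<String, Integer> ownerGeneration = new HashMap<>();
        Set<String> ambiguous = new HashSet<>();
        for (MemberClaim claim : claims) {
            for (String partition : claim.ownedPartitions()) {
                Integer best = ownerGeneration.get(partition);
                if (best == null || claim.generation() > best) {
                    owner.put(partition, claim.memberId());
                    ownerGeneration.put(partition, claim.generation());
                    ambiguous.remove(partition);
                } else if (claim.generation() == best) {
                    ambiguous.add(partition);
                }
                // Otherwise the claim is from an older generation: stale, ignored.
            }
        }
        ambiguous.forEach(owner::remove);
        return owner;
    }

    // Post-assignment check: no partition is assigned twice, and no partition
    // moves from its current owner to another member without being revoked first.
    static boolean isValidCooperativeAssignment(Map<String, String> owners,
                                                Map<String, List<String>> assignment) {
        Set<String> seen = new HashSet<>();
        for (Map.Entry<String, List<String>> entry : assignment.entrySet()) {
            for (String partition : entry.getValue()) {
                if (!seen.add(partition)) {
                    return false; // assigned to two members
                }
                String current = owners.get(partition);
                if (current != null && !current.equals(entry.getKey())) {
                    return false; // transferred without a revocation round
                }
            }
        }
        return true;
    }
}
```

Leaving tied partitions unowned is one conservative choice; an assignor could instead log an error or pick a deterministic winner.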


For the built-in CooperativeStickyAssignor, a group with some consumers running the old bytecode and some running the new bytecode works fine, because the subscription data from old consumers will contain [empty ownedPartitions + default generation (-1)] in V0, or [current ownedPartitions + default generation (-1)] in V1. The V0 case is simple: we just ignore the info, since it is empty. In the V1 case, we'll read the "ownedPartitions" data and then check whether there is generation info in the userData (i.e., the current workaround). If so, we'll use it; otherwise, we'll treat the ownedPartitions as having the default generation (-1).
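The fallback order for mixed-version groups can be sketched as follows. `GenerationResolver` is hypothetical, and so is the userData layout: the sketch assumes the current workaround stores the generation as a single 4-byte big-endian int at the start of the userData, which should be checked against the actual assignor encoding. The protocol field, when present, takes precedence; otherwise the userData is consulted, and -1 is the final default.

```java
import java.nio.ByteBuffer;
import java.util.OptionalInt;

// Hypothetical sketch of resolving a member's generation across subscription
// versions. Not Kafka's actual code.
final class GenerationResolver {

    static final int DEFAULT_GENERATION = -1;

    // protocolGeneration: the new "generation" field, or empty when the
    // subscription was encoded as V0/V1 (which have no such field).
    // userData: assignor-specific bytes (may be null).
    static int resolveGeneration(OptionalInt protocolGeneration, ByteBuffer userData) {
        if (protocolGeneration.isPresent()) {
            return protocolGeneration.getAsInt();
        }
        return decodeUserDataGeneration(userData).orElse(DEFAULT_GENERATION);
    }

    // ASSUMED workaround encoding: one big-endian int32 holding the generation.
    // Reads from a duplicate so the caller's buffer position is not disturbed.
    static OptionalInt decodeUserDataGeneration(ByteBuffer userData) {
        if (userData == null || userData.remaining() < Integer.BYTES) {
            return OptionalInt.empty();
        }
        return OptionalInt.of(userData.duplicate().getInt());
    }
}
```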

Custom assignors can use the "ownedPartitions" and "generation" info together to check whether an ownership claim is stale. Assignors that don't update their code are unaffected, since this change is backward compatible (see below).
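A custom assignor could apply a simple member-level staleness check: ignore the ownedPartitions of any member whose generation is below the highest generation reported in the group. The sketch below uses hypothetical names (`StaleOwnershipFilter`, `MemberSubscription`) and string partition IDs. If every member reports the default -1, nothing can be ruled stale and all claims are kept.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper a custom assignor might use; names are illustrative.
final class StaleOwnershipFilter {

    record MemberSubscription(List<String> ownedPartitions, int generation) {}

    // Keeps ownedPartitions only for members at the highest generation in the
    // group; members reporting an older generation (including -1 when others
    // report a real one) contribute no prior ownership.
    static Map<String, List<String>> currentOwnership(Map<String, MemberSubscription> subscriptions) {
        int maxGeneration = subscriptions.values().stream()
                .mapToInt(MemberSubscription::generation)
                .max()
                .orElse(-1);
        Map<String, List<String>> result = new HashMap<>();
        subscriptions.forEach((memberId, sub) ->
                result.put(memberId,
                        sub.generation() == maxGeneration ? sub.ownedPartitions() : List.of()));
        return result;
    }
}
```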

Compatibility, Deprecation, and Migration Plan

...