...
Therefore, we should add the "generation" field into "Subscription" data of the consumer protocol. (i.e. ConsumerProtocolSubscription), to allow assignor/consumer coordinator/group coordinator to have a way to identify the out-of-date members/assignments.
Public Interfaces
Current consumer protocol for Subscription:
JoinGroupRequest
The version is bumped to 8, and add a "generationId" field
1 |
|
ConsumerProtocolSubscription
...
KafkaConsumer:
Subscription => TopicList UserData AssignedPartitions
TopicList => List<String>
UserData => Bytes
OwnedPartitions => List<String, List<Int32>>
We'll updated consumer protocol for Subscription, which will add a new field "generation" at the end, and bump the version to V2.
{ "type": "data", "name": "ConsumerProtocolSubscription", // Subscription part of the Consumer Protocol. // // The current implementation assumes that future versions will not break compatibility. When // it encounters a newer version, it parses it using the current format. This basically means // that new versions cannot remove or reorder any of the existing fields.
"validVersions": "0-2", "flexibleVersions": "none", "fields": [ { "name": "Topics", "type": "[]string", "versions": "0+" }, { "name": "GenerationId", "type": "int32", "versions": "2+", "default": "-1"}, <-- new added { "name": "UserData", "type": "bytes", "versions": "0+", "nullableVersions": "0+", "default": "null", "zeroCopy": true }, { "name": "OwnedPartitions", "type": "[]TopicPartition", "versions": "1+", "ignorable": true, "fields": [ { "name": "Topic", "type": "string", "mapKey": true, "versions": "1+", "entityType": "topicName" }, { "name": "Partitions", "type": "[]int32", "versions": "1+"} ] } ] } |
Also, there will also be a "generation" field added into the Subscription
class in ConsumerPartitionAssignor.
...
In the AbstractPartitionAssignor, we would have a validateSubscription
function which takes in the ownedPartitions across all members, and needs to be called by all assignors (it is the customized assignor's own responsibility to call it), to check that ownedPartitions do not have overlaps.
In the broker side, the group coordinator would check for the "generation" upon Join-Group: if it is a sentinel value (e.g. -1) then assume it is a new member that have never been in the group yet, and hence always set the current generation; if it is not sentinel value and stale, then return the ILLEGAL_GENERATION error directly. And, upon getting such error the member should not clear its member ID if there's one, but only reset the generation to null and also clean up the ownedPartitions before re-joining.
Compatibility, Deprecation, and Migration Plan
...
--> This will work for the assignor only. But actually, in ConsumerCoordinator
, after the cooperative assignor completes its assignment, we have a validation phase, to validate if the cooperative assignor revoke partitions first before assign it to other consumers. In the validation phase, we also need the "generation" info. If the generation info only put inside assignor, the validation phase can't leverage the "generation" data.
2. In the broker side, the group coordinator would check for the "generation" upon Join-Group: if it is a sentinel value (e.g. -1) then assume it is a new member that have never been in the group yet, and hence always set the current generation; if it is not sentinel value and stale, then return the ILLEGAL_GENERATION error directly. And, upon getting such error the member should not clear its member ID if there's one, but only reset the generation to null and also clean up the ownedPartitions before re-joining.add "generation" field into Subscription:
Current consumer protocol for Subscription:
|
We'll updated consumer protocol for Subscription, which will add a new field "generation" at the end, and bump the version to V2.
|
--> Reject this approach because the broker can't leverage the generation info inside subscription metadata--> We don't deserialize the subscription data in broker side right now. We might need to add a "generation" field into join group request to achieve that, which is out of scope of this KIP.