...
The version is bumped to 8, and add a "generationId" field
1 |
|
ConsumerProtocolSubscription
We'll updated consumer protocol for Subscription, which will bump the version to 2, add add a new field "generationgenerationId" at the end, and bump the version to V2.
{ "type": "data", "name": "ConsumerProtocolSubscription", // Subscription part of the Consumer Protocol. // // The current implementation assumes that future versions will not break compatibility. When // it encounters a newer version, it parses it using the current format. This basically means // that new versions cannot remove or reorder any of the existing fields.
"validVersions": "0-2", "flexibleVersions": "none", "fields": [ { "name": "Topics", "type": "[]string", "versions": "0+" }, |
{ "name": "UserData", "type": "bytes", "versions": "0+", "nullableVersions": "0+", "default": "null", "zeroCopy": true }, { "name": "OwnedPartitions", "type": "[]TopicPartition", "versions": "1+", "ignorable": true, "fields": [ { "name": "Topic", "type": "string", "mapKey": true, "versions": "1+", "entityType": "topicName" }, { "name": "Partitions", "type": "[]int32", "versions": "1+"} ] } { "name": "GenerationId", "type": "int32", "versions": "2+", "default": "-1"}, <-- new added ] } |
Also, there will also be a "generation" field added into the Subscription
class in ConsumerPartitionAssignor.
...
--> This will work for the assignor only. But actually, in ConsumerCoordinator
, after the cooperative assignor completes its assignment, we have a validation phase, to validate if the cooperative assignor revoke partitions first before assign it to other consumers. In the validation phase, we also need the "generation" info. If the generation info only put inside assignor, the validation phase can't leverage the "generation" data.
2. add "generation" field into Subscription:
Current consumer protocol for Subscription:
...
KafkaConsumer:
Subscription => TopicList UserData AssignedPartitions
TopicList => List<String>
UserData => Bytes
OwnedPartitions => List<String, List<Int32>>
We'll updated consumer protocol for Subscription, which will add a new field "generation" at the end, and bump the version to V2.
KafkaConsumer:
Subscription => TopicList UserData AssignedPartitions Generation
TopicList => List<String>
UserData => Bytes
OwnedPartitions => List<String, List<Int32>>
Generation => Int32 <--- new field
--> Reject this approach because the broker can't leverage the generation info inside subscription metadata.