Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

The version is bumped to 8, and add a "generationId" field

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

{
  "apiKey": 11,
  "type": "request",
  "name": "JoinGroupRequest",
  // Version 1 adds RebalanceTimeoutMs.
  //
  // Version 2 and 3 are the same as version 1.
  //
  // Starting from version 4, the client needs to issue a second request to join group
  //
  // Starting from version 5, we add a new field called groupInstanceId to indicate member identity across restarts.
  // with assigned id.
  //
  // Version 6 is the first flexible version.
  //
  // Version 7 is the same as version 6.

  // Starting from version 8, we add a new field called GenerationId to indicate if the member has out-of-date metadata.    <--- new added
  "validVersions": "0-8",
  "flexibleVersions": "6+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The group identifier." },

         { "name": "GenerationId", "type": "int32", "versions": "8+""default": "-1", 
      "about": "The generation id of the member in the group." },   <--- new added
    { "name": "SessionTimeoutMs", "type": "int32", "versions": "0+",
      "about": "The coordinator considers the consumer dead if it receives no heartbeat after this timeout in milliseconds." },
    // Note: if RebalanceTimeoutMs is not present, SessionTimeoutMs should be
    // used instead.  The default of -1 here is just intended as a placeholder.
    { "name": "RebalanceTimeoutMs", "type": "int32", "versions": "1+", "default": "-1", "ignorable": true,
      "about": "The maximum time in milliseconds that the coordinator will wait for each member to rejoin when rebalancing the group." },
    { "name": "MemberId", "type": "string", "versions": "0+",
      "about": "The member id assigned by the group coordinator." },
    { "name": "GroupInstanceId", "type": "string", "versions": "5+",
      "nullableVersions": "5+", "default": "null",
      "about": "The unique identifier of the consumer instance provided by end user." },
    { "name": "ProtocolType", "type": "string", "versions": "0+",
      "about": "The unique name the for class of protocols implemented by the group we want to join." },
    { "name": "Protocols", "type": "[]JoinGroupRequestProtocol", "versions": "0+",
      "about": "The list of protocols that the member supports.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
        "about": "The protocol name." },
      { "name": "Metadata", "type": "bytes", "versions": "0+",
        "about": "The protocol metadata." }
    ]}
  ]
}


ConsumerProtocolSubscription

We'll updated consumer protocol for Subscription, which will bump the version to 2, add add a new field "generationgenerationId" at the end, and bump the version to V2.

{

  "type": "data",

  "name": "ConsumerProtocolSubscription",

  // Subscription part of the Consumer Protocol.

  //

  // The current implementation assumes that future versions will not break compatibility. When

  // it encounters a newer version, it parses it using the current format. This basically means

  // that new versions cannot remove or reorder any of the existing fields.

 // Starting from version 2, we add a new field called GenerationId to indicate if the member has out-of-date ownedPartitions.    <--- new added

  "validVersions": "0-2",

  "flexibleVersions": "none",

  "fields": [

    { "name": "Topics", "type": "[]string", "versions": "0+" },

    { "name": "GenerationId", "type": "int32", "versions": "2+", "default": "-1"},  <-- new added

    { "name": "UserData", "type": "bytes", "versions": "0+", "nullableVersions": "0+",

      "default": "null", "zeroCopy": true },

    { "name": "OwnedPartitions", "type": "[]TopicPartition", "versions": "1+", "ignorable": true,

      "fields": [

        { "name": "Topic", "type": "string", "mapKey": true, "versions": "1+", "entityType": "topicName" },

        { "name": "Partitions", "type": "[]int32", "versions": "1+"}

      ]

    }

    { "name": "GenerationId", "type": "int32", "versions": "2+", "default": "-1"},  <-- new added

  ]

}


Also, there will also be a "generation" field added into the Subscription class in ConsumerPartitionAssignor.

...

--> This will work for the assignor only. But actually, in ConsumerCoordinator , after the cooperative assignor completes its assignment, we have a validation phase, to validate if the cooperative assignor revoke partitions first before assign it to other consumers. In the validation phase, we also need the "generation" info. If the generation info only put inside assignor, the validation phase can't leverage the "generation" data.

2. add "generation" field into Subscription:

Current consumer protocol for Subscription:

...

KafkaConsumer:
 
Subscription => TopicList UserData AssignedPartitions
   TopicList               => List<String>
   UserData                => Bytes  
   OwnedPartitions         => List<String, List<Int32>>

 

We'll updated consumer protocol for Subscription, which will add a new field "generation" at the end, and bump the version to V2.

KafkaConsumer:
 
Subscription => TopicList UserData AssignedPartitions Generation
   TopicList               => List<String>
   UserData                => Bytes  
   OwnedPartitions         => List<String, List<Int32>>
   Generation              => Int32   <--- new field

--> Reject this approach because the broker can't leverage the generation info inside subscription metadata.