...

Therefore, we should add a "generation" field to the "Subscription" data of the consumer protocol (i.e. ConsumerProtocolSubscription), so that the assignor, the consumer coordinator, and the group coordinator have a way to identify out-of-date members and assignments.
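A minimal Java sketch of the idea, assuming a simplified Subscription model. The class name `SubscriptionWithGeneration`, the `NO_GENERATION` sentinel, and the use of plain `"topic-partition"` strings instead of Kafka's `TopicPartition` are all hypothetical. The sketch only shows the generation travelling alongside the ownedPartitions it describes.

```java
import java.util.List;

// Hypothetical, simplified model of a consumer Subscription carrying the
// proposed generation field. Partitions are "topic-partition" strings here.
final class SubscriptionWithGeneration {
    // Assumed sentinel meaning "this member has never been in the group".
    static final int NO_GENERATION = -1;

    final List<String> topics;
    final List<String> ownedPartitions;
    final int generationId;

    SubscriptionWithGeneration(List<String> topics, List<String> ownedPartitions, int generationId) {
        this.topics = List.copyOf(topics);
        this.ownedPartitions = List.copyOf(ownedPartitions);
        this.generationId = generationId;
    }

    // A brand-new member owns nothing and reports the sentinel generation.
    static SubscriptionWithGeneration newMember(List<String> topics) {
        return new SubscriptionWithGeneration(topics, List.of(), NO_GENERATION);
    }

    // True if the ownedPartitions were reported together with a real generation.
    boolean hasGeneration() {
        return generationId != NO_GENERATION;
    }
}
```

With the generation attached, whoever reads the subscription can tell which generation the ownedPartitions belong to.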

Public Interfaces

JoinGroupRequest

The version is bumped to 8, and a "GenerationId" field is added.

...

...

{
  "apiKey": 11,
  "type": "request",
  "name": "JoinGroupRequest",
  // Version 1 adds RebalanceTimeoutMs.
  //
  // Version 2 and 3 are the same as version 1.
  //
  // Starting from version 4, the client needs to issue a second request to join group
  // with assigned id.
  //
  // Starting from version 5, we add a new field called groupInstanceId to indicate member identity across restarts.
  //
  // Version 6 is the first flexible version.
  //
  // Version 7 is the same as version 6.
  //
  // Starting from version 8, we add a new field called GenerationId to indicate if the member has out-of-date metadata.    <--- newly added
  "validVersions": "0-8",
  "flexibleVersions": "6+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "GenerationId", "type": "int32", "versions": "8+", "default": "-1",
      "about": "The generation id of the member in the group." },   // <--- newly added
    { "name": "SessionTimeoutMs", "type": "int32", "versions": "0+",
      "about": "The coordinator considers the consumer dead if it receives no heartbeat after this timeout in milliseconds." },
    // Note: if RebalanceTimeoutMs is not present, SessionTimeoutMs should be
    // used instead.  The default of -1 here is just intended as a placeholder.
    { "name": "RebalanceTimeoutMs", "type": "int32", "versions": "1+", "default": "-1", "ignorable": true,
      "about": "The maximum time in milliseconds that the coordinator will wait for each member to rejoin when rebalancing the group." },
    { "name": "MemberId", "type": "string", "versions": "0+",
      "about": "The member id assigned by the group coordinator." },
    { "name": "GroupInstanceId", "type": "string", "versions": "5+",
      "nullableVersions": "5+", "default": "null",
      "about": "The unique identifier of the consumer instance provided by end user." },
    { "name": "ProtocolType", "type": "string", "versions": "0+",
      "about": "The unique name for the class of protocols implemented by the group we want to join." },
    { "name": "Protocols", "type": "[]JoinGroupRequestProtocol", "versions": "0+",
      "about": "The list of protocols that the member supports.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
        "about": "The protocol name." },
      { "name": "Metadata", "type": "bytes", "versions": "0+",
        "about": "The protocol metadata." }
    ]}
  ]
}
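To illustrate what `"versions": "8+"` combined with `"default": "-1"` means for GenerationId, here is a hypothetical Java sketch. It is not Kafka's generated message code: an ordered map stands in for the wire format, and the class and method names are assumptions. The field is written only at version 8 and above, and a reader of an older request falls back to the default -1. Because the field is not declared `ignorable`, the sketch refuses to silently drop a real generation at an older version.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of a version-gated field with a default value.
// This is NOT Kafka's generated serialization code.
final class JoinGroupGenerationField {
    static final short FIRST_VERSION_WITH_GENERATION = 8;
    static final int DEFAULT_GENERATION_ID = -1;

    // "Encodes" the relevant fields into an ordered map standing in for the wire format.
    static Map<String, Object> encode(short version, String groupId, int generationId, String memberId) {
        Map<String, Object> wire = new LinkedHashMap<>();
        wire.put("GroupId", groupId);
        if (version >= FIRST_VERSION_WITH_GENERATION) {
            wire.put("GenerationId", generationId);
        } else if (generationId != DEFAULT_GENERATION_ID) {
            // Not "ignorable": dropping a real generation would lose information.
            throw new IllegalArgumentException(
                    "GenerationId " + generationId + " cannot be sent at version " + version);
        }
        wire.put("MemberId", memberId);
        return wire;
    }

    // A reader uses the declared default when the field is absent (version < 8).
    static int decodeGenerationId(Map<String, Object> wire) {
        Object value = wire.get("GenerationId");
        return value == null ? DEFAULT_GENERATION_ID : (Integer) value;
    }
}
```

On the coordinator side, a version 7 request therefore looks the same as a version 8 request from a member that sends the sentinel.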

ConsumerProtocolSubscription

...

In AbstractPartitionAssignor, we would add a validateSubscription function that takes the ownedPartitions of all members and checks that they do not overlap. All assignors need to call it; for a customized assignor, calling it is the assignor's own responsibility.
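A minimal sketch of the overlap check. It assumes the ownedPartitions are passed as a map from member ID to partition names. The `SubscriptionValidator` class and this signature are hypothetical and are not an existing Kafka API.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the proposed validateSubscription check: no partition
// may appear in the ownedPartitions of more than one member.
final class SubscriptionValidator {
    static void validateSubscription(Map<String, List<String>> ownedPartitionsPerMember) {
        // Records the first member seen claiming each partition.
        Map<String, String> ownerByPartition = new HashMap<>();
        for (Map.Entry<String, List<String>> entry : ownedPartitionsPerMember.entrySet()) {
            String member = entry.getKey();
            for (String partition : entry.getValue()) {
                String previousOwner = ownerByPartition.putIfAbsent(partition, member);
                // A second, different member claiming the same partition is an overlap.
                if (previousOwner != null && !previousOwner.equals(member)) {
                    throw new IllegalStateException("Partition " + partition
                            + " is claimed by both " + previousOwner + " and " + member);
                }
            }
        }
    }
}
```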

On the broker side, the group coordinator would check the "generation" upon JoinGroup. If it is a sentinel value (e.g. -1), the coordinator assumes it is a new member that has never been in the group and always assigns it the current generation. If it is not the sentinel value and is stale, the coordinator returns the ILLEGAL_GENERATION error directly. Upon receiving this error, the member should not clear its member ID (if it has one); it should only reset its generation to null and clean up its ownedPartitions before re-joining.
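The check described above can be sketched as follows. The `JoinGenerationCheck` class, the `Result` enum, and the `MemberState` class are hypothetical. A Java `int` cannot be null, so the sketch uses the -1 sentinel where the text says the generation is reset to null. It also interprets "stale" as older than the coordinator's current generation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the JoinGroup generation check and the member's
// reaction to ILLEGAL_GENERATION. All names are illustrative.
final class JoinGenerationCheck {
    static final int NO_GENERATION = -1; // assumed sentinel for "never in the group"

    enum Result { OK, ILLEGAL_GENERATION }

    // Coordinator side: the sentinel is always accepted (new member);
    // any other generation older than the current one is rejected as stale.
    static Result check(int requestGenerationId, int currentGenerationId) {
        if (requestGenerationId == NO_GENERATION) {
            return Result.OK;
        }
        return requestGenerationId < currentGenerationId ? Result.ILLEGAL_GENERATION : Result.OK;
    }

    // Member side state relevant to the error handling.
    static final class MemberState {
        final String memberId;
        int generationId;
        final List<String> ownedPartitions;

        MemberState(String memberId, int generationId, List<String> ownedPartitions) {
            this.memberId = memberId;
            this.generationId = generationId;
            this.ownedPartitions = new ArrayList<>(ownedPartitions);
        }

        // Keeps the member ID, resets the generation, and drops ownedPartitions before re-joining.
        void onIllegalGeneration() {
            generationId = NO_GENERATION;
            ownedPartitions.clear();
        }
    }
}
```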

Compatibility, Deprecation, and Migration Plan

...

--> This will work for the assignor only. However, in ConsumerCoordinator, after the cooperative assignor completes its assignment, there is a validation phase that checks that the cooperative assignor revokes partitions before assigning them to other consumers. This validation phase also needs the "generation" info. If the generation info is only put inside the assignor, the validation phase can't leverage it.


2. On the broker side, the group coordinator can check the "generation" upon JoinGroup. If it is a sentinel value (e.g. -1), the coordinator assumes it is a new member that has never been in the group and always assigns it the current generation. If it is not the sentinel value and is stale, the coordinator returns the ILLEGAL_GENERATION error directly. Upon receiving this error, the member should not clear its member ID (if it has one); it should only reset its generation to null and clean up its ownedPartitions before re-joining.

--> We reject this suggestion because, with this approach, the member needs one more round trip to join the group. That is:


1. member-A joins the group with generation 1 metadata (Subscription Info).

2. The broker checks the generation ID, finds that it is out of date (the current generation ID is 3), and returns ILLEGAL_GENERATION.

3. member-A gets the ILLEGAL_GENERATION error, clears its old ownedPartitions, and rejoins.

4. The rebalance continues.


As we can see, the purpose of steps 1 to 3 is to clear the old metadata (i.e. ownedPartitions) on the consumer side. However, we can identify the out-of-date ownedPartitions via the Subscription Info when doing the assignment. This way, the out-of-date member doesn't need to rejoin the group, which reduces network traffic. For example:

1. member-A joins the group with generation 1 metadata (Subscription Info), while the current generation ID is 3.

2. The rebalance continues.

3. The consumer leader performs the assignment: it parses the Subscription Info, ignores (or otherwise handles) the ownedPartitions from the old generation, and continues the assignment.

4. The assignment completes.
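Step 3 can be sketched as below, assuming each subscription carries its ownedPartitions and generation. The leader takes the newest generation reported by any member as the reference and drops ownedPartitions reported with an older one. It keeps the remaining claims (sticky) and spreads the unclaimed partitions over the least-loaded members. The `LeaderAssignment` and `Sub` names, the "newest reported generation" reference, and the balancing rule are illustrative assumptions, not the behavior of any particular Kafka assignor.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical sketch: the leader ignores ownedPartitions from an older
// generation, keeps still-valid claims, and spreads the remaining partitions.
final class LeaderAssignment {
    // Simplified stand-in for a subscription carrying the proposed generation field.
    static final class Sub {
        final List<String> ownedPartitions;
        final int generationId;

        Sub(List<String> ownedPartitions, int generationId) {
            this.ownedPartitions = List.copyOf(ownedPartitions);
            this.generationId = generationId;
        }
    }

    static Map<String, List<String>> assign(Map<String, Sub> subscriptions, List<String> allPartitions) {
        // Assumption: the newest generation reported by any member is the reference.
        int latest = subscriptions.values().stream().mapToInt(s -> s.generationId).max().orElse(-1);
        Map<String, List<String>> result = new TreeMap<>(); // sorted by member ID
        Set<String> taken = new HashSet<>();

        // Fixed member order so conflicting claims resolve deterministically.
        for (Map.Entry<String, Sub> entry : new TreeMap<>(subscriptions).entrySet()) {
            List<String> kept = new ArrayList<>();
            // Claims reported with an older generation are ignored entirely.
            if (entry.getValue().generationId == latest) {
                for (String partition : entry.getValue().ownedPartitions) {
                    // Keep a claim only if the partition exists and nobody kept it yet.
                    if (allPartitions.contains(partition) && taken.add(partition)) {
                        kept.add(partition);
                    }
                }
            }
            result.put(entry.getKey(), kept);
        }

        // Give each unclaimed partition to the member that currently has the fewest.
        for (String partition : allPartitions) {
            if (result.isEmpty() || taken.contains(partition)) {
                continue;
            }
            String target = null;
            for (Map.Entry<String, List<String>> entry : result.entrySet()) {
                if (target == null || entry.getValue().size() < result.get(target).size()) {
                    target = entry.getKey();
                }
            }
            result.get(target).add(partition);
            taken.add(partition);
        }
        return result;
    }
}
```

In the example above, member-A's generation 1 claims are ignored, so no extra JoinGroup round trip is needed to clear them.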


This still completes the assignment without any error and reduces the network traffic between the consumer and the broker.
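For the validation phase in ConsumerCoordinator described earlier in this section, here is a hypothetical sketch of how the generation could be used. Claims reported with a generation older than the newest one are ignored. The remaining claims are then checked so that no partition is revoked from one member and handed to another within the same rebalance. The class name, the map-based signature, and the "newest generation wins" rule are assumptions made for illustration.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of a cooperative-assignment validation that takes the
// generation in each member's subscription into account.
final class CooperativeValidation {
    // owned: member -> ownedPartitions; generations: member -> generationId;
    // assigned: member -> newly computed assignment.
    static void validate(Map<String, List<String>> owned,
                         Map<String, Integer> generations,
                         Map<String, List<String>> assigned) {
        int latest = generations.values().stream().mapToInt(Integer::intValue).max().orElse(-1);
        Set<String> added = new HashSet<>();
        Set<String> revoked = new HashSet<>();
        for (Map.Entry<String, List<String>> entry : assigned.entrySet()) {
            String member = entry.getKey();
            // Only trust ownedPartitions reported with the latest generation.
            List<String> trusted = generations.getOrDefault(member, -1) == latest
                    ? owned.getOrDefault(member, List.of())
                    : List.of();
            Set<String> memberAdded = new HashSet<>(entry.getValue());
            memberAdded.removeAll(trusted);
            Set<String> memberRevoked = new HashSet<>(trusted);
            memberRevoked.removeAll(entry.getValue());
            added.addAll(memberAdded);
            revoked.addAll(memberRevoked);
        }
        // A partition revoked from one member must not be given to another
        // member in the same rebalance; it has to be revoked first.
        added.retainAll(revoked);
        if (!added.isEmpty()) {
            throw new IllegalStateException("Partitions assigned before being revoked: " + added);
        }
    }
}
```

Without the generation, member-A's stale claim on a partition would make any reassignment of that partition look like a violation. With it, the stale claim is simply skipped.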