Status

Current state: "Draft" [One of "Under Discussion", "Accepted", "Rejected"]

Discussion thread: here 

JIRA: here

...

The number of L7 proxies supporting the Kafka protocol is very limited today, but there is an effort to bring Kafka protocol support to various proxies. Envoy has recently introduced support for the Kafka protocol [1][2], Linkerd has expressed its interest [3], and a few people have implemented custom ones for specific use cases [4][5]. There is no doubt that this trend will continue and that intercepting the Kafka protocol will become easier and more common.

...

In order to fill the gaps which have been identified in the Group API, we propose to explicitly put the `ProtocolType` and the `ProtocolName` in all the requests and responses which contain the `Metadata` and/or the `Assignment`. Namely, we propose to add them to the `JoinGroupResponse`, the `SyncGroupRequest` and the `SyncGroupResponse`. See Public Interfaces for details.

With this information available, we propose to extend the `GroupCoordinator` and the `AbstractCoordinator` to verify them and to error out if they are not consistent with the Protocol Type and the Protocol Name of the group. This is not strictly required but contributes to increasing the robustness of the Group API. For instance, it could help to catch errors when a new client is implemented or when changes are made in the `GroupCoordinator`.

In the `GroupCoordinator`, during the handling of the `SyncGroupRequest`, we propose to return the `INCONSISTENT_GROUP_PROTOCOL` error if the protocol type and/or name provided by the client do not correspond to the protocol type and/or name of the group. For older versions, the verification would be omitted.
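The coordinator-side check described above can be sketched as follows. This is a minimal illustration, not the actual `GroupCoordinator` code: the function name `validate_sync_group`, its parameters, and the string error names are hypothetical, and treating a null `ProtocolType` or `ProtocolName` as "not provided, skip the check" is an assumption based on the fields being nullable with a `null` default.

```python
from typing import Optional

NONE = "NONE"
INCONSISTENT_GROUP_PROTOCOL = "INCONSISTENT_GROUP_PROTOCOL"

# First SyncGroupRequest version that carries ProtocolType and ProtocolName.
FIRST_VERSION_WITH_PROTOCOL_FIELDS = 5


def validate_sync_group(
    request_version: int,
    request_protocol_type: Optional[str],
    request_protocol_name: Optional[str],
    group_protocol_type: str,
    group_protocol_name: str,
) -> str:
    """Return the name of the error the coordinator would answer with."""
    # Older request versions do not carry the fields, so nothing is verified.
    if request_version < FIRST_VERSION_WITH_PROTOCOL_FIELDS:
        return NONE
    # Assumption: a null value means the client did not provide the field.
    if request_protocol_type is not None and request_protocol_type != group_protocol_type:
        return INCONSISTENT_GROUP_PROTOCOL
    if request_protocol_name is not None and request_protocol_name != group_protocol_name:
        return INCONSISTENT_GROUP_PROTOCOL
    return NONE
```

With a group whose protocol type is `consumer` and whose protocol name is `range`, a version 5 request announcing `connect` would be rejected under this sketch, while any version 4 request would pass unverified.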

In the `AbstractCoordinator`, during the handling of the `JoinGroupResponse` and the `SyncGroupResponse`, we propose to fail the future with the `INCONSISTENT_GROUP_PROTOCOL` error if the protocol type and/or name received is not consistent with the protocol type and/or name known to the client. It would behave similarly to failing to parse the metadata or the assignment.
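On the client side, the verification could look roughly like the sketch below. It is only an illustration of the rule stated above: `InconsistentGroupProtocolError` and `verify_response_protocol` are hypothetical names, raising an exception stands in for failing the future, and skipping null values (as an older broker would not send the fields) is an assumption.

```python
from typing import Optional


class InconsistentGroupProtocolError(Exception):
    """Hypothetical stand-in for failing the future with INCONSISTENT_GROUP_PROTOCOL."""


def verify_response_protocol(
    known_type: str,
    known_name: Optional[str],
    received_type: Optional[str],
    received_name: Optional[str],
) -> None:
    """Raise if the response disagrees with what the client already knows."""
    # Assumption: null means the broker did not send the field, so skip it.
    if received_type is not None and received_type != known_type:
        raise InconsistentGroupProtocolError(
            f"protocol type {received_type!r} does not match {known_type!r}")
    # The protocol name may not be known yet (known_name is None).
    if (known_name is not None and received_name is not None
            and received_name != known_name):
        raise InconsistentGroupProtocolError(
            f"protocol name {received_name!r} does not match {known_name!r}")
```

A caller would invoke this before parsing the metadata or the assignment, so that an inconsistent response is rejected in the same way as an unparsable one.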

Public Interfaces

JoinGroupRequest

The version is bumped to 7 without changing the fields.

{
  "apiKey": 11,
  "type": "request",
  "name": "JoinGroupRequest",
  // Version 1 adds RebalanceTimeoutMs.
  //
  // Version 2 and 3 are the same as version 1.
  //
  // Starting from version 4, the client needs to issue a second request to join group
  // with assigned id.
  //
  // Starting from version 5, we add a new field called groupInstanceId to indicate member identity across restarts.
  //
  // Version 6 is the first flexible version.
  //
  // Version 7 is the same as version 6.
  "validVersions": "0-7",
  "flexibleVersions": "6+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "SessionTimeoutMs", "type": "int32", "versions": "0+",
      "about": "The coordinator considers the consumer dead if it receives no heartbeat after this timeout in milliseconds." },
    // Note: if RebalanceTimeoutMs is not present, SessionTimeoutMs should be
    // used instead.  The default of -1 here is just intended as a placeholder.
    { "name": "RebalanceTimeoutMs", "type": "int32", "versions": "1+", "default": "-1", "ignorable": true,
      "about": "The maximum time in milliseconds that the coordinator will wait for each member to rejoin when rebalancing the group." },
    { "name": "MemberId", "type": "string", "versions": "0+",
      "about": "The member id assigned by the group coordinator." },
    { "name": "GroupInstanceId", "type": "string", "versions": "5+", 
      "nullableVersions": "5+", "default": "null",
      "about": "The unique identifier of the consumer instance provided by end user." },
    { "name": "ProtocolType", "type": "string", "versions": "0+",
      "about": "The unique name for the class of protocols implemented by the group we want to join." },
    { "name": "Protocols", "type": "[]JoinGroupRequestProtocol", "versions": "0+",
      "about": "The list of protocols that the member supports.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
        "about": "The protocol name." },
      { "name": "Metadata", "type": "bytes", "versions": "0+",
        "about": "The protocol metadata." }
    ]}
  ]
}

JoinGroupResponse

The version is bumped to 7 and the `ProtocolType` field is added.

{
  "apiKey": 11,
  "type": "response",
  "name": "JoinGroupResponse",
  // Version 1 is the same as version 0.
  //
  // Version 2 adds throttle time.
  //
  // Starting in version 3, on quota violation, brokers send out responses before throttling.
  //
  // Starting in version 4, the client needs to issue a second request to join group
  // with assigned id.
  //
  // Version 5 is bumped to apply group.instance.id to identify member across restarts.
  //
  // Version 6 is the first flexible version.
  //
  // Version 7 adds the Protocol Type.
  "validVersions": "0-7",
  "flexibleVersions": "6+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "2+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." },
    { "name": "GenerationId", "type": "int32", "versions": "0+", "default": "-1",
      "about": "The generation ID of the group." },
    // New Field
    { "name": "ProtocolType", "type": "string", "versions": "7+",
      "nullableVersions": "7+", "default": "null", "ignorable": true,
      "about": "The unique name for the class of protocols implemented by the group we want to join." },
    // Nullable from version 7
    { "name": "ProtocolName", "type": "string", "versions": "0+", "nullableVersions": "7+",
      "about": "The group protocol selected by the coordinator." },
    { "name": "Leader", "type": "string", "versions": "0+",
      "about": "The leader of the group." },
    { "name": "MemberId", "type": "string", "versions": "0+",
      "about": "The member ID assigned by the group coordinator." },
    { "name": "Members", "type": "[]JoinGroupResponseMember", "versions": "0+", "fields": [
      { "name": "MemberId", "type": "string", "versions": "0+",
        "about": "The group member ID." },
      { "name": "GroupInstanceId", "type": "string", "versions": "5+",
        "nullableVersions": "5+", "default": "null",
        "about": "The unique identifier of the consumer instance provided by end user." },
      { "name": "Metadata", "type": "bytes", "versions": "0+",
        "about": "The group member metadata." }
    ]}
  ]
}

SyncGroupRequest

The version is bumped to 5 and the `ProtocolType` and `ProtocolName` fields are added.

{
  "apiKey": 14,
  "type": "request",
  "name": "SyncGroupRequest",
  // Versions 1 and 2 are the same as version 0.
  //
  // Starting from version 3, we add a new field called groupInstanceId to indicate member identity across restarts.
  //
  // Version 4 is the first flexible version.
  //
  // Version 5 adds the Protocol Type and the Protocol Name.
  "validVersions": "0-5",
  "flexibleVersions": "4+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The unique group identifier." },
    { "name": "GenerationId", "type": "int32", "versions": "0+",
      "about": "The generation of the group." },
    { "name": "MemberId", "type": "string", "versions": "0+",
      "about": "The member ID assigned by the group." },
    { "name": "GroupInstanceId", "type": "string", "versions": "3+", 
      "nullableVersions": "3+", "default": "null",
      "about": "The unique identifier of the consumer instance provided by end user." },
    // New Field
    { "name": "ProtocolType", "type": "string", "versions": "5+",
      "nullableVersions": "5+", "default": "null", "ignorable": true,
      "about": "The unique name for the class of protocols implemented by the group we want to join." },
    // New Field
    { "name": "ProtocolName", "type": "string", "versions": "5+",
      "nullableVersions": "5+", "default": "null", "ignorable": true,
      "about": "The group protocol name." },
    { "name": "Assignments", "type": "[]SyncGroupRequestAssignment", "versions": "0+",
      "about": "Each assignment.", "fields": [
      { "name": "MemberId", "type": "string", "versions": "0+",
        "about": "The ID of the member to assign." },
      { "name": "Assignment", "type": "bytes", "versions": "0+",
        "about": "The member assignment." }
    ]}
  ]
}

SyncGroupResponse

The version is bumped to 5 and the `ProtocolType` and `ProtocolName` fields are added.

{
  "apiKey": 14,
  "type": "response",
  "name": "SyncGroupResponse",
  // Version 1 adds throttle time.
  //
  // Starting in version 2, on quota violation, brokers send out responses before throttling.
  //
  // Starting from version 3, syncGroupRequest supports a new field called groupInstanceId to indicate member identity across restarts.
  //
  // Version 4 is the first flexible version.
  //
  // Version 5 adds the Protocol Type and the Protocol Name.
  "validVersions": "0-5",
  "flexibleVersions": "4+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." },
    // New Field
    { "name": "ProtocolType", "type": "string", "versions": "5+",
      "nullableVersions": "5+", "default": "null", "ignorable": true,
      "about": "The unique name for the class of protocols implemented by the group we want to join." },
    // New Field
    { "name": "ProtocolName", "type": "string", "versions": "5+",
      "nullableVersions": "5+", "default": "null", "ignorable": true,
      "about": "The group protocol name." },
    { "name": "Assignment", "type": "bytes", "versions": "0+",
      "about": "The member assignment." }
  ]
}
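To make the effect of the `versions` attributes concrete, the sketch below lists which top-level `SyncGroupResponse` fields are present at a given version. The field list is copied from the schema above; the helper names `supports` and `fields_at` are hypothetical, and the parser only handles the `N+`, `N-M` and single-version forms, which is an assumption about the range syntax rather than a full implementation of it.

```python
from typing import List


def supports(versions: str, version: int) -> bool:
    """Return True if `version` falls within a range such as "5+", "0-5" or "3"."""
    if versions.endswith("+"):
        return version >= int(versions[:-1])
    if "-" in versions:
        low, high = versions.split("-")
        return int(low) <= version <= int(high)
    return version == int(versions)


# (field name, versions) pairs taken from the SyncGroupResponse schema above.
SYNC_GROUP_RESPONSE_FIELDS = [
    ("ThrottleTimeMs", "1+"),
    ("ErrorCode", "0+"),
    ("ProtocolType", "5+"),
    ("ProtocolName", "5+"),
    ("Assignment", "0+"),
]


def fields_at(version: int) -> List[str]:
    """List the fields present at `version`, in schema order."""
    return [name for name, versions in SYNC_GROUP_RESPONSE_FIELDS
            if supports(versions, version)]
```

Under this sketch, `fields_at(4)` omits `ProtocolType` and `ProtocolName`, while `fields_at(5)` includes both, which is the difference an intermediary such as a proxy would observe between the two versions.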

...