Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejs
linenumberstrue
{
  "apiKey": 16,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "ListGroupsRequest",
  // Version 1 and 2 are the same as version 0.
  //
  // Version 3 is the first flexible version.
  //
  // Version 4 adds the StatesFilter field (KIP-518).
  //
  // Version 5 adds the TypesFilter field (KIP-848).
  "validVersions": "0-5",
  "flexibleVersions": "3+",
  "fields": [
    { "name": "StatesFilter", "type": "[]string", "versions": "4+",
      "about": "The states of the groups we want to list. If empty all groups are returned with their state." },
    { "name": "TypesFilter", "type": "[]string", "versions": "5+",
      "about": "The types of the groups we want to list. If empty all groups are returned" }
  ]
}

Required ACL

  • Read Describe Group

Response Schema

The GroupType field is introduced. It represents the type of the group.

...

We propose to rename GenerationId to GenerationIdOrMemberEpoch.

Required ACL

  • Read Group

Request Handling

When the group id corresponds to a consumer group using the new rebalance protocol, the provided member epoch must match the expected member epoch.

...

Code Block
languagejs
linenumberstrue
{
  "apiKey": 9,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "OffsetFetchRequest",
  // In version 0, the request read offsets from ZK.
  //
  // Starting in version 1, the broker supports fetching offsets from the internal __consumer_offsets topic.
  //
  // Starting in version 2, the request can contain a null topics array to indicate that offsets
  // for all topics should be fetched. It also returns a top level error code
  // for group or coordinator level errors.
  //
  // Version 3, 4, and 5 are the same as version 2.
  //
  // Version 6 is the first flexible version.
  //
  // Version 7 is adding the require stable flag.
  //
  // Version 8 is adding support for fetching offsets for multiple groups at a time
  //
  // Version 9 adds GenerationIdOrMemberEpoch and MemberId fields (KIP-848).
  "validVersions": "0-9",
  "flexibleVersions": "6+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0-7", "entityType": "groupId",
      "about": "The group to fetch offsets for." },
    // New fields.
    { "name": "GenerationIdOrMemberEpoch", "type": "int32", "versions": "9+", "default": "-1", "ignorable": true,
      "about": "The generation of the group." },
    { "name": "MemberId", "type": "string", "versions": "9+", "ignorable": true,
      "about": "The member ID assigned by the group coordinator." },
    // End of new fields.
    { "name": "Topics", "type": "[]OffsetFetchRequestTopic", "versions": "0-7", "nullableVersions": "2-7",
      "about": "Each topic we would like to fetch offsets for, or null to fetch offsets for all topics.", "fields": [
      { "name": "Name", "type": "string", "versions": "0-7", "entityType": "topicName",
        "about": "The topic name."},
      { "name": "PartitionIndexes", "type": "[]int32", "versions": "0-7",
        "about": "The partition indexes we would like to fetch offsets for." }
    ]},
    { "name": "Groups", "type": "[]OffsetFetchRequestGroup", "versions": "8+",
      "about": "Each group we would like to fetch offsets for", "fields": [
      { "name": "groupId", "type": "string", "versions": "8+", "entityType": "groupId",
        "about": "The group ID."},
      { "name": "Topics", "type": "[]OffsetFetchRequestTopics", "versions": "8+", "nullableVersions": "8+",
        "about": "Each topic we would like to fetch offsets for, or null to fetch offsets for all topics.", "fields": [
        { "name": "Name", "type": "string", "versions": "8+", "entityType": "topicName",
          "about": "The topic name."},
        { "name": "PartitionIndexes", "type": "[]int32", "versions": "8+",
          "about": "The partition indexes we would like to fetch offsets for." }
      ]}
    ]},
    {"name": "RequireStable", "type": "bool", "versions": "7+", "default": "false",
      "about": "Whether broker should hold on returning unstable offsets but set a retriable error code for the partitions."}
  ]
}

Required ACL

  • Describe Group

Request Handling

The MemberId and the GenerationIdOrMemberEpoch are verified. FENCED_MEMBER_EPOCH or UNKNOWN_MEMBER_ID is returned.

...

Upon receiving the FENCED_MEMBER_EPOCH error, the consumer retries when receiving its next heartbeat response with its member epoch.

DescribeConfigs API

Request Schema

The API schema is the same but supports a new resource type: the ResourceType field can be set to GROUP (16). When GROUP is used, the resource name corresponds to the group id. 

Required ACL

  • Describe Config on the group.

Response Schema

No changes.

AlterIncrementalConfigs API

The API is the same but supports a new resource type: GROUP (16). When GROUP is used, the resource name corresponds to the group id.

Request Schema

The schema is the same but the ResourceType field can be set to GROUP (16).

Required ACL

  • Alter Config on the group.

Response Schema

No changes.

Records

This section describes the new record types required for the new protocol. The storage layout is based on the data model described earlier in this document.

...

New properties in the broker configuration.

Wether to enable the new group coordinator and the consumer group protocol.
NameTypeDefaultDocgroup.new.coordinator.enableboolfalse
group.coordinator.threadsint1The number of threads used to run the state machines.
group.consumer.session.timeout.msint30sThe timeout to detect client failures when using the consumer group protocol.
group.consumer.min.session.timeout.msint45sThe minimum session timeout.
group.consumer.max.session.timeout.msint60sThe maximum session timeout.
group.consumer.heartbeat.interval.msint5sThe heartbeat interval given to the members.
group.consumer.min.heartbeat.interval.msint5sThe minimum heartbeat interval.
group.consumer.max.heartbeat.interval.msint15sThe maximum heartbeat interval.
group.consumer.max.sizeintMaxValueThe maximum number of consumers that a single consumer group can accommodate.
group.consumer.assignorsListrange, uniformThe server side assignors.

...