Status

Current state: Accepted

Discussion thread: here

JIRA: KAFKA-9130

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Bump ListGroupsRequest version to 4 and include a new flexible field "StatesFilter".

Code Block
{
  "apiKey": 16,
  "type": "request",
  "name": "ListGroupsRequest",
  // Version 1 and 2 are the same as version 0.
  //
  // Version 3 is the first flexible version.
  //
  // Version 4 adds the StatesFilter field (KIP-518).
  "validVersions": "0-4",
  "flexibleVersions": "3+",
  "fields": [
    { "name": "StatesFilter", "type": "[]string", "versions": "4+", "tag": 0, "taggedVersions": "4+",
      "about": "The states of the groups we want to list. If empty all groups are returned with their state." }
  ]
}

Bump ListGroupsResponse version to 4 and include a new flexible field "GroupState".

Code Block
{
  "apiKey": 16,
  "type": "response",
  "name": "ListGroupsResponse",
  // Version 1 adds the throttle time.
  //
  // Starting in version 2, on quota violation, brokers send out responses before throttling.
  //
  // Version 3 is the first flexible version.
  //
  // Version 4 adds the GroupState flexible field (KIP-518).
  "validVersions": "0-4",
  "flexibleVersions": "3+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." },
    { "name": "Groups", "type": "[]ListedGroup", "versions": "0+",
      "about": "Each group in the response.", "fields": [
      { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
        "about": "The group ID." },
      { "name": "ProtocolType", "type": "string", "versions": "0+",
        "about": "The group protocol type." },
      { "name": "GroupState", "type": "string", "versions": "4+", "tag": 0, "taggedVersions": "4+", "ignorable": true,
        "about": "The group state name." }
    ]}
  ]
}


This KIP bumps the version of this request to avoid any version incompatibility issues with older brokers. See Rejected Alternatives.

If the version is 4 or above and StatesFilter is empty in the request, all groups will be returned with their states. If StatesFilter is set to a list of group states, only groups that are currently in these states will be returned. Consumer group state is already serialized as a STRING in DescribeGroupsResponse, so it is serialized the same way here.
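
The StatesFilter semantics described above can be illustrated with a minimal, self-contained sketch. It is not the broker implementation: the class `StatesFilterSketch`, the method `listGroups`, and the use of plain strings for group names and states are all assumptions made for illustration.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class StatesFilterSketch {

    // Hypothetical helper: returns the groups (with their states) that match the filter.
    // An empty filter means "no filtering", so every group is returned with its state.
    static Map<String, String> listGroups(Map<String, String> groupStates, List<String> statesFilter) {
        Set<String> wanted = new HashSet<>(statesFilter);
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : groupStates.entrySet()) {
            if (wanted.isEmpty() || wanted.contains(entry.getValue())) {
                result.put(entry.getKey(), entry.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> groups = new LinkedHashMap<>();
        groups.put("group1", "Stable");
        groups.put("group2", "Empty");

        // Empty filter: both groups are listed with their states.
        System.out.println(listGroups(groups, List.of()));
        // Filter on one state: only groups currently in that state are listed.
        System.out.println(listGroups(groups, List.of("Stable")));
    }
}
```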

When attempting to list groups with states using an older broker, an UnsupportedVersionException will be raised in the client.
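
As a rough illustration of this client-side behaviour, the sketch below picks a ListGroups request version and fails when states are requested against a broker that only supports versions below 4. The class, the method `pickVersion`, and the exception type are hypothetical stand-ins, not the actual client code or Kafka's `UnsupportedVersionException`.

```java
public class ListGroupsVersionSketch {

    // First ListGroups version that carries StatesFilter and GroupState (per this KIP).
    static final short FIRST_VERSION_WITH_STATES = 4;

    // Hypothetical stand-in for Kafka's UnsupportedVersionException.
    static class UnsupportedVersionSketchException extends RuntimeException {
        UnsupportedVersionSketchException(String message) {
            super(message);
        }
    }

    // Chooses the request version to send, given the highest version the broker supports.
    static short pickVersion(short brokerMaxVersion, boolean statesRequested) {
        if (statesRequested && brokerMaxVersion < FIRST_VERSION_WITH_STATES) {
            throw new UnsupportedVersionSketchException(
                "Broker supports ListGroups up to v" + brokerMaxVersion
                    + ", but filtering by state needs v" + FIRST_VERSION_WITH_STATES);
        }
        return (short) Math.min(brokerMaxVersion, FIRST_VERSION_WITH_STATES);
    }

    public static void main(String[] args) {
        System.out.println(pickVersion((short) 4, true));   // newer broker, states requested
        System.out.println(pickVersion((short) 3, false));  // older broker, no states requested
        try {
            pickVersion((short) 3, true);                    // older broker, states requested
        } catch (UnsupportedVersionSketchException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```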

...

Code Block
public class ListConsumerGroupsOptions extends AbstractOptions<ListConsumerGroupsOptions> {

    /**
     * Only groups in these states will be returned by listConsumerGroups()
     * If not set, all groups are returned without their states
     * Throws IllegalArgumentException if states is empty
     */
    public ListConsumerGroupsOptions inStates(Set<ConsumerGroupState> states) { }

    /**
     * Returns the list of States that are requested
     */
    public Optional<Set<ConsumerGroupState>> states() { }
}

Similarly, a state field will be exposed in ConsumerGroupListing:

Code Block
public class ConsumerGroupListing {
    ...
    private final Optional<ConsumerGroupState> state;

    /**
     * Consumer Group state
     */
    public Optional<ConsumerGroupState> state() {
        return state;
    }
}

The state will be empty when ListGroupsRequest < 4 is used, for example when connecting to an older broker.
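
Callers therefore need to handle a missing state. The sketch below shows one way to do that when formatting a listing row. `ListingStateSketch`, its nested `Listing` class (a reduced stand-in for ConsumerGroupListing), and the "-" placeholder are assumptions for illustration only.

```java
import java.util.Optional;

public class ListingStateSketch {

    // Hypothetical stand-in for ConsumerGroupListing, reduced to the two values used here.
    static final class Listing {
        final String groupId;
        final Optional<String> state;

        Listing(String groupId, Optional<String> state) {
            this.groupId = groupId;
            this.state = state;
        }
    }

    // Formats one output row; the "-" placeholder for a missing state is an assumption.
    static String formatRow(Listing listing) {
        return String.format("%-15s%s", listing.groupId, listing.state.orElse("-"));
    }

    public static void main(String[] args) {
        // State present, as returned by a broker that supports the new version.
        System.out.println(formatRow(new Listing("group1", Optional.of("Stable"))));
        // State absent, as when talking to an older broker.
        System.out.println(formatRow(new Listing("group2", Optional.empty())));
    }
}
```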

...

No Format
# Existing behaviour
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
group1
group2

# Listing all groups with their states
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list --state
GROUP          STATE
group1         Stable
group2         Empty

# Listing groups in the stable state
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list --state Stable
GROUP          STATE
group1         Stable


Compatibility, Deprecation, and Migration Plan

...

With this KIP, having Describe authorization on the Cluster resource allows retrieving the state of all groups. Previously this required having Describe authorization on each Group. There is nothing to deprecate or migrate.

Rejected Alternatives

  • Allow specifying a filter in ListGroups to retrieve groups by name or by state using a wildcard
    • Unclear if it's a common use case
  • Add flexible fields without bumping the ListGroups version
    • Keeping v3 would allow cases where brokers support v3 but not the new optional field. In that case, if a client specifies states in its request, brokers would still return the full list of groups without states. This looked confusing. When a broker doesn't support a feature, an UnsupportedVersionException is preferred.

...