Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Bump ListGroupsRequest version to 4 and include a new flexible field "statesStatesFilter".

Code Block
{
  "apiKey": 16,
  "type": "request",
  "name": "ListGroupsRequest",
  // Version 1 and 2 are the same as version 0.
  //
  // Version 3 is the first flexible version.
  //
  // Version 4 adds the States flexibleStatesFilter field (KIP-518).
  "validVersions": "0-4",
  "flexibleVersions": "3+",
  "fields": [
    { "name": "StatesStatesFilter", "type": "[]string", "versions": "4+", "tag": 0, "taggedVersions": "4+",
      "about": "The states of the groups we want to list", "fields": [
      { "name": "Name", "type": "string", "versions": "3+", "about": "The group state" }. If empty all groups are returned with their state."
    ]}
  ]
}

Bump ListGroupsResponse version to 4 and include a new flexible field "group_stateGroupState".

Code Block
{
  "apiKey": 16,
  "type": "response",
  "name": "ListGroupsResponse",
  // Version 1 adds the throttle time.
  //
  // Starting in version 2, on quota violation, brokers send out responses before throttling.
  //
  // Version 3 is the first flexible version.
  //
  // Version 4 adds the GroupState flexible field (KIP-518).
  "validVersions": "0-4",
  "flexibleVersions": "3+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." },
    { "name": "Groups", "type": "[]ListedGroup", "versions": "0+",
      "about": "Each group in the response.", "fields": [
      { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
        "about": "The group ID." },
      { "name": "ProtocolType", "type": "string", "versions": "0+",
        "about": "The group protocol type." },
      { "name": "GroupState", "type": "string", "versions": "4+", "tag": 0, "taggedVersions": "4+", "ignorable": true,
        "about": "The group state stringname." }
    ]}
  ]
}


This KIP bumps the version of this request to avoid any version incompatibility issues with older broker. See rejected alternatives.

If States the version is not specified or if it's 4 or above, if StatesFilter is empty in the request all groups will be returned without their states, like in versions < 4with their states. If StatesFilter is set to a list of group states, only states that are currently in these states will be returned. At the moment, consumer group state is already serialized as a STRING in DescribeGroupsResponse, so serializing it the same way here.

When attempting to list groups with states using an older broker, an UnsupportedVersionException will be raised in the client.

...

Code Block
languagejava
titleListConsumerGroupsOptions
public class ListConsumerGroupsOptions extends AbstractOptions<ListConsumerGroupsOptions> {

    /**
     * Only groups in these states will be returned by listConsumerGroups()
     * If not set, all groups are returned without their states
     * throw IllegalArgumentException if states is empty
     */
    public ListConsumerGroupsOptions inStates(Set<ConsumerGroupState> states) { }
 
    /**
     * All groups with their states will be returned by listConsumerGroups()
     */
    public ListConsumerGroupsOptions inAnyState() { }
 
    /**
     * Returns the list of States that are requested
     */
    public Optional<Set<ConsumerGroupState>> states() { }
}

...

Code Block
public class ConsumerGroupListing {
    ...
    private final ConsumerGroupState groupState;

    /**
     * Consumer Group Statestate
     */
    public ConsumerGroupStateOptional<ConsumerGroupState> groupStatestate() {
        return groupStatestate;
    }
}

The state will be UNKNOWN empty when ListGroupsRequest < 3 is used, for example when connecting to an older broker.

...

No Format
# Existing behaviour
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
group1
group2

# Listing all groups with their states
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list --state
GROUP          STATE
group1         stable
group2         emptyEmpty

# Listing groups in the stable state
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list --state stableStable
GROUP          STATE
group1         stableStable


Compatibility, Deprecation, and Migration Plan

...

With this KIP, having Describe authorization on the Cluster resource allows to retrieve the state of all groups. Previously this required having Describe authorization on each Group.There is nothing to deprecate or migrate.

Rejected Alternatives

  • Allow specifying a filter in ListGroups to retrieve groups by name or by state using wildcard
    • Unclear if it's a common use case
  • Add flexible fields without bumping the ListGroups version
    • Keeping v3 would allow cases where brokers could support v3 but not the new optional field. In that case, if a client specifies states in its request, brokers would still return the full list of groups without states. This looked confusing. In case broker don't support a feature, a UnsupportedVersionException is preferred.

...