Status
Current state: "Under Discussion"
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
This KIP aims to add support for describing and listing consumer groups to the `KafkaAdminClient` class. This functionality is required by the Streams Resetter Tool, as described here:
Public Interfaces
API
Add the following API in AdminClient:
    public abstract class AdminClient implements AutoCloseable {

        public abstract DescribeGroupsResult describeGroups(Collection<String> groupIds, DescribeGroupsOptions options);

        public DescribeGroupsResult describeGroups(Collection<String> groupIds) {
            return describeGroups(groupIds, new DescribeGroupsOptions());
        }

        public abstract ListGroupsResult listGroups(ListGroupsOptions options);

        public ListGroupsResult listGroups() {
            return listGroups(new ListGroupsOptions());
        }

        // filtering groups by ConsumerProtocol.PROTOCOL_TYPE
        public abstract ListGroupsResult listConsumerGroups(ListGroupsOptions options);

        public ListGroupsResult listConsumerGroups() {
            return listConsumerGroups(new ListGroupsOptions());
        }

        public abstract ListGroupOffsetsResult listGroupOffsets(String groupId, ListGroupOffsetsOptions options);

        public ListGroupOffsetsResult listGroupOffsets(String groupId) {
            return listGroupOffsets(groupId, new ListGroupOffsetsOptions());
        }
    }

    public class DescribeGroupsOptions extends AbstractOptions<DescribeGroupsOptions> {
    }

    public class DescribeGroupsResult {
        private final Map<String, KafkaFuture<GroupDescription>> futures;
        ...
    }

    public class GroupDescription {
        private final String groupId;
        private final String protocolType;
        private final List<MemberDescription> members;
    }

    public class MemberDescription {
        private final String consumerId;
        private final String clientId;
        private final String host;
        private final Assignment assignment;
    }

    public class Assignment {
        private final List<TopicPartition> assignment;
    }

    public class ListGroupsOptions extends AbstractOptions<ListGroupsOptions> {
    }

    public class ListGroupsResult {
        final KafkaFuture<Map<String, GroupListing>> future;
        //...
    }

    public class GroupListing {
        private final String name;
        private final String protocolType;
    }

    public class ListGroupOffsetsOptions extends AbstractOptions<ListGroupOffsetsOptions> {
    }

    public class ListGroupOffsetsResult {
        final KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> future;
        //...
    }
Each method in this API returns a result object wrapping one or more futures. Each future completes, with a value or an error, within RequestTimeoutMs, which is configured when the user constructs the AdminClient.
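The future-based calling pattern can be illustrated with a minimal, self-contained sketch. It does not use the Kafka classes: `GroupListingSketch`, `ListGroupsResultSketch`, and `FutureResultSketch` are hypothetical stand-ins, `java.util.concurrent.CompletableFuture` takes the place of `KafkaFuture`, and the group names are made up for illustration. The 5-second bound is a caller-side wait and is an assumption here, not the client's RequestTimeoutMs.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for the proposed GroupListing; not the real Kafka class.
final class GroupListingSketch {
    final String name;
    final String protocolType;

    GroupListingSketch(String name, String protocolType) {
        this.name = name;
        this.protocolType = protocolType;
    }
}

// Hypothetical stand-in for the proposed ListGroupsResult.
// CompletableFuture is used here in place of KafkaFuture.
final class ListGroupsResultSketch {
    final CompletableFuture<Map<String, GroupListingSketch>> future;

    ListGroupsResultSketch(CompletableFuture<Map<String, GroupListingSketch>> future) {
        this.future = future;
    }
}

final class FutureResultSketch {

    // Returns immediately; the future is completed later on another thread.
    static ListGroupsResultSketch listGroups() {
        return new ListGroupsResultSketch(CompletableFuture.supplyAsync(() -> {
            Map<String, GroupListingSketch> groups = new LinkedHashMap<>();
            groups.put("app-1", new GroupListingSketch("app-1", "consumer"));
            groups.put("connect-1", new GroupListingSketch("connect-1", "connect"));
            return groups;
        }));
    }

    // Blocks for at most timeoutMs and converts checked failures to unchecked ones.
    static Map<String, GroupListingSketch> await(ListGroupsResultSketch result, long timeoutMs) {
        try {
            return result.future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for groups", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("listing groups failed or timed out", e);
        }
    }

    public static void main(String[] args) {
        Map<String, GroupListingSketch> groups = await(listGroups(), 5_000);
        groups.forEach((id, g) -> System.out.println(id + " -> " + g.protocolType));
    }
}
```

The caller receives the result object right away and decides when, and for how long, to block on the future.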
Proposed Changes
- Add `#describeGroups(Collection<String> groupIds, DescribeGroupsOptions options)` and `#describeGroups(Collection<String> groupIds)` to the `AdminClient` API
- Add `#describeConsumerGroups(Collection<String> groupIds, DescribeGroupsOptions options)` and `#describeConsumerGroups(Collection<String> groupIds)` to the `AdminClient` API
- Add `#listGroups(ListGroupsOptions options)` and `#listGroups()` to the `AdminClient` API
- Add `#listGroupOffsets(String groupId, ListGroupOffsetsOptions options)` and `#listGroupOffsets(String groupId)` to the `AdminClient` API
- Implement these methods in `KafkaAdminClient`.
- Solve
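The overload pairs listed above follow one pattern: an abstract method that takes an options object, plus a convenience overload that delegates with default options. The sketch below shows that pattern together with one possible way of filtering groups by protocol type. All class names (`ListingSketch`, `OptionsSketch`, `AdminSketch`, `InMemoryAdminSketch`) are hypothetical, the results are plain lists rather than futures to keep the example short, and the filter lives in the base class only for brevity. The constant mirrors the value of `ConsumerProtocol.PROTOCOL_TYPE`, which is assumed here to be `"consumer"`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for a group listing entry.
final class ListingSketch {
    final String name;
    final String protocolType;

    ListingSketch(String name, String protocolType) {
        this.name = name;
        this.protocolType = protocolType;
    }
}

// Hypothetical, empty options object, analogous to ListGroupsOptions.
final class OptionsSketch {
}

abstract class AdminSketch {
    // Assumed to match ConsumerProtocol.PROTOCOL_TYPE.
    static final String CONSUMER_PROTOCOL_TYPE = "consumer";

    // The only method an implementation has to provide.
    abstract List<ListingSketch> listGroups(OptionsSketch options);

    // Convenience overload: delegate with default options.
    List<ListingSketch> listGroups() {
        return listGroups(new OptionsSketch());
    }

    // Keep only the groups whose protocol type marks them as consumer groups.
    List<ListingSketch> listConsumerGroups(OptionsSketch options) {
        return listGroups(options).stream()
                .filter(g -> CONSUMER_PROTOCOL_TYPE.equals(g.protocolType))
                .collect(Collectors.toList());
    }

    // Convenience overload: delegate to the consumer-group variant, not to listGroups.
    List<ListingSketch> listConsumerGroups() {
        return listConsumerGroups(new OptionsSketch());
    }
}

// In-memory implementation used only to exercise the pattern.
final class InMemoryAdminSketch extends AdminSketch {
    private final List<ListingSketch> groups;

    InMemoryAdminSketch(List<ListingSketch> groups) {
        this.groups = new ArrayList<>(groups);
    }

    @Override
    List<ListingSketch> listGroups(OptionsSketch options) {
        return new ArrayList<>(groups);
    }
}
```

Keeping the no-argument overloads as thin delegates means a concrete client only has to implement the options-taking variants.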
Compatibility, Deprecation, and Migration Plan
- There won't be any impact on existing users.
- There won't be any change of current behavior.
- No migration tool required.
Rejected Alternatives
We are moving one more piece of functionality from the `core` module's `AdminClient` API to the `clients` module's `AdminClient` API. The aim is to remove dependencies on the `core` module.
Future Work
- The Streams Resetter Tool will drop its dependency on the `core` module and use `KafkaAdminClient` instead.
References:
Pull Request: