Status
Current state: "Accepted"
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
This KIP is aim to add support for describing consumer groups and list consumer groups to `KafkaAdminClient` class. This functionality is required on the Streams Resetter Tool as describe here:
Public Interfaces
API
Add the following API in AdminClient:
public abstract class AdminClient implements AutoCloseable { public abstract DescribeConsumerGroupsResult describeConsumerGroups(Collection<String> groupIds, DescribeConsumerGroupsOptions options); public DescribeConsumerGroupsResult describeConsumerGroups(Collection<String> groupIds) { return describeConsumerGroups(groupIds, new DescribeConsumerGroupsOptions()); } public abstract ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options); public ListConsumerGroupsResult listConsumerGroups() { return listConsumerGroups(new ListConsumerGroupsOptions()); } public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupsOptions options); public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId) { return listConsumerGroupOffsets(new ListConsumerGroupOffsetsOptions()); } public DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupIds, DeleteConsumerGroupsOptions options); public DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupIds) { return deleteConsumerGroups(groupIds, new DeleteConsumerGroupsOptions()); } } public class DescribeConsumerGroupOptions extends AbstractOptions<DescribeConsumerGroupOptions> { } public class DescribeConsumerGroupResult { private final Map<String, KafkaFuture<ConsumerGroupDescription>> futures; ... } public class ConsumerGroupDescription { private final String groupId; private final boolean isSimpleConsumerGroup; private final List<MemberDescription> members; private final String partitionAssignor; } public class MemberDescription { private final String consumerId; private final String clientId; private final String host; private final MemberAssignment assignment; } public class MemberAssignment { private final List<TopicPartition> topicPartitions; } public class ListConsumerGroupsOptions extends AbstractOptions<ListConsumerGroupsOptions> { } public class ListConsumerGroupsResult { final KafkaFuture<Map<String, GroupListing>> future; //... } public class ConsumerGroupListing { private final boolean isSimpleConsumerGroup; } public class ListConsumerGroupOffsetsOptions extends AbstractOptions<ListConsumerGroupsOptions> { } public class ListConsumerGroupOffsetsResult { final KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> future; //... } public class DeleteConsumerGroupOptions extends AbstractOptions<DeleteConsumerGroupOptions> { } public class DeleteConsumerGroupsResult { final Map<String, KafkaFuture<Void>> future; }
This API returns a future object whose result will be available within RequestTimeoutMs, which is configured when user constructs the AdminClient.
Proposed Changes
- Add `#describeConsumerGroups(Collection<String> groupIds, DescribeConsumerGroupsOptions options)` and `#describeConsumerGroups(Collection<String> groupIds)` to `AdminClient` API
- Add `#listConsumerGroups(ListConsumerGroupsOptions options)` and `#listConsumerGroups()` to `AdminClient` API
- Add `#listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options)` and `#listConsumerGroupOffsets(String groupId)` to `AdminClient` API and
- Implement it on `KafkaAdminClient`.
- Solve
- Add `#deleteConsumerGroups(Collection<String> groupIds, ConsumerGroupsOptions options)` and `#deleteConsumerGroups(Collection<String> groupIds)` to `AdminClient` API based on KIP-229: DeleteGroups API.
Compatibility, Deprecation, and Migration Plan
- There won't be any impact on existing users.
- There won't be any change of current behavior.
- No migration tool required.
Rejected Alternatives
- We are moving one more functionality from `core`'s `AdminClient` API to `clients`'s `AdminClient` API. This is aim to remove dependencies to `core` module.
- Aggregating both Consumer Groups and Connect Groups in the same API would increase complexity adding generic MemberAssignment to match both groups. This KIP is only including Consumer Groups and will leave Connect Groups to be handled by another KIP. https://github.com/apache/kafka/pull/4454#issuecomment-360553277
Future Work
- Streams Resetter Tool will be remove it's dependency to `core` module for `KafkaAdminClient`.
References:
Pull Request: https://github.com/apache/kafka/pull/4454