Status
Current state: "Accepted"
Discussion thread: here
JIRA: KAFKA-6058
Released: 2.0.0
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...

    public abstract class AdminClient implements AutoCloseable {

        public abstract DescribeConsumerGroupsResult describeConsumerGroups(Collection<String> groupIds, DescribeConsumerGroupsOptions options);

        public DescribeConsumerGroupsResult describeConsumerGroups(Collection<String> groupIds) {
            return describeConsumerGroups(groupIds, new DescribeConsumerGroupsOptions());
        }

        public abstract ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options);

        public ListConsumerGroupsResult listConsumerGroups() {
            return listConsumerGroups(new ListConsumerGroupsOptions());
        }

        // filtering groups by ConsumerProtocol.PROTOCOL_TYPE
        public abstract ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options);

        public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId) {
            return listConsumerGroupOffsets(groupId, new ListConsumerGroupOffsetsOptions());
        }

        public abstract DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupIds, DeleteConsumerGroupsOptions options);

        public DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupIds) {
            return deleteConsumerGroups(groupIds, new DeleteConsumerGroupsOptions());
        }
    }

    public class DescribeConsumerGroupsOptions extends AbstractOptions<DescribeConsumerGroupsOptions> {
    }

    public class DescribeConsumerGroupsResult {
        Map<TopicPartition, KafkaFuture<ConsumerGroupDescription>> all();
    }

    public class ConsumerGroupDescription {
        /**
         * The id of the consumer group.
         */
        public String groupId();

        /**
         * Whether the consumer group is simple or not.
         */
        public boolean isSimpleConsumerGroup();

        /**
         * A list of the members of the consumer group.
         */
        public Collection<MemberDescription> members();

        /**
         * The consumer group partition assignor.
         */
        public String partitionAssignor();

        /**
         * The consumer group state, or UNKNOWN if the state is too new for us to parse.
         */
        public ConsumerGroupState state();

        /**
         * The consumer group coordinator, or null if the coordinator is not known.
         */
        public Node coordinator();
    }

    public enum ConsumerGroupState {
        UNKNOWN, PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE, DEAD, EMPTY;
    }

    public class MemberDescription {
        /**
         * The consumer id of the group member.
         */
        public String consumerId();

        /**
         * The client id of the group member.
         */
        public String clientId();

        /**
         * The host where the group member is running.
         */
        public String host();

        /**
         * The assignment of the group member.
         */
        public MemberAssignment assignment();
    }

    public class MemberAssignment {
        private final List<TopicPartition> topicPartitions;
    }

    public class ListConsumerGroupsOptions extends AbstractOptions<ListConsumerGroupsOptions> {
    }

    public class ListConsumerGroupsResult {
        public KafkaFuture<Collection<ConsumerGroupListing>> all();
        public KafkaFuture<Collection<ConsumerGroupListing>> valid();
        public KafkaFuture<Collection<Throwable>> errors();
    }

    public class ConsumerGroupListing {
        private final String groupId;
        private final boolean isSimpleConsumerGroup;
    }

    public class ListConsumerGroupOffsetsOptions extends AbstractOptions<ListConsumerGroupOffsetsOptions> {
    }

    public class ListConsumerGroupOffsetsResult {
        KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> all();
    }

    public class DeleteConsumerGroupsOptions extends AbstractOptions<DeleteConsumerGroupsOptions> {
    }

    public class DeleteConsumerGroupsResult {
        Map<TopicPartition, KafkaFuture<Void>> all();
    }

This API returns a future object whose result will be available within RequestTimeoutMs, which is configured when the user constructs the `AdminClient`.
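The future-based result can be illustrated with a small stand-alone sketch. It does not use the Kafka classes: `java.util.concurrent.CompletableFuture` stands in for `KafkaFuture`, plain strings stand in for `ConsumerGroupDescription`, and `FutureResultSketch`, `DescribeResult` and `describe` are hypothetical names introduced only for this illustration. It shows one future per group id being combined into a single future, and the caller bounding the wait with a timeout.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Stand-in sketch: CompletableFuture plays the role of KafkaFuture, and the
// String values are placeholders for ConsumerGroupDescription.
final class FutureResultSketch {

    // Hypothetical result holder: one future per requested group id.
    static final class DescribeResult {
        private final Map<String, CompletableFuture<String>> futures;

        DescribeResult(Map<String, CompletableFuture<String>> futures) {
            this.futures = futures;
        }

        // Completes once every per-group future has completed successfully.
        CompletableFuture<Map<String, String>> all() {
            CompletableFuture<Void> done = CompletableFuture.allOf(
                    futures.values().toArray(new CompletableFuture[0]));
            return done.thenApply(ignored -> {
                Map<String, String> descriptions = new LinkedHashMap<>();
                // join() does not block here: allOf has already completed.
                futures.forEach((groupId, f) -> descriptions.put(groupId, f.join()));
                return descriptions;
            });
        }
    }

    // Simulates an asynchronous describe call answered on another thread.
    static DescribeResult describe(List<String> groupIds) {
        Map<String, CompletableFuture<String>> futures = new LinkedHashMap<>();
        for (String groupId : groupIds) {
            futures.put(groupId, CompletableFuture.supplyAsync(() -> "state-of-" + groupId));
        }
        return new DescribeResult(futures);
    }

    public static void main(String[] args) throws Exception {
        DescribeResult result = describe(List.of("group-a", "group-b"));
        // The caller bounds the wait; 5 seconds is an arbitrary value for the sketch.
        Map<String, String> descriptions = result.all().get(5, TimeUnit.SECONDS);
        System.out.println(descriptions);
    }
}
```

In this sketch a failure of any single group's future fails the combined future, so a caller that wants partial results would read the per-group futures individually instead of calling `all()`.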
Proposed Changes
- Add `#describeConsumerGroups(Collection<String> groupIds, DescribeConsumerGroupsOptions options)` and `#describeConsumerGroups(Collection<String> groupIds)` to `AdminClient` API
- Add `#listConsumerGroups(ListConsumerGroupsOptions options)` and `#listConsumerGroups()` to `AdminClient` API
- Add `#listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options)` and `#listConsumerGroupOffsets(String groupId)` to `AdminClient` API
- Add `#deleteConsumerGroups(Collection<String> groupIds, DeleteConsumerGroupsOptions options)` and `#deleteConsumerGroups(Collection<String> groupIds)` to `AdminClient` API based on KIP-229: DeleteGroups API.
- Implement them on `KafkaAdminClient`.
- Solve KAFKA-6058
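Two ideas in the interface can be sketched without the Kafka library: each no-argument overload delegates to its options-taking variant with default options, and consumer groups are told apart from other groups by protocol type, as the `//filtering groups by ConsumerProtocol.PROTOCOL_TYPE` comment suggests. Everything below is a hypothetical stand-in (`ConsumerGroupApiSketch`, `GroupAdmin`, `InMemoryGroupAdmin`, `ListOptions`, `register`); the only Kafka detail assumed is that the consumer protocol type is the string `"consumer"`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for illustration; these are not the Kafka classes.
final class ConsumerGroupApiSketch {

    // Assumed to match the value of ConsumerProtocol.PROTOCOL_TYPE in Kafka.
    static final String CONSUMER_PROTOCOL_TYPE = "consumer";

    // Placeholder options type with no settings.
    static final class ListOptions { }

    abstract static class GroupAdmin {
        // Implementations provide only the options-taking variant.
        abstract List<String> listConsumerGroups(ListOptions options);

        // Convenience overload: delegates with default options.
        List<String> listConsumerGroups() {
            return listConsumerGroups(new ListOptions());
        }
    }

    // In-memory implementation mapping group id to protocol type.
    static final class InMemoryGroupAdmin extends GroupAdmin {
        private final Map<String, String> protocolTypeByGroup = new LinkedHashMap<>();

        void register(String groupId, String protocolType) {
            protocolTypeByGroup.put(groupId, protocolType);
        }

        @Override
        List<String> listConsumerGroups(ListOptions options) {
            List<String> consumerGroups = new ArrayList<>();
            for (Map.Entry<String, String> entry : protocolTypeByGroup.entrySet()) {
                // Keep consumer groups only; groups of other types are skipped.
                if (CONSUMER_PROTOCOL_TYPE.equals(entry.getValue())) {
                    consumerGroups.add(entry.getKey());
                }
            }
            return consumerGroups;
        }
    }

    public static void main(String[] args) {
        InMemoryGroupAdmin admin = new InMemoryGroupAdmin();
        admin.register("orders-app", "consumer");
        admin.register("connect-cluster", "connect");
        System.out.println(admin.listConsumerGroups()); // prints [orders-app]
    }
}
```

Keeping the options-taking method as the only abstract one means a new implementation has a single method to write per operation, and the defaults live in one place.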
Compatibility, Deprecation, and Migration Plan
- There won't be any impact on existing users.
- There won't be any change of current behavior.
- No migration tool required.
Rejected Alternatives
- We are moving one more piece of functionality from the `core` module's `AdminClient` API to the `clients` module's `AdminClient` API. The aim is to remove dependencies on the `core` module.
- Aggregating both Consumer Groups and Connect Groups in the same API would increase complexity, because it would require a generic MemberAssignment that fits both kinds of group. This KIP covers only Consumer Groups and leaves Connect Groups to be handled by another KIP. https://github.com/apache/kafka/pull/4454#issuecomment-360553277
Future Work
- The Streams Resetter Tool will remove its dependency on the `core` module by using `KafkaAdminClient`.
References:
Pull Request: https://github.com/apache/kafka/pull/4454