Table of Contents |
---|
Status
Current state: "Under DiscussionAccepted"
Discussion thread: here
JIRA: here Jira server ASF JIRA serverId 5aa69414-a9e9-3523-82ec-879b028fb15b key KAFKA-6058
Released: 2.0.0
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
This KIP is aim to add support for describing consumer groups and list consumer groups to `KafkaAdminClient` class. This functionality is required on the Streams Resetter Tool as describe here: Jira server ASF JIRA serverId 5aa69414-a9e9-3523-82ec-879b028fb15b key KAFKA-5965
Public Interfaces
API
Add the following API in AdminClient:
Code Block | ||
---|---|---|
| ||
public abstract class AdminClient implements AutoCloseable { public abstract DescribeConsumerGroupsResult describeConsumerGroups(Collection<String> groupIds, DescribeConsumerGroupsOptions options); public DescribeConsumerGroupsResult describeConsumerGroups(Collection<String> groupIds) { return describeConsumerGroups(groupIds, new DescribeConsumerGroupsOptions()); } public abstract ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options); public ListConsumerGroupsResult listConsumerGroups() { return listConsumerGroups(new ListConsumerGroupsOptions()); } } public class DescribeConsumerGroupOptions extends AbstractOptions<DescribeConsumerGroupOptions> { } public classabstract DescribeConsumerGroupResult { private final Map<String, KafkaFuture<ConsumerGroupDescription>> futures;ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupsOptions options); public DescribeConsumerGroupsResult(Map<String, KafkaFuture<ConsumerGroupDescription>> futuresListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId) { this.futures = futuresreturn listConsumerGroupOffsets(new ListConsumerGroupOffsetsOptions()); } public Map<String,abstract KafkaFuture<ConsumerGroupDescription>>DeleteConsumerGroupsResult valuesdeleteConsumerGroups()Collection<String> { groupIds, return futures; } DeleteConsumerGroupsOptions options); public KafkaFuture<Map<String, ConsumerGroupDescription>> all(DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupIds) { return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).deleteConsumerGroups(groupIds, new DeleteConsumerGroupsOptions()); } } public class DescribeConsumerGroupOptions thenApply(new KafkaFuture.Function<Void, Map<String, ConsumerGroupDescription>>() { @Override public Map<String, ConsumerGroupDescription> apply(Void v)extends AbstractOptions<DescribeConsumerGroupOptions> { } public class DescribeConsumerGroupResult { Map<StringMap<TopicPartition, ConsumerGroupDescription> descriptions = new HashMap<>(futures.sizeKafkaFuture<ConsumerGroupDescription>> all()); } public class ConsumerGroupDescription { /** * The id of the for (Map.Entry<String, KafkaFuture<ConsumerGroupDescription>> entry : futures.entrySet()) {consumer group. */ public String groupId(); /** * If consumer group is simple try {or not. */ public descriptions.put(entry.getKey(), entry.getValue().get());boolean isSimpleConsumerGroup(); /** * A list of the members of the consumer group. */ } catch (InterruptedException |public ExecutionExceptionCollection<MemberDescription> e) {members(); /** * The consumer group partition assignor. // This should be unreachable, because allOf ensured that all the futures*/ public String partitionAssignor(); /** * The consumer group state, or UNKNOWN if the state is too new for us to parse. // completed successfully.*/ public throw new RuntimeException(e); }ConsumerGroupState state(); }/** * The consumer group coordinator, or null if the coordinator is return descriptions;not known. }*/ public }Node coordinator(); } } public classenum ConsumerGroupDescriptionConsumerGroupState { private final String groupId;UNKNOWN, private final List<ConsumerDescription> consumers; } public class ConsumerDescription { private final String consumerId;PREPARING_REBALANCE, COMPLETING_REBALANCE, private final String clientId;STABLE, private final String host;DEAD, private final List<TopicPartition> assignmentEMPTY; } public class ListConsumerGroupsOptions extendsMemberDescription AbstractOptions<ListConsumerGroupsOptions> { } public class ListConsumerGroupsResult {/** final KafkaFuture<Map<String,* ConsumerGroupListing>> future; ListConsumerGroupsResult(KafkaFuture<Map<String, ConsumerGroupListing>> future) { The consumer id of the group member. */ public this.future = future; }String consumerId(); /** * ReturnThe aclient future which yields a map id of topicthe names to TopicListing objectsgroup member. */ public KafkaFuture<Map<String, ConsumerGroupListing>> namesToListings() { return future; } String clientId(); /** * ReturnThe ahost futurewhere whichthe yieldsgroup amember collection of TopicListing objectsis running. */ public KafkaFuture<Collection<ConsumerGroupListing>>String listingshost() {; return future.thenApply(new KafkaFuture.Function<Map<String, ConsumerGroupListing>, Collection<ConsumerGroupListing>>() {/** * The assignment of the group @Overridemember. */ public Collection<ConsumerGroupListing>MemberAssignment apply(Map<String, ConsumerGroupListing> namesToDescriptions)assignment(); } public class MemberAssignment { private final return namesToDescriptions.values(); } }); } /**List<TopicPartition> topicPartitions; } public class ListConsumerGroupsOptions extends AbstractOptions<ListConsumerGroupsOptions> { } public class ListConsumerGroupsResult { *public Return a future which yields a collection of topic names. */KafkaFuture<Collection<ConsumerGroupListing>> all(); public KafkaFuture<Set<String>>KafkaFuture<Collection<ConsumerGroupListing>> namesvalid() {; public return future.thenApply(new KafkaFuture.Function<Map<String, ConsumerGroupListing>, Set<String>>()KafkaFuture<Collection<Throwable>> errors(); } public class ConsumerGroupListing { private final @OverrideString groupId; private final boolean isSimpleConsumerGroup; } public class ListConsumerGroupOffsetsOptions extends AbstractOptions<ListConsumerGroupsOptions> { } public Set<String> apply(Map<String, ConsumerGroupListing> namesToListings) class ListConsumerGroupOffsetsResult { KafkaFuture<Map<TopicPartition, return namesToListings.keySetOffsetAndMetadata>> all(); } }); } } public class DeleteConsumerGroupOptions extends AbstractOptions<DeleteConsumerGroupOptions> { } public class ConsumerGroupListingDeleteConsumerGroupsResult { privateMap<TopicPartition, final String nameKafkaFuture<Void>> all(); } |
This API returns a future object whose result will be available within RequestTimeoutMs, which is configured when user constructs the AdminClient.
Proposed Changes
- Add `#describeConsumerGroups(Collection<String> groupIds, DescribeConsumerGroupsOptions options)` and `#describeConsumerGroups(Collection<String> groupIds)` to `AdminClient` API API
- Add `#listConsumerGroups(ListConsumerGroupsOptions options)` and `#listConsumerGroups()` to `AdminClient` API and API
- Add `#listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options)` and `#listConsumerGroupOffsets(String groupId)` to `AdminClient` API and
- Implement it on `KafkaAdminClient`.
- Solve
Jira server ASF JIRA serverId 5aa69414-a9e9-3523-82ec-879b028fb15b key KAFKA-6058 - Add `#deleteConsumerGroups(Collection<String> groupIds, ConsumerGroupsOptions options)` and `#deleteConsumerGroups(Collection<String> groupIds)` to `AdminClient` API based on KIP-229: DeleteGroups API.
Compatibility, Deprecation, and Migration Plan
- There won't be any impact on existing users.
- There won't be any change of current behavior.
- No migration tool required.
Rejected Alternatives
- We are moving one more functionality from `core`'s `AdminClient` API to `clients`'s `AdminClient` API. This is aim to remove dependencies to `core` module.
- Aggregating both Consumer Groups and Connect Groups in the same API would increase complexity adding generic MemberAssignment to match both groups. This KIP is only including Consumer Groups and will leave Connect Groups to be handled by another KIP. https://github.com/apache/kafka/pull/4454#issuecomment-360553277
Future Work
- Streams Resetter Tool will be remove it's dependency to `core` module for `KafkaAdminClient`.
References:
Pull Request: https://github.com/apache/kafka/pull/4454