THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
JIRA: here Jira server ASF JIRA serverId 5aa69414-a9e9-3523-82ec-879b028fb15b key KAFKA-6058
Released: 2.0.0
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
Code Block | ||
---|---|---|
| ||
public abstract class AdminClient implements AutoCloseable { public abstract DescribeConsumerGroupsResult describeConsumerGroups(Collection<String> groupIds, DescribeConsumerGroupsOptions options); public DescribeConsumerGroupsResult describeConsumerGroups(Collection<String> groupIds) { return describeConsumerGroups(groupIds, new DescribeConsumerGroupsOptions()); } public abstract ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options); public ListConsumerGroupsResult listConsumerGroups() { return listConsumerGroups(new ListConsumerGroupsOptions()); } public abstract ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupsOptions options); public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId) { return listConsumerGroupOffsets(new ListConsumerGroupOffsetsOptions()); } public abstract DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupIds, DeleteConsumerGroupsOptions options); public DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupIds) { return deleteConsumerGroups(groupIds, new DeleteConsumerGroupsOptions()); } } public class DescribeConsumerGroupOptions extends AbstractOptions<DescribeConsumerGroupOptions> { } public class DescribeConsumerGroupResult { private final Map<StringMap<TopicPartition, KafkaFuture<ConsumerGroupDescription>> futures; ...all(); } public class ConsumerGroupDescription { /** * The id of the consumer group. private final */ public String groupId(); /** * If consumer group is simple or not. */ private finalpublic boolean isSimpleConsumerGroup(); private final List<MemberDescription> members; private final String partitionAssignor; } /** * A list of the members of the consumer group. */ public Collection<MemberDescription> members(); /** * The consumer group partition assignor. */ public String partitionAssignor(); /** * The consumer group state, or UNKNOWN if the state is too new for us to parse. */ public ConsumerGroupState state(); /** * The consumer group coordinator, or null if the coordinator is not known. */ public Node coordinator(); } public enum ConsumerGroupState { UNKNOWN, PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE, DEAD, EMPTY; } public class MemberDescription { /** * The consumer id of the group member. private final */ public String consumerId(); /** private final* The client id of the group member. */ public String clientId(); /** * The host where the group member is running. private final*/ public String host(); /** * The assignment of the group member. */ private finalpublic MemberAssignment assignment(); } public class MemberAssignment { private final List<TopicPartition> topicPartitions; } public class ListConsumerGroupsOptions extends AbstractOptions<ListConsumerGroupsOptions> { } public class ListConsumerGroupsResult { finalpublic KafkaFuture<Map<String, GroupListing>> futureKafkaFuture<Collection<ConsumerGroupListing>> all(); public //... }KafkaFuture<Collection<ConsumerGroupListing>> valid(); public KafkaFuture<Collection<Throwable>> errors(); } public class ConsumerGroupListing { private final String groupId; private final boolean isSimpleConsumerGroup; } public class ListConsumerGroupOffsetsOptions extends AbstractOptions<ListConsumerGroupsOptions> { } public class ListConsumerGroupOffsetsResult { final KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> futureall(); } public class DeleteConsumerGroupOptions extends AbstractOptions<DeleteConsumerGroupOptions> //...{ } public class DeleteConsumerGroupsResult { Map<TopicPartition, KafkaFuture<Void>> all(); } |
This API returns a future object whose result will be available within RequestTimeoutMs, which is configured when user constructs the AdminClient.
...
- Add `#describeConsumerGroups(Collection<String> groupIds, DescribeConsumerGroupsOptions options)` and `#describeConsumerGroups(Collection<String> groupIds)` to `AdminClient` API
- Add `#listConsumerGroups(ListConsumerGroupsOptions options)` and `#listConsumerGroups()` to `AdminClient` API
- Add `#listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options)` and `#listConsumerGroupOffsets(String groupId)` to `AdminClient` API and
- Implement it on `KafkaAdminClient`.
- Solve
Jira server ASF JIRA serverId 5aa69414-a9e9-3523-82ec-879b028fb15b key KAFKA-6058 - Add `#deleteConsumerGroups(Collection<String> groupIds, ConsumerGroupsOptions options)` and `#deleteConsumerGroups(Collection<String> groupIds)` to `AdminClient` API once based on KIP-229: DeleteGroups API is merged.
Compatibility, Deprecation, and Migration Plan
...