THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||
---|---|---|
| ||
public abstract class AdminClient implements AutoCloseable { public abstract DescribeGroupsResult describeGroups(Collection<String> groupIds, DescribeGroupsOptions options); public DescribeGroupsResult describeGroups(Collection<String> groupIds) { return describeGroups(groupIds, new DescribeGroupsOptions()); } public abstract ListGroupsResult listGroups(ListGroupsOptions options); public ListGroupsResult listGroups() { return listGroups(new ListGroupsOptions()); } public ListGroupsResult listGroups(ListGroupsOptions options) { //filtering groups by ConsumerProtocol.PROTOCOL_TYPE } public ListGroupsResult listGroups() { return listGroups(new ListGroupsOptions()); } public ListGroupOffsetsResult listGroupOffsets(ListGroupsOptions options); public ListGroupOffsetsResult listGroupOffsets() { return listGroupOffsets(new ListGroupOffsetsOptions()); } } public class DescribeGroupOptions extends AbstractOptions<DescribeGroupOptions> { } public class DescribeGroupResult { private final Map<String, KafkaFuture<GroupDescription>> futures; public DescribeGroupsResult(Map<String, KafkaFuture<GroupDescription>> futures) { this.futures = futures; } public Map<String, KafkaFuture<GroupDescription>> values() { return futures; } public KafkaFuture<Map<String, GroupDescription>> all() { return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])). thenApply(new KafkaFuture.Function<Void, Map<String, GroupDescription>>() { @Override public Map<String, GroupDescription> apply(Void v) { Map<String, GroupDescription> descriptions = new HashMap<>(futures.size()); for (Map.Entry<String, KafkaFuture<GroupDescription>> entry : futures.entrySet()) { try { descriptions.put(entry.getKey(), entry.getValue().get()); } catch (InterruptedException | ExecutionException e) { // This should be unreachable, because allOf ensured that all the futures // completed successfully. 
throw new RuntimeException(e); } } return descriptions; } }); } } public class GroupDescription { private final String groupId; private final String protocolType; private final List<ConsumerDescription> consumers; } public class MemberDescription { private final String consumerId; private final String clientId; private final String host; private final List<TopicPartition> assignment; } public class ListGroupsOptions extends AbstractOptions<ListGroupsOptions> { } public class ListGroupsResult { final KafkaFuture<Map<String, GroupListing>> future; ListGroupsResult(KafkaFuture<Map<String, GroupListing>> future) { this.future = future; } public KafkaFuture<Map<String, GroupListing>> namesToListings() //... } public class GroupListing { private final String return futurename; } private final public KafkaFuture<Collection<GroupListing>> listings() { return future.thenApply(new KafkaFuture.Function<Map<String, GroupListing>, Collection<GroupListing>>() { @Override public Collection<GroupListing> apply(Map<String, GroupListing> namesToDescriptions) { return namesToDescriptions.values(); } }); } public KafkaFuture<Set<String>> names() { return future.thenApply(new KafkaFuture.Function<Map<String, GroupListing>, Set<String>>() { @Override public Set<String> apply(Map<String, GroupListing> namesToListings) { return namesToListings.keySet(); } }); } } String protocolType; } public class ListGroupOffsetsOptions extends AbstractOptions<ListGroupsOptions> { } public class GroupListingListGroupOffsetsResult { privatefinal finalKafkaFuture<Map<TopicPartition, StringLong>> namefuture; private final String protocolType;//... } |
Each of these APIs returns a future-backed result object whose value will be available within RequestTimeoutMs, which is configured when the user constructs the AdminClient.
...
- Add `#describeGroups(Collection<String> groupIds, DescribeGroupsOptions options)` and `#describeGroups(Collection<String> groupIds)` to `AdminClient` API
- Add `#listGroups(ListGroupsOptions options)` and `#listGroups()` to `AdminClient` API
- Add `#listGroupOffsets(ListGroupOffsetsOptions options)` and `#listGroupOffsets()` to `AdminClient` API
- Implement them in `KafkaAdminClient`.
- Solve KAFKA-6058 (ASF JIRA).
...