Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
public abstract class AdminClient implements AutoCloseable {
 	
	public abstract DescribeGroupsResult describeGroups(Collection<String> groupIds, DescribeGroupsOptions options);
 
	public DescribeGroupsResult describeGroups(Collection<String> groupIds) {
    	return describeGroups(groupIds, new DescribeGroupsOptions());
	}
 
    public abstract ListGroupsResult listGroups(ListGroupsOptions options);

    public ListGroupsResult listGroups() {
        return listGroups(new ListGroupsOptions());
    }

    public ListGroupsResult listGroups(ListGroupsOptions options) {
        //filtering groups by ConsumerProtocol.PROTOCOL_TYPE
    }

    public ListGroupsResult listGroups() {
        return listGroups(new ListGroupsOptions());
    }
    
    public ListGroupOffsetsResult listGroupOffsets(ListGroupsOptions options);
 
    public ListGroupOffsetsResult listGroupOffsets() {
        return listGroupOffsets(new ListGroupOffsetsOptions());
    }
}
 
public class DescribeGroupOptions extends AbstractOptions<DescribeGroupOptions> {
}
 
public class DescribeGroupResult {
    private final Map<String, KafkaFuture<GroupDescription>> futures;
    public DescribeGroupsResult(Map<String, KafkaFuture<GroupDescription>> futures) {
        this.futures = futures;
    }

    public Map<String, KafkaFuture<GroupDescription>> values() {
        return futures;
    }

    public KafkaFuture<Map<String, GroupDescription>> all() {
        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
            thenApply(new KafkaFuture.Function<Void, Map<String, GroupDescription>>() {
                @Override
                public Map<String, GroupDescription> apply(Void v) {
                    Map<String, GroupDescription> descriptions = new HashMap<>(futures.size());
                    for (Map.Entry<String, KafkaFuture<GroupDescription>> entry : futures.entrySet()) {
                        try {
                            descriptions.put(entry.getKey(), entry.getValue().get());
                        } catch (InterruptedException | ExecutionException e) {
                            // This should be unreachable, because allOf ensured that all the futures
                            // completed successfully.
                            throw new RuntimeException(e);
                        }
                    }
                    return descriptions;
                }
            });
    }
}
 
public class GroupDescription {
    private final String groupId;
    private final String protocolType;
    private final List<ConsumerDescription> consumers;
}
 
/**
 * Description of one group member.
 *
 * <p>This sketch lists only the fields; no constructor or accessors are declared here.
 */
public class MemberDescription {
    // Presumably the id of the consumer this entry describes (inferred from the name).
    private final String consumerId;
    // Presumably the client id reported by that member (inferred from the name).
    private final String clientId;
    // Presumably the host the member connects from (inferred from the name).
    private final String host;
    // Topic partitions held in this member's assignment.
    private final List<TopicPartition> assignment;
}

/**
 * Options for {@code AdminClient#listGroups}.
 *
 * <p>Declares no members of its own; anything available on it comes from
 * {@code AbstractOptions}.
 */
public class ListGroupsOptions extends AbstractOptions<ListGroupsOptions> {
}

public class ListGroupsResult {
    final KafkaFuture<Map<String, GroupListing>> future;

    ListGroupsResult(KafkaFuture<Map<String, GroupListing>> future) {
        this.future = future;
    }

    public KafkaFuture<Map<String, GroupListing>> namesToListings() //...
}
 
public class GroupListing {
    private final String  return futurename;
    }

private final   public KafkaFuture<Collection<GroupListing>> listings() {
        return future.thenApply(new KafkaFuture.Function<Map<String, GroupListing>, Collection<GroupListing>>() {
            @Override
            public Collection<GroupListing> apply(Map<String, GroupListing> namesToDescriptions) {
                return namesToDescriptions.values();
            }
        });
    }

    public KafkaFuture<Set<String>> names() {
        return future.thenApply(new KafkaFuture.Function<Map<String, GroupListing>, Set<String>>() {
            @Override
            public Set<String> apply(Map<String, GroupListing> namesToListings) {
                return namesToListings.keySet();
            }
        });
    }
}
String protocolType;
}
 
public class ListGroupOffsetsOptions extends AbstractOptions<ListGroupsOptions> {
}
 
public class GroupListingListGroupOffsetsResult {
    privatefinal finalKafkaFuture<Map<TopicPartition, StringLong>> namefuture;
 
    private final String protocolType;//...
}
 

Each of these APIs returns a future whose result becomes available within RequestTimeoutMs, which is configured when the user constructs the AdminClient.

...

  1. Add `#describeGroups(Collection<String> groupIds, DescribeGroupsOptions options)` and `#describeGroups(Collection<String> groupIds)` to the `AdminClient` API.
  2. Add `#listGroups(ListGroupsOptions options)` and `#listGroups()` to the `AdminClient` API.
  3. Add `#listGroupOffsets(ListGroupOffsetsOptions options)` and `#listGroupOffsets()` to the `AdminClient` API.
  4. Implement them on `KafkaAdminClient`.
  5. Resolve KAFKA-6058 (ASF JIRA).

...