Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
public abstract class AdminClient implements AutoCloseable {
 	
	public abstract DescribeGroupsResult describeGroups(Collection<String> groupIds, DescribeGroupsOptions options);
 
	public DescribeGroupsResult describeGroups(Collection<String> groupIds) {
    	return describeGroups(groupIds, new DescribeGroupsOptions());
	}
 
    public abstract ListGroupsResult listGroups(ListGroupsOptions options);

    public ListGroupsResult listGroups() {
        return listGroups(new ListGroupsOptions());
    }

    public abstract ListGroupsResult listGroupslistConsumerGroups(ListGroupsOptions options) {
        ; //filtering groups by ConsumerProtocol.PROTOCOL_TYPE
    }

    public ListGroupsResult listGroupslistConsumerGroups() {
        return listGroups(new ListGroupsOptions());
    }
    
    public ListGroupOffsetsResult listGroupOffsets(String groupId, ListGroupsOptions options);
 
    public ListGroupOffsetsResult listGroupOffsets(String groupId) {
        return listGroupOffsets(new ListGroupOffsetsOptions());
    }
}
 
public class DescribeGroupOptions extends AbstractOptions<DescribeGroupOptions> {
}
 
public class DescribeGroupResult {
    private final Map<String, KafkaFuture<GroupDescription>> futures;
    public DescribeGroupsResult(Map<String, KafkaFuture<GroupDescription>> futures) {
        this.futures = futures;
    }

    public Map<String, KafkaFuture<GroupDescription>> values() {
        return futures;
    }

    public KafkaFuture<Map<String, GroupDescription>> all() {
        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
            thenApply(new KafkaFuture.Function<Void, Map<String, GroupDescription>>() {
                @Override
                public Map<String, GroupDescription> apply(Void v) {
                    Map<String, GroupDescription> descriptions = new HashMap<>(futures.size());
                    for (Map.Entry<String, KafkaFuture<GroupDescription>> entry : futures.entrySet()) {
                        try {
                            descriptions.put(entry.getKey(), entry.getValue().get());
                        } catch (InterruptedException | ExecutionException e) {
                            // This should be unreachable, because allOf ensured that all the futures
                            // completed successfully.
                            throw new RuntimeException(e);
                        }
                    }
                    return descriptions;
                }
            });
    }
}
 
public class GroupDescription {
    private final String groupId;
    private final String protocolType;
    private final List<ConsumerDescription>List<MemberDescription> consumersmembers;
}
 
public class MemberDescription {
    private final String consumerId;
    private final String clientId;
    private final String host;
    private final List<TopicPartition> assignment;
}

public class ListGroupsOptions extends AbstractOptions<ListGroupsOptions> {
}

public class ListGroupsResult {
    final KafkaFuture<Map<String, GroupListing>> future;

    //...
}
 
public class GroupListing {
    private final String name;
    private final String protocolType;
}
 
public class ListGroupOffsetsOptions extends AbstractOptions<ListGroupsOptions> {
}
 
public class ListGroupOffsetsResult {
    final KafkaFuture<Map<TopicPartition, Long>>OffsetAndMetadata>> future;
 
    //...
}
 

This API returns a future object whose result will be available within RequestTimeoutMs, which is configured when user constructs the AdminClient.

...

  1. Add `#describeGroups(Collection<String> groupIds, DescribeGroupsOptions options)` and `#describeGroups(Collection<String> groupIds)` to `AdminClient` API
  2. Add `#describeConsumerGroups(Collection<String> groupIds, DescribeGroupsOptions options)` and `#describeConsumerGroups(Collection<String> groupIds)` to `AdminClient` API API
  3. Add `#listGroups(ListGroupsOptions options)` and `#listGroups()` to `AdminClient` API 
  4. Add `#listGroupOffsets(String groupId, ListGroupOffsetsOptions options)` and `#listGroupOffsets(String groupId)` to `AdminClient` API and
  5. Implement it on `KafkaAdminClient`.
  6. Solve 
    Jira
    serverASF JIRA
    serverId5aa69414-a9e9-3523-82ec-879b028fb15b
    keyKAFKA-6058

...

  • Streams Resetter Tool will be remove it's dependency to `core` module for `KafkaAdminClient`.

References:

Pull Request: