Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
public abstract class AdminClient implements AutoCloseable {
 	
	public abstract DescribeConsumerGroupsResult describeConsumerGroups(Collection<String> groupIds, DescribeConsumerGroupsOptions options);
 
	public DescribeConsumerGroupsResult describeConsumerGroups(Collection<String> groupIds) {
    	return describeConsumerGroups(groupIds, new DescribeConsumerGroupsOptions());
	}
 
    public abstract ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options);

    public ListConsumerGroupsResult listConsumerGroups() {
        return listConsumerGroups(new ListConsumerGroupsOptions());
    }
}
 
public class DescribeConsumerGroupOptions extends AbstractOptions<DescribeConsumerGroupOptions> {
}
 
public class DescribeConsumerGroupResult {
    private final Map<String, KafkaFuture<ConsumerGroupDescription>> futures;
    public DescribeConsumerGroupsResult(Map<String, KafkaFuture<ConsumerGroupDescription>> futures) {
        this.futures = futures;
    }

    public Map<String, KafkaFuture<ConsumerGroupDescription>> values() {
        return futures;
    }

    public KafkaFuture<Map<String, ConsumerGroupDescription>> all() {
        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
            thenApply(new KafkaFuture.Function<Void, Map<String, ConsumerGroupDescription>>() {
                @Override
                public Map<String, ConsumerGroupDescription> apply(Void v) {
                    Map<String, ConsumerGroupDescription> descriptions = new HashMap<>(futures.size());
                    for (Map.Entry<String, KafkaFuture<ConsumerGroupDescription>> entry : futures.entrySet()) {
                        try {
                            descriptions.put(entry.getKey(), entry.getValue().get());
                        } catch (InterruptedException | ExecutionException e) {
                            // This should be unreachable, because allOf ensured that all the futures
                            // completed successfully.
                            throw new RuntimeException(e);
                        }
                    }
                    return descriptions;
                }
            });
    }
}
 
public class ConsumerGroupDescription {
    private final String namegroupId;
    private final List<ConsumerDescription> consumers;
}
 
public class ConsumerDescription {
    private final String consumerId;
    private final String clientId;
    private final String host;
    private final List<TopicPartition> assignment;
}
 
public class ListConsumerGroupsOptions extends AbstractOptions<ListConsumerGroupsOptions> {
}

public class ListConsumerGroupsResult {
    final KafkaFuture<Map<String, ConsumerGroupListing>> future;

    ListConsumerGroupsResult(KafkaFuture<Map<String, ConsumerGroupListing>> future) {
        this.future = future;
    }

    /**
    * Return a future which yields a map of topic names to TopicListing objects.
    */
    public KafkaFuture<Map<String, ConsumerGroupListing>> namesToListings() {
        return future;
    }

    /**
     * Return a future which yields a collection of TopicListing objects.
     */
    public KafkaFuture<Collection<ConsumerGroupListing>> listings() {
        return future.thenApply(new KafkaFuture.Function<Map<String, ConsumerGroupListing>, Collection<ConsumerGroupListing>>() {
            @Override
            public Collection<ConsumerGroupListing> apply(Map<String, ConsumerGroupListing> namesToDescriptions) {
                return namesToDescriptions.values();
            }
        });
    }

    /**
    * Return a future which yields a collection of topic names.
    */
    public KafkaFuture<Set<String>> names() {
        return future.thenApply(new KafkaFuture.Function<Map<String, ConsumerGroupListing>, Set<String>>() {
            @Override
            public Set<String> apply(Map<String, ConsumerGroupListing> namesToListings) {
                return namesToListings.keySet();
            }
        });
    }
}
 
public class ConsumerGroupListing {
    private final String name;
}
 

...