Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
    /**
     * List the consumer group offsets for a given collection of groups available in the cluster.
     *
     * @param groupIds A collection of groups for which offsets will be fetched
     * @param options The options to use when listing the consumer group offsets.
     * @return The ListConsumerGroupsOffsetsResult
     */
     publicListConsumerGroupOffsetsResult ListConsumerGroupsOffsetsResultlist listConsumerGroupsOffsetslistConsumerGroupOffsets(finalMap<String, Collection<String>List<TopicPartition>> groupIdsgroupToTopicPartitions,
 ListConsumerGroupOffsetOptions options); 
    /**
     * List the consumer group offsets for a given collection of groups available in the cluster with the default options.
     * <p>
     * This is a convenience method for {@link #listConsumerGroupsOffsets(String, ListConsumerGroupOffsetsOptions)} with default options.
     *
     * @return The finalListConsumerGroupsOffsetsResult.
 ListConsumerGroupOffsetsOptions options) 
    */**
	default ListConsumerGroupOffsetsResult listConsumerGroupOffsets(Map<String, List<TopicPartition>>  * List the consumer group offsets for a given collection of groups available in the cluster with the default options.
    groupToTopicPartitions) {
		return listConsumerGroupOffsets(groupToTopicPartitions, new ListConsumerGroupOffsetOptions());
	} 

ListConsumerGroupsResult will also change to include a mapping of group id to the map of TopicPartition and OffsetAndMetadata

Code Block
/**
 * The result of the {@link Admin#listConsumerGroupOffsets(Collection<String>)} call.
 * <p>
 * The API of *this Thisclass is a convenience method forevolving, see {@link #listConsumerGroupsOffsets(String, ListConsumerGroupOffsetsOptions)} with default options.
     *
     * @return The ListConsumerGroupsOffsetsResult.
     */
      default ListConsumerGroupsOffsetsResult listConsumerGroupsOffsets(Collection<String> groupIds) {
        return listConsumerGroupsOffsets(groupIds, new ListConsumerGroupOffsetsOptions());
      }  

ListConsumerGroupsResult will also change to include a mapping of group id to the map of TopicPartition and OffsetAndMetadata

Code Block
/**
 * The result of the {@link Admin#listConsumerGroupOffsets(Collection<String>)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListConsumerGroupsOffsetsResult {

    private final Map<String,KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> futures;

    protected ListConsumerGroupsOffsetsResult(Map<String, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> futures) {
        this.futures = futures;
    }

    public Map<String, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> values() {
        return futures;
    }

    /**
     * Return a future which succeeds only if all the offset fetches succeed.
     */
    public KafkaFuture<Map<String, Map<TopicPartition, OffsetAndMetadata>>> all() {

}

ListConsumerGroupsOffsetsOptions will also change to include a mapping of group id to the list of TopicPartitions, so we give flexibility to get offsets for specific consumer groups with specific partitions.

Code Block
languagejava
/**
 * Options for {@link Admin#listConsumerGroupOffsets(String)}.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListConsumerGroupOffsetsOptions extends AbstractOptions<ListConsumerGroupOffsetsOptions> {

    // maps a consumer group to the list of partitions we want to
		// get offsets for
		private Map<String, List<TopicPartition>> topicPartitions = null;

    /**
     * Set the topic partitions to list as part of the result.
     * {@code null} includes all topic partitions for each group.
     *
     * @param topicPartitions Map of group to List of topic partitions to include
     * @return This ListGroupOffsetsOptions
     */
    public ListConsumerGroupOffsetsOptions topicPartitions(Map<String, List<TopicPartition>> topicPartitions) {
        this.topicPartitions = topicPartitions;Admin} for details.
 */
@InterfaceStability.Evolving
public class ListConsumerGroupsOffsetsResult {

    private final Map<String,KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> futures;

    protected ListConsumerGroupsOffsetsResult(Map<String, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> futures) {
        this.futures = futures;
    }

    public Map<String, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> values() {
        return thisfutures;
    }

    /**
     * ReturnsReturn a listfuture ofwhich topicsucceeds partitionsonly toif addall asthe partoffset offetches the resultsucceed.
     */
    public KafkaFuture<Map<String, Map<StringMap<TopicPartition, List<TopicPartition>>OffsetAndMetadata>>> topicPartitionsall() {
        return topicPartitions;
    }
}


Proposed Changes

This proposal has 3 parts: 1) extending the wire protocol 2) response handling changes, 3) enhancing the AdminClient to use the new protocol.

...