...

Code Block
    /**
     * List the consumer group offsets available in the cluster for the specified consumer groups.
     *
     * @param groupSpecs Map of consumer group ids to a spec that specifies the topic partitions of the group to list offsets for.
     * @param options The options to use when listing the consumer group offsets.
     * @return The ListConsumerGroupOffsetsResult
     */
    ListConsumerGroupOffsetsResult listConsumerGroupOffsets(Map<String, ListConsumerGroupOffsetsSpec> groupSpecs, ListConsumerGroupOffsetsOptions options);

    /**
     * List the consumer group offsets available in the cluster for the specified groups with the default options.
     * <p>
     * This is a convenience method for
     * {@link #listConsumerGroupOffsets(Map, ListConsumerGroupOffsetsOptions)} with default options.
     *
     * @param groupSpecs Map of consumer group ids to a spec that specifies the topic partitions of the group to list offsets for.
     * @return The ListConsumerGroupOffsetsResult.
     */
    default ListConsumerGroupOffsetsResult listConsumerGroupOffsets(Map<String, ListConsumerGroupOffsetsSpec> groupSpecs) {
        return listConsumerGroupOffsets(groupSpecs, new ListConsumerGroupOffsetsOptions());
    }

...


  


A new class, ListConsumerGroupOffsetsSpec, will be added to specify the partitions for each group. Additional per-group filters can be added to this spec in the future if required.

Code Block (java): ListConsumerGroupOffsetsSpec
/**
 * Specification of consumer group offsets to list using {@link Admin#listConsumerGroupOffsets(java.util.Map)}.
 *
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListConsumerGroupOffsetsSpec {

    private Collection<TopicPartition> topicPartitions;

    /**
     * Set the topic partitions whose offsets are to be listed for a consumer group.
     * {@code null} includes all topic partitions.
     *
     * @param topicPartitions List of topic partitions to include
     * @return This ListConsumerGroupOffsetsSpec
     */
    public ListConsumerGroupOffsetsSpec topicPartitions(Collection<TopicPartition> topicPartitions) {
        this.topicPartitions = topicPartitions;
        return this;
    }

    /**
     * Returns the topic partitions whose offsets are to be listed for a consumer group.
     * {@code null} indicates that offsets of all partitions of the group are to be listed.
     */
    public Collection<TopicPartition> topicPartitions() {
        return topicPartitions;
    }
}
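To illustrate how a caller might build the groupSpecs argument, here is a minimal sketch. It does not depend on kafka-clients: the TopicPartition record and the ListConsumerGroupOffsetsSpec class below are stand-ins that mirror the shapes shown above, and the group ids and topic names are made up for the example.

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Stand-in for org.apache.kafka.common.TopicPartition so the sketch compiles on its own.
record TopicPartition(String topic, int partition) {}

// Stand-in mirroring the ListConsumerGroupOffsetsSpec class proposed above.
class ListConsumerGroupOffsetsSpec {
    private Collection<TopicPartition> topicPartitions;

    ListConsumerGroupOffsetsSpec topicPartitions(Collection<TopicPartition> topicPartitions) {
        this.topicPartitions = topicPartitions;
        return this;
    }

    Collection<TopicPartition> topicPartitions() {
        return topicPartitions;
    }
}

class GroupSpecsExample {
    // Builds a groupSpecs map for two hypothetical groups:
    // "billing" is restricted to two partitions, while "audit" leaves
    // topicPartitions as null, which requests all partitions of the group.
    static Map<String, ListConsumerGroupOffsetsSpec> buildGroupSpecs() {
        Map<String, ListConsumerGroupOffsetsSpec> groupSpecs = new LinkedHashMap<>();
        groupSpecs.put("billing", new ListConsumerGroupOffsetsSpec()
                .topicPartitions(List.of(
                        new TopicPartition("invoices", 0),
                        new TopicPartition("invoices", 1))));
        groupSpecs.put("audit", new ListConsumerGroupOffsetsSpec());
        return groupSpecs;
    }

    public static void main(String[] args) {
        buildGroupSpecs().forEach((groupId, spec) ->
                System.out.println(groupId + " -> "
                        + (spec.topicPartitions() == null ? "all partitions" : spec.topicPartitions())));
    }
}
```

With the real classes, a map built this way would be passed to listConsumerGroupOffsets(groupSpecs).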


ListConsumerGroupOffsetsResult will also change to map each group id to that group's map of TopicPartition to OffsetAndMetadata:

Code Block
/**
 * The result of the {@link Admin#listConsumerGroupOffsets(Map)} and
 * {@link Admin#listConsumerGroupOffsets(String)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListConsumerGroupOffsetsResult {

    final Map<String, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> futures;

    ListConsumerGroupOffsetsResult(final Map<CoordinatorKey, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> futures) {
    }

    /**
     * Return a future which yields a map of topic partitions to OffsetAndMetadata objects.
     * If the group does not have a committed offset for this partition, the corresponding value in the returned map will be null.
     */
    public KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> partitionsToOffsetAndMetadata() {}

    /**
     * Return a future which yields a map of topic partitions to OffsetAndMetadata objects for
     * the specified group. If the group doesn't have a committed offset for a specific
     * partition, the corresponding value in the returned map will be null.
     */
    public KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> partitionsToOffsetAndMetadata(String groupId) {}

    /**
     * Return a future which yields a {@code Map<String, Map<TopicPartition, OffsetAndMetadata>>}
     * covering all the groups, if requests for all the groups succeed.
     */
    public KafkaFuture<Map<String, Map<TopicPartition, OffsetAndMetadata>>> all() {}
}


Proposed Changes

This proposal has three parts: 1) extending the wire protocol, 2) changing response handling, and 3) enhancing the AdminClient to use the new protocol.

Wire Protocol Changes: The OffsetFetch API will be extended to take a collection of consumer group ids (Collection<String>). Previous behaviour will be maintained, as single group id requests will be translated to single-element arrays inside the AdminClient. The corresponding response does not currently contain the group id of the offsets fetched, so it must also be extended to include this information.

Response Handling Changes: This change introduces a new level to represent group level information in org.apache.kafka.common.requests.OffsetFetchResponse and the data structure within this will need to be modified as follows:

The field List<OffsetFetchResponseTopic> topics; in OffsetFetchResponseData now encapsulates a single group’s data rather than the entire response, so it should be replaced with List<OffsetFetchResponseGroup> groups; where OffsetFetchResponseGroup is a new class described in the Public Interfaces changes.

Any errors will be communicated at a per group id level using the existing ERROR_CODE field in the response (see above). As any errors produced should already be covered by existing behaviour, no new error codes will be required. However, a new scenario is introduced whereby errors can occur for only a subset of the groups in the new call. This scenario is handled by returning a separate future for each group that can be examined individually (see the all() and partitionsToOffsetAndMetadata(String) methods in the Public Interfaces section). Failures for individual groups will not be retried automatically; it is the responsibility of the caller to react appropriately to the communicated errors. For instance, if the group coordinator changes for one group whilst the request is in progress, this will be communicated back to the client, which must resubmit a request to the new coordinator.
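The per-group error handling described above can be sketched as follows. This is an illustration only, not the proposed implementation: CompletableFuture stands in for KafkaFuture, a String such as "orders-0" stands in for TopicPartition, a Long stands in for OffsetAndMetadata, and the group ids and the exception are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class PerGroupFutures {

    // Sample result: "group-a" succeeded, "group-b" failed (hypothetical error).
    static Map<String, CompletableFuture<Map<String, Long>>> sample() {
        Map<String, CompletableFuture<Map<String, Long>>> futures = new LinkedHashMap<>();
        futures.put("group-a", CompletableFuture.completedFuture(Map.of("orders-0", 42L)));
        futures.put("group-b", CompletableFuture.failedFuture(
                new IllegalStateException("coordinator changed (hypothetical error)")));
        return futures;
    }

    // Combined view in the spirit of all(): succeeds only if every per-group future succeeds.
    static CompletableFuture<Map<String, Map<String, Long>>> all(
            Map<String, CompletableFuture<Map<String, Long>>> futures) {
        return CompletableFuture
                .allOf(futures.values().toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> {
                    Map<String, Map<String, Long>> result = new LinkedHashMap<>();
                    futures.forEach((groupId, future) -> result.put(groupId, future.join()));
                    return result;
                });
    }

    // Caller-side handling: collect the groups whose fetch failed so they can be resubmitted.
    static List<String> failedGroups(Map<String, CompletableFuture<Map<String, Long>>> futures) {
        List<String> failed = new ArrayList<>();
        futures.forEach((groupId, future) -> {
            try {
                future.join();
            } catch (CompletionException e) {
                failed.add(groupId);
            }
        });
        return failed;
    }

    public static void main(String[] args) {
        Map<String, CompletableFuture<Map<String, Long>>> futures = sample();
        System.out.println("all() failed: " + all(futures).isCompletedExceptionally());
        System.out.println("groups to resubmit: " + failedGroups(futures));
    }
}
```

The combined future fails as soon as any group fails, so a caller that wants partial results would inspect the per-group futures rather than rely on the combined one.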

AdminClient changes: Consumer offsets are currently fetched in the listConsumerGroupOffsets method. Additional overloads of this method will be created that take multiple consumer groups as described under Public Interfaces.

Related/Future Work

The changes contained in this KIP refer specifically to offset listing requests. However, in order to perform these requests, clients must first find the correct group coordinator. Optimisations for this process are contained in KIP-699 (KIP-699: Update FindCoordinator to resolve multiple Coordinators at a time). These two KIPs are not interdependent.

...

As this will introduce a new protocol version for the OffsetFetch API, we need to ensure backwards compatibility for old clients sending requests to new brokers. We are keeping the old API with the single group id unchanged, but making it call the new batched group id API with a singleton list. This should ensure backwards compatibility with old clients. Newer clients connecting to newer brokers will use the new protocol regardless of the Admin API used. Newer clients connecting to older brokers will use the older protocol version, falling back to unbatched mode with multiple requests if the new batched API is used.
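The fallback described above can be sketched as a small planning step. This is a simplified illustration under stated assumptions: the class and method names are hypothetical, the fallback is assumed to send one request per group, the first protocol version that supports batching is passed in as a parameter rather than asserted here, and grouping of requests by coordinator is ignored.

```java
import java.util.ArrayList;
import java.util.List;

class OffsetFetchPlanner {

    // Returns the group ids to place in each request.
    // brokerMaxVersion: highest OffsetFetch version the broker supports.
    // firstBatchedVersion: first version that accepts multiple group ids (supplied by the caller).
    static List<List<String>> planRequests(List<String> groupIds, int brokerMaxVersion, int firstBatchedVersion) {
        List<List<String>> requests = new ArrayList<>();
        if (groupIds.isEmpty()) {
            return requests;
        }
        if (brokerMaxVersion >= firstBatchedVersion) {
            // Newer broker: a single batched request carries every group id.
            requests.add(List.copyOf(groupIds));
        } else {
            // Older broker: fall back to unbatched mode, one singleton request per group.
            for (String groupId : groupIds) {
                requests.add(List.of(groupId));
            }
        }
        return requests;
    }

    public static void main(String[] args) {
        List<String> groups = List.of("a", "b", "c");
        System.out.println(planRequests(groups, 5, 5));
        System.out.println(planRequests(groups, 4, 5));
    }
}
```

A single-group call follows the same path with a singleton list, which matches how the existing single group id API is expected to delegate to the batched one.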

Rejected Alternatives

  • Replace the existing listConsumerGroupOffsets methods with new methods that take multiple groups. This would simplify the AdminClient interface, but we assume that the primary use case for fetching offsets will still be a single group, so the interfaces should favour it.