
Status

Current state "Under Discussion"

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently the OffsetFetch API supports fetching offsets for a single consumer group id. This is fine from a consumer client perspective, but there are classes of problems (mainly monitoring and management) that require information about multiple consumer groups. Today these applications must send a separate request to the group coordinator for each group. This KIP streamlines the process so that offsets for multiple groups can be fetched with a single request. This has the following advantages:

  1. It reduces request overhead

  2. It simplifies client side code

Proposed Changes

This proposal has 3 parts: 1) extending the wire protocol, 2) changing response handling, and 3) enhancing the AdminClient to use the new protocol.

Wire Protocol Changes: The OffsetFetch API will be extended to take a collection of consumer group ids (Collection<String>). Previous behaviour will be preserved: inside the AdminClient, a request for a single group id will be translated into a single-element array. The corresponding response does not currently contain the group id of the fetched offsets, so it must also be extended to include this information.
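To illustrate how the single-group form can be preserved, the sketch below keeps it as a thin wrapper that sends a single-element collection and unwraps the only entry of the result. It is a minimal model under stated assumptions: `OffsetFetcher` and `fetchOffsets` are hypothetical names (not Kafka APIs), and offsets are simplified to a `Map<String, Long>` keyed by "topic-partition".

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;

// Hypothetical client-side abstraction, used only to illustrate the translation.
interface OffsetFetcher {

    // Batched form: one request carrying several group ids; result keyed by group id.
    Map<String, Map<String, Long>> fetchOffsets(Collection<String> groupIds);

    // Single-group form, kept by sending a single-element collection
    // and returning the only entry of the batched result.
    default Map<String, Long> fetchOffsets(String groupId) {
        return fetchOffsets(List.of(groupId)).get(groupId);
    }
}
```

Because the single-group method is just a special case of the batched one, only one code path has to talk to the broker.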

Response Handling Changes: This change introduces a new level in org.apache.kafka.common.requests.OffsetFetchResponse to represent group-level information, and its underlying data structure will need to be modified as follows:

The field List<OffsetFetchResponseTopic> topics; in OffsetFetchResponseData currently holds the topic data for the entire request. Since that data now belongs to a single group rather than to the whole request, the field should be replaced with List<OffsetFetchResponseGroup> groups;, where OffsetFetchResponseGroup is a new class described in the Public Interfaces section.

Errors will be reported per group id using the existing ERROR_CODE field in the response (see below). Because any errors produced are already covered by existing behaviour, no new error codes are required. However, the new call introduces a scenario in which errors affect only a subset of the requested groups. This is handled by returning a separate future for each group, which can be examined individually (see the all() and values() methods in the Public Interfaces section). Failures for individual groups will not be retried automatically; it is the caller's responsibility to react appropriately to the reported errors. For instance, if the group coordinator for one group changes while the request is in progress, this is reported back to the client, which must resubmit a request for that group to the new coordinator.
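As a rough illustration of this per-group error handling, the sketch below turns a batched response into one future per group. It is a simplified, self-contained model: `GroupResult` and `splitByGroup` are hypothetical names, error codes are plain `short` values (0 meaning no error), and `CompletableFuture` stands in for `KafkaFuture`. A non-zero error code fails only that group's future, and nothing is retried.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

final class PerGroupResults {

    // Hypothetical, simplified view of one group entry in the batched response:
    // the group id, its group-level error code (0 = no error) and its committed
    // offsets keyed by "topic-partition".
    record GroupResult(String groupId, short errorCode, Map<String, Long> offsets) { }

    // Creates one future per group: successful groups complete with their offsets,
    // failed groups complete exceptionally. No retries happen here; the caller
    // decides how to react (e.g. resubmit to a new coordinator).
    static Map<String, CompletableFuture<Map<String, Long>>> splitByGroup(List<GroupResult> response) {
        Map<String, CompletableFuture<Map<String, Long>>> futures = new LinkedHashMap<>();
        for (GroupResult group : response) {
            CompletableFuture<Map<String, Long>> future = new CompletableFuture<>();
            if (group.errorCode() == 0) {
                future.complete(group.offsets());
            } else {
                future.completeExceptionally(new IllegalStateException(
                        "Offset fetch failed for group " + group.groupId()
                                + " with error code " + group.errorCode()));
            }
            futures.put(group.groupId(), future);
        }
        return futures;
    }
}
```

For example, if one group's coordinator moved mid-request (error code 16, NOT_COORDINATOR, in Kafka's protocol), only that group's future fails and the other groups' offsets remain available.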

AdminClient changes: Consumer offsets are currently fetched with the listConsumerGroupOffsets method. Additional overloads of this method that take multiple consumer groups will be added:

ListConsumerGroupsOffsetsResult listConsumerGroupOffsets(Collection<String> groupIds,
                                                         ListConsumerGroupOffsetsOptions options);

default ListConsumerGroupsOffsetsResult listConsumerGroupOffsets(Collection<String> groupIds) {
    return listConsumerGroupOffsets(groupIds, new ListConsumerGroupOffsetsOptions());
}

Public Interfaces

We will bump the OffsetFetch API version. The new schemas are provided below:


OffsetFetch Request => [group_ids] require_stable TAG_BUFFER
  group_ids => name [topics] TAG_BUFFER
    name => COMPACT_STRING
    topics => name [partition_indexes] TAG_BUFFER
      name => COMPACT_STRING
      partition_indexes => INT32
  require_stable => BOOLEAN

OffsetFetch Response => throttle_time_ms [group_ids] TAG_BUFFER
  throttle_time_ms => INT32
  group_ids => name [topics] error_code TAG_BUFFER              // new
    name => COMPACT_STRING
    topics => name [partitions] TAG_BUFFER
      name => COMPACT_STRING
      partitions => partition_index committed_offset committed_leader_epoch metadata error_code TAG_BUFFER
        partition_index => INT32
        committed_offset => INT64
        committed_leader_epoch => INT32
        metadata => COMPACT_NULLABLE_STRING
        error_code => INT16
    error_code => INT16
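To make the new nesting concrete, the records below mirror the response schema (group, then topics, then partitions) and flatten it into committed offsets per group. This is an illustrative model only: the record names and the `committedOffsets` helper are hypothetical and do not correspond to Kafka's generated `OffsetFetchResponseData` classes. Groups with a non-zero group-level `error_code` and partitions with a non-zero partition-level `error_code` are simply skipped.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class OffsetFetchResponseModel {

    // Hypothetical records mirroring the response schema above.
    record Partition(int partitionIndex, long committedOffset, int committedLeaderEpoch,
                     String metadata, short errorCode) { }
    record Topic(String name, List<Partition> partitions) { }
    record Group(String name, List<Topic> topics, short errorCode) { }

    // Flattens the response into groupId -> ("topic-partition" -> committed offset).
    // Groups and partitions with a non-zero error code are left out.
    static Map<String, Map<String, Long>> committedOffsets(List<Group> groups) {
        Map<String, Map<String, Long>> result = new LinkedHashMap<>();
        for (Group group : groups) {
            if (group.errorCode() != 0) {
                continue;
            }
            Map<String, Long> offsets = new LinkedHashMap<>();
            for (Topic topic : group.topics()) {
                for (Partition p : topic.partitions()) {
                    if (p.errorCode() == 0) {
                        offsets.put(topic.name() + "-" + p.partitionIndex(), p.committedOffset());
                    }
                }
            }
            result.put(group.name(), offsets);
        }
        return result;
    }
}
```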

The new group level in the response is reflected in OffsetFetchResponseData:

public class OffsetFetchResponseData implements ApiMessage {
    int throttleTimeMs;
    List<OffsetFetchResponseGroup> groups; // replaces List<OffsetFetchResponseTopic> topics;
    short errorCode;
    private List<RawTaggedField> _unknownTaggedFields;
    ...

    public static class OffsetFetchResponseGroup implements Message {
        String name;
        List<OffsetFetchResponseTopic> topics;
        short errorCode; // per-group error code (error_code in group_ids)
    }
}

The following will be added to org.apache.kafka.clients.admin.Admin:


    /**
     * List the consumer group offsets for a given collection of groups available in the cluster.
     *
     * @param groupIds A collection of groups for which offsets will be fetched.
     * @param options The options to use when listing the consumer group offsets.
     * @return The ListConsumerGroupsOffsetsResult.
     */
    ListConsumerGroupsOffsetsResult listConsumerGroupsOffsets(Collection<String> groupIds,
                                                              ListConsumerGroupOffsetsOptions options);

    /**
     * List the consumer group offsets for a given collection of groups available in the cluster with the default options.
     * <p>
     * This is a convenience method for {@link #listConsumerGroupsOffsets(Collection, ListConsumerGroupOffsetsOptions)} with default options.
     *
     * @param groupIds A collection of groups for which offsets will be fetched.
     * @return The ListConsumerGroupsOffsetsResult.
     */
    default ListConsumerGroupsOffsetsResult listConsumerGroupsOffsets(Collection<String> groupIds) {
        return listConsumerGroupsOffsets(groupIds, new ListConsumerGroupOffsetsOptions());
    }

These are in addition to the existing group offset fetch methods that will stay:

default ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId) {
    return listConsumerGroupOffsets(groupId, new ListConsumerGroupOffsetsOptions());
}
ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options);

The new methods return a new result container, ListConsumerGroupsOffsetsResult, which follows the usual Admin result pattern:

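import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;

/**
 * The result of the {@link Admin#listConsumerGroupOffsets(java.util.Collection)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListConsumerGroupsOffsetsResult {

    private final Map<String, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> futures;

    protected ListConsumerGroupsOffsetsResult(Map<String, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> futures) {
        this.futures = futures;
    }

    /** Return a map from group id to a future of that group's committed offsets. */
    public Map<String, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> values() {
        return futures;
    }

    /**
     * Return a future which succeeds only if all the offset fetches succeed.
     */
    public KafkaFuture<Map<String, Map<TopicPartition, OffsetAndMetadata>>> all() {
        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture<?>[0])).thenApply(ignored -> {
            Map<String, Map<TopicPartition, OffsetAndMetadata>> offsets = new HashMap<>(futures.size());
            for (Map.Entry<String, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> entry : futures.entrySet()) {
                try {
                    offsets.put(entry.getKey(), entry.getValue().get());
                } catch (InterruptedException | ExecutionException e) {
                    // Unreachable: allOf only completes normally once every future has succeeded.
                    throw new RuntimeException(e);
                }
            }
            return offsets;
        });
    }
}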
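A caller that wants partial results, rather than the all-or-nothing behaviour of all(), can walk the per-group futures returned by values(). The sketch below is a minimal illustration, assuming `CompletableFuture` as a stand-in for `KafkaFuture`; `PartialResults`, `Outcome` and `partition` are hypothetical names. It separates groups whose fetch succeeded from those the caller may want to resubmit, for example after a coordinator change.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class PartialResults {

    // Successful groups with their results, plus the ids of groups whose fetch failed.
    record Outcome<T>(Map<String, T> succeeded, List<String> failed) { }

    // Waits for every per-group future and sorts the groups by outcome.
    static <T> Outcome<T> partition(Map<String, CompletableFuture<T>> values) {
        Map<String, T> succeeded = new LinkedHashMap<>();
        List<String> failed = new ArrayList<>();
        for (Map.Entry<String, CompletableFuture<T>> entry : values.entrySet()) {
            try {
                succeeded.put(entry.getKey(), entry.getValue().join());
            } catch (CompletionException e) {
                // This group failed; the caller decides whether and where to retry it.
                failed.add(entry.getKey());
            }
        }
        return new Outcome<>(succeeded, failed);
    }
}
```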

Compatibility, Deprecation, and Migration Plan

As the protocol changes encompass the previous schemas, and the client changes are additions to the existing APIs, there are no compatibility concerns with this change.

If an Admin client with this new functionality is used against an older broker that cannot handle these requests, the methods will throw UnsupportedVersionException, as per the usual pattern.
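Callers that must also work against older brokers could catch this exception and fall back to one request per group. A minimal sketch, in which `OffsetFetchFallback`, `BatchedFetch`, `SingleFetch` and `fetchWithFallback` are hypothetical names and `UnsupportedOperationException` stands in for Kafka's `UnsupportedVersionException` so that the example stays self-contained:

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;

final class OffsetFetchFallback {

    // Hypothetical batched and single-group fetch operations.
    interface BatchedFetch { Map<String, Map<String, Long>> fetch(Collection<String> groupIds); }
    interface SingleFetch { Map<String, Long> fetch(String groupId); }

    static Map<String, Map<String, Long>> fetchWithFallback(Collection<String> groupIds,
                                                             BatchedFetch batched,
                                                             SingleFetch single) {
        try {
            return batched.fetch(groupIds);
        } catch (UnsupportedOperationException e) {
            // Stand-in for UnsupportedVersionException: the broker does not support
            // multi-group OffsetFetch, so issue one request per group instead.
            Map<String, Map<String, Long>> result = new LinkedHashMap<>();
            for (String groupId : groupIds) {
                result.put(groupId, single.fetch(groupId));
            }
            return result;
        }
    }
}
```

The fallback keeps the caller's result shape identical either way, at the cost of one round trip per group on older brokers.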

Rejected Alternatives

  • Replace the existing listConsumerGroupOffsets methods with new methods that take multiple groups. This would simplify the AdminClient interface, but the primary use case for fetching offsets is assumed to remain a single group, so the interfaces should favour it.
