Status

Current state: "Accepted"

Discussion thread: here 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently the OffsetFetch API supports fetching offsets for a single consumer group id. This is fine from a consumer client perspective but there are classes of problems (mainly monitoring and management) that require information about multiple consumer groups. Currently these applications must submit a separate request for each group to the group coordinator. This KIP streamlines this process so that a single request can be made to fetch offsets for multiple groups. This carries the following advantages:

  1. It reduces request overhead

  2. It simplifies client side code

Public Interfaces

We will bump the OffsetFetch API to a new version. The new schemas are provided below. These introduce a group-level return; note that error codes previously returned at the top level will now be returned per group.

Below is the new JSON for the OffsetFetchRequest. Here we are adding a new OffsetFetchRequestGroup type, which has a string for the groupId as well as a list of OffsetFetchRequestTopics.

{
  "apiKey": 9,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "OffsetFetchRequest",
  // In version 0, the request read offsets from ZK.
  //
  // Starting in version 1, the broker supports fetching offsets from the internal __consumer_offsets topic.
  //
  // Starting in version 2, the request can contain a null topics array to indicate that offsets
  // for all topics should be fetched. It also returns a top level error code
  // for group or coordinator level errors.
  //
  // Version 3, 4, and 5 are the same as version 2.
  //
  // Version 6 is the first flexible version.
  //
  // Version 7 is adding the require stable flag.
  //
  // Version 8 is adding support for fetching offsets for multiple groups
  "validVersions": "0-8",
  "flexibleVersions": "6+",
  "fields": [
    { "name": "GroupIds", "type": "[]OffsetFetchRequestGroup", "versions": "8+", // new
      "about": "Each group we would like to fetch offsets for", "fields": [
      { "name": "groupId", "type": "string", "versions": "8+", "entityType": "groupId",
        "about": "The group ID." }, // new
      { "name": "Topics", "type": "[]OffsetFetchRequestTopic", "versions": "0+", "nullableVersions": "2+",
        "about": "Each topic we would like to fetch offsets for, or null to fetch offsets for all topics.", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
          "about": "The topic name." },
        { "name": "PartitionIndexes", "type": "[]int32", "versions": "0+",
          "about": "The partition indexes we would like to fetch offsets for." }
      ]}
    ]},
    { "name": "RequireStable", "type": "bool", "versions": "7+", "default": "false",
     "about": "Whether broker should hold on returning unstable offsets but set a retriable error code for the partition."}
  ]
}
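
To make the new nesting concrete, the following is a minimal sketch of how a caller might assemble the per-group entries of a version 8 request. The classes `Topic` and `Group` and the method `buildGroups` are hypothetical stand-ins written for illustration; the real classes would be generated from the schema above and are not shown here.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in types; the real classes are generated from the schema.
final class OffsetFetchRequestSketch {

    /** Mirrors OffsetFetchRequestTopic: a topic name plus the partition indexes to fetch. */
    static final class Topic {
        final String name;
        final List<Integer> partitionIndexes;

        Topic(String name, List<Integer> partitionIndexes) {
            this.name = name;
            this.partitionIndexes = partitionIndexes;
        }
    }

    /** Mirrors OffsetFetchRequestGroup: a group id plus its topics (null means all topics). */
    static final class Group {
        final String groupId;
        final List<Topic> topics;

        Group(String groupId, List<Topic> topics) {
            this.groupId = groupId;
            this.topics = topics;
        }
    }

    /**
     * Builds one Group entry per group id. Each value maps a topic name to partition
     * indexes; a null value requests offsets for every topic of that group.
     */
    static List<Group> buildGroups(Map<String, Map<String, List<Integer>>> wanted) {
        List<Group> groups = new ArrayList<>();
        for (Map.Entry<String, Map<String, List<Integer>>> entry : wanted.entrySet()) {
            List<Topic> topics = null;
            if (entry.getValue() != null) {
                topics = new ArrayList<>();
                for (Map.Entry<String, List<Integer>> t : entry.getValue().entrySet()) {
                    topics.add(new Topic(t.getKey(), t.getValue()));
                }
            }
            groups.add(new Group(entry.getKey(), topics));
        }
        return groups;
    }

    public static void main(String[] args) {
        Map<String, Map<String, List<Integer>>> wanted = new LinkedHashMap<>();
        wanted.put("billing", Map.of("invoices", List.of(0, 1)));
        wanted.put("audit", null); // all topics for this group
        System.out.println(buildGroups(wanted).size() + " groups in one request");
    }
}
```

The group names and topic names in `main` are invented for the example. The point is only that one request now carries a list of groups, each with its own optional topic list.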


Below is the JSON for the new OffsetFetchResponse. We are adding a new type called OffsetFetchResponseGroup, which contains a string for the groupId as well as a list of OffsetFetchResponseTopics. In addition, we have moved the error code into OffsetFetchResponseGroup so that the response can carry group-level error codes.

{
  "apiKey": 9,
  "type": "response",
  "name": "OffsetFetchResponse",
  // Version 1 is the same as version 0.
  //
  // Version 2 adds a top-level error code.
  //
  // Version 3 adds the throttle time.
  //
  // Starting in version 4, on quota violation, brokers send out responses before throttling.
  //
  // Version 5 adds the leader epoch to the committed offset.
  //
  // Version 6 is the first flexible version.
  //
  // Version 7 adds pending offset commit as new error response on partition level.
  //
  // Version 8 is adding support for fetching offsets for multiple groups
  "validVersions": "0-8",
  "flexibleVersions": "6+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "GroupIds", "type": "[]OffsetFetchResponseGroup", "versions": "8+", // new
      "about": "The responses per group id.", "fields": [
      { "name": "groupId", "type": "string", "versions": "8+", "entityType": "groupId", // new
        "about": "The group ID." },
      { "name": "Topics", "type": "[]OffsetFetchResponseTopic", "versions": "0+",
        "about": "The responses per topic.", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
          "about": "The topic name." },
        { "name": "Partitions", "type": "[]OffsetFetchResponsePartition", "versions": "0+",
          "about": "The responses per partition", "fields": [
          { "name": "PartitionIndex", "type": "int32", "versions": "0+",
            "about": "The partition index." },
          { "name": "CommittedOffset", "type": "int64", "versions": "0+",
            "about": "The committed message offset." },
          { "name": "CommittedLeaderEpoch", "type": "int32", "versions": "5+", "default": "-1",
            "ignorable": true, "about": "The leader epoch." },
          { "name": "Metadata", "type": "string", "versions": "0+", "nullableVersions": "0+",
            "about": "The partition metadata." },
          { "name": "ErrorCode", "type": "int16", "versions": "0+",
            "about": "The error code, or 0 if there was no error." }
        ]}
      ]},
      { "name": "ErrorCode", "type": "int16", "versions": "8+", "default": "0", "ignorable": true, // moved inside OffsetFetchResponseGroup
        "about": "The group-level error code, or 0 if there was no error." }
    ]}
  ]
}
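
Because the error code now lives inside each group entry, a consumer of the response has to check it group by group. The sketch below illustrates one way to do that. `GroupResult`, `failedGroups` and `offsetsByGroup` are hypothetical names, and the type is a simplified stand-in for the generated OffsetFetchResponseGroup (committed offsets are flattened to a map keyed by a "topic-partition" string purely for brevity).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for the generated response classes.
final class OffsetFetchResponseSketch {

    /** Mirrors OffsetFetchResponseGroup, reduced to the fields used here. */
    static final class GroupResult {
        final String groupId;
        final short errorCode;                    // 0 means no group-level error
        final Map<String, Long> committedOffsets; // "topic-partition" -> committed offset

        GroupResult(String groupId, short errorCode, Map<String, Long> committedOffsets) {
            this.groupId = groupId;
            this.errorCode = errorCode;
            this.committedOffsets = committedOffsets;
        }
    }

    /** Returns the ids of groups whose group-level error code is non-zero. */
    static List<String> failedGroups(List<GroupResult> groups) {
        List<String> failed = new ArrayList<>();
        for (GroupResult group : groups) {
            if (group.errorCode != 0) {
                failed.add(group.groupId);
            }
        }
        return failed;
    }

    /** Returns committed offsets keyed by group id, skipping groups that reported an error. */
    static Map<String, Map<String, Long>> offsetsByGroup(List<GroupResult> groups) {
        Map<String, Map<String, Long>> offsets = new LinkedHashMap<>();
        for (GroupResult group : groups) {
            if (group.errorCode == 0) {
                offsets.put(group.groupId, group.committedOffsets);
            }
        }
        return offsets;
    }

    public static void main(String[] args) {
        List<GroupResult> groups = List.of(
            new GroupResult("billing", (short) 0, Map.of("invoices-0", 120L)),
            new GroupResult("audit", (short) 16, Map.of())); // 16 = NOT_COORDINATOR
        System.out.println("failed: " + failedGroups(groups));
        System.out.println("ok: " + offsetsByGroup(groups).keySet());
    }
}
```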

The new level of group information in the request is reflected in OffsetFetchRequestData:

public class OffsetFetchRequestData implements ApiMessage {
    List<OffsetFetchRequestGroup> groups;
    boolean requireStable;
    private List<RawTaggedField> _unknownTaggedFields;
    ...
}

public static class OffsetFetchRequestGroup implements Message {
    String groupId;
    List<OffsetFetchRequestTopic> topics; // request-side topic type; null fetches all topics
}


The new level of group information in the response is reflected in OffsetFetchResponseData:

public class OffsetFetchResponseData implements ApiMessage {
    int throttleTimeMs;
    List<OffsetFetchResponseGroup> groups; // replaces List<OffsetFetchResponseTopic> topics
    private List<RawTaggedField> _unknownTaggedFields;
    ...
}

public static class OffsetFetchResponseGroup implements Message {
    String groupId;
    List<OffsetFetchResponseTopic> topics; // response-side topic type
    short errorCode; // group-level error code, moved here from the top level
}

The following will be added to Admin.java:

    /**
     * List the consumer group offsets for a given collection of groups available in the cluster.
     *
     * @param groupToTopicPartitions A map from each group id to the topic partitions for which offsets will be fetched
     * @param options The options to use when listing the consumer group offsets.
     * @return The ListConsumerGroupsOffsetsResult
     */
    ListConsumerGroupsOffsetsResult listConsumerGroupOffsets(Map<String, List<TopicPartition>> groupToTopicPartitions, ListConsumerGroupOffsetsOptions options);

    /**
     * List the consumer group offsets for a given collection of groups available in the cluster with the default options.
     * <p>
     * This is a convenience method for {@link #listConsumerGroupOffsets(Map, ListConsumerGroupOffsetsOptions)} with default options.
     *
     * @param groupToTopicPartitions A map from each group id to the topic partitions for which offsets will be fetched
     * @return The ListConsumerGroupsOffsetsResult.
     */
    default ListConsumerGroupsOffsetsResult listConsumerGroupOffsets(Map<String, List<TopicPartition>> groupToTopicPartitions) {
        return listConsumerGroupOffsets(groupToTopicPartitions, new ListConsumerGroupOffsetsOptions());
    }

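ListConsumerGroupsOffsetsResult will include a mapping of group id to that group's map of TopicPartition to OffsetAndMetadata:

/**
 * The result of the {@link Admin#listConsumerGroupOffsets(Collection)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListConsumerGroupsOffsetsResult {

    private final Map<String, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> futures;

    protected ListConsumerGroupsOffsetsResult(Map<String, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> futures) {
        this.futures = futures;
    }

    /**
     * Return a map from group id to a future that can be used to check that group's offsets individually.
     */
    public Map<String, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> values() {
        return futures;
    }

    /**
     * Return a future which succeeds only if all the offset fetches succeed.
     */
    public KafkaFuture<Map<String, Map<TopicPartition, OffsetAndMetadata>>> all() {
        // One possible implementation (a sketch): wait for every per-group future, then collect the results.
        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).thenApply(ignored -> {
            Map<String, Map<TopicPartition, OffsetAndMetadata>> offsets = new HashMap<>(futures.size());
            for (Map.Entry<String, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> entry : futures.entrySet()) {
                try {
                    offsets.put(entry.getKey(), entry.getValue().get());
                } catch (InterruptedException | ExecutionException e) {
                    // Not expected: allOf completes successfully only after every future has succeeded.
                    throw new RuntimeException(e);
                }
            }
            return offsets;
        });
    }
}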
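
The difference between inspecting per-group futures and waiting on a combined future can be illustrated without a Kafka dependency. The sketch below uses `java.util.concurrent.CompletableFuture` as a stand-in for KafkaFuture, and the class `PerGroupFuturesSketch` with its `all` helper is hypothetical; it only demonstrates the semantics described in the Javadoc above, not the actual AdminClient code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Stand-in demonstration: CompletableFuture plays the role of KafkaFuture.
final class PerGroupFuturesSketch {

    /** Combines per-group futures into one future that fails if any group failed. */
    static <V> CompletableFuture<Map<String, V>> all(Map<String, CompletableFuture<V>> futures) {
        CompletableFuture<Void> done =
            CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0]));
        return done.thenApply(ignored -> {
            Map<String, V> result = new HashMap<>();
            // join() does not block here: every future has already completed successfully.
            futures.forEach((groupId, future) -> result.put(groupId, future.join()));
            return result;
        });
    }

    public static void main(String[] args) {
        Map<String, CompletableFuture<Long>> futures = new HashMap<>();
        futures.put("billing", CompletableFuture.completedFuture(42L));

        CompletableFuture<Long> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("coordinator moved"));
        futures.put("audit", failed);

        // Per-group inspection: the healthy group is still usable on its own.
        System.out.println("billing failed? " + futures.get("billing").isCompletedExceptionally());
        // Combined view: a single failed group fails the whole future.
        System.out.println("all failed? " + all(futures).isCompletedExceptionally());
    }
}
```

A monitoring tool that can tolerate partial results would iterate over the per-group futures, while a caller that needs a consistent snapshot of every group would use the combined future.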

Proposed Changes

This proposal has three parts: 1) extending the wire protocol, 2) response handling changes, and 3) enhancing the AdminClient to use the new protocol.

Wire Protocol Changes: The OffsetFetch API will be extended to take a collection of consumer group ids (Collection<String>). Previous behaviour will be maintained, as single group id requests will be translated to single-element arrays inside the AdminClient. The corresponding response does not currently contain the group id of the offsets fetched, and so must also be extended to include this information.
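
The translation of a single group id into a single-element collection can be sketched as a pair of overloads. The class and method names below are hypothetical, and the batched method returns placeholder strings instead of real offsets so that the example stays self-contained.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of delegating the single-group call to the batched call.
final class SingleGroupDelegationSketch {

    /** Stand-in for the batched call: returns one placeholder result per requested group. */
    static Map<String, String> fetchOffsets(Collection<String> groupIds) {
        Map<String, String> results = new LinkedHashMap<>();
        for (String groupId : groupIds) {
            results.put(groupId, "offsets-for-" + groupId);
        }
        return results;
    }

    /** Single-group entry point: wraps the id in a one-element list and unwraps the result. */
    static String fetchOffsets(String groupId) {
        return fetchOffsets(Collections.singletonList(groupId)).get(groupId);
    }

    public static void main(String[] args) {
        System.out.println(fetchOffsets("billing"));
    }
}
```

With this shape, existing callers of the single-group method see no change, while the batched path is the only one that has to build a request.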

Response Handling Changes: This change introduces a new level to represent group level information in org.apache.kafka.common.requests.OffsetFetchResponse and the data structure within this will need to be modified as follows:

The field List<OffsetFetchResponseTopic> topics; in OffsetFetchResponseData now encapsulates a single group’s data rather than the entire request, and so should be replaced with List<OffsetFetchResponseGroup> groups; where OffsetFetchResponseGroup is a new class described in the Public Interfaces section.

Any errors will be communicated at a per-group level using the existing ERROR_CODE field in the response (see above). As any errors produced should already be covered by existing behaviour, no new error codes will be required. However, a new scenario is introduced whereby errors can be experienced for a subset of the groups represented in the new call. This scenario is handled by returning separate futures for each group that can be individually examined (see the all() and values() methods in the Public Interfaces section). Failures for individual groups will not be automatically retried; it is the responsibility of the caller to react appropriately to the communicated errors. For instance, if the group coordinator changes for one group whilst the request is in progress, this will be communicated back to the client, which must resubmit a request for that group to the new coordinator.
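
Since retries are left to the caller, client code needs some way to decide which groups to resubmit. The sketch below shows one possible policy, which is an assumption of this example rather than part of the proposal: groups reporting NOT_COORDINATOR (16) or COORDINATOR_NOT_AVAILABLE (15) are collected for a fresh coordinator lookup, and every other error is left for the caller to surface. The numeric values are the standard Kafka protocol error codes; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical caller-side retry policy; not part of the proposed API.
final class GroupRetrySketch {
    // Standard Kafka protocol error codes.
    static final short NONE = 0;
    static final short COORDINATOR_NOT_AVAILABLE = 15;
    static final short NOT_COORDINATOR = 16;
    static final short GROUP_AUTHORIZATION_FAILED = 30;

    /** True when the caller should look up the coordinator again and resubmit (assumed policy). */
    static boolean needsResubmit(short errorCode) {
        return errorCode == NOT_COORDINATOR || errorCode == COORDINATOR_NOT_AVAILABLE;
    }

    /** Returns, in sorted order, the group ids whose error code calls for a resubmit. */
    static List<String> groupsToResubmit(Map<String, Short> errorCodeByGroup) {
        List<String> retry = new ArrayList<>();
        for (Map.Entry<String, Short> entry : new TreeMap<>(errorCodeByGroup).entrySet()) {
            if (needsResubmit(entry.getValue())) {
                retry.add(entry.getKey());
            }
        }
        return retry;
    }

    public static void main(String[] args) {
        Map<String, Short> errors = Map.of(
            "billing", NONE,
            "audit", NOT_COORDINATOR,
            "reports", GROUP_AUTHORIZATION_FAILED);
        System.out.println("resubmit: " + groupsToResubmit(errors));
    }
}
```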

AdminClient changes: Consumer offsets are currently fetched in the listConsumerGroupOffsets method. Additional overloads of this method will be created that take multiple consumer groups:

ListConsumerGroupsOffsetsResult listConsumerGroupOffsets(Collection<String> groupIds,
                                                         ListConsumerGroupOffsetsOptions options);

default ListConsumerGroupsOffsetsResult listConsumerGroupOffsets(Collection<String> groupIds) {
    return listConsumerGroupOffsets(groupIds, new ListConsumerGroupOffsetsOptions());
}

Related/Future Work

The changes contained in this KIP refer specifically to offset listing requests. However, in order to perform these requests, clients must first find the correct group coordinator. Optimisations for this process are contained in KIP-699 (KIP-699: Update FindCoordinator to resolve multiple Coordinators at a time). These two KIPs are not interdependent.

Compatibility, Deprecation, and Migration Plan

As this will introduce a new protocol version for the OffsetFetch API, we need to ensure backwards compatibility with old clients sending requests to new brokers. We are keeping the existing single group id API unchanged and simply making it call the new batched group id API with a singleton list. This should ensure backwards compatibility with old clients.

Rejected Alternatives

  • Replace the existing listConsumerGroupOffsets methods with new methods with the multiple group signature. This simplifies the AdminClient interface but it’s assumed that the primary use case for fetching offsets will still be for a single group and so we should favour this in the interfaces.
