Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

We will bump the OffsetFetch API to a new version. The new schemas are provided below. These introduce a group_id level return, note that previous top level error codes will now be return at a per-group level:

Code Block
OffsetFetch Request => [group_ids] require_stable TAG_BUFFER 
  group_ids => name [topics] TAG_BUFFER
    topics => name [partition_indexes] TAG_BUFFER 
      name => COMPACT_STRING
      partition_indexes => INT32
    require_stable => BOOLEAN

OffsetFetch Response => throttle_time_ms [group_ids] TAG_BUFFER 
  throttle_time_ms => INT32
  group_ids => name [topics] TAG_BUFFER              // new
    topics => name [partitions] TAG_BUFFER 
      name => COMPACT_STRING
      partitions => partition_index committed_offset committed_leader_epoch metadata error_code TAG_BUFFER 
        partition_index => INT32
        committed_offset => INT64
        committed_leader_epoch => INT32
        metadata => COMPACT_NULLABLE_STRING
        error_code => INT16
    error_code => INT16								// moved

The new level of groups information in the response is reflected in OffsetFetchResponseData

Code Block
public class OffsetFetchResponseData implements ApiMessage {
    int throttleTimeMs;
    List<OffsetFetchResponseGroup> groups; //replaces from List<OffsetFetchResponseTopic> topics; 
    short errorCode;
    private List<RawTaggedField> _unknownTaggedFields;
...
}
    
public static class OffsetFetchResponseGroup implements Message {
        String name;
        List<OffsetFetchResponseTopic> topics;
}

The following will be added to org.apache.kafka.clients.admin.Admin:

Below is the new JSON for the OffsetFetchRequest. Here we are adding a new OffsetFetchRequestGroup type which has a string for the groupId as well as a list of OffsetFetchRequestTopics

Code Block
{
  "apiKey": 9,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "OffsetFetchRequest",
  // In version 0, the request read offsets from ZK.
  //
  // Starting in version 1, the broker supports fetching offsets from the internal __consumer_offsets topic.
  //
  // Starting in version 2, the request can contain a null topics array to indicate that offsets
  // for all topics should be fetched. It also returns a top level error code
  // for group or coordinator level errors.
  //
  // Version 3, 4, and 5 are the same as version 2.
  //
  // Version 6 is the first flexible version.
  //
  // Version 7 is adding the require stable flag.
  //
  // Version 8 is adding support for fetching offsets for multiple groups
  "validVersions": "0-8",
  "flexibleVersions": "6+",
  "fields": [
		{ "name": "GroupIds", "type": "[]OffsetFetchRequestGroup", "versions": "8+", // new
		 "about": "Each group we would like to fetch offsets for", "fields": [
			{ "name": "groupId", "type": "string", "versions": "8+", "entityType": "groupId",
        "about": "The group ID."}, // new
			{ "name": "Topics", "type": "[]OffsetFetchRequestTopic", "versions": "0+", "nullableVersions": "2+",
      "about": "Each topic we would like to fetch offsets for, or null to fetch offsets for all topics.", "fields": [
	      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
	        "about": "The topic name."},
	      { "name": "PartitionIndexes", "type": "[]int32", "versions": "0+",
	        "about": "The partition indexes we would like to fetch offsets for." }
			 ]}
		]},
    { "name": "RequireStable", "type": "bool", "versions": "7+", "default": "false",
     "about": "Whether broker should hold on returning unstable offsets but set a retriable error code for the partition."}
  ]
}


Below is the JSON for the new OffsetFetchResponse. We are adding a new type called OffsetFetchResponseGroup, which adds a new string for the GroupId, as well as a list of OffsetFetchResponseTopics. In addition, we have moved the error code as a field within OffsetFetchResponseGroup so that we can give back group level error codes in the response.

Code Block
{
  "apiKey": 9,
  "type": "response",
  "name": "OffsetFetchResponse",
  // Version 1 is the same as version 0.
  //
  // Version 2 adds a top-level error code.
  //
  // Version 3 adds the throttle time.
  //
  // Starting in version 4, on quota violation, brokers send out responses before throttling.
  //
  // Version 5 adds the leader epoch to the committed offset.
  //
  // Version 6 is the first flexible version.
  //
  // Version 7 adds pending offset commit as new error response on partition level.
  //
  // Version 8 is adding support for fetching offsets for multiple groups  
"validVersions": "0-8",
  "flexibleVersions": "6+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    {"name": "GroupIds", "type": "[]OffsetFetchResponseGroup", "versions": "8+", // new
		 "about": "The responses per group id.", "fields": [
			{ "name": "groupId", "type": "string", "versions": "8+", "entityType": "groupId", // new
          "about": "The group ID." }, 
			{ "name": "Topics", "type": "[]OffsetFetchResponseTopic", "versions": "0+", 
      "about": "The responses per topic.", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
          "about": "The topic name." },
        { "name": "Partitions", "type": "[]OffsetFetchResponsePartition", "versions": "0+",
          "about": "The responses per partition", "fields": [
          { "name": "PartitionIndex", "type": "int32", "versions": "0+",
            "about": "The partition index." },
          { "name": "CommittedOffset", "type": "int64", "versions": "0+",
            "about": "The committed message offset." },
          { "name": "CommittedLeaderEpoch", "type": "int32", "versions": "5+", "default": "-1",
            "ignorable": true, "about": "The leader epoch." },
          { "name": "Metadata", "type": "string", "versions": "0+", "nullableVersions": "0+",
            "about": "The partition metadata." },
          { "name": "ErrorCode", "type": "int16", "versions": "0+",
            "about": "The error code, or 0 if there was no error." }
          ]}
      ]},
  		{ "name": "ErrorCode", "type": "int16", "versions": "8+", "default": "0", "ignorable": true, // moved inside OffsetFetchResponseGroup
        "about": "The group-level error code, or 0 if there was no error." }
      ]}
  ]
}

The new level of groups information in the request is reflected in OffsetFetchRequestData

Code Block
public class OffsetFetchRequestData implements ApiMessage {
    List<OffsetFetchRequestGroup> groups;
    boolean requireStable;
    private List<RawTaggedField> _unknownTaggedFields;
...
}
    
public static class OffsetFetchRequestGroup implements Message {
        String groupId;
        List<OffsetFetchResponseTopic> topics;
}


The new level of groups information in the response is reflected in OffsetFetchResponseData


Code Block
public class OffsetFetchResponseData implements ApiMessage {
    int throttleTimeMs;
    List<OffsetFetchResponseGroup> groups; //replaces from List<OffsetFetchResponseTopic> topics; 
    short errorCode;
    private List<RawTaggedField> _unknownTaggedFields;
...
}
    
public static class OffsetFetchResponseGroup implements Message {
        String groupId;
        List<OffsetFetchRequestTopic> topics;
}

The following will be added to Admin.java:

Code Block

    /**
     * List the consumer group offsets for a given collection of groups available in the cluster.
     *
     * @param groupIds A collection of groups for which offsets will be fetched
     * @param options The options to use when listing the consumer group offsets.
     * @return The ListConsumerGroupsOffsetsResult
     */
     public ListConsumerGroupsOffsetsResult listConsumerGroupsOffsets(final Collection<String> groupIds,
                                                                   final ListConsumerGroupOffsetsOptions options) 
    /**
     * List the consumer group offsets for a given collection of groups available in the cluster with the default options.
     * <p>
     * This is a convenience method for {@link #listConsumerGroupsOffsets(String, ListConsumerGroupOffsetsOptions)} with default options.
     *
     * @return The ListConsumerGroupsOffsetsResult.
     */
      default ListConsumerGroupsOffsetsResult listConsumerGroupsOffsets(Collection<String> groupIds) {
        return listConsumerGroupsOffsets(groupIds, new ListConsumerGroupOffsetsOptions());
      }  

ListConsumerGroupsResult will also change to include a mapping of group id to the map of TopicPartition and OffsetAndMetadata

Code Block
/**
 * The result of the {@link Admin#listConsumerGroupOffsets(Collection<String>)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListConsumerGroupsOffsetsResult {

    private final Map<String,KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> futures;

    protected ListConsumerGroupsOffsetsResult(Map<String, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> futures) {
        this.futures = futures;
    }

    public Map<String, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> values() {
        return futures;
    }

Code Block
    /**
     * List the consumer group offsets for a given coollection of groups available in the cluster.
     *
     * @param groupIds A collection of groups for which offsets will be fetched
     * @param options The options to use when listing the consumer group offsets.
     * @return The ListConsumerGroupsOffsetsResult
     */
     public ListConsumerGroupsOffsetsResult listConsumerGroupsOffsets(final Collection<String> groupIds,
                                                                   final ListConsumerGroupOffsetsOptions options) 
    /**
     * ListReturn a future which succeeds only if all the consumeroffset groupfetches offsetssucceed.
 for a given coollection of*/
 groups available in thepublic clusterKafkaFuture<Map<String, withMap<TopicPartition, theOffsetAndMetadata>>> default options.
     * <p>
     * This is a convenience method all() {

}

ListConsumerGroupsOffsetsOptions will also change to include a mapping of group id to the list of TopicPartitions, so we give flexibility to get offsets for specific consumer groups with specific partitions.

Code Block
languagejava
/**
 * Options for {@link #listConsumerGroupsOffsetsAdmin#listConsumerGroupOffsets(String, ListConsumerGroupOffsetsOptions)}.
 with default options.* <p>
 * The API of *
this class is evolving, see *{@link @returnAdmin} Thefor ListConsumerGroupsOffsetsResultdetails.
     */
@InterfaceStability.Evolving
public class ListConsumerGroupOffsetsOptions extends   default ListConsumerGroupsOffsetsResult listConsumerGroupsOffsets(Collection<String> groupIds) AbstractOptions<ListConsumerGroupOffsetsOptions> {

    // maps a consumer return listConsumerGroupsOffsets(groupIds, new ListConsumerGroupOffsetsOptions());
      }  

These are in addition to the existing group offset fetch methods that will stay:

Code Block
default ListConsumerGroupsResult listConsumerGroups() {
    return listConsumerGroups(new ListConsumerGroupsOptions());
}
ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options);
These calls return a new result container ListConsumerGroupsOffsetsResult which yields the following (as per the usual pattern):

/**
 * The result of the {@link Admin#listConsumerGroupOffsets(Collection<String>)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListConsumerGroupsOffsetsResult {

    private final Map<String,KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> futures;

    protected ListConsumerGroupsOffsetsResult(Map<String,KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> futures) {
        this.futures = futures;
    }

    public Map<String,KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> values() {group to the list of partitions we want to
		// get offsets for
		private Map<String, List<TopicPartition>> topicPartitions = null;

    /**
     * Set the topic partitions to list as part of the result.
     * {@code null} includes all topic partitions for each group.
     *
     * @param topicPartitions Map of group to List of topic partitions to include
     * @return This ListGroupOffsetsOptions
     */
    public ListConsumerGroupOffsetsOptions topicPartitions(Map<String, List<TopicPartition>> topicPartitions) {
        this.topicPartitions = topicPartitions;
        return futuresthis;
    }

    /**
     * ReturnReturns a list futureof whichtopic succeedspartitions onlyto ifadd allas thepart offsetof fetchesthe succeedresult.
     */
    public KafkaFuture<Map<String,Map<TopicPartitionMap<String, OffsetAndMetadata>>>List<TopicPartition>> alltopicPartitions() {
 {
        return topicPartitions;
    }
}

Proposed Changes

This proposal has 3 parts: 1) extending the wire protocol 2) response handling changes, 3) enhancing the AdminClient to use the new protocol.

...

The parameter List<OffsetFetchResponseTopic> topics; in OffsetFetchResponseDatanow  now encapsulates a single group’s data rather than the entire request and so should be replaced with List<OffsetFetchResponseGroup> groups; where OffsetFetchResponseGroup is a new class described in Public Interface changes.

...

Compatibility, Deprecation, and Migration Plan

As the protocol changes encompass the previous schemas and client changes are in addition to existing APIs there are no compatibility concerns with this change.Should an Admin client with this new functionality be used against an old broker that cannot handle these requests then the methods will revert to this previous behaviour where each group id is treated as a separate call to the old APIthis will introduce a new protocol for the FetchOffset API, we need to ensure we have backwards compatibility with old clients sending requests to the new brokers. We are keeping the old API with the single group id the same, but simply making it call the new batched group ID API with a singleton list. This should ensure that we will have backwards compatibility with old clients.

Rejected Alternatives

  • Replace the existing listConsumerGroupOffsets methods with new methods with the multiple group signature. This simplifies the AdminClient interface but it’s assumed that the primary use case for fetching offsets will still be for a single group and so we should favour this in the interfaces.