Status
Current state: Accepted
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
It reduces request overhead
It simplifies client side code
Proposed Changes
This proposal has three parts: 1) extending the wire protocol, 2) response handling changes, and 3) enhancing the AdminClient to use the new protocol.
Wire Protocol Changes: The OffsetFetch API will be extended to take a collection of consumer group ids (Collection<String>). Previous behaviour will be maintained, as single group id requests will be translated to single-element arrays inside the AdminClient. The corresponding response does not currently contain the group id of the offsets fetched, so it must also be extended to include this information.
Response Handling Changes: This change introduces a new level to represent group-level information in org.apache.kafka.common.requests.OffsetFetchResponse, and the data structure within it will need to be modified as follows:
The field List<OffsetFetchResponseTopic> topics in OffsetFetchResponseData now encapsulates a single group’s data rather than the entire request, and so should be replaced with List<OffsetFetchResponseGroup> groups, where OffsetFetchResponseGroup is a new class described under Public Interfaces.
Any errors will be communicated at a per-group-id level using the existing ERROR_CODE field in the response (see below). As any errors produced should already be covered by existing behaviour, no new error codes will be required. However, a new scenario is introduced whereby errors can occur for only a subset of the groups in the new call. This scenario is handled by returning a separate future for each group, which can be examined individually (see the partitionsToOffsetAndMetadata(String) and all() methods of ListConsumerGroupOffsetsResult in the Public Interfaces section). Failures for individual groups will not be retried automatically; it is the responsibility of the caller to react appropriately to the communicated errors. For instance, if the group coordinator changes for one group whilst the request is in progress, this will be communicated back to the client, which must resubmit a request to the new coordinator.
AdminClient changes: Consumer offsets are currently fetched using the listConsumerGroupOffsets method. Additional overloads of this method that take multiple consumer groups will be added, as described under Public Interfaces.
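The sketch below shows one way a caller could handle the partial failures described above. It models each group's result as a java.util.concurrent.CompletableFuture, standing in for KafkaFuture. It then separates the groups that succeeded from the groups whose futures failed, so that only the failed group ids need to be resubmitted. The class PartialFailureHandling, its collect method, and the simplified Map<String, Long> offset type are hypothetical and are not part of the proposed API.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical sketch: CompletableFuture stands in for KafkaFuture, and the offsets of a group
// are simplified to Map<String, Long> (e.g. "orders-0" -> committed offset).
final class PartialFailureHandling {

    /**
     * Waits for every per-group future, copies successful results into {@code successes}
     * and returns the ids of the groups that failed (for example after a coordinator change),
     * which the caller may then resubmit.
     */
    static Set<String> collect(Map<String, CompletableFuture<Map<String, Long>>> perGroup,
                               Map<String, Map<String, Long>> successes) {
        Set<String> failed = new TreeSet<>();
        perGroup.forEach((groupId, future) -> {
            try {
                successes.put(groupId, future.join());
            } catch (CompletionException e) {
                // e.getCause() carries this group's error; other groups are unaffected.
                failed.add(groupId);
            }
        });
        return failed;
    }
}
```

The returned ids could then be used to build a follow-up request that covers only the failed groups.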
Public Interfaces
We will bump the OffsetFetch API to a new version. The new schemas are provided below. They introduce a group_id level in the request and response; note that the previous top-level error codes will now be returned at a per-group level.
Below is the new JSON for the OffsetFetchRequest. Here we are adding a new OffsetFetchRequestGroup type, which has a string for the groupId as well as a list of OffsetFetchRequestTopics.
Code Block |
---|
{
"apiKey": 9,
"type": "request",
"listeners": ["zkBroker", "broker"],
"name": "OffsetFetchRequest",
// In version 0, the request read offsets from ZK.
//
// Starting in version 1, the broker supports fetching offsets from the internal __consumer_offsets topic.
//
// Starting in version 2, the request can contain a null topics array to indicate that offsets
// for all topics should be fetched. It also returns a top level error code
// for group or coordinator level errors.
//
// Version 3, 4, and 5 are the same as version 2.
//
// Version 6 is the first flexible version.
//
// Version 7 is adding the require stable flag.
//
// Version 8 is adding support for fetching offsets for multiple groups
"validVersions": "0-8",
"flexibleVersions": "6+",
"fields": [
{ "name": "GroupIds", "type": "[]OffsetFetchRequestGroup", "versions": "8+", // new
"about": "Each group we would like to fetch offsets for", "fields": [
{ "name": "groupId", "type": "string", "versions": "8+", "entityType": "groupId",
"about": "The group ID."}, // new
{ "name": "Topics", "type": "[]OffsetFetchRequestTopic", "versions": "0+", "nullableVersions": "2+",
"about": "Each topic we would like to fetch offsets for, or null to fetch offsets for all topics.", "fields": [
{ "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
"about": "The topic name."},
{ "name": "PartitionIndexes", "type": "[]int32", "versions": "0+",
"about": "The partition indexes we would like to fetch offsets for." }
]}
]},
{ "name": "RequireStable", "type": "bool", "versions": "7+", "default": "false",
"about": "Whether broker should hold on returning unstable offsets but set a retriable error code for the partition."}
]
} |
Below is the JSON for the new OffsetFetchResponse. We are adding a new type called OffsetFetchResponseGroup, which contains a string for the GroupId as well as a list of OffsetFetchResponseTopics. In addition, we have moved the error code into OffsetFetchResponseGroup so that we can return group-level error codes in the response.
Code Block |
---|
{
"apiKey": 9,
"type": "response",
"name": "OffsetFetchResponse",
// Version 1 is the same as version 0.
//
// Version 2 adds a top-level error code.
//
// Version 3 adds the throttle time.
//
// Starting in version 4, on quota violation, brokers send out responses before throttling.
//
// Version 5 adds the leader epoch to the committed offset.
//
// Version 6 is the first flexible version.
//
// Version 7 adds pending offset commit as new error response on partition level.
//
// Version 8 is adding support for fetching offsets for multiple groups
"validVersions": "0-8",
"flexibleVersions": "6+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", "ignorable": true,
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{"name": "GroupIds", "type": "[]OffsetFetchResponseGroup", "versions": "8+", // new
"about": "The responses per group id.", "fields": [
{ "name": "groupId", "type": "string", "versions": "8+", "entityType": "groupId", // new
"about": "The group ID." },
{ "name": "Topics", "type": "[]OffsetFetchResponseTopic", "versions": "0+",
"about": "The responses per topic.", "fields": [
{ "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
"about": "The topic name." },
{ "name": "Partitions", "type": "[]OffsetFetchResponsePartition", "versions": "0+",
"about": "The responses per partition", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "CommittedOffset", "type": "int64", "versions": "0+",
"about": "The committed message offset." },
{ "name": "CommittedLeaderEpoch", "type": "int32", "versions": "5+", "default": "-1",
"ignorable": true, "about": "The leader epoch." },
{ "name": "Metadata", "type": "string", "versions": "0+", "nullableVersions": "0+",
"about": "The partition metadata." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code, or 0 if there was no error." }
]}
]},
{ "name": "ErrorCode", "type": "int16", "versions": "8+", "default": "0", "ignorable": true, // moved inside OffsetFetchResponseGroup
"about": "The group-level error code, or 0 if there was no error." }
]}
]
} |
The new group level in the request is reflected in OffsetFetchRequestData:
Code Block |
---|
public class OffsetFetchRequestData implements ApiMessage {
List<OffsetFetchRequestGroup> groups;
boolean requireStable;
private List<RawTaggedField> _unknownTaggedFields;
...
}
public static class OffsetFetchRequestGroup implements Message {
String groupId;
List<OffsetFetchRequestTopic> topics;
} |
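The following minimal sketch shows how per-group partition selections could be turned into the OffsetFetchRequestGroup entries of a version 8 request. Partitions are grouped by topic. A null selection becomes a null topics array, which the schema defines as fetching offsets for all topics. A single-group call simply produces a one-element list. The records are hypothetical stand-ins for the generated message classes and for Kafka's TopicPartition, not the actual Kafka types, and they require Java 16+.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-ins for Kafka's TopicPartition and the generated request message classes.
record TopicPartition(String topic, int partition) {}
record RequestTopic(String name, List<Integer> partitionIndexes) {}
record RequestGroup(String groupId, List<RequestTopic> topics) {} // topics == null: all topics

final class RequestGroupBuilder {

    /** Builds one request group per entry; a null partition collection requests all topics. */
    static List<RequestGroup> toRequestGroups(Map<String, Collection<TopicPartition>> specs) {
        List<RequestGroup> groups = new ArrayList<>();
        for (Map.Entry<String, Collection<TopicPartition>> entry : specs.entrySet()) {
            Collection<TopicPartition> partitions = entry.getValue();
            if (partitions == null) {
                groups.add(new RequestGroup(entry.getKey(), null));
                continue;
            }
            // Group partition indexes by topic name; TreeMap keeps topics in a stable, sorted order.
            Map<String, List<Integer>> byTopic = new TreeMap<>();
            for (TopicPartition tp : partitions) {
                byTopic.computeIfAbsent(tp.topic(), topic -> new ArrayList<>()).add(tp.partition());
            }
            List<RequestTopic> topics = new ArrayList<>();
            byTopic.forEach((name, indexes) -> topics.add(new RequestTopic(name, indexes)));
            groups.add(new RequestGroup(entry.getKey(), topics));
        }
        return groups;
    }
}
```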
The new group level in the response is reflected in OffsetFetchResponseData:
Code Block |
---|
public class OffsetFetchResponseData implements ApiMessage {
int throttleTimeMs;
List<OffsetFetchResponseGroup> groups; // replaces List<OffsetFetchResponseTopic> topics;
short errorCode;
private List<RawTaggedField> _unknownTaggedFields;
...
}
public static class OffsetFetchResponseGroup implements Message {
String groupId;
List<OffsetFetchResponseTopic> topics;
short errorCode; // group-level error code, see the ErrorCode field of OffsetFetchResponseGroup
} |
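This sketch illustrates why the error code moves into OffsetFetchResponseGroup. It splits a version 8 response into one outcome per group. A non-zero group-level error code marks only that group as failed, while the offsets of the other groups are still returned. The records are hypothetical, simplified mirrors of the response schema (CommittedLeaderEpoch and Metadata are omitted) and require Java 16+. Skipping partitions that carry their own error code is a simplification made only in this sketch.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified mirrors of the version 8 response schema (not the generated classes).
record ResponsePartition(int partitionIndex, long committedOffset, short errorCode) {}
record ResponseTopic(String name, List<ResponsePartition> partitions) {}
record ResponseGroup(String groupId, List<ResponseTopic> topics, short errorCode) {}

/** The outcome for one group: its offsets keyed by "topic-partition", or a group-level error code. */
record GroupOutcome(Map<String, Long> offsets, short errorCode) {
    boolean failed() {
        return errorCode != 0;
    }
}

final class ResponseSplitter {

    static Map<String, GroupOutcome> byGroup(List<ResponseGroup> groups) {
        Map<String, GroupOutcome> outcomes = new LinkedHashMap<>();
        for (ResponseGroup group : groups) {
            if (group.errorCode() != 0) {
                // A group-level error fails only this group; the remaining groups are still processed.
                outcomes.put(group.groupId(), new GroupOutcome(Map.of(), group.errorCode()));
                continue;
            }
            Map<String, Long> offsets = new LinkedHashMap<>();
            for (ResponseTopic topic : group.topics()) {
                for (ResponsePartition partition : topic.partitions()) {
                    if (partition.errorCode() == 0) { // this sketch skips partitions with their own error
                        offsets.put(topic.name() + "-" + partition.partitionIndex(), partition.committedOffset());
                    }
                }
            }
            outcomes.put(group.groupId(), new GroupOutcome(offsets, (short) 0));
        }
        return outcomes;
    }
}
```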
The following will be added to Admin.java:
Code Block |
---|
/**
* List the consumer group offsets available in the cluster for the specified consumer groups.
*
* @param groupSpecs Map of consumer group ids to a spec that specifies the topic partitions of the group to list offsets for.
*
* @param options The options to use when listing the consumer group offsets.
* @return The ListConsumerGroupOffsetsResult
*/
ListConsumerGroupOffsetsResult listConsumerGroupOffsets(Map<String, ListConsumerGroupOffsetsSpec> groupSpecs, ListConsumerGroupOffsetsOptions options);
/**
* List the consumer group offsets available in the cluster for the specified groups with the default options.
* <p>
* This is a convenience method for
* {@link #listConsumerGroupOffsets(Map, ListConsumerGroupOffsetsOptions)} with default options.
*
* @param groupSpecs Map of consumer group ids to a spec that specifies the topic partitions of the group to list offsets for.
* @return The ListConsumerGroupOffsetsResult.
*/
default ListConsumerGroupOffsetsResult listConsumerGroupOffsets(Map<String, ListConsumerGroupOffsetsSpec> groupSpecs) {
return listConsumerGroupOffsets(groupSpecs, new ListConsumerGroupOffsetsOptions());
}
|
A new class, ListConsumerGroupOffsetsSpec, will be added to specify the partitions for each group. We can add additional per-group filters to this spec in future if required.
Code Block |
---|
/**
* Specification of consumer group offsets to list using {@link Admin#listConsumerGroupOffsets(java.util.Map)}.
*
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class ListConsumerGroupOffsetsSpec {
private Collection<TopicPartition> topicPartitions;
/**
* Set the topic partitions whose offsets are to be listed for a consumer group.
* {@code null} includes all topic partitions.
*
* @param topicPartitions List of topic partitions to include
* @return This ListConsumerGroupOffsetsSpec
*/
public ListConsumerGroupOffsetsSpec topicPartitions(Collection<TopicPartition> topicPartitions) {
this.topicPartitions = topicPartitions;
return this;
}
/**
* Returns the topic partitions whose offsets are to be listed for a consumer group.
* {@code null} indicates that offsets of all partitions of the group are to be listed.
*/
public Collection<TopicPartition> topicPartitions() {
return topicPartitions;
}
} |
ListConsumerGroupOffsetsResult will also change to include a mapping from group id to the map of TopicPartition to OffsetAndMetadata.
Code Block |
---|
/**
* The result of the {@link Admin#listConsumerGroupOffsets(Map)} and
* {@link Admin#listConsumerGroupOffsets(String)} call.
* <p>
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class ListConsumerGroupOffsetsResult {
final Map<String, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> futures;
ListConsumerGroupOffsetsResult(final Map<CoordinatorKey, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> futures) {
}
/**
* Return a future which yields a map of topic partitions to OffsetAndMetadata objects.
* If the group does not have a committed offset for this partition, the corresponding value in the returned map will be null.
*/
public KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> partitionsToOffsetAndMetadata() {}
/**
* Return a future which yields a map of topic partitions to OffsetAndMetadata objects for the specified group.
* If the group doesn't have a committed offset for a specific partition, the corresponding value in the returned map will be null.
*/
public KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> partitionsToOffsetAndMetadata(String groupId) {}
/**
* Return a future which yields all Map<String, Map<TopicPartition, OffsetAndMetadata>> objects,
* if requests for all the groups succeed.
*/
public KafkaFuture<Map<String, Map<TopicPartition, OffsetAndMetadata>>> all() {}
} |
The new methods are in addition to the existing single-group offset fetch methods, which will stay:
Code Block |
---|
default ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId) {
return listConsumerGroupOffsets(groupId, new ListConsumerGroupOffsetsOptions());
}
ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options); |
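The two accessors on ListConsumerGroupOffsetsResult can be modelled with java.util.concurrent.CompletableFuture standing in for KafkaFuture. The per-group accessor returns that group's own future, and all() completes successfully only if every group's future succeeds. This is a hypothetical sketch with offsets simplified to Map<String, Long>. Throwing IllegalArgumentException for a group that was not requested is an assumption of the sketch, not something this KIP specifies.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of ListConsumerGroupOffsetsResult: CompletableFuture stands in for KafkaFuture,
// and Map<String, Long> stands in for Map<TopicPartition, OffsetAndMetadata>.
final class GroupOffsetsResultModel {
    private final Map<String, CompletableFuture<Map<String, Long>>> futures;

    GroupOffsetsResultModel(Map<String, CompletableFuture<Map<String, Long>>> futures) {
        this.futures = Map.copyOf(futures); // defensive, immutable copy keyed by group id
    }

    /** Returns the future for one group; only groups that were requested are valid. */
    CompletableFuture<Map<String, Long>> partitionsToOffsetAndMetadata(String groupId) {
        CompletableFuture<Map<String, Long>> future = futures.get(groupId);
        if (future == null) {
            throw new IllegalArgumentException("Offsets for group '" + groupId + "' were not requested");
        }
        return future;
    }

    /** Completes with the offsets of every group, or exceptionally if any group's future fails. */
    CompletableFuture<Map<String, Map<String, Long>>> all() {
        return CompletableFuture.allOf(futures.values().toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> {
                    Map<String, Map<String, Long>> result = new HashMap<>();
                    // Every future has completed successfully here, so join() neither blocks nor throws.
                    futures.forEach((groupId, future) -> result.put(groupId, future.join()));
                    return result;
                });
    }
}
```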
Related/Future Work
The changes contained in this KIP refer specifically to offset listing requests. However, in order to perform these requests, clients must first find the correct group coordinator. Optimisations for this process are contained in KIP-699 (KIP-699: Update FindCoordinator to resolve multiple Coordinators at a time). These two KIPs are not interdependent.
Compatibility, Deprecation, and Migration Plan
As this will introduce a new version of the OffsetFetch API, we need to ensure backwards compatibility with old clients sending requests to new brokers. We are keeping the existing API that takes a single group id unchanged, but making it call the new batched API with a singleton list. This should ensure backwards compatibility with old clients. Newer clients connecting to newer brokers will use the new protocol regardless of the Admin API used. Newer clients connecting to older brokers will use the older protocol version, falling back to unbatched mode, with one request per group, if the new batched API is used.
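Below is a minimal sketch of the fallback described above. It assumes that the client already knows the highest OffsetFetch version the broker supports, and that all groups share one coordinator; coordinator lookup is out of scope for this sketch. With version 8 or later, a single batched request is sent; otherwise, one request is sent per group. The class, method, and constant names are hypothetical, and each inner list stands for the group ids carried by one OffsetFetch request.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical request planner; assumes every group in groupIds has the same coordinator.
final class OffsetFetchPlanner {
    static final short FIRST_BATCHED_VERSION = 8; // version 8 adds multiple groups per request

    /** Returns one inner list of group ids per OffsetFetch request to send. */
    static List<List<String>> plan(Set<String> groupIds, short brokerMaxVersion) {
        List<String> sorted = new ArrayList<>(new TreeSet<>(groupIds)); // deterministic order
        List<List<String>> requests = new ArrayList<>();
        if (brokerMaxVersion >= FIRST_BATCHED_VERSION) {
            requests.add(sorted); // newer broker: one batched request for all groups
        } else {
            for (String groupId : sorted) {
                requests.add(List.of(groupId)); // older broker: unbatched, one request per group
            }
        }
        return requests;
    }
}
```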
Rejected Alternatives
Replace the existing listConsumerGroupOffsets methods with new methods with the multiple group signature. This simplifies the AdminClient interface but it’s assumed that the primary use case for fetching offsets will still be for a single group and so we should favour this in the interfaces.