...
Discussion thread: here
JIRA:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
Code Block |
---|
/**
 * List the consumer group offsets available in the cluster for the specified consumer groups.
 *
 * @param groupSpecs Map of consumer group ids to a spec that specifies the topic partitions of the group to list offsets for.
 * @param options The options to use when listing the consumer group offsets.
 * @return The ListConsumerGroupOffsetsResult
 */
ListConsumerGroupOffsetsResult listConsumerGroupOffsets(Map<String, ListConsumerGroupOffsetsSpec> groupSpecs, ListConsumerGroupOffsetsOptions options);

/**
 * List the consumer group offsets available in the cluster for the specified groups with the default options.
 * <p>
 * This is a convenience method for
 * {@link #listConsumerGroupOffsets(Map, ListConsumerGroupOffsetsOptions)} with default options.
 *
 * @param groupSpecs Map of consumer group ids to a spec that specifies the topic partitions of the group to list offsets for.
 * @return The ListConsumerGroupOffsetsResult.
 */
default ListConsumerGroupOffsetsResult listConsumerGroupOffsets(Map<String, ListConsumerGroupOffsetsSpec> groupSpecs) {
    return listConsumerGroupOffsets(groupSpecs, new ListConsumerGroupOffsetsOptions());
}
A new class, ListConsumerGroupOffsetsSpec, will be added to specify the partitions for each group. We can add additional per-group filters to this spec in future if required.
Code Block | ||||
---|---|---|---|---|
| ||||
/**
 * Specification of consumer group offsets to list using {@link Admin#listConsumerGroupOffsets(java.util.Map)}.
 *
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListConsumerGroupOffsetsSpec {

    private Collection<TopicPartition> topicPartitions;

    /**
     * Set the topic partitions whose offsets are to be listed for a consumer group.
     * {@code null} includes all topic partitions.
     *
     * @param topicPartitions List of topic partitions to include
     * @return This ListConsumerGroupOffsetSpec
     */
    public ListConsumerGroupOffsetsSpec topicPartitions(Collection<TopicPartition> topicPartitions) {
        this.topicPartitions = topicPartitions;
        return this;
    }

    /**
     * Returns the topic partitions whose offsets are to be listed for a consumer group.
     * {@code null} indicates that offsets of all partitions of the group are to be listed.
     */
    public Collection<TopicPartition> topicPartitions() {
        return topicPartitions;
    }
}
ListConsumerGroupOffsetsResult will also change to include a mapping of group id to the map of TopicPartition and OffsetAndMetadata.
Code Block | ||
---|---|---|
| ||
/**
 * The result of the {@link Admin#listConsumerGroupOffsets(Map)} and
 * {@link Admin#listConsumerGroupOffsets(String)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListConsumerGroupOffsetsResult {

    final Map<String, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> futures;

    ListConsumerGroupOffsetsResult(final Map<CoordinatorKey, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> futures) { }

    /**
     * Return a future which yields a map of topic partitions to OffsetAndMetadata objects.
     * If the group does not have a committed offset for this partition, the corresponding value in the returned map will be null.
     */
    public KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> partitionsToOffsetAndMetadata() {}

    /**
     * Return a map of group ids to their corresponding futures that yield a map of topic partitions to
     * OffsetAndMetadata objects. If the group doesn't have a committed offset for a specific
     * partition, the corresponding value in the returned map for that group id will be null.
     */
    public KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> partitionsToOffsetAndMetadata(String groupId) {}

    /**
     * Return a future which yields all Map<String, Map<TopicPartition, OffsetAndMetadata>> objects,
     * if requests for all the groups succeed.
     */
    public KafkaFuture<Map<String, Map<TopicPartition, OffsetAndMetadata>>> all() {}
}
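The "succeeds only if all groups succeed" behaviour described for all() can be illustrated with a minimal sketch. This is not the AdminClient implementation: it assumes java.util.concurrent.CompletableFuture as a stand-in for KafkaFuture, a String such as "topic-0" as a stand-in for TopicPartition, and a Long as a stand-in for OffsetAndMetadata. The class name AllGroupsFutureSketch is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in types: CompletableFuture for KafkaFuture,
// String for TopicPartition, Long for OffsetAndMetadata.
final class AllGroupsFutureSketch {

    // Combine the per-group futures into a single future that completes
    // successfully only if every per-group future completes successfully.
    static CompletableFuture<Map<String, Map<String, Long>>> all(
            Map<String, CompletableFuture<Map<String, Long>>> futures) {
        CompletableFuture<?>[] pending = futures.values().toArray(new CompletableFuture<?>[0]);
        return CompletableFuture.allOf(pending).thenApply(ignored -> {
            Map<String, Map<String, Long>> result = new HashMap<>();
            // Every future has completed normally at this point, so join() does not block.
            futures.forEach((groupId, future) -> result.put(groupId, future.join()));
            return result;
        });
    }

    public static void main(String[] args) {
        Map<String, CompletableFuture<Map<String, Long>>> futures = new HashMap<>();
        futures.put("groupA", CompletableFuture.completedFuture(Map.of("topic-0", 42L)));
        futures.put("groupB", CompletableFuture.completedFuture(Map.of("topic-1", 7L)));
        System.out.println(all(futures).join());
    }
}
```

If any per-group future fails, the combined future completes exceptionally, so callers that need partial results would read the per-group futures instead.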
Proposed Changes
This proposal has 3 parts: 1) extending the wire protocol, 2) response handling changes, 3) enhancing the AdminClient to use the new protocol.
...
AdminClient changes: Consumer offsets are currently fetched in the listConsumerGroupOffsets method. Additional overloads of this method will be created that take multiple consumer groups as described under Public Interfaces.
Related/Future Work
The changes contained in this KIP refer specifically to offset listing requests. However, in order to perform these requests clients must first find the correct group coordinator. Optimisations for this process are contained in KIP-699 (Update FindCoordinator to resolve multiple Coordinators at a time). These two KIPs are not interdependent.
...
As this will introduce a new protocol version for the OffsetFetch API, we need to ensure backwards compatibility with old clients sending requests to the new brokers. The existing Admin API that takes a single group id stays the same, but it will now call the new batched API with a singleton list, which keeps existing callers working. Newer clients connecting to newer brokers will use the new protocol regardless of the Admin API used. Newer clients connecting to older brokers will use the older protocol version, falling back to unbatched mode with multiple requests if the new batched API is used.
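The fallback rule above can be sketched as a small planning function. This is an illustration only, not the client's actual request-building code: the class OffsetFetchPlanSketch, the method planRequests, and the brokerSupportsBatching flag are all hypothetical, and the sketch assumes every group in the list is served by the same coordinator.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: decides how many requests are needed for a set of groups
// that share one coordinator, depending on the protocol version the broker supports.
final class OffsetFetchPlanSketch {

    // Returns one inner list per request; each inner list holds the group ids
    // carried by that request.
    static List<List<String>> planRequests(List<String> groupIds, boolean brokerSupportsBatching) {
        List<List<String>> requests = new ArrayList<>();
        if (groupIds.isEmpty()) {
            return requests;
        }
        if (brokerSupportsBatching) {
            // Newer broker: all groups travel in a single batched request.
            requests.add(new ArrayList<>(groupIds));
        } else {
            // Older broker: fall back to unbatched mode, one request per group.
            for (String groupId : groupIds) {
                requests.add(List.of(groupId));
            }
        }
        return requests;
    }

    public static void main(String[] args) {
        List<String> groups = List.of("groupA", "groupB", "groupC");
        System.out.println(planRequests(groups, true).size() + " request(s) against a newer broker");
        System.out.println(planRequests(groups, false).size() + " request(s) against an older broker");
    }
}
```

Under the same assumptions, the single-group Admin API corresponds to calling planRequests with a singleton list, which yields one request in either mode.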
Rejected Alternatives
Replace the existing listConsumerGroupOffsets methods with new methods with the multiple group signature. This simplifies the AdminClient interface but it’s assumed that the primary use case for fetching offsets will still be for a single group and so we should favour this in the interfaces.