...

  1. It reduces request overhead

  2. It simplifies client-side code

Proposed Changes

This proposal has three parts: 1) extending the wire protocol, 2) changing how responses are handled, and 3) enhancing the AdminClient to use the new protocol.

Wire Protocol Changes: The OffsetFetch API will be extended to accept a collection of consumer group ids (Collection<String>). Existing behaviour will be preserved: inside the AdminClient, a request for a single group id will be translated into a single-element array. The corresponding response does not currently identify which group the fetched offsets belong to, so it must also be extended to include the group id.
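The sketch below is a rough illustration of the request-side translation just described. It shows one way the single-group path could reuse the multi-group path by wrapping the id in a one-element list. RequestGroup and toRequestGroups are hypothetical names, not Kafka's generated request classes, and the sketch omits the per-group topic list that a real request entry would presumably carry.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

public class OffsetFetchRequestSketch {

    // Hypothetical, simplified stand-in for one group entry in the extended
    // OffsetFetch request (the real request classes are generated by Kafka).
    record RequestGroup(String groupId) { }

    // Multi-group path: one request entry per group id, in caller order.
    static List<RequestGroup> toRequestGroups(Collection<String> groupIds) {
        List<RequestGroup> groups = new ArrayList<>(groupIds.size());
        for (String groupId : groupIds) {
            groups.add(new RequestGroup(groupId));
        }
        return groups;
    }

    // Single-group path: wrap the id in a one-element collection so existing
    // callers go through the same batched code path.
    static List<RequestGroup> toRequestGroups(String groupId) {
        return toRequestGroups(Collections.singletonList(groupId));
    }

    public static void main(String[] args) {
        System.out.println(toRequestGroups("payments"));
        System.out.println(toRequestGroups(List.of("payments", "billing")));
    }
}
```

With this approach, only one code path builds requests, and the single-group API becomes a thin wrapper over it.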

Response Handling Changes: This change introduces a new level in org.apache.kafka.common.requests.OffsetFetchResponse to represent group-level information, and its underlying data structure must be modified as follows:

In OffsetFetchResponseData, the field List<OffsetFetchResponseTopic> topics currently holds the data for the entire request. Because topic-level data now belongs to an individual group, this field should be replaced with List<OffsetFetchResponseGroup> groups. OffsetFetchResponseGroup is a new class that holds a single group's data and is described in the Public Interfaces section.
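The sketch below models the nested response shape with simplified records (Group, Topic, Partition, ResponseData). These names are hypothetical and are not Kafka's generated classes. They also omit fields such as the per-partition error code and metadata. The point is that each group entry carries its own id and error code, so a client can attribute offsets and errors to the right group.

```java
import java.util.List;

public class OffsetFetchResponseSketch {

    // Hypothetical, simplified versions of the response levels.
    record Partition(int partitionIndex, long committedOffset) { }

    record Topic(String name, List<Partition> partitions) { }

    // New level: each group carries its own id, error code and topics.
    record Group(String groupId, short errorCode, List<Topic> topics) { }

    // Top level: previously a list of topics, now one entry per requested group.
    record ResponseData(List<Group> groups) { }

    // Returns the committed offset for (group, topic, partition), or -1 if absent.
    static long committedOffset(ResponseData data, String groupId, String topic, int partition) {
        for (Group g : data.groups()) {
            if (!g.groupId().equals(groupId)) continue;
            for (Topic t : g.topics()) {
                if (!t.name().equals(topic)) continue;
                for (Partition p : t.partitions()) {
                    if (p.partitionIndex() == partition) return p.committedOffset();
                }
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        ResponseData data = new ResponseData(List.of(
            new Group("payments", (short) 0,
                List.of(new Topic("orders", List.of(new Partition(0, 42L))))),
            // 16 is NOT_COORDINATOR in Kafka's error code table.
            new Group("billing", (short) 16, List.of())));
        System.out.println(committedOffset(data, "payments", "orders", 0)); // prints 42
    }
}
```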

Errors will be reported per group id using the existing ERROR_CODE field in the response (see below). Because any errors produced should already be covered by existing behaviour, no new error codes are required. However, the new call introduces a scenario in which errors affect only a subset of the requested groups. This is handled by returning a separate future for each group, which can be examined individually (see the all() and values() methods in the Public Interfaces section). Failures for individual groups will not be retried automatically; it is the caller's responsibility to react appropriately to the reported errors. For instance, if the coordinator for one group changes while the request is in progress, this is reported back to the client, which must resubmit the request for that group to the new coordinator.
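The following caller-side sketch covers the per-group handling described above. It inspects each group's future and collects the groups whose failure indicates a coordinator change, so the caller can resubmit just those. To keep the sketch self-contained, it makes several substitutions. CompletableFuture stands in for KafkaFuture, and String keys stand in for TopicPartition. NotCoordinatorException is a local stand-in for Kafka's exception of the same name. This retry policy is an assumption and is not part of the proposal.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class PerGroupErrorHandlingSketch {

    // Local stand-in for org.apache.kafka.common.errors.NotCoordinatorException.
    static class NotCoordinatorException extends RuntimeException {
        NotCoordinatorException(String msg) { super(msg); }
    }

    // Returns the groups whose fetch failed because the coordinator moved.
    // Successful groups and groups failing for other reasons are not included.
    static List<String> groupsToResubmit(Map<String, CompletableFuture<Map<String, Long>>> values) {
        List<String> retry = new ArrayList<>();
        for (Map.Entry<String, CompletableFuture<Map<String, Long>>> e : values.entrySet()) {
            try {
                e.getValue().get();
            } catch (ExecutionException ex) {
                if (ex.getCause() instanceof NotCoordinatorException) {
                    retry.add(e.getKey());
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(ex);
            }
        }
        return retry;
    }

    public static void main(String[] args) {
        Map<String, CompletableFuture<Map<String, Long>>> values = new LinkedHashMap<>();
        values.put("payments", CompletableFuture.completedFuture(Map.of("orders-0", 42L)));
        values.put("billing", CompletableFuture.failedFuture(new NotCoordinatorException("billing moved")));
        System.out.println(groupsToResubmit(values)); // prints [billing]
    }
}
```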

AdminClient changes: Consumer offsets are currently fetched with the listConsumerGroupOffsets method. New overloads of this method that accept multiple consumer group ids will be added:

Code Block
// Additions to the Admin interface
ListConsumerGroupsOffsetsResult listConsumerGroupOffsets(Collection<String> groupIds,
                                                         ListConsumerGroupOffsetsOptions options);

default ListConsumerGroupsOffsetsResult listConsumerGroupOffsets(Collection<String> groupIds) {
    return listConsumerGroupOffsets(groupIds, new ListConsumerGroupOffsetsOptions());
}
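This proposal does not specify how the AdminClient implements the multi-group overloads. Offsets are served by each group's coordinator, so one plausible approach is to bucket the requested group ids by coordinator broker and send one batched OffsetFetch request per broker. The sketch assumes a hypothetical coordinatorOf map, for example one populated from earlier coordinator lookups. batchByCoordinator is likewise an illustrative name, not an AdminClient API.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CoordinatorBatchingSketch {

    // Buckets group ids by coordinator broker id so that one request can be sent
    // per broker. coordinatorOf is a hypothetical groupId -> brokerId lookup.
    static Map<Integer, List<String>> batchByCoordinator(Collection<String> groupIds,
                                                         Map<String, Integer> coordinatorOf) {
        Map<Integer, List<String>> batches = new LinkedHashMap<>();
        for (String groupId : groupIds) {
            Integer broker = coordinatorOf.get(groupId);
            if (broker == null) {
                throw new IllegalArgumentException("No known coordinator for group " + groupId);
            }
            batches.computeIfAbsent(broker, b -> new ArrayList<>()).add(groupId);
        }
        return batches;
    }

    public static void main(String[] args) {
        Map<String, Integer> coordinatorOf = Map.of("payments", 1, "billing", 2, "audit", 1);
        System.out.println(batchByCoordinator(List.of("payments", "billing", "audit"), coordinatorOf));
        // prints {1=[payments, audit], 2=[billing]}
    }
}
```

Under this approach, a coordinator change for one group only affects that group's batch, which matches the per-group error reporting described above.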

Public Interfaces

We will bump the version of the OffsetFetch API. The new schemas are provided below:

...

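Code Block
default ListConsumerGroupsResult listConsumerGroups() {
    return listConsumerGroups(new ListConsumerGroupsOptions());
}

ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options);

The new multi-group calls return a new result container, ListConsumerGroupsOffsetsResult, which follows the usual pattern of exposing per-group futures through values() and a combined future through all():

Code Block
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;

/**
 * The result of the {@link Admin#listConsumerGroupOffsets(java.util.Collection)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListConsumerGroupsOffsetsResult {

    private final Map<String, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> futures;

    protected ListConsumerGroupsOffsetsResult(Map<String, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> futures) {
        this.futures = futures;
    }

    // One future per requested group id.
    public Map<String, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> values() {
        return futures;
    }

    /**
     * Return a future which succeeds only if all the offset fetches succeed.
     */
    public KafkaFuture<Map<String, Map<TopicPartition, OffsetAndMetadata>>> all() {
        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).thenApply(ignored -> {
            Map<String, Map<TopicPartition, OffsetAndMetadata>> offsets = new HashMap<>(futures.size());
            futures.forEach((groupId, future) -> {
                try {
                    offsets.put(groupId, future.get());
                } catch (InterruptedException | ExecutionException e) {
                    // Unreachable in practice: allOf() has already completed successfully,
                    // so every per-group future is done without error.
                    throw new RuntimeException(e);
                }
            });
            return offsets;
        });
    }
}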

Compatibility, Deprecation, and Migration Plan

...