...

When the group is Stable (i.e. when there are active consumers in the group), the above command returns the associated topic partition assignment for each member of the group, and that assignment can be used to extract the corresponding committed offset(s). However, if the group state is Empty (i.e. when there are no active consumers in the group), there is no associated topic partition info in the DescribeGroups Response. Therefore, the DescribeGroups Response in its current protocol would not help.

The OffsetFetch protocol can be used to extract offsets associated with given topic partitions in a consumer group. The problem is that when a consumer group is in the Empty state, or even when it is Stable but not all of its topics are being consumed, there is currently no way to extract all the topic partitions it has consumed from so far. We can modify the behavior of the OffsetFetch protocol so that it returns the offsets of all topic partitions associated with the group if it is passed an empty list (as the list of topic partitions).

Public Interfaces

This is the current schema for OffsetFetch (version 1, which applies to fetching from Kafka and not ZooKeeper).

...

The first suggestion is to use the same Request / Response protocol but bump up the version to 2, since there is going to be a change in how the protocol is implemented. What will change in the protocol implementation is that if an empty list is passed instead of a list of topic partitions, the API will return the offsets of all topic partitions associated with the group.
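The version-dependent behavior described above can be illustrated with a minimal in-memory sketch. It is not Kafka code: `OffsetFetchSketch`, its methods, and the use of plain strings such as `"t-0"` for topic partitions are hypothetical names chosen for illustration. It also assumes that version 1 answers only for the partitions explicitly listed (so an empty list yields an empty result) and that `-1` marks a partition with no committed offset.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for the coordinator's offset store (not a Kafka class).
class OffsetFetchSketch {
    // groupId -> ("topic-partition" -> committed offset)
    private final Map<String, Map<String, Long>> committed = new HashMap<>();

    void commit(String groupId, String topicPartition, long offset) {
        committed.computeIfAbsent(groupId, g -> new TreeMap<>()).put(topicPartition, offset);
    }

    // Assumption: version 1 answers only for the listed partitions;
    // version 2 treats an empty list as "all partitions with committed offsets".
    Map<String, Long> fetch(int version, String groupId, List<String> partitions) {
        Map<String, Long> groupOffsets = committed.getOrDefault(groupId, new TreeMap<>());
        if (version >= 2 && partitions.isEmpty()) {
            return new TreeMap<>(groupOffsets);
        }
        Map<String, Long> result = new TreeMap<>();
        for (String tp : partitions) {
            // -1 is this sketch's marker for "no committed offset".
            result.put(tp, groupOffsets.getOrDefault(tp, -1L));
        }
        return result;
    }
}
```

With offsets committed for two partitions of a group, `fetch(2, group, emptyList)` returns both entries even if the group has no active members, while `fetch(1, group, emptyList)` returns nothing under the assumption stated above.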

...

This committed call makes use of the OffsetFetch API to extract the offset of the given partition. The suggestion here is to add a new method to AdminClient that extracts the offset information of a consumer group by making a call to the OffsetFetch API and passing an empty list of topic partitions. That will return the offsets of all topic partitions associated with the group:

Code Block
language: scala
def listGroupOffsets(groupId: String): Map[TopicPartition, PartitionData] = {
  // Locate the coordinator broker for the group.
  val coordinator = findCoordinator(groupId)
  // An empty partition list requests the offsets of all topic partitions in the group.
  val responseBody = send(coordinator, ApiKeys.OFFSET_FETCH, new OffsetFetchRequest(groupId, List()))
  val response = new OffsetFetchResponse(responseBody)
  response.responseData().asScala.toMap
}

One benefit of using this method instead of the KafkaConsumer's committed method is that we no longer need to create the dummy consumer to retrieve offsets. The other benefit is that all offsets within the group are returned with one API call, whereas the existing describe group implementation makes one API call for each topic partition in the group.
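The difference in the number of round trips can be sketched with a small stub that counts requests. This is only an illustration: `RequestCountSketch` and its methods are hypothetical, not Kafka classes, and topic partitions are represented as plain strings.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical coordinator stub for a single group that counts requests (not a Kafka class).
class RequestCountSketch {
    private final Map<String, Long> groupOffsets = new TreeMap<>();
    int requests = 0;

    RequestCountSketch(Map<String, Long> committed) {
        groupOffsets.putAll(committed);
    }

    // Each call models one OffsetFetch round trip; an empty list means "all partitions".
    Map<String, Long> offsetFetch(List<String> partitions) {
        requests++;
        Map<String, Long> result = new TreeMap<>();
        if (partitions.isEmpty()) {
            result.putAll(groupOffsets);
            return result;
        }
        for (String tp : partitions) {
            if (groupOffsets.containsKey(tp)) {
                result.put(tp, groupOffsets.get(tp));
            }
        }
        return result;
    }

    // Existing pattern: one request per topic partition.
    Map<String, Long> fetchOneByOne(List<String> partitions) {
        Map<String, Long> all = new TreeMap<>();
        for (String tp : partitions) {
            all.putAll(offsetFetch(Collections.singletonList(tp)));
        }
        return all;
    }

    // Proposed pattern: a single request with an empty partition list.
    Map<String, Long> fetchAllAtOnce() {
        return offsetFetch(Collections.emptyList());
    }
}
```

For a group with three committed partitions, `fetchOneByOne` issues three requests and `fetchAllAtOnce` issues one, and both return the same map. The second form also does not require the caller to know the partition list in advance.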

...

  1. Add to the implementation of the OffsetFetch API the scenario where an empty list is passed as the list of topic partitions, and in response, offsets of all topic partitions associated with the group are returned.
  2. Add a listGroupOffsets interface to AdminClient that makes use of the updated OffsetFetch API above and returns the offsets of all topic partitions associated with the consumer group.

...

With respect to the first proposed change above, compatibility could become a concern if current users somehow rely on a different behavior when an empty list is passed to the OffsetFetch API as the list of topic partitions. Users who want to stick to the current implementation of the API can use version 1 of the API. The change suggested in this KIP would apply to version 2 of the API only.

With respect to the added interface to AdminClient, there would be no issue, as that interface does not exist in the current implementation.

...

  1. Changing the DescribeGroups protocol so it also returns the offset information for all topic partitions from which the group has consumed since its creation. More details can be found here.
  2. Exposing the added OffsetFetch behavior through a new interface in KafkaConsumer, which would still imply that the dummy consumer has to be created in the group in order to retrieve offsets.