...
The OffsetFetch protocol can be used to extract the offsets associated with given topic partitions in a consumer group. The problem is that when a consumer group is in the Empty state, or even when it is Stable but not all of its topics are being consumed, there is currently no way to extract all the topic partitions it has consumed from so far (i.e. has offsets for). We can modify the behavior of the OffsetFetch protocol so that it returns all topic partitions associated with the group if it is passed a null value as the list of topic partitions.
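The proposed lookup rule can be modeled in a few lines. This is a simplified in-memory sketch, not broker code: `GroupOffsetStore` and the `TopicPartition` stand-in are hypothetical names. A null list returns every partition the group has a committed offset for. An explicit list returns exactly the requested partitions, with -1 standing in for "no committed offset" (an assumption of this model).

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for Kafka's TopicPartition (topic name + partition number).
final class TopicPartition {
    final String topic;
    final int partition;

    TopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TopicPartition)) {
            return false;
        }
        TopicPartition other = (TopicPartition) o;
        return partition == other.partition && topic.equals(other.topic);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition);
    }

    @Override
    public String toString() {
        return topic + "-" + partition;
    }
}

// Hypothetical in-memory model of the offsets one consumer group has committed.
final class GroupOffsetStore {
    // Sentinel this model returns for a requested partition with no committed offset.
    static final long NO_OFFSET = -1L;

    private final Map<TopicPartition, Long> committed = new LinkedHashMap<>();

    void commit(TopicPartition tp, long offset) {
        committed.put(tp, offset);
    }

    // partitions == null: proposed behavior, return every partition with a committed offset.
    // partitions != null: return exactly the requested partitions.
    Map<TopicPartition, Long> fetch(List<TopicPartition> partitions) {
        if (partitions == null) {
            return Collections.unmodifiableMap(new LinkedHashMap<>(committed));
        }
        Map<TopicPartition, Long> result = new LinkedHashMap<>();
        for (TopicPartition tp : partitions) {
            result.put(tp, committed.getOrDefault(tp, NO_OFFSET));
        }
        return result;
    }
}
```

In this model an empty list simply returns an empty map. Null ("everything") and empty ("nothing") therefore remain distinct requests.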
...
```
OffsetFetch Request (Version: 1) => group_id [topics]
  group_id => STRING
  topics => topic [partitions]
    topic => STRING
    partitions => partition
      partition => INT32

OffsetFetch Response (Version: 1) => [responses]
  responses => topic [partition_responses]
    topic => STRING
    partition_responses => partition offset metadata error_code
      partition => INT32
      offset => INT64
      metadata => NULLABLE_STRING
      error_code => INT16
```
The first suggestion is to use the same Request / Response protocol but bump the version up to 2, since there is going to be a change in how the protocol is implemented. What will change in the protocol implementation is that if no topics are provided (a null input for the list of topics), the offset information of all topic partitions associated with the group is returned. The request schema will therefore be slightly modified to define the input topics array as a nullable array.
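```java
// OffsetFetch v2 request: same fields as v1, but "topics" is now a nullable array.
// A null "topics" array asks for the offsets of all topic partitions associated with the group.
public static final Schema OFFSET_FETCH_REQUEST_V2 = new Schema(
        new Field("group_id", STRING, "The consumer group id."),
        new Field("topics", ArrayOf.nullable(OFFSET_FETCH_REQUEST_TOPIC_V0), "Topics to fetch offsets."));

// The response format is unchanged in v2.
public static final Schema OFFSET_FETCH_RESPONSE_V2 = OFFSET_FETCH_RESPONSE_V0;
```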
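To make the null/empty distinction concrete at the wire level, the sketch below hand-encodes the body of a v2 request. It assumes the Kafka protocol's primitive encodings (big-endian INT16/INT32, STRING as an INT16 length followed by UTF-8 bytes, ARRAY as an INT32 count where -1 denotes a null array) and omits the request header. `OffsetFetchV2Body` is a hypothetical helper, not a Kafka class.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

// Hypothetical helper: encodes only the body of an OffsetFetch v2 request (no request header).
final class OffsetFetchV2Body {
    private OffsetFetchV2Body() {}

    // topics == null is written as array count -1 ("all partitions of the group");
    // otherwise each entry is a topic STRING followed by an ARRAY of INT32 partitions.
    static byte[] encode(String groupId, Map<String, List<Integer>> topics) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) { // DataOutputStream writes big-endian
            writeString(out, groupId);
            if (topics == null) {
                out.writeInt(-1); // null array
            } else {
                out.writeInt(topics.size());
                for (Map.Entry<String, List<Integer>> entry : topics.entrySet()) {
                    writeString(out, entry.getKey());
                    out.writeInt(entry.getValue().size());
                    for (int partition : entry.getValue()) {
                        out.writeInt(partition);
                    }
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for an in-memory stream
        }
        return bytes.toByteArray();
    }

    // STRING: INT16 byte length followed by the UTF-8 bytes.
    private static void writeString(DataOutputStream out, String s) throws IOException {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        out.writeShort(utf8.length);
        out.write(utf8);
    }
}
```

Encoding a null map ends in the count -1, while an empty map ends in 0. That difference is how a v2 broker can tell "all partitions" apart from "no partitions" without any extra field.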
The second suggestion has to do with how the above API is accessed and called. Currently, the offset information for each topic partition in a Stable group is obtained by creating a "dummy" consumer in the group and using its committed interface to extract that offset information:
...
This committed call uses the OffsetFetch API to extract the offset of the given partition. The suggestion here is to add a method to AdminClient that extracts the offset information of a consumer group by calling the OffsetFetch API and passing a null input as the list of topic partitions. That will return the offsets of all topic partitions associated with the consumer's group:
```scala
// Requires: import scala.collection.JavaConverters._ (for .asScala)
def listGroupOffsets(groupId: String): Map[TopicPartition, PartitionData] = {
  val coordinator = findCoordinator(groupId)
  // Proposed factory: builds a v2 OffsetFetch request whose topic partition list is null.
  val request = OffsetFetchRequest.allPartitionsOffsetFetchRequest(groupId)
  val responseBody = send(coordinator, ApiKeys.OFFSET_FETCH, request)
  val response = responseBody.asInstanceOf[OffsetFetchResponse]
  response.responseData().asScala.toMap
}
```
One benefit of using this method instead of the KafkaConsumer's committed method is that we no longer need to create the dummy consumer to retrieve offsets. The other benefit is that a single API call returns all offsets within the group, whereas the existing describe group implementation makes one API call for each topic partition in the group.
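The difference in round trips can be illustrated with a toy model. `CountingCoordinator` and `OffsetLookups` are hypothetical names. Partitions are plain `"topic-partition"` strings, and the model only counts simulated OffsetFetch requests rather than talking to a broker.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical coordinator that counts simulated OffsetFetch round trips.
final class CountingCoordinator {
    private final Map<String, Long> committed; // "topic-partition" -> committed offset
    int requests = 0;

    CountingCoordinator(Map<String, Long> committed) {
        this.committed = committed;
    }

    // One simulated OffsetFetch request; partitions == null means "all partitions".
    Map<String, Long> offsetFetch(List<String> partitions) {
        requests++;
        if (partitions == null) {
            return new LinkedHashMap<>(committed);
        }
        Map<String, Long> result = new LinkedHashMap<>();
        for (String tp : partitions) {
            result.put(tp, committed.getOrDefault(tp, -1L));
        }
        return result;
    }
}

final class OffsetLookups {
    private OffsetLookups() {}

    // Existing pattern: one committed()-style lookup, and thus one request, per partition.
    static Map<String, Long> perPartition(CountingCoordinator coordinator, List<String> partitions) {
        Map<String, Long> result = new LinkedHashMap<>();
        for (String tp : partitions) {
            result.putAll(coordinator.offsetFetch(Collections.singletonList(tp)));
        }
        return result;
    }

    // Proposed pattern: a single request with a null partition list.
    static Map<String, Long> allAtOnce(CountingCoordinator coordinator) {
        return coordinator.offsetFetch(null);
    }
}
```

Both lookups return the same offsets. The per-partition variant issues one request per partition, while the proposed variant needs exactly one and does not require knowing the partitions in advance.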
...
- Extend the implementation of the OffsetFetch API to cover the scenario where a null value is passed as the list of topic partitions; in response, the offsets of all topic partitions associated with the group are returned.
- Add a listGroupOffsets interface to AdminClient that uses the updated OffsetFetch API above and returns the offsets of all topic partitions associated with the consumer's group.
...
With respect to the first proposed change above, compatibility could become a concern if current users somehow rely on a different behavior when a null list is passed to the OffsetFetch API as the list of topic partitions. Users who want to keep the current behavior of the API can use version 1 of the API; the change suggested in this KIP applies to version 2 of the API only.
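From a client's perspective, this rule means the null "all partitions" form may only be sent at version 2; against an older broker, the client has to fall back to version 1 with an explicit list. The following minimal sketch assumes the client already knows the broker's highest supported OffsetFetch version (for example, from an ApiVersions exchange). `OffsetFetchPlanner` and `PlannedOffsetFetch` are hypothetical names.

```java
import java.util.List;

// Hypothetical description of the OffsetFetch request a client decides to send.
final class PlannedOffsetFetch {
    final short version;
    final List<String> partitions; // null means "all partitions of the group" (v2 only)

    PlannedOffsetFetch(short version, List<String> partitions) {
        this.version = version;
        this.partitions = partitions;
    }
}

final class OffsetFetchPlanner {
    private OffsetFetchPlanner() {}

    // brokerMaxVersion: highest OffsetFetch version the broker supports (assumed already known).
    // knownPartitions: partitions the caller already knows about, needed for the v1 fallback.
    static PlannedOffsetFetch planAllOffsets(short brokerMaxVersion, List<String> knownPartitions) {
        if (brokerMaxVersion >= 2) {
            // Null list: fetch every offset of the group in one request.
            return new PlannedOffsetFetch((short) 2, null);
        }
        if (knownPartitions == null) {
            throw new IllegalArgumentException("OffsetFetch v1 requires an explicit partition list");
        }
        // v1 semantics are unchanged: only the listed partitions are returned.
        return new PlannedOffsetFetch((short) 1, knownPartitions);
    }
}
```

Keeping the null form behind a version check leaves existing v1 callers untouched, which is the compatibility property described above.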
...
- Changing the DescribeGroups protocol so that it also returns the offset information for all topic partitions the group has consumed from since its creation. More details can be found here.
- Exposing the added OffsetFetch behavior through a new interface in KafkaConsumer, which would still require creating the dummy consumer in the group in order to retrieve offsets. More details can be found here.
- Passing an empty list, instead of null, to the API to get all offsets. The null value was chosen to remain consistent with how some other APIs handle a similar situation (e.g. a TopicMetadata request returns the metadata of all topics if it is given a null value as the list of topics).