...

KAFKA-3853 asks for an improvement to the new-consumer describe option of the consumer group command for new (Java API based) consumers. This command, when passed a consumer group that has no consumer (i.e., when the group state is Empty), currently reports an error indicating that there is no active member:

...

When the group is Stable (i.e., when there are active consumers in the group), {{DescribeGroups Response}} returns the associated topic partition assignment for each member of the group, and that assignment can be used to extract the corresponding committed offset(s). However, if the group state is Empty (i.e., when there are no active consumers in the group), there is no associated topic partition information in {{DescribeGroups Response}}. Therefore, a change in the protocol seems to be the only way to extract the topic partitions associated with a group and the corresponding committed offsets. The DescribeGroups response in its current form does not help.

The OffsetFetch protocol can be used to extract the offsets associated with given topic partitions in a consumer group. The problem is that when a consumer group is in the Empty state, or even when it is Stable but not all of its topics are being consumed, there is currently no way to extract all the topic partitions it has consumed from so far. We can modify the behavior of the OffsetFetch protocol so that it returns all topic partitions when a null value is passed instead of a list of topic partitions.
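The null-versus-list behavior described above can be sketched with a small in-memory model. This is only an illustration under stated assumptions, not Kafka's broker code: `GroupOffsetStore`, its nested `TopicPartition` class (a local stand-in, not the Kafka class of the same name), `commit`, and `fetch` are all hypothetical names.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical in-memory model of one group's committed offsets.
// None of these names come from Kafka itself.
class GroupOffsetStore {
    // Local stand-in for a topic partition, usable as a map key.
    static final class TopicPartition {
        final String topic;
        final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof TopicPartition)) return false;
            TopicPartition other = (TopicPartition) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }
    }

    private final Map<TopicPartition, Long> committed = new HashMap<>();

    void commit(String topic, int partition, long offset) {
        committed.put(new TopicPartition(topic, partition), offset);
    }

    // A null list means "every partition with a committed offset";
    // otherwise only the requested partitions that have one are returned.
    Map<TopicPartition, Long> fetch(List<TopicPartition> requested) {
        if (requested == null) {
            return new HashMap<>(committed);
        }
        Map<TopicPartition, Long> result = new HashMap<>();
        for (TopicPartition tp : requested) {
            Long offset = committed.get(tp);
            if (offset != null) {
                result.put(tp, offset);
            }
        }
        return result;
    }
}
```

With this model, a caller that does not know which topic partitions a group has consumed from can still obtain every committed offset by passing null, which is the case the Empty group state requires.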

Public Interfaces

These are the current schemas for DescribeGroups (version 0) and OffsetFetch (version 1, which applies when fetching offsets from Kafka rather than ZooKeeper).

Code Block
DescribeGroups Request (Version: 0) => [group_ids] 
  group_ids => STRING
 
DescribeGroups Response (Version: 0) => [groups] 
  groups => error_code group_id state protocol_type protocol [members] 
    error_code => INT16
    group_id => STRING
    state => STRING
    protocol_type => STRING
    protocol => STRING
    members => member_id client_id client_host member_metadata member_assignment 
      member_id => STRING
      client_id => STRING
      client_host => STRING
      member_metadata => BYTES
      member_assignment => BYTES

Code Block
OffsetFetch Request (Version: 1) => group_id [topics] 
  group_id => STRING
  topics => topic [partitions] 
    topic => STRING
    partitions => partition 
      partition => INT32
 
OffsetFetch Response (Version: 1) => [responses] 
  responses => topic [partition_responses] 
    topic => STRING
    partition_responses => partition offset metadata error_code 
      partition => INT32
      offset => INT64
      metadata => NULLABLE_STRING
      error_code => INT16

As shown above, and as explained in the previous section, the corresponding topic partitions can be extracted only through the individual (and active) members of the group. The members list is empty if the group is in the Empty state, so with this protocol there is no way to extract the offsets of an Empty group (problem 1).

Also, offset information for a Stable group is not stored in the above Response schema. Offsets associated with a Stable consumer group are currently extracted using the committed method of KafkaConsumer: a new dummy member is created in the group for the sole purpose of making committed calls on every topic partition of each of the other consumers in the group to extract the corresponding offsets. A cleaner approach would be to extract these offsets without having to create this dummy member (problem 2).

Proposed below is version 1 of the DescribeGroups schema that solves both problems above by including the group offset information directly into the Response schema (lines 17-21 below). Note that topic partitions reported under the offsets field are going to be a superset of topic partitions reported under members:member_assignment.

Code Block
linenumberstrue
DescribeGroups Request (Version: 1) => [group_ids] 
  group_ids => STRING
 
DescribeGroups Response (Version: 1) => [groups] 
  groups => error_code group_id state protocol_type protocol [members] [offsets]
    error_code => INT16
    group_id => STRING
    state => STRING
    protocol_type => STRING
    protocol => STRING
    members => member_id client_id client_host member_metadata member_assignment 
      member_id => STRING
      client_id => STRING
      client_host => STRING
      member_metadata => BYTES
      member_assignment => BYTES
    offsets => topic partition offset metadata
      topic => STRING
      partition => INT32
      offset => INT64
      metadata => STRING

Proposed Changes

The proposal is to include a list of tuples, offsets, in the response to a DescribeGroups request. Each of these tuples contains the following fields:

...

INT16
 

The first suggestion is to keep the same Request / Response protocol but bump the version to 2, since the way the protocol is implemented will change: if a null value is passed instead of a list of topic partitions, the API will return the offsets of all topic partitions associated with the group.

The second suggestion concerns how the above API is accessed and called. Currently, the offset information for each topic partition in a Stable group is obtained by creating a "dummy" consumer in the group and using its {{committed}} interface to extract that information:

Code Block
languagescala
val consumer = getConsumer()
...
... consumer.committed(new TopicPartition(topicPartition.topic, topicPartition.partition)) ...
...
})

This committed call uses the OffsetFetch API to extract the offset of the given partition. The suggestion here is to add a new method, committedOffsets, to the Java Consumer that similarly calls the OffsetFetch API, but with null as the list of topic partitions. That call returns the offsets of all topic partitions associated with the consumer's group:

Code Block
languagejava
public Map<TopicPartition, OffsetAndMetadata> committedOffsets() {
    acquire();
    try {
        return coordinator.fetchCommittedOffsets(null);
    } finally {
        release();
    }
}

The other benefit is that a single API call returns all offsets within the group, whereas the existing describe group implementation makes one API call for each topic partition in the group.
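The difference in request count can be illustrated with a counting stand-in for the OffsetFetch round trip. This is a rough sketch under stated assumptions: `CountingOffsetFetcher`, `fetchOne`, and `fetchAll` are hypothetical names, partitions are keyed by a plain "topic-partition" string for brevity, and no real network call is made.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the OffsetFetch round trip; it only counts requests.
class CountingOffsetFetcher {
    // Committed offsets keyed by a "topic-partition" string (illustrative only).
    private final Map<String, Long> committed = new HashMap<>();
    int requests = 0;

    CountingOffsetFetcher(Map<String, Long> committed) {
        this.committed.putAll(committed);
    }

    // Models the existing describe path: one request per topic partition.
    Long fetchOne(String topicPartition) {
        requests++;
        return committed.get(topicPartition);
    }

    // Models the proposed path: one request with a null partition list.
    Map<String, Long> fetchAll() {
        requests++;
        return new HashMap<>(committed);
    }
}
```

For a group with N topic partitions, looping over `fetchOne` leaves `requests` at N, while a single `fetchAll` leaves it at 1 and returns the same offsets.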

Proposed Changes

The proposal is to

  1. Extend the implementation of the OffsetFetch API to handle the case where null is passed as the list of topic partitions; in response, the offsets of all topic partitions associated with the group are returned.
  2. Add a committedOffsets interface to the Java Consumer that makes use of the updated OffsetFetch API above and returns the offsets of all topic partitions associated with the consumer's group.

 

...

Compatibility, Deprecation, and Migration Plan

There should be no impact on existing users of this protocol. The Request schema remains unchanged, and the Response schema is only expanded. There is no change in the existing Response structure, and, therefore, users who use version 0 of the schema should not be affected. The code behind kafka-consumer-groups.sh will also be updated (as requested in the JIRA) to return additional information (rows of topic partitions with no assigned consumer). If the old output must remain available as well, a new parameter can be defined for the tool to report the new information, while the old way of running the tool still prints the old output.

Rejected Alternatives

...

With respect to the first proposed change above, compatibility could become a concern if current users somehow rely on a different behavior when a null value is passed to the OffsetFetch API as the list of topic partitions. Users who want to keep the current behavior of the API can continue to use version 1. The change suggested in this KIP applies to version 2 of the API only.
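The version gating can be sketched as below. This is a hypothetical illustration only: `OffsetFetchVersioning` and `resolvePartitions` are invented names, and the choice to reject a null list for version 1 is an assumption made for the sketch, since the text does not state what version 1 actually does with null.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper showing how the null semantics could be limited to version 2.
class OffsetFetchVersioning {
    // Returns the partitions to look up for a request.
    // ASSUMPTION: version 1 rejects a null list here. The text only says that
    // version 1 keeps its current behavior, not what that behavior is.
    static List<String> resolvePartitions(int version,
                                          List<String> requested,
                                          List<String> allCommitted) {
        if (requested != null) {
            // An explicit list is honored the same way in every version.
            return new ArrayList<>(requested);
        }
        if (version >= 2) {
            // Proposed behavior: null means all partitions of the group.
            return new ArrayList<>(allCommitted);
        }
        throw new IllegalArgumentException(
                "null partition list requires version 2 or later");
    }
}
```

Because the new branch is reached only when the request carries version 2 or later, callers that stay on version 1 never see the changed behavior.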

With respect to the interface added to the Java Consumer, there is no compatibility issue, as that interface does not exist in the current implementation.

Rejected Alternatives

  1. Changing the DescribeGroups protocol so that it also returns the offset information for all topic partitions from which the group has consumed since its creation.