Status
Current state: Draft
Discussion thread: here
JIRA: KAFKA-3853
Released: <Kafka Version>
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
KAFKA-3853 asks for an improvement to the new-consumer
option of the consumer group command. This command, when passed a consumer group that has no consumer (i.e., when the group state is Empty
), currently reports an error indicating that there is no active member:
$ bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group group1 Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers). Error: Consumer group 'group1' has no active members.
The requested improvement is returning offsets within the group (and leaving the owner column empty) instead of returning the error message above. The error message can still be printed to to stderr
as a warning.
When the group is Stable
(i.e. when there are active consumers in the group), the offsets associated with the group are returned as part of the info returned for each consumer in the group in terms of a ConsumerSummary
record. But there is no ConsumerSummary
record if there is no consumer in the group, and therefore, in that scenario a different method is required for extracting those offsets.
Public Interfaces
This is the current schema for DescribeGroups (version 0).
DescribeGroups Request (Version: 0) => [group_ids] group_ids => STRING DescribeGroups Response (Version: 0) => [groups] groups => error_code group_id state protocol_type protocol [members] error_code => INT16 group_id => STRING state => STRING protocol_type => STRING protocol => STRING members => member_id client_id client_host member_metadata member_assignment member_id => STRING client_id => STRING client_host => STRING member_metadata => BYTES member_assignment => BYTES
As shown above, and as explained in the previous section, corresponding topic partitions can be extracted only through the individual (and active) members
of the group. The list members
is empty if the group is in Empty
state, so there is no way to extract offsets of an Empty
group (problem 1).
Also, offset information for a Stable
group is not stored in the above Response
schema. Offsets associated with a consumer group are currently extracted using the committed
method of KafkaConsumer
and by creating a new and dummy member in the group for the sole purpose of making committed
calls passing on every topic partition of each of the other consumers in the group to extract the corresponding offset. Extracting the offsets without having to create the dummy member would definitely be more appealing (problem 2).
Proposed below is version 1 of the DescribeGroups
schema to solve both problems above by including the group offset information directly into the Response
schema (lines 17-21 below).
DescribeGroups Request (Version: 1) => [group_ids] group_ids => STRING DescribeGroups Response (Version: 1) => [groups] groups => error_code group_id state protocol_type protocol [members] [offsets] error_code => INT16 group_id => STRING state => STRING protocol_type => STRING protocol => STRING members => member_id client_id client_host member_metadata member_assignment member_id => STRING client_id => STRING client_host => STRING member_metadata => BYTES member_assignment => BYTES offsets => topic partition offset metadata topic => STRING partition => INT32 offset => INT64 metadata => STRING
Proposed Changes
The proposal is to include a list offsets
of tuples in the response to DescribeGroups request. Each of these tuples contains the following fields:
- topic: topic from which a previous member of the group consumed.
- partition: partition of the above topic for which offset information associated with the group exists
- offset: associated committed offset of the above topic partition within the group
- metadata: metadata associated with the above offset
Compatibility, Deprecation, and Migration Plan
There should be no impact to existing users of this protocol. The Request
schema remains unchanged, and the Response
schema is only expanded. There is no change in the existing Response
structure, and, therefore, users who use version 0 of the schema should not be affected. The code behind kafka-consumer-groups.sh
will also be updated (as requested in the JIRA) to return more helpful information. In case it is necessary to provide the old output as well, a new parameter can be defined for the tool to report the new information; while the old way of running the tool still prints out the old output.
Rejected Alternatives
If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.