...
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
KAFKA-3853 asks for an improvement to the new-consumer option of the consumer group command. This command, when passed a consumer group that has no consumer (i.e., when the group state is Empty), currently reports an error indicating that there is no active member:
...
When the group is Stable (i.e., when there are active consumers in the group), the offsets associated with the group are returned as part of the information for each consumer in the group, in the form of a ConsumerSummary record. There is no ConsumerSummary record if the group has no consumer, so in that scenario a different method is required for extracting those offsets.
Public Interfaces
Briefly list any new interfaces that will be introduced as part of this proposal or any existing interfaces that will be removed or changed. The purpose of this section is to concisely call out the public contract that will come along with this feature.
A public interface is any change to the following:
...
Binary log format
...
The network protocol and api behavior
...
Any class in the public packages under clients
org/apache/kafka/common/serialization
org/apache/kafka/common
org/apache/kafka/common/errors
org/apache/kafka/clients/producer
org/apache/kafka/clients/consumer (eventually, once stable)
Configuration, especially client configuration
...
Monitoring
...
Command line tools and arguments
This is the current schema for DescribeGroups (version 0).
    DescribeGroups Request (Version: 0) => [group_ids]
      group_ids => STRING

    DescribeGroups Response (Version: 0) => [groups]
      groups => error_code group_id state protocol_type protocol [members]
        error_code => INT16
        group_id => STRING
        state => STRING
        protocol_type => STRING
        protocol => STRING
        members => member_id client_id client_host member_metadata member_assignment
          member_id => STRING
          client_id => STRING
          client_host => STRING
          member_metadata => BYTES
          member_assignment => BYTES
As shown above, and as explained in the previous section, the corresponding topic partitions can be extracted only through the individual (and active) members of the group. The members list is empty if the group is in the Empty state, so there is no way to extract the offsets of an Empty group (problem 1).
Also, offset information for a Stable group is not stored in the above Response schema. Offsets associated with a consumer group are currently extracted with the committed method of KafkaConsumer: a new dummy member is created in the group for the sole purpose of calling committed on every topic partition assigned to each of the other consumers in the group, to obtain the corresponding offset. Extracting the offsets without having to create the dummy member would be preferable (problem 2).
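Problem 1 can be illustrated with a simplified model of the version 0 response. This is only a sketch under stated assumptions: `GroupV0Sketch`, `MemberV0`, and `partitionsToQuery` are hypothetical names that do not exist in Kafka, and each member's assignment is shown already decoded into partition names rather than as the opaque member_assignment BYTES field.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified view of one group in a DescribeGroups v0 response.
final class GroupV0Sketch {

    // One group member. Assumption: the assignment has already been decoded
    // from the member_assignment BYTES field into names such as "orders-0".
    static final class MemberV0 {
        final String memberId;
        final List<String> assignedPartitions;

        MemberV0(String memberId, List<String> assignedPartitions) {
            this.memberId = memberId;
            this.assignedPartitions = assignedPartitions;
        }
    }

    // In v0 the member list is the only source of topic partitions, so the
    // result is empty whenever the group has no members (state Empty).
    static List<String> partitionsToQuery(List<MemberV0> members) {
        List<String> partitions = new ArrayList<>();
        for (MemberV0 member : members) {
            partitions.addAll(member.assignedPartitions);
        }
        return partitions;
    }
}
```

For a Stable group the returned partitions are the ones a dummy member would have to pass to committed, one call per partition (problem 2). For an Empty group the list is empty, so there is nothing to query even though committed offsets may still exist for the group (problem 1).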
Version 1 of the Response schema, proposed below, addresses both problems by including the group's offset information directly in the response (the new offsets array and its topic, partition, offset, and metadata fields).
    DescribeGroups Request (Version: 1) => [group_ids]
      group_ids => STRING

    DescribeGroups Response (Version: 1) => [groups]
      groups => error_code group_id state protocol_type protocol [members] [offsets]
        error_code => INT16
        group_id => STRING
        state => STRING
        protocol_type => STRING
        protocol => STRING
        members => member_id client_id client_host member_metadata member_assignment
          member_id => STRING
          client_id => STRING
          client_host => STRING
          member_metadata => BYTES
          member_assignment => BYTES
        offsets => topic partition offset metadata
          topic => STRING
          partition => INT32
          offset => INT64
          metadata => STRING
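To make the proposed offsets array concrete, the following Java sketch writes and reads it on a ByteBuffer. It assumes the usual Kafka wire encoding for these primitive types (big-endian integers, an array prefixed by an INT32 element count, a STRING prefixed by an INT16 byte length). The names `OffsetsArraySketch`, `OffsetEntry`, `writeOffsets`, and `readOffsets` are hypothetical and are not part of Kafka or of this proposal's implementation.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Kafka. Assumed encoding: big-endian
// integers, arrays prefixed with an INT32 element count, and STRING
// prefixed with an INT16 byte length.
final class OffsetsArraySketch {

    // One entry of the proposed `offsets` array.
    static final class OffsetEntry {
        final String topic;
        final int partition;
        final long offset;
        final String metadata;

        OffsetEntry(String topic, int partition, long offset, String metadata) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.metadata = metadata;
        }
    }

    static void writeString(ByteBuffer buf, String s) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        buf.putShort((short) bytes.length); // INT16 length prefix
        buf.put(bytes);
    }

    static String readString(ByteBuffer buf) {
        int length = buf.getShort();
        byte[] bytes = new byte[length];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Writes: count, then topic, partition, offset, metadata per entry.
    static void writeOffsets(ByteBuffer buf, List<OffsetEntry> entries) {
        buf.putInt(entries.size());
        for (OffsetEntry e : entries) {
            writeString(buf, e.topic);
            buf.putInt(e.partition);
            buf.putLong(e.offset);
            writeString(buf, e.metadata);
        }
    }

    // Reads the fields in the same order as the schema lists them.
    static List<OffsetEntry> readOffsets(ByteBuffer buf) {
        int count = buf.getInt();
        List<OffsetEntry> entries = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            String topic = readString(buf);
            int partition = buf.getInt();
            long offset = buf.getLong();
            String metadata = readString(buf);
            entries.add(new OffsetEntry(topic, partition, offset, metadata));
        }
        return entries;
    }
}
```

Because the entries sit at the group level rather than inside members, a client reading this array would obtain offsets for an Empty group in the same way as for a Stable one, without joining the group.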
...
Proposed Changes
Describe the new thing you want to do in appropriate detail. This may be fairly extensive and have large subsections of its own. Or it may be a few sentences. Use judgement based on the scope of the change.
Compatibility, Deprecation, and Migration Plan
- What impact (if any) will there be on existing users?
- If we are changing behavior how will we phase out the older behavior?
- If we need special migration tools, describe them here.
- When will we remove the existing behavior?
Test Plan
Describe in a few sentences how the KIP will be tested. We are mostly interested in system tests (since unit tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?
Rejected Alternatives
If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.