...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

KAFKA-3853 asks for an improvement to the new-consumer option of the consumer group command. When passed a consumer group that has no consumers (i.e., when the group state is Empty), the command currently reports an error indicating that there is no active member:

...

When the group is Stable (i.e., when there are active consumers in the group), the offsets associated with the group are returned as part of the information reported for each consumer, in the form of a ConsumerSummary record. No ConsumerSummary record exists when the group has no consumers, so in that scenario a different method is required to extract those offsets.

Public Interfaces

Briefly list any new interfaces that will be introduced as part of this proposal or any existing interfaces that will be removed or changed. The purpose of this section is to concisely call out the public contract that will come along with this feature.

A public interface is any change to the following:

...

Binary log format

...

The network protocol and api behavior

...

Configuration, especially client configuration

Any class in the public packages under clients

  • org/apache/kafka/common/serialization

  • org/apache/kafka/common

  • org/apache/kafka/common/errors

  • org/apache/kafka/clients/producer

  • org/apache/kafka/clients/consumer (eventually, once stable)

...

Monitoring

...

Command line tools and arguments

This is the current schema for DescribeGroups (version 0).

DescribeGroups Request (Version: 0) => [group_ids] 
  group_ids => STRING
 
DescribeGroups Response (Version: 0) => [groups] 
  groups => error_code group_id state protocol_type protocol [members] 
    error_code => INT16
    group_id => STRING
    state => STRING
    protocol_type => STRING
    protocol => STRING
    members => member_id client_id client_host member_metadata member_assignment 
      member_id => STRING
      client_id => STRING
      client_host => STRING
      member_metadata => BYTES
      member_assignment => BYTES

As shown above, and as explained in the previous section, the topic partitions of a group can be extracted only through the individual (and active) members of the group. The members list is empty if the group is in the Empty state, so there is no way to extract the offsets of an Empty group (problem 1).
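
Problem 1 can be illustrated with a minimal sketch that models the version 0 response as plain Python data. The class names `MemberV0` and `GroupV0`, the helper `partitions_of`, and the already-decoded `assignment` field are assumptions made for illustration; on the wire, member_assignment is opaque BYTES and decoding it is not shown here.

```python
from dataclasses import dataclass, field
from typing import List, Tuple

TopicPartition = Tuple[str, int]


@dataclass
class MemberV0:
    member_id: str
    client_id: str
    client_host: str
    # Hypothetical decoded form of member_assignment (BYTES on the wire).
    assignment: List[TopicPartition] = field(default_factory=list)


@dataclass
class GroupV0:
    group_id: str
    state: str
    members: List[MemberV0] = field(default_factory=list)


def partitions_of(group: GroupV0) -> List[TopicPartition]:
    """Collect topic partitions the only way version 0 allows: via members."""
    return [tp for member in group.members for tp in member.assignment]


stable = GroupV0(
    "g1", "Stable",
    [MemberV0("m1", "c1", "/10.0.0.1", [("t", 0), ("t", 1)])],
)
empty = GroupV0("g1", "Empty")  # no members, hence no partitions to query
```

With no members, `partitions_of(empty)` has nothing to iterate over, so a client has no starting point from which to look up the group's offsets.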

Also, the Response schema above carries no offset information, even for a Stable group. Offsets associated with a consumer group are currently extracted with the committed method of KafkaConsumer: a new dummy member is created in the group for the sole purpose of calling committed on every topic partition of each of the other consumers in the group. Extracting the offsets without having to create the dummy member would be preferable (problem 2).
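
The current workaround can be sketched as one lookup per topic partition. This is a simplified model, not the tool's actual code: `collect_group_offsets` is a hypothetical helper, and its `committed` parameter is a stand-in callable for the committed method of the dummy member's KafkaConsumer, backed here by an in-memory dictionary instead of a broker.

```python
from typing import Callable, Dict, Iterable, Optional, Tuple

TopicPartition = Tuple[str, int]


def collect_group_offsets(
    assignments: Iterable[Iterable[TopicPartition]],
    committed: Callable[[TopicPartition], Optional[int]],
) -> Dict[TopicPartition, Optional[int]]:
    """Ask the stand-in `committed` lookup for every partition of every member."""
    offsets: Dict[TopicPartition, Optional[int]] = {}
    for member_partitions in assignments:
        for tp in member_partitions:
            offsets[tp] = committed(tp)  # one call per topic partition
    return offsets


# Stand-in for the committed offsets the dummy member would read.
fake_store = {("t", 0): 42, ("t", 1): 7}
calls = []


def fake_committed(tp: TopicPartition) -> Optional[int]:
    calls.append(tp)
    return fake_store.get(tp)  # None when nothing has been committed


result = collect_group_offsets([[("t", 0)], [("t", 1), ("u", 0)]], fake_committed)
```

The number of lookups grows with the number of partitions, and the approach still depends on the member assignments being available, which is what the proposed schema change avoids.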

Version 1 of the Response schema, proposed below, addresses both problems by including the group offset information directly in the schema (lines 17-21 below).

DescribeGroups Request (Version: 1) => [group_ids] 
  group_ids => STRING
 
DescribeGroups Response (Version: 1) => [groups] 
  groups => error_code group_id state protocol_type protocol [members] [offsets]
    error_code => INT16
    group_id => STRING
    state => STRING
    protocol_type => STRING
    protocol => STRING
    members => member_id client_id client_host member_metadata member_assignment 
      member_id => STRING
      client_id => STRING
      client_host => STRING
      member_metadata => BYTES
      member_assignment => BYTES
    offsets => topic partition offset metadata
      topic => STRING
      partition => INT32
      offset => INT64
      metadata => STRING
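
As a rough illustration of what the proposed offsets array could look like on the wire, the sketch below encodes and decodes it with Python's `struct` module. It assumes the usual Kafka protocol primitives (big-endian integers, STRING as an INT16 length followed by UTF-8 bytes, arrays prefixed by an INT32 element count); the function names are hypothetical and are not part of any Kafka client.

```python
import struct
from typing import List, Tuple

# (topic, partition, offset, metadata), mirroring the proposed schema fields.
OffsetEntry = Tuple[str, int, int, str]


def encode_string(value: str) -> bytes:
    raw = value.encode("utf-8")
    return struct.pack(">h", len(raw)) + raw  # INT16 length, then the bytes


def decode_string(buf: bytes, pos: int) -> Tuple[str, int]:
    (length,) = struct.unpack_from(">h", buf, pos)
    pos += 2
    return buf[pos:pos + length].decode("utf-8"), pos + length


def encode_offsets(entries: List[OffsetEntry]) -> bytes:
    out = struct.pack(">i", len(entries))  # INT32 element count
    for topic, partition, offset, metadata in entries:
        out += encode_string(topic)
        out += struct.pack(">iq", partition, offset)  # INT32, then INT64
        out += encode_string(metadata)
    return out


def decode_offsets(buf: bytes, pos: int = 0) -> Tuple[List[OffsetEntry], int]:
    (count,) = struct.unpack_from(">i", buf, pos)
    pos += 4
    entries: List[OffsetEntry] = []
    for _ in range(count):
        topic, pos = decode_string(buf, pos)
        partition, offset = struct.unpack_from(">iq", buf, pos)
        pos += 12  # 4 bytes for partition plus 8 bytes for offset
        metadata, pos = decode_string(buf, pos)
        entries.append((topic, partition, offset, metadata))
    return entries, pos


sample = [("orders", 0, 42, ""), ("orders", 1, 7, "checkpoint")]
decoded, end = decode_offsets(encode_offsets(sample))
```

Because the array stands on its own in the response, an Empty group can return an empty members array and still report its committed offsets.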

 

 

...

Proposed Changes

Describe the new thing you want to do in appropriate detail. This may be fairly extensive and have large subsections of its own. Or it may be a few sentences. Use judgement based on the scope of the change.

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Test Plan

Describe in a few sentences how the KIP will be tested. We are mostly interested in system tests (since unit tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.