Status

Current state: "Accepted"

Discussion thread: here

JIRA: here 

Released: 2.0.0


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This KIP aims to add support for describing and listing consumer groups to the `KafkaAdminClient` class. This functionality is required by the Streams Resetter Tool, as described here: 

Public Interfaces

API


Add the following API in AdminClient:

public abstract class AdminClient implements AutoCloseable {
 	
    public abstract DescribeConsumerGroupsResult describeConsumerGroups(Collection<String> groupIds, DescribeConsumerGroupsOptions options);
 
    public DescribeConsumerGroupsResult describeConsumerGroups(Collection<String> groupIds) {
        return describeConsumerGroups(groupIds, new DescribeConsumerGroupsOptions());
    }
 
    public abstract ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options);

    public ListConsumerGroupsResult listConsumerGroups() {
        return listConsumerGroups(new ListConsumerGroupsOptions());
    }

    public abstract ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options);
 
    public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId) {
        return listConsumerGroupOffsets(groupId, new ListConsumerGroupOffsetsOptions());
    }
 
    public abstract DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupIds, DeleteConsumerGroupsOptions options);
 
    public DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupIds) {
        return deleteConsumerGroups(groupIds, new DeleteConsumerGroupsOptions());
    }
}
 
public class DescribeConsumerGroupsOptions extends AbstractOptions<DescribeConsumerGroupsOptions> {
}
 
public class DescribeConsumerGroupsResult {
    Map<String, KafkaFuture<ConsumerGroupDescription>> all();
}
 
public class ConsumerGroupDescription {
    /**
     * The id of the consumer group.
     */
    public String groupId();

    /**
     * Whether this is a simple consumer group.
     */
    public boolean isSimpleConsumerGroup();

    /**
     * A list of the members of the consumer group.
     */
    public Collection<MemberDescription> members();

    /**
     * The consumer group partition assignor.
     */
    public String partitionAssignor();

    /**
     * The consumer group state, or UNKNOWN if the state is too new for us to parse.
     */
    public ConsumerGroupState state();

    /**
     * The consumer group coordinator, or null if the coordinator is not known.
     */
    public Node coordinator();

}


public enum ConsumerGroupState {
    UNKNOWN,
    PREPARING_REBALANCE,
    COMPLETING_REBALANCE,
    STABLE,
    DEAD,
    EMPTY;
}
 
public class MemberDescription {

    /**
     * The consumer id of the group member.
     */
    public String consumerId();

    /**
     * The client id of the group member.
     */
    public String clientId();

    /**
     * The host where the group member is running.
     */
    public String host();

    /**
     * The assignment of the group member.
     */
    public MemberAssignment assignment();
}
 
public class MemberAssignment {
    private final List<TopicPartition> topicPartitions;
}

public class ListConsumerGroupsOptions extends AbstractOptions<ListConsumerGroupsOptions> {
}

public class ListConsumerGroupsResult {
    public KafkaFuture<Collection<ConsumerGroupListing>> all();
    public KafkaFuture<Collection<ConsumerGroupListing>> valid();
    public KafkaFuture<Collection<Throwable>> errors();
} 
 
public class ConsumerGroupListing {
    private final String groupId;
    private final boolean isSimpleConsumerGroup;
}
 
public class ListConsumerGroupOffsetsOptions extends AbstractOptions<ListConsumerGroupOffsetsOptions> {
}
 
public class ListConsumerGroupOffsetsResult {
    KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> all();
}
 
public class DeleteConsumerGroupsOptions extends AbstractOptions<DeleteConsumerGroupsOptions> {
}
 
public class DeleteConsumerGroupsResult {
    Map<String, KafkaFuture<Void>> all();
}
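
The `state()` documentation above says the client reports `UNKNOWN` when it sees a state it cannot parse. A minimal sketch of that fallback follows. It assumes the broker reports a name that matches an enum constant case-insensitively, which may differ from the real wire format; `GroupState`, `parse`, and `GroupStateDemo` are hypothetical names, not part of this KIP.

```java
import java.util.Locale;

// Hypothetical mirror of the ConsumerGroupState enum in the listing above; not the Kafka source.
enum GroupState {
    UNKNOWN,
    PREPARING_REBALANCE,
    COMPLETING_REBALANCE,
    STABLE,
    DEAD,
    EMPTY;

    // Map a reported state name to a constant, falling back to UNKNOWN
    // for null input or for names this client version does not recognise.
    // Assumption: the reported name equals the constant name, ignoring case.
    static GroupState parse(String name) {
        if (name == null) {
            return UNKNOWN;
        }
        try {
            return valueOf(name.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            return UNKNOWN;
        }
    }
}

class GroupStateDemo {
    public static void main(String[] args) {
        // A known state is parsed; a state added by a newer broker degrades to UNKNOWN.
        System.out.println(GroupState.parse("stable"));
        System.out.println(GroupState.parse("SomeFutureState"));
    }
}
```

Falling back to `UNKNOWN` rather than throwing lets an older client keep describing groups when a newer broker introduces additional states.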

Each of these methods returns a result object exposing futures whose results become available within `RequestTimeoutMs`, which is configured when the user constructs the `AdminClient`.
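
The timeout behaviour can be illustrated with a small sketch that uses only JDK types. It models a pending response as a `CompletableFuture` and fails it once the timeout elapses. This is an assumption-laden stand-in for illustration, not how `KafkaAdminClient` or `KafkaFuture` is implemented; `RequestTimeoutSketch`, `boundByRequestTimeout`, and `timedOut` are hypothetical names, and `orTimeout` requires Java 9 or later.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical model of a future bounded by a request timeout; JDK types only.
class RequestTimeoutSketch {

    // Fail the pending future with a TimeoutException if it is still
    // incomplete after requestTimeoutMs milliseconds.
    static <T> CompletableFuture<T> boundByRequestTimeout(CompletableFuture<T> pending, long requestTimeoutMs) {
        return pending.orTimeout(requestTimeoutMs, TimeUnit.MILLISECONDS);
    }

    // Wait for the outcome and report whether the future failed because of the timeout.
    static boolean timedOut(CompletableFuture<?> future) {
        try {
            future.get();
            return false;
        } catch (ExecutionException e) {
            return e.getCause() instanceof TimeoutException;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        // No response ever arrives: the future still completes, exceptionally, after the timeout.
        CompletableFuture<String> noResponse = boundByRequestTimeout(new CompletableFuture<String>(), 50);
        System.out.println("timed out: " + timedOut(noResponse));

        // A response that arrives before the deadline completes the future normally.
        CompletableFuture<String> answered = boundByRequestTimeout(new CompletableFuture<String>(), 5_000);
        answered.complete("my-group");
        System.out.println("timed out: " + timedOut(answered));
    }
}
```

The point for callers is that waiting on such a future is bounded: it ends with either a value or an error, so code that blocks on the result should be prepared to handle a timeout failure.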

Proposed Changes

  1. Add `#describeConsumerGroups(Collection<String> groupIds, DescribeConsumerGroupsOptions options)` and `#describeConsumerGroups(Collection<String> groupIds)` to the `AdminClient` API.
  2. Add `#listConsumerGroups(ListConsumerGroupsOptions options)` and `#listConsumerGroups()` to the `AdminClient` API.
  3. Add `#listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options)` and `#listConsumerGroupOffsets(String groupId)` to the `AdminClient` API.
  4. Implement the new methods in `KafkaAdminClient`.
  5. Solve 
  6. Add `#deleteConsumerGroups(Collection<String> groupIds, DeleteConsumerGroupsOptions options)` and `#deleteConsumerGroups(Collection<String> groupIds)` to the `AdminClient` API, based on KIP-229: DeleteGroups API.

Compatibility, Deprecation, and Migration Plan

Rejected Alternatives

Future Work

References:

Pull Request: https://github.com/apache/kafka/pull/4454