...

JIRA: KAFKA-6058

Released: 2.0.0


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Code Block
languagejava
public abstract class AdminClient implements AutoCloseable {
 	
    public abstract DescribeConsumerGroupsResult describeConsumerGroups(Collection<String> groupIds, DescribeConsumerGroupsOptions options);

    public DescribeConsumerGroupsResult describeConsumerGroups(Collection<String> groupIds) {
        return describeConsumerGroups(groupIds, new DescribeConsumerGroupsOptions());
    }
 
    public abstract ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options);

    public ListConsumerGroupsResult listConsumerGroups() {
        return listConsumerGroups(new ListConsumerGroupsOptions());
    }

    public abstract ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options);

    public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId) {
        return listConsumerGroupOffsets(groupId, new ListConsumerGroupOffsetsOptions());
    }
 
    public abstract DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupIds, DeleteConsumerGroupsOptions options);
 
    public DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupIds) {
        return deleteConsumerGroups(groupIds, new DeleteConsumerGroupsOptions());
    }
}
 
public class DescribeConsumerGroupsOptions extends AbstractOptions<DescribeConsumerGroupsOptions> {
}

public class DescribeConsumerGroupsResult {
    private final Map<String, KafkaFuture<ConsumerGroupDescription>> futures;
    ... all();
}
 
public class ConsumerGroupDescription {
    /**
     * The id of the consumer group.
     */
    public String groupId();

    /**
     * If consumer group is simple or not.
     */
    public boolean isSimpleConsumerGroup();

    /**
     * A list of the members of the consumer group.
     */
    public Collection<MemberDescription> members();

    /**
     * The consumer group partition assignor.
     */
    public String partitionAssignor();

    /**
     * The consumer group state, or UNKNOWN if the state is too new for us to parse.
     */
    public ConsumerGroupState state();

    /**
     * The consumer group coordinator, or null if the coordinator is not known.
     */
    public Node coordinator();

}


public enum ConsumerGroupState {
    UNKNOWN,
    PREPARING_REBALANCE,
    COMPLETING_REBALANCE,
    STABLE,
    DEAD,
    EMPTY;
}
 
public class MemberDescription {

    /**
     * The consumer id of the group member.
     */
    public String consumerId();

    /**
     * The client id of the group member.
     */
    public String clientId();

    /**
     * The host where the group member is running.
     */
    public String host();

    /**
     * The assignment of the group member.
     */
    public MemberAssignment assignment();
}
 
public class MemberAssignment {
    private final List<TopicPartition> topicPartitions;
}

public class ListConsumerGroupsOptions extends AbstractOptions<ListConsumerGroupsOptions> {
}

public class ListConsumerGroupsResult {
    public KafkaFuture<Collection<ConsumerGroupListing>> all();

    public KafkaFuture<Collection<ConsumerGroupListing>> valid();

    public KafkaFuture<Collection<Throwable>> errors();
}
 
public class ConsumerGroupListing {
    private final String groupId;
    private final boolean isSimpleConsumerGroup;
}
 
public class ListConsumerGroupOffsetsOptions extends AbstractOptions<ListConsumerGroupOffsetsOptions> {
}
 
public class ListConsumerGroupOffsetsResult {
    public KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> all();
}
 
public class DeleteConsumerGroupsOptions extends AbstractOptions<DeleteConsumerGroupsOptions> {
}
 
public class DeleteConsumerGroupsResult {
    Map<TopicPartition, KafkaFuture<Void>> all();
}

This API returns future objects whose results will be available within RequestTimeoutMs, which is configured when the user constructs the AdminClient.
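
To make the future-based contract concrete, the following is a minimal sketch rather than the proposal's implementation. It uses `java.util.concurrent.CompletableFuture` as a stand-in for `KafkaFuture` and plain strings in place of `ConsumerGroupDescription`; `FutureResultSketch`, `combine`, and `awaitOrFallback` are hypothetical names introduced only for illustration. It shows how per-group futures could be folded into a single `all()`-style future, and how a caller might bound its own wait on the result.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration: CompletableFuture stands in for KafkaFuture,
// and String stands in for ConsumerGroupDescription.
final class FutureResultSketch {

    // Fold one future per group id into a single future of a map.
    // The combined future fails if any per-group future fails.
    static <V> CompletableFuture<Map<String, V>> combine(Map<String, CompletableFuture<V>> futures) {
        Map<String, CompletableFuture<V>> snapshot = new HashMap<>(futures);
        CompletableFuture<?>[] pending = snapshot.values().toArray(new CompletableFuture<?>[0]);
        return CompletableFuture.allOf(pending).thenApply(ignored -> {
            Map<String, V> results = new HashMap<>();
            // All futures have completed successfully here, so join() does not block.
            snapshot.forEach((groupId, future) -> results.put(groupId, future.join()));
            return results;
        });
    }

    // Wait at most waitMs for the result; return the fallback on timeout or failure.
    static <T> T awaitOrFallback(CompletableFuture<T> future, long waitMs, T fallback) {
        try {
            return future.get(waitMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | ExecutionException e) {
            return fallback;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return fallback;
        }
    }

    public static void main(String[] args) {
        Map<String, CompletableFuture<String>> perGroup = new HashMap<>();
        perGroup.put("group-a", CompletableFuture.completedFuture("description of group-a"));
        perGroup.put("group-b", CompletableFuture.completedFuture("description of group-b"));

        Map<String, String> all = awaitOrFallback(combine(perGroup), 1000L, new HashMap<>());
        System.out.println(all);
    }
}
```

With the real client, the per-call wait would normally be governed by the timeout the paragraph above describes; the explicit `waitMs` here only exists because the stand-in futures have no such built-in bound.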

...

  1. Add `#describeConsumerGroups(Collection<String> groupIds, DescribeConsumerGroupsOptions options)` and `#describeConsumerGroups(Collection<String> groupIds)` to `AdminClient` API
  2. Add `#listConsumerGroups(ListConsumerGroupsOptions options)` and `#listConsumerGroups()` to `AdminClient` API 
  3. Add `#listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options)` and `#listConsumerGroupOffsets(String groupId)` to `AdminClient` API
  4. Implement them on `KafkaAdminClient`.
  5. Resolve KAFKA-6058.
  6. Add `#deleteConsumerGroups(Collection<String> groupIds, DeleteConsumerGroupsOptions options)` and `#deleteConsumerGroups(Collection<String> groupIds)` to `AdminClient` API once KIP-229: DeleteGroups API is merged.
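
Each item in the list above pairs an abstract, options-taking method with a convenience overload that passes default options, and leaves the real work to the concrete client. A minimal sketch of that pattern follows. All types here (`SketchOptions`, `SketchAdminClient`, `SketchKafkaAdminClient`) are hypothetical stand-ins, not the actual Kafka classes, and the string results only exist to make the delegation visible.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

// Hypothetical stand-in for an options class such as DescribeConsumerGroupsOptions.
final class SketchOptions {
    private Integer timeoutMs; // null means "use the client's default"

    SketchOptions timeoutMs(Integer timeoutMs) {
        this.timeoutMs = timeoutMs;
        return this;
    }

    Integer timeoutMs() {
        return timeoutMs;
    }
}

// Hypothetical stand-in for the abstract AdminClient.
abstract class SketchAdminClient {
    // The options-taking variant is the only one implementations must provide.
    abstract List<String> describeGroups(Collection<String> groupIds, SketchOptions options);

    // The convenience overload delegates with default options.
    List<String> describeGroups(Collection<String> groupIds) {
        return describeGroups(groupIds, new SketchOptions());
    }
}

// Hypothetical stand-in for the concrete KafkaAdminClient.
final class SketchKafkaAdminClient extends SketchAdminClient {
    private final int defaultTimeoutMs;

    SketchKafkaAdminClient(int defaultTimeoutMs) {
        this.defaultTimeoutMs = defaultTimeoutMs;
    }

    @Override
    List<String> describeGroups(Collection<String> groupIds, SketchOptions options) {
        // Per-call options override the client-wide default when set.
        int timeoutMs = options.timeoutMs() != null ? options.timeoutMs() : defaultTimeoutMs;
        List<String> out = new ArrayList<>();
        for (String groupId : groupIds) {
            out.add(groupId + " (timeout " + timeoutMs + " ms)");
        }
        return out;
    }

    public static void main(String[] args) {
        SketchAdminClient client = new SketchKafkaAdminClient(30000);
        System.out.println(client.describeGroups(Arrays.asList("group-a")));
        System.out.println(client.describeGroups(Arrays.asList("group-a"), new SketchOptions().timeoutMs(500)));
    }
}
```

Keeping the options-taking method as the single abstract entry point means new per-call settings can be added to the options class later without changing method signatures.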

Compatibility, Deprecation, and Migration Plan

...