Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

 

Status

Current state:   "Under DiscussionAccepted"

Discussion thread: here

JIRA: here 

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-6058

Released: 2.0.0


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Code Block
languagejava
public abstract class AdminClient implements AutoCloseable {
 	
	public abstract DescribeGroupsResultDescribeConsumerGroupsResult describeGroupsdescribeConsumerGroups(Collection<String> groupIds, DescribeGroupsOptions DescribeConsumerGroupsOptions options);
 
	public DescribeGroupsResultDescribeConsumerGroupsResult describeGroupsdescribeConsumerGroups(Collection<String> groupIds) {
    	return describeGroupsdescribeConsumerGroups(groupIds, new DescribeGroupsOptionsDescribeConsumerGroupsOptions());
	}
 
    public abstract ListGroupsResultListConsumerGroupsResult listGroupslistConsumerGroups(ListGroupsOptionsListConsumerGroupsOptions options);

    public ListGroupsResultListConsumerGroupsResult listGroupslistConsumerGroups() {
        return listGroupslistConsumerGroups(new ListGroupsOptionsListConsumerGroupsOptions());
    }

    public abstract ListGroupsResultListConsumerGroupOffsetsResult listGroups(ListGroupsOptionslistConsumerGroupOffsets(String groupId, ListConsumerGroupsOptions options) {
        //filtering groups by ConsumerProtocol.PROTOCOL_TYPE
    }

   ;
 
    public ListGroupsResultListConsumerGroupOffsetsResult listGroupslistConsumerGroupOffsets(String groupId) {
        return listGroupslistConsumerGroupOffsets(new ListGroupsOptionsListConsumerGroupOffsetsOptions());
    }
 
    
public abstract DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> public ListGroupOffsetsResult listGroupOffsets(ListGroupsOptionsgroupIds, DeleteConsumerGroupsOptions options);
 
    public ListGroupOffsetsResultDeleteConsumerGroupsResult listGroupOffsetsdeleteConsumerGroups(Collection<String> groupIds) {
        return listGroupOffsetsdeleteConsumerGroups(groupIds, new ListGroupOffsetsOptionsDeleteConsumerGroupsOptions());
    }
}
 
public class DescribeGroupOptionsDescribeConsumerGroupOptions extends AbstractOptions<DescribeGroupOptions>AbstractOptions<DescribeConsumerGroupOptions> {
}
 
public class DescribeGroupResultDescribeConsumerGroupResult {
    private final Map<StringMap<TopicPartition, KafkaFuture<GroupDescription>>KafkaFuture<ConsumerGroupDescription>> futures;
    public DescribeGroupsResult(Map<String, KafkaFuture<GroupDescription>> futures) {
all();
}
 
public class ConsumerGroupDescription {
    /**
     * The id of this.futuresthe =consumer futures;group.
     }
*/
    public Map<String, KafkaFuture<GroupDescription>> valuesString groupId() {
        return futures;;

    }
/**
    public KafkaFuture<Map<String,* GroupDescription>>If all() {
        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]))consumer group is simple or not.
     */
    public boolean  thenApply(new KafkaFuture.Function<Void, Map<String, GroupDescription>>() {isSimpleConsumerGroup();

    /**
     * A list of the members of the consumer   @Overridegroup.
            */
    public Map<String, GroupDescription> apply(Void v) {Collection<MemberDescription> members();

    /**
     * The consumer group partition assignor.
     */
 Map<String, GroupDescription> descriptions =public new HashMap<>(futures.size()String partitionAssignor();

    /**
     * The consumer group state, or UNKNOWN if the state is for (Map.Entry<String, KafkaFuture<GroupDescription>> entry : futures.entrySet()) {too new for us to parse.
     */
    public ConsumerGroupState state();

             try {/**
     * The consumer group coordinator, or null if the coordinator is not known.
     */
    public  descriptions.put(entry.getKey(), entry.getValue().get());
   Node coordinator();

}


public enum ConsumerGroupState {
    UNKNOWN,
    PREPARING_REBALANCE,
    COMPLETING_REBALANCE,
    STABLE,
    DEAD,
 } catch (InterruptedException | ExecutionException e) { EMPTY;
}
 
public class MemberDescription {

    /**
     * The consumer id of the group member.
            // This should be unreachable, because allOf ensured that all the futures*/
    public String consumerId();

    /**
     * The client id of the group member.
     */
    public   String clientId();

    // completed successfully.**
     * The host where the group member is running.
     */
    public      throw new RuntimeException(eString host();

    /**
     * The assignment of the group member.
         }*/
    public                }
   MemberAssignment assignment();
}
 
public class MemberAssignment {
    private final            return descriptions;
                }
            });
    }
}
 List<TopicPartition> topicPartitions;
}

public class ListConsumerGroupsOptions extends AbstractOptions<ListConsumerGroupsOptions> {
}

public class GroupDescriptionListConsumerGroupsResult {
    privatepublic final String groupIdKafkaFuture<Collection<ConsumerGroupListing>> all();
    privatepublic final String protocolTypeKafkaFuture<Collection<ConsumerGroupListing>> valid();
	public    private final List<ConsumerDescription> consumersKafkaFuture<Collection<Throwable>> errors();
} 
 
public class MemberDescriptionConsumerGroupListing {
    private final String consumerIdgroupId;
    private final Stringboolean clientIdisSimpleConsumerGroup;
    private final String host;
    private final List<TopicPartition> assignment;
}
 
public class ListGroupsOptionsListConsumerGroupOffsetsOptions extends AbstractOptions<ListGroupsOptions>AbstractOptions<ListConsumerGroupsOptions> {
}
 
public class ListGroupsResultListConsumerGroupOffsetsResult {
    final KafkaFuture<Map<StringKafkaFuture<Map<TopicPartition, GroupListing>>OffsetAndMetadata>> future;

    //...all();
}
 
public class GroupListing {
    private final String name;
    private final String protocolType;
}
 
public class ListGroupOffsetsOptionsDeleteConsumerGroupOptions extends AbstractOptions<ListGroupsOptions>AbstractOptions<DeleteConsumerGroupOptions> {
}
 
public class ListGroupOffsetsResultDeleteConsumerGroupsResult {
    final KafkaFuture<Map<TopicPartitionMap<TopicPartition, Long>>KafkaFuture<Void>> future;
 
    //...all();
}
 

This API returns a future object whose result will be available within RequestTimeoutMs, which is configured when user constructs the AdminClient.

Proposed Changes

  1. Add `#describeGroups`#describeConsumerGroups(Collection<String> groupIds, DescribeGroupsOptions DescribeConsumerGroupsOptions options)` and `#describeGroups`#describeConsumerGroups(Collection<String> groupIds)` to `AdminClient` API API
  2. Add `#listGroups`#listConsumerGroups(ListGroupsOptions ListConsumerGroupsOptions options)` and `#listGroups`#listConsumerGroups()` to `AdminClient` API 
  3. Add `#listGroupOffsets(ListGroupOffsetsOptions `#listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options)` and `#listGroupOffsets`#listConsumerGroupOffsets(String groupId)` to `AdminClient` API and
  4. Implement it on `KafkaAdminClient`.
  5. Solve 
    Jira
    serverASF JIRA
    serverId5aa69414-a9e9-3523-82ec-879b028fb15b
    keyKAFKA-6058
  6. Add `#deleteConsumerGroups(Collection<String> groupIds, ConsumerGroupsOptions options)` and `#deleteConsumerGroups(Collection<String> groupIds)` to `AdminClient` API based on KIP-229: DeleteGroups API.

Compatibility, Deprecation, and Migration Plan

  • There won't be any impact on existing users.
  • There won't be any change of current behavior.
  • No migration tool required.

Rejected Alternatives

  • We are moving one more functionality from `core`'s `AdminClient` API to `clients`'s `AdminClient` API. This is aim to remove dependencies to `core` module.
  • Aggregating both Consumer Groups and Connect Groups in the same API would increase complexity adding generic MemberAssignment to match both groups. This KIP is only including Consumer Groups and will leave Connect Groups to be handled by another KIP. https://github.com/apache/kafka/pull/4454#issuecomment-360553277 

Future Work

  • Streams Resetter Tool will be remove it's dependency to `core` module for `KafkaAdminClient`.

References:

Pull Request: https://github.com/apache/kafka/pull/4454