Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

All use of `AdminClient` within the Kafka project will be replaced with use of the `Admin` interface, with the exception of any classes extending the abstract class, (`KafkaAdminClient` and `MockAdminClient`), which will continue to do so to maintain backwards compatibility, and Kafka Stream's `KafkaClientSupplier`, which has a `getAdminClient` method that must continue to return `AdminClient` to maintain binary compatibility. `KafkaClientSupplier` will have a new `Admin getAdmin` method added and the old one deprecated and its internal use switched to the new method.

Code Block
languagejava
titleAdminClient.java
@InterfaceStability.Evolving
public abstract class AdminClient implements Admin {

    /**
     * Create a new AdminClient with the given configuration.
     *
     * @param props The configuration.
     * @return The new KafkaAdminClient.
     */
    public static AdminClient create(Properties props) {
        return KafkaAdminClient.createInternal(new AdminClientConfig(props, true), null);
    }

    /**
     * Create a new AdminClient with the given configuration.
     *
     * @param conf The configuration.
     * @return The new KafkaAdminClient.
     */
    public static AdminClient create(Map<String, Object> conf) {
        return KafkaAdminClient.createInternal(new AdminClientConfig(conf, true), null);
    }
}

...

Code Block
languagejava
titleAdmin.java
@InterfaceStability.Evolving
public interface Admin extends AutoCloseable {

    /**
     * Create a new AdminClient with the given configuration.
     *
     * @param props The configuration.
     * @return The new KafkaAdminClient.
     */
    static Admin create(Properties props) {
        return KafkaAdminClient.createInternal(new AdminClientConfig(props, true), null);
    }

    /**
     * Create a new AdminClient with the given configuration.
     *
     * @param conf The configuration.
     * @return The new KafkaAdminClient.
     */
    static Admin create(Map<String, Object> conf) {
        return KafkaAdminClient.createInternal(new AdminClientConfig(conf, true), null);
    }

    /**
     * Close the AdminClient and release all associated resources.
     *
     * See {@link Admin#close(long, TimeUnit)}
     */
    @Override
    default void close() {
        close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
    }

    /**
     * Close the AdminClient and release all associated resources.
     *
     * The close operation has a grace period during which current operations will be allowed to
     * complete, specified by the given duration and time unit.
     * New operations will not be accepted during the grace period.  Once the grace period is over,
     * all operations that have not yet been completed will be aborted with a TimeoutException.
     *
     * @param duration  The duration to use for the wait time.
     * @param unit      The time unit to use for the wait time.
     * @deprecated Since 2.2. Use {@link #close(Duration)} or {@link #close()}.
     */
    @Deprecated
    default void close(long duration, TimeUnit unit) {
        close(Duration.ofMillis(unit.toMillis(duration)));
    }

    /**
     * Close the Admin client and release all associated resources.
     *
     * The close operation has a grace period during which current operations will be allowed to
     * complete, specified by the given duration.
     * New operations will not be accepted during the grace period.  Once the grace period is over,
     * all operations that have not yet been completed will be aborted with a TimeoutException.
     *
     * @param timeout  The time to use for the wait time.
     */
    void close(Duration timeout);

    /**
     * Create a batch of new topics with the default options.
     *
     * This is a convenience method for #{@link #createTopics(Collection, CreateTopicsOptions)} with default options.
     * See the overload for more details.
     *
     * This operation is supported by brokers with version 0.10.1.0 or higher.
     *
     * @param newTopics         The new topics to create.
     * @return                  The CreateTopicsResult.
     */
    default CreateTopicsResult createTopics(Collection<NewTopic> newTopics) {
        return createTopics(newTopics, new CreateTopicsOptions());
    }

    /**
     * Create a batch of new topics.
     *
     * This operation is not transactional so it may succeed for some topics while fail for others.
     *
     * It may take several seconds after {@code CreateTopicsResult} returns
     * success for all the brokers to become aware that the topics have been created.
     * During this time, {@link Admin#listTopics()} and {@link Admin#describeTopics(Collection)}
     * may not return information about the new topics.
     *
     * This operation is supported by brokers with version 0.10.1.0 or higher. The validateOnly option is supported
     * from version 0.10.2.0.
     *
     * @param newTopics         The new topics to create.
     * @param options           The options to use when creating the new topics.
     * @return                  The CreateTopicsResult.
     */
    CreateTopicsResult createTopics(Collection<NewTopic> newTopics, CreateTopicsOptions options);

    /**
     * This is a convenience method for #{@link Admin#deleteTopics(Collection, DeleteTopicsOptions)}
     * with default options. See the overload for more details.
     *
     * This operation is supported by brokers with version 0.10.1.0 or higher.
     *
     * @param topics            The topic names to delete.
     * @return                  The DeleteTopicsResult.
     */
    default DeleteTopicsResult deleteTopics(Collection<String> topics) {
        return deleteTopics(topics, new DeleteTopicsOptions());
    }

    /**
     * Delete a batch of topics.
     *
     * This operation is not transactional so it may succeed for some topics while fail for others.
     *
     * It may take several seconds after the {@code DeleteTopicsResult} returns
     * success for all the brokers to become aware that the topics are gone.
     * During this time, AdminClient#listTopics and AdminClient#describeTopics
     * may continue to return information about the deleted topics.
     *
     * If delete.topic.enable is false on the brokers, deleteTopics will mark
     * the topics for deletion, but not actually delete them.  The futures will
     * return successfully in this case.
     *
     * This operation is supported by brokers with version 0.10.1.0 or higher.
     *
     * @param topics            The topic names to delete.
     * @param options           The options to use when deleting the topics.
     * @return                  The DeleteTopicsResult.
     */
    DeleteTopicsResult deleteTopics(Collection<String> topics, DeleteTopicsOptions options);

    /**
     * List the topics available in the cluster with the default options.
     *
     * This is a convenience method for #{@link Admin#listTopics(ListTopicsOptions)} with default options.
     * See the overload for more details.
     *
     * @return                  The ListTopicsResult.
     */
    default ListTopicsResult listTopics() {
        return listTopics(new ListTopicsOptions());
    }

    /**
     * List the topics available in the cluster.
     *
     * @param options           The options to use when listing the topics.
     * @return                  The ListTopicsResult.
     */
    ListTopicsResult listTopics(ListTopicsOptions options);

    /**
     * Describe some topics in the cluster, with the default options.
     *
     * This is a convenience method for #{@link Admin#describeTopics(Collection, DescribeTopicsOptions)} with
     * default options. See the overload for more details.
     *
     * @param topicNames        The names of the topics to describe.
     *
     * @return                  The DescribeTopicsResult.
     */
    default DescribeTopicsResult describeTopics(Collection<String> topicNames) {
        return describeTopics(topicNames, new DescribeTopicsOptions());
    }

    /**
     * Describe some topics in the cluster.
     *
     * @param topicNames        The names of the topics to describe.
     * @param options           The options to use when describing the topic.
     *
     * @return                  The DescribeTopicsResult.
     */
    DescribeTopicsResult describeTopics(Collection<String> topicNames, DescribeTopicsOptions options);

    /**
     * Get information about the nodes in the cluster, using the default options.
     *
     * This is a convenience method for #{@link Admin#describeCluster(DescribeClusterOptions)} with default options.
     * See the overload for more details.
     *
     * @return                  The DescribeClusterResult.
     */
    default DescribeClusterResult describeCluster() {
        return describeCluster(new DescribeClusterOptions());
    }

    /**
     * Get information about the nodes in the cluster.
     *
     * @param options           The options to use when getting information about the cluster.
     * @return                  The DescribeClusterResult.
     */
    DescribeClusterResult describeCluster(DescribeClusterOptions options);

    /**
     * This is a convenience method for #{@link Admin#describeAcls(AclBindingFilter, DescribeAclsOptions)} with
     * default options. See the overload for more details.
     *
     * This operation is supported by brokers with version 0.11.0.0 or higher.
     *
     * @param filter            The filter to use.
     * @return                  The DeleteAclsResult.
     */
    default DescribeAclsResult describeAcls(AclBindingFilter filter) {
        return describeAcls(filter, new DescribeAclsOptions());
    }

    /**
     * Lists access control lists (ACLs) according to the supplied filter.
     *
     * Note: it may take some time for changes made by createAcls or deleteAcls to be reflected
     * in the output of describeAcls.
     *
     * This operation is supported by brokers with version 0.11.0.0 or higher.
     *
     * @param filter            The filter to use.
     * @param options           The options to use when listing the ACLs.
     * @return                  The DeleteAclsResult.
     */
    DescribeAclsResult describeAcls(AclBindingFilter filter, DescribeAclsOptions options);

    /**
     * This is a convenience method for #{@link Admin#createAcls(Collection, CreateAclsOptions)} with
     * default options. See the overload for more details.
     *
     * This operation is supported by brokers with version 0.11.0.0 or higher.
     *
     * @param acls              The ACLs to create
     * @return                  The CreateAclsResult.
     */
    default CreateAclsResult createAcls(Collection<AclBinding> acls) {
        return createAcls(acls, new CreateAclsOptions());
    }

    /**
     * Creates access control lists (ACLs) which are bound to specific resources.
     *
     * This operation is not transactional so it may succeed for some ACLs while fail for others.
     *
     * If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but
     * no changes will be made.
     *
     * This operation is supported by brokers with version 0.11.0.0 or higher.
     *
     * @param acls              The ACLs to create
     * @param options           The options to use when creating the ACLs.
     * @return                  The CreateAclsResult.
     */
    CreateAclsResult createAcls(Collection<AclBinding> acls, CreateAclsOptions options);

    /**
     * This is a convenience method for #{@link Admin#deleteAcls(Collection, DeleteAclsOptions)} with default options.
     * See the overload for more details.
     *
     * This operation is supported by brokers with version 0.11.0.0 or higher.
     *
     * @param filters           The filters to use.
     * @return                  The DeleteAclsResult.
     */
    default DeleteAclsResult deleteAcls(Collection<AclBindingFilter> filters) {
        return deleteAcls(filters, new DeleteAclsOptions());
    }

    /**
     * Deletes access control lists (ACLs) according to the supplied filters.
     *
     * This operation is not transactional so it may succeed for some ACLs while fail for others.
     *
     * This operation is supported by brokers with version 0.11.0.0 or higher.
     *
     * @param filters           The filters to use.
     * @param options           The options to use when deleting the ACLs.
     * @return                  The DeleteAclsResult.
     */
    DeleteAclsResult deleteAcls(Collection<AclBindingFilter> filters, DeleteAclsOptions options);


    /**
     * Get the configuration for the specified resources with the default options.
     *
     * This is a convenience method for #{@link Admin#describeConfigs(Collection, DescribeConfigsOptions)} with default options.
     * See the overload for more details.
     *
     * This operation is supported by brokers with version 0.11.0.0 or higher.
     *
     * @param resources         The resources (topic and broker resource types are currently supported)
     * @return                  The DescribeConfigsResult
     */
    default DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources) {
        return describeConfigs(resources, new DescribeConfigsOptions());
    }

    /**
     * Get the configuration for the specified resources.
     *
     * The returned configuration includes default values and the isDefault() method can be used to distinguish them
     * from user supplied values.
     *
     * The value of config entries where isSensitive() is true is always {@code null} so that sensitive information
     * is not disclosed.
     *
     * Config entries where isReadOnly() is true cannot be updated.
     *
     * This operation is supported by brokers with version 0.11.0.0 or higher.
     *
     * @param resources         The resources (topic and broker resource types are currently supported)
     * @param options           The options to use when describing configs
     * @return                  The DescribeConfigsResult
     */
    DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources, DescribeConfigsOptions options);

    /**
     * Update the configuration for the specified resources with the default options.
     *
     * This is a convenience method for #{@link Admin#alterConfigs(Map, AlterConfigsOptions)} with default options.
     * See the overload for more details.
     *
     * This operation is supported by brokers with version 0.11.0.0 or higher.
     *
     * @param configs         The resources with their configs (topic is the only resource type with configs that can
     *                        be updated currently)
     * @return                The AlterConfigsResult
     * @deprecated Since 2.3. Use {@link #incrementalAlterConfigs(Map)}.
     */
    @Deprecated
    default AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs) {
        return alterConfigs(configs, new AlterConfigsOptions());
    }

    /**
     * Update the configuration for the specified resources with the default options.
     *
     * Updates are not transactional so they may succeed for some resources while fail for others. The configs for
     * a particular resource are updated atomically.
     *
     * This operation is supported by brokers with version 0.11.0.0 or higher.
     *
     * @param configs         The resources with their configs (topic is the only resource type with configs that can
     *                        be updated currently)
     * @param options         The options to use when describing configs
     * @return                The AlterConfigsResult
     * @deprecated Since 2.3. Use {@link #incrementalAlterConfigs(Map, AlterConfigsOptions)}.
     */
    @Deprecated
    AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs, AlterConfigsOptions options);

    /**
     * Incrementally updates the configuration for the specified resources with default options.
     *
     * This is a convenience method for #{@link Admin#incrementalAlterConfigs(Map, AlterConfigsOptions)} with default options.
     * See the overload for more details.*
     *
     * This operation is supported by brokers with version 2.3.0 or higher.
     *
     * @param configs         The resources with their configs
     * @return                The IncrementalAlterConfigsResult
     */
    default AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource, Collection<AlterConfigOp>> configs) {
        return incrementalAlterConfigs(configs, new AlterConfigsOptions());
    }


    /**
     * Incrementally update the configuration for the specified resources.
     *
     * Updates are not transactional so they may succeed for some resources while fail for others. The configs for
     * a particular resource are updated atomically.
     *
     * <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from
     * the returned {@code IncrementalAlterConfigsResult}:</p>
     * <ul>
     *   <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
     *   if the authenticated user didn't have alter access to the cluster.</li>
     *   <li>{@link org.apache.kafka.common.errors.TopicAuthorizationException}
     *   if the authenticated user didn't have alter access to the Topic.</li>
     *   <li>{@link org.apache.kafka.common.errors.InvalidRequestException}
     *   if the request details are invalid. e.g., a configuration key was specified more than once for a resource</li>
     * </ul>*
     *
     * This operation is supported by brokers with version 2.3.0 or higher.
     *
     * @param configs         The resources with their configs
     * @param options         The options to use when altering configs
     * @return                The IncrementalAlterConfigsResult
     */
    AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource, 
        Collection<AlterConfigOp>> configs, AlterConfigsOptions options);

    /**
     * Change the log directory for the specified replicas. If the replica does not exist on the broker, the result
     * shows REPLICA_NOT_AVAILABLE for the given replica and the replica will be created in the given log directory on the
     * broker when it is created later. If the replica already exists on the broker, the replica will be moved to the given
     * log directory if it is not already there.
     *
     * This operation is not transactional so it may succeed for some replicas while fail for others.
     *
     * This is a convenience method for #{@link Admin#alterReplicaLogDirs(Map, AlterReplicaLogDirsOptions)} with default options.
     * See the overload for more details.
     *
     * This operation is supported by brokers with version 1.1.0 or higher.
     *
     * @param replicaAssignment  The replicas with their log directory absolute path
     * @return                   The AlterReplicaLogDirsResult
     */
    default AlterReplicaLogDirsResult alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment) {
        return alterReplicaLogDirs(replicaAssignment, new AlterReplicaLogDirsOptions());
    }

    /**
     * Change the log directory for the specified replicas. If the replica does not exist on the broker, the result
     * shows REPLICA_NOT_AVAILABLE for the given replica and the replica will be created in the given log directory on the
     * broker when it is created later. If the replica already exists on the broker, the replica will be moved to the given
     * log directory if it is not already there.
     *
     * This operation is not transactional so it may succeed for some replicas while fail for others.
     *
     * This operation is supported by brokers with version 1.1.0 or higher.
     *
     * @param replicaAssignment  The replicas with their log directory absolute path
     * @param options            The options to use when changing replica dir
     * @return                   The AlterReplicaLogDirsResult
     */
    AlterReplicaLogDirsResult alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment, 
                                                  AlterReplicaLogDirsOptions options);

    /**
     * Query the information of all log directories on the given set of brokers
     *
     * This is a convenience method for #{@link Admin#describeLogDirs(Collection, DescribeLogDirsOptions)} with default options.
     * See the overload for more details.
     *
     * This operation is supported by brokers with version 1.0.0 or higher.
     *
     * @param brokers     A list of brokers
     * @return            The DescribeLogDirsResult
     */
    default DescribeLogDirsResult describeLogDirs(Collection<Integer> brokers) {
        return describeLogDirs(brokers, new DescribeLogDirsOptions());
    }

    /**
     * Query the information of all log directories on the given set of brokers
     *
     * This operation is supported by brokers with version 1.0.0 or higher.
     *
     * @param brokers     A list of brokers
     * @param options     The options to use when querying log dir info
     * @return            The DescribeLogDirsResult
     */
    DescribeLogDirsResult describeLogDirs(Collection<Integer> brokers, DescribeLogDirsOptions options);

    /**
     * Query the replica log directory information for the specified replicas.
     *
     * This is a convenience method for #{@link Admin#describeReplicaLogDirs(Collection, DescribeReplicaLogDirsOptions)}
     * with default options. See the overload for more details.
     *
     * This operation is supported by brokers with version 1.0.0 or higher.
     *
     * @param replicas      The replicas to query
     * @return              The DescribeReplicaLogDirsResult
     */
    default DescribeReplicaLogDirsResult describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas) {
        return describeReplicaLogDirs(replicas, new DescribeReplicaLogDirsOptions());
    }

    /**
     * Query the replica log directory information for the specified replicas.
     *
     * This operation is supported by brokers with version 1.0.0 or higher.
     *
     * @param replicas      The replicas to query
     * @param options       The options to use when querying replica log dir info
     * @return              The DescribeReplicaLogDirsResult
     */
    DescribeReplicaLogDirsResult describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas, DescribeReplicaLogDirsOptions options);

    /**
     * <p>Increase the number of partitions of the topics given as the keys of {@code newPartitions}
     * according to the corresponding values. <strong>If partitions are increased for a topic that has a key,
     * the partition logic or ordering of the messages will be affected.</strong></p>
     *
     * <p>This is a convenience method for {@link #createPartitions(Map, CreatePartitionsOptions)} with default options.
     * See the overload for more details.</p>
     *
     * @param newPartitions The topics which should have new partitions created, and corresponding parameters
     *                      for the created partitions.
     * @return              The CreatePartitionsResult.
     */
    default CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPartitions) {
        return createPartitions(newPartitions, new CreatePartitionsOptions());
    }

    /**
     * <p>Increase the number of partitions of the topics given as the keys of {@code newPartitions}
     * according to the corresponding values. <strong>If partitions are increased for a topic that has a key,
     * the partition logic or ordering of the messages will be affected.</strong></p>
     *
     * <p>This operation is not transactional so it may succeed for some topics while fail for others.</p>
     *
     * <p>It may take several seconds after this method returns
     * success for all the brokers to become aware that the partitions have been created.
     * During this time, {@link Admin#describeTopics(Collection)}
     * may not return information about the new partitions.</p>
     *
     * <p>This operation is supported by brokers with version 1.0.0 or higher.</p>
     *
     * <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from the
     * {@link CreatePartitionsResult#values() values()} method of the returned {@code CreatePartitionsResult}</p>
     * <ul>
     *     <li>{@link org.apache.kafka.common.errors.AuthorizationException}
     *     if the authenticated user is not authorized to alter the topic</li>
     *     <li>{@link org.apache.kafka.common.errors.TimeoutException}
     *     if the request was not completed in within the given {@link CreatePartitionsOptions#timeoutMs()}.</li>
     *     <li>{@link org.apache.kafka.common.errors.ReassignmentInProgressException}
     *     if a partition reassignment is currently in progress</li>
     *     <li>{@link org.apache.kafka.common.errors.BrokerNotAvailableException}
     *     if the requested {@link NewPartitions#assignments()} contain a broker that is currently unavailable.</li>
     *     <li>{@link org.apache.kafka.common.errors.InvalidReplicationFactorException}
     *     if no {@link NewPartitions#assignments()} are given and it is impossible for the broker to assign
     *     replicas with the topics replication factor.</li>
     *     <li>Subclasses of {@link org.apache.kafka.common.KafkaException}
     *     if the request is invalid in some way.</li>
     * </ul>
     *
     * @param newPartitions The topics which should have new partitions created, and corresponding parameters
     *                      for the created partitions.
     * @param options       The options to use when creating the new paritions.
     * @return              The CreatePartitionsResult.
     */
    CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPartitions,
                                                            CreatePartitionsOptions options);

    /**
     * Delete records whose offset is smaller than the given offset of the corresponding partition.
     *
     * This is a convenience method for {@link #deleteRecords(Map, DeleteRecordsOptions)} with default options.
     * See the overload for more details.
     *
     * This operation is supported by brokers with version 0.11.0.0 or higher.
     *
     * @param recordsToDelete       The topic partitions and related offsets from which records deletion starts.
     * @return                      The DeleteRecordsResult.
     */
    default DeleteRecordsResult deleteRecords(Map<TopicPartition, RecordsToDelete> recordsToDelete) {
        return deleteRecords(recordsToDelete, new DeleteRecordsOptions());
    }

    /**
     * Delete records whose offset is smaller than the given offset of the corresponding partition.
     *
     * This operation is supported by brokers with version 0.11.0.0 or higher.
     *
     * @param recordsToDelete       The topic partitions and related offsets from which records deletion starts.
     * @param options               The options to use when deleting records.
     * @return                      The DeleteRecordsResult.
     */
    DeleteRecordsResult deleteRecords(Map<TopicPartition, RecordsToDelete> recordsToDelete,
                                                      DeleteRecordsOptions options);

    /**
     * <p>Create a Delegation Token.</p>
     *
     * <p>This is a convenience method for {@link #createDelegationToken(CreateDelegationTokenOptions)} with default options.
     * See the overload for more details.</p>
     *
     * @return                      The CreateDelegationTokenResult.
     */
    default CreateDelegationTokenResult createDelegationToken() {
        return createDelegationToken(new CreateDelegationTokenOptions());
    }


    /**
     * <p>Create a Delegation Token.</p>
     *
     * <p>This operation is supported by brokers with version 1.1.0 or higher.</p>
     *
     * <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from the
     * {@link CreateDelegationTokenResult#delegationToken() delegationToken()} method of the returned {@code CreateDelegationTokenResult}</p>
     * <ul>
     *     <li>{@link org.apache.kafka.common.errors.UnsupportedByAuthenticationException}
     *     If the request sent on PLAINTEXT/1-way SSL channels or delegation token authenticated channels.</li>
     *     <li>{@link org.apache.kafka.common.errors.InvalidPrincipalTypeException}
     *     if the renewers principal type is not supported.</li>
     *     <li>{@link org.apache.kafka.common.errors.DelegationTokenDisabledException}
     *     if the delegation token feature is disabled.</li>
     *     <li>{@link org.apache.kafka.common.errors.TimeoutException}
     *     if the request was not completed in within the given {@link CreateDelegationTokenOptions#timeoutMs()}.</li>
     * </ul>
     *
     * @param options               The options to use when creating delegation token.
     * @return                      The DeleteRecordsResult.
     */
    CreateDelegationTokenResult createDelegationToken(CreateDelegationTokenOptions options);


    /**
     * <p>Renew a Delegation Token.</p>
     *
     * <p>This is a convenience method for {@link #renewDelegationToken(byte[], RenewDelegationTokenOptions)} with default options.
     * See the overload for more details.</p>
     *
     *
     * @param hmac                  HMAC of the Delegation token
     * @return                      The RenewDelegationTokenResult.
     */
    default RenewDelegationTokenResult renewDelegationToken(byte[] hmac) {
        return renewDelegationToken(hmac, new RenewDelegationTokenOptions());
    }

    /**
     * <p> Renew a Delegation Token.</p>
     *
     * <p>This operation is supported by brokers with version 1.1.0 or higher.</p>
     *
     * <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from the
     * {@link RenewDelegationTokenResult#expiryTimestamp() expiryTimestamp()} method of the returned {@code RenewDelegationTokenResult}</p>
     * <ul>
     *     <li>{@link org.apache.kafka.common.errors.UnsupportedByAuthenticationException}
     *     If the request sent on PLAINTEXT/1-way SSL channels or delegation token authenticated channels.</li>
     *     <li>{@link org.apache.kafka.common.errors.DelegationTokenDisabledException}
     *     if the delegation token feature is disabled.</li>
     *     <li>{@link org.apache.kafka.common.errors.DelegationTokenNotFoundException}
     *     if the delegation token is not found on server.</li>
     *     <li>{@link org.apache.kafka.common.errors.DelegationTokenOwnerMismatchException}
     *     if the authenticated user is not owner/renewer of the token.</li>
     *     <li>{@link org.apache.kafka.common.errors.DelegationTokenExpiredException}
     *     if the delegation token is expired.</li>
     *     <li>{@link org.apache.kafka.common.errors.TimeoutException}
     *     if the request was not completed in within the given {@link RenewDelegationTokenOptions#timeoutMs()}.</li>
     * </ul>
     *
     * @param hmac                  HMAC of the Delegation token
     * @param options               The options to use when renewing delegation token.
     * @return                      The RenewDelegationTokenResult.
     */
    RenewDelegationTokenResult renewDelegationToken(byte[] hmac, RenewDelegationTokenOptions options);

    /**
     * <p>Expire a Delegation Token.</p>
     *
     * <p>This is a convenience method for {@link #expireDelegationToken(byte[], ExpireDelegationTokenOptions)} with default options.
     * This will expire the token immediately. See the overload for more details.</p>
     *
     * @param hmac                  HMAC of the Delegation token
     * @return                      The ExpireDelegationTokenResult.
     */
    default ExpireDelegationTokenResult expireDelegationToken(byte[] hmac) {
        return expireDelegationToken(hmac, new ExpireDelegationTokenOptions());
    }

    /**
     * <p>Expire a Delegation Token.</p>
     *
     * <p>This operation is supported by brokers with version 1.1.0 or higher.</p>
     *
     * <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from the
     * {@link ExpireDelegationTokenResult#expiryTimestamp() expiryTimestamp()} method of the returned {@code ExpireDelegationTokenResult}</p>
     * <ul>
     *     <li>{@link org.apache.kafka.common.errors.UnsupportedByAuthenticationException}
     *     If the request sent on PLAINTEXT/1-way SSL channels or delegation token authenticated channels.</li>
     *     <li>{@link org.apache.kafka.common.errors.DelegationTokenDisabledException}
     *     if the delegation token feature is disabled.</li>
     *     <li>{@link org.apache.kafka.common.errors.DelegationTokenNotFoundException}
     *     if the delegation token is not found on server.</li>
     *     <li>{@link org.apache.kafka.common.errors.DelegationTokenOwnerMismatchException}
     *     if the authenticated user is not owner/renewer of the requested token.</li>
     *     <li>{@link org.apache.kafka.common.errors.DelegationTokenExpiredException}
     *     if the delegation token is expired.</li>
     *     <li>{@link org.apache.kafka.common.errors.TimeoutException}
     *     if the request was not completed in within the given {@link ExpireDelegationTokenOptions#timeoutMs()}.</li>
     * </ul>
     *
     * @param hmac                  HMAC of the Delegation token
     * @param options               The options to use when expiring delegation token.
     * @return                      The ExpireDelegationTokenResult.
     */
    ExpireDelegationTokenResult expireDelegationToken(byte[] hmac, ExpireDelegationTokenOptions options);

    /**
     *<p>Describe the Delegation Tokens.</p>
     *
     * <p>This is a convenience method for {@link #describeDelegationToken(DescribeDelegationTokenOptions)} with default options.
     * This will return all the user owned tokens and tokens where user have Describe permission. See the overload for more details.</p>
     *
     * @return                      The DescribeDelegationTokenResult.
     */
    default DescribeDelegationTokenResult describeDelegationToken() {
        return describeDelegationToken(new DescribeDelegationTokenOptions());
    }

    /**
     * <p>Describe the Delegation Tokens.</p>
     *
     * <p>This operation is supported by brokers with version 1.1.0 or higher.</p>
     *
     * <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from the
     * {@link DescribeDelegationTokenResult#delegationTokens() delegationTokens()} method of the returned {@code DescribeDelegationTokenResult}</p>
     * <ul>
     *     <li>{@link org.apache.kafka.common.errors.UnsupportedByAuthenticationException}
     *     If the request sent on PLAINTEXT/1-way SSL channels or delegation token authenticated channels.</li>
     *     <li>{@link org.apache.kafka.common.errors.DelegationTokenDisabledException}
     *     if the delegation token feature is disabled.</li>
     *     <li>{@link org.apache.kafka.common.errors.TimeoutException}
     *     if the request was not completed in within the given {@link DescribeDelegationTokenOptions#timeoutMs()}.</li>
     * </ul>
     *
     * @param options               The options to use when describing delegation tokens.
     * @return                      The DescribeDelegationTokenResult.
     */
    DescribeDelegationTokenResult describeDelegationToken(DescribeDelegationTokenOptions options);

    /**
     * Describe some group IDs in the cluster.
     *
     * @param groupIds The IDs of the groups to describe.
     * @param options  The options to use when describing the groups.
     * @return The DescribeConsumerGroupResult.
     */
    DescribeConsumerGroupsResult describeConsumerGroups(Collection<String> groupIds,
                                                                        DescribeConsumerGroupsOptions options);

    /**
     * Describe some group IDs in the cluster, with the default options.
     * <p>
     * This is a convenience method for
     * #{@link Admin#describeConsumerGroups(Collection, DescribeConsumerGroupsOptions)} with
     * default options. See the overload for more details.
     *
     * @param groupIds The IDs of the groups to describe.
     * @return The DescribeConsumerGroupResult.
     */
    default DescribeConsumerGroupsResult describeConsumerGroups(Collection<String> groupIds) {
        return describeConsumerGroups(groupIds, new DescribeConsumerGroupsOptions());
    }

    /**
     * List the consumer groups available in the cluster.
     *
     * @param options           The options to use when listing the consumer groups.
     * @return The ListGroupsResult.
     */
    ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options);

    /**
     * List the consumer groups available in the cluster with the default options.
     *
     * This is a convenience method for #{@link Admin#listConsumerGroups(ListConsumerGroupsOptions)} with default options.
     * See the overload for more details.
     *
     * @return The ListGroupsResult.
     */
    default ListConsumerGroupsResult listConsumerGroups() {
        return listConsumerGroups(new ListConsumerGroupsOptions());
    }

    /**
     * List the consumer group offsets available in the cluster.
     *
     * @param options           The options to use when listing the consumer group offsets.
     * @return The ListGroupOffsetsResult
     */
    ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options);

    /**
     * List the consumer group offsets available in the cluster with the default options.
     *
     * This is a convenience method for #{@link Admin#listConsumerGroupOffsets(String, ListConsumerGroupOffsetsOptions)} with default options.
     *
     * @return The ListGroupOffsetsResult.
     */
    default ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId) {
        return listConsumerGroupOffsets(groupId, new ListConsumerGroupOffsetsOptions());
    }

    /**
     * Delete consumer groups from the cluster.
     *
     * @param options           The options to use when deleting a consumer group.
     * @return The DeletConsumerGroupResult.
     */
    DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupIds, DeleteConsumerGroupsOptions options);

    /**
     * Delete consumer groups from the cluster with the default options.
     *
     * @return The DeleteConsumerGroupResult.
     */
    default DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupIds) {
        return deleteConsumerGroups(groupIds, new DeleteConsumerGroupsOptions());
    }

    /**
     * Elect the preferred replica as leader for topic partitions.
     *
     * This is a convenience method for {@link #electLeaders(ElectionType, Set, ElectLeadersOptions)}
     * with preferred election type and default options.
     *
     * This operation is supported by brokers with version 2.2.0 or higher.
     *
     * @param partitions      The partitions for which the preferred leader should be elected.
     * @return                The ElectPreferredLeadersResult.
     * @deprecated            Since 2.4.0. Use {@link #electLeaders(ElectionType, Set)}.
     */
    @Deprecated
    default ElectPreferredLeadersResult electPreferredLeaders(Collection<TopicPartition> partitions) {
        return electPreferredLeaders(partitions, new ElectPreferredLeadersOptions());
    }

    /**
     * Elect the preferred replica as leader for topic partitions.
     *
     * This is a convenience method for {@link #electLeaders(ElectionType, Set, ElectLeadersOptions)}
     * with preferred election type.
     *
     * This operation is supported by brokers with version 2.2.0 or higher.
     *
     * @param partitions      The partitions for which the preferred leader should be elected.
     * @param options         The options to use when electing the preferred leaders.
     * @return                The ElectPreferredLeadersResult.
     * @deprecated            Since 2.4.0. Use {@link #electLeaders(ElectionType, Set, ElectLeadersOptions)}.
     */
    @Deprecated
    default ElectPreferredLeadersResult electPreferredLeaders(Collection<TopicPartition> partitions,
                                                             ElectPreferredLeadersOptions options) {
        final ElectLeadersOptions newOptions = new ElectLeadersOptions();
        newOptions.timeoutMs(options.timeoutMs());
        final Set<TopicPartition> topicPartitions = partitions == null ? null : new HashSet<>(partitions);

        return new ElectPreferredLeadersResult(electLeaders(ElectionType.PREFERRED, topicPartitions, newOptions));
    }

    /**
     * Elect a replica as leader for topic partitions.
     *
     * This is a convenience method for {@link #electLeaders(ElectionType, Set, ElectLeadersOptions)}
     * with default options.
     *
     * @param electionType            The type of election to conduct.
     * @param partitions              The topics and partitions for which to conduct elections.
     * @return                        The ElectLeadersResult.
     */
    default ElectLeadersResult electLeaders(ElectionType electionType, Set<TopicPartition> partitions) {
        return electLeaders(electionType, partitions, new ElectLeadersOptions());
    }

    /**
     * Elect a replica as leader for the given {@code partitions}, or for all partitions if the argumentl
     * to {@code partitions} is null.
     *
     * This operation is not transactional so it may succeed for some partitions while fail for others.
     *
     * It may take several seconds after this method returns success for all the brokers in the cluster
     * to become aware that the partitions have new leaders. During this time,
     * {@link Admin#describeTopics(Collection)} may not return information about the partitions'
     * new leaders.
     *
     * This operation is supported by brokers with version 2.2.0 or later if preferred eleciton is use;
     * otherwise the brokers most be 2.4.0 or higher.
     *
     * <p>The following exceptions can be anticipated when calling {@code get()} on the future obtained
     * from the returned {@code ElectLeadersResult}:</p>
     * <ul>
     *   <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
     *   if the authenticated user didn't have alter access to the cluster.</li>
     *   <li>{@link org.apache.kafka.common.errors.UnknownTopicOrPartitionException}
     *   if the topic or partition did not exist within the cluster.</li>
     *   <li>{@link org.apache.kafka.common.errors.InvalidTopicException}
     *   if the topic was already queued for deletion.</li>
     *   <li>{@link org.apache.kafka.common.errors.NotControllerException}
     *   if the request was sent to a broker that was not the controller for the cluster.</li>
     *   <li>{@link org.apache.kafka.common.errors.TimeoutException}
     *   if the request timed out before the election was complete.</li>
     *   <li>{@link org.apache.kafka.common.errors.LeaderNotAvailableException}
     *   if the preferred leader was not alive or not in the ISR.</li>
     * </ul>
     *
     * @param electionType            The type of election to conduct.
     * @param partitions              The topics and partitions for which to conduct elections.
     * @param options                 The options to use when electing the leaders.
     * @return                        The ElectLeadersResult.
     */
    ElectLeadersResult electLeaders(
            ElectionType electionType,
            Set<TopicPartition> partitions,
            ElectLeadersOptions options);

    /**
     * Get the metrics kept by the adminClient
     */
    Map<MetricName, ? extends Metric> metrics();
}


Code Block
languagejava
titleKafka Stream's KafkaClientSupplier
public interface KafkaClientSupplier {
    /**
     * Create an {@link AdminClient} which is used for internal topic management.
     *
     * @param config Supplied by the {@link java.util.Properties} given to the {@link KafkaStreams}
     * @return an instance of {@link AdminClient}
     * @deprecated Not called by Kafka Streams, which now uses {@link #getAdmin} instead.
     */
    @Deprecated
    default AdminClient getAdminClient(final Map<String, Object> config) {
        throw new UnsupportedOperationException("Direct use of this method is deprecated. " +
            "Implementations of KafkaClientSupplier should implement the getAdmin method instead. " +
            "The method will be removed in a future release.");
    }

    /**
     * Create an {@link Admin} which is used for internal topic management.
     *
     * @param config Supplied by the {@link java.util.Properties} given to the {@link KafkaStreams}
     * @return an instance of {@link Admin}
     */
    @SuppressWarnings("deprecation")
    default Admin getAdmin(final Map<String, Object> config) {
        return getAdminClient(config);
    }

    ... rest of methods remain the same.
}


Proposed Changes

As per section above. 

...