You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 15 Next »

Status

Current state"Under Discussion"

Discussion thread: here

JIRA: KAFKA-7740

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, the AdminClient supports a long list of APIs that can be leveraged by users in many different ways. For instance, the AdminClient allows users to manage entities such as topics, partitions, consumer offset, ACLs, replication log directories and so on. In these scenarios, users only need talk to brokers through AdminClient and there is no direct dependency between client applications and Zookeeper any more.

One missing piece in the AdminClient is the configuration of users and clientId entities, which causes some obstacles for users. For instance, there is no way for users to manage their produce/consume client quota through AdminClient. In addition, there is no support for users to manage any other user or clientId related properties through AdminClient in the future. In these scenarios, users have to direct communicate with Zookeeper, which is exactly the anti-AdminClient pattern.

Public Interfaces

The AdminClient has already defined the following configuration management APIs, the configuration of users and clientIds will reuse these two APIs.

public abstract class AdminClient implements AutoCloseable {
 public abstract DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources, DescribeConfigsOptions options);
 public abstract AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs, AlterConfigsOptions options);

}




Currently we do not have configEntry defined for the user and client entities, so we propose to add them in the following packages:

org.apache.kafka.clients.admin package, ConfigEntry class:

/**
* Source of configuration entries.
*/
public enum ConfigSource {
    DYNAMIC_TOPIC_CONFIG, // dynamic topic config that is configured for a specific topic
DYNAMIC_USER_CONFIG, // dynamic user config that is configured for a specific user. // we will add this entry to represent the user config
DYNAMIC_CLIENT_CONFIG, // dynamic client config that is configured for a specific client // we will add this entry to represent the client config
DYNAMIC_BROKER_CONFIG, // dynamic broker config that is configured for a specific broker
DYNAMIC_DEFAULT_BROKER_CONFIG, // dynamic broker config that is configured as default for all brokers in the cluster
STATIC_BROKER_CONFIG, // static broker config provided as broker properties at start up (e.g. server.properties file)
DEFAULT_CONFIG, // built-in default configuration for configs that have a default value
UNKNOWN // source unknown e.g. in the ConfigEntry used for alter requests where source is not set
}

org.apache.kafka.common.config package, ConfigResource class:

/**
* Type of resource.
*/
public enum Type {
BROKER((byte) 4), USER((byte) 3), TOPIC((byte) 2), CLIENT((byte) 1), UNKNOWN((byte) 0); // we will add USER and CLIENT type.
...
}

org.apache.kafka.common.requests package, DescribeConfigsResponse class:

public enum ConfigSource {
UNKNOWN_CONFIG((byte) 0),
TOPIC_CONFIG((byte) 1),
DYNAMIC_BROKER_CONFIG((byte) 2),
DYNAMIC_DEFAULT_BROKER_CONFIG((byte) 3),
STATIC_BROKER_CONFIG((byte) 4),
DEFAULT_CONFIG((byte) 5),
DYNAMIC_USER_CONFIG((byte) 6), // we will add this entry.
DYNAMIC_CLIENT_CONFIG((byte) 7); // we will add this entry.
}

Proposed Changes

Add the logics to handle the user and client config change requests in the kafka.server package. This will introduce two cases check in the describeConfigs and alterConfigs in the AdminManager implementation.

The communication with ZK is built on top of the existing adminZKClient package. To be specific, the following two APIs will be leveraged to finish the process of user/client config changes.

/**
* Update the config for a client and create a change notification so the change will propagate to other brokers.
* If clientId is <default>, default clientId config is updated. ClientId configs are used only if <user, clientId>
* and <user> configs are not specified.
*
* @param sanitizedClientId: The sanitized clientId for which configs are being changed
* @param configs: The final set of configs that will be applied to the topic. If any new configs need to be added or
* existing configs need to be deleted, it should be done prior to invoking this API
*
*/
def changeClientIdConfig(sanitizedClientId: String, configs: Properties) {
DynamicConfig.Client.validate(configs)
changeEntityConfig(ConfigType.Client, sanitizedClientId, configs)
}

/**
* Update the config for a <user> or <user, clientId> and create a change notification so the change will propagate to other brokers.
* User and/or clientId components of the path may be <default>, indicating that the configuration is the default
* value to be applied if a more specific override is not configured.
*
* @param sanitizedEntityName: <sanitizedUserPrincipal> or <sanitizedUserPrincipal>/clients/<clientId>
* @param configs: The final set of configs that will be applied to the topic. If any new configs need to be added or
* existing configs need to be deleted, it should be done prior to invoking this API
*
*/
def changeUserOrUserClientIdConfig(sanitizedEntityName: String, configs: Properties) {
if (sanitizedEntityName == ConfigEntityName.Default || sanitizedEntityName.contains("/clients"))
DynamicConfig.Client.validate(configs)
else
DynamicConfig.User.validate(configs)
changeEntityConfig(ConfigType.User, sanitizedEntityName, configs)
}


Compatibility, Deprecation, and Migration Plan

We only add a new way to configure the quotas, there is nothing to migrate.

Rejected Alternatives

N/A

  • No labels