Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, the AdminClient supports a long list of APIs that users can leverage in many different ways. For instance, the AdminClient allows users to manage entities such as topics, partitions, consumer offsets, ACLs, replication log directories and so on. In these scenarios, users only need to talk to brokers through the AdminClient, and client applications no longer depend directly on ZooKeeper.

One missing piece in the AdminClient is the configuration of user and clientId entities, which creates obstacles for users. For instance, there is no way for users to manage their produce/consume client quotas through the AdminClient, nor is there a path for managing any other user or clientId related properties through the AdminClient in the future. In these scenarios, users have to communicate directly with ZooKeeper, which is exactly the pattern the AdminClient is meant to eliminate.

Public Interfaces

Given that the AdminClient already supports configuration management APIs for brokers and topics, we will leverage the existing APIs to manage the configurations of users and clients as well. Below are the two APIs that will be reused:

public abstract class AdminClient implements AutoCloseable {

...

 public abstract AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs, AlterConfigsOptions options);

}

Currently, no config entries are defined for the user and client entities, so we propose to add them in the following packages:

Proposed Changes

This KIP requires two sets of changes: client-side changes and server-side changes.

client side:
1. Package: org.apache.kafka.common.config

USER and CLIENT resource types need to be added to the ConfigResource.java class. They represent the target entities for configuration management.

public final class ConfigResource {

...

/**
 * Source of configuration entries.
 */
public enum ConfigSource {
    DYNAMIC_TOPIC_CONFIG,          // dynamic topic config that is configured for a specific topic
    DYNAMIC_USER_CONFIG,           // dynamic user config that is configured for a specific user (added by this KIP)
    DYNAMIC_CLIENT_CONFIG,         // dynamic client config that is configured for a specific client (added by this KIP)
    DYNAMIC_BROKER_CONFIG,         // dynamic broker config that is configured for a specific broker
    DYNAMIC_DEFAULT_BROKER_CONFIG, // dynamic broker config that is configured as default for all brokers in the cluster
    STATIC_BROKER_CONFIG,          // static broker config provided as broker properties at start up (e.g. server.properties file)
    DEFAULT_CONFIG,                // built-in default configuration for configs that have a default value
    UNKNOWN                        // source unknown e.g. in the ConfigEntry used for alter requests where source is not set
}

org.apache.kafka.common.config package, ConfigResource class:

/**
 * Type of resource.
 */
public enum Type {
    BROKER((byte) 4), USER((byte) 3), TOPIC((byte) 2), CLIENT((byte) 1), UNKNOWN((byte) 0); // USER and CLIENT are the types added by this KIP
    ...
}

}
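
To make the id assignment in the Type enum above concrete, here is a minimal stand-alone sketch that mirrors the proposed values and maps a byte id back to a type. The names ResourceTypeSketch, ResourceTypeDemo and forId are hypothetical and exist only for this illustration; the sketch is not the actual ConfigResource class, and the fallback to UNKNOWN for unrecognized ids is an assumption.

```java
// Hypothetical stand-alone mirror of the proposed ConfigResource.Type values.
// It is NOT the real class in org.apache.kafka.common.config.
enum ResourceTypeSketch {
    BROKER((byte) 4), USER((byte) 3), TOPIC((byte) 2), CLIENT((byte) 1), UNKNOWN((byte) 0);

    private final byte id;

    ResourceTypeSketch(byte id) {
        this.id = id;
    }

    byte id() {
        return id;
    }

    // Map a byte id back to a type. Assumption: ids this sketch does not
    // recognize are reported as UNKNOWN rather than raising an error.
    static ResourceTypeSketch forId(byte id) {
        for (ResourceTypeSketch type : values()) {
            if (type.id == id) {
                return type;
            }
        }
        return UNKNOWN;
    }
}

class ResourceTypeDemo {
    public static void main(String[] args) {
        System.out.println(ResourceTypeSketch.forId((byte) 3));  // prints USER
        System.out.println(ResourceTypeSketch.forId((byte) 1));  // prints CLIENT
        System.out.println(ResourceTypeSketch.forId((byte) 42)); // prints UNKNOWN
    }
}
```

Because USER and CLIENT take ids 3 and 1, the existing BROKER, TOPIC and UNKNOWN ids stay unchanged in this proposal.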

2. Package: org.apache.kafka.clients.admin

We would like to dynamically configure user and client entities.

/**
 * Source of configuration entries.
 */
public enum ConfigSource {
    UNKNOWN_CONFIG((byte) 0),
    TOPIC_CONFIG((byte) 1),
    DYNAMIC_BROKER_CONFIG((byte) 2),
    DYNAMIC_DEFAULT_BROKER_CONFIG((byte) 3),
    STATIC_BROKER_CONFIG((byte) 4),
    DEFAULT_CONFIG((byte) 5),
    DYNAMIC_USER_CONFIG((byte) 6),   // we will add this entry: dynamic user config that is configured for a specific user
    DYNAMIC_CLIENT_CONFIG((byte) 7); // we will add this entry: dynamic client config that is configured for a specific client
    ...
}

Proposed Changes

Add the logic to handle user and client config change requests in the kafka.server package. This introduces two additional cases in the describeConfigs and alterConfigs handlers of the AdminManager implementation.

Communication with ZooKeeper is built on top of the existing adminZkClient. Specifically, the following two APIs will be used to carry out user/client config changes.

/**
 * Update the config that is configured for a client and create a change notification so the change will propagate to other brokers.
 * If clientId is <default>, default clientId config is updated. ClientId configs are used only if <user, clientId>
 * and <user> configs are not specified.
 *
 * @param sanitizedClientId: The sanitized clientId for which configs are being changed
 * @param configs: The final set of configs that will be applied to the topic. If any new configs need to be added or
 *                 existing configs need to be deleted, it should be done prior to invoking this API
 *
 */
def changeClientIdConfig(sanitizedClientId: String, configs: Properties) {
  DynamicConfig.Client.validate(configs)
  changeEntityConfig(ConfigType.Client, sanitizedClientId, configs)
}

/**
 * Update the config for a <user> or <user, clientId> and create a change notification so the change will propagate to other brokers.
 * User and/or clientId components of the path may be <default>, indicating that the configuration is the default
 * value to be applied if a more specific override is not configured.
 *
 * @param sanitizedEntityName: <sanitizedUserPrincipal> or <sanitizedUserPrincipal>/clients/<clientId>
 * @param configs: The final set of configs that will be applied to the topic. If any new configs need to be added or
 *                 existing configs need to be deleted, it should be done prior to invoking this API
 *
 */
def changeUserOrUserClientIdConfig(sanitizedEntityName: String, configs: Properties) {
  if (sanitizedEntityName == ConfigEntityName.Default || sanitizedEntityName.contains("/clients"))
    DynamicConfig.Client.validate(configs)
  else
    DynamicConfig.User.validate(configs)
  changeEntityConfig(ConfigType.User, sanitizedEntityName, configs)
}
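
The branch in changeUserOrUserClientIdConfig decides which validator applies from the entity name alone. The stand-alone sketch below restates that rule in Java so the cases are easy to check. EntityValidatorSketch, Validator and validatorFor are hypothetical names for this illustration, and the literal "<default>" is assumed to be the value of ConfigEntityName.Default, as the comments above suggest.

```java
// Hypothetical restatement of the validator selection rule; not Kafka code.
class EntityValidatorSketch {
    enum Validator { CLIENT, USER }

    // Assumption: ConfigEntityName.Default is the literal string "<default>".
    static final String DEFAULT_ENTITY = "<default>";

    // The default entity and any <user>/clients/<clientId> path are validated
    // with the client rules; a bare user principal uses the user rules.
    static Validator validatorFor(String sanitizedEntityName) {
        if (sanitizedEntityName.equals(DEFAULT_ENTITY) || sanitizedEntityName.contains("/clients")) {
            return Validator.CLIENT;
        }
        return Validator.USER;
    }

    public static void main(String[] args) {
        System.out.println(validatorFor("alice"));              // prints USER
        System.out.println(validatorFor("alice/clients/app1")); // prints CLIENT
        System.out.println(validatorFor("<default>"));          // prints CLIENT
    }
}
```

Note that the check is a substring match on "/clients", so it relies on entity names having been sanitized before they reach this method.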

3. Package: org.apache.kafka.common.requests

As in the section above, add the user and client entries to the ConfigSource definition.
public enum ConfigSource {
    ...
    DYNAMIC_USER_CONFIG((byte) 6),
    DYNAMIC_CLIENT_CONFIG((byte) 7);
    ...
}
server side:

Package: kafka.server

In the AdminManager class, add the user and client case-processing logic to the describeConfigs and alterConfigs handlers. Internally, they will call the adminZkClient to read configurations from and write configurations to ZooKeeper.
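
The server-side dispatch described above could look roughly like the following. This is only a sketch under stated assumptions: AlterConfigsDispatchSketch, EntityConfigStore and InMemoryStore are hypothetical stand-ins (an in-memory map replaces ZooKeeper), and only the two method names changeClientIdConfig and changeUserOrUserClientIdConfig are taken from this proposal. The quota key producer_byte_rate is used purely as sample data.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical sketch of routing alterConfigs requests by resource type.
class AlterConfigsDispatchSketch {
    enum ResourceType { BROKER, USER, TOPIC, CLIENT, UNKNOWN }

    // Stand-in for the ZooKeeper-backed admin client used by the broker.
    interface EntityConfigStore {
        void changeClientIdConfig(String sanitizedClientId, Properties configs);
        void changeUserOrUserClientIdConfig(String sanitizedEntityName, Properties configs);
    }

    // In-memory replacement for ZooKeeper so the sketch runs on its own.
    static class InMemoryStore implements EntityConfigStore {
        final Map<String, Properties> written = new HashMap<>();

        @Override
        public void changeClientIdConfig(String sanitizedClientId, Properties configs) {
            written.put("clients/" + sanitizedClientId, configs);
        }

        @Override
        public void changeUserOrUserClientIdConfig(String sanitizedEntityName, Properties configs) {
            written.put("users/" + sanitizedEntityName, configs);
        }
    }

    // The two new cases: USER and CLIENT are routed to the store.
    // Other types are out of scope here and rejected by this sketch.
    static void alterConfigs(ResourceType type, String name, Properties configs, EntityConfigStore store) {
        switch (type) {
            case USER:
                store.changeUserOrUserClientIdConfig(name, configs);
                break;
            case CLIENT:
                store.changeClientIdConfig(name, configs);
                break;
            default:
                throw new IllegalArgumentException("Not handled in this sketch: " + type);
        }
    }

    public static void main(String[] args) {
        InMemoryStore store = new InMemoryStore();
        Properties quota = new Properties();
        quota.setProperty("producer_byte_rate", "1024"); // sample data only
        alterConfigs(ResourceType.USER, "alice", quota, store);
        alterConfigs(ResourceType.CLIENT, "app1", quota, store);
        System.out.println(store.written.keySet());
    }
}
```

A describeConfigs counterpart would follow the same shape, reading from the store instead of writing to it.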

Compatibility, Deprecation, and Migration Plan

...