You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 14 Next »

Status

Current state"Under Discussion"

Discussion thread: here

JIRA: KAFKA-7740

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, user/client configurations are stored in the Zookeeper. Brokers monitor the changes of these configurations from Zookeeper, and then enforce them in the process of each request. Any changes to the configurations have to talk to Zookeeper directly, which introduces more overhead for users. 

Given that we have released the KafkaAdminClient, which is capable of managing the configurations of brokers and topics for customers, it's desire to also support the configuration for users and clients. In this case, applications can leverage the unified KafkaAdminClient to manage their user/client configurations, instead of the direct dependency on Zookeeper. 

We already have support for the ACL managements in the AdminClient, and it would be straightforward to support the user/client config in the AdminClient as well.

Public Interfaces

Currently we do not have configEntry defined for the user and client entities, so we propose to add them in the following packages:

org.apache.kafka.clients.admin package, ConfigEntry class:

/**
* Source of configuration entries.
*/
public enum ConfigSource {
    DYNAMIC_TOPIC_CONFIG, // dynamic topic config that is configured for a specific topic
DYNAMIC_USER_CONFIG, // dynamic user config that is configured for a specific user. // we will add this entry to represent the user config
DYNAMIC_CLIENT_CONFIG, // dynamic client config that is configured for a specific client // we will add this entry to represent the client config
DYNAMIC_BROKER_CONFIG, // dynamic broker config that is configured for a specific broker
DYNAMIC_DEFAULT_BROKER_CONFIG, // dynamic broker config that is configured as default for all brokers in the cluster
STATIC_BROKER_CONFIG, // static broker config provided as broker properties at start up (e.g. server.properties file)
DEFAULT_CONFIG, // built-in default configuration for configs that have a default value
UNKNOWN // source unknown e.g. in the ConfigEntry used for alter requests where source is not set
}

org.apache.kafka.common.config package, ConfigResource class:

/**
* Type of resource.
*/
public enum Type {
BROKER((byte) 4), USER((byte) 3), TOPIC((byte) 2), CLIENT((byte) 1), UNKNOWN((byte) 0); // we will add USER and CLIENT type.
...
}

org.apache.kafka.common.requests package, DescribeConfigsResponse class:

public enum ConfigSource {
UNKNOWN_CONFIG((byte) 0),
TOPIC_CONFIG((byte) 1),
DYNAMIC_BROKER_CONFIG((byte) 2),
DYNAMIC_DEFAULT_BROKER_CONFIG((byte) 3),
STATIC_BROKER_CONFIG((byte) 4),
DEFAULT_CONFIG((byte) 5),
DYNAMIC_USER_CONFIG((byte) 6), // we will add this entry.
DYNAMIC_CLIENT_CONFIG((byte) 7); // we will add this entry.
}

Proposed Changes

Add the logics to handle the user and client config change requests in the kafka.server package. This will introduce two cases check in the describeConfigs and alterConfigs in the AdminManager implementation.

The communication with ZK is built on top of the existing adminZKClient package. To be specific, the following two APIs will be leveraged to finish the process of user/client config changes.

/**
* Update the config for a client and create a change notification so the change will propagate to other brokers.
* If clientId is <default>, default clientId config is updated. ClientId configs are used only if <user, clientId>
* and <user> configs are not specified.
*
* @param sanitizedClientId: The sanitized clientId for which configs are being changed
* @param configs: The final set of configs that will be applied to the topic. If any new configs need to be added or
* existing configs need to be deleted, it should be done prior to invoking this API
*
*/
def changeClientIdConfig(sanitizedClientId: String, configs: Properties) {
DynamicConfig.Client.validate(configs)
changeEntityConfig(ConfigType.Client, sanitizedClientId, configs)
}

/**
* Update the config for a <user> or <user, clientId> and create a change notification so the change will propagate to other brokers.
* User and/or clientId components of the path may be <default>, indicating that the configuration is the default
* value to be applied if a more specific override is not configured.
*
* @param sanitizedEntityName: <sanitizedUserPrincipal> or <sanitizedUserPrincipal>/clients/<clientId>
* @param configs: The final set of configs that will be applied to the topic. If any new configs need to be added or
* existing configs need to be deleted, it should be done prior to invoking this API
*
*/
def changeUserOrUserClientIdConfig(sanitizedEntityName: String, configs: Properties) {
if (sanitizedEntityName == ConfigEntityName.Default || sanitizedEntityName.contains("/clients"))
DynamicConfig.Client.validate(configs)
else
DynamicConfig.User.validate(configs)
changeEntityConfig(ConfigType.User, sanitizedEntityName, configs)
}


Compatibility, Deprecation, and Migration Plan

We only add a new way to configure the quotas, there is nothing to migrate.

Rejected Alternatives

N/A

  • No labels