Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Current state:  "Under Discussion" Rejected in favor of KIP-546

Discussion thread: TBD here

JIRA: KAFKA-7740

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, user/client configurations are stored in the Zookeeper. Brokers monitor the changes of these configurations from Zookeeper, and then enforce them in the process of each request. Any changes to the configurations have to talk to Zookeeper directly, which introduces more overhead for users. 

Given that we have released the KafkaAdminClient, which is capable of managing the configurations of brokers and topics for customers, it's desire to also support the configuration for users and clients. In this case, applications can leverage the unified KafkaAdminClient to manage their user/client configurations, instead of the direct dependency on Zookeeper. 

We already have support for the ACL managements in the AdminClient, and it would be straightforward to support the user/client config in the AdminClient as well.

Public Interfaces

Currently we do not have configEntry defined for the user and client entities, so we propose to add them in the following packages:

org.apache.kafka.clients.admin package, ConfigEntry class:

/**
* Source of configuration entries.
*/
public enum ConfigSource {
    DYNAMIC_TOPIC_CONFIG, // dynamic topic config that is configured for a specific topic
DYNAMIC_USER_CONFIG, // dynamic user config that is configured for a specific user. // we will add this entry to represent the user config
DYNAMIC_CLIENT_CONFIG, // dynamic client config that is configured for a specific client // we will add this entry to represent the client config
DYNAMIC_BROKER_CONFIG, // dynamic broker config that is configured for a specific broker
DYNAMIC_DEFAULT_BROKER_CONFIG, // dynamic broker config that is configured as default for all brokers in the cluster
STATIC_BROKER_CONFIG, // static broker config provided as broker properties at start up (e.g. server.properties file)
DEFAULT_CONFIG, // built-in default configuration for configs that have a default value
UNKNOWN // source unknown e.g. in the ConfigEntry used for alter requests where source is not set
}

org.apache.kafka.common.config package, ConfigResource class:

/**
* Type of resource.
*/
public enum Type {
BROKER((byte) 4), USER((byte) 3), TOPIC((byte) 2), CLIENT((byte) 1), UNKNOWN((byte) 0); // we will add USER and CLIENT type.
...
}

org.apache.kafka.common.requests package, DescribeConfigsResponse class:

public enum ConfigSource {
UNKNOWN_CONFIG((byte) 0),
TOPIC_CONFIG((byte) 1),
DYNAMIC_BROKER_CONFIG((byte) 2),
DYNAMIC_DEFAULT_BROKER_CONFIG((byte) 3),
STATIC_BROKER_CONFIG((byte) 4),
DEFAULT_CONFIG((byte) 5),
DYNAMIC_USER_CONFIG((byte) 6), // we will add this entry.
DYNAMIC_CLIENT_CONFIG((byte) 7); // we will add this entry.
}

Proposed Changes

Add the logics to handle the user and client config change requests in the kafka.server package. This will introduce two cases check in the describeConfigs and alterConfigs in the AdminManager implementation.

The communication with ZK is built on top of the existing adminZKClient package. To be specific, the following two APIs will be leveraged to finish the process of user/client config changes.

...

the AdminClient supports a long list of APIs that can be leveraged by users in many different ways. For instance, the AdminClient allows users to manage entities such as topics, partitions, consumer offset, ACLs, replication log directories and so on. In these scenarios, users only need talk to brokers through AdminClient and there is no direct dependency between client applications and Zookeeper any more.

One missing piece in the AdminClient is the configuration of client quotas. Without this feature, users have to directly talk to Zookeeper in order to manage the quota configurations. This KIP is designed to bring this feature into the AdminClient library.

Public Interfaces

1.The following quota related APIs will be added to the AdminClient from package org.apache.kafka.clients.admin.

/**
* This is a convenient method for #{@link AdminClient#setQuotas(Map, SetQuotasOptions)} with default options.
*
* This operation is not transactional so it may succeed for some quotas while fail for others.
*
* This operation is supported by brokers with version 0.11.0.0 or higher.
*
* @param quotas The quotas to use.
* @return The SetQuotaResults.
*/
public SetQuotasResults setQuotas(Map<QuotaTarget, QuotaLimit> quotas) {
return setQuotas(quotas, new SetQuotasOptions());
}

/**
* Set quotas for the specified Quota Targets.
*
* This operation is not transactional so it may succeed for some quotas while fail for others.
*
* This operation is supported by brokers with version 0.11.0.0 or higher.
*
* @param quotas The quotas to use.
* @param options The options to use when set the quotas.
* @return The SetQuotaResults.
*/
public abstract SetQuotasResults setQuotas(Map<QuotaTarget, QuotaLimit> quotas, SetQuotasOptions options);

/**
* This is a convenient method for #{@link AdminClient#describeQuota(Map, DescribeQuotasOptions)} with default options.
* Get the quota settings for the specific Quota Target.
*
* @param target The quota target to use.
* @return The DescribeQuotasResults.
*/
public DescribeQuotasResults describeQuota(QuotaTarget target) {
return describeQuota(target, new DescribeQuotasOptions());
}

/**
* Get the quota settings for the specific Quota Target.
*
* @param target The quota target to use.
* @param options The options to use when describe the quotas.
* @return The DescribeQuotasResults.
*/
public abstract DescribeQuotasResults describeQuota(QuotaTarget target, DescribeQuotasOptions options);

/**
* Delete the quota setting for the specified quota target.
* @param quotaTargets The quota target to use.
* @param options The options to use when delete the quota.
* @return The DeleteQuotasResults
*/
public abstract DeleteQuotasResults deleteQuotas(Collection<QuotaTarget> quotaTargets, DeleteQuotasOptions options);

/**
* Delete the quota setting for the specified quota target.
* @param quotaTargets The quota target to use.
* @return The DeleteQuotasResults
*/
public DeleteQuotasResults deleteQuotas(Collection<QuotaTarget> quotaTargets) {
return deleteQuotas(quotaTargets, new DeleteQuotasOptions());
}

2. Add the QuotaTarget interface to represent the identities of quota targets, it is implemented by ClientQuotaTarget, UserQuotaTarget and UerAndClientIdQuotaTarget types. 

package org.apache.kafka.server.quota;

public interface QuotaTarget {
/**
* Return the quota target type.
*/
QuotaTargetType getQuotaTargetType();
}

3. Add the QuotaLimit objects to represent different quota configurations, e.g. produce_rate = 10000 bytes/s

package org.apache.kafka.server.quota;

public class QuotaLimit {
private long quotaValue;
private ClientQuotaType quotaType;

public QuotaLimit(ClientQuotaType quotaType, long quotaValue) {
this.quotaType = quotaType;
this.quotaValue = quotaValue;
}

/**
* Get the quota value for this Quota limit.
*
* @return For ClientQuotaType.PRODUCE and ClientQuotaType.FETCH, return the quota value in unit of bytesPerSec.
* For ClientQuotaType.REQUEST, return the percentage value.
*/
public long getQuotaValue() {
return quotaValue;
}

/**
* Get the quota type.
* @return the ClientQuotaType.
*/
public ClientQuotaType getQuotaType() {
return quotaType;
}
}
Proposed Changes

The existing protocols will be used to send requests from AdminClient to any broker for the quota management, because fundamentally the quota management is one category of the general configuration changes

To be specific, the following request/response pairs will be leveraged:

  1. IncrementalAlterConfigsRequest and IncrementalAlterConfigsResponse
  2. DescribeConfigsRequest and DescribeConfigsResponse

The AdminClient will convert the quota management requests from the client side to the Configuration alter/describe requests, before sending them to Kafka brokers.

USER and CLIENT resources need to be added in the ConfigResource.java interfaceThey represent the targeting entities for the quota management. 

public final class ConfigResource {
 public enum Type {
BROKER((byte) 4), USER((byte) 3), TOPIC((byte) 2), CLIENT((byte) 1), UNKNOWN((byte) 0); // we add USER and CLIENT type.
...
}

}

Also we would like to dynamically config user and client entities.

/**
* Source of configuration entries.
*/
public enum ConfigSource {
 ...
 DYNAMIC_USER_CONFIG, // dynamic user config that is configured for a specific user. 
DYNAMIC_CLIENT_CONFIG, // dynamic client config that is configured for a specific client.
...
}

In the KafkaApi class, add the user and client configuration processing logics in the handleIncrementalAlterConfigsRequest and handleDescribeConfigsRequest. 

Internally, they will call the adminZkClient, in order to read/write configurations from/to Zookeeper, and then return the results to users. 

The ACL for the quota management is in the Cluster level. 

Compatibility, Deprecation, and Migration Plan

There is no known compatibility issue introduced in this KIP.

There is also no feature to deprecate from this KIP.

No need to migration, as data are always in the Zookeeper. 

Rejected Alternatives

Manage the quotas through existing configuration management APIs.

  1. Expose the lower level quota implementation details to users.
  2. Inconvenient for users to manage the quota. Users have to construct the right property name for each quota types, and it's error prone.
  3. Hard to extend in the future.

Compatibility, Deprecation, and Migration Plan

We only add a new way to configure the quotas, there is nothing to migrate.

Rejected Alternatives

...