
Status

Current state: Accepted

Discussion thread: here

JIRA:   KAFKA-2204

Motivation

In Kafka, there is no general mechanism to change entity configuration without doing a rolling restart of the entire cluster. Additionally, there is no way to model configuration per-client i.e. quotas, permissions etc. Currently, only topic configs can be changed dynamically. This proposal attempts to build a unified mechanism for modeling configuration across various entities within Kafka i.e. topics, clients etc.

Public Interfaces

  • We will add a new tool called ConfigChangeCommand that can manage all config changes in ZK. New methods will be added in AdminUtils to change configuration. This will be similar to the TopicCommand tool already present (a possible invocation is sketched after this list).
  • AlterConfig and DescribeConfig APIs will be added (after KIP-4 is complete) to alter and view configs.
  • There will be new zookeeper paths under "config" but they are not considered to be public interfaces.
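
For illustration only, an invocation of the proposed ConfigChangeCommand might look like the following; the option names are placeholders modeled on the existing TopicCommand and are not fixed by this KIP.

Code Block
# Hypothetical usage; option names are modeled on TopicCommand and may change
bin/kafka-run-class.sh kafka.admin.ConfigChangeCommand --zookeeper localhost:2181 \
  --entity-type clients --entity-name Adi --alter --added-config producer_byte_rate=1000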

Proposed Changes

This proposal will remove the TopicConfigManager and introduce a "DynamicConfigManager" to handle config changes for all entities via Zookeeper. Initially, we will have only 2 entities: Topic and Clients. Similar to topic configs today, this class will listen to Zookeeper notifications on a certain path and apply the changes within the brokers on a per-topic or per-clientId basis. The only thing that differs between entities is what is being modeled and how the config changes are applied.

ZNode Structure

Code Block
There will be 3 paths within config
/config/clients/<client_id>
/config/topics/<topic_name>
/config/changes/config_change_XX

Internally, the znodes are comma-separated key-value pairs where key represents the configuration property to change.
{"version": x, "config" : {X1= : Y1, X2= : Y2..}}

 

Upon startup, all brokers will load all the configs from zookeeper. In order to receive notifications, it's undesirable to watch every path under the root /config directory. Instead, we can model change notifications the same way they are currently handled for topic-based configs. Here is the workflow for changing or adding a config of any type (a minimal sketch follows the list):

  • Create a znode (or modify an existing one) under the required path with the configs that you want to update. For example, to change quotas for client "Adi", add a znode under /config/clients/Adi as shown below.
  • Create a sequential znode under "config/changes/config_change_XX". This will send a notification to all the watchers. The data within the change node should indicate what has changed i.e. topic config + topic name, client config + clientId or broker config + brokerId.
  • The brokers process all the configs for the entity that has changed configs. 
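
A minimal sketch of this workflow using ZkClient is shown below. The helper, its JSON handling and the exact payloads are illustrative; only the paths and the two-step write come from the proposal, and a string-based ZkSerializer is assumed.

Code Block
import org.I0Itec.zkclient.ZkClient

// Sketch only: assumes the ZkClient was constructed with a string serializer
object ConfigChangeSketch {
  def changeClientConfig(zkClient: ZkClient, clientId: String, configJson: String) {
    val entityPath = "/config/clients/" + clientId
    // Step 1: create or update the entity's config znode with the desired overrides
    if (zkClient.exists(entityPath))
      zkClient.writeData(entityPath, configJson)
    else
      zkClient.createPersistent(entityPath, configJson)
    // Step 2: create a sequential change znode so every broker watching /config/changes is notified;
    // the payload identifies which entity changed (see the notification format later in this document)
    val notification = """{"version" : 1, "entity_type" : "client", "entity_name" : "%s"}""".format(clientId)
    zkClient.createPersistentSequential("/config/changes/config_change_", notification)
  }
}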

Config Precedence

Configs in Zookeeper will take precedence over any configuration read from the properties file. There is currently no plan to move away from file based configuration for service configs.

Modeling Default values

In addition to overrides, we also need a mechanism to model default configuration. For example, say we have default quotas for all clients and we need to selectively override them on a per-client basis. So far, we don't have a way to change the default values. We can do this by having a special path to store defaults.
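
As an illustration, such a defaults path could sit alongside the per-entity overrides; the __default name and the values below are only one possible convention, not something this proposal fixes.

Code Block
/config/clients/__default
{"version" : 1, "config" : {"producer_byte_rate" : 500, "consumer_byte_rate" : 500}}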

Applying Client Config Changes

The first use case for client configs is to process quota changes per-client. The DynamicConfigManager will process all notifications and change clientId quotas accordingly. Some changes to the metrics package are also required to allow modification of quotas. Producer and consumer quotas are distinguished by having different properties since it's possible for a producer and consumer to have the same client id.

The client znode will look like this:

Code Block
{"version" : 1, "config" : {"producer_byte_rate" : 1000, "consumer_byte_rate" : 2000}}

Broker Configuration

For Broker configs, the key can be the KafkaConfig property that we wish to override. All configs represented here should override the configs from the property file. For example:

Code Block
/config/brokers/0
{"version" : 0, "config" : {"log.retention.check.interval.ms" : 100000}}

Config Change Notification

Currently, the change notification znode only contains the topic name. We need to add more information to distinguish whether this config change is for a topic, client or other config. Version numbering should also be added.

Code Block
The notification data can be:
{"version" : 1, "entity_type" : "topic/client", "entity_name" : "topic_name/client_id"}

 This KIP provides a mechanism to change any broker config dynamically but identifying which configs can be changed and how the config changes should be applied within the brokers is out of scope. 

ConfigDef

We will need to add a new property to the ConfigKey to indicate if a config is updatable or not. This property can be used to generate documentation so it becomes very easy to discover which properties can be dynamically changed. If a broker property is set in zookeeper and it is not a config marked "isDynamic", that property can simply be ignored. This is similar to configuring Kafka with a property it does not understand.

Code Block
private static class ConfigKey {
    public final String name;
    public final Type type;
    public final String documentation;
    public final Object defaultValue;
    public final Validator validator;
    public final Importance importance;
    public final boolean required;
    public final boolean isDynamic;

    public ConfigKey(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation, boolean required, boolean isDynamic) {
        super();
        this.name = name;
        this.type = type;
        this.defaultValue = defaultValue;
        this.validator = validator;
        this.importance = importance;
        if (this.validator != null)
            this.validator.ensureValid(name, defaultValue);
        this.documentation = documentation;
        this.required = required;
        this.isDynamic = isDynamic;
    }
 }
 
// The isDynamic property can be included in the documentation for all configs
public String toHtmlTable() {
    StringBuilder b = new StringBuilder();
    b.append("<table>\n");
    b.append("<tr>\n");
    b.append("<th>Name</th>\n");
    b.append("<th>Type</th>\n");
    b.append("<th>Default</th>\n");
    b.append("<th>Importance</th>\n");
    b.append("<th>Description</th>\n");
	b.append("<th>IsDynamic</th>\n");
}

Applying Configs within Broker

The ZK listener can have a Properties object parsed from the properties file which was used to start the KafkaServer. Anytime a change notification is received, we should do the following:

  1. Parse received notification into a properties object.
  2. Within KafkaConfig, for each newly changed property, verify that it can be changed dynamically. For this, ConfigDef needs to expose a getConfigKey() method that can be used to check the isDynamic flag.

All configs should always be accessed via a reference to KafkaConfig. For this, all subsystems within the broker need to be configured with a config object and not individual config values.

Code Block
// Sample code only
class BrokerConfigManager(zkClient: ZkClient) {
  // Config initially used to start the KafkaServer
  val kafkaConfig: KafkaConfig = ...

  def processConfigChanges(notifications: Seq[String]) {
    // configPath is derived from the change notification (entity type + entity name)
    val updatedProperties = AdminUtils.fetchConfig(zkClient, configPath)
    kafkaConfig.updateProperties(updatedProperties)
  }
}

object AdminUtils {
  def fetchConfig(zkClient: ZkClient, path: String): Properties = {
    val str: String = zkClient.readData(path, true)
    val props = new Properties()
    // Parse the znode data into the Properties object
    props
  }
}
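
To illustrate the point about always going through the KafkaConfig reference, a subsystem could be wired roughly as below; LogCleanerLike and the accessor name are made up for this sketch, and the real accessor on KafkaConfig may differ.

Code Block
// Sketch only: the subsystem keeps the shared KafkaConfig reference and reads the value
// each time it is needed, so a dynamically updated property takes effect without a restart
class LogCleanerLike(config: KafkaConfig) {
  def runOnce() {
    val intervalMs = config.logRetentionCheckIntervalMs // illustrative accessor name
    // ... do work using intervalMs ...
  }
}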

Config API

This proposal also adds broker APIs to alter and view configs for any entity. These requests can be sent to any broker within the cluster. See KIP-4 for more details on the implementation. Once work for KIP-4 completes, we can add these APIs as described below.

AlterConfig

  • AddedConfigEntry is an array of configs that will be added for that entity
  • DeletedConfig is an array of configs that will be deleted for that entity. For deletion, only the config property name needs to be specified, not the value. Also, when a config property is deleted, that entity will reapply the default value for that property. For example: if you delete the quota for a clientId, the quota for that client will be set to the default quota.

Code Block
// EntityType can be either topic or client. AddedConfigEntry and DeletedConfig will be an array.
AlterConfigRequest => [EntityType EntityName [AddedConfigEntry] [DeletedConfig]]
    EntityType => string
    EntityName => string
    AddedConfigEntry => [ConfigKey ConfigValue]
        ConfigKey => string
        ConfigValue => string
    DeletedConfig => string
 
AlterConfigResponse => [EntityType EntityName ErrorCode]
    EntityType => string
    EntityName => string
    ErrorCode => int16
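
As a worked example, an AlterConfigRequest that raises a client's producer quota and deletes its consumer quota override could carry the following values (entity name and numbers are illustrative):

Code Block
AlterConfigRequest:
    EntityType       = "client"
    EntityName       = "Adi"
    AddedConfigEntry = [("producer_byte_rate", "2000")]
    DeletedConfig    = ["consumer_byte_rate"]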

DescribeConfig

The DescribeConfig request is used to query configs for entities.

Code Block
DescribeConfigRequest => [EntityType EntityName]
    EntityType => string
    EntityName => string
 
// ConfigEntry is an array. It will be empty if there is an error. ErrorCode will be non-zero in case of error
DescribeConfigResponse => [EntityType EntityName ConfigEntry]
    EntityType => string
    EntityName => string
    ErrorCode => int16
    ConfigEntry => [ConfigKey ConfigValue]
        ConfigKey => string
        ConfigValue => string
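
Continuing the example above, a successful DescribeConfigResponse for the same client could then contain (values illustrative):

Code Block
DescribeConfigResponse:
    EntityType  = "client"
    EntityName  = "Adi"
    ErrorCode   = 0
    ConfigEntry = [("producer_byte_rate", "2000")]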

Error Codes

We will add these new error codes to the protocol.

Error | Description | Requests
InvalidEntityConfig | The config key or value used in --alter-config or --delete-config is incorrect. | Alter
InvalidEntity | Either the entityType or entityName is incorrect. Entity type must be one of (topics/clients). | Alter, Describe

CLI and Interactive Shell

As described in KIP-4, these commands will have scripts and an interactive shell.

Code Block
# Topic Commands - options are ported from TopicCommand.scala
bin/kafka.sh --alter-config --entity-type [topic/client] --entity-name name --added-config key=value,key=value --deleted-config key,key,key --broker-list <host:port>
bin/kafka.sh --describe-config --entity-type [topic/client] --entity-name name --broker-list <host:port>

Compatibility, Deprecation, and Migration Plan

  • TopicConfigManager has a config_change_XX sequential znode under /config/changes. The format of the data within the config_change znode is going to change, hence it is important to not make any config changes using TopicCommand until the cluster is fully upgraded.
  • We will eventually deprecate the tooling that changes entity configs by modifying znodes directly. All requests should be sent to one of the brokers after KIP-4 is complete.

Migration plan for notifications

Since the format of notifications is going to change, the new code will no longer be able to read topic change notifications written in the older format. The purge interval for notifications is 15 minutes, so any notification older than 15 minutes should be ignored regardless of the content in the znode. In order to not lose any notifications, cluster administrators should not allow any config changes for at least 15 minutes prior to upgrading the cluster. Upon startup, the brokers will parse all notifications and purge the old ones. 
After the upgrade is complete, config changes cannot be done using tooling from older releases of Kafka.

If a rollback must be done after config changes have been made using the new format, the config changes must be purged from zookeeper prior to the rollback (since the old code will throw an exception if it reads a notification in the new format).

Rejected Alternatives

Dynamic Service Configs: After plenty of discussion, we've decided to not allow broker (service) configuration to be dynamic. There were a couple of approaches:

  • Service Config in ZK
  • Reload config file via SIGHUP

See the attached discussion thread for more details on this decision.