Status

Current state: "Under Discussion"

Discussion thread: here

JIRA: KAFKA-6240

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Kafka brokers are configured using properties (typically provided in a file named server.properties) that cannot be updated without restarting the broker. Broker configuration options can be described using the new AdminClient, but the configuration cannot currently be altered.

There are several use cases where dynamic updates of broker configs would avoid time-consuming broker restarts. For example:

  1. Update short-lived SSL keystores on the broker
  2. Performance tuning based on metrics (e.g. increase network/IO threads)
  3. Add, remove or reconfigure metrics reporters
  4. Update configuration of all topics consistently across the whole cluster (e.g. unclean.leader.election.enable)
  5. Update log cleaner configuration for tuning
  6. Update listener/security configuration

Goals:

  1. Use the existing dynamic configuration mechanism to make commonly updated broker configs dynamic so that they can be updated without restarting the broker.
  2. Support dynamic default configs that are picked up by all brokers in the cluster, to enable consistent configuration across all brokers (e.g. unclean.leader.election.enable)
  3. Support individual broker config overrides for all dynamic configs to enable testing on one broker before the changes are picked up by all brokers.
  4. Enable broker configs to be described and altered using AdminClient.
  5. Continue to support simple property file based configuration so that brokers can be easily started up without any additional configuration steps.

In the first phase of the implementation, commonly updated broker properties will be made dynamic based on known use cases. The goal will be to make more configs dynamic later if required.

Public Interfaces

Dynamic configuration

Dynamic configs for the entity type brokers will be added for a limited set of configuration options by extending the existing dynamic replication quota config for brokers. The corresponding static config in server.properties will continue to be supported to enable quick start of Kafka brokers with a properties file. Where applicable, default cluster-wide configs can be configured to allow consistent values to be used across all brokers (stored in ZK at /config/brokers/<default>). For per-broker configs, broker-specific config overrides can be configured (stored in ZK at /config/brokers/id).

The precedence used for configs will be:

  1. Dynamic broker-specific config from /config/brokers/id
  2. Dynamic cluster-wide default config from /config/brokers/<default>
  3. Static config from server.properties

Individual broker config overrides will be provided for all the dynamic configs to enable testing. For configs that we want to keep consistent across all brokers, kafka-configs.sh and AdminClient will only allow updates of cluster-wide defaults, but it will still be possible to write overrides to ZK for testing. Brokers will handle all configs consistently with the precedence specified above.
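The precedence rule above can be illustrated with a minimal sketch. It is not the broker's implementation: the class and method names are hypothetical, and each config source is modelled as a plain in-memory map instead of being read from ZooKeeper or server.properties.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ConfigPrecedenceSketch {

    /**
     * Returns the first non-null value found when the layers are consulted
     * in precedence order, or null if no layer defines the config.
     */
    public static String resolve(String name, List<Map<String, String>> layersInPrecedenceOrder) {
        for (Map<String, String> layer : layersInPrecedenceOrder) {
            String value = layer.get(name);
            if (value != null) {
                return value;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Hypothetical stand-ins for the three sources named in the proposal.
        Map<String, String> perBroker = new HashMap<>();       // /config/brokers/id
        Map<String, String> clusterDefault = new HashMap<>();  // /config/brokers/<default>
        Map<String, String> staticProps = new HashMap<>();     // server.properties

        String key = "unclean.leader.election.enable";
        staticProps.put(key, "true");
        clusterDefault.put(key, "false");

        List<Map<String, String>> layers = Arrays.asList(perBroker, clusterDefault, staticProps);

        // The cluster-wide default takes precedence over the static value.
        System.out.println(resolve(key, layers));

        // A broker-specific override takes precedence over both.
        perBroker.put(key, "true");
        System.out.println(resolve(key, layers));

        // Deleting the override reverts to the cluster-wide default.
        perBroker.remove(key);
        System.out.println(resolve(key, layers));
    }
}
```

Modelling the sources as an ordered list keeps the lookup identical for every config, which matches the statement that brokers handle all configs consistently with the same precedence.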

Dynamic broker configs will be stored in ZooKeeper in JSON format along with the existing replication quota config for entity type brokers. With the current dynamic configuration implementation, brokers watch the configuration update node in ZooKeeper and invoke config handlers when update notifications are received. The config handler for brokers will be updated to handle KafkaConfig changes as well.

New Broker Configuration Option

sasl.jaas.config will be supported for brokers to enable dynamic SASL configuration of brokers. The property will use the same format as clients and may specify one or more login context entries (one for each supported SASL mechanism). The property may be prefixed with the listener name if multiple listeners are configured to use SASL.

Format: One or more login context entries using the same format as JAAS configuration:

<LoginModuleClass> <ControlFlag> *(<OptionName>=<OptionValue>);

ControlFlag = required|requisite|sufficient|optional

Example:

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required user_alice="alice-secret"; \
org.apache.kafka.common.security.scram.ScramLoginModule required;
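To make the entry format concrete, the following sketch splits a sasl.jaas.config value into login context entries and checks the control flag. It is a simplified illustration, not Kafka's parser: the class name is hypothetical, and it assumes option values contain no whitespace or semicolons, which real JAAS syntax does permit inside quotes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class JaasConfigSketch {

    public static final Set<String> CONTROL_FLAGS =
            new HashSet<>(Arrays.asList("required", "requisite", "sufficient", "optional"));

    /** One login context entry: <LoginModuleClass> <ControlFlag> *(<OptionName>=<OptionValue>); */
    public static final class Entry {
        public final String loginModule;
        public final String controlFlag;
        public final Map<String, String> options;

        Entry(String loginModule, String controlFlag, Map<String, String> options) {
            this.loginModule = loginModule;
            this.controlFlag = controlFlag;
            this.options = options;
        }
    }

    /** Simplified parser: assumes values have no embedded whitespace or ';'. */
    public static List<Entry> parse(String config) {
        List<Entry> entries = new ArrayList<>();
        for (String raw : config.split(";")) {
            String trimmed = raw.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            String[] tokens = trimmed.split("\\s+");
            if (tokens.length < 2 || !CONTROL_FLAGS.contains(tokens[1])) {
                throw new IllegalArgumentException("Missing or invalid control flag in: " + trimmed);
            }
            Map<String, String> options = new LinkedHashMap<>();
            for (int i = 2; i < tokens.length; i++) {
                int eq = tokens[i].indexOf('=');
                if (eq <= 0) {
                    throw new IllegalArgumentException("Expected name=value but found: " + tokens[i]);
                }
                String value = tokens[i].substring(eq + 1);
                // Strip one pair of surrounding double quotes, if present.
                if (value.length() >= 2 && value.startsWith("\"") && value.endsWith("\"")) {
                    value = value.substring(1, value.length() - 1);
                }
                options.put(tokens[i].substring(0, eq), value);
            }
            entries.add(new Entry(tokens[0], tokens[1], options));
        }
        return entries;
    }

    public static void main(String[] args) {
        String config = "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "user_alice=\"alice-secret\"; "
                + "org.apache.kafka.common.security.scram.ScramLoginModule required;";
        for (Entry entry : parse(config)) {
            System.out.println(entry.loginModule + " " + entry.controlFlag + " " + entry.options);
        }
    }
}
```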

Protocol Changes

A new option include_synonyms will be added to DescribeConfigsRequest to return all the configs which may be used as the value of the specified config if the config was removed. For example, flush.ms config for a topic will return the broker config log.flush.interval.ms as a synonym if include_synonyms=true. DescribeConfigsResponse will return all synonyms in the order of precedence. This is particularly useful to obtain the default value that the config will revert to if a config override is removed. As dynamic broker configs are being added at per-broker and cluster-default levels, include_synonyms can be useful to list all the configured values and the precedence used to obtain the currently configured value.

DescribeConfigsRequest version will be bumped up to 1.

DescribeConfigs Request (Version: 1) => [resource [config_name]] include_synonyms
  resource => resource_type resource_name
    resource_type => INT8
    resource_name => STRING
  config_name => STRING
  include_synonyms => BOOLEAN

DescribeConfigs Response (Version: 1) => throttle_time_ms [entities]
  throttle_time_ms => INT32
  entities => error_code error_message resource [configs]
    error_code => INT16
    error_message => STRING
    resource => resource_type resource_name
      resource_type => INT8
      resource_name => STRING
    configs => [config_entry [synonym]]    <= Added [synonym]
    config_entry =>
      config_name => STRING
      config_value => NULLABLE_STRING
      read_only => BOOLEAN
      is_default => BOOLEAN
      is_sensitive => BOOLEAN
    synonym =>                             <= NEW
      config_name => STRING
      config_value => NULLABLE_STRING
      config_source => INT8                <= may be one of (TOPIC|DYNAMIC_BROKER|DYNAMIC_DEFAULT_BROKER|STATIC_BROKER|DEFAULT)
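As a rough illustration of how a caller might interpret the synonym list, the sketch below models the synonym entries for the topic config flush.ms. The types are hypothetical stand-ins for the response fields, the values are invented, and the sketch assumes that only levels with a configured value are listed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SynonymSketch {

    /** Mirrors the config_source values listed in the response schema. */
    public enum ConfigSource { TOPIC, DYNAMIC_BROKER, DYNAMIC_DEFAULT_BROKER, STATIC_BROKER, DEFAULT }

    /** Hypothetical in-memory form of one synonym entry. */
    public static final class Synonym {
        public final String name;
        public final String value;   // null when nothing is configured at this level
        public final ConfigSource source;

        public Synonym(String name, String value, ConfigSource source) {
            this.name = name;
            this.value = value;
            this.source = source;
        }
    }

    /** Keeps only the levels that have a value, preserving precedence order. */
    public static List<Synonym> configuredSynonyms(List<Synonym> levelsInPrecedenceOrder) {
        List<Synonym> result = new ArrayList<>();
        for (Synonym synonym : levelsInPrecedenceOrder) {
            if (synonym.value != null) {
                result.add(synonym);
            }
        }
        return result;
    }

    /** Value the config would revert to if the highest-precedence entry were removed. */
    public static String valueAfterRemovingOverride(List<Synonym> configured) {
        return configured.size() > 1 ? configured.get(1).value : null;
    }

    /** Invented values for flush.ms and its broker-level synonym. */
    public static List<Synonym> exampleLevels() {
        return Arrays.asList(
                new Synonym("flush.ms", "1000", ConfigSource.TOPIC),
                new Synonym("log.flush.interval.ms", null, ConfigSource.DYNAMIC_BROKER),
                new Synonym("log.flush.interval.ms", "5000", ConfigSource.DYNAMIC_DEFAULT_BROKER),
                new Synonym("log.flush.interval.ms", "10000", ConfigSource.STATIC_BROKER),
                new Synonym("log.flush.interval.ms", null, ConfigSource.DEFAULT));
    }

    public static void main(String[] args) {
        List<Synonym> configured = configuredSynonyms(exampleLevels());
        for (Synonym synonym : configured) {
            System.out.println(synonym.source + " " + synonym.name + "=" + synonym.value);
        }
        System.out.println("After removing the topic override: " + valueAfterRemovingOverride(configured));
    }
}
```

The first entry in the list is the value currently in effect; the second is what the config reverts to when the first is deleted.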

Public Interface Changes

A new interface Reconfigurable will be added to notify reconfigurable objects of configuration changes. For example, metrics reporters that support reconfiguration can implement the interface Reconfigurable to enable reconfiguration without a broker restart. The interface will also be implemented by all internal classes which support reconfiguration (e.g. ChannelBuilder).

package org.apache.kafka.common;

import java.util.Map;
import java.util.Set;

/**
 * Interface for reconfigurable classes that support dynamic configuration.
 */
public interface Reconfigurable {

    /**
     * Returns the names of configs that may be reconfigured.
     */
    Set<String> reconfigurableConfigs();

    /**
     * Reconfigures this instance with the given key-value pairs. The provided
     * map contains all configs including any reconfigurable configs that
     * may have changed since the object was initially configured using
     * {@link Configurable#configure(Map)}.
     */
    void reconfigure(Map<String, ?> configs);

}
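A reporter-style class could implement the interface roughly as follows. This is a sketch under stated assumptions: the interface is redeclared locally so the example compiles without Kafka on the classpath, and IntervalReporter, the config name report.interval.ms and the 60000 ms fallback are all invented for illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class ReconfigurableSketch {

    /** Local copy of the proposed interface, declared here only to keep the sketch self-contained. */
    public interface Reconfigurable {
        Set<String> reconfigurableConfigs();
        void reconfigure(Map<String, ?> configs);
    }

    /** Hypothetical reporter whose reporting interval can change at runtime. */
    public static class IntervalReporter implements Reconfigurable {
        public static final String INTERVAL_CONFIG = "report.interval.ms"; // invented name

        // volatile so that a reporting thread sees the updated value.
        private volatile long intervalMs;

        public IntervalReporter(Map<String, ?> configs) {
            this.intervalMs = parseInterval(configs);
        }

        @Override
        public Set<String> reconfigurableConfigs() {
            return Collections.singleton(INTERVAL_CONFIG);
        }

        @Override
        public void reconfigure(Map<String, ?> configs) {
            // The map holds all configs, so re-read only the one this class owns.
            this.intervalMs = parseInterval(configs);
        }

        public long intervalMs() {
            return intervalMs;
        }

        private static long parseInterval(Map<String, ?> configs) {
            Object value = configs.get(INTERVAL_CONFIG);
            return value == null ? 60_000L : Long.parseLong(value.toString());
        }
    }

    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put(IntervalReporter.INTERVAL_CONFIG, "30000");
        IntervalReporter reporter = new IntervalReporter(configs);
        System.out.println(reporter.intervalMs());

        // Simulate a dynamic update delivered to the reporter.
        configs.put(IntervalReporter.INTERVAL_CONFIG, "5000");
        reporter.reconfigure(configs);
        System.out.println(reporter.intervalMs());
    }
}
```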


The classes DescribeConfigsOptions and DescribeConfigsResult used by AdminClient will be updated to include config synonyms in the result.

Tools

kafka-configs.sh will be updated to configure defaults and overrides for dynamic configuration options for the entity type brokers.

Example:

Add/update broker-specific config for broker with broker id 0:

bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'ssl.keystore.location=keystore1.jks,ssl.keystore.password=password1' --entity-name 0 --entity-type brokers

Add/update cluster-wide default config:

bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'unclean.leader.election.enable=false' --entity-default --entity-type brokers

Delete broker-specific config (the actual config will revert to its default value, see precedence of configs):

bin/kafka-configs.sh --zookeeper localhost --alter --delete-config unclean.leader.election.enable --entity-type brokers --entity-name 0

Securing passwords in ZooKeeper

In the initial implementation, passwords (e.g. keystore password) stored in ZooKeeper will be base-64 encoded, which is not secure. Broker configuration in ZooKeeper will be protected using ACLs. It is expected that secure deployments of Kafka will also use network segmentation to limit ZooKeeper access. We may want to consider more secure storage for passwords in future, for example by encrypting passwords with a secret key that is not dynamically configurable.
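The reason base-64 encoding offers no protection is that it is reversible without any key, as this small sketch shows. The class name is hypothetical, and the sketch does not reflect how the broker actually serializes passwords beyond the base-64 step described above.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class Base64PasswordSketch {

    public static String encode(String password) {
        return Base64.getEncoder().encodeToString(password.getBytes(StandardCharsets.UTF_8));
    }

    public static String decode(String encoded) {
        return new String(Base64.getDecoder().decode(encoded), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String stored = encode("password1");
        System.out.println("Stored form: " + stored);
        // Anyone who can read the stored form can recover the password.
        System.out.println("Recovered:   " + decode(stored));
    }
}
```

This is why the proposal relies on ZooKeeper ACLs and network segmentation, not the encoding itself, to protect the stored passwords.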

Proposed Changes

SSL keystore

Use case: Support short-lived SSL certificates for increased security

Config scope: Broker (/config/brokers/id)

Config options:

Dynamic update changes:

Threads

Use case: Increase or decrease threads based on traffic pattern. This is useful for performance tuning based on metrics at runtime.

Config scope: Default for whole cluster (/config/brokers/<default>)

Config options:

Dynamic update changes:

Metrics reporters and their custom configs

Use case: Add new metrics reporter or reconfigure existing metrics reporter

Config scope: Default for whole cluster (/config/brokers/<default>)

Config options:

Dynamic update changes:

Log Cleaner configs

Use case: Add more log cleaner threads or increase buffer size for tuning

Config scope: Default for whole cluster (/config/brokers/<default>)

Config options:

Dynamic update changes:

Default Topic configs

Use case: Even though topic configs can be set per-topic dynamically, it is convenient to configure properties like unclean.leader.election.enable consistently across the whole cluster.

Config scope: Default for whole cluster (/config/brokers/<default>)

Config options (and the equivalent topic configs):

Dynamic update changes:

Listeners and Security configs

Use cases:

Config options:

Listener Configs

Common security config

SSL Configs

SASL Configs

Dynamic update changes:

Limitations:

Compatibility, Deprecation, and Migration Plan

Rejected Alternatives

Deprecate static config in server.properties when a config is made dynamic

It is useful to support static configs in server.properties to enable quick start of Kafka without setting up new config options in ZooKeeper. To retain the simple user experience, we will continue to enable starting up Kafka with a properties file.