Status
Current state: "Under Discussion"
Discussion thread: here
JIRA: KAFKA-6240
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Kafka brokers are configured using properties (typically provided in a file named server.properties) that cannot be updated without restarting the broker. Broker configuration options can be described using the new AdminClient, but the configuration cannot currently be altered.
There are several use cases where dynamic updates of broker configs would avoid time-consuming broker restarts. For example:
- Update short-lived SSL keystores on the broker
- Performance tuning based on metrics (e.g. increase network/IO threads)
- Add, remove or reconfigure metrics reporters
- Update configuration of all topics consistently across the whole cluster (e.g. unclean.leader.election.enable)
- Update log cleaner configuration for tuning
- Update listener/security configuration
Goals:
- Use existing dynamic configuration mechanism to make commonly updated broker configs dynamic so that they can be updated without restarting the broker.
- Support dynamic default configs that are picked up by all brokers in the cluster, to enable consistent configuration across all brokers (e.g. unclean.leader.election.enable)
- Support individual broker config overrides for all dynamic configs to enable testing on one broker before the changes are picked up by all brokers.
- Enable broker configs to be described and altered using AdminClient.
- Continue to support simple property file based configuration so that brokers can be easily started up without any additional configuration steps.
In the first phase of the implementation, commonly updated broker properties will be made dynamic based on known use cases. The goal will be to make more configs dynamic later if required.
Public Interfaces
Dynamic configuration
Dynamic configs for the entity type brokers will be added for a limited set of configuration options by extending the existing dynamic replication quota config for brokers. The corresponding static config in server.properties will continue to be supported to enable quick start of Kafka brokers with a properties file. Where applicable, default cluster-wide configs can be configured to allow consistent values to be used across all brokers (stored in ZK at /config/brokers/<default>). For per-broker configs, broker-specific config overrides can be configured (stored in ZK at /config/brokers/id).
The precedence used for configs will be (highest first):
- Dynamic broker-specific config from /config/brokers/id
- Dynamic cluster-wide default config from /config/brokers/<default>
- Static config from server.properties
Individual broker config overrides will be provided for all the dynamic configs to enable testing. For configs that we want to keep consistent across all brokers, kafka-configs.sh and AdminClient will only allow updates of cluster-wide defaults, but it will still be possible to write overrides to ZK for testing. Brokers will handle all configs consistently with the precedence specified above.
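The lookup order above can be sketched as a small helper. This is only an illustration, assuming each source has already been loaded into a map; the class name ConfigPrecedence and its method are hypothetical and not part of Kafka.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical helper illustrating the proposed precedence; not a Kafka class.
final class ConfigPrecedence {
    private ConfigPrecedence() {}

    /**
     * Returns the effective value for a config name by checking, in order:
     * the broker-specific override, the cluster-wide dynamic default,
     * and finally the static value from server.properties.
     */
    static Optional<String> resolve(String name,
                                    Map<String, String> brokerOverrides,
                                    Map<String, String> clusterDefaults,
                                    Map<String, String> staticProps) {
        String value = brokerOverrides.get(name);
        if (value == null) {
            value = clusterDefaults.get(name);
        }
        if (value == null) {
            value = staticProps.get(name);
        }
        return Optional.ofNullable(value);
    }
}
```

Deleting a broker-specific override then falls back to the cluster-wide default (or the static value) without any extra bookkeeping, because the override map simply no longer contains the key.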
Dynamic broker configs will be stored in ZooKeeper in JSON format along with the existing replication quota config for entity type brokers. With the current dynamic configuration implementation, brokers watch the configuration update node in ZooKeeper and invoke config handlers when update notifications are received. The config handler for brokers will be updated to handle KafkaConfig changes as well.
New Broker Configuration Option
sasl.jaas.config will be supported for brokers to enable dynamic SASL configuration of brokers. The property will use the same format as clients and may specify one or more login context entries (one for each supported SASL mechanism). The property may be prefixed with the listener name if multiple listeners are configured to use SASL.
Format: One or more login context entries using the same format as JAAS configuration:
<LoginModuleClass> <ControlFlag> *(<OptionName>=<OptionValue>);
ControlFlag = required|requisite|sufficient|optional
Example:
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required user_alice="alice-secret"; \
    org.apache.kafka.common.security.scram.ScramLoginModule required;
Protocol Changes
AlterConfigsResponse will be updated to include the actual config used by the broker after an update (with validateOnly set to true or false). This is particularly useful to obtain the default value that the config will revert to after removing a broker config. The AlterConfigsRequest version will be bumped up to 1.
AlterConfigsResponse (Version: 1) => [responses]
  responses => resource_type resource_name error_code error_message [configs]
    resource_type => INT8
    resource_name => STRING
    error_code => INT16
    error_message => STRING
    configs => (NEW)
      config_name => STRING
      config_value => STRING
Public Interface Changes
A new interface Reconfigurable will be added to notify reconfigurable objects of configuration changes. For example, metrics reporters that support reconfiguration can implement the interface Reconfigurable to enable reconfiguration without broker restart. The interface will also be implemented by all internal classes which support reconfiguration (e.g. ChannelBuilder).
package org.apache.kafka.common;

import java.util.Map;
import java.util.Set;

/**
 * Interface for reconfigurable classes that support dynamic configuration.
 */
public interface Reconfigurable {

    /**
     * Returns the names of configs that may be reconfigured.
     */
    Set<String> reconfigurableConfigs();

    /**
     * Reconfigures this instance with the given key-value pairs. The provided
     * map contains all configs including any reconfigurable configs that
     * may have changed since the object was initially configured using
     * {@link Configurable#configure(Map)}.
     */
    void reconfigure(Map<String, ?> configs);
}
Tools
kafka-configs.sh will be updated to configure defaults and overrides for dynamic configuration options for the entity type brokers.
Example:
Add/update broker-specific config for broker with broker id 0:
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'ssl.keystore.location=keystore1.jks,ssl.keystore.password=password1' --entity-name 0 --entity-type brokers
Add/update cluster-wide default config:
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'unclean.leader.election.enable=false' --entity-default --entity-type brokers
Delete broker-specific config (the actual config will revert to its default value, see precedence of configs):
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --delete-config unclean.leader.election.enable --entity-type brokers --entity-name 0
Securing passwords in ZooKeeper
In the initial implementation, passwords (e.g. keystore password) stored in ZooKeeper will only be base64-encoded, which does not protect them from anyone who can read the node. Broker configuration in ZooKeeper will be protected using ACLs. It is expected that secure deployments of Kafka will also use network segmentation to limit ZooKeeper access. We may want to consider more secure storage for passwords in future, for example by encrypting passwords with a secret key that is not dynamically configurable.
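To make the limitation concrete, the following minimal sketch shows that base64 is a reversible encoding rather than encryption: anyone who can read the stored value can recover the password without a key. The class name EncodedPassword is hypothetical, and the exact encoding used by the broker is an assumption here (standard base64 over UTF-8 bytes).

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper; assumes standard base64 over UTF-8 bytes.
final class EncodedPassword {
    private EncodedPassword() {}

    /** Encodes a password for storage. This obscures the value but does not protect it. */
    static String encode(String password) {
        return Base64.getEncoder().encodeToString(password.getBytes(StandardCharsets.UTF_8));
    }

    /** Recovers the original password. No secret is required. */
    static String decode(String encoded) {
        return new String(Base64.getDecoder().decode(encoded), StandardCharsets.UTF_8);
    }
}
```

This is why the proposal relies on ZooKeeper ACLs and network segmentation for protection rather than on the encoding itself.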
Proposed Changes
SSL keystore
Use case: Support short-lived SSL certificates for increased security
Config scope: Broker (/config/brokers/id)
Config options:
ssl.keystore.type
ssl.keystore.location
ssl.keystore.password
ssl.key.password
Dynamic update changes:
- Keystore will be updated by reconfiguring the channel builder to create a new SslFactory. Existing connections will not be affected; new connections will use the new keystore.
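One way to picture this behaviour is an atomically swapped reference: each connection captures the keystore settings that were current when it was created, and a reconfiguration only changes what later connections see. The sketch below is an assumption about the general pattern, not the actual SslFactory or ChannelBuilder code; KeystoreHolder and Snapshot are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration of swapping keystore settings for new connections only.
final class KeystoreHolder {

    /** Immutable stand-in for the state a real SSL factory would hold. */
    static final class Snapshot {
        final String keystoreLocation;

        Snapshot(String keystoreLocation) {
            this.keystoreLocation = keystoreLocation;
        }
    }

    private final AtomicReference<Snapshot> current;

    KeystoreHolder(String initialLocation) {
        this.current = new AtomicReference<>(new Snapshot(initialLocation));
    }

    /** Called when a connection is established; the connection keeps this snapshot. */
    Snapshot forNewConnection() {
        return current.get();
    }

    /** Called on a dynamic update; connections holding older snapshots are untouched. */
    void reconfigure(String newLocation) {
        current.set(new Snapshot(newLocation));
    }
}
```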
Threads
Use case: Increase or decrease threads based on traffic pattern. This is useful for performance tuning based on metrics at runtime.
Config scope: Default for whole cluster (/config/brokers/<default>)
Config options:
num.network.threads
num.io.threads
num.replica.fetchers
num.recovery.threads.per.data.dir
background.threads
Dynamic update changes:
- Thread pools that don’t require thread affinity will be updated to use resizable thread pools:
  - num.io.threads
  - num.recovery.threads.per.data.dir
  - background.threads
- num.network.threads: To increase threads, existing connections will continue to be processed in their processor threads and new connections will be processed on new threads. Allocation of processors for new connections will take connection count into account (instead of the current round-robin allocation) to ensure that connections are balanced across processors. When thread count is decreased, threads with the smallest number of active connections will be terminated after closing any connections being processed on those threads. New connections will be processed on remaining threads.
- num.replica.fetchers: Affinity of partitions to threads will be preserved for ordering.
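The connection-count-based allocation for network threads can be sketched as follows. This is a simplified, single-threaded model under stated assumptions: processors are represented only by their connection counts, and the ProcessorPool class and its methods are hypothetical rather than the broker's actual SocketServer code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: each list element is the connection count of one processor.
final class ProcessorPool {
    private final List<Integer> connectionCounts = new ArrayList<>();

    ProcessorPool(int processors) {
        if (processors < 1) {
            throw new IllegalArgumentException("need at least one processor");
        }
        for (int i = 0; i < processors; i++) {
            connectionCounts.add(0);
        }
    }

    /** Index of the processor with the fewest connections (lowest index on ties). */
    private int leastLoaded() {
        int best = 0;
        for (int i = 1; i < connectionCounts.size(); i++) {
            if (connectionCounts.get(i) < connectionCounts.get(best)) {
                best = i;
            }
        }
        return best;
    }

    /** Assigns a new connection to the least-loaded processor and returns its index. */
    int assign() {
        int index = leastLoaded();
        connectionCounts.set(index, connectionCounts.get(index) + 1);
        return index;
    }

    /** Adds processors; existing connections stay where they are. */
    void grow(int extra) {
        for (int i = 0; i < extra; i++) {
            connectionCounts.add(0);
        }
    }

    /** Removes the least-loaded processor and returns how many connections had to be closed. */
    int shrink() {
        if (connectionCounts.size() <= 1) {
            throw new IllegalStateException("cannot remove the last processor");
        }
        return connectionCounts.remove(leastLoaded());
    }

    int size() {
        return connectionCounts.size();
    }
}
```

With round-robin allocation, a newly added processor would receive only its share of new connections while older processors stay heavily loaded; choosing the least-loaded processor lets new threads absorb new connections until the pool evens out.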
Metrics reporters and their custom configs
Use case: Add new metrics reporter or reconfigure existing metrics reporter
Config scope: Default for whole cluster (/config/brokers/<default>)
Config options:
metric.reporters
Dynamic update changes:
- Addition and removal of metrics reporters will be supported. Since metrics reporters may use custom configs, dynamic configuration options for brokers may specify any custom configuration. AdminClient and kafka-configs.sh will be updated to accept custom configs.
- Metrics reporters that support reconfiguration may implement the interface Reconfigurable. If any of the configs specified in Reconfigurable#reconfigurableConfigs is updated dynamically, the reporter instance is reconfigured using Reconfigurable#reconfigure(Map).
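The dispatch rule for reconfigurable reporters can be sketched as below. To keep the example self-contained, it declares a local stand-in for the proposed Reconfigurable interface with the two methods described in this proposal; IntervalReporter, ReconfigDispatcher and the config name example.reporter.interval.ms are hypothetical and exist only for illustration.

```java
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Local stand-in mirroring the interface proposed in this document.
interface Reconfigurable {
    Set<String> reconfigurableConfigs();

    void reconfigure(Map<String, ?> configs);
}

// Hypothetical reporter with one custom, reconfigurable setting.
final class IntervalReporter implements Reconfigurable {
    static final String INTERVAL_CONFIG = "example.reporter.interval.ms"; // hypothetical name

    private volatile long intervalMs = 60000L;

    @Override
    public Set<String> reconfigurableConfigs() {
        return Set.of(INTERVAL_CONFIG);
    }

    @Override
    public void reconfigure(Map<String, ?> configs) {
        Object value = configs.get(INTERVAL_CONFIG);
        if (value != null) {
            intervalMs = Long.parseLong(value.toString());
        }
    }

    long intervalMs() {
        return intervalMs;
    }
}

// Hypothetical dispatcher: reconfigure only when a declared config actually changed.
final class ReconfigDispatcher {
    private ReconfigDispatcher() {}

    static boolean maybeReconfigure(Reconfigurable target,
                                    Map<String, ?> oldConfigs,
                                    Map<String, ?> newConfigs) {
        for (String name : target.reconfigurableConfigs()) {
            if (!Objects.equals(oldConfigs.get(name), newConfigs.get(name))) {
                // Pass the full new config map, as the interface contract describes.
                target.reconfigure(newConfigs);
                return true;
            }
        }
        return false;
    }
}
```

A change to an unrelated config such as num.io.threads leaves the reporter untouched, while a change to its declared config triggers exactly one reconfigure call with the complete new map.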
Log Cleaner configs
Use case: Add more log cleaner threads or increase buffer size for tuning
Config scope: Default for whole cluster (/config/brokers/<default>)
Config options:
log.cleaner.threads
log.cleaner.io.max.bytes.per.second
log.cleaner.dedupe.buffer.size
log.cleaner.io.buffer.size
log.cleaner.io.buffer.load.factor
log.cleaner.backoff.ms
Dynamic update changes:
- Config changes will be applied to existing log cleaner threads from the next iteration. Threads may be added or removed. If any of the old log cleaner threads had died due to errors, new threads will be created to replace those too so that the number of threads after an update reflects the configured log.cleaner.threads.
Default Topic configs
Use case: Even though topic configs can be set per-topic dynamically, it is convenient to configure properties like unclean.leader.election.enable consistently across the whole cluster.
Config scope: Default for whole cluster (/config/brokers/<default>)
Config options (and the equivalent topic configs):
log.segment.bytes (segment.bytes)
log.roll.ms, log.roll.hours (segment.ms)
log.roll.jitter.ms, log.roll.jitter.hours (segment.jitter.ms)
log.index.size.max.bytes (segment.index.bytes)
log.flush.interval.messages (flush.messages)
log.flush.interval.ms (flush.ms)
log.retention.bytes (retention.bytes)
log.retention.ms, log.retention.minutes, log.retention.hours (retention.ms)
log.index.interval.bytes (index.interval.bytes)
log.cleaner.delete.retention.ms (delete.retention.ms)
log.cleaner.min.compaction.lag.ms (min.compaction.lag.ms)
log.cleaner.min.cleanable.ratio (min.cleanable.dirty.ratio)
log.cleanup.policy (cleanup.policy)
log.segment.delete.delay.ms (file.delete.delay.ms)
unclean.leader.election.enable (unclean.leader.election.enable)
min.insync.replicas (min.insync.replicas)
message.max.bytes (max.message.bytes)
compression.type (compression.type)
log.preallocate (preallocate)
log.message.format.version (message.format.version)
log.message.timestamp.type (message.timestamp.type)
log.message.timestamp.difference.max.ms (message.timestamp.difference.max.ms)
Dynamic update changes:
- These properties will be supported as dynamic defaults that are used by all brokers since we don’t want to have different default values on different brokers. Configuration precedence for these properties will be:
  - Dynamic topic level (/config/topics/topicName)
  - Dynamic broker level (/config/brokers/id)
  - Dynamic default for all brokers in cluster (/config/brokers/<default>)
  - Static server.properties
Listeners and Security configs
Use cases:
- Add a new listener, e.g. to add a new security protocol or to make inconsistent changes to an existing security protocol (e.g. change CA for SSL)
- Configure SSL/SASL configs for new security protocol
- Remove an old listener after new listener has been added to all brokers (e.g. after changing CA for SSL)
- Update advertised listeners, e.g. to set IP address after broker has been started since in some environments broker address is known only after broker has started up
Config options:
Listener Configs
listeners
advertised.listeners
listener.security.protocol.map
inter.broker.listener.name
security.inter.broker.protocol
sasl.mechanism.inter.broker.protocol
Common security config
principal.builder.class
SSL Configs
ssl.protocol
ssl.provider
ssl.cipher.suites
ssl.enabled.protocols
ssl.truststore.type
ssl.truststore.location
ssl.truststore.password
ssl.keymanager.algorithm
ssl.trustmanager.algorithm
ssl.endpoint.identification.algorithm
ssl.secure.random.implementation
ssl.client.auth
SASL Configs
sasl.jaas.config
sasl.kerberos.service.name
sasl.kerberos.kinit.cmd
sasl.kerberos.ticket.renew.window.factor
sasl.kerberos.ticket.renew.jitter
sasl.kerberos.min.time.before.relogin
sasl.enabled.mechanisms
sasl.kerberos.principal.to.local.rules
Dynamic update changes:
- SSL configs will be updated by reconfiguring ChannelBuilder and creating a new SslFactory. If SSL is used for inter-broker communication, inconsistent changes (e.g. changing CA) should be made by adding a new listener with the new properties. This is true for SASL as well.
- SASL configuration updates will be supported using the dynamic JAAS configuration option sasl.jaas.config.
- Updates to advertised.listeners will re-register the new listener in ZK. This update will not be allowed for the listener used in inter-broker communication. In addition, AdminClient will not allow updates to the listener that was used to make the alter request.
Limitations:
- Configuration updates will not be allowed for the listener used in inter-broker communication. To update inter-broker configs without restarting the broker, a new listener can be created with new configs and the inter-broker listener can be changed to the new listener (similar to the currently documented security upgrade process).
Compatibility, Deprecation, and Migration Plan
- Broker configuration using a properties object or the properties file server.properties will continue to be supported, so users who do not configure dynamic broker options will not be impacted.
Rejected Alternatives
Deprecate static config in server.properties when a config is made dynamic
It is useful to support static configs in server.properties to enable quick start of Kafka without setting up new config options in ZooKeeper. To retain the simple user experience, we will continue to enable starting up Kafka with a properties file.