Status
Current state: Accepted
Discussion thread: here
...
sasl.jaas.config will be supported for brokers to enable dynamic SASL configuration of brokers. The property will use the same format as clients, with one login context specified as the config value. The mechanism name corresponding to the config must be specified as a prefix in lower-case (e.g. scram-sha-256.sasl.jaas.config). If multiple mechanisms are enabled on a listener, separate configs must be provided for each enabled mechanism. The property may be preceded by the listener name if multiple listeners are configured to use SASL (e.g. listener.name.sasl_ssl.plain.sasl.jaas.config).
Format: One login context entry using the same format as JAAS configuration:
...
Code Block | ||||
---|---|---|---|---|
| ||||
listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required user_alice="alice-secret";
listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required; |
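The naming rule described above (optional listener prefix, then the lower-case mechanism name, then sasl.jaas.config) can be illustrated with a small sketch. The class and method names are hypothetical, and lower-casing the listener name is an assumption taken from the listener.name.sasl_ssl example rather than something the text states explicitly.

```java
import java.util.Locale;

public class JaasConfigKeySketch {

    /**
     * Builds the broker property name for a SASL mechanism.
     * listenerName may be null when no listener prefix is needed.
     * Lower-casing the listener name is an assumption based on the example in the text.
     */
    public static String jaasConfigKey(String listenerName, String mechanism) {
        String mechanismPrefix = mechanism.toLowerCase(Locale.ROOT) + ".";
        String listenerPrefix = listenerName == null
                ? ""
                : "listener.name." + listenerName.toLowerCase(Locale.ROOT) + ".";
        return listenerPrefix + mechanismPrefix + "sasl.jaas.config";
    }

    public static void main(String[] args) {
        // Listener-specific key for the PLAIN mechanism
        System.out.println(jaasConfigKey("SASL_SSL", "PLAIN"));
        // Mechanism-only key, usable when a single listener uses SASL
        System.out.println(jaasConfigKey(null, "SCRAM-SHA-256"));
    }
}
```

With two mechanisms enabled on one listener, this produces two distinct keys, which matches the requirement that each enabled mechanism has its own config.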
...
Securing passwords in ZooKeeper
A new broker configuration will be added to specify a secret key that is used to encrypt passwords stored in ZooKeeper. The SecretKeyFactory and Cipher algorithms as well as the iteration count used will also be made configurable. The actual algorithms, salt and iteration count will be stored along with the encrypted password to ensure that the password can be decrypted even if the encryption parameters are altered (though these configs are not being made dynamic at the moment).
- Name: password.encoder.secret
  Type: Password
- Name: password.encoder.old.secret
  Type: Password (only used when password.encoder.secret is rotated)
- Name: password.encoder.keyfactory.algorithm
  Type: String
  Default: PBKDF2WithHmacSHA512 if available, otherwise PBKDF2WithHmacSHA1 (e.g. Java7)
- Name: password.encoder.cipher.algorithm
  Type: String
  Default: AES/CBC/PKCS5Padding (AES-256 if available, AES-128 otherwise)
- Name: password.encoder.key.length
  Type: Integer
  Default: 256 if supported, 128 otherwise
- Name: password.encoder.iterations
  Type: Integer
  Default: 4096
The secret will not be dynamically configurable and hence will never be stored in ZooKeeper. All the dynamic password configs are per-broker configs and hence there is no requirement to maintain the same secret across all brokers. To change password.encoder.secret, each broker must be restarted with an updated server.properties that contains the new secret in the config password.encoder.secret as well as the old secret in the config password.encoder.old.secret. The broker will decode all passwords in ZooKeeper using password.encoder.old.secret and update the values in ZooKeeper after re-encoding using password.encoder.secret. The config password.encoder.old.secret will be used only if the passwords in ZooKeeper are encoded using the old value and will be ignored otherwise.
Broker configuration in ZooKeeper will be protected using ACLs and will no longer be world-readable by default. It is expected that secure deployments of Kafka will also use network segmentation to limit ZooKeeper access.
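As a rough illustration of the scheme described in this section, the following sketch derives an AES key from the secret with PBKDF2, encrypts the password, and stores the algorithms, key length, iteration count, salt and IV next to the ciphertext so that decoding does not depend on the current broker settings. It is not the broker's implementation: the class name, the comma-separated storage layout and the fixed 128-bit key length are assumptions made for this example.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import java.util.Base64;
import javax.crypto.Cipher;
import javax.crypto.SecretKeyFactory;
import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.PBEKeySpec;
import javax.crypto.spec.SecretKeySpec;

public class PasswordEncoderSketch {
    // Values mirror the defaults listed above; 128-bit keys are used so the sketch runs on any JVM.
    static final String KEY_FACTORY = "PBKDF2WithHmacSHA512";
    static final String CIPHER = "AES/CBC/PKCS5Padding";
    static final int KEY_LENGTH = 128;
    static final int ITERATIONS = 4096;

    private static SecretKeySpec deriveKey(String secret, String keyFactory, byte[] salt,
                                           int iterations, int keyLength) throws GeneralSecurityException {
        PBEKeySpec spec = new PBEKeySpec(secret.toCharArray(), salt, iterations, keyLength);
        byte[] key = SecretKeyFactory.getInstance(keyFactory).generateSecret(spec).getEncoded();
        return new SecretKeySpec(key, "AES");
    }

    /** Encrypts the password and prefixes the parameters needed to decrypt it later. */
    public static String encode(String secret, String password) {
        try {
            SecureRandom random = new SecureRandom();
            byte[] salt = new byte[16];
            byte[] iv = new byte[16];
            random.nextBytes(salt);
            random.nextBytes(iv);
            Cipher cipher = Cipher.getInstance(CIPHER);
            cipher.init(Cipher.ENCRYPT_MODE,
                    deriveKey(secret, KEY_FACTORY, salt, ITERATIONS, KEY_LENGTH), new IvParameterSpec(iv));
            byte[] encrypted = cipher.doFinal(password.getBytes(StandardCharsets.UTF_8));
            Base64.Encoder b64 = Base64.getEncoder();
            // Hypothetical layout: keyFactory,cipher,keyLength,iterations,salt,iv,ciphertext
            return String.join(",", KEY_FACTORY, CIPHER, String.valueOf(KEY_LENGTH),
                    String.valueOf(ITERATIONS), b64.encodeToString(salt),
                    b64.encodeToString(iv), b64.encodeToString(encrypted));
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("Could not encode password", e);
        }
    }

    /** Decrypts using the parameters stored with the value, not the current defaults. */
    public static String decode(String secret, String encoded) {
        try {
            String[] parts = encoded.split(",");
            Base64.Decoder b64 = Base64.getDecoder();
            SecretKeySpec key = deriveKey(secret, parts[0], b64.decode(parts[4]),
                    Integer.parseInt(parts[3]), Integer.parseInt(parts[2]));
            Cipher cipher = Cipher.getInstance(parts[1]);
            cipher.init(Cipher.DECRYPT_MODE, key, new IvParameterSpec(b64.decode(parts[5])));
            return new String(cipher.doFinal(b64.decode(parts[6])), StandardCharsets.UTF_8);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("Could not decode password", e);
        }
    }

    /** Secret rotation: decode with the old secret, re-encode with the new one. */
    public static String rotate(String oldSecret, String newSecret, String encoded) {
        return encode(newSecret, decode(oldSecret, encoded));
    }

    public static void main(String[] args) {
        String stored = encode("old-secret", "keystore-password");
        String rotated = rotate("old-secret", "new-secret", stored);
        System.out.println(decode("new-secret", rotated));
    }
}
```

The rotate method corresponds to the restart procedure above, where the broker is given both password.encoder.old.secret and password.encoder.secret and rewrites the stored values.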
Protocol Changes
A new option include_synonyms will be added to DescribeConfigsRequest to return all the configs which may be used as the value of the specified config if the config was removed. For example, the flush.ms config for a topic will return the broker config log.flush.interval.ms as a synonym if include_synonyms=true. DescribeConfigsResponse will return all synonyms in the order of precedence. This is particularly useful to obtain the default value that the config will revert to if a config override is removed. As dynamic broker configs are being added at per-broker and cluster-default levels, include_synonyms can be useful to list all the configured values and the precedence used to obtain the currently configured value. The DescribeConfigsRequest version will be bumped up to 1.
Code Block | ||||
---|---|---|---|---|
| ||||
DescribeConfigs Request (Version: 1) => [resource [config_name]] include_synonyms
  resource => resource_type resource_name
    resource_type => INT8
    resource_name => STRING
  config_name => STRING
  include_synonyms => BOOLEAN

DescribeConfigs Response (Version: 1) => throttle_time_ms [entities]
  throttle_time_ms => INT32
  entities => error_code error_message resource [configs]
    error_code => INT16
    error_message => STRING
    resource => resource_type resource_name
      resource_type => INT8
      resource_name => STRING
    configs => [config_entry [synonym]]   <= Added [synonym]
      config_entry =>
        config_name => STRING
        config_value => NULLABLE_STRING
        read_only => BOOLEAN
        config_source => INT8   <= Replaced boolean is_default with more generic config_source (see below for values)
        is_sensitive => BOOLEAN
      synonym =>   <= NEW
        config_name => STRING
        config_value => NULLABLE_STRING
        config_source => INT8   <= may be one of (TOPIC|DYNAMIC_BROKER|DYNAMIC_DEFAULT_BROKER|STATIC_BROKER|DEFAULT) |
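To make the precedence ordering concrete, here is a small sketch that sorts candidate values by source and reads off both the effective value and the value a config would revert to if the top override were removed. The class, the Synonym type and the sample values are hypothetical. The sketch assumes that the order in which the sources are listed in the schema comment is the order of precedence, highest first.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class ConfigSynonymSketch {

    // Assumed precedence, highest first, following the order listed in the schema comment.
    public enum Source { TOPIC, DYNAMIC_BROKER, DYNAMIC_DEFAULT_BROKER, STATIC_BROKER, DEFAULT }

    public static final class Synonym {
        public final String name;
        public final String value;
        public final Source source;

        public Synonym(String name, String value, Source source) {
            this.name = name;
            this.value = value;
            this.source = source;
        }
    }

    /** Returns a copy of the candidates sorted from highest to lowest precedence. */
    public static List<Synonym> inPrecedenceOrder(List<Synonym> candidates) {
        List<Synonym> sorted = new ArrayList<>(candidates);
        sorted.sort(Comparator.comparing((Synonym s) -> s.source));
        return sorted;
    }

    public static void main(String[] args) {
        // Sample values are made up for illustration.
        List<Synonym> synonyms = inPrecedenceOrder(Arrays.asList(
                new Synonym("log.flush.interval.ms", "1000", Source.STATIC_BROKER),
                new Synonym("flush.ms", "500", Source.TOPIC),
                new Synonym("log.flush.interval.ms", "2000", Source.DYNAMIC_DEFAULT_BROKER)));
        Synonym effective = synonyms.get(0);
        Synonym fallback = synonyms.get(1);
        System.out.println("effective: " + effective.name + "=" + effective.value);
        System.out.println("after removing the override: " + fallback.name + "=" + fallback.value);
    }
}
```

The first element plays the role of the config entry itself, and the second is what a client would report as the value the config reverts to once the topic override is deleted.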
When the MetadataRequest version is increased after the 1.1.0 release, a new error code ENDPOINT_NOT_FOUND_ON_LEADER will be added to notify clients when a listener is available on the broker used to obtain metadata, but not on the leader of a partition. This could be a transient error when listeners are added and will be retried in the same way as LEADER_NOT_AVAILABLE. Brokers will continue to return LEADER_NOT_AVAILABLE to clients using older versions of MetadataRequest. In 1.1.0, brokers will return LEADER_NOT_AVAILABLE instead of the UNKNOWN_SERVER_ERROR returned in older versions.
Public Interface Changes
A new interface Reconfigurable will be added to notify reconfigurable objects of configuration changes. For example, metrics reporters that support reconfiguration can implement the interface Reconfigurable to enable reconfiguration without broker restart. The interface will also be implemented by all internal classes which support reconfiguration (e.g. ChannelBuilder).
Code Block | ||||
---|---|---|---|---|
| ||||
package org.apache.kafka.common;

import java.util.Map;
import java.util.Set;

/**
 * Interface for reconfigurable classes that support dynamic configuration.
 */
public interface Reconfigurable extends Configurable {

    /**
     * Returns the names of configs that may be reconfigured.
     */
    Set<String> reconfigurableConfigs();

    /**
     * Validates the provided configuration. The provided map contains
     * all configs including any reconfigurable configs that may be different
     * from the initial configuration.
     */
    boolean validateReconfiguration(Map<String, ?> configs);

    /**
     * Reconfigures this instance with the given key-value pairs. The provided
     * map contains all configs including any reconfigurable configs that
     * may have changed since the object was initially configured using
     * {@link Configurable#configure(Map)}.
     */
    void reconfigure(Map<String, ?> configs);
} |
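The sketch below shows one way a pluggable component might implement this interface and how a caller could validate before applying an update. To keep it compilable without the Kafka client library, it declares local stand-ins for Configurable and Reconfigurable that mirror the signatures above. IntervalReporter, the report.interval.ms config and applyUpdate are hypothetical names invented for this example, and the validate-then-apply sequence is an assumption about how the broker would drive the interface.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;

public class ReconfigurableSketch {

    // Local stand-ins mirroring the interfaces in the proposal.
    public interface Configurable {
        void configure(Map<String, ?> configs);
    }

    public interface Reconfigurable extends Configurable {
        Set<String> reconfigurableConfigs();
        boolean validateReconfiguration(Map<String, ?> configs);
        void reconfigure(Map<String, ?> configs);
    }

    /** Hypothetical reporter whose reporting interval can be changed without a restart. */
    public static class IntervalReporter implements Reconfigurable {
        static final String INTERVAL_MS = "report.interval.ms";
        private volatile long intervalMs = 1000L;

        private static long parseInterval(Map<String, ?> configs) {
            Object value = configs.get(INTERVAL_MS);
            if (value == null) {
                throw new IllegalArgumentException("Missing " + INTERVAL_MS);
            }
            return Long.parseLong(String.valueOf(value));
        }

        @Override
        public void configure(Map<String, ?> configs) {
            intervalMs = parseInterval(configs);
        }

        @Override
        public Set<String> reconfigurableConfigs() {
            return Collections.singleton(INTERVAL_MS);
        }

        @Override
        public boolean validateReconfiguration(Map<String, ?> configs) {
            try {
                return parseInterval(configs) > 0;
            } catch (RuntimeException e) {
                return false; // missing or non-numeric value
            }
        }

        @Override
        public void reconfigure(Map<String, ?> configs) {
            intervalMs = parseInterval(configs);
        }

        public long intervalMs() {
            return intervalMs;
        }
    }

    /** Applies the new configs only if validation passes; returns whether they were applied. */
    public static boolean applyUpdate(Reconfigurable target, Map<String, ?> newConfigs) {
        if (!target.validateReconfiguration(newConfigs)) {
            return false;
        }
        target.reconfigure(newConfigs);
        return true;
    }

    public static void main(String[] args) {
        IntervalReporter reporter = new IntervalReporter();
        reporter.configure(Collections.singletonMap(IntervalReporter.INTERVAL_MS, "1000"));
        boolean applied = applyUpdate(reporter, Collections.singletonMap(IntervalReporter.INTERVAL_MS, "250"));
        System.out.println(applied + " " + reporter.intervalMs());
    }
}
```

Keeping validation separate from reconfigure lets an invalid update be rejected before any state changes, which is why the sketch never calls reconfigure on a map that failed validation.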
The classes DescribeConfigsOptions and DescribeConfigsResult used by AdminClient will be updated to include config synonyms in the result.
Code Block | ||
---|---|---|
| ||
/**
 * Return true if synonym configs should be returned in the response.
 */
public boolean includeSynonyms() {
    return includeSynonyms;
}

/**
 * Set to true if synonym configs should be returned in the response.
 */
public DescribeConfigsOptions includeSynonyms(boolean includeSynonyms) {
    this.includeSynonyms = includeSynonyms;
    return this;
} |
Code Block | ||
---|---|---|
| ||
/**
 * Returns all config values that may be used as the value of this config along with their source,
 * in the order of precedence. The list starts with the value returned in this ConfigEntry.
 * The list is empty if synonyms were not requested using {@link DescribeConfigsOptions#includeSynonyms(boolean)}.
 */
public List<ConfigSynonym> synonyms() {
    return synonyms;
} |
Code Block | ||
---|---|---|
| ||
public static class ConfigSynonym {

    /**
     * Returns the name of this configuration.
     */
    public String name() {
        return name;
    }

    /**
     * Returns the value of this configuration, which may be null if the configuration is sensitive.
     */
    public String value() {
        return value;
    }

    /**
     * Returns the source of this configuration.
     */
    public ConfigSource source() {
        return source;
    }
} |
Code Block | ||
---|---|---|
| ||
public enum ConfigSource {
    DYNAMIC_TOPIC_CONFIG,           // dynamic topic config that is configured for a specific topic
    DYNAMIC_BROKER_CONFIG,          // dynamic broker config that is configured for a specific broker
    DYNAMIC_DEFAULT_BROKER_CONFIG,  // dynamic broker config that is configured as default for all brokers in the cluster
    STATIC_BROKER_CONFIG,           // static broker config provided as broker properties at start up (e.g. server.properties file)
    DEFAULT_CONFIG                  // built-in default configuration for configs that have a default value
} |
Tools
kafka-configs.sh will be updated to configure defaults and overrides for dynamic configuration options for the entity type brokers. This will be done using the new AdminClient that talks to brokers rather than to ZooKeeper so that password encryption and config validation need to be implemented only in the broker. The full migration of ConfigCommand to use the new AdminClient will be done under KIP-248.
Two new options will be added to kafka-configs.sh under this KIP to enable broker config describe and update using the new AdminClient.
- --bootstrap-server <host>:<port>
- --command-config <config-file>
Example:
Add/update broker-specific config for broker with broker id 0:
bin/kafka-configs.sh --bootstrap-server localhost:9092 --command-config /path/adminclient.props --alter --add-config 'ssl.keystore.location=keystore1.jks,ssl.keystore.password=password1' --entity-name 0 --entity-type brokers
Add/update cluster-wide default config:
bin/kafka-configs.sh --bootstrap-server localhost:9092 --command-config /path/adminclient.props --alter --add-config 'unclean.leader.election.enable=false' --entity-default --entity-type brokers
Delete broker-specific config (the actual config will revert to its default value, see precedence of configs):
bin/kafka-configs.sh --bootstrap-server localhost:9092 --command-config /path/adminclient.props --alter --delete-config unclean.leader.election.enable --entity-type brokers --entity-name 0
Proposed Changes
SSL keystore
...
listeners
advertised.listeners
listener.security.protocol.map
inter.broker.listener.name
security.inter.broker.protocol
sasl.mechanism.inter.broker.protocol
Common security config
principal.builder.class
...
sasl.jaas.config
sasl.kerberos.service.name
sasl.kerberos.kinit.cmd
sasl.kerberos.ticket.renew.window.factor
sasl.kerberos.ticket.renew.jitter
sasl.kerberos.min.time.before.relogin
sasl.enabled.mechanisms
sasl.kerberos.principal.to.local.rules
Dynamic update changes:
- SSL configs will be updated by reconfiguring ChannelBuilder and creating a new SslFactory. If SSL is used for inter-broker communication, inconsistent changes (e.g. changing CA) should be made by adding a new listener with the new properties. This is true for SASL as well.
- SASL configuration updates will be supported using the dynamic JAAS configuration option sasl.jaas.config.
- Updates to advertised.listeners will re-register the new listener in ZK. This update will not be allowed for the listener used in inter-broker communication. In addition to this, AdminClient will not allow updates to the listener that was used to make the alter request.
- When changes are made to listeners, additional logic will be required in the controller to broadcast the updated metadata to all brokers.
- All the security configs can be dynamically configured for new listeners. In the initial implementation, only some configs will be dynamically updatable for existing listeners (e.g. SSL keystores). Support for updating other security configs dynamically for existing listeners will be added later.
Limitations:
- This KIP will not allow dynamic updates to inter-broker security protocol or listener name. Support for changing inter-broker security configuration without a restart will be done in a follow-on KIP along with additional validation to ensure that all brokers have enabled the new config.
Future Work
Message Format and Inter Broker Protocol
...
log.message.format.version
inter.broker.protocol.version
inter.broker.listener.name
security.inter.broker.protocol
sasl.mechanism.inter.broker.protocol
Dynamic update changes:
- AdminClient needs to verify that all brokers in the cluster support the new version before making an update. To enable this, the version of the broker will be added to the JSON registered by each broker during startup at /brokers/ids/id. To avoid all brokers reading version information from ZooKeeper, the controller should propagate this information in UpdateMetadataRequest. This requires a protocol change. Note also that this approach only allows verification of the version of brokers that are alive at the time AlterConfigs is executed.
- Additional validation is required before changing inter-broker listener or security configuration. This should ensure that the client-side and server-side configs match and also that all brokers in the cluster support the new configuration.
Compatibility, Deprecation, and Migration Plan
...