Versions Compared

...

Code Block
language: text
title: sasl.jaas.config example
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required user_alice="alice-secret"; \
org.apache.kafka.common.security.scram.ScramLoginModule required;

...

Securing passwords in ZooKeeper

A new broker configuration will be added to specify a secret key that is used to encrypt passwords stored in ZooKeeper. The SecretKeyFactory and Cipher algorithms, as well as the iteration count used, will also be made configurable. The actual algorithms, salt and iteration count will be stored along with the encrypted password to ensure that the password can be decrypted even if the encryption parameters are altered (though these configs are not being made dynamic at the moment).

  • Name: config.secret  Type: Password
  • Name: config.secret.keyfactory.algorithm  Type: String  Default: PBKDF2WithHmacSHA512 if available, otherwise PBKDF2WithHmacSHA1 (e.g. Java 7)
  • Name: config.secret.cipher.algorithm  Type: String  Default: AES/CBC/PKCS5Padding
  • Name: config.secret.iterations  Type: Integer  Default: 2048

The secret will not be dynamically configurable and hence will never be stored in ZooKeeper. All the dynamic password configs are per-broker configs and hence there is no requirement to maintain the same secret across all brokers.
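The scheme described above can be illustrated with a minimal sketch. This is not the broker's implementation: the class name PasswordEncoderSketch, the 128-bit key length, the 16-byte salt and the comma-separated storage layout are all assumptions made for illustration. Only the algorithm names and the iteration count come from the defaults listed above.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import java.util.Base64;
import javax.crypto.Cipher;
import javax.crypto.SecretKeyFactory;
import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.PBEKeySpec;
import javax.crypto.spec.SecretKeySpec;

/** Hypothetical helper for illustration only; not part of Kafka. */
final class PasswordEncoderSketch {
    // Defaults taken from the config list above.
    static final String KEY_FACTORY_ALGORITHM = "PBKDF2WithHmacSHA512";
    static final String CIPHER_ALGORITHM = "AES/CBC/PKCS5Padding";
    static final int ITERATIONS = 2048;
    // Assumptions: key length, salt length and the storage layout are illustrative.
    static final int KEY_LENGTH_BITS = 128;
    static final int SALT_LENGTH_BYTES = 16;

    /** Encrypts the password and returns "keyFactory,cipher,iterations,salt,iv,ciphertext". */
    static String encode(char[] secret, String password) {
        try {
            byte[] salt = new byte[SALT_LENGTH_BYTES];
            new SecureRandom().nextBytes(salt);
            Cipher cipher = Cipher.getInstance(CIPHER_ALGORITHM);
            // No IV is supplied, so the JCE provider is expected to generate a random one.
            cipher.init(Cipher.ENCRYPT_MODE,
                    deriveKey(secret, KEY_FACTORY_ALGORITHM, CIPHER_ALGORITHM, salt, ITERATIONS));
            byte[] iv = cipher.getIV();
            byte[] encrypted = cipher.doFinal(password.getBytes(StandardCharsets.UTF_8));
            Base64.Encoder b64 = Base64.getEncoder();
            // The parameters travel with the ciphertext so that later config changes
            // do not prevent decryption.
            return String.join(",", KEY_FACTORY_ALGORITHM, CIPHER_ALGORITHM,
                    Integer.toString(ITERATIONS), b64.encodeToString(salt),
                    b64.encodeToString(iv), b64.encodeToString(encrypted));
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("Password encryption failed", e);
        }
    }

    /** Decrypts using the parameters stored with the ciphertext, not the current defaults. */
    static String decode(char[] secret, String stored) {
        try {
            String[] fields = stored.split(",");
            Base64.Decoder b64 = Base64.getDecoder();
            byte[] salt = b64.decode(fields[3]);
            byte[] iv = b64.decode(fields[4]);
            byte[] encrypted = b64.decode(fields[5]);
            Cipher cipher = Cipher.getInstance(fields[1]);
            cipher.init(Cipher.DECRYPT_MODE,
                    deriveKey(secret, fields[0], fields[1], salt, Integer.parseInt(fields[2])),
                    new IvParameterSpec(iv));
            return new String(cipher.doFinal(encrypted), StandardCharsets.UTF_8);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("Password decryption failed", e);
        }
    }

    private static SecretKeySpec deriveKey(char[] secret, String keyFactoryAlgorithm,
                                           String cipherAlgorithm, byte[] salt, int iterations)
            throws GeneralSecurityException {
        PBEKeySpec spec = new PBEKeySpec(secret, salt, iterations, KEY_LENGTH_BITS);
        byte[] keyBytes = SecretKeyFactory.getInstance(keyFactoryAlgorithm)
                .generateSecret(spec).getEncoded();
        // "AES/CBC/PKCS5Padding" -> "AES"
        return new SecretKeySpec(keyBytes, cipherAlgorithm.split("/")[0]);
    }
}
```

Because decode reads the algorithms and iteration count from the stored value, a password encoded under one set of parameters should still be readable after the defaults change, which is the property the paragraph above asks for.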

Broker configuration in ZooKeeper will be protected using ACLs and will no longer be world-readable by default. It is expected that secure deployments of Kafka will also use network segmentation to limit ZooKeeper access.

Protocol Changes

A new option include_synonyms will be added to DescribeConfigsRequest to return all the configs which may be used as the value of the specified config if the config was removed. For example, the flush.ms config for a topic will return the broker config log.flush.interval.ms as a synonym if include_synonyms=true. DescribeConfigsResponse will return all synonyms in the order of precedence. This is particularly useful to obtain the default value that the config will revert to if a config override is removed. As dynamic broker configs are being added at per-broker and cluster-default levels, include_synonyms can be useful to list all the configured values and the precedence used to obtain the currently configured value.

DescribeConfigsRequest version will be bumped up to 1.

Code Block
language: text
title: DescribeConfigsRequest
DescribeConfigs Request (Version: 1) => [resource [config_name]] include_synonyms
  resource => resource_type resource_name
    resource_type => INT8
    resource_name => STRING
  config_name => STRING
  include_synonyms => BOOLEAN

DescribeConfigs Response (Version: 1) => throttle_time_ms [entities]
  throttle_time_ms => INT32
  entities => error_code error_message resource [configs]
    error_code => INT16
    error_message => STRING
    resource => resource_type resource_name
      resource_type => INT8
      resource_name => STRING
    configs => [config_entry [synonym]]    <= Added [synonym]
    config_entry =>
      config_name => STRING
      config_value => NULLABLE_STRING
      read_only => BOOLEAN
      config_source => INT8                <= Replaced boolean is_default with more generic config_source (see below for values)
      is_sensitive => BOOLEAN
    synonym =>                             <= NEW
      config_name => STRING
      config_value => NULLABLE_STRING
      config_source => INT8                <= may be one of (TOPIC|DYNAMIC_BROKER|DYNAMIC_DEFAULT_BROKER|STATIC_BROKER|DEFAULT)
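To make the ordering of synonyms concrete, here is a small sketch of how a list of synonyms could be assembled for a topic config. It is not the broker's code. The class SynonymLookupSketch, its nested types and the sample values are hypothetical, and it assumes that the config_source values listed above are already in order of precedence, from highest to lowest.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of synonym ordering; not part of Kafka. */
final class SynonymLookupSketch {

    /** Assumption: declared from highest to lowest precedence. */
    enum Source { TOPIC, DYNAMIC_BROKER, DYNAMIC_DEFAULT_BROKER, STATIC_BROKER, DEFAULT }

    static final class Synonym {
        final String name;
        final String value;
        final Source source;

        Synonym(String name, String value, Source source) {
            this.name = name;
            this.value = value;
            this.source = source;
        }

        @Override
        public String toString() {
            return source + ": " + name + "=" + value;
        }
    }

    /**
     * Collects every configured value that could supply the topic config,
     * highest precedence first. The first element is the effective value.
     */
    static List<Synonym> synonyms(String topicConfigName, String brokerConfigName,
                                  Map<Source, Map<String, String>> levels) {
        List<Synonym> result = new ArrayList<>();
        for (Source source : Source.values()) {
            Map<String, String> configs = levels.get(source);
            if (configs == null) {
                continue;
            }
            // The topic level uses the topic config name; all broker levels use the broker name.
            String name = source == Source.TOPIC ? topicConfigName : brokerConfigName;
            if (configs.containsKey(name)) {
                result.add(new Synonym(name, configs.get(name), source));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Sample values are made up for the example.
        Map<Source, Map<String, String>> levels = new EnumMap<>(Source.class);
        levels.put(Source.STATIC_BROKER, Collections.singletonMap("log.flush.interval.ms", "1000"));
        levels.put(Source.DYNAMIC_DEFAULT_BROKER, Collections.singletonMap("log.flush.interval.ms", "2000"));
        for (Synonym synonym : synonyms("flush.ms", "log.flush.interval.ms", levels)) {
            System.out.println(synonym);
        }
    }
}
```

In this sketch, a topic without its own flush.ms override would see the cluster-wide dynamic default first and the static broker value second, so removing the dynamic default would make the static value the effective one.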

Public Interface Changes

A new interface Reconfigurable will be added to notify reconfigurable objects of configuration changes. For example, metrics reporters that support reconfiguration can implement the interface Reconfigurable to enable reconfiguration without broker restart. The interface will also be implemented by all internal classes which support reconfiguration (e.g. ChannelBuilder).

Code Block
language: java
title: Reconfigurable
package org.apache.kafka.common;

import java.util.Map;
import java.util.Set;

/**
 * Interface for reconfigurable classes that support dynamic configuration.
 */
public interface Reconfigurable extends Configurable {

    /**
     * Returns the names of configs that may be reconfigured.
     */
    Set<String> reconfigurableConfigs();

    /**
     * Validates the provided configuration. The provided map contains
     * all configs including any reconfigurable configs that may be different
     * from the initial configuration.
     */
    boolean validate(Map<String, ?> configs);

    /**
     * Reconfigures this instance with the given key-value pairs. The provided
     * map contains all configs including any reconfigurable configs that
     * may have changed since the object was initially configured using
     * {@link Configurable#configure(Map)}.
     */
    void reconfigure(Map<String, ?> configs);

}
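As a rough illustration of how a pluggable component might use this interface, the sketch below implements it for a reporter with one reconfigurable setting. The class IntervalReporter, the config name example.reporter.interval.ms and the 30 second fallback are hypothetical. The two nested interfaces are local stand-ins that copy the method signatures shown above, so that the example compiles without the Kafka client library.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;

/** Hypothetical illustration; not part of Kafka. */
final class ReconfigurableSketch {

    // Local stand-ins mirroring the signatures shown above.
    interface Configurable {
        void configure(Map<String, ?> configs);
    }

    interface Reconfigurable extends Configurable {
        Set<String> reconfigurableConfigs();
        boolean validate(Map<String, ?> configs);
        void reconfigure(Map<String, ?> configs);
    }

    /** A reporter whose reporting interval can change without a restart. */
    static final class IntervalReporter implements Reconfigurable {
        // Hypothetical config name and fallback value.
        static final String INTERVAL_MS = "example.reporter.interval.ms";
        static final long FALLBACK_INTERVAL_MS = 30_000L;

        private volatile long intervalMs;

        @Override
        public void configure(Map<String, ?> configs) {
            intervalMs = parseInterval(configs);
        }

        @Override
        public Set<String> reconfigurableConfigs() {
            return Collections.singleton(INTERVAL_MS);
        }

        @Override
        public boolean validate(Map<String, ?> configs) {
            // Reject values that are not positive numbers before they are applied.
            try {
                return parseInterval(configs) > 0;
            } catch (NumberFormatException e) {
                return false;
            }
        }

        @Override
        public void reconfigure(Map<String, ?> configs) {
            intervalMs = parseInterval(configs);
        }

        long intervalMs() {
            return intervalMs;
        }

        private static long parseInterval(Map<String, ?> configs) {
            Object value = configs.get(INTERVAL_MS);
            return value == null ? FALLBACK_INTERVAL_MS : Long.parseLong(value.toString().trim());
        }
    }

    public static void main(String[] args) {
        IntervalReporter reporter = new IntervalReporter();
        reporter.configure(Collections.singletonMap(IntervalReporter.INTERVAL_MS, "1000"));
        Map<String, String> update = Collections.singletonMap(IntervalReporter.INTERVAL_MS, "5000");
        if (reporter.validate(update)) {
            reporter.reconfigure(update);
        }
        System.out.println(reporter.intervalMs());
    }
}
```

The caller is assumed to invoke validate before reconfigure, so that an invalid update leaves the running instance untouched.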


The classes DescribeConfigsOptions and DescribeConfigsResult used by AdminClient will be updated to include config synonyms in the result.

 


Code Block
title: New methods in DescribeConfigsOptions
/**
 * Return true if synonym configs should be returned in the response.
 */
public boolean includeSynonyms() {
    return includeSynonyms;
}

/**
 * Set to true if synonym configs should be returned in the response.
 */
public DescribeConfigsOptions includeSynonyms(boolean includeSynonyms) {
    this.includeSynonyms = includeSynonyms;
    return this;
}


Code Block
title: New methods in ConfigEntry
/**
 * Returns all config values that may be used as the value of this config along with their source,
 * in the order of precedence. The list starts with the value returned in this ConfigEntry.
 * The list is empty if synonyms were not requested using {@link DescribeConfigsOptions#includeSynonyms(boolean)}. 
 */
public List<ConfigSynonym> synonyms() {
    return synonyms;
}

 


Code Block
title: Public methods of ConfigSynonym
public static class ConfigSynonym {

    /**
     * Returns the name of this configuration.
     */
    public String name() {
        return name;
    }

    /**
     * Returns the value of this configuration, which may be null if the configuration is sensitive.
     */
    public String value() {
        return value;
    }

    /**
     * Returns the source of this configuration.
     */
    public ConfigSource source() {
        return source;
    }
}

Code Block
title: ConfigSource enum
public enum ConfigSource {
    TOPIC_CONFIG,                   // dynamic topic config that is configured for a specific topic
    DYNAMIC_BROKER_CONFIG,          // dynamic broker config that is configured for a specific broker
    DYNAMIC_DEFAULT_BROKER_CONFIG,  // dynamic broker config that is configured as default for all brokers in the cluster
    STATIC_BROKER_CONFIG,           // static broker config provided as broker properties at start up (e.g. server.properties file)
    DEFAULT_CONFIG                  // built-in default configuration for configs that have a default value
}

 

 


Tools

kafka-configs.sh will be updated to configure defaults and overrides for dynamic configuration options for the entity type brokers.

...

bin/kafka-configs.sh --zookeeper localhost --alter --delete-config unclean.leader.election.enable --entity-type brokers --entity-name 0

Securing passwords in ZooKeeper

In the initial implementation, passwords (e.g. keystore password) stored in ZooKeeper will be base-64 encoded, which is not secure. Broker configuration in ZooKeeper will be protected using ACLs. It is expected that secure deployments of Kafka will also use network segmentation to limit ZooKeeper access. We may want to consider more secure storage for passwords in the future, for example by encrypting passwords with a secret key that is not dynamically configurable.

Proposed Changes

SSL keystore

...