Table of Contents

Status

Current state: Accepted

Discussion thread: here

JIRA: KAFKA-6886

Pull request: https://github.com/apache/kafka/pull/4990/files

Released: 2.0.0

 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

While this KIP focuses on Kafka Connect, we propose some common public interfaces and classes that could be used by other parts of Kafka, specifically:

  • ConfigProvider, ConfigChangeCallback, and ConfigData:  These interfaces could potentially be used by the Broker in conjunction with KIP-226 to overlay configuration properties from a ConfigProvider (such as a VaultConfigProvider) onto existing configuration properties.
  • ConfigTransformer:  This class could potentially be used by other clients (in conjunction with the previous interfaces) to implement variants of KIP-76 and KIP-269.

...

The public interfaces that are not Connect-specific consist of the following:

  • ConfigProvider, ConfigChangeCallback, and ConfigData:  These interfaces are used to abstract a provider of configuration properties.
  • ConfigTransformer:  This class is used to provide variable substitution for a configuration value, by looking up variables (or indirect references) from a set of ConfigProvider instances.  It only provides one level of indirection.

The above classes will be in the package org.apache.kafka.common.config, except for ConfigProvider, which will be in the package org.apache.kafka.common.config.provider, along with any implementations of ConfigProvider (which is currently only FileConfigProvider).

Code Block
languagejava
public interface ConfigProvider extends Configurable, Closeable {

    // Configure this class with the initialization parameters
    void configure(Map<String, ?> configs);

    // Look up the data at the given path.
    ConfigData get(String path);

    // Look up the data with the given keys at the given path.
    ConfigData get(String path, Set<String> keys);

    // The ConfigProvider is responsible for making this callback whenever the key changes.
    // Some ConfigProviders may want to have a background thread with a configurable update interval.
    void subscribe(String path, Set<String> keys, ConfigChangeCallback callback);

    // Inverse of subscribe
    void unsubscribe(String path, Set<String> keys, ConfigChangeCallback callback);

    // Remove all subscriptions
    void unsubscribeAll();

    // Close all subscriptions and clean up all resources
    void close();
}

public class ConfigData {

    private final Map<String, String> data;
    private final Long ttl;

    public ConfigData(Map<String, String> data, Long ttl) {
        this.data = data;
        this.ttl = ttl;
    }

    public ConfigData(Map<String, String> data) {
        this(data, null);
    }

    public Long ttl() {
        return ttl;
    }

    public Map<String, String> data() {
        return data;
    }
}

public interface ConfigChangeCallback {
    void onChange(String path, ConfigData data);
}

 

Also a helper class will be added that can provide variable substitutions using ConfigProvider instances.  Here is an example implementation.

 

Code Block
/**
 * This class wraps a set of {@link ConfigProvider} instances and uses them to perform
 * transformations.
 */
public class ConfigTransformer {
    private static final Logger log = LoggerFactory.getLogger(ConfigTransformer.class);

    private static final Pattern DEFAULT_PATTERN = Pattern.compile("\\$\\{(.*?):((.*?):)?(.*?)\\}");

    private final Map<String, ConfigProvider> configProviders;
    private final Pattern pattern;

    public ConfigTransformer(Map<String, ConfigProvider> configProviders) {
        this(configProviders, DEFAULT_PATTERN);
    }

    public ConfigTransformer(Map<String, ConfigProvider> configProviders, Pattern pattern) {
        this.configProviders = configProviders;
        this.pattern = pattern;
    }

    public Map<String, String> transform(Map<String, String> configs) {
        Map<String, Set<String>> keysByProvider = new HashMap<>();
        Map<String, Map<String, String>> lookupsByProvider = new HashMap<>();

        // Collect the variables that need transformation
        for (Map.Entry<String, String> config : configs.entrySet()) {
            List<ConfigVariable> vars = getVars(config.getKey(), config.getValue(), pattern);
            for (ConfigVariable var : vars) {
                Set<String> keys = keysByProvider.get(var.providerName);
                if (keys == null) {
                    keys = new HashSet<>();
                    keysByProvider.put(var.providerName, keys);
                }
                keys.add(var.valueVariable);
            }
        }

        // Look up the requested variables from the ConfigProviders
        for (Map.Entry<String, Set<String>> entry : keysByProvider.entrySet()) {
            ConfigProvider provider = configProviders.get(entry.getKey());
            // Paths are not handled in this example, so a null path is passed
            ConfigData configData = provider.get(null, new HashSet<>(entry.getValue()));
            lookupsByProvider.put(entry.getKey(), configData.data());
        }

        // Perform the transformations
        Map<String, String> result = new HashMap<>(configs);
        for (Map.Entry<String, String> config : configs.entrySet()) {
            result.put(config.getKey(), replace(lookupsByProvider, config.getValue(), pattern));
        }
        return result;
    }

    private static List<ConfigVariable> getVars(String key, String value, Pattern pattern) {
        List<ConfigVariable> configVars = new ArrayList<>();
        Matcher matcher = pattern.matcher(value);
        while (matcher.find()) {
            // group(1) is the provider name, group(4) is the key (group(3) is the optional path)
            configVars.add(new ConfigVariable(matcher.group(1), matcher.group(4)));
        }
        return configVars;
    }

    private static String replace(Map<String, Map<String, String>> lookupsByProvider, String value, Pattern pattern) {
        Matcher matcher = pattern.matcher(value);
        StringBuilder builder = new StringBuilder();
        int i = 0;
        while (matcher.find()) {
            Map<String, String> map = lookupsByProvider.get(matcher.group(1));
            String replacement = map == null ? null : map.get(matcher.group(4));
            builder.append(value, i, matcher.start());
            if (replacement == null)
                // Keep the variable unresolved
                builder.append(matcher.group(0));
            else
                builder.append(replacement);
            i = matcher.end();
        }
        builder.append(value, i, value.length());
        return builder.toString();
    }

    private static class ConfigVariable {
        final String providerName;
        final String valueVariable;

        ConfigVariable(String providerName, String valueVariable) {
            this.providerName = providerName;
            this.valueVariable = valueVariable;
        }

        public String toString() {
            return "(" + providerName + ":" + valueVariable + ")";
        }
    }
}

 

An implementation of ConfigProvider called FileConfigProvider will be provided that can use secrets from a Properties file.  When using the FileConfigProvider with the variable syntax ${file:path:key}, the path will be the path to the file and the key will be the property key.

Implementations of ConfigProvider, such as FileConfigProvider, that are provided with Apache Kafka will be placed in the package org.apache.kafka.common.config.provider.  This will facilitate frameworks such as Connect that treat instances of ConfigProvider as components that should be loaded in isolation.  Connect will assume that all classes in the package org.apache.kafka.common.config.provider (except ConfigProvider) are such components.

Two existing interfaces that are specific to Connect will be modified.  This will allow for Tasks to get the latest versions of their configs with all indirect references reloaded.

Code Block
languagejava
public interface SinkTaskContext {
    ...
    Map<String, String> configs();
    ...
}
 
public interface SourceTaskContext {
    ...
    Map<String, String> configs();
    ...
}

 

The following configuration properties for Connect will be added.

Config Option | Description | Example | Default
config.providers | A comma-separated list of names for providers. | config.providers=file,vault | N/A
config.providers.{name}.class | The Java class name for a provider. | config.providers.file.class=org.apache.kafka.connect.configs.FileConfigProvider | N/A
config.providers.{name}.param.{param-name} | A parameter to be passed to the above Java class on initialization. | config.providers.file.param.secrets=/run/mysecrets | N/A
config.reload.action | One of "none" (no action when onChange() is called) or "restart" (schedule a restart when onChange() is called). | config.reload.action=restart | restart


Proposed Changes

Currently the configuration for both Connectors and Tasks is stored in a Kafka topic.  The goal is for these stored configurations to only contain indirect references to secrets.  When a Connector or Task is started, the configuration will be read from Kafka and then passed to the specific Connector or Task.  Before the configuration is passed to the Connector or Task, the indirect references need to be resolved. 

The following are required in a design:

  • Ability to specify one or more custom ConfigProviders that will resolve indirect references for configuration values.  The ConfigProviders are plugins and use the same classloading mechanism as other plugins (converters, transformers, etc.).
  • Ability to pass data to initialize a ConfigProvider on construction or instantiation.
  • For indirect references, a special syntax using the dollar sign ($) will be used to indicate when a configuration value is an indirect reference and for which ConfigProvider(s).

The patterns for variable substitutions are of the form ${provider:[path:]key}, where only one level of indirection is followed during substitutions.  The path in the variable is optional.  This means if you have the following:

Code Block
foo=${file:bar}
bar=${file:baz}

and your file contains

Code Block
bar=hello
baz=world

then the result will be

Code Block
foo=hello
bar=world

As a further clarification, if the ConfigProvider provides a value of the form ${xxx:yyy}, no further interpolation is done to try to find a provider for xxx, for example.  Also, if a provider does not have a value for the corresponding key, the variable will remain unresolved and the final value will still be of the form ${provider:key}.
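The one-level substitution and unresolved-variable rules above can be sketched in plain Java.  This is a standalone illustration (not the actual Kafka classes): it uses the same default pattern as ConfigTransformer, and a plain in-memory Map stands in for a hypothetical "file" provider.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class OneLevelSubstitution {

    // Same default pattern as ConfigTransformer: ${provider:[path:]key}
    private static final Pattern PATTERN = Pattern.compile("\\$\\{(.*?):((.*?):)?(.*?)\\}");

    // A plain Map stands in for a hypothetical "file" ConfigProvider lookup
    static String resolve(String value, Map<String, String> fileProvider) {
        Matcher matcher = PATTERN.matcher(value);
        StringBuffer sb = new StringBuffer();
        while (matcher.find()) {
            String provider = matcher.group(1);   // provider name
            String key = matcher.group(4);        // key (group 3 would be the optional path)
            String replacement = "file".equals(provider) ? fileProvider.get(key) : null;
            // Unknown providers/keys are left as-is; resolved values are NOT re-interpolated
            matcher.appendReplacement(sb, Matcher.quoteReplacement(
                    replacement == null ? matcher.group(0) : replacement));
        }
        matcher.appendTail(sb);
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, String> file = new HashMap<>();
        file.put("bar", "hello");
        file.put("baz", "world");
        file.put("nested", "${file:bar}");

        System.out.println(resolve("foo=${file:bar}", file));    // foo=hello
        System.out.println(resolve("${file:baz}", file));        // world
        // Only one level of indirection: the provider's value is not re-resolved
        System.out.println(resolve("${file:nested}", file));     // ${file:bar}
        // Missing keys leave the variable unresolved
        System.out.println(resolve("${file:missing}", file));    // ${file:missing}
    }
}
```

Note that even when a looked-up value itself has the shape of a variable, it is emitted verbatim, matching the "no further interpolation" rule above.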

 

Here is an example use case:

Code Block
languagetext
# Properties specified in the Worker config
config.providers=vault   # can have multiple comma-separated values
config.providers.vault.class=com.org.apache.connect.configs.VaultConfigProvider
config.providers.vault.param.uri=1.2.3.4
config.providers.vault.param.token=/run/secrets/vault-token
 
# Properties specified in the Connector config
mysql.db.password=${vault:vault_path:vault_db_password_key}

In the above example, VaultConfigProvider will be passed the string "/run/secrets/vault-token" on initialization, which could be the filename for a Docker secret containing the initial Vault token, residing on the tmpfs mount, for instance.    When resolving the value for "mysql.db.password", the VaultConfigProvider will use the path "vault_path" and the key "vault_db_password_key".  The VaultConfigProvider would use this path and key to look up the corresponding secret.  (VaultConfigProvider is a hypothetical example for illustration purposes only.)

Here is an example of a FileConfigProvider:

Code Block
/**
 * An implementation of {@link ConfigProvider} that represents a Properties file.
 * All property keys and values are stored as cleartext.
 */
public class FileConfigProvider implements ConfigProvider {

    public void configure(Map<String, ?> configs) {
    }

    /**
     * Retrieves the data at the given Properties file.
     *
     * @param path the file where the data resides
     * @return the configuration data
     */
    public ConfigData get(String path) {
        Map<String, String> data = new HashMap<>();
        if (path == null || path.isEmpty()) {
            return new ConfigData(data);
        }
        try (Reader reader = reader(path)) {
            Properties properties = new Properties();
            properties.load(reader);
            Enumeration<Object> keys = properties.keys();
            while (keys.hasMoreElements()) {
                String key = keys.nextElement().toString();
                String value = properties.getProperty(key);
                if (value != null) {
                    data.put(key, value);
                }
            }
            return new ConfigData(data);
        } catch (IOException e) {
            throw new ConfigException("Could not read properties from file " + path);
        }
    }

    /**
     * Retrieves the data with the given keys at the given Properties file.
     *
     * @param path the file where the data resides
     * @param keys the keys whose values will be retrieved
     * @return the configuration data
     */
    public ConfigData get(String path, Set<String> keys) {
        Map<String, String> data = new HashMap<>();
        if (path == null || path.isEmpty()) {
            return new ConfigData(data);
        }
        try (Reader reader = reader(path)) {
            Properties properties = new Properties();
            properties.load(reader);
            for (String key : keys) {
                String value = properties.getProperty(key);
                if (value != null) {
                    data.put(key, value);
                }
            }
            return new ConfigData(data);
        } catch (IOException e) {
            throw new ConfigException("Could not read properties from file " + path);
        }
    }

    // visible for testing
    protected Reader reader(String path) throws IOException {
        return new BufferedReader(new InputStreamReader(new FileInputStream(path), StandardCharsets.UTF_8));
    }

    public void subscribe(String path, Set<String> keys, ConfigChangeCallback callback) {
        throw new UnsupportedOperationException();
    }

    public void unsubscribe(String path, Set<String> keys, ConfigChangeCallback callback) {
        throw new UnsupportedOperationException();
    }

    public void unsubscribeAll() {
        throw new UnsupportedOperationException();
    }

    public void close() {
    }
}

 




Usage:

Code Block
config.providers=file
config.providers.file.class=org.apache.kafka.connect.configs.FileConfigProvider
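As an illustration (the file path and property keys below are hypothetical), a connector configuration can then reference individual keys in a Properties file, using the ${file:path:key} syntax described above:

```properties
# Hypothetical contents of /mydir/my.properties
db_user=alice
db_password=not-a-real-secret

# Connector config referencing individual keys via the "file" provider
mysql.db.user=${file:/mydir/my.properties:db_user}
mysql.db.password=${file:/mydir/my.properties:db_password}
```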

 

Secret Rotation

Secret Management systems such as Vault support secret rotation by associating a "lease duration" with a secret, which can be read by the client.   

In general, secret rotation is orthogonal to a particular Connector.  For example, a JDBC password may be stored in a Docker secret or in Vault.  The JDBC connector does not need to know what the method of rotation is.  Also, it is best if the JDBC connector is informed when it should re-obtain secrets rather than wait until a security exception occurs.  So in this case, a push model is warranted.

Other connectors such as the S3 connector are tightly coupled with a particular secret manager, and may wish to handle rotation on their own.  

To handle the different scenarios, the design supports both a push model and a pull model for obtaining new secrets.  

Different Connect components may have different responsibilities in handling secret rotation:

  • ConfigProvider:  The ConfigProvider may have knowledge of the method of rotation.  For Vault, it would be a "lease duration".  For a file-based provider, it could be file watches.  If it knows when a secret is going to be reloaded, it would call onChange() to inform the Herder.
  • Herder:  The Herder can push information to the Connector indicating that secrets have expired or may expire in the future.  When the Herder receives the onChange() call, it will check a new connector configuration property config.reload.action which can be one of the following:
      1. The value restart, which means to schedule a restart of the Connector and all its Tasks.  This will be the default.
      2. The value none, which means to do nothing.
  • Connector Tasks:  A task may wish to handle rotation on its own (a pull model).  In this case the Connector would need to set config.reload.action to none.  The methods SinkTaskContext.configs() and SourceTaskContext.configs() would be used by the Task to reload the config and resolve indirect references again.
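As a minimal illustration of the push model described above, here is a hypothetical, standalone sketch (not a real Kafka class): it tracks the last-seen value for each subscribed key and fires the callback when a watched key changes.  A real ConfigProvider would typically invoke checkForChanges() from a background thread at a configurable interval.

```java
import java.util.*;

public class WatchingProvider {

    public interface ChangeCallback {
        void onChange(String path, Map<String, String> data);
    }

    private final Map<String, String> store = new HashMap<>();
    private final List<Subscription> subscriptions = new ArrayList<>();

    private static class Subscription {
        final String path;
        final Set<String> keys;
        final ChangeCallback callback;
        final Map<String, String> lastSeen = new HashMap<>();
        Subscription(String path, Set<String> keys, ChangeCallback callback) {
            this.path = path;
            this.keys = keys;
            this.callback = callback;
        }
    }

    // Stands in for an external secret store being updated (e.g. a rotated password)
    public void put(String key, String value) {
        store.put(key, value);
    }

    public void subscribe(String path, Set<String> keys, ChangeCallback callback) {
        Subscription sub = new Subscription(path, keys, callback);
        for (String key : keys) {
            sub.lastSeen.put(key, store.get(key));
        }
        subscriptions.add(sub);
    }

    // In a real provider this would run periodically on a background thread
    public void checkForChanges() {
        for (Subscription sub : subscriptions) {
            Map<String, String> changed = new HashMap<>();
            for (String key : sub.keys) {
                String current = store.get(key);
                if (!Objects.equals(current, sub.lastSeen.get(key))) {
                    sub.lastSeen.put(key, current);
                    changed.put(key, current);
                }
            }
            if (!changed.isEmpty()) {
                sub.callback.onChange(sub.path, changed);
            }
        }
    }
}
```

In Connect's case, the Herder would play the role of the callback, reacting according to config.reload.action.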


Compatibility, Deprecation, and Migration Plan

No changes are required for existing Connectors.  Existing connector configurations with plaintext passwords will not be affected, and only after they are changed to use the variables (aka, indirect references) will the secrets not be stored by Connect. 

Connectors that use a ConfigProvider and do not want the restart behavior can specify config.reload.action as none.

Assumptions and Limitations

A typical pattern in Connector implementations is for the Task configuration to be a superset of the Connector configuration.   That is, when creating a Task configuration, the Connector will copy the Connector configuration and then perhaps add some additional configuration properties.  However, there is nothing in the Connect framework that enforces that this relationship must hold between the Task configuration and the Connector configuration.

For the implementation of this KIP, we will make the assumption that a variable reference in the property of the Task configuration has the same key as the variable reference in the Connector configuration.  This is because when the Connector configuration is passed to the Connector, all variable references have been resolved, so that when the Connector returns the Task configuration to the Connect framework, the Connect framework can make use of this assumption to  "reverse transform" the resolved value back to the original variable reference before saving the Task configuration.

This implies that when the assumption does not hold, which would be the case if the Connector copies the resolved value to a property with a different key, for example, then the Connect framework will not be able to reverse transform the value to the original variable reference, and the value will appear in plain text when saved.  This is a limitation that is currently not addressed by this KIP.
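The "reverse transform" described above can be sketched as follows.  This is a hypothetical, standalone illustration of the idea (not the actual Connect framework code): a task-config value is mapped back to a variable reference only when the same key in the Connector configuration resolved to that value, so values copied under a different key are left resolved, demonstrating the limitation.

```java
import java.util.HashMap;
import java.util.Map;

public class ReverseTransform {

    /**
     * Before persisting a task config, restore variable references for any key whose
     * value equals the resolved value of the same key in the connector config.
     */
    public static Map<String, String> reverse(Map<String, String> taskConfig,
                                              Map<String, String> rawConnectorConfig,
                                              Map<String, String> resolvedConnectorConfig) {
        Map<String, String> result = new HashMap<>(taskConfig);
        for (Map.Entry<String, String> entry : taskConfig.entrySet()) {
            String key = entry.getKey();
            String raw = rawConnectorConfig.get(key);            // e.g. "${vault:pw}"
            String resolved = resolvedConnectorConfig.get(key);  // e.g. the actual secret
            // Same key and the value matches the resolved secret -> restore the variable
            if (raw != null && resolved != null && resolved.equals(entry.getValue())) {
                result.put(key, raw);
            }
            // Otherwise the value stays as-is and would be saved in plain text
        }
        return result;
    }
}
```

Usage: if the connector config had password=${vault:pw} resolving to "secret123", a task config entry password=secret123 is saved as password=${vault:pw}, while a copy of the same secret under a different key remains resolved.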

Rejected Alternatives

The current scope of this proposal is for Connectors only.  However, as mentioned above, it provides classes that can be used by Brokers and Clients for functionality in other KIPs.

...