...

Connect should not store or transmit cleartext passwords in connector configurations. TLS can be enabled on Connect's REST API, and this proposal addresses how Connect deals with secrets in stored connector configurations by integrating with external secret management systems. First, since no single standard exists, Connect will provide an extension point for adding customized integrations and will provide a simple file-based extension as an example. Second, a Connect runtime can be configured to use one or more of these extensions, and will allow connector configurations to use placeholders that will be resolved by the runtime before passing the complete connector configurations to connectors. Therefore, existing connectors will not see any difference in the configurations that Connect provides to them at startup. And third, Connect's API will be changed to allow a connector to obtain the latest connector configuration at any time.

Relationship to Kafka Brokers and Clients

While this KIP focuses on Kafka Connect, we propose some common public interfaces and classes that could be used by other parts of Kafka, specifically:

  • ConfigProvider, ConfigData, and ConfigContext:  These interfaces could potentially be used by the Broker in conjunction with KIP-226 to overlay configuration properties from a ConfigProvider (such as a VaultConfigProvider) onto existing configuration properties.
  • ConfigTransformer:  This class could potentially be used by other clients (in conjunction with the previous interfaces) to implement variants of KIP-76 and KIP-269.

Public Interfaces

The public interfaces that are not Connect-specific consist of the following:

  • ConfigProvider, ConfigData, and ConfigContext:  These interfaces are used to abstract a provider of configuration properties.
  • ConfigTransformer:  This class is used to provide variable substitution for a configuration value, by looking up variables (or indirect references) from a set of ConfigProvider instances.

Code Block
languagejava
public interface ConfigProvider extends Configurable, Closeable {

    // Configure this class with the initialization parameters
    void configure(Map<String, ?> configs);

    // Look up the data at the given path.
    ConfigData lookup(ConfigContext ctx, String path);

    // Look up the data with the given keys at the given path.
    ConfigData lookup(ConfigContext ctx, String path, Set<String> keys);
}

public interface ConfigContext {

    // The name of the client
    String name();

    // Schedule a reload, possibly for secrets rotation
    void scheduleConfigReload(long delayMs);
}

public class ConfigData {

    private Map<String, String> metadata;
    private Map<String, String> data;

    public ConfigData(Map<String, String> metadata, Map<String, String> data) {
        this.metadata = metadata;
        this.data = data;
    }

    public Map<String, String> metadata() {
        return metadata;
    }

    public Map<String, String> data() {
        return data;
    }
}

 

Also a helper class will be added that can provide variable substitutions using ConfigProvider instances.  Here is an example implementation:

Code Block
/**
 * This class wraps a set of {@link ConfigProvider} instances and uses them to perform
 * transformations.
 */
public class ConfigTransformer {
    private static final Logger log = LoggerFactory.getLogger(ConfigTransformer.class);

    private static final Pattern DEFAULT_PATTERN = Pattern.compile("\\$\\{(.*?):(.*?)\\}");

    private final Map<String, ConfigProvider> configProviders;
    private final Pattern pattern;

    public ConfigTransformer(Map<String, ConfigProvider> configProviders) {
        this(configProviders, DEFAULT_PATTERN);
    }

    public ConfigTransformer(Map<String, ConfigProvider> configProviders, Pattern pattern) {
        this.configProviders = configProviders;
        this.pattern = pattern;
    }

    public Map<String, String> transform(ConfigContext ctx, Map<String, String> configs) {
        Map<String, Set<String>> keysByProvider = new HashMap<>();
        Map<String, Map<String, String>> lookupsByProvider = new HashMap<>();

        // Collect the variables that need transformation
        for (Map.Entry<String, String> config : configs.entrySet()) {
            List<ConfigVariable> vars = getVars(config.getKey(), config.getValue(), pattern);
            for (ConfigVariable var : vars) {
                Set<String> keys = keysByProvider.get(var.providerName);
                if (keys == null) {
                    keys = new HashSet<>();
                    keysByProvider.put(var.providerName, keys);
                }
                keys.add(var.valueVariable);
            }
        }

        // Look up requested variables from the ConfigProviders
        for (Map.Entry<String, Set<String>> entry : keysByProvider.entrySet()) {
            ConfigProvider provider = configProviders.get(entry.getKey());
            if (provider == null) {
                // No provider is registered under this name; leave its references unresolved
                continue;
            }
            ConfigData configData = provider.lookup(ctx, null, new HashSet<>(entry.getValue()));
            Map<String, String> data = configData.data();
            lookupsByProvider.put(entry.getKey(), data);
        }

        // Perform the transformations
        Map<String, String> result = new HashMap<>(configs);
        for (Map.Entry<String, String> config : configs.entrySet()) {
            result.put(config.getKey(), replace(lookupsByProvider, config.getValue(), pattern));
        }
        return result;
    }

    private static List<ConfigVariable> getVars(String key, String value, Pattern pattern) {
        List<ConfigVariable> configVars = new ArrayList<>();
        Matcher matcher = pattern.matcher(value);
        while (matcher.find()) {
            configVars.add(new ConfigVariable(matcher.group(1), matcher.group(2)));
        }
        return configVars;
    }

    private static String replace(Map<String, Map<String, String>> lookupsByProvider, String value, Pattern pattern) {
        Matcher matcher = pattern.matcher(value);
        StringBuilder builder = new StringBuilder();
        int i = 0;
        while (matcher.find()) {
            Map<String, String> map = lookupsByProvider.get(matcher.group(1));
            // map is null when the named provider is unknown; keep the reference as-is in that case
            String replacement = map == null ? null : map.get(matcher.group(2));
            builder.append(value, i, matcher.start());
            if (replacement == null)
                builder.append(matcher.group(0));
            else
                builder.append(replacement);
            i = matcher.end();
        }
        builder.append(value, i, value.length());
        return builder.toString();
    }

    private static class ConfigVariable {
        final String providerName;
        final String valueVariable;

        ConfigVariable(String providerName, String valueVariable) {
            this.providerName = providerName;
            this.valueVariable = valueVariable;
        }

        public String toString() {
            return "(" + providerName + ":" + valueVariable + ")";
        }
    }
}
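
To make the substitution behavior concrete, here is a minimal, self-contained sketch of the replacement step. It is not part of the proposed API: the class name PlaceholderSketch, the plain nested Map standing in for the data returned by each ConfigProvider, and the sample secret value are all assumptions made for illustration. It reuses the same regular expression as DEFAULT_PATTERN, and it assumes that references which cannot be resolved (missing key or unknown provider name) are left in the value unchanged.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical stand-alone sketch; not part of the proposed API.
final class PlaceholderSketch {

    // Same expression as DEFAULT_PATTERN: matches ${providerName:key}
    private static final Pattern PATTERN = Pattern.compile("\\$\\{(.*?):(.*?)\\}");

    // lookups maps provider name -> (key -> resolved value). It stands in for
    // the data that each ConfigProvider would return from lookup().
    static String resolve(String value, Map<String, Map<String, String>> lookups) {
        Matcher matcher = PATTERN.matcher(value);
        StringBuilder builder = new StringBuilder();
        int i = 0;
        while (matcher.find()) {
            Map<String, String> data = lookups.get(matcher.group(1));
            String replacement = data == null ? null : data.get(matcher.group(2));
            // Copy the text between the previous match and this one
            builder.append(value, i, matcher.start());
            // Assumption: an unresolved reference is kept verbatim
            builder.append(replacement == null ? matcher.group(0) : replacement);
            i = matcher.end();
        }
        // Copy whatever follows the last match
        builder.append(value, i, value.length());
        return builder.toString();
    }

    public static void main(String[] args) {
        Map<String, String> vaultData = new HashMap<>();
        vaultData.put("vault_db_password_key", "s3cr3t"); // made-up secret value
        Map<String, Map<String, String>> lookups = new HashMap<>();
        lookups.put("vault", vaultData);

        // A resolvable reference, a missing key, and an unknown provider name
        System.out.println(resolve("${vault:vault_db_password_key}", lookups));
        System.out.println(resolve("${vault:missing_key}", lookups));
        System.out.println(resolve("${other:some_key}", lookups));
    }
}
```

Because only the matched span is replaced, a value may mix literal text with one or more references, and values without any reference pass through untouched.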

 

Two existing interfaces that are specific to Connect will be modified.  Each gains a config() method that allows Tasks to get the latest version of their configs with all indirect references reloaded.  Because the method is added as a default method, this requires the planned upgrade of Kafka to Java 8.

Code Block
languagejava
public interface SinkTaskContext {
    ...
    default Map<String, String> config() {
        ...
    }
    ...
}
 
public interface SourceTaskContext {
    ...
    default Map<String, String> config() {
        ...
    }
    ...
}

 

The following configuration properties for Connect will be added.

Config Option | Description | Example | Default
config.providers | A comma-separated list of names for providers. | config.providers=file,vault | N/A
config.providers.{name}.class | The Java class name for a provider. | config.providers.file.class=org.apache.kafka.connect.configs.FileConfigProvider | N/A
config.providers.{name}.param.{param-name} | A parameter to be passed to the above Java class on initialization. | config.providers.file.param.secrets=/run/mysecrets | N/A
config.reload.action | One of: "none" (no action when scheduleConfigReload() is called) or "restart" (schedule a restart when scheduleConfigReload() is called). | config.reload.action=restart | restart
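
The naming scheme of these properties can be illustrated with a short sketch that groups worker properties by provider name. This is only one possible reading of the properties listed above, not the actual Connect implementation: the class ProviderConfigSketch, the holder type ProviderSpec, and the parse method are hypothetical names introduced for this example.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of grouping the worker properties by provider name.
final class ProviderConfigSketch {

    static final String PROVIDERS = "config.providers";

    // Holds the class name and initialization parameters for one provider.
    static final class ProviderSpec {
        final String className;
        final Map<String, String> params;

        ProviderSpec(String className, Map<String, String> params) {
            this.className = className;
            this.params = params;
        }
    }

    static Map<String, ProviderSpec> parse(Map<String, String> workerProps) {
        Map<String, ProviderSpec> specs = new LinkedHashMap<>();
        String names = workerProps.get(PROVIDERS);
        if (names == null || names.trim().isEmpty()) {
            return specs; // no providers configured
        }
        for (String rawName : names.split(",")) {
            String name = rawName.trim();
            if (name.isEmpty()) {
                continue;
            }
            // e.g. "config.providers.file."
            String prefix = PROVIDERS + "." + name + ".";
            String className = workerProps.get(prefix + "class");
            // e.g. "config.providers.file.param."
            String paramPrefix = prefix + "param.";
            Map<String, String> params = new LinkedHashMap<>();
            for (Map.Entry<String, String> e : workerProps.entrySet()) {
                if (e.getKey().startsWith(paramPrefix)) {
                    params.put(e.getKey().substring(paramPrefix.length()), e.getValue());
                }
            }
            specs.put(name, new ProviderSpec(className, params));
        }
        return specs;
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("config.providers", "file");
        props.put("config.providers.file.class",
                "org.apache.kafka.connect.configs.FileConfigProvider");
        props.put("config.providers.file.param.filename", "/mydir/my.properties");

        ProviderSpec spec = parse(props).get("file");
        System.out.println(spec.className + " " + spec.params);
    }
}
```

In this sketch the params map is what would be handed to the provider's configure() method, with the config.providers.{name}.param. prefix stripped; whether the runtime strips the prefix in exactly this way is an assumption.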


Proposed Changes

Currently the configuration for both Connectors and Tasks is stored in a Kafka topic.  The goal is for these stored configurations to only contain indirect references to secrets.  When a Connector or Task is started, the configuration will be read from Kafka and then passed to the specific Connector or Task.  Before the configuration is passed to the Connector or Task, the indirect references need to be resolved. 

The following are required in a design:

  • Ability to specify one or more custom ConfigProviders that will resolve indirect references for configuration values.  The ConfigProviders are plugins and use the same classloading mechanism as other plugins (converters, transformers, etc.).
  • Ability to pass data to initialize a ConfigProvider on construction or instantiation.
  • For indirect references, a special syntax using the dollar sign ($) will be used to indicate when a configuration value is an indirect reference and for which ConfigProvider(s).

Example:

Code Block
languagetext
# Properties specified in the Worker config
# (config.providers can have multiple comma-separated values)
config.providers=vault
config.providers.vault.class=com.org.apache.connect.configs.VaultConfigProvider
config.providers.vault.param.uri=1.2.3.4
config.providers.vault.param.token=/run/secrets/vault-token
 
# Properties specified in the Connector config
mysql.db.password=${vault:vault_db_password_key}

In the above example, VaultConfigProvider will be passed the string "/run/secrets/vault-token" on initialization, which could be the filename for a Docker secret containing the initial Vault token, residing on the tmpfs mount, for instance.    When resolving the value for "mysql.db.password", the VaultConfigProvider will use the key "vault_db_password_key".  The VaultConfigProvider would use this key to look up the corresponding secret.  (VaultConfigProvider is a hypothetical example for illustration purposes only.)

Here is an example of a FileConfigProvider:

Code Block
/**
 * An implementation of {@link ConfigProvider} that simply uses a Properties file.
 */
public class FileConfigProvider implements ConfigProvider {

    private static final Logger log = LoggerFactory.getLogger(FileConfigProvider.class);

    public final static String FILE_NAME = "filename";

    private Properties properties;

    /**
     * Configure this class with the initialization parameters
     */
    public void configure(Map<String, ?> configs) {
        String fileName = (String) configs.get(FILE_NAME);
        try (FileReader fileReader = new FileReader(fileName)) {
            properties = new Properties();
            properties.load(fileReader);
        } catch (IOException e) {
            throw new ConfigException("File name " + fileName + " not found for FileConfigProvider");
        }
    }

    /**
     * Look up the data at the given path.
     */
    public ConfigData lookup(ConfigContext ctx, String path) {
        Map<String, String> data = new HashMap<>();
        Enumeration<Object> keys = properties.keys();
        while (keys.hasMoreElements()) {
            String key = keys.nextElement().toString();
            String value = properties.getProperty(key);
            if (value != null) {
                data.put(key, value);
            }
        }
        return new ConfigData(Collections.<String, String>emptyMap(), data);
    }

    /**
     * Look up the data with the given keys at the given path.
     */
    public ConfigData lookup(ConfigContext ctx, String path, Set<String> keys) {
        Map<String, String> data = new HashMap<>();
        for (String key : keys) {
            String value = properties.getProperty(key);
            if (value != null) {
                data.put(key, value);
            }
        }
        return new ConfigData(Collections.<String, String>emptyMap(), data);
    }

    public void close() {
    }
}




Usage:

Code Block
config.providers=file
config.providers.file.class=org.apache.kafka.connect.configs.FileConfigProvider
config.providers.file.param.filename=/mydir/my.properties
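
As a rough end-to-end illustration of this usage, the sketch below writes a small properties file, loads it, and returns only the requested keys, in the same way as the keyed lookup in the FileConfigProvider example. Everything specific here is assumed for the example: the class FileLookupSketch, the temporary file standing in for /mydir/my.properties, and the key names and values.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

// Hypothetical end-to-end sketch; the file contents and key names are made up.
final class FileLookupSketch {

    // Write example secrets to a temporary file, standing in for /mydir/my.properties.
    static Path writeTempSecrets(String contents) {
        try {
            Path file = Files.createTempFile("secrets", ".properties");
            file.toFile().deleteOnExit();
            Files.write(file, contents.getBytes(StandardCharsets.UTF_8));
            return file;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Load the file and return only the requested keys that are present,
    // mirroring lookup(ctx, path, keys) in the FileConfigProvider example.
    static Map<String, String> lookup(Path file, Set<String> keys) {
        Properties properties = new Properties();
        try (Reader reader = Files.newBufferedReader(file, StandardCharsets.UTF_8)) {
            properties.load(reader);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, String> data = new HashMap<>();
        for (String key : keys) {
            String value = properties.getProperty(key);
            if (value != null) {
                data.put(key, value);
            }
        }
        return data;
    }

    public static void main(String[] args) {
        Path file = writeTempSecrets("db_password=s3cr3t\napi_token=abc123\n");
        Set<String> wanted = new HashSet<>(Arrays.asList("db_password", "not_in_file"));
        // Keys that are not in the file are simply absent from the result.
        System.out.println(lookup(file, wanted));
    }
}
```

With a file like this in place, a connector configuration could reference the secret as mysql.db.password=${file:db_password}, following the ${provider:key} syntax described earlier; the key name db_password is hypothetical.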

...

The current scope of this proposal is Connectors only; it addresses neither brokers nor clients.  The injection will happen at a very specific time in the lifecycle of Kafka Connect, i.e. after the configuration is stored and before the Connectors and Tasks are started.  However, as mentioned above, it provides classes that Brokers and Clients could use in connection with existing KIPs.

A related feature for brokers is KIP-226, which allows for dynamic broker configuration.  It can also store passwords.  However,

  1. It currently does not work for Kafka Connect.
  2. One requirement is to not "leak" secrets to other systems, especially if the customer is already using a centralized Secret Management system.

A related feature for clients is KIP-76, which is for obtaining passwords through scripts.  However,

  1. It is not yet implemented.
  2. It only applies to certain password fields.
  3. It does not allow for custom plugins.

Another related feature for clients is KIP-269, which is for using variables for JAAS configuration.  However,

  1. It is not yet implemented.
  2. It only applies to JAAS configuration.

Again, as mentioned above, the classes above can be used to implement or augment the behavior of these KIPs.  However, that effort is a separate effort from this KIP.