Connect should not store or transmit cleartext passwords in connector configurations. TLS can be enabled on Connect's REST API, and this proposal addresses how Connect deals with secrets in stored connector configurations by integrating with external secret management systems. First, since no single standard exists, Connect will provide an extension point for adding customized integrations and will provide a simple file-based extension as an example. Second, a Connect runtime can be configured to use one or more of these extensions, and will allow connector configurations to use placeholders that will be resolved by the runtime before passing the complete connector configurations to connectors. Therefore, existing connectors will not see any difference in the configurations that Connect provides to them at startup. And third, Connect's API will be changed to allow a connector to obtain the latest connector configuration at any time.

Public Interfaces

Two new interfaces will be added to Kafka Connect's public API to allow custom implementations to integrate with external systems for managing secrets.

Relationship to Kafka Brokers and Clients

While this KIP focuses on Kafka Connect, we propose some common public interfaces and classes that could be used by other parts of Kafka, specifically:

  • ConfigProvider, ConfigData, and ConfigContext:  These interfaces could potentially be used by the Broker in conjunction with KIP-226 to overlay configuration properties from a ConfigProvider (such as a VaultConfigProvider) onto existing configuration properties.
  • ConfigTransformer:  This class could potentially be used by other clients (in conjunction with the previous interfaces) to implement variants of KIP-76 and KIP-269.

Code Block
public interface ConfigProvider extends Configurable, Closeable {

    // Configure this class with the initialization parameters
    void configure(Map<String, ?> configs);

    // Look up the data at the given path.
    ConfigData lookup(ConfigContext ctx, String path);

    // Look up the data with the given keys at the given path.
    ConfigData lookup(ConfigContext ctx, String path, Set<String> keys);
}

public interface ConfigContext {

    String name();

    // Schedule a reload, possibly for secrets rotation
    void scheduleConfigReload(long delayMs);
}

public class ConfigData {

    private Map<String, String> metadata;
    private Map<String, String> data;

    public ConfigData(Map<String, String> metadata, Map<String, String> data) {
        this.metadata = metadata;
        this.data = data;
    }

    public Map<String, String> metadata() {
        return metadata;
    }

    public Map<String, String> data() {
        return data;
    }
}


Two existing interfaces will be modified.  This will allow Tasks to get the latest versions of their configs with all indirect references reloaded (the default method requires the planned upgrade of Kafka to Java 8).

Code Block
public interface SinkTaskContext {
    default Map<String, String> config() {
    ...
    }
    ...
}

public interface SourceTaskContext {
    default Map<String, String> config() {
    ...
    }
    ...
}

A helper class will also be added that performs variable substitution using ConfigProvider instances.  Here is an example implementation:

Code Block
/**
 * This class wraps a set of {@link ConfigProvider} instances and uses them to perform
 * transformations.
 */
public class ConfigTransformer {
    private static final Logger log = LoggerFactory.getLogger(ConfigTransformer.class);

    private static final Pattern DEFAULT_PATTERN = Pattern.compile("\\$\\{(.*?):(.*?)\\}");

    private final Map<String, ConfigProvider> configProviders;
    private final Pattern pattern;

    public ConfigTransformer(Map<String, ConfigProvider> configProviders) {
        this(configProviders, DEFAULT_PATTERN);
    }

    public ConfigTransformer(Map<String, ConfigProvider> configProviders, Pattern pattern) {
        this.configProviders = configProviders;
        this.pattern = pattern;
    }

    public Map<String, String> transform(ConfigContext ctx, Map<String, String> configs) {
        Map<String, Set<String>> keysByProvider = new HashMap<>();
        Map<String, Map<String, String>> lookupsByProvider = new HashMap<>();

        // Collect the variables that need transformation
        for (Map.Entry<String, String> config : configs.entrySet()) {
            List<ConfigVariable> vars = getVars(config.getKey(), config.getValue(), pattern);
            for (ConfigVariable var : vars) {
                Set<String> keys = keysByProvider.get(var.providerName);
                if (keys == null) {
                    keys = new HashSet<>();
                    keysByProvider.put(var.providerName, keys);
                }
                keys.add(var.valueVariable);
            }
        }

        // Look up requested variables from the ConfigProviders
        for (Map.Entry<String, Set<String>> entry : keysByProvider.entrySet()) {
            ConfigProvider provider = configProviders.get(entry.getKey());
            ConfigData configData = provider.lookup(ctx, null, new HashSet<>(entry.getValue()));
            Map<String, String> data = configData.data();
            lookupsByProvider.put(entry.getKey(), data);
        }

        // Perform the transformations
        Map<String, String> result = new HashMap<>(configs);
        for (Map.Entry<String, String> config : configs.entrySet()) {
            result.put(config.getKey(), replace(lookupsByProvider, config.getValue(), pattern));
        }
        return result;
    }

    private static List<ConfigVariable> getVars(String key, String value, Pattern pattern) {
        List<ConfigVariable> configVars = new ArrayList<>();
        Matcher matcher = pattern.matcher(value);
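        while (matcher.find()) {
            // Group 1 is the provider name, group 2 is the variable to look up
            configVars.add(new ConfigVariable(matcher.group(1), matcher.group(2)));
        }
        return configVars;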
    }

    private static String replace(Map<String, Map<String, String>> lookupsByProvider, String value, Pattern pattern) {
        Matcher matcher = pattern.matcher(value);
        StringBuilder builder = new StringBuilder();
        int i = 0;
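        while (matcher.find()) {
            Map<String, String> map = lookupsByProvider.get(matcher.group(1));
            String replacement = map.get(matcher.group(2));
            builder.append(value, i, matcher.start());
            if (replacement == null)
                // No value found: leave the placeholder as it was
                builder.append(matcher.group(0));
            else
                builder.append(replacement);
            i = matcher.end();
        }
        builder.append(value, i, value.length());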
        return builder.toString();
    }

    private static class ConfigVariable {
        final String providerName;
        final String valueVariable;

        ConfigVariable(String providerName, String valueVariable) {
            this.providerName = providerName;
            this.valueVariable = valueVariable;
        }

        public String toString() {
            return "(" + providerName + ":" + valueVariable + ")";
        }
    }
}
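
To make the placeholder syntax concrete, the following sketch applies the same regular expression as DEFAULT_PATTERN to a few configuration values. It is a simplified illustration, not the proposed class: in-memory maps stand in for ConfigProvider instances, and the provider name vault and the key db.password are hypothetical. Unlike the listing above, the sketch also leaves a placeholder untouched when its provider name is unknown.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PlaceholderSketch {

    // Same shape as DEFAULT_PATTERN above: ${providerName:key}
    static final Pattern PATTERN = Pattern.compile("\\$\\{(.*?):(.*?)\\}");

    // Replaces each ${provider:key} with the value found in the in-memory lookups.
    // Placeholders that cannot be resolved are left unchanged.
    static String resolve(String value, Map<String, Map<String, String>> lookups) {
        Matcher matcher = PATTERN.matcher(value);
        StringBuilder out = new StringBuilder();
        int i = 0;
        while (matcher.find()) {
            Map<String, String> data = lookups.get(matcher.group(1));
            String replacement = data == null ? null : data.get(matcher.group(2));
            out.append(value, i, matcher.start());
            out.append(replacement == null ? matcher.group(0) : replacement);
            i = matcher.end();
        }
        out.append(value, i, value.length());
        return out.toString();
    }

    public static void main(String[] args) {
        // Hypothetical provider "vault" holding a single secret.
        Map<String, String> secrets = new HashMap<>();
        secrets.put("db.password", "s3cr3t");
        Map<String, Map<String, String>> lookups = new HashMap<>();
        lookups.put("vault", secrets);

        // A value that is entirely a placeholder.
        System.out.println(resolve("${vault:db.password}", lookups));
        // A placeholder embedded in a longer value.
        System.out.println(resolve("jdbc:postgresql://host/db?password=${vault:db.password}", lookups));
        // An unknown key: the placeholder is passed through as-is.
        System.out.println(resolve("${vault:missing}", lookups));
    }
}
```

Because substitution works on the value string, a placeholder can make up the whole value or only part of it, and a single value may contain several placeholders that refer to different providers.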


Code Block
/**
 * An implementation of {@link ConfigProvider} that simply uses a Properties file.
 */
public class FileConfigProvider implements ConfigProvider {

    private static final Logger log = LoggerFactory.getLogger(FileConfigProvider.class);

    public final static String FILE_NAME = "filename";

    private Properties properties;

    /**
     * Configure this class with the initialization parameters
     */
    public void configure(Map<String, ?> configs) {
        String fileName = (String) configs.get(FILE_NAME);
        try (FileReader fileReader = new FileReader(fileName)) {
            properties = new Properties();
            properties.load(fileReader);
        } catch (IOException e) {
            throw new ConfigException("File name " + fileName + " not found for FileConfigProvider");
        }
    }

    /**
     * Look up the data at the given path.
     */
    public ConfigData lookup(ConfigContext ctx, String path) {
        Map<String, String> data = new HashMap<>();
        Enumeration<Object> keys = properties.keys();
        while (keys.hasMoreElements()) {
            String key = keys.nextElement().toString();
            String value = properties.getProperty(key);
            if (value != null) {
                data.put(key, value);
            }
        }
        return new ConfigData(Collections.<String, String>emptyMap(), data);
    }

    /**
     * Look up the data with the given keys at the given path.
     */
    public ConfigData lookup(ConfigContext ctx, String path, Set<String> keys) {
        Map<String, String> data = new HashMap<>();
        for (String key : keys) {
            String value = properties.getProperty(key);
            if (value != null) {
                data.put(key, value);
            }
        }
        return new ConfigData(Collections.<String, String>emptyMap(), data);
    }

    public void close() {
    }
}
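
A custom integration would follow the same shape as the file-based example. The sketch below is one hedged possibility, showing how a provider might use ConfigContext.scheduleConfigReload to request a reload when a secret's lease expires. Everything in it is an assumption made for illustration: the interfaces are simplified stand-ins (configure and close are omitted), and LeasedProvider, RecordingContext, the lease.ms metadata key, and the connector name are hypothetical.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class LeasedProviderSketch {

    // Simplified stand-in for the proposed ConfigContext.
    interface ConfigContext {
        String name();
        void scheduleConfigReload(long delayMs);
    }

    // Simplified stand-in for the proposed ConfigData.
    static class ConfigData {
        final Map<String, String> metadata;
        final Map<String, String> data;

        ConfigData(Map<String, String> metadata, Map<String, String> data) {
            this.metadata = metadata;
            this.data = data;
        }
    }

    // Hypothetical provider: serves secrets from memory and asks the runtime to
    // reload the configuration once the assumed lease duration has passed.
    static class LeasedProvider {
        private final Map<String, String> secrets;
        private final long leaseMs;

        LeasedProvider(Map<String, String> secrets, long leaseMs) {
            this.secrets = secrets;
            this.leaseMs = leaseMs;
        }

        ConfigData lookup(ConfigContext ctx, String path, Set<String> keys) {
            Map<String, String> data = new HashMap<>();
            for (String key : keys) {
                String value = secrets.get(key);
                if (value != null) {
                    data.put(key, value);
                }
            }
            // Request a reload so that rotated secrets are picked up later.
            ctx.scheduleConfigReload(leaseMs);
            // "lease.ms" is an illustrative metadata key, not one defined by the proposal.
            return new ConfigData(Collections.singletonMap("lease.ms", Long.toString(leaseMs)), data);
        }
    }

    // Context that only records the requested delay, used to drive the sketch.
    static class RecordingContext implements ConfigContext {
        long lastDelayMs = -1;

        public String name() {
            return "my-connector"; // assumed to identify the connector
        }

        public void scheduleConfigReload(long delayMs) {
            lastDelayMs = delayMs;
        }
    }

    public static void main(String[] args) {
        Map<String, String> secrets = new HashMap<>();
        secrets.put("db.password", "s3cr3t");
        LeasedProvider provider = new LeasedProvider(secrets, 60_000L);
        RecordingContext ctx = new RecordingContext();

        ConfigData result = provider.lookup(ctx, null, Collections.singleton("db.password"));
        System.out.println(result.data + " reload in " + ctx.lastDelayMs + " ms");
    }
}
```

Returning the lease duration as metadata and also scheduling the reload are two separate design choices; a real integration might do only one of them, depending on what the secret management system exposes.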


The current scope of this proposal is Connectors only; it addresses neither brokers nor clients.  The injection happens at a specific point in the lifecycle of Kafka Connect, i.e. after the configuration is stored and before the Connectors and Tasks are started.  However, as mentioned above, the proposal provides classes that Brokers and Clients could use in connection with existing KIPs.

A related feature for brokers is KIP-226, which allows for dynamic broker configuration.  It can also store passwords.  However,

  1. It currently does not work for Kafka Connect.
  2. One requirement is to not "leak" secrets to other systems, especially if the customer is already using a centralized Secret Management system.

A related feature for clients is KIP-76, which is for obtaining passwords through scripts.  However,

  1. It is not yet implemented.
  2. It only applies to certain password fields.
  3. It does not allow for custom plugins.

Another related feature for clients is KIP-269, which is for using variables for JAAS configuration.  However,

  1. It is not yet implemented.
  2. It only applies to JAAS configuration.

Again, the classes described above can be used to implement or augment the behavior of these KIPs.  However, that work is outside the scope of this KIP.