...

While this KIP focuses on Kafka Connect, we propose some common public interfaces and classes that could be used by other parts of Kafka, specifically:

  • ConfigProvider, ConfigData, ConfigContext, and ConfigChangeCallback:  These interfaces and classes could potentially be used by the Broker in conjunction with KIP-226 to overlay configuration properties from a ConfigProvider (such as a VaultConfigProvider) onto existing configuration properties.
  • ConfigTransformer:  This class could potentially be used by other clients (in conjunction with the previous interfaces) to implement variants of KIP-76 and KIP-269.

...

The public interfaces that are not Connect-specific consist of the following:

  • ConfigProvider, ConfigData, ConfigContext, and ConfigChangeCallback:  These interfaces and classes are used to abstract a provider of configuration properties.
  • ConfigTransformer:  This class is used to provide variable substitution for a configuration value, by looking up variables (or indirect references) from a set of ConfigProvider instances.  It only provides one level of indirection.
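As a minimal sketch of what "one level of indirection" means, the following self-contained example substitutes ${provider:key} variables using plain maps in place of real ConfigProvider instances.  The class name OneLevelSubstitution and the provider name "file" are assumptions made for illustration and are not part of the proposed API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration; plain maps stand in for ConfigProvider lookups.
class OneLevelSubstitution {
    private static final Pattern VAR = Pattern.compile("\\$\\{(.*?):(.*?)\\}");

    // Replaces each ${provider:key} in value with the looked-up data.
    // Unresolvable variables are left untouched, and replaced text is not scanned again.
    static String substitute(Map<String, Map<String, String>> lookups, String value) {
        Matcher matcher = VAR.matcher(value);
        StringBuilder builder = new StringBuilder();
        int i = 0;
        while (matcher.find()) {
            Map<String, String> data = lookups.get(matcher.group(1));
            String replacement = data == null ? null : data.get(matcher.group(2));
            builder.append(value, i, matcher.start());
            builder.append(replacement == null ? matcher.group(0) : replacement);
            i = matcher.end();
        }
        builder.append(value, i, value.length());
        return builder.toString();
    }

    public static void main(String[] args) {
        Map<String, String> fileData = new HashMap<>();
        fileData.put("user", "alice");
        fileData.put("alias", "${file:user}"); // a value that itself looks like a variable
        Map<String, Map<String, String>> lookups = new HashMap<>();
        lookups.put("file", fileData);

        System.out.println(substitute(lookups, "name=${file:user}"));  // name=alice
        System.out.println(substitute(lookups, "name=${file:alias}")); // name=${file:user}
        System.out.println(substitute(lookups, "name=${other:user}")); // name=${other:user}
    }
}
```

The second call shows the single level of indirection: the looked-up value happens to contain another variable, and it is returned as-is rather than being resolved again.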

Code Block
languagejava
public interface ConfigProvider extends Configurable, Closeable {
     
    // Configure this class with the initialization parameters
    void configure(Map<String, ?> configs);
 
    // Look up the data at the given path.
    ConfigData get(ConfigContext ctx, String path);

    // Look up the data with the given keys at the given path.
    ConfigData get(ConfigContext ctx, String path, Set<String> keys);
 
    // The ConfigProvider is responsible for making this callback whenever one of the keys changes.
    // Some ConfigProviders may want to have a background thread with a configurable update interval.
    void subscribe(String path, Set<String> keys, ConfigChangeCallback callback);

    // Inverse of subscribe
    void unsubscribe(String path, Set<String> keys);
 
    // Close all subscriptions and clean up all resources
    void close();
}

public interface ConfigChangeCallback {
    void onChange(String path, Map<String, String> values, int delayMs);
}
 
public interface ConfigContext {
 
    // The name of the client
    String name();
 
    // Schedule a reload, possibly for secrets rotation
    void scheduleConfigReload(long delayMs);
}
 
public class ConfigData {

    private Map<String, String> metadata;
    private Map<String, String> data;

    public ConfigData(Map<String, String> metadata, Map<String, String> data) {
        this.metadata = metadata;
        this.data = data;
    }

    public Map<String, String> metadata() {
        return metadata;
    }

    public Map<String, String> data() {
        return data;
    }
}

 

Also a helper class will be added that can provide variable substitutions using ConfigProvider instances.  Here is an example implementation.

Code Block
/**
 * This class wraps a set of {@link ConfigProvider} instances and uses them to perform
 * transformations.
 */
public class ConfigTransformer {
    private static final Logger log = LoggerFactory.getLogger(ConfigTransformer.class);

    private static final Pattern DEFAULT_PATTERN = Pattern.compile("\\$\\{(.*?):(.*?)\\}");

    private final Map<String, ConfigProvider> configProviders;
    private final Pattern pattern;

    public ConfigTransformer(Map<String, ConfigProvider> configProviders) {
        this(configProviders, DEFAULT_PATTERN);
    }

    public ConfigTransformer(Map<String, ConfigProvider> configProviders, Pattern pattern) {
        this.configProviders = configProviders;
        this.pattern = pattern;
    }

    public Map<String, String> transform(ConfigContext ctx, Map<String, String> configs) {
        Map<String, Set<String>> keysByProvider = new HashMap<>();
        Map<String, Map<String, String>> lookupsByProvider = new HashMap<>();

        // Collect the variables that need transformation
        for (Map.Entry<String, String> config : configs.entrySet()) {
            List<ConfigVariable> vars = getVars(config.getKey(), config.getValue(), pattern);
            for (ConfigVariable var : vars) {
                Set<String> keys = keysByProvider.get(var.providerName);
                if (keys == null) {
                    keys = new HashSet<>();
                    keysByProvider.put(var.providerName, keys);
                }
                keys.add(var.valueVariable);
            }
        }

        // Look up requested variables from the ConfigProviders
        for (Map.Entry<String, Set<String>> entry : keysByProvider.entrySet()) {
            ConfigProvider provider = configProviders.get(entry.getKey());
            if (provider == null) {
                // Unknown provider name: leave its variables unresolved
                continue;
            }
            ConfigData configData = provider.get(ctx, null, new HashSet<>(entry.getValue()));
            Map<String, String> data = configData.data();
            lookupsByProvider.put(entry.getKey(), data);
        }

        // Perform the transformations
        Map<String, String> result = new HashMap<>(configs);
        for (Map.Entry<String, String> config : configs.entrySet()) {
            result.put(config.getKey(), replace(lookupsByProvider, config.getValue(), pattern));
        }
        return result;
    }

    private static List<ConfigVariable> getVars(String key, String value, Pattern pattern) {
        List<ConfigVariable> configVars = new ArrayList<>();
        Matcher matcher = pattern.matcher(value);
        while (matcher.find()) {
            configVars.add(new ConfigVariable(matcher.group(1), matcher.group(2)));
        }
        return configVars;
    }

    private static String replace(Map<String, Map<String, String>> lookupsByProvider, String value, Pattern pattern) {
        Matcher matcher = pattern.matcher(value);
        StringBuilder builder = new StringBuilder();
        int i = 0;
        while (matcher.find()) {
            Map<String, String> map = lookupsByProvider.get(matcher.group(1));
            // No data for this provider means the variable is left as-is
            String replacement = map == null ? null : map.get(matcher.group(2));
            builder.append(value, i, matcher.start());
            if (replacement == null)
                builder.append(matcher.group(0));
            else
                builder.append(replacement);
            i = matcher.end();
        }
        builder.append(value, i, value.length());
        return builder.toString();
    }

    private static class ConfigVariable {
        final String providerName;
        final String valueVariable;

        ConfigVariable(String providerName, String valueVariable) {
            this.providerName = providerName;
            this.valueVariable = valueVariable;
        }

        public String toString() {
            return "(" + providerName + ":" + valueVariable + ")";
        }
    }
}

 

Two existing interfaces that are specific to Connect will be modified.  This will allow for Tasks to get the latest versions of their configs with all indirect references reloaded (requires the planned upgrade of Kafka to Java 8).

...

The patterns for variable substitutions are of the form ${provider:[path:]key}, where only one level of indirection is followed during substitutions.  The path in the variable is optional.  This means if you have the following:

...

Code Block
languagetext
# Properties specified in the Worker config
# config.providers can have multiple comma-separated values
config.providers=vault
config.provider.vault.class=com.org.apache.connect.configs.VaultConfigProvider
config.provider.vault.param.uri=1.2.3.4
config.provider.vault.param.token=/run/secrets/vault-token
 
# Properties specified in the Connector config
mysql.db.password=${vault:vault_path:vault_db_password_key}

In the above example, VaultConfigProvider will be passed the string "/run/secrets/vault-token" on initialization, which could be, for instance, the filename of a Docker secret containing the initial Vault token, residing on the tmpfs mount.  When resolving the value for "mysql.db.password", the VaultConfigProvider will use the path "vault_path" and the key "vault_db_password_key" to look up the corresponding secret.  (VaultConfigProvider is a hypothetical example for illustration purposes only.)
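To illustrate how the optional path could be told apart from the key, here is a sketch of a parser for the ${provider:[path:]key} form.  The regular expression and the class name VariableParser are assumptions made for illustration; the text above does not prescribe a particular pattern.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration of parsing ${provider:[path:]key}; not part of the proposed API.
class VariableParser {
    // group(1) = provider, group(3) = optional path, group(4) = key
    private static final Pattern VAR =
        Pattern.compile("\\$\\{([^}]*?):(([^}]*?):)?([^}]*?)\\}");

    // Returns {provider, path, key} for the first variable in value, or null if there is none.
    // The path element is null when the variable omits it.
    static String[] parse(String value) {
        Matcher matcher = VAR.matcher(value);
        if (!matcher.find()) {
            return null;
        }
        return new String[] {matcher.group(1), matcher.group(3), matcher.group(4)};
    }

    public static void main(String[] args) {
        String[] withPath = parse("${vault:vault_path:vault_db_password_key}");
        // prints: vault / vault_path / vault_db_password_key
        System.out.println(withPath[0] + " / " + withPath[1] + " / " + withPath[2]);

        String[] withoutPath = parse("${vault:vault_db_password_key}");
        // prints: vault / null / vault_db_password_key
        System.out.println(withoutPath[0] + " / " + withoutPath[1] + " / " + withoutPath[2]);
    }
}
```

A provider that receives a null path would then decide for itself what that means, for example falling back to a default location.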

...

Code Block
/**
 * An implementation of {@link ConfigProvider} that simply uses a Properties file.
 */
public class FileConfigProvider implements ConfigProvider {

    private static final Logger log = LoggerFactory.getLogger(FileConfigProvider.class);

    public final static String FILE_NAME = "filename";

    private Properties properties;

    /**
     * Configure this class with the initialization parameters
     */
    public void configure(Map<String, ?> configs) {
        String fileName = (String) configs.get(FILE_NAME);
        try (FileReader fileReader = new FileReader(fileName)) {
            properties = new Properties();
            properties.load(fileReader);
        } catch (IOException e) {
            throw new ConfigException("File name " + fileName + " not found for FileConfigProvider");
        }
    }

    /**
     * Look up the data at the given path.
     */
    public ConfigData get(ConfigContext ctx, String path) {
        Map<String, String> data = new HashMap<>();
        Enumeration<Object> keys = properties.keys();
        while (keys.hasMoreElements()) {
            String key = keys.nextElement().toString();
            String value = properties.getProperty(key);
            if (value != null) {
                data.put(key, value);
            }
        }
        return new ConfigData(Collections.<String, String>emptyMap(), data);
    }

    /**
     * Look up the data with the given keys at the given path.
     */
    public ConfigData get(ConfigContext ctx, String path, Set<String> keys) {
        Map<String, String> data = new HashMap<>();
        for (String key : keys) {
            String value = properties.getProperty(key);
            if (value != null) {
                data.put(key, value);
            }
        }
        return new ConfigData(Collections.<String, String>emptyMap(), data);
    }

    // The ConfigProvider is responsible for making this callback whenever one of the keys changes.
    // Some ConfigProviders may want to have a background thread with a configurable update interval.
    public void subscribe(String path, Set<String> keys, ConfigChangeCallback callback) {
        throw new UnsupportedOperationException();
    }
 
    // Inverse of subscribe
    public void unsubscribe(String path, Set<String> keys) {
        throw new UnsupportedOperationException();
    }
 
    public void close() {
    }
}

...

  • ConfigProvider:  The ConfigProvider may have knowledge of the method of rotation.  For Vault, it would be a "lease duration".  For a file-based provider, it could be file watches.  If it knows when a secret is going to be reloaded, it would call onChange() to inform the Herder.
  • Herder:  The Herder can push information to the Connector indicating that secrets have expired or may expire in the future.  When the Herder receives the onChange() call, it will check a new connector configuration property config.reload.action, which can be one of the following:
      1. The value restart, which means to schedule a restart of the Connector and all its Tasks.  This will be the default.
      2. The value none, which means to do nothing.
  • Connector Tasks:  A task may wish to handle rotation on its own (a pull model).  In this case the Connector would need to set config.reload.action to none.  The methods SinkTaskContext.config() and SourceTaskContext.config() would be used by the Task to reload the config and resolve indirect references again.
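The Herder's decision described above could be sketched roughly as follows.  This is not the actual Herder code: the class ReloadActionSketch, the method onReloadNotification, and the Runnable standing in for "restart the Connector and all its Tasks" are hypothetical names chosen for illustration.  Only the property name config.reload.action and its values restart and none come from the text above.

```java
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of the reload decision; not the actual Herder code.
class ReloadActionSketch {
    static final String CONFIG_RELOAD_ACTION = "config.reload.action";

    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor();

    // Called when a provider reports that a secret will change after delayMs.
    // Returns true if a restart was scheduled, false if the action is "none".
    boolean onReloadNotification(Map<String, String> connectorConfig, long delayMs, Runnable restart) {
        // "restart" is the default when the property is absent
        String action = connectorConfig.getOrDefault(CONFIG_RELOAD_ACTION, "restart");
        switch (action) {
            case "none":
                // The Tasks handle rotation themselves (pull model)
                return false;
            case "restart":
                scheduler.schedule(restart, delayMs, TimeUnit.MILLISECONDS);
                return true;
            default:
                throw new IllegalArgumentException(
                    "Unknown value for " + CONFIG_RELOAD_ACTION + ": " + action);
        }
    }

    // Stops accepting new work; call once no further notifications are expected.
    void shutdown() {
        scheduler.shutdown();
    }
}
```

Rejecting unknown values early is one possible design choice; an implementation could equally validate the property when the connector configuration is submitted.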

...