Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Kafka Connect allows integration with many types of external systems.  Some of these systems may require secrets to be configured in order to access them.  Many customers have an existing Secret Management strategy and are using centralized management systems such as Vault, Keywhiz, or AWS Secrets Manager.   Vault is very popular and has been described as "the current gold standard in secret management and provisioning".  These Secret Management systems may satisfy the following customer requirements:

...

Code Block
languagejava
public interface ConfigProvider extends Configurable, Closeable {
     
    // Configure this class with the initialization parameters
    void configure(Map<String, ?> configs);
 
    // Lookup up the data at the given path.
    ConfigData lookupget(ConfigContext ctx, String path);

    // Lookup up the data with the given keys at the given path.
    ConfigData lookupget(ConfigContext ctx, String path, Set<String> keys);
}
 
public interface ConfigContext {
  
    // The nameConfigProvider is ofresponsible thefor client
making this callback whenever String name();
 the key changes.
    // ScheduleSome aConfigProviders reload,may possiblywant for secrets rotationto have a background thread with a configurable update interval.
    void scheduleConfigReload(long delayMs);
}
 
public class ConfigData {subscribe(String key, ConfigurationChangeCallback callback);

    private// Map<String,Inverse String>of metadata;subscribe
    private Map<String, String> datavoid unsubscribe(String key);
 
    public ConfigData(Map<String, String> metadata, Map<String, String> data) {// Close all subscriptions and clean up all resources
    void close();
}

public interface  this.metadata = metadata;ConfigurationChangeCallback {
    void onChange(String key,  this.data = data;
    }

    public Map<String, String> metadata() {
 String value);
}
public interface ConfigContext {
 
    // The name of returnthe metadata;client
    }
String name();
 
    public Map<String, String> data() {
// Schedule a reload, possibly for secrets rotation
    void scheduleConfigReload(long delayMs);
}
 
public class ConfigData {

    private Map<String, String>  return datametadata;
    }
}

 

Also a helper class will be added that can provide variable substitutions using ConfigProvider instances.  Here is an example implementation (it only provides one level of indirection).

Code Block
/**
 * This class wraps a set of {@link ConfigProvider} instances and uses them to perform
 * transformations.
 */
public class ConfigTransformer {private Map<String, String> data;

    public ConfigData(Map<String, String> metadata, Map<String, String> data) {
        this.metadata = metadata;
    private static final Logger logthis.data = LoggerFactory.getLogger(ConfigTransformer.class);
data;
    private}

  static final Pattern DEFAULT_PATTERN = Pattern.compile("\\$\\{(.*?):(.*?)\\}");

public Map<String, String> metadata() {
     private final Map<String, ConfigProvider>return configProvidersmetadata;
    private final Pattern pattern;}

    public ConfigTransformer(Map<String, ConfigProvider>String> configProvidersdata() {
        this(configProviders, DEFAULT_PATTERN)return data;
    }
}

 

Also a helper class will be added that can provide variable substitutions using ConfigProvider instances.  Here is an example implementation.

 

Code Block
/**
 * This class wraps a set of {@link ConfigProvider} instances and uses them to perform
 * transformations.
 */
public class ConfigTransformer {
    private static final Logger log = LoggerFactory.getLogger(ConfigTransformer.class);
    public ConfigTransformer(Map<String, ConfigProvider> configProviders, Pattern pattern) {
        this.configProviders = configProviders;
        this.pattern = pattern;
    }

    publicprivate Map<String,static String>final transform(ConfigContext ctx, Map<String, String> configs) {Pattern DEFAULT_PATTERN = Pattern.compile("\\$\\{(.*?):(.*?)\\}");

    private final Map<String, ConfigProvider> configProviders;
  Map<String, Set<String>> keysByProviderprivate =final newPattern HashMap<>()pattern;

    public ConfigTransformer(Map<String, ConfigProvider> configProviders) {
  Map<String, Map<String, String>> lookupsByProvider = new HashMap<>(this(configProviders, DEFAULT_PATTERN);

    }

    // Collect the variables that need transformationpublic ConfigTransformer(Map<String, ConfigProvider> configProviders, Pattern pattern) {
        for (Map.Entry<String, String> config : configs.entrySet()) {this.configProviders = configProviders;
            List<ConfigVariable> vars = getVars(config.getKey(), config.getValue(), pattern)this.pattern = pattern;
    }

    public Map<String, String> transform(ConfigContext for (ConfigVariable var : varsctx, Map<String, String> configs) {
        Map<String, Set<String>> keysByProvider = new HashMap<>();
   Set<String> keys = keysByProvider.get(var.providerName);
     Map<String, Map<String, String>> lookupsByProvider = new HashMap<>();

        // Collect the variables that need transformation
   if (keys == null) {
 for (Map.Entry<String, String> config : configs.entrySet()) {
            List<ConfigVariable> keysvars = new HashSet<>()getVars(config.getKey(), config.getValue(), pattern);
            for (ConfigVariable var : vars)  {
  keysByProvider.put(var.providerName, keys);
             Set<String> keys  }= keysByProvider.get(var.providerName);
                keys.add(var.valueVariable);if (keys == null) {
            }
        }

keys = new HashSet<>();
     // Lookup requested variables from the ConfigProviders
        for keysByProvider.put(Mapvar.Entry<String, Set<String>> entry : keysByProvider.entrySet()) {providerName, keys);
                }
            ConfigProvider  provider = configProviderskeys.getadd(entryvar.getKey()valueVariable);
            ConfigData}
   configData = provider.lookup(ctx, null, new HashSet<>(entry.getValue()));
    }

        // Lookup requested variables from the ConfigProviders
        for Map<String(Map.Entry<String, String>Set<String>> dataentry =: configDatakeysByProvider.dataentrySet();) {
            ConfigProvider provider = lookupsByProviderconfigProviders.putget(entry.getKey(), data);
        }

    ConfigData configData = provider.lookup(ctx,  // Perform the transformations
null, new HashSet<>(entry.getValue()));
            Map<String, String> resultdata = new HashMap<>(configsconfigData.data();
        for (Map.Entry<String, String> config : configs.entrySet()) { lookupsByProvider.put(entry.getKey(), data);
        }

    result.put(config.getKey(), replace(lookupsByProvider, config.getValue(), pattern));
      // Perform the transformations
      }
  Map<String, String> result =   return resultnew HashMap<>(configs);
    }

    privatefor static List<ConfigVariable> getVars(String key(Map.Entry<String, StringString> value,config Pattern pattern: configs.entrySet()) {
        List<ConfigVariable>   configVars = new ArrayList<>( result.put(config.getKey(), replace(lookupsByProvider, config.getValue(), pattern));
        Matcher}
 matcher = pattern.matcher(value);
     return result;
  while (matcher.find()) { }

    private static List<ConfigVariable> getVars(String key, String value, Pattern  configVars.add(new ConfigVariable(matcher.group(1), matcher.group(2)))pattern) {
        List<ConfigVariable> configVars = new ArrayList<>();
        }
Matcher matcher = pattern.matcher(value);
     return configVars;
  while  }
(matcher.find()) {
    private static String replace(Map<String, Map<String, String>> lookupsByProvider, String value, Pattern pattern) {
        Matcher matcher = pattern.matcher(value configVars.add(new ConfigVariable(matcher.group(1), matcher.group(2)));
        StringBuilder}
 builder = new StringBuilder();
    return configVars;
   int i = 0; }

    private static String  while (matcher.find()) {
     replace(Map<String, Map<String, String>> lookupsByProvider, String value, Pattern pattern) {
       Map<String, String>Matcher mapmatcher = lookupsByProviderpattern.get(matcher.group(1)value);
        StringBuilder builder =  String replacement = map.get(matcher.group(2))new StringBuilder();
        int i = 0;
   builder.append(value, i, matcher.start    while (matcher.find()); {
            if (replacementMap<String, String> map == null) lookupsByProvider.get(matcher.group(1));
            String replacement =  buildermap.appendget(matcher.group(02));
            else
  builder.append(value, i, matcher.start());
             if builder.append(replacement == null);
            i =    builder.append(matcher.group(0));
            else
                builder.append(replacement);
            i = matcher.end();
        }
        builder.append(value, i, value.length());
        return builder.toString();
    }

    private static class ConfigVariable {
        final String providerName;
        final String valueVariable;

        ConfigVariable(String providerName, String valueVariable) {
            this.providerName = providerName;
            this.valueVariable = valueVariable;
        }

        public String toString() {
            return "(" + providerName + ":" + valueVariable + ")";
        }
    }
}

...

  • Ability to specify one or more custom ConfigProviders that will resolve indirect references for configuration values.  The ConfigProviders are plugins and use the same classloading mechanism as other plugins (converters, transformers, etc.).
  • Ability to pass data to initialize a ConfigProvider on construction or instantiation.
  • For indirect references, a special syntax using the dollar sign ($) will be used to indicate when a configuration value is an indirect reference and for which ConfigProvider(s).

The patterns for variable substitutions are of the form ${provider:key}, where only one level of indirection is followed during substitutions.  This means if you have the following:

Code Block
foo=${file:bar}
bar=${file:baz}

and your file contains

Code Block
bar=hello
baz=world

then the result will be

Code Block
foo=hello
baz=world

 

Here is an example use caseExample:

Code Block
languagetext
# Properties specified in the Worker config
config.providers=vault   # can have multiple comma-separated values
config.provider.vault.class=com.org.apache.connect.configs.VaultConfigProvider
config.provider.vault.param.uri=1.2.3.4
config.provider.vault.param.token=/run/secrets/vault-token
 
# Properties specified in the Connector config
mysql.db.password=${vault:vault_db_password_key}

...