...

JIRA: KAFKA-6886

Released: 2.0.0

 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

  • ConfigProvider, ConfigChangeCallback, ConfigData:  These classes and interfaces are used to abstract a provider of configuration properties.
  • ConfigTransformer:  This class is used to provide variable substitution for a configuration value, by looking up variables (or indirect references) from a set of ConfigProvider instances.  It only provides one level of indirection.

The above classes will be in the package org.apache.kafka.common.config, except for ConfigProvider, which will be in the package org.apache.kafka.common.config.provider, along with any implementations of ConfigProvider (currently only FileConfigProvider).

Code Block
languagejava
public interface ConfigProvider extends Configurable, Closeable {
     
    // Configure this class with the initialization parameters
    void configure(Map<String, ?> configs);
 
    // Look up the data at the given path.
    ConfigData get(String path);

    // Look up the data with the given keys at the given path.
    ConfigData get(String path, Set<String> keys);
 
    // The ConfigProvider is responsible for making this callback whenever the key changes.
    // Some ConfigProviders may want to have a background thread with a configurable update interval.
    void subscribe(String path, Set<String> keys, ConfigChangeCallback callback);

    // Inverse of subscribe
    void unsubscribe(String path, Set<String> keys, ConfigChangeCallback callback);
 
    // Remove all subscriptions
    void unsubscribeAll();
 
    // Close all subscriptions and clean up all resources
    void close();
}
 
public class ConfigData {

    private Long ttl;
    private Map<String, String> data;

    public ConfigData(Map<String, String> data, Long ttl) {
        this.ttl = ttl;
        this.data = data;
    }

    public ConfigData(Map<String, String> data) {
        this(data, null);
    }

    public Long ttl() {
        return ttl;
    }

    public Map<String, String> data() {
        return data;
    }
}

public interface ConfigChangeCallback {

    void onChange(String path, ConfigData data);
}
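
For illustration only, here is a sketch (not part of this KIP) of how a ConfigProvider implementation might honor subscriptions with a background thread, as the comment on subscribe() suggests.  The PollingConfigProvider class, the 30-second interval, and the loadData() helper are hypothetical:

Code Block
languagejava
// Hypothetical sketch of the push model: poll on a fixed interval
// and invoke the registered callback with the latest values.
public abstract class PollingConfigProvider implements ConfigProvider {

    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

    @Override
    public void subscribe(String path, Set<String> keys, ConfigChangeCallback callback) {
        executor.scheduleAtFixedRate(() -> {
            // loadData() is an assumed helper that re-reads the underlying source.
            Map<String, String> latest = loadData(path, keys);
            callback.onChange(path, new ConfigData(latest));
        }, 0, 30, TimeUnit.SECONDS);
    }

    protected abstract Map<String, String> loadData(String path, Set<String> keys);

    @Override
    public void close() {
        executor.shutdownNow();
    }
}

A real provider would also track previous values so that onChange() is invoked only when something actually changed.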
 

 

Also, a helper class will be added that can provide variable substitutions using ConfigProvider instances.  Here is an example skeleton.

...

Code Block
/**
 * This class wraps a set of {@link ConfigProvider} instances and uses them to perform
 * transformations.
 */
public class ConfigTransformer {
    private static final Pattern DEFAULT_PATTERN = Pattern.compile("\\$\\{(.*?):((.*?):)?(.*?)\\}");

    private final Map<String, ConfigProvider> configProviders;
    private final Pattern pattern;

    public ConfigTransformer(Map<String, ConfigProvider> configProviders) {
        this(configProviders, DEFAULT_PATTERN);
    }

    public ConfigTransformer(Map<String, ConfigProvider> configProviders, Pattern pattern) {
        this.configProviders = configProviders;
        this.pattern = pattern;
    }

    public Map<String, String> transform(Map<String, String> configs, ConfigChangeCallback callback) {
        ...
    }
}
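
To make the variable syntax concrete, here is a small standalone sketch (not part of this KIP) showing what the default pattern extracts from a variable of the form ${provider:path:key}.  Note that group 2 captures the optional path segment including its trailing colon, so group 3 holds the path itself:

Code Block
languagejava
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class VariablePatternDemo {
    // The same default pattern used by ConfigTransformer above.
    private static final Pattern DEFAULT_PATTERN = Pattern.compile("\\$\\{(.*?):((.*?):)?(.*?)\\}");

    public static void main(String[] args) {
        Matcher matcher = DEFAULT_PATTERN.matcher("${file:/mydir/my.properties:mypassword}");
        if (matcher.find()) {
            System.out.println(matcher.group(1)); // prints "file" (the provider name)
            System.out.println(matcher.group(3)); // prints "/mydir/my.properties" (the path)
            System.out.println(matcher.group(4)); // prints "mypassword" (the key)
        }
    }
}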

 

An implementation of ConfigProvider called FileConfigProvider will be provided that can use secrets from a Properties file.  When using the FileConfigProvider with the variable syntax ${file:path:key}, the path will be the path to the file and the key will be the property key.
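
For example, a connector configuration could contain an indirect reference such as the following (the property name connection.password is illustrative); at runtime the variable resolves to the value of the mypassword key in /mydir/my.properties:

Code Block
connection.password=${file:/mydir/my.properties:mypassword}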

 

Implementations of ConfigProvider, such as FileConfigProvider, that are provided with Apache Kafka will be placed in the package org.apache.kafka.common.config.provider.  This will facilitate frameworks such as Connect that treat instances of ConfigProvider as components that should be loaded in isolation.  Connect will assume that all classes in the package org.apache.kafka.common.config.provider (except ConfigProvider) are such components.

Two existing interfaces that are specific to Connect will be modified.  This will allow for Tasks to get the latest versions of their configs, with all indirect references reloaded (the use of default methods requires the planned upgrade of Kafka to Java 8).

Code Block
languagejava
public interface SinkTaskContext {
    ...
    default Map<String, String> configs() {
        ...
    }
    ...
}

public interface SourceTaskContext {
    ...
    default Map<String, String> configs() {
        ...
    }
    ...
}

 

The following configuration properties for Connect will be added.

...

Here is an example of a FileConfigProvider:

Code Block
languagejava
/**
 * An implementation of {@link ConfigProvider} that represents a Properties file.
 * All property keys and values are stored as cleartext.
 */
public class FileConfigProvider implements ConfigProvider {

    public void configure(Map<String, ?> configs) {
    }

    /**
     * Retrieves the data at the given Properties file.
     *
     * @param path the file where the data resides
     * @return the configuration data
     */
    public ConfigData get(String path) {
        Map<String, String> data = new HashMap<>();
        if (path == null || path.isEmpty()) {
            return new ConfigData(data);
        }
        try (Reader reader = reader(path)) {
            Properties properties = new Properties();
            properties.load(reader);
            Enumeration<Object> keys = properties.keys();
            while (keys.hasMoreElements()) {
                String key = keys.nextElement().toString();
                String value = properties.getProperty(key);
                if (value != null) {
                    data.put(key, value);
                }
            }
            return new ConfigData(data);
        } catch (IOException e) {
            throw new ConfigException("Could not read properties from file " + path);
        }
    }

    /**
     * Retrieves the data with the given keys at the given Properties file.
     *
     * @param path the file where the data resides
     * @param keys the keys whose values will be retrieved
     * @return the configuration data
     */
    public ConfigData get(String path, Set<String> keys) {
        Map<String, String> data = new HashMap<>();
        if (path == null || path.isEmpty()) {
            return new ConfigData(data);
        }
        try (Reader reader = reader(path)) {
            Properties properties = new Properties();
            properties.load(reader);
            for (String key : keys) {
                String value = properties.getProperty(key);
                if (value != null) {
                    data.put(key, value);
                }
            }
            return new ConfigData(data);
        } catch (IOException e) {
            throw new ConfigException("Could not read properties from file " + path);
        }
    }

    // visible for testing
    protected Reader reader(String path) throws IOException {
        return new BufferedReader(new InputStreamReader(new FileInputStream(path), StandardCharsets.UTF_8));
    }

    public void close() {
    }
}


Usage:

Code Block
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider

 

Secret Rotation

Secret Management systems such as Vault support secret rotation by associating a "lease duration" with a secret, which can be read by the client.   

In general, secret rotation is orthogonal to a particular Connector.  For example, a JDBC password may be stored in a Docker secret or in Vault.  The JDBC connector does not need to know what the method of rotation is.  Also, it is best if the JDBC connector is informed when it should re-obtain secrets rather than wait until a security exception occurs.  So in this case, a push model is warranted.

Other connectors such as the S3 connector are tightly coupled with a particular secret manager, and may wish to handle rotation on their own.  

To handle these different scenarios, the design supports both a push model and a pull model for obtaining new secrets.

Different Connect components may have different responsibilities in handling secret rotation:

  • ConfigProvider:  The ConfigProvider may have knowledge of the method of rotation.  For Vault, it would be a "lease duration".  For a file-based provider, it could be file watches.  If it knows when a secret is going to be reloaded, it would call onChange() to inform the Herder.
  • Herder:  The Herder can push information to the Connector indicating that secrets have expired or may expire in the future.  When the Herder receives the onChange() call, it will check a new connector configuration property config.reload.action which can be one of the following:
      1. The value restart, which means to schedule a restart of the Connector and all its Tasks.  This will be the default.
      2. The value none, which means to do nothing.
  • Connector Tasks:  A task may wish to handle rotation on its own (a pull model).  In this case the Connector would need to set config.reload.action to none.  The methods SinkTaskContext.configs() and SourceTaskContext.configs() would be used by the Task to reload the config and resolve indirect references again, as in the sketch after this list.
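
Here is a sketch of the pull model (illustrative only; MySinkTask, writeToSink(), and the property key are hypothetical).  The task re-resolves its secrets when a write fails in a way that suggests an expired credential:

Code Block
languagejava
public class MySinkTask extends SinkTask {
    private String password;
    ...
    @Override
    public void put(Collection<SinkRecord> records) {
        try {
            writeToSink(records);  // assumed helper that uses the current secret
        } catch (ConnectException e) {
            // Pull model: configs() re-resolves all indirect references,
            // so this picks up a rotated secret without a restart.
            this.password = context.configs().get("connection.password");
            throw e;  // let the framework retry the batch
        }
    }
    ...
}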


Compatibility, Deprecation, and Migration Plan

No changes are required for existing Connectors.  Existing connector configurations with plaintext passwords will not be affected; only after they are changed to use variables (i.e., indirect references) will the secrets no longer be stored by Connect.

Connectors that use a ConfigProvider and do not want the restart behavior can specify config.reload.action as none.
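
For example, a connector that handles rotation itself via the pull model would include the following in its configuration:

Code Block
config.reload.action=none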

Assumptions and Limitations

A typical pattern in Connector implementations is for the Task configuration to be a superset of the Connector configuration.  That is, when creating a Task configuration, the Connector will copy its own configuration and perhaps add some additional properties.  However, nothing in the Connect framework enforces that this relationship hold between the Task configuration and the Connector configuration.

For the implementation of this KIP, we will assume that a variable reference in a property of the Task configuration has the same key as the variable reference in the Connector configuration.  This is because, when the Connector configuration is passed to the Connector, all variable references have been resolved; when the Connector then returns the Task configuration to the Connect framework, the framework can use this assumption to "reverse transform" each resolved value back to its original variable reference before saving the Task configuration.
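
To illustrate the reverse transformation (the property name and the resolved secret value below are made up):

Code Block
# Connector configuration as stored by Connect (indirect reference)
connection.password=${file:/mydir/my.properties:mypassword}

# Configuration as seen by the Connector (resolved)
connection.password=secret123

# Task configuration as saved by Connect, after the reverse transformation
connection.password=${file:/mydir/my.properties:mypassword}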

This implies that when the assumption does not hold, which would be the case if the Connector copies the resolved value to a property with a different key, for example, then the Connect framework will not be able to reverse transform the value to the original variable reference, and the value will appear in plaintext when saved.  This is a limitation that is currently not addressed by this KIP.

Rejected Alternatives

The current scope of this proposal is for Connectors only.  However, as mentioned above, it provides classes that can be used by Brokers and Clients for functionality in other KIPs.

...