...

Implementations of ConfigProvider, such as FileConfigProvider, that are provided with Apache Kafka will be placed in the package org.apache.kafka.common.config.provider.  This will facilitate frameworks such as Connect that treat instances of ConfigProvider as components that should be loaded in isolation.  Connect will assume that all classes in the package org.apache.kafka.common.config.provider (except ConfigProvider) are such components.

Two existing interfaces that are specific to Connect will be modified.  This will allow Tasks to get the latest versions of their configs, with all indirect references reloaded.

...

Here is an example of a FileConfigProvider:

Code Block
/**
 * An implementation of {@link ConfigProvider} that represents a Properties file.
 * All property keys and values are stored as cleartext.
 */
public class FileConfigProvider implements ConfigProvider {

    public void configure(Map<String, ?> configs) {
    }

    /**
     * Retrieves the data at the given Properties file.
     *
     * @param path the file where the data resides
     * @return the configuration data
     */
    public ConfigData get(String path) {
        Map<String, String> data = new HashMap<>();
        if (path == null || path.isEmpty()) {
            return new ConfigData(data);
        }
        try (Reader reader = reader(path)) {
            Properties properties = new Properties();
            properties.load(reader);
            Enumeration<Object> keys = properties.keys();
            while (keys.hasMoreElements()) {
                String key = keys.nextElement().toString();
                String value = properties.getProperty(key);
                if (value != null) {
                    data.put(key, value);
                }
            }
            return new ConfigData(data);
        } catch (IOException e) {
            throw new ConfigException("Could not read properties from file " + path);
        }
    }

    /**
     * Retrieves the data with the given keys at the given Properties file.
     *
     * @param path the file where the data resides
     * @param keys the keys whose values will be retrieved
     * @return the configuration data
     */
    public ConfigData get(String path, Set<String> keys) {
        Map<String, String> data = new HashMap<>();
        if (path == null || path.isEmpty()) {
            return new ConfigData(data);
        }
        try (Reader reader = reader(path)) {
            Properties properties = new Properties();
            properties.load(reader);
            for (String key : keys) {
                String value = properties.getProperty(key);
                if (value != null) {
                    data.put(key, value);
                }
            }
            return new ConfigData(data);
        } catch (IOException e) {
            throw new ConfigException("Could not read properties from file " + path);
        }
    }

    // visible for testing
    protected Reader reader(String path) throws IOException {
        return new BufferedReader(new InputStreamReader(new FileInputStream(path), StandardCharsets.UTF_8));
    }

    public void close() {
    }
}


Usage:

Code Block
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
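
With the file provider registered as above, a connector configuration can then refer to a secret indirectly with the ${provider:path:key} variable syntax; for example (the file path and key name below are illustrative):

Code Block
```properties
# In the connector configuration: resolved at runtime from the file
# /opt/connect/secrets.properties, key "jdbc.password"; the secret
# itself is never stored by Connect.
connection.password=${file:/opt/connect/secrets.properties:jdbc.password}
```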

 

Secret Rotation

Secret Management systems such as Vault support secret rotation by associating a "lease duration" with a secret, which can be read by the client.   

In general, secret rotation is orthogonal to a particular Connector.  For example, a JDBC password may be stored in a Docker secret or in Vault.  The JDBC connector does not need to know what the method of rotation is.  Also, it is best if the JDBC connector is informed when it should re-obtain secrets rather than wait until a security exception occurs.  So in this case, a push model is warranted.

Other connectors such as the S3 connector are tightly coupled with a particular secret manager, and may wish to handle rotation on their own.  

To handle these different scenarios, the design supports both a push model and a pull model for obtaining new secrets.

Different Connect components may have different responsibilities in handling secret rotation:

  • ConfigProvider:  The ConfigProvider may have knowledge of the method of rotation.  For Vault, it would be a "lease duration".  For a file-based provider, it could be file watches.  If it knows when a secret is going to be reloaded, it would call onChange() to inform the Herder.
  • Herder:  The Herder can push information to the Connector indicating that secrets have expired or may expire in the future.  When the Herder receives the onChange() call, it will check a new connector configuration property, config.reload.action, which can be one of the following:
      1. The value restart, which means to schedule a restart of the Connector and all its Tasks.  This will be the default.
      2. The value none, which means to do nothing.
  • Connector Tasks:  A task may wish to handle rotation on its own (a pull model).  In this case the Connector would need to set config.reload.action to none.  The methods SinkTaskContext.config() and SourceTaskContext.config() would be used by the Task to reload the config and resolve indirect references again.
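
A minimal sketch of the pull model, assuming a task context that exposes a config() method returning the re-resolved configuration (the interface below is a stand-alone stub for illustration, not the real Connect API):

```java
import java.util.HashMap;
import java.util.Map;

// Stand-alone stub of the task context described in this KIP; the real
// SinkTaskContext/SourceTaskContext live in the Connect API and have more methods.
interface TaskContext {
    Map<String, String> config();  // latest config, indirect references re-resolved
}

// A task using the pull model: it re-reads its secrets on demand instead of
// relying on the Herder to restart it (config.reload.action=none).
class PullModelTask {
    private final TaskContext context;
    private String password;

    PullModelTask(TaskContext context) {
        this.context = context;
        this.password = context.config().get("connection.password");
    }

    // Called when, e.g., a security exception suggests the secret was rotated.
    void refreshSecrets() {
        this.password = context.config().get("connection.password");
    }

    String password() {
        return password;
    }
}
```

Because the context resolves the indirect references on each call, the task picks up a rotated secret without a restart.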


Compatibility, Deprecation, and Migration Plan

No changes are required for existing Connectors.  Existing connector configurations with plaintext passwords will not be affected; only after they are changed to use variables (i.e., indirect references) will Connect stop storing the secrets.

Connectors that use a ConfigProvider and do not want the restart behavior can specify config.reload.action as none.
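
For example, in the connector configuration:

Code Block
```properties
# Opt out of the default restart of the Connector and its Tasks
# when the ConfigProvider reports a change:
config.reload.action=none
```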

Assumptions and Limitations

A typical pattern in Connector implementations is for the Task configuration to be a superset of the Connector configuration.  That is, when creating a Task configuration, the Connector will copy the Connector configuration and then perhaps add some additional configuration properties.  However, nothing in the Connect framework enforces that this relationship hold between the Task configuration and the Connector configuration.

For the implementation of this KIP, we will assume that a variable reference in a property of the Task configuration appears under the same key as in the Connector configuration.  By the time the Connector configuration is passed to the Connector, all variable references have been resolved; when the Connector returns the Task configuration to the Connect framework, the framework uses this assumption to "reverse transform" each resolved value back to its original variable reference before saving the Task configuration.
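
The reverse transform can be illustrated with a self-contained sketch (a simplification of the framework logic, which this KIP does not specify in detail; the variable syntax and property names are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the "reverse transform": before saving a Task
// configuration, any value that matches a resolved variable from the
// Connector configuration is replaced with the original variable reference,
// so the secret itself is never persisted.
class ReverseTransform {
    static Map<String, String> reverseTransform(Map<String, String> connectorConfig, // key -> variable reference
                                                Map<String, String> resolved,        // key -> resolved value
                                                Map<String, String> taskConfig) {
        Map<String, String> result = new HashMap<>(taskConfig);
        for (Map.Entry<String, String> e : connectorConfig.entrySet()) {
            String key = e.getKey();
            String variableRef = e.getValue();          // e.g. "${file:/secrets.properties:password}"
            String resolvedValue = resolved.get(key);   // e.g. "my-secret"
            // Assumption from this KIP: the Task config carries the resolved
            // value under the same key as the Connector config.
            if (resolvedValue != null && resolvedValue.equals(result.get(key))) {
                result.put(key, variableRef);
            }
        }
        return result;
    }
}
```

A value copied under a different key is not matched and would be persisted as plaintext, which is exactly the limitation described below.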

This implies that when the assumption does not hold, for example because the Connector copies a resolved value to a property with a different key, the Connect framework will not be able to reverse transform the value to the original variable reference, and the value will appear in plaintext when saved.  This is a limitation that is currently not addressed by this KIP.

Rejected Alternatives

The current scope of this proposal is for Connectors only.  However, as mentioned above, it provides classes that can be used by Brokers and Clients for functionality in other KIPs.

...