...

Current state: Under discussion

Discussion thread: here
JIRA: KAFKA-6886

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Other customers may be passing secrets into the host through various means (such as through Docker secrets), but do not want the secret to appear in cleartext in the Kafka Connect configuration. There is a need for secrets from all of these systems to be injected into Kafka Connect configurations, and for the customer to be able to specify the means of injection through a plugin.

Connect's connector configurations have plaintext passwords, and Connect stores these in cleartext either on the filesystem (for standalone mode) or in internal topics (for distributed mode). Connect's REST API also exposes these passwords when unsecured connections are used.

Connect should not store or transmit cleartext passwords in connector configurations. TLS can be enabled on Connect's REST API, and this proposal addresses how Connect deals with secrets in stored connector configurations by integrating with external secret management systems. First, since no single standard exists, Connect will provide an extension point for adding customized integrations and will provide a simple file-based extension as an example. Second, a Connect runtime can be configured to use one or more of these extensions, and will allow connector configurations to use placeholders that will be resolved by the runtime before passing the complete connector configurations to connectors. Therefore, existing connectors will not see any difference in the configurations that Connect provides to them at startup. Third, Connect's API will be changed to allow a connector to obtain the latest connector configuration at any time.

Public Interfaces

Two new interfaces will be added to Kafka Connect's public API. They allow custom ConfigProvider implementations to integrate with external systems for managing secrets.

Code Block
languagejava
public interface ConfigProvider extends Configurable, Closeable {
     
    // Initialize the provider
    void init(ConfigContext ctx);
 
    // Transform the configs by resolving all indirect references
    Map<String, String> transform(ConfigContext ctx, Map<String, String> configs);
}
 
public interface ConfigContext {
 
    // Get the initialization parameters
    Map<String, String> parameters();
 
    // The name of the connector
    String connectorName();
 
    // Schedule a reload, possibly for secrets rotation
    void scheduleConfigReload(long delayMs);
}
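
To make the provider lifecycle concrete, the following is a minimal sketch of a file-backed provider, assuming the method names shown above. The two interfaces are redeclared locally (without the Configurable and Closeable supertypes) so the snippet compiles on its own. The class FileProviderSketch, the "secrets" parameter handling, and the exact-match `${file:key}` syntax are illustrative assumptions, not part of the proposal.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

final class FileProviderSketch {

    // Local copy of the proposed interface so the sketch compiles on its own.
    interface ConfigContext {
        Map<String, String> parameters();
        String connectorName();
        void scheduleConfigReload(long delayMs);
    }

    // Simplified local copy: the proposed interface also extends Configurable and Closeable.
    interface ConfigProvider {
        void init(ConfigContext ctx);
        Map<String, String> transform(ConfigContext ctx, Map<String, String> configs);
    }

    // Hypothetical provider: resolves values of the exact form ${file:key}
    // from a properties file named by the "secrets" initialization parameter.
    static final class FileConfigProvider implements ConfigProvider {
        private static final String PREFIX = "${file:";
        private final Properties secrets = new Properties();

        @Override
        public void init(ConfigContext ctx) {
            Path path = Path.of(ctx.parameters().get("secrets"));
            try (Reader reader = Files.newBufferedReader(path)) {
                secrets.load(reader);
            } catch (IOException e) {
                throw new IllegalStateException("Cannot read secrets file " + path, e);
            }
        }

        @Override
        public Map<String, String> transform(ConfigContext ctx, Map<String, String> configs) {
            Map<String, String> resolved = new HashMap<>(configs);
            for (Map.Entry<String, String> entry : configs.entrySet()) {
                String value = entry.getValue();
                if (value != null && value.startsWith(PREFIX) && value.endsWith("}")) {
                    String key = value.substring(PREFIX.length(), value.length() - 1);
                    String secret = secrets.getProperty(key);
                    if (secret != null) {
                        // Replace the indirect reference with the secret itself.
                        resolved.put(entry.getKey(), secret);
                    }
                }
            }
            return resolved;
        }
    }

    // Builds a throwaway secrets file and context, then resolves one connector config.
    static Map<String, String> demo() {
        try {
            Path file = Files.createTempFile("secrets", ".properties");
            file.toFile().deleteOnExit();
            Files.writeString(file, "db_password=s3cret\n");
            ConfigContext ctx = new ConfigContext() {
                public Map<String, String> parameters() { return Map.of("secrets", file.toString()); }
                public String connectorName() { return "demo-connector"; }
                public void scheduleConfigReload(long delayMs) { /* no-op in this sketch */ }
            };
            ConfigProvider provider = new FileConfigProvider();
            provider.init(ctx);
            return provider.transform(ctx,
                    Map.of("mysql.db.password", "${file:db_password}", "topic", "orders"));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Values that are not references, such as "topic" in the demo, pass through unchanged, which matches the goal that existing configurations are unaffected.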

...

Code Block
languagejava
public interface SinkTaskContext {
    ...
    default Map<String, String> config() {
        ...
    }
    ...
}
 
public interface SourceTaskContext {
    ...
    default Map<String, String> config() {
        ...
    }
    ...
}

 

The following configuration properties will be added.

| Config Option | Description | Example | Default |
|---|---|---|---|
| config.providers | A comma-separated list of names for providers. | config.providers=file,vault | N/A |
| config.providers.{name}.class | The Java class name for a provider. | config.providers.file.class=org.apache.kafka.connect.configs.FileConfigProvider | N/A |
| config.providers.{name}.param.{param-name} | A parameter to be passed to the above Java class on initialization. | config.providers.file.param.secrets=/run/mysecrets | N/A |
| config.reload.action | One of: "none" (no action when scheduleConfigReload() is called) or "restart" (schedule a restart when scheduleConfigReload() is called). | config.reload.action=restart | restart |
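
As an illustration of how these properties fit together, the sketch below groups flat worker properties into one entry per provider name. It is only a reading of the table above; the class ProviderConfigSketch and the ProviderSpec type are hypothetical names and do not describe how Connect would actually parse its configuration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class ProviderConfigSketch {

    // Hypothetical holder for one provider's class name and initialization parameters.
    static final class ProviderSpec {
        final String className;
        final Map<String, String> params;

        ProviderSpec(String className, Map<String, String> params) {
            this.className = className;
            this.params = params;
        }
    }

    // Groups config.providers.{name}.class and config.providers.{name}.param.* by provider name.
    static Map<String, ProviderSpec> parse(Map<String, String> workerProps) {
        Map<String, ProviderSpec> specs = new LinkedHashMap<>();
        String names = workerProps.getOrDefault("config.providers", "");
        for (String raw : names.split(",")) {
            String name = raw.trim();
            if (name.isEmpty()) {
                continue; // tolerate an empty list or stray commas
            }
            String prefix = "config.providers." + name + ".";
            String className = workerProps.get(prefix + "class");
            if (className == null) {
                throw new IllegalArgumentException("Missing property " + prefix + "class");
            }
            String paramPrefix = prefix + "param.";
            Map<String, String> params = new LinkedHashMap<>();
            for (Map.Entry<String, String> e : workerProps.entrySet()) {
                if (e.getKey().startsWith(paramPrefix)) {
                    // Strip the prefix so the provider sees only its own parameter name.
                    params.put(e.getKey().substring(paramPrefix.length()), e.getValue());
                }
            }
            specs.put(name, new ProviderSpec(className, params));
        }
        return specs;
    }
}
```

With the example values from the table, the "file" provider would end up with the single parameter secrets=/run/mysecrets.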


Proposed Changes

Currently the configuration for both Connectors and Tasks is stored in a Kafka topic.  The goal is for these stored configurations to only contain indirect references to secrets.  When a Connector or Task is started, the configuration will be read from Kafka and then passed to the specific Connector or Task.  Before the configuration is passed to the Connector or Task, the indirect references need to be resolved. 

...

Code Block
languagetext
# Properties specified in the Worker config
# Can have multiple comma-separated values
config.providers=vault
config.providers.vault.class=com.org.apache.connect.configs.VaultConfigProvider
config.providers.vault.param.uri=1.2.3.4
config.providers.vault.param.token=/run/secrets/vault-token
 
# Properties specified in the Connector config
mysql.db.password=${vault:vault_db_password_key}

In the above example, VaultConfigProvider will be passed the string "/run/secrets/vault-token" on initialization. This could be, for instance, the filename of a Docker secret on the tmpfs mount that contains the initial Vault token. When resolving the value for "mysql.db.password", the VaultConfigProvider will use the key "vault_db_password_key" to look up the corresponding secret. (VaultConfigProvider is a hypothetical example for illustration purposes only.)
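
The resolution step described here can be sketched as a small substitution routine. The placeholder grammar assumed below (`${provider:key}`, with no nesting or escaping) is inferred from the example only, and PlaceholderSketch and its lookup functions are hypothetical stand-ins for real providers.

```java
import java.util.Map;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class PlaceholderSketch {

    // Assumed grammar: ${provider:key}, where neither part contains '}' and the
    // provider name contains no ':'.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^:}]+):([^}]+)\\}");

    // Replaces every placeholder whose provider is known and returns a non-null secret.
    static String resolve(String value, Map<String, Function<String, String>> lookups) {
        Matcher m = PLACEHOLDER.matcher(value);
        StringBuilder out = new StringBuilder();
        while (m.find()) {
            Function<String, String> lookup = lookups.get(m.group(1));
            String secret = lookup == null ? null : lookup.apply(m.group(2));
            // Unknown providers or keys leave the reference text untouched.
            String replacement = secret == null ? m.group() : secret;
            m.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        m.appendTail(out);
        return out.toString();
    }
}
```

Matcher.quoteReplacement is used so that a secret containing `$` or `\` is inserted literally instead of being interpreted as a group reference.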

Secret Rotation

Secret management systems such as Vault support secret rotation by associating a "lease duration" with a secret, which can be read by the client.

...

  • ConfigProvider:  The ConfigProvider may have knowledge of the method of rotation.  For Vault, it would be a "lease duration".  For a file-based provider, it could be file watches.  If it knows when a secret is going to be reloaded, it would call scheduleConfigReload() to inform the Herder.
  • Herder:  The Herder can push information to the Connector indicating that secrets have expired or may expire in the future.  When the Herder receives the scheduleConfigReload() call, it will check a new connector configuration property config.reload.action which can be one of the following:
      1. The value restart, which means to schedule a restart of the Connector and all its Tasks.  This will be the default.
      2. The value none, which means to do nothing.
  • Connector Tasks:  A task may wish to handle rotation on its own (a pull model).  In this case the Connector would need to set config.reload.action to none.  The methods SinkTaskContext.config() and SourceTaskContext.config() would be used by the Task to reload the config and resolve indirect references again.
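
The Herder-side decision described in the list above can be sketched as follows. This is a simplified illustration under stated assumptions: ReloadSketch, ReloadAction, and the restartConnectorAndTasks callback are hypothetical names, and a plain ScheduledExecutorService stands in for whatever scheduling the Herder would actually use.

```java
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class ReloadSketch {

    enum ReloadAction { NONE, RESTART }

    // Reads config.reload.action from the connector config; "restart" is the default.
    static ReloadAction actionFor(Map<String, String> connectorConfig) {
        String raw = connectorConfig.getOrDefault("config.reload.action", "restart");
        return ReloadAction.valueOf(raw.trim().toUpperCase(Locale.ROOT));
    }

    // Called when a provider invokes scheduleConfigReload(delayMs).
    // Returns the scheduled restart, or empty when the action is "none".
    static Optional<ScheduledFuture<?>> onScheduleConfigReload(
            Map<String, String> connectorConfig,
            long delayMs,
            ScheduledExecutorService scheduler,
            Runnable restartConnectorAndTasks) {
        if (actionFor(connectorConfig) == ReloadAction.NONE) {
            // Pull model: the task re-reads its config on its own.
            return Optional.empty();
        }
        return Optional.of(
                scheduler.schedule(restartConnectorAndTasks, delayMs, TimeUnit.MILLISECONDS));
    }
}
```

An unrecognized value makes ReloadAction.valueOf throw IllegalArgumentException in this sketch; how invalid values should be reported is not specified by the text above.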

...

No changes are required for existing Connectors.  Existing connector configurations with plaintext passwords will not be affected; only after they are changed to use the variables (that is, indirect references) will the secrets no longer be stored by Connect.

Connectors that use a ConfigProvider and do not want the restart behavior can specify config.reload.action as none.

...