
Status

Current state: Under discussion

Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]

JIRA: here [Change the link from KAFKA-1 to your own ticket]

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Kafka Connect allows integration with many types of external systems.  Some of these systems may require secrets to be configured in order to access them.  Many customers have an existing Secret Management strategy and are using centralized management systems such as Vault, Keywhiz, or AWS Secrets Manager.  Vault is very popular and has been described as "the current gold standard in secret management and provisioning". These Secret Management systems may satisfy the following customer requirements:

  • No secret in cleartext at rest (such as on disk) or in transit (over the network)
  • Secrets protected by access control mechanisms
  • All access to secrets recorded in an audit log
  • Support for secret versioning and rolling
  • A common set of APIs for both applications and tools to access secrets
  • Redundancy in case of failover so that secrets are always available
  • Certification as conformant with required compliance standards

Other customers may be passing secrets into the host through various means (such as through Docker secrets), but do not want the secret to appear in cleartext in the Kafka Connect configuration. 

There is a need for secrets from all of these systems to be injected into Kafka Connect configurations, and for the customer to be able to specify the means of injection through a plugin.

Public Interfaces

Two new interfaces will be available in Kafka Connect.  These interfaces allow custom ConfigProviders to be specified for resolving indirect references in configurations.

public interface ConfigProvider extends Configurable, Closeable {
     
    // Initialize the provider
    void start(ConfigContext ctx);
 
    // Transform the configs by resolving all indirect references
    Map<String, String> transform(ConfigContext ctx, Map<String, String> configs);
}
 
public interface ConfigContext {
 
    // Get the initialization parameters
    Map<String, String> parameters();
 
    // The name of the connector
    String connectorName();
 
    // Schedule a reload, possibly for secrets rotation
    void scheduleConfigReload(long delayMs);
}

 

Two existing interfaces will be modified.  This will allow Tasks to get the latest versions of their configs, with all indirect references freshly resolved (this requires the planned upgrade of Kafka to Java 8, since default interface methods are a Java 8 feature).

public interface SinkTaskContext {
    ...
    default Map<String, String> config() {
        ...
    }
    ...
}
 
public interface SourceTaskContext {
    ...
    default Map<String, String> config() {
        ...
    }
    ...
}
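
To illustrate how these interfaces fit together, below is a minimal sketch of a hypothetical file-based provider.  It is not part of this proposal: the FileConfigProvider name, its "path" parameter, and the ${file:key} reference syntax (the proposed syntax is described under Proposed Changes below) are assumptions for illustration only.

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
 
// A hypothetical provider that loads secrets from a local properties file
// (e.g. a Docker secret on a tmpfs mount) named by its "path" parameter.
public class FileConfigProvider implements ConfigProvider {
 
    private final Properties secrets = new Properties();
 
    @Override
    public void configure(Map<String, ?> configs) {
        // Configurable callback; nothing to do in this sketch
    }
 
    @Override
    public void start(ConfigContext ctx) {
        String path = ctx.parameters().get("path");
        try (InputStream in = new FileInputStream(path)) {
            secrets.load(in);
        } catch (IOException e) {
            throw new RuntimeException("Could not load secrets from " + path, e);
        }
    }
 
    @Override
    public Map<String, String> transform(ConfigContext ctx, Map<String, String> configs) {
        Map<String, String> resolved = new HashMap<>();
        for (Map.Entry<String, String> entry : configs.entrySet()) {
            String value = entry.getValue();
            // Replace indirect references of the (assumed) form ${file:key}
            if (value != null && value.startsWith("${file:") && value.endsWith("}")) {
                String key = value.substring("${file:".length(), value.length() - 1);
                value = secrets.getProperty(key, value);
            }
            resolved.put(entry.getKey(), value);
        }
        return resolved;
    }
 
    @Override
    public void close() {
        secrets.clear();
    }
}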


Proposed Changes

Currently the configuration for both Connectors and Tasks is stored in a Kafka topic.  The goal is for these stored configurations to only contain indirect references to secrets.  When a Connector or Task is started, the configuration will be read from Kafka and then passed to the specific Connector or Task.  Before the configuration is passed to the Connector or Task, the indirect references need to be resolved. 

The following are required in a design:

  • Ability to specify one or more custom ConfigProviders that will resolve indirect references for configuration values.
  • Ability to pass data to initialize a ConfigProvider on construction or instantiation.
  • A special syntax using the dollar sign ($) will be used to indicate when a configuration value is an indirect reference, and which ConfigProvider(s) should resolve it.

Example:

# Properties specified in the Worker config
config.providers=vault   # can have multiple comma-separated values
config.provider.vault.class=VaultConfigProvider
config.provider.vault.param.uri=1.2.3.4
config.provider.vault.param.token=/run/secrets/vault-token
 
# Properties specified in the Connector config
mysql.db.password=${vault:vault_db_password_key}

In the above example, VaultConfigProvider will be passed the string "/run/secrets/vault-token" on initialization; this could be, for instance, the filename of a Docker secret residing on a tmpfs mount that contains the initial Vault token.  When resolving the value for "mysql.db.password", VaultConfigProvider will use the key "vault_db_password_key" to look up the corresponding secret in Vault.
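
Concretely, the resolution step would look like the following (the resolved value shown is only a placeholder):

# As stored in the Kafka config topic (only the indirect reference)
mysql.db.password=${vault:vault_db_password_key}
 
# As passed to the Connector or Task after VaultConfigProvider resolves it
mysql.db.password=<secret value fetched from Vault>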


Secret Rotation

Secret Management systems such as Vault support secret rotation by associating a "lease duration" with a secret, which can be read by the client.   

In general, secret rotation is orthogonal to a particular Connector.  For example, a JDBC password may be stored in a Docker secret or in Vault.  The JDBC connector does not need to know what the method of rotation is.  Also, it is best if the JDBC connector is informed when it should re-obtain secrets rather than wait until a security exception occurs.  So in this case, a push model is warranted.

Other connectors such as the S3 connector are tightly coupled with a particular secret manager, and may wish to handle rotation on their own.  

To handle the different scenarios, the design supports both a push model and a pull model for obtaining new secrets.

Different Connect components may have different responsibilities in handling secret rotation:

  • ConfigProvider:  The ConfigProvider may have knowledge of the method of rotation.  For Vault, it would be a "lease duration".  For a file-based provider, it could be file watches.  If it knows when a secret will need to be reloaded, it can call scheduleConfigReload() to inform the Herder.
  • Herder:  The herder can push information to the Connector indicating that secrets have expired or may expire in the future.
    • When the Herder receives the scheduleConfigReload() call, it will check a new connector configuration property config.reload.action which can be one of the following:
      1. The value restart, which means to schedule a restart of the Connector and all its Tasks.  This will be the default.
      2. The value none, which means to do nothing.
  • Connector Tasks:  A task may wish to handle rotation on its own (a pull model).  In this case the Connector would need to set config.reload.action to none, as shown in the example after this list.  Also, the methods SinkTaskContext.config() and SourceTaskContext.config() will be added so that a Task can ask the framework to reload the config and resolve indirect references again.
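
For example, a Connector whose Tasks manage rotation themselves would opt out of automatic restarts in its Connector config:

# Controls the Herder's reaction to scheduleConfigReload(); default is "restart"
config.reload.action=none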

 

ConfigProvider Example:  Vault

Vault has three different items that can be rotated:

  1. Key rotation:  Vault uses a keyring, so even after the keys are rotated, new data will be encrypted with the new key while old data remains decryptable with the old keys.  Rotating Vault keys therefore has no impact on clients.
  2. Token rotation:  A Vault token is used for authentication and may expire.  In this case the VaultConfigProvider will be aware of the lease duration of the token, and will need to request a new token before the old one expires.  In this way token expiration is transparent to the Connector and is a concern of the ConfigProvider only.
  3. Secret rotation:  A secret stored in Vault may be rotated, such as a JDBC or Elasticsearch password.  In this case, the VaultConfigProvider will inform the Herder via a scheduleConfigReload() call.  The Herder in turn will check config.reload.action to determine whether to:
    1. Schedule a restart of the Connector and all its Tasks (the default), or
    2. Do nothing (in which case the Tasks themselves will have to deal with the rotation).
A sketch of how such a provider might derive the reload delay from Vault lease durations follows below.
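
The following is a hedged sketch only; the VaultClient wrapper, its Secret type, and the fromTokenFile()/read()/leaseDurationMs() calls are illustrative assumptions, not a real Vault API.

import java.util.HashMap;
import java.util.Map;
 
// Hypothetical: resolve ${vault:key} references, then ask the Herder to reload
// the config before the shortest lease among the fetched secrets expires.
public class VaultConfigProvider implements ConfigProvider {
 
    private VaultClient vault;   // assumed Vault client wrapper
 
    @Override
    public void configure(Map<String, ?> configs) { }
 
    @Override
    public void start(ConfigContext ctx) {
        // Authenticate with the initial token named by the "token" parameter
        vault = VaultClient.fromTokenFile(ctx.parameters().get("token"));   // assumed API
    }
 
    @Override
    public Map<String, String> transform(ConfigContext ctx, Map<String, String> configs) {
        Map<String, String> resolved = new HashMap<>();
        long minLeaseMs = Long.MAX_VALUE;
        for (Map.Entry<String, String> entry : configs.entrySet()) {
            String value = entry.getValue();
            if (value != null && value.startsWith("${vault:") && value.endsWith("}")) {
                String key = value.substring("${vault:".length(), value.length() - 1);
                VaultClient.Secret secret = vault.read(key);                  // assumed API
                value = secret.value();
                minLeaseMs = Math.min(minLeaseMs, secret.leaseDurationMs());  // assumed API
            }
            resolved.put(entry.getKey(), value);
        }
        // Push model: schedule a reload only if at least one secret was fetched
        if (minLeaseMs != Long.MAX_VALUE) {
            ctx.scheduleConfigReload(minLeaseMs);
        }
        return resolved;
    }
 
    @Override
    public void close() { }
}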

Connector Example: Elasticsearch

The Elasticsearch Connector is an example of a Connector that uses a username and password to secure access to the external system.  So in this case the actual storage of the secret is orthogonal to the Connector (unlike perhaps the S3 connector where the secret storage is coupled to AWS).  The Elasticsearch Connector also has stateful components (the ES/Jest client) that are created by using these secrets, similar to other Connectors such as the JDBC Connector (which has stateful connection pools).  Here are the options for how to handle rotation of the username and/or password:

  1. Allow the Herder to schedule a restart of the ES Connector and its Tasks for when the secrets expire.
  2. Do nothing when informed by the ConfigProvider of expiring secrets.  In this case authentication exceptions would occur at some point.  The Connector would then need to dynamically reconfigure itself by pulling the new configuration (a sketch of this pull model follows below), or require manual intervention.
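
To make option 2 concrete, here is a minimal sketch of a Task that pulls a freshly resolved config itself.  The ExternalClient class and the exception type are stand-ins for whatever client library a Connector uses; SinkTaskContext.config() is the method proposed in this KIP.

import java.util.Collection;
import java.util.Map;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
 
// Hypothetical Task that refreshes its own secrets when credentials stop working
// (assumes config.reload.action=none so the Herder will not restart it).
public class ExampleSinkTask extends SinkTask {
 
    private ExternalClient client;   // assumed stateful client built from secrets
 
    @Override
    public void start(Map<String, String> props) {
        client = ExternalClient.connect(props);   // assumed helper
    }
 
    @Override
    public void put(Collection<SinkRecord> records) {
        try {
            client.write(records);
        } catch (SecurityException e) {   // stand-in for the client's auth failure
            // Pull model: re-resolve indirect references via the new config() method
            Map<String, String> fresh = context.config();
            client.close();
            client = ExternalClient.connect(fresh);
            client.write(records);
        }
    }
 
    @Override
    public void stop() {
        if (client != null) {
            client.close();
        }
    }
 
    @Override
    public String version() {
        return "1.0";
    }
}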


Compatibility, Deprecation, and Migration Plan

No changes are required for existing Connectors.  Connectors that use a ConfigProvider and do not want the restart behavior can specify config.reload.action as none.

Rejected Alternatives

The current scope of this proposal is for Connectors only.  It does not address brokers or clients.  The injection will happen at a very specific time in the lifecycle of Kafka Connect, i.e. after the configuration is stored and before the Connectors and Tasks are started.

A related feature for brokers is KIP-226, which allows for dynamic broker configuration.  It can also store passwords.  However,

  1. It currently does not work for Kafka Connect.
  2. One requirement is to not "leak" secrets to other systems, especially if the customer is already using a centralized Secret Management system.

A related feature for clients is KIP-76, which is for obtaining passwords through scripts.  However,

  1. It is not yet implemented.
  2. It only applies to certain password fields.
  3. It does not allow for custom plugins.