Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

New configuration

connector.client.config.override.policy - This will be an implementation of a new interface ConnectorClientConfigPolicy ConnectorClientConfigOverridePolicy that will be introduced in the connect API. The default value will be null when not configured will simply ignore the provided overrides and hence the behavior continues `Ignore` which will allow the implementation to be backward compatible.

The overrides can be specified in the connector config by using the following prefixes

...

The administrator could either specify the fully qualified class name of the ConnectorClientConfigPolicy the ConnectorClientConfigOverridePolicy implementation or an alias (the alias is computed to be the prefix on the interface name `ConnectorClientConfigPolicy` `ConnectorClientConfigOverridePolicy` which is exactly how most of the existing connect plugins compute their alias).

The new interface will be treated as a new connect plugin and will be loaded via the plugin path mechanism. The plugins will be discovered via the Service loader mechanism similar to RestExtension and ConfigProvider.  The structure of the new interface and its request are described below below:-

Code Block
languagejava
/**
 * <p>An interface for enforcing a policy on overriding of client configs via the connector configs.
 *
 * <p>Common use cases are ability to provide principal per connector, <code>sasl.jaas.config</code>
 * and/or enforcing that the producer/consumer configurations for optimizations are within acceptable ranges.
 */
public interface ConnectorClientConfigPolicy extends Configurable, AutoCloseable {

    /**
     * Specifies if the worker should attempt to override the client configs from the connector.
     * Implementation aren't required to override this method as its included to preserve backwards compatibility in th edefault
     * implementation.
     *
     * @return a boolean indicating whether the worker should attempt to override the client configs from the connector
     */
    default boolean useOverrides() {
        return true;
    }

    /**
     * This method will be invoked when {@link ConnectorClientConfigPolicy#useOverrides()} returns true.
     * Worker will invoke this while constructing the producer for the SourceConnectors,  DLQ for SinkConnectors and the consumer for the
     * SinkConnectors to validate if all of the overridden client configurations are allowed per the
     * policy implementation. This would also be invoked during the validate of connector configs via the Rest API.
     * 
     * If there are any policy violations, the connector will movenot to a <code>FAILED</code> statebe started.
     *
     * @param connectorClientConfigRequest an instance of {@code ConnectorClientConfigRequest} that provides the configs to overridden and 
     *                                     its context; never {@code null}
     * @throws PolicyViolationException if any of the overridden property doesn't meet the defined policy
     */
    void validate(ConnectorClientConfigRequest connectorClientConfigRequest) throws PolicyViolationException;
}

...

Code Block
languagejava
public class ConnectorClientConfigRequest {

    private Map<String, Object> clientProps;
    private ClientType  clientType;
    private String connectorName;
    private ConnectorType connectorType;
    private Class<? extends ClassConnector> connectorClass;

    public ConnectorClientConfigRequest(
        String connectorName,
        ConnectorType connectorType,
        Class<? extends ClassConnector> connectorClass,
        Map<String, Object> clientProps,
        ClientType clientType) {
        this.clientProps = clientProps;
        this.clientType = clientType;
        this.connectorName = connectorName;
        this.connectorType = connectorType;
        this.connectorClass = connectorClass;
    }

    public Map<String, Object> clientProps() {
        return clientProps;
    }

    public ClientType clientType() {
        return clientType;
    }

    public String connectorName() {
        return connectorName;
    }

    public ConnectorType connectorType() {
        return connectorType;
    }

    public ClassClass<? extends Connector> connectorClass() {
        return connectorClass;
    }

    public enum ClientType {
        PRODUCER, CONSUMER, ADMIN;
    }
}

Inaddition to the default implementation, the KIP also proposes to a few more implementations of ConnectorClientConfigPolicy The KIP introduces the following implementations of ConnectorClientConfigOverridePolicy that are outlined in the table below

Class NameAliasBehavior
IgnoreConnectorClientConfigOverridePolicyIgnoreIgnores any overrides specified in the connector configuration ( current behavior). (This will be accomplished by the implementation returning `false` for `useOverrides()`)

NoneConnectorClientConfigOverridePolicyNoneConnectorClientConfigPolicy

NoneDisallows any configuration overrides

PrincipalConnectorClientConfigPolicyPrincipalConnectorClientConfigOverridePolicy

Principal

Allows override of  "sasl.jaas.config" for the producer, consumer and admin prefixes.  Enables the ability to use different principal per connector.

AllConnectorClientConfigPolicyAllConnectorClientConfigOverridePolicy

AllAllows override of all configurations for the producer, consumer and admin prefixes. 

...

The worker would apply the policy during a create connector flow as follows if a policy is configured and useOverrides() returns true:-

  • Constructing producer for WorkerSourceTask - invoke validate with all configs with "producer.override." prefix , ClientType=Producer, ConnectorType=Source  & override if no policy violation 
  • Constructing admin client & producer for DeadLetterQueueReporter for the DLQ topic 
    • invoke validate with all configs with "producer.override." prefix , ClientType=Producer, ConnectorType=Sink  & override if no policy violation 
    • invoke validate with all configs with "admin.override." prefix , ClientType=Admin, ConnectorType=Sink  & override if no policy violation 
  • Constructing consumer for WorkerSinkTask - invoke validate with all configs with "consumer.override." prefix , ClientType=Consumer, ConnectorType=Sink  & override if no policy violation 

The herder(AbstractHerder) will apply the policy for all overrides as follows if the policy is configured and useOverrides() returns true during the validate() flow:-

...