Comment: Update interface to return list of ConfigValue instead of throwing exception. Update list of configs for Principal policy.

...

The new interface will be treated as a new Connect plugin and will be loaded via the plugin path mechanism. Plugins will be discovered via the ServiceLoader mechanism, similar to RestExtension and ConfigProvider. The structure of the new interface and its request are described below:

Code Block
languagejava
import java.util.List;

import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.config.ConfigValue;

/**
 * <p>An interface for enforcing a policy on overriding of client configs via the connector configs.
 *
 * <p>Common use cases are providing a principal per connector via <code>sasl.jaas.config</code>
 * and/or enforcing that producer/consumer configurations used for optimization are within acceptable ranges.
 */
public interface ConnectorClientConfigOverridePolicy extends Configurable, AutoCloseable {


    /**
     * The worker will invoke this while constructing the producer for SourceConnectors, the DLQ producer for
     * SinkConnectors and the consumer for SinkConnectors, to validate that all of the overridden client
     * configurations are allowed per the policy implementation. It is also invoked when connector configs
     * are validated via the REST API.
     *
     * If there are any policy violations, the connector will not be started.
     *
     * @param connectorClientConfigRequest an instance of {@code ConnectorClientConfigRequest} that provides the configs to be overridden and
     *                                     its context; never {@code null}
     * @return list of {@link ConfigValue}; each ConfigValue should indicate whether the override is allowed via {@link ConfigValue#errorMessages}
     */
    List<ConfigValue> validate(ConnectorClientConfigRequest connectorClientConfigRequest);
}


Code Block
languagejava
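import java.util.Map;

import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.health.ConnectorType;

public class ConnectorClientConfigRequest {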

    private Map<String, Object> clientProps;
    private ClientType clientType;
    private String connectorName;
    private ConnectorType connectorType;
    private Class<? extends Connector> connectorClass;

    public ConnectorClientConfigRequest(
        String connectorName,
        ConnectorType connectorType,
        Class<? extends Connector> connectorClass,
        Map<String, Object> clientProps,
        ClientType clientType) {
        this.clientProps = clientProps;
        this.clientType = clientType;
        this.connectorName = connectorName;
        this.connectorType = connectorType;
        this.connectorClass = connectorClass;
    }

    /**
     * <pre>
     * Provides Config with prefix {@code producer.override.} for {@link ConnectorType#SOURCE}.
     * Provides Config with prefix {@code consumer.override.} for {@link ConnectorType#SINK}.
     * Provides Config with prefix {@code producer.override.} for {@link ConnectorType#SINK} for DLQ.
     * Provides Config with prefix {@code admin.override.} for {@link ConnectorType#SINK} for DLQ.
     * </pre>
     *
     * @return The client properties specified in the Connector Config with prefix {@code producer.override.},
     * {@code consumer.override.} or {@code admin.override.}. The configs returned don't include these prefixes.
     */
    public Map<String, Object> clientProps() {
        return clientProps;
    }

    /**
     * <pre>
     * {@link ClientType#PRODUCER} for {@link ConnectorType#SOURCE}
     * {@link ClientType#CONSUMER} for {@link ConnectorType#SINK}
     * {@link ClientType#PRODUCER} for DLQ in {@link ConnectorType#SINK}
     * {@link ClientType#ADMIN} for DLQ topic creation in {@link ConnectorType#SINK}
     * </pre>
     *
     * @return enumeration specifying the client type that is being overridden by the worker; never null.
     */
    public ClientType clientType() {
        return clientType;
    }

    /**
     * Name of the connector specified in the connector config.
     *
     * @return name of the connector; never null.
     */
    public String connectorName() {
        return connectorName;
    }

    /**
     * Type of the Connector.
     *
     * @return enumeration specifying the type of the connector, either {@link ConnectorType#SINK} or {@link ConnectorType#SOURCE}.
     */
    public ConnectorType connectorType() {
        return connectorType;
    }

    /**
     * The class of the Connector.
     *
     * @return the class of the Connector being created; never null
     */
    public Class<? extends Connector> connectorClass() {
        return connectorClass;
    }

    public enum ClientType {
        PRODUCER, CONSUMER, ADMIN;
    }
}
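
The Javadoc for clientProps() says the returned keys no longer carry the override prefix. A minimal sketch of that prefix stripping is shown below. The class and method names (OverridePrefixSketch, overridesWithPrefix) are hypothetical and are not part of Kafka Connect; how the worker actually extracts the overrides may differ.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Kafka Connect: illustrates prefix stripping only.
final class OverridePrefixSketch {

    /** Returns the entries of connectorConfig whose key starts with prefix, with the prefix removed. */
    static Map<String, Object> overridesWithPrefix(Map<String, String> connectorConfig, String prefix) {
        Map<String, Object> clientProps = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : connectorConfig.entrySet()) {
            String key = entry.getKey();
            // Skip keys without the prefix, and a bare prefix with no config name after it.
            if (key.startsWith(prefix) && key.length() > prefix.length()) {
                clientProps.put(key.substring(prefix.length()), entry.getValue());
            }
        }
        return clientProps;
    }

    public static void main(String[] args) {
        Map<String, String> connectorConfig = new LinkedHashMap<>();
        connectorConfig.put("name", "my-source");
        connectorConfig.put("producer.override.sasl.mechanism", "PLAIN");
        connectorConfig.put("producer.override.security.protocol", "SASL_SSL");
        connectorConfig.put("consumer.override.max.poll.records", "100");

        // Only the producer overrides are selected, and their keys lose the prefix.
        System.out.println(overridesWithPrefix(connectorConfig, "producer.override."));
    }
}
```

With this shape, a policy implementation only ever sees plain client config names such as sasl.mechanism, and can tell which client they apply to from clientType().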

...

| Class Name | Alias | Behavior |
| NoneConnectorClientConfigOverridePolicy | None | Disallows any configuration overrides. This will be the default policy. |
| PrincipalConnectorClientConfigOverridePolicy | Principal | Allows override of "security.protocol", "sasl.jaas.config" and "sasl.mechanism" for the producer, consumer and admin prefixes. Enables the use of a different principal per connector. |
| AllConnectorClientConfigOverridePolicy | All | Allows override of all configurations for the producer, consumer and admin prefixes. |
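
The three behaviors above can be read as three allowlists over the overridden config names. The sketch below illustrates that reading with plain JDK types. BuiltInPolicySketch and its Result class are hypothetical stand-ins (Result loosely mirrors ConfigValue with a name and error messages), and the error message wording is an assumption; this is not the actual implementation of the policies.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical sketch, not the Kafka implementation.
final class BuiltInPolicySketch {

    /** Simplified stand-in for ConfigValue: a config name plus its error messages. */
    static final class Result {
        final String name;
        final List<String> errorMessages = new ArrayList<>();

        Result(String name) {
            this.name = name;
        }
    }

    private static final Set<String> PRINCIPAL_ALLOWED =
        Set.of("security.protocol", "sasl.jaas.config", "sasl.mechanism");

    static final Predicate<String> NONE = name -> false;                  // no override allowed
    static final Predicate<String> PRINCIPAL = PRINCIPAL_ALLOWED::contains; // only the three security configs
    static final Predicate<String> ALL = name -> true;                    // every override allowed

    /** Returns one Result per overridden config; disallowed ones carry an error message. */
    static List<Result> validate(Predicate<String> allowed, Map<String, Object> clientProps) {
        List<Result> results = new ArrayList<>();
        for (String name : clientProps.keySet()) {
            Result result = new Result(name);
            if (!allowed.test(name)) {
                result.errorMessages.add("Override of '" + name + "' is not allowed by the policy");
            }
            results.add(result);
        }
        return results;
    }

    public static void main(String[] args) {
        Map<String, Object> clientProps = new LinkedHashMap<>();
        clientProps.put("sasl.mechanism", "PLAIN");
        clientProps.put("batch.size", "32768");
        for (Result result : validate(PRINCIPAL, clientProps)) {
            System.out.println(result.name + " -> " + result.errorMessages);
        }
    }
}
```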

...

The policy itself will be enforced when a user attempts to either create the connector or validate the connector. When any of the ConfigValue has an error message:

  • During validate, the response will include the error, and the specific configurations that failed to meet the policy will include the error message in the response
  • During create/update connector, the connector will fail to start
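
The two enforcement paths above can be sketched as follows, using a plain map from config name to error messages in place of the list of ConfigValue. PolicyEnforcementSketch and its methods are hypothetical, and signalling the create/update failure with an exception is an assumption; the text above only states that the connector will fail to start.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of how a worker might act on the policy results.
final class PolicyEnforcementSketch {

    /** Validate path: collects the configs that carry at least one error message; never throws. */
    static Map<String, List<String>> violations(Map<String, List<String>> errorsByConfig) {
        Map<String, List<String>> failed = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> entry : errorsByConfig.entrySet()) {
            if (!entry.getValue().isEmpty()) {
                failed.put(entry.getKey(), new ArrayList<>(entry.getValue()));
            }
        }
        return failed;
    }

    /** Create/update path: refuses to start the connector if any override violated the policy. */
    static void startConnector(String connectorName, Map<String, List<String>> errorsByConfig) {
        Map<String, List<String>> failed = violations(errorsByConfig);
        if (!failed.isEmpty()) {
            // Assumption: the failure is surfaced as an exception.
            throw new IllegalStateException(
                "Connector " + connectorName + " not started; policy violations: " + failed.keySet());
        }
        // A real worker would go on to construct the producer/consumer here.
    }
}
```

The same policy result drives both paths: validation reports the per-config messages back to the caller, while create/update treats any message as a reason not to start.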

...