Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents


Status

Current stateUnder DiscussionAccepted

Discussion thread: here 

Vote Discussion thread: here 

JIRA:

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-8265

Release: AK 2.3.0

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

New configuration

connector.client.config.override.policy - This will be an implementation of a new interface ConnectorClientConfigPolicy ConnectorClientConfigOverridePolicy that will be introduced in the connect API. The default value will be an implementation `IgnoreConnectorClientConfigPolicy` and will simply ignore the provided overrides and hence the behavior continues to be backward compatible.`None` which will not allow any overrides. Since the possibility of users already having config with the proposed prefixes is very slim, backward compatibility is generally not a problem. In the very rare case where users have these in their existing configs, they would have to just remove the configs to get it working again.

The overrides can be specified in the connector config by using the following prefixes

  • `producer.override.` - Used for source connector's producer  & DLQ producer in the context of SinkConnector
  • `consumer.override.` - Used for Sink Connector
  • `admin.override.` - Used for DLQ topic create in Sink Connector ( The KIP will also allow DLQ settings to be specified in the worker using `admin` prefix to be consistent with producer & consumer)

The administrator could either specify the fully qualified class name of the ConnectorClientConfigPolicy the ConnectorClientConfigOverridePolicy implementation or an alias (the alias is computed to be the prefix  prefix on the interface name `ConnectorClientConfigPolicy` `ConnectorClientConfigOverridePolicy` which is exactly how most of the existing connect plugins compute their alias).

The new interface will be treated as a new connect plugin and will be loaded via the plugin path mechanism. The plugins will be discovered via the Service loader mechanism similar to RestExtension and ConfigProvider.  The structure of the new interface and its request are described below below:-

Code Block
languagejava
import org.apache.kafka.common.config.ConfigValue;

/**
 * <p>An interface for enforcing a policy on overriding of client configs via the connector configs.
 *
 * <p>Common use cases are ability to provide principal per connector, <code>sasl.jaas.config</code>
 * and/or enforcing that the producer/consumer configurations for optimizations are within acceptable ranges.
 */
public interface ConnectorClientConfigPolicyConnectorClientConfigOverridePolicy extends Configurable, AutoCloseable {


    /**
     * Worker will invoke this while constructing the producer for the SourceConnectors,  DLQ for SinkConnectors and the consumer for the
     * SinkConnectors to validate if all of the overridden client configurations are allowed per the
     * policy implementation. This would also be invoked during the validate of connector configs via the Rest API.
     * 
     * If there are any policy violations, the connector will movenot to a <code>FAILED</code> statebe started.
     *
     * @param connectorClientConfigRequest an instance of {@code ConnectorClientConfigRequest} that provides the configs to overridden and 
     *                                     its context; never {@code null}
     * @throws PolicyViolationException if any of the overridden property doesn't meet the defined policy@return List of Config, each Config should indicate if they are allowed via {@link ConfigValue#errorMessages}
     */
    voidList<ConfigValue> validate(ConnectorClientConfigRequest connectorClientConfigRequest) throws PolicyViolationException;
}


Code Block
languagejava
public class ConnectorClientConfigRequest {

    private Map<String, Object> clientProps;
    private ClientType  clientType;
    private String connectorName;
    private ConnectorType connectorType;
    private ClassClass<? extends Connector> connectorClass;

    public ConnectorClientConfigRequest(
        String connectorName,
        ConnectorType connectorType,
        Class<? extends ClassConnector> connectorClass,
        Map<String, Object> clientProps,
        ClientType clientType) {
        this.clientProps = clientProps;
        this.clientType = clientType;
        this.connectorName = connectorName;
        this.connectorType = connectorType;
        this.connectorClass = connectorClass;
    }

    /**
     * <pre>
     * Provides Config with prefix {@code producer.override.} for {@link ConnectorType#SOURCE}.
     * Provides Config with prefix {@code consumer.override.} for {@link ConnectorType#SINK}.
     * Provides Config with prefix {@code producer.override.} for {@link ConnectorType#SINK} for DLQ.
     * Provides Config with prefix {@code admin.override.} for {@link ConnectorType#SINK} for DLQ.
     * </pre>
     *
     * @return The client properties specified in the Connector Config with prefix {@code producer.override.} ,
     * {@code consumer.override.} and {@code admin.override.}. The configs returned don't include these prefixes.
     */
    public Map<String, Object> clientProps() {
        return clientProps;
    }

    /**
     * <pre>
     * {@link ClientType#PRODUCER} for {@link ConnectorType#SOURCE}
     * {@link ClientType#CONSUMER} for {@link ConnectorType#SINK}
     * {@link ClientType#PRODUCER} for DLQ in {@link ConnectorType#SINK}
     * {@link ClientType#ADMIN} for DLQ  Topic Creation in {@link ConnectorType#SINK}
     * </pre>
     *
     * @return enumeration specifying the client type that is being overriden by the worker; never null.
     */
    public ClientType clientType() {
        return clientType;
    }

    /**
     * Name of the connector specified in the connector config.
     *
     * @return name of the connector; never null.
     */
    public String connectorName() {
        return connectorName;
    }

    /**
     * Type of the Connector.
     *
     * @return enumeration specifying the type of the connector {@link ConnectorType#SINK} or {@link ConnectorType#SOURCE}.
     */
    public ConnectorType connectorType() {
        return connectorType;
    }

    public Class/**
     * The class of the Connector.
     *
     * @return the class of the Connector being created; never null
     */
    public Class<? extends Connector> connectorClass() {
        return connectorClass;
    }

    public enum ClientType {
        PRODUCER, CONSUMER, ADMIN;
    }
}

Inaddition to the default implementation, the KIP also proposes to a few more implementations of ConnectorClientConfigPolicy The KIP introduces the following implementations of ConnectorClientConfigOverridePolicy that are outlined in the table below

Class NameAliasBehavior

NoneConnectorClientConfigPolicyNoneConnectorClientConfigOverridePolicy

NoneDisallows any configuration overrides. This will be the default policy.

PrincipalConnectorClientConfigOverridePolicyPrincipalConnectorClientConfigPolicy

Principal

Allows override of  "security.protocol", "sasl.jaas.config" and "sasl.mechanism" for the producer, consumer and admin prefixes.  Enables the ability to use different principal per connector.

AllConnectorClientConfigPolicyAllConnectorClientConfigOverridePolicy

AllAllows override of all configurations for the producer, consumer and admin prefixes. 

...

The policy itself will be enforced when a user attempts to either create the connector or validate the connector. When any of the ConfigValue has an error message

  • During validate, the response will include error and the specific configurations  that failed to meet the policy will also include the error message included in the response
  • During create/update connector, the connector will fail to start

Proposed Changes

As specified in the previous section, the design will include introducing a new worker configuration and an interface to define the override policy.

The worker would apply the policy during a create connector flow as follows if . The configurations that are being overridden will be passed without the prefixes to the policy is not `IgnoreConnectorClientConfigPolicy`:-

  • Constructing producer for WorkerSourceTask - invoke validate with all configs with "producer.override." prefix , ClientType=Producer, ConnectorType=Source  & override if no policy violation 
  • Constructing admin client & producer for DeadLetterQueueReporter for the DLQ topic 
    • invoke validate with all configs with "producer.override." prefix , ClientType=Producer, ConnectorType=Sink  & override if no policy violation 
    • invoke validate with all configs with "admin.override." prefix , ClientType=Admin, ConnectorType=Sink  & override if no policy violation 
  • Constructing consumer for WorkerSinkTask - invoke validate with all configs with "consumer.override." prefix , ClientType=Consumer, ConnectorType=Sink  & override if no policy violation 

The herder(AbstractHrderAbstractHerder) will apply the policy for all overrides as follows if the policy is not `IgnoreConnectorClientConfigPolicy`during the validate() flow. The configurations that are being overridden will be passed without the prefixes:-

  • If its a source connector, apply the policy on each of the connector configurations with "producer." prefix and update the ConfigInfos result ( response of the validate API)
  • If its a sink connector,
    • apply the policy on each of the connector configurations with "consumer." prefix and update the ConfigInfos result ( response of the validate API)
    • apply the policy on each of the connector configurations with "admin." prefix and update the ConfigInfos result  when DLQ is enabled( response of the validate API)

Compatibility, Deprecation, and Migration Plan

  • The KIP is backward compatible since the overridden configurations are ignored by default which is the current behaviorpossibility of someone having connectors with the proposed prefixes is very slim and hence backward compatibility is not really a problem. In the rare case, if a user has configurations with these prefixes, they would either have to remove the config or alter the policy to get it working.

Rejected Alternatives

  • Override all configurations passed in the connector with the prefix 'producer.' or 'consumer.' - This doesn't provide control to the cluster administrator on what is an acceptable override.
  • Override just the "sasl.jaas.config" from the connector - This is very restrictive in terms of what it can achieve
  • Running multiple herders in the Connect cluster - This will reduce the ease of operation of a connect cluster since each connector would require a Herder to spun up within the cluster.