...

connector.client.config.override.policy - This will be an implementation of a new interface ConnectorClientConfigOverridePolicy that will be introduced in the Connect API. The default value will be `None`, which does not allow any overrides. Since it is very unlikely that users already have configs with the proposed prefixes, backward compatibility is generally not a problem. In the very rare case where users have these in their existing configs, they would have to remove the configs to get it working again.

The overrides can be specified in the connector config by using the following prefixes:

  • `producer.override.` - Used for the source connector's producer and for the DLQ producer in a sink connector
  • `consumer.override.` - Used for the sink connector's consumer
  • `admin.override.` - Used for DLQ topic creation in a sink connector (the KIP will also allow DLQ settings to be specified in the worker using the `admin` prefix, to be consistent with producer and consumer)
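
As an illustration of how these prefixes might be consumed, the sketch below pulls the overrides for one client out of a connector config and strips the prefix. The class name `OverridePrefixes`, the method `overridesFor`, and the sample connector properties are assumptions made for this example; they are not part of the KIP.

```java
import java.util.HashMap;
import java.util.Map;

public class OverridePrefixes {
    public static final String PRODUCER_PREFIX = "producer.override.";
    public static final String CONSUMER_PREFIX = "consumer.override.";
    public static final String ADMIN_PREFIX = "admin.override.";

    // Hypothetical helper: returns the entries whose keys start with the
    // given prefix, with the prefix removed from each key.
    public static Map<String, Object> overridesFor(Map<String, String> connectorConfig, String prefix) {
        Map<String, Object> result = new HashMap<>();
        for (Map.Entry<String, String> entry : connectorConfig.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(prefix) && key.length() > prefix.length()) {
                result.put(key.substring(prefix.length()), entry.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Sample connector config (illustrative values only).
        Map<String, String> config = new HashMap<>();
        config.put("name", "example-sink");
        config.put("consumer.override.max.poll.records", "100");
        config.put("producer.override.linger.ms", "5");
        config.put("admin.override.request.timeout.ms", "10000");

        System.out.println(overridesFor(config, CONSUMER_PREFIX)); // prints {max.poll.records=100}
    }
}
```

Keys without one of the three prefixes, such as `name`, are left untouched and never reach the client.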

...

```java
/**
 * <p>An interface for enforcing a policy on overriding of client configs via the connector configs.
 *
 * <p>Common use cases are ability to provide principal per connector, <code>sasl.jaas.config</code>
 * and/or enforcing that the producer/consumer configurations for optimizations are within acceptable ranges.
 */
public interface ConnectorClientConfigOverridePolicy extends Configurable, AutoCloseable {

    /**
     * Worker will invoke this while constructing the producer for the SourceConnectors, DLQ for SinkConnectors and the consumer for the
     * SinkConnectors to validate if all of the overridden client configurations are allowed per the
     * policy implementation. This would also be invoked during the validate of connector configs via the REST API.
     *
     * If there are any policy violations, the connector will not be started.
     *
     * @param connectorClientConfigRequest an instance of {@code ConnectorClientConfigRequest} that provides the configs to be overridden and
     *                                     its context; never {@code null}
     * @throws PolicyViolationException if any of the overridden properties doesn't meet the defined policy
     */
    void validate(ConnectorClientConfigRequest connectorClientConfigRequest) throws PolicyViolationException;
}
```


```java
public class ConnectorClientConfigRequest {

    private Map<String, Object> clientProps;
    private ClientType clientType;
    private String connectorName;
    private ConnectorType connectorType;
    private Class<? extends Connector> connectorClass;

    public ConnectorClientConfigRequest(
        String connectorName,
        ConnectorType connectorType,
        Class<? extends Connector> connectorClass,
        Map<String, Object> clientProps,
        ClientType clientType) {
        this.clientProps = clientProps;
        this.clientType = clientType;
        this.connectorName = connectorName;
        this.connectorType = connectorType;
        this.connectorClass = connectorClass;
    }

    /**
     * <pre>
     * Provides Config with prefix {@code producer.override.} for {@link ConnectorType#SOURCE}.
     * Provides Config with prefix {@code consumer.override.} for {@link ConnectorType#SINK}.
     * Provides Config with prefix {@code producer.override.} for {@link ConnectorType#SINK} for DLQ.
     * Provides Config with prefix {@code admin.override.} for {@link ConnectorType#SINK} for DLQ.
     * </pre>
     *
     * @return The client properties specified in the Connector Config with prefix {@code producer.override.},
     * {@code consumer.override.} and {@code admin.override.}. The configs returned don't include these prefixes.
     */
    public Map<String, Object> clientProps() {
        return clientProps;
    }

    /**
     * <pre>
     * {@link ClientType#PRODUCER} for {@link ConnectorType#SOURCE}
     * {@link ClientType#CONSUMER} for {@link ConnectorType#SINK}
     * {@link ClientType#PRODUCER} for DLQ in {@link ConnectorType#SINK}
     * {@link ClientType#ADMIN} for DLQ Topic Creation in {@link ConnectorType#SINK}
     * </pre>
     *
     * @return enumeration specifying the client type that is being overridden by the worker; never null.
     */
    public ClientType clientType() {
        return clientType;
    }

    /**
     * Name of the connector specified in the connector config.
     *
     * @return name of the connector; never null.
     */
    public String connectorName() {
        return connectorName;
    }

    /**
     * Type of the Connector.
     *
     * @return enumeration specifying the type of the connector {@link ConnectorType#SINK} or {@link ConnectorType#SOURCE}.
     */
    public ConnectorType connectorType() {
        return connectorType;
    }

    /**
     * The class of the Connector.
     *
     * @return the class of the Connector being created; never null
     */
    public Class<? extends Connector> connectorClass() {
        return connectorClass;
    }

    public enum ClientType {
        PRODUCER, CONSUMER, ADMIN;
    }
}
```

The KIP introduces the following implementations of ConnectorClientConfigOverridePolicy, outlined in the table below:

| Class Name | Alias | Behavior |
|---|---|---|
| NoneConnectorClientConfigOverridePolicy | None | Disallows any configuration overrides. This will be the default policy. |
| PrincipalConnectorClientConfigOverridePolicy | Principal | Allows override of "sasl.jaas.config" for the producer, consumer and admin prefixes. Enables the use of a different principal per connector. |
| AllConnectorClientConfigOverridePolicy | All | Allows override of all configurations for the producer, consumer and admin prefixes. |
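
The None, Principal and All behaviors described above could be sketched roughly as follows. To keep the example self-contained, it uses a simplified stand-in interface (`OverridePolicy`) that receives only the overridden properties, and a local stand-in for `PolicyViolationException`; both are assumptions made for illustration, and the real policies would implement ConnectorClientConfigOverridePolicy and receive a ConnectorClientConfigRequest.

```java
import java.util.Collections;
import java.util.Map;

public class PolicySketch {
    // Local stand-in for Kafka's PolicyViolationException (hypothetical, for self-containment).
    public static class PolicyViolationException extends RuntimeException {
        public PolicyViolationException(String message) {
            super(message);
        }
    }

    // Simplified stand-in for the proposed interface: only the overridden
    // properties (without prefixes) are passed in.
    public interface OverridePolicy {
        void validate(Map<String, Object> clientProps) throws PolicyViolationException;
    }

    // "None": rejects any override.
    public static final OverridePolicy NONE = props -> {
        if (!props.isEmpty()) {
            throw new PolicyViolationException("Overrides are not allowed: " + props.keySet());
        }
    };

    // "Principal": permits only sasl.jaas.config.
    public static final OverridePolicy PRINCIPAL = props -> {
        for (String key : props.keySet()) {
            if (!"sasl.jaas.config".equals(key)) {
                throw new PolicyViolationException("Override not allowed: " + key);
            }
        }
    };

    // "All": accepts every override.
    public static final OverridePolicy ALL = props -> { };

    // Convenience helper for the example: true if the policy accepts the overrides.
    public static boolean allows(OverridePolicy policy, Map<String, Object> props) {
        try {
            policy.validate(props);
            return true;
        } catch (PolicyViolationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Map<String, Object> jaas = Collections.<String, Object>singletonMap("sasl.jaas.config", "example");
        Map<String, Object> acks = Collections.<String, Object>singletonMap("acks", "all");

        System.out.println(allows(NONE, jaas));      // prints false
        System.out.println(allows(PRINCIPAL, jaas)); // prints true
        System.out.println(allows(PRINCIPAL, acks)); // prints false
        System.out.println(allows(ALL, acks));       // prints true
    }
}
```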

...

The worker would apply the policy during the create-connector flow as follows if a policy is configured. The configurations that are being overridden will be passed to the policy without the prefixes:

  • Constructing the producer for WorkerSourceTask - invoke validate with all configs with the "producer.override." prefix, ClientType=Producer, ConnectorType=Source, and override if there is no policy violation
  • Constructing the admin client and producer for DeadLetterQueueReporter for the DLQ topic
    • invoke validate with all configs with the "producer.override." prefix, ClientType=Producer, ConnectorType=Sink, and override if there is no policy violation
    • invoke validate with all configs with the "admin.override." prefix, ClientType=Admin, ConnectorType=Sink, and override if there is no policy violation
  • Constructing the consumer for WorkerSinkTask - invoke validate with all configs with the "consumer.override." prefix, ClientType=Consumer, ConnectorType=Sink, and override if there is no policy violation
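
The validate-then-override sequence for a single client could look roughly like the sketch below. The names `WorkerOverrideSketch`, `clientConfigs` and `OverridePolicy`, and the use of plain strings for the client and connector types, are assumptions made for illustration; the actual worker would build a ConnectorClientConfigRequest and call the configured ConnectorClientConfigOverridePolicy.

```java
import java.util.HashMap;
import java.util.Map;

public class WorkerOverrideSketch {
    // Local stand-in for Kafka's PolicyViolationException (hypothetical, for self-containment).
    public static class PolicyViolationException extends RuntimeException {
        public PolicyViolationException(String message) {
            super(message);
        }
    }

    // Simplified stand-in for the policy; the KIP passes a ConnectorClientConfigRequest instead.
    public interface OverridePolicy {
        void validate(String clientType, String connectorType, Map<String, Object> overrides);
    }

    public static Map<String, Object> clientConfigs(
            Map<String, Object> workerClientConfigs,
            Map<String, String> connectorConfig,
            String prefix,
            String clientType,
            String connectorType,
            OverridePolicy policy) {
        // Collect the overrides for this client, without the prefix.
        Map<String, Object> overrides = new HashMap<>();
        for (Map.Entry<String, String> entry : connectorConfig.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                overrides.put(entry.getKey().substring(prefix.length()), entry.getValue());
            }
        }
        // Throws on a violation, so the client is never built with a rejected override.
        policy.validate(clientType, connectorType, overrides);

        // Worker-level settings first, then the validated overrides on top.
        Map<String, Object> merged = new HashMap<>(workerClientConfigs);
        merged.putAll(overrides);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> worker = new HashMap<>();
        worker.put("bootstrap.servers", "localhost:9092");
        worker.put("linger.ms", "0");

        Map<String, String> connector = new HashMap<>();
        connector.put("name", "example-source");
        connector.put("producer.override.linger.ms", "5");

        OverridePolicy allowAll = (clientType, connectorType, overrides) -> { };
        Map<String, Object> producerConfigs =
            clientConfigs(worker, connector, "producer.override.", "PRODUCER", "SOURCE", allowAll);
        System.out.println(producerConfigs.get("linger.ms")); // prints 5
    }
}
```

Validating before merging means a rejected override never reaches the client constructor, which matches the requirement that the connector is not started when there is a policy violation.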

The herder (AbstractHerder) will apply the policy to all overrides as follows during the validate() flow if a policy is configured. The configurations that are being overridden will be passed without the prefixes:

  • If it's a source connector, apply the policy on each of the connector configurations with the "producer.override." prefix and update the ConfigInfos result (the response of the validate API)
  • If it's a sink connector,
    • apply the policy on each of the connector configurations with the "consumer.override." prefix and update the ConfigInfos result (the response of the validate API)
    • apply the policy on each of the connector configurations with the "admin.override." prefix and update the ConfigInfos result when DLQ is enabled (the response of the validate API)
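
One way to picture the per-configuration check in the validate flow is the sketch below, which applies a policy to each prefixed config on its own and records an error message against the full config name. The names `HerderValidateSketch`, `validateOverrides` and `OverridePolicy` are hypothetical, and the returned map is only a stand-in for updating the ConfigInfos result.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class HerderValidateSketch {
    // Local stand-in for Kafka's PolicyViolationException (hypothetical, for self-containment).
    public static class PolicyViolationException extends RuntimeException {
        public PolicyViolationException(String message) {
            super(message);
        }
    }

    // Simplified stand-in for the policy: receives overrides without prefixes.
    public interface OverridePolicy {
        void validate(Map<String, Object> overrides);
    }

    // Returns error messages keyed by the full (prefixed) config name;
    // the map is empty when every override passes the policy.
    public static Map<String, String> validateOverrides(
            Map<String, String> connectorConfig, String prefix, OverridePolicy policy) {
        Map<String, String> errors = new TreeMap<>();
        for (Map.Entry<String, String> entry : connectorConfig.entrySet()) {
            String key = entry.getKey();
            if (!key.startsWith(prefix)) {
                continue;
            }
            String unprefixed = key.substring(prefix.length());
            try {
                // Check one override at a time so each error maps to one config.
                policy.validate(Collections.<String, Object>singletonMap(unprefixed, entry.getValue()));
            } catch (PolicyViolationException e) {
                errors.put(key, e.getMessage());
            }
        }
        return errors;
    }

    public static void main(String[] args) {
        // Example policy that only permits sasl.jaas.config.
        OverridePolicy principalOnly = overrides -> {
            for (String key : overrides.keySet()) {
                if (!"sasl.jaas.config".equals(key)) {
                    throw new PolicyViolationException("Override not allowed: " + key);
                }
            }
        };

        Map<String, String> connector = new HashMap<>();
        connector.put("consumer.override.sasl.jaas.config", "example");
        connector.put("consumer.override.max.poll.records", "100");

        Map<String, String> errors = validateOverrides(connector, "consumer.override.", principalOnly);
        System.out.println(errors.keySet()); // prints [consumer.override.max.poll.records]
    }
}
```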

Compatibility, Deprecation, and Migration Plan

  • The possibility of someone having connectors with the proposed prefixes is very slim, so backward compatibility is not really a problem. In the rare case where a user has configurations with these prefixes, they would have to either remove the configs or alter the policy to get it working.

Rejected Alternatives

  • Override all configurations passed in the connector with the prefix 'producer.' or 'consumer.' - This doesn't provide control to the cluster administrator on what is an acceptable override.
  • Override just the "sasl.jaas.config" from the connector - This is very restrictive in terms of what it can achieve.
  • Running multiple herders in the Connect cluster - This will reduce the ease of operation of a Connect cluster, since each connector would require a Herder to be spun up within the cluster.