Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
public class ConnectorClientConfigRequest {

    private Map<String, Object> clientProps;
    private ClientType  clientType;
    private String connectorName;
    private ConnectorType connectorType;
    private Class<? extends Connector> connectorClass;

    public ConnectorClientConfigRequest(
        String connectorName,
        ConnectorType connectorType,
        Class<? extends Connector> connectorClass,
        Map<String, Object> clientProps,
        ClientType clientType) {
        this.clientProps = clientProps;
        this.clientType = clientType;
        this.connectorName = connectorName;
        this.connectorType = connectorType;
        this.connectorClass = connectorClass;
    }

    /**
     * <pre>
     * Provides Config with prefix {@code producer.override.} for {@link ConnectorType#SOURCE}.
     * Provides Config with prefix {@code consumer.override.} for {@link ConnectorType#SINK}.
     * Provides Config with prefix {@code producer.override.} for {@link ConnectorType#SINK} for DLQ.
     * Provides Config with prefix {@code admin.override.} for {@link ConnectorType#SINK} for DLQ.
     * </pre>
     *
     * @return The client properties specified in the Connector Config with prefix {@code producer.override.} ,
     * {@code consumer.override.} and {@code admin.override.}. The configs returned don't include these prefixes.
     */
    public Map<String, Object> clientProps() {
        return clientProps;
    }

    /**
     * <pre>
     * {@link ClientType#PRODUCER} for {@link ConnectorType#SOURCE}
     * {@link ClientType#CONSUMER} for {@link ConnectorType#SINK}
     * {@link ClientType#PRODUCER} for DLQ in {@link ConnectorType#SINK}
     * {@link ClientType#ADMIN} for DLQ  Topic Creation in {@link ConnectorType#SINK}
     * </pre>
     *
     * @return enumeration specifying the client type that is being overriden by the worker; never null.
     */
    public ClientType clientType() {
        return clientType;
    }

    /**
     * Name of the connector specified in the connector config.
     *
     * @return name of the connector; never null.
     */
    public String connectorName() {
        return connectorName;
    }

    /**
     * Type of the Connector.
     *
     * @return enumeration specifying the type of the connector {@link ConnectorType#SINK} or {@link ConnectorType#SOURCE}.
     */
    public ConnectorType connectorType() {
        return connectorType;
    }

    /**
     * The class of the Connector.
     *
     * @return the class of the Connector being created; never null
     */
    public Class<? extends Connector> connectorClass() {
        return connectorClass;
    }

    public enum ClientType {
        PRODUCER, CONSUMER, ADMIN;
    }
}

The KIP introduces the following implementations of ConnectorClientConfigOverridePolicy that are outlined in the table below

Class NameAliasBehavior
IgnoreConnectorClientConfigOverridePolicyIgnoreIgnores any overrides specified in the connector configuration ( current behavior). (This will be accomplished by the implementation returning `false` for `useOverrides()`). This will be the default behavior.

NoneConnectorClientConfigOverridePolicy

NoneDisallows any configuration overrides

PrincipalConnectorClientConfigOverridePolicy

Principal

Allows override of  "sasl.jaas.config" for the producer, consumer and admin prefixes.  Enables the ability to use different principal per connector.

AllConnectorClientConfigOverridePolicy

AllAllows override of all configurations for the producer, consumer and admin prefixes. 

...

The worker would apply the policy during a create connector flow as follows if a policy is configured and useOverrides() returns true. The configurations that are being overridden will be passed without the prefixes:-

  • Constructing producer for WorkerSourceTask - invoke validate with all configs with "producer.override." prefix , ClientType=Producer, ConnectorType=Source  & override if no policy violation 
  • Constructing admin client & producer for DeadLetterQueueReporter for the DLQ topic 
    • invoke validate with all configs with "producer.override." prefix , ClientType=Producer, ConnectorType=Sink  & override if no policy violation 
    • invoke validate with all configs with "admin.override." prefix , ClientType=Admin, ConnectorType=Sink  & override if no policy violation 
  • Constructing consumer for WorkerSinkTask - invoke validate with all configs with "consumer.override." prefix , ClientType=Consumer, ConnectorType=Sink  & override if no policy violation 

The herder(AbstractHerder) will apply the policy for all overrides as follows if the policy is configured and useOverrides() returns true during the validate() flow. The configurations that are being overridden will be passed without the prefixes:-

  • If its a source connector, apply the policy on each of the connector configurations with "producer." prefix and update the ConfigInfos result ( response of the validate API)
  • If its a sink connector,
    • apply the policy on each of the connector configurations with "consumer." prefix and update the ConfigInfos result ( response of the validate API)
    • apply the policy on each of the connector configurations with "admin." prefix and update the ConfigInfos result  when DLQ is enabled( response of the validate API)

...