...
Code Block | ||
---|---|---|
| ||
public class ConnectorClientConfigRequest { private Map<String, Object> clientProps; private ClientType clientType; private String connectorName; private ConnectorType connectorType; private Class<? extends Connector> connectorClass; public ConnectorClientConfigRequest( String connectorName, ConnectorType connectorType, Class<? extends Connector> connectorClass, Map<String, Object> clientProps, ClientType clientType) { this.clientProps = clientProps; this.clientType = clientType; this.connectorName = connectorName; this.connectorType = connectorType; this.connectorClass = connectorClass; } /** * <pre> * Provides Config with prefix {@code producer.override.} for {@link ConnectorType#SOURCE}. * Provides Config with prefix {@code consumer.override.} for {@link ConnectorType#SINK}. * Provides Config with prefix {@code producer.override.} for {@link ConnectorType#SINK} for DLQ. * Provides Config with prefix {@code admin.override.} for {@link ConnectorType#SINK} for DLQ. * </pre> * * @return The client properties specified in the Connector Config with prefix {@code producer.override.} , * {@code consumer.override.} and {@code admin.override.}. The configs returned don't include these prefixes. */ public Map<String, Object> clientProps() { return clientProps; } /** * <pre> * {@link ClientType#PRODUCER} for {@link ConnectorType#SOURCE} * {@link ClientType#CONSUMER} for {@link ConnectorType#SINK} * {@link ClientType#PRODUCER} for DLQ in {@link ConnectorType#SINK} * {@link ClientType#ADMIN} for DLQ Topic Creation in {@link ConnectorType#SINK} * </pre> * * @return enumeration specifying the client type that is being overriden by the worker; never null. */ public ClientType clientType() { return clientType; } /** * Name of the connector specified in the connector config. * * @return name of the connector; never null. */ public String connectorName() { return connectorName; } /** * Type of the Connector. * * @return enumeration specifying the type of the connector {@link ConnectorType#SINK} or {@link ConnectorType#SOURCE}. */ public ConnectorType connectorType() { return connectorType; } /** * The class of the Connector. * * @return the class of the Connector being created; never null */ public Class<? extends Connector> connectorClass() { return connectorClass; } public enum ClientType { PRODUCER, CONSUMER, ADMIN; } } |
The KIP introduces the following implementations of ConnectorClientConfigOverridePolicy that are outlined in the table below
Class Name | Alias | Behavior |
---|---|---|
IgnoreConnectorClientConfigOverridePolicy | Ignore | Ignores any overrides specified in the connector configuration ( current behavior). (This will be accomplished by the implementation returning `false` for `useOverrides()`). This will be the default behavior. |
NoneConnectorClientConfigOverridePolicy | None | Disallows any configuration overrides |
PrincipalConnectorClientConfigOverridePolicy | Principal | Allows override of "sasl.jaas.config" for the producer, consumer and admin prefixes. Enables the ability to use different principal per connector. |
AllConnectorClientConfigOverridePolicy | All | Allows override of all configurations for the producer, consumer and admin prefixes. |
...
The worker would apply the policy during a create connector flow as follows if a policy is configured and useOverrides() returns true. The configurations that are being overridden will be passed without the prefixes:-
- Constructing producer for WorkerSourceTask - invoke validate with all configs with "producer.override." prefix , ClientType=Producer, ConnectorType=Source & override if no policy violation
- Constructing admin client & producer for DeadLetterQueueReporter for the DLQ topic
- invoke validate with all configs with "producer.override." prefix , ClientType=Producer, ConnectorType=Sink & override if no policy violation
- invoke validate with all configs with "admin.override." prefix , ClientType=Admin, ConnectorType=Sink & override if no policy violation
- Constructing consumer for WorkerSinkTask - invoke validate with all configs with "consumer.override." prefix , ClientType=Consumer, ConnectorType=Sink & override if no policy violation
The herder(AbstractHerder) will apply the policy for all overrides as follows if the policy is configured and useOverrides() returns true during the validate() flow. The configurations that are being overridden will be passed without the prefixes:-
- If its a source connector, apply the policy on each of the connector configurations with "producer." prefix and update the ConfigInfos result ( response of the validate API)
- If its a sink connector,
- apply the policy on each of the connector configurations with "consumer." prefix and update the ConfigInfos result ( response of the validate API)
- apply the policy on each of the connector configurations with "admin." prefix and update the ConfigInfos result when DLQ is enabled( response of the validate API)
...