

Status

Current state: Under Discussion

Discussion thread: here 


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

KAFKA-2798 (0.9.0.0) introduced the ability for each source connector and sink connector to inherit their client configurations from the worker properties. Within the worker properties, any configuration that has a prefix of "producer." or "consumer." is applied to all source connectors and sink connectors respectively. While the original proposal allowed overrides for source and sink connectors, it is still restrictive in terms of allowing different configurations for different connectors. Typically, Connect users want to be able to do the following:

  • Use a different principal for each connector so that ACLs can be controlled at a fine-grained level
  • Tune the producer and consumer configurations for each connector so that each connector is set up for its performance characteristics

KIP-296: Connector level configurability for client configs aimed to solve this by allowing all client configurations to be overridden. However, that KIP does not give Connect operators any control over what connectors are allowed to override. Without this control, there is no clear line of separation between the connector and the worker, since a connector can now assume that its overrides will be honored. From an operational perspective, it would be good to have the following enforced:

  • Ability to control which config keys can be overridden. For example, an administrator might never want the broker endpoint to be overridden
  • Ability to control the allowed values for configs that are overridden. This helps administrators define the bounds of their clusters and manage multi-tenant clusters efficiently. For example, an administrator might never want `send.buffer.bytes` to be set above, say, 512 KB
  • Ability to control the above based on connector type, client type (admin vs. producer vs. consumer), etc.

Based on the above context, the proposal is to allow an administrator to define/implement a policy for what can be overridden, very similar to the `CreateTopicPolicy`, which controls the configurations that can be specified at the topic level.

Public Interfaces

At a high level, the proposal is to introduce a configurable policy, similar to the CreateTopicPolicy available in core Kafka, for connector client config overrides. More specifically, we will introduce a new worker configuration that allows an administrator to configure the policy for connector client config overrides.


New configuration

connector.client.config.policy - This will be an implementation of a new interface, ConnectorClientConfigPolicy, that will be introduced in the Connect API. The default value will be null; when the policy is not configured, the worker will simply ignore the provided overrides, and hence the behavior remains backward compatible.

The overrides can be specified in the connector config by using the following prefixes (an example follows the list):

  • `producer.override.` - Used for the source connector's producer, and for the DLQ producer in the context of a sink connector
  • `consumer.override.` - Used for the sink connector's consumer
  • `admin.override.` - Used for the admin client that creates the DLQ topic for a sink connector (the KIP will also allow DLQ settings to be specified in the worker using an `admin` prefix, to be consistent with the producer & consumer prefixes)
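
For illustration, a sink connector configuration with overrides might look like the following sketch (the connector name and override values are hypothetical; the keys after the prefix are plain consumer configs):

name=orders-sink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
topics=orders
consumer.override.max.poll.records=100
consumer.override.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="orders-sink" password="<secret>";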

The administrator can either specify the fully qualified class name of the ConnectorClientConfigPolicy implementation or an alias (the alias is the class name with the `ConnectorClientConfigPolicy` suffix stripped, which is exactly how most of the existing Connect plugins compute their aliases).

The new interface will be treated as a new Connect plugin and will be loaded via the plugin path mechanism. The plugins will be discovered via the ServiceLoader mechanism, similar to RestExtension and ConfigProvider. The structure of the new interface and its request object is described below, followed by an illustrative implementation:

/**
 * <p>An interface for enforcing a policy on overriding of client configs via the connector configs.
 *
 * <p>Common use cases are the ability to provide a principal per connector via <code>sasl.jaas.config</code>
 * and/or enforcing that producer/consumer configuration overrides used for tuning are within acceptable ranges.
 */
public interface ConnectorClientConfigPolicy extends Configurable, AutoCloseable {

    /**
     * The worker will invoke this while constructing the producer for source connectors, the DLQ clients for sink connectors,
     * and the consumer for sink connectors, to validate whether all of the overridden client configurations are allowed per the
     * policy implementation. This will also be invoked when connector configs are validated via the REST API.
     *
     * If there are any policy violations, the connector will move to a <code>FAILED</code> state.
     *
     * @param connectorClientConfigRequest an instance of {@code ConnectorClientConfigRequest} that provides the configs to be overridden and
     *                                     their context; never {@code null}
     * @throws PolicyViolationException if any of the overridden properties does not meet the defined policy
     */
    void validate(ConnectorClientConfigRequest connectorClientConfigRequest) throws PolicyViolationException;
}
public class ConnectorClientConfigRequest {

    private Map<String, Object> clientProps;
    private ClientType  clientType;
    private String connectorName;
    private ConnectorType connectorType;
    private Class connectorClass;

    public ConnectorClientConfigRequest(
        String connectorName,
        ConnectorType connectorType,
        Class connectorClass,
        Map<String, Object> clientProps,
        ClientType clientType) {
        this.clientProps = clientProps;
        this.clientType = clientType;
        this.connectorName = connectorName;
        this.connectorType = connectorType;
        this.connectorClass = connectorClass;
    }

    public Map<String, Object> clientProps() {
        return clientProps;
    }

    public ClientType clientType() {
        return clientType;
    }

    public String connectorName() {
        return connectorName;
    }

    public ConnectorType connectorType() {
        return connectorType;
    }

    public Class connectorClass() {
        return connectorClass;
    }

    public enum ClientType {
        PRODUCER, CONSUMER, ADMIN;
    }
}
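
For illustration only, a custom policy enforcing the `send.buffer.bytes` bound mentioned in the Motivation section could look like the sketch below. The class name and the hard-coded limit are hypothetical, and it assumes the existing PolicyViolationException (with a message constructor) is the exception referenced by the interface:

import java.util.Map;
import org.apache.kafka.common.errors.PolicyViolationException;

// Illustrative sketch, not part of the KIP: rejects any client override that sets
// send.buffer.bytes above 512 KB and allows everything else.
public class BoundedSendBufferPolicy implements ConnectorClientConfigPolicy {

    private static final long MAX_SEND_BUFFER_BYTES = 512 * 1024L;

    @Override
    public void configure(Map<String, ?> configs) {
        // No policy-specific configuration in this sketch; the bound is hard coded.
    }

    @Override
    public void validate(ConnectorClientConfigRequest request) throws PolicyViolationException {
        Object sendBuffer = request.clientProps().get("send.buffer.bytes");
        if (sendBuffer != null && Long.parseLong(sendBuffer.toString()) > MAX_SEND_BUFFER_BYTES) {
            throw new PolicyViolationException(
                "send.buffer.bytes override for connector '" + request.connectorName()
                    + "' (" + request.clientType() + " client) exceeds the allowed maximum of "
                    + MAX_SEND_BUFFER_BYTES + " bytes");
        }
    }

    @Override
    public void close() {
        // Nothing to clean up.
    }
}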

In addition to the default behavior (no policy configured), the KIP also proposes a few implementations of ConnectorClientConfigPolicy, outlined in the table below:

Class Name                            | Alias     | Behavior
NoneConnectorClientConfigPolicy       | None      | Disallows any configuration overrides.
PrincipalConnectorClientConfigPolicy  | Principal | Allows override of "sasl.jaas.config" for the producer, consumer and admin prefixes. Enables the use of a different principal per connector.
AllConnectorClientConfigPolicy        | All       | Allows override of all configurations for the producer, consumer and admin prefixes.
Since users can configure any of these policies, connectors themselves should not rely on the overridden configurations being available. The overrides are to be used purely from an operational perspective.

The policy itself will be enforced when a user attempts to either create the connector or validate the connector.
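
For example, a worker could select the built-in Principal policy by its alias, or a custom implementation from the plugin path by its fully qualified class name (the custom class name below is hypothetical):

connector.client.config.policy=Principal
# or, for a custom policy on the plugin path:
# connector.client.config.policy=com.example.connect.BoundedSendBufferPolicy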

Proposed Changes

As specified in the previous section, the design will include introducing a new worker configuration and an interface to define the override policy.

The worker will apply the policy during the connector creation flow as follows, if a policy is configured (a sketch of the first step follows the list):

  • Constructing the producer for WorkerSourceTask - invoke validate with all configs with the "producer.override." prefix, ClientType=PRODUCER, ConnectorType=SOURCE, and apply the overrides if there is no policy violation
  • Constructing the admin client & producer for DeadLetterQueueReporter for the DLQ topic
    • invoke validate with all configs with the "producer.override." prefix, ClientType=PRODUCER, ConnectorType=SINK, and apply the overrides if there is no policy violation
    • invoke validate with all configs with the "admin.override." prefix, ClientType=ADMIN, ConnectorType=SINK, and apply the overrides if there is no policy violation
  • Constructing the consumer for WorkerSinkTask - invoke validate with all configs with the "consumer.override." prefix, ClientType=CONSUMER, ConnectorType=SINK, and apply the overrides if there is no policy violation
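
A minimal sketch of the first step, assuming a worker-side helper along these lines (the class, method and variable names are illustrative and not part of the proposed API; the other steps differ only in the prefix, ClientType and ConnectorType):

import java.util.Map;
import org.apache.kafka.common.config.AbstractConfig;

// ConnectorClientConfigPolicy, ConnectorClientConfigRequest and ConnectorType are
// the types introduced by this KIP / already present in the Connect runtime.
public class ProducerOverrideSketch {

    // Illustrative only: extracts the "producer.override." keys from the connector
    // config, asks the policy to validate them, and applies them if allowed.
    static void applyProducerOverrides(AbstractConfig connectorConfig,
                                       Map<String, Object> producerProps,
                                       ConnectorClientConfigPolicy policy,
                                       String connectorName,
                                       Class<?> connectorClass) {
        // Keys are returned with the "producer.override." prefix stripped.
        Map<String, Object> overrides = connectorConfig.originalsWithPrefix("producer.override.");
        policy.validate(new ConnectorClientConfigRequest(
            connectorName,
            ConnectorType.SOURCE,
            connectorClass,
            overrides,
            ConnectorClientConfigRequest.ClientType.PRODUCER));
        // Reached only if validate() did not throw a PolicyViolationException.
        producerProps.putAll(overrides);
    }
}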

The herder (AbstractHerder) will apply the policy for all overrides as follows, if a policy is configured (a simplified sketch follows the list):

  • If it is a source connector, apply the policy to the connector configurations with the "producer.override." prefix and update the ConfigInfos result (the response of the validate API)
  • If it is a sink connector,
    • apply the policy to the connector configurations with the "consumer.override." prefix and update the ConfigInfos result (the response of the validate API)
    • apply the policy to the connector configurations with the "admin.override." prefix and update the ConfigInfos result when the DLQ is enabled (the response of the validate API)
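
A simplified sketch of the sink-connector case, where violations are recorded per key so they can be surfaced in the validate response rather than failing the connector (the class and method names are illustrative, and the error map is a stand-in for the real ConfigInfos plumbing; the "admin.override." and "producer.override." groups follow the same pattern when the DLQ is enabled):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.errors.PolicyViolationException;

public class SinkOverrideValidationSketch {

    // Illustrative only: validates the "consumer.override." group of a sink connector
    // config and records any violation against the offending keys.
    static Map<String, List<String>> validateConsumerOverrides(AbstractConfig connectorConfig,
                                                               ConnectorClientConfigPolicy policy,
                                                               String connectorName,
                                                               Class<?> connectorClass) {
        Map<String, List<String>> errorsByKey = new HashMap<>();
        Map<String, Object> overrides = connectorConfig.originalsWithPrefix("consumer.override.");
        if (!overrides.isEmpty()) {
            try {
                policy.validate(new ConnectorClientConfigRequest(
                    connectorName, ConnectorType.SINK, connectorClass, overrides,
                    ConnectorClientConfigRequest.ClientType.CONSUMER));
            } catch (PolicyViolationException e) {
                // The real herder would fold this into the ConfigInfos entries of the
                // offending configs; here every key in the group carries the message.
                for (String key : overrides.keySet()) {
                    errorsByKey.computeIfAbsent("consumer.override." + key, k -> new ArrayList<>())
                               .add(e.getMessage());
                }
            }
        }
        return errorsByKey;
    }
}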

Compatibility, Deprecation, and Migration Plan

  • The KIP is backward compatible since the overridden configurations are ignored by default, which is the current behavior.

Rejected Alternatives

  • Override all configurations passed in the connector with the prefix 'producer.' or 'consumer.' - This doesn't provide control to the cluster administrator on what is an acceptable override.
  • Override just the "sasl.jaas.config" from the connector - This is very restrictive in terms of what it can achieve
  • Running multiple herders in the Connect cluster - This would reduce the ease of operating a Connect cluster, since each connector would require a separate herder to be spun up within the cluster.