...

As connectors interact with external systems, they sometimes need to provision external resources. We can think of a Sink connector creating new queues in the Messaging System it wants to write messages to, an Object Store Sink connector that creates new buckets to store its messages, or a Source connector that activates an account before requesting data from a source system, among other use cases. A more concrete example (and one that concerns us in particular) is a source connector that audits database changes by creating an "audit" table and setting up database triggers to insert row-level updates into that table. In many cases, creating these resources is an idempotent operation. In other cases, the targeted system can be queried beforehand, and resources are provisioned only if they do not exist.

The connector's void start(Map<String, String> props) method is invoked when the connector is first started, or when it is restarted by the Worker (for example, when the connector configuration changes). Similarly, the void stop() method is invoked when the connector is stopped by the Worker (for example, when the connector configuration changes).
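
To make this concrete, below is a minimal sketch of a connector that provisions such resources when it starts. The class, property, and SQL names (AuditSourceConnector, connection.url, audit_log, orders_audit) are hypothetical and only illustrate the pattern: start() provisions the audit table and trigger idempotently, while stop() intentionally leaves them in place because the connector may be restarted later.

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Hypothetical connector that provisions an "audit" table and trigger on start. */
public class AuditSourceConnector extends SourceConnector {

    private Map<String, String> props;

    @Override
    public void start(Map<String, String> props) {
        this.props = props;
        // Provision the external resources this connector depends on. The statements are
        // written to be idempotent, so a restart (stop() followed by start()) is safe.
        // The SQL below is illustrative; real syntax depends on the target database.
        try (Connection conn = DriverManager.getConnection(props.get("connection.url"));
             Statement stmt = conn.createStatement()) {
            stmt.execute("CREATE TABLE IF NOT EXISTS audit_log (id BIGINT, payload TEXT)");
            stmt.execute("CREATE TRIGGER IF NOT EXISTS orders_audit AFTER UPDATE ON orders "
                       + "FOR EACH ROW INSERT INTO audit_log VALUES (NEW.id, NEW.payload)");
        } catch (SQLException e) {
            throw new ConnectException("Failed to provision audit resources", e);
        }
    }

    @Override
    public void stop() {
        // Invoked when the connector is stopped or restarted. The audit table and trigger are
        // deliberately left in place, because the connector may be started again later.
    }

    @Override
    public Class<? extends Task> taskClass() {
        return AuditSourceTask.class;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        return Collections.singletonList(props);
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef()
            .define("connection.url", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
                    "JDBC URL of the audited database");
    }

    @Override
    public String version() {
        return "1.0";
    }

    /** Minimal task stub so the example compiles; a real task would poll the audit table. */
    public static class AuditSourceTask extends SourceTask {
        @Override public String version() { return "1.0"; }
        @Override public void start(Map<String, String> props) { }
        @Override public List<SourceRecord> poll() { return null; }
        @Override public void stop() { }
    }
}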

We have some custom connectors that require provisioning external resources (think of creating queues, S3 buckets, or activating accounts) when the connector instance is created, but also need to clean up these resources (delete, deactivate) when the connector instance is deleted.

There are cases where we might want to clean up these resources when the connector that uses them is deleted. It can be to save costs (the external system charges per active account) or compute resources (triggers writing to an audit table that will no longer be read from should be removed, and so should the audit table). Taking other actions, like deleting the connector offsets (as discussed in KIP-875), can also be considered part of this cleanup process.

Kafka Connect does not have an API that is called when connectors are deleted, so it is currently not possible to react when a connector is deleted. The existing stop() method cannot be used for this purpose, as a Connector can be restarted later on.

Public Interfaces


Code block (Java): org.apache.kafka.connect.connector.Connector
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.connect.connector;

import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.components.Versioned;

import java.util.List;
import java.util.Map;

/**
 * <p>
 * Connectors manage integration of Kafka Connect with another system, either as an input that ingests
 * data into Kafka or an output that passes data to an external system. Implementations should
 * not use this class directly; they should inherit from {@link org.apache.kafka.connect.source.SourceConnector SourceConnector}
 * or {@link org.apache.kafka.connect.sink.SinkConnector SinkConnector}.
 * </p>
 * <p>
 * Connectors have two primary tasks. First, given some configuration, they are responsible for
 * creating configurations for a set of {@link Task}s that split up the data processing. For
 * example, a database Connector might create Tasks by dividing the set of tables evenly among
 * tasks. Second, they are responsible for monitoring inputs for changes that require
 * reconfiguration and notifying the Kafka Connect runtime via the {@link ConnectorContext}. Continuing the
 * previous example, the connector might periodically check for new tables and notify Kafka Connect of
 * additions and deletions. Kafka Connect will then request new configurations and update the running
 * Tasks.
 * </p>
 */
public abstract class Connector implements Versioned {

    protected ConnectorContext context;


    /**
     * Initialize this connector, using the provided ConnectorContext to notify the runtime of
     * input configuration changes.
     * @param ctx context object used to interact with the Kafka Connect runtime
     */
    public void initialize(ConnectorContext ctx) {
        context = ctx;
    }

    /**
     * <p>
     * Initialize this connector, using the provided ConnectorContext to notify the runtime of
     * input configuration changes and using the provided set of Task configurations.
     * This version is only used to recover from failures.
     * </p>
     * <p>
     * The default implementation ignores the provided Task configurations. During recovery, Kafka Connect will request
     * an updated set of configurations and update the running Tasks appropriately. However, Connectors should
     * implement special handling of this case if it will avoid unnecessary changes to running Tasks.
     * </p>
     *
     * @param ctx context object used to interact with the Kafka Connect runtime
     * @param taskConfigs existing task configurations, which may be used when generating new task configs to avoid
     *                    churn in partition to task assignments
     */
    public void initialize(ConnectorContext ctx, List<Map<String, String>> taskConfigs) {
        context = ctx;
        // Ignore taskConfigs. May result in more churn of tasks during recovery if updated configs
        // are very different, but reduces the difficulty of implementing a Connector
    }

    /**
     * Returns the context object used to interact with the Kafka Connect runtime.
     *
     * @return the context for this Connector.
     */
    protected ConnectorContext context() {
        return context;
    }

    /**
     * Start this Connector. This method will only be called on a clean Connector, i.e. it has
     * either just been instantiated and initialized or {@link #stop()} has been invoked.
     *
     * @param props configuration settings
     */
    public abstract void start(Map<String, String> props);

    /**
     * Reconfigure this Connector. Most implementations will not override this, using the default
     * implementation that calls {@link #stop()} followed by {@link #start(Map)}.
     * Implementations only need to override this if they want to handle this process more
     * efficiently, e.g. without shutting down network connections to the external system.
     *
     * @param props new configuration settings
     */
    public void reconfigure(Map<String, String> props) {
        stop();
        start(props);
    }

    /**
     * Returns the Task implementation for this Connector.
     */
    public abstract Class<? extends Task> taskClass();

    /**
     * Returns a set of configurations for Tasks based on the current configuration,
     * producing at most count configurations.
     *
     * @param maxTasks maximum number of configurations to generate
     * @return configurations for Tasks
     */
    public abstract List<Map<String, String>> taskConfigs(int maxTasks);

    /**
     * Stop this connector.
     */
    public abstract void stop();

    /**
     * Callback invoked when the connector is deleted, so connectors can perform any cleanup tasks as
     * they are removed.
     */
    public void deleted() {

    }

    /**
     * Validate the connector configuration values against configuration definitions.
     * @param connectorConfigs the provided configuration values
     * @return List of Config, each Config contains the updated configuration information given
     * the current configuration values.
     */
    public Config validate(Map<String, String> connectorConfigs) {
        ConfigDef configDef = config();
        if (null == configDef) {
            throw new ConnectException(
                String.format("%s.config() must return a ConfigDef that is not null.", this.getClass().getName())
            );
        }
        List<ConfigValue> configValues = configDef.validate(connectorConfigs);
        return new Config(configValues);
    }

    /**
     * Define the configuration for the connector.
     * @return The ConfigDef for this connector; may not be null.
     */
    public abstract ConfigDef config();
}

Proposed Changes

Add a new void deleted() method to the Connector public API, with a default (empty) implementation. This method can then be overridden by connectors that need to take any additional steps (remove assets, delete offsets, etc.) before the connector is completely removed from the cluster.
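
As an illustration, the hypothetical audit connector sketched in the Motivation section could override the new callback roughly as follows (added to the AuditSourceConnector sketch above; the SQL and names remain illustrative, and cleanup is shown as best-effort):

    @Override
    public void deleted() {
        // Invoked only when the connector is removed from the cluster (not on a restart),
        // so it is now safe to drop the resources that start() provisioned.
        try (Connection conn = DriverManager.getConnection(props.get("connection.url"));
             Statement stmt = conn.createStatement()) {
            stmt.execute("DROP TRIGGER IF EXISTS orders_audit");
            stmt.execute("DROP TABLE IF EXISTS audit_log");
        } catch (SQLException e) {
            // Best-effort cleanup in this sketch; a real implementation would log the failure
            // and decide whether it should block the deletion.
        }
    }

Connectors that have nothing to clean up simply inherit the empty default implementation and are unaffected.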

Standalone mode

When the connector is running in standalone mode (StandaloneHerder.java), DELETE requests are handled synchronously, on the same thread that serves the request. Therefore, the void deleted() method will be executed as part of handling that request.
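
The snippet below is not StandaloneHerder code, but a simplified illustration of the callback ordering a connector instance would observe, under the assumption that the runtime stops the connector before invoking the new callback:

import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.ConnectorContext;

import java.util.Map;

class StandaloneDeleteIllustration {
    // Simplified view of the Connector callbacks over its lifetime. The first three calls
    // happen when the connector is created; the last two happen when it is deleted, and in
    // standalone mode they run synchronously as part of handling the DELETE request.
    static void lifecycle(Connector connector, ConnectorContext context, Map<String, String> props) {
        connector.initialize(context);   // connector created
        connector.start(props);
        connector.taskConfigs(1);        // runtime asks for task configurations
        connector.stop();                // DELETE request: connector is shut down first...
        connector.deleted();             // ...then the new callback runs as part of the same request
    }
}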

Distributed mode

DELETE requests for connectors running in distributed mode (DistributedHerder.java) are handled asynchronously. When a connector is deleted, its configuration is removed from the backing store, and this change is then processed by the workers in the cluster.

The void deleted() method will be called by the Connect Worker as part of processing the connector's removal.

Compatibility, Deprecation, and Migration Plan

...

The proposed change is fully backward compatible with existing Kafka Connect releases. The new method added to the public interface has a default empty (no-op) implementation, so existing connectors do not need to implement it.

Test Plan


...