Status

Current state: Under Discussion

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The base class for all Source and Sink connectors is the org.apache.kafka.connect.connector.Connector abstract class. This class defines a few abstract methods (e.g. void start(Map<String, String> props) and void stop()) that are then implemented by Connectors and are invoked as part of their lifecycle.

As connectors interact with external systems, they sometimes need to provision external resources. We can imagine a Sink connector that creates new queues in a messaging system before writing messages to them, or a Source connector that activates an account before sending requests to a source system, among other use cases. A more concrete example (and one that concerns us in particular) is a Source connector that audits database changes by creating an "audit" table and setting up database triggers that insert row-level updates into that table.

There are cases where we might want to clean up these resources when the connector that uses them is deleted. The motivation can be cost (e.g. the external system charges per active account) or compute resources (triggers that write database updates to an audit table that will no longer be read should be removed, and so should the audit table). Other actions, like deleting the connector's offsets (as discussed in KIP-875), might also be considered as part of this cleanup process.

The Connector API in Kafka Connect currently does not provide any hook to indicate that a connector has been deleted, so it is not possible to react to this event. The stop() method in its current form cannot be used for this purpose, as a Connector can be stopped and restarted at any point (e.g. when its configuration changes).

Public Interfaces


org.apache.kafka.connect.connector.Connector (Java):
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.connect.connector;

import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.components.Versioned;

import java.util.List;
import java.util.Map;

/**
 * <p>
 * Connectors manage integration of Kafka Connect with another system, either as an input that ingests
 * data into Kafka or an output that passes data to an external system. Implementations should
 * not use this class directly; they should inherit from {@link org.apache.kafka.connect.source.SourceConnector SourceConnector}
 * or {@link org.apache.kafka.connect.sink.SinkConnector SinkConnector}.
 * </p>
 * <p>
 * Connectors have two primary tasks. First, given some configuration, they are responsible for
 * creating configurations for a set of {@link Task}s that split up the data processing. For
 * example, a database Connector might create Tasks by dividing the set of tables evenly among
 * tasks. Second, they are responsible for monitoring inputs for changes that require
 * reconfiguration and notifying the Kafka Connect runtime via the {@link ConnectorContext}. Continuing the
 * previous example, the connector might periodically check for new tables and notify Kafka Connect of
 * additions and deletions. Kafka Connect will then request new configurations and update the running
 * Tasks.
 * </p>
 */
public abstract class Connector implements Versioned {

    protected ConnectorContext context;


    /**
     * Initialize this connector, using the provided ConnectorContext to notify the runtime of
     * input configuration changes.
     * @param ctx context object used to interact with the Kafka Connect runtime
     */
    public void initialize(ConnectorContext ctx) {
        context = ctx;
    }

    /**
     * <p>
     * Initialize this connector, using the provided ConnectorContext to notify the runtime of
     * input configuration changes and using the provided set of Task configurations.
     * This version is only used to recover from failures.
     * </p>
     * <p>
     * The default implementation ignores the provided Task configurations. During recovery, Kafka Connect will request
     * an updated set of configurations and update the running Tasks appropriately. However, Connectors should
     * implement special handling of this case if it will avoid unnecessary changes to running Tasks.
     * </p>
     *
     * @param ctx context object used to interact with the Kafka Connect runtime
     * @param taskConfigs existing task configurations, which may be used when generating new task configs to avoid
     *                    churn in partition to task assignments
     */
    public void initialize(ConnectorContext ctx, List<Map<String, String>> taskConfigs) {
        context = ctx;
        // Ignore taskConfigs. May result in more churn of tasks during recovery if updated configs
        // are very different, but reduces the difficulty of implementing a Connector
    }

    /**
     * Returns the context object used to interact with the Kafka Connect runtime.
     *
     * @return the context for this Connector.
     */
    protected ConnectorContext context() {
        return context;
    }

    /**
     * Start this Connector. This method will only be called on a clean Connector, i.e. it has
     * either just been instantiated and initialized or {@link #stop()} has been invoked.
     *
     * @param props configuration settings
     */
    public abstract void start(Map<String, String> props);

    /**
     * Reconfigure this Connector. Most implementations will not override this, using the default
     * implementation that calls {@link #stop()} followed by {@link #start(Map)}.
     * Implementations only need to override this if they want to handle this process more
     * efficiently, e.g. without shutting down network connections to the external system.
     *
     * @param props new configuration settings
     */
    public void reconfigure(Map<String, String> props) {
        stop();
        start(props);
    }

    /**
     * Returns the Task implementation for this Connector.
     */
    public abstract Class<? extends Task> taskClass();

    /**
     * Returns a set of configurations for Tasks based on the current configuration,
     * producing at most count configurations.
     *
     * @param maxTasks maximum number of configurations to generate
     * @return configurations for Tasks
     */
    public abstract List<Map<String, String>> taskConfigs(int maxTasks);

    /**
     * Stop this connector.
     */
    public abstract void stop();

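    /**
     * Stop this connector, and also indicate if the connector has been deleted.
     * <p>
     * Connectors are not required to override this method, unless they need to perform some cleanup
     * actions in cases where the connector has been deleted.
     *
     * @param isDeleted indicates if the connector has been deleted.
     */
    public void stop(boolean isDeleted) {
        stop();
    }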

    /**
     * Validate the connector configuration values against configuration definitions.
     * @param connectorConfigs the provided configuration values
     * @return List of Config, each Config contains the updated configuration information given
     * the current configuration values.
     */
    public Config validate(Map<String, String> connectorConfigs) {
        ConfigDef configDef = config();
        if (null == configDef) {
            throw new ConnectException(
                String.format("%s.config() must return a ConfigDef that is not null.", this.getClass().getName())
            );
        }
        List<ConfigValue> configValues = configDef.validate(connectorConfigs);
        return new Config(configValues);
    }

    /**
     * Define the configuration for the connector.
     * @return The ConfigDef for this connector; may not be null.
     */
    public abstract ConfigDef config();       
}

Proposed Changes

Add an overload of the void stop() method, void stop(boolean isDeleted), to the Connector public API, with a default implementation that calls the existing void stop() method. Connectors that need to take additional steps (remove assets, delete offsets, etc.) before they are completely removed from the cluster can override this new method and act when isDeleted is true.
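A minimal sketch of how a connector could use the proposed overload, based on the audit-table example from the Motivation section. To keep it self-contained and runnable without Kafka on the classpath, SimpleConnector is a hypothetical, simplified stand-in for org.apache.kafka.connect.connector.Connector that mirrors only start(), stop() and the proposed stop(boolean isDeleted). AuditSourceConnector is likewise illustrative: its "external resources" are boolean fields rather than a real table and trigger.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical, simplified stand-in for org.apache.kafka.connect.connector.Connector.
// Only the lifecycle methods relevant to this proposal are modeled.
abstract class SimpleConnector {
    public abstract void start(Map<String, String> props);

    public abstract void stop();

    // Proposed overload: by default it simply delegates to stop().
    public void stop(boolean isDeleted) {
        stop();
    }
}

// Illustrative connector that provisions an "audit" table and a trigger when it starts.
class AuditSourceConnector extends SimpleConnector {
    private boolean auditTableExists = false;
    private boolean triggerExists = false;
    private boolean running = false;

    @Override
    public void start(Map<String, String> props) {
        // Modeled as idempotent "create if not exists", so a restart recreates nothing.
        auditTableExists = true;
        triggerExists = true;
        running = true;
    }

    @Override
    public void stop() {
        // A plain stop (restart, reconfiguration, rebalance) only releases in-process state.
        running = false;
    }

    @Override
    public void stop(boolean isDeleted) {
        stop();
        if (isDeleted) {
            // The connector is gone for good: drop the trigger first, then the table it writes to.
            triggerExists = false;
            auditTableExists = false;
        }
    }

    boolean isRunning() {
        return running;
    }

    // True if any provisioned external resource still exists.
    boolean hasExternalResources() {
        return auditTableExists || triggerExists;
    }
}

class AuditConnectorDemo {
    public static void main(String[] args) {
        AuditSourceConnector connector = new AuditSourceConnector();
        Map<String, String> props = Collections.emptyMap();

        connector.start(props);
        connector.stop(false); // e.g. a restart: external resources must survive
        System.out.println("after stop(false): " + connector.hasExternalResources());

        connector.start(props);
        connector.stop(true);  // the connector was deleted: clean up
        System.out.println("after stop(true): " + connector.hasExternalResources());
    }
}
```

The design point the sketch shows is that stop(false) leaves external resources in place, while stop(true) tears them down.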

Standalone mode

When the connector is running in standalone mode (StandaloneHerder.java), DELETE requests are handled synchronously in the same thread. Therefore, the connector's void stop(boolean isDeleted) method will be invoked, with isDeleted set to true, as part of the request.

Distributed mode

DELETE requests for connectors running in distributed mode (DistributedHerder.java) are handled asynchronously. When a connector is deleted, its configuration is removed from the backing store, and this change is processed in the Herder's main poll thread as part of the deletion process.
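The difference between the two modes can be sketched with a small, hypothetical model. StandaloneHerderModel, DistributedHerderModel, HerderModelConnector and RecordingConnector are illustrative names, not Kafka classes, and a queue stands in for the config backing store and the poll thread. In the standalone model the connector has been stopped with isDeleted set to true by the time deleteConnector() returns; in the distributed model that only happens on the next tick() of the poll loop.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the Connector API; only the stop methods are modeled.
abstract class HerderModelConnector {
    public abstract void stop();

    public void stop(boolean isDeleted) {
        stop();
    }
}

// Records how the connector was last stopped (null = never stopped).
class RecordingConnector extends HerderModelConnector {
    Boolean lastIsDeleted = null;

    @Override
    public void stop() {
    }

    @Override
    public void stop(boolean isDeleted) {
        lastIsDeleted = isDeleted;
        stop();
    }
}

// Standalone model: the DELETE request is handled synchronously in the caller's thread.
class StandaloneHerderModel {
    final Map<String, HerderModelConnector> connectors = new HashMap<>();

    void deleteConnector(String name) {
        HerderModelConnector connector = connectors.remove(name);
        if (connector != null) {
            connector.stop(true);
        }
    }
}

// Distributed model: the DELETE request only records the removal; the poll thread applies it.
// The connectors map is meant to be touched only by the thread that calls tick().
class DistributedHerderModel {
    final Map<String, HerderModelConnector> connectors = new HashMap<>();
    // Stand-in for connector config removals waiting in the backing store.
    final BlockingQueue<String> pendingRemovals = new LinkedBlockingQueue<>();

    void deleteConnector(String name) {
        pendingRemovals.add(name);
    }

    // One iteration of the main poll loop: apply every pending removal.
    void tick() {
        String name;
        while ((name = pendingRemovals.poll()) != null) {
            HerderModelConnector connector = connectors.remove(name);
            if (connector != null) {
                connector.stop(true);
            }
        }
    }
}

class HerderModelDemo {
    public static void main(String[] args) {
        RecordingConnector viaStandalone = new RecordingConnector();
        StandaloneHerderModel standalone = new StandaloneHerderModel();
        standalone.connectors.put("audit", viaStandalone);
        standalone.deleteConnector("audit");
        System.out.println("standalone, after DELETE: " + viaStandalone.lastIsDeleted);

        RecordingConnector viaDistributed = new RecordingConnector();
        DistributedHerderModel distributed = new DistributedHerderModel();
        distributed.connectors.put("audit", viaDistributed);
        distributed.deleteConnector("audit");
        System.out.println("distributed, after DELETE: " + viaDistributed.lastIsDeleted);
        distributed.tick();
        System.out.println("distributed, after tick: " + viaDistributed.lastIsDeleted);
    }
}
```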

Worker class

Both StandaloneHerder and DistributedHerder invoke methods from the Worker class to start and stop connector instances. This KIP will overload the Worker#stopAndAwaitConnector(...) method with a flag indicating that the connector is being stopped because it has been deleted. This flag will be passed to the new Connector#stop(boolean isDeleted) method, so that connectors can implement any additional logic needed.
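A sketch of how the flag could be threaded through the Worker, written against hypothetical simplified types (WorkerModel, WorkerModelConnector, FlagCapturingConnector) rather than the real Worker. The existing single-argument method keeps its behavior by delegating with isDeleted set to false, and only the deletion path passes true. The model runs everything inline, so there is nothing to actually await; it only illustrates how the flag reaches the connector.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the Connector API (start/stop lifecycle only).
abstract class WorkerModelConnector {
    public abstract void start(Map<String, String> props);

    public abstract void stop();

    public void stop(boolean isDeleted) {
        stop();
    }
}

// Hypothetical, heavily simplified Worker: connectors are started and stopped inline.
class WorkerModel {
    private final Map<String, WorkerModelConnector> connectors = new HashMap<>();

    void startConnector(String connName, WorkerModelConnector connector, Map<String, String> props) {
        connectors.put(connName, connector);
        connector.start(props);
    }

    // Existing entry point (restarts, rebalances, reconfiguration): not a deletion.
    void stopAndAwaitConnector(String connName) {
        stopAndAwaitConnector(connName, false);
    }

    // Proposed overload: herders pass isDeleted=true only when the connector was deleted.
    void stopAndAwaitConnector(String connName, boolean isDeleted) {
        WorkerModelConnector connector = connectors.remove(connName);
        if (connector != null) {
            connector.stop(isDeleted);
        }
    }

    boolean isRunning(String connName) {
        return connectors.containsKey(connName);
    }
}

// Captures the isDeleted flag of every stop(boolean) call, in order.
class FlagCapturingConnector extends WorkerModelConnector {
    final List<Boolean> stopFlags = new ArrayList<>();

    @Override
    public void start(Map<String, String> props) {
    }

    @Override
    public void stop() {
    }

    @Override
    public void stop(boolean isDeleted) {
        stopFlags.add(isDeleted);
        stop();
    }
}

class WorkerModelDemo {
    public static void main(String[] args) {
        WorkerModel worker = new WorkerModel();
        FlagCapturingConnector connector = new FlagCapturingConnector();
        worker.startConnector("audit", connector, Collections.emptyMap());
        worker.stopAndAwaitConnector("audit");       // e.g. a restart
        worker.startConnector("audit", connector, Collections.emptyMap());
        worker.stopAndAwaitConnector("audit", true); // the connector was deleted
        System.out.println(connector.stopFlags);     // prints [false, true]
    }
}
```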

Compatibility, Deprecation, and Migration Plan

The proposed change is fully backward-compatible with existing Kafka Connect releases. The new method added to the public interface comes with a default implementation that delegates to the existing stop() method, so existing connectors don't need to override it unless they need deletion-specific behavior.
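To illustrate the compatibility argument, here is a minimal sketch of a connector written before the overload existed. It overrides only stop(), and the default stop(boolean isDeleted), modeled here by the hypothetical CompatConnector stand-in rather than the real Connector class, routes both deletions and ordinary stops to it.

```java
// Hypothetical stand-in for the Connector class with the proposed default method.
abstract class CompatConnector {
    public abstract void stop();

    // Default implementation: ignore the flag and fall back to the existing stop().
    public void stop(boolean isDeleted) {
        stop();
    }
}

// A connector written against the old API: it knows nothing about stop(boolean).
class LegacyConnector extends CompatConnector {
    int stopCalls = 0;

    @Override
    public void stop() {
        stopCalls++;
    }
}

class CompatDemo {
    public static void main(String[] args) {
        LegacyConnector legacy = new LegacyConnector();
        legacy.stop(true);  // the runtime signals a deletion
        legacy.stop(false); // the runtime signals an ordinary stop
        System.out.println("stop() invocations: " + legacy.stopCalls);
    }
}
```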

Test Plan

Unit and integration tests will be added to make sure that the new method is invoked when a connector is deleted. Mainly:

  • Add/update unit tests to WorkerTest and WorkerConnectorTest classes.
  • Add new test cases to ConnectWorkerIntegrationTest and update its auxiliary classes (EmbeddedConnectClusterAssertions, SampleSourceConnector, SampleSinkConnector, etc.).

Rejected Alternatives

Add new destroy()/onDelete() method to the Connect API

In the initial iteration, a new destroy() method was added to the Connector class. This method was called during the WorkerConnector#doShutdown() execution, and the idea was to call it right after connector.stop().

Monitor the configuration topics outside of Kafka Connect

In theory, users can monitor Kafka Connect configuration topics to determine if and when a connector has been deleted. Reacting to this event outside of the connector's context is probably not very useful, as there might not be enough contextual information to perform any meaningful action. It is also better to keep these concerns encapsulated within the connector framework itself.

...