
Current state: Under Discussion

Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]

JIRA: here [Change the link from KAFKA-1 to your own ticket]

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


The base class for all Source and Sink connectors is the org.apache.kafka.connect.connector.Connector abstract class. This class defines a few abstract methods (e.g. void start(Map<String, String> props) and void stop()) that are then implemented by Connectors and are invoked as part of their lifecycle.

As connectors interact with external systems, they sometimes need to provision external resources. We can imagine a Sink connector creating new queues in a Messaging System before writing messages to it, or a Source connector that activates an account before requesting data from a source system, among other use cases. A more concrete example (and one that concerns us in particular) is a source connector that audits database changes by creating an "audit" table and sets up database triggers to insert row-level updates to that table.

There are cases where we might want to cleanup these resources when the connector that uses them is deleted. It can be to save costs (the external system charges per active account), or compute resources (triggers writing to an audit table that will no longer be read from should be removed, and so does the audit table). Taking other actions, like deleting the connector offsets (as discussed on KIP-875) can also be considered as part of this cleanup process.

Kafka Connect does not have an API that is called when connectors are deleted, so it is currently not possible to react to the event of a deleted connector. The existing  stop() method cannot be used for this purpose, as a Connector can be restarted later on. 

Public Interfaces

 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * See the License for the specific language governing permissions and
 * limitations under the License.
package org.apache.kafka.connect.connector;

import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.components.Versioned;

import java.util.List;
import java.util.Map;

 * <p>
 * Connectors manage integration of Kafka Connect with another system, either as an input that ingests
 * data into Kafka or an output that passes data to an external system. Implementations should
 * not use this class directly; they should inherit from {@link org.apache.kafka.connect.source.SourceConnector SourceConnector}
 * or {@link org.apache.kafka.connect.sink.SinkConnector SinkConnector}.
 * </p>
 * <p>
 * Connectors have two primary tasks. First, given some configuration, they are responsible for
 * creating configurations for a set of {@link Task}s that split up the data processing. For
 * example, a database Connector might create Tasks by dividing the set of tables evenly among
 * tasks. Second, they are responsible for monitoring inputs for changes that require
 * reconfiguration and notifying the Kafka Connect runtime via the {@link ConnectorContext}. Continuing the
 * previous example, the connector might periodically check for new tables and notify Kafka Connect of
 * additions and deletions. Kafka Connect will then request new configurations and update the running
 * Tasks.
 * </p>
public abstract class Connector implements Versioned {

    protected ConnectorContext context;

     * Initialize this connector, using the provided ConnectorContext to notify the runtime of
     * input configuration changes.
     * @param ctx context object used to interact with the Kafka Connect runtime
    public void initialize(ConnectorContext ctx) {
        context = ctx;

     * <p>
     * Initialize this connector, using the provided ConnectorContext to notify the runtime of
     * input configuration changes and using the provided set of Task configurations.
     * This version is only used to recover from failures.
     * </p>
     * <p>
     * The default implementation ignores the provided Task configurations. During recovery, Kafka Connect will request
     * an updated set of configurations and update the running Tasks appropriately. However, Connectors should
     * implement special handling of this case if it will avoid unnecessary changes to running Tasks.
     * </p>
     * @param ctx context object used to interact with the Kafka Connect runtime
     * @param taskConfigs existing task configurations, which may be used when generating new task configs to avoid
     *                    churn in partition to task assignments
    public void initialize(ConnectorContext ctx, List<Map<String, String>> taskConfigs) {
        context = ctx;
        // Ignore taskConfigs. May result in more churn of tasks during recovery if updated configs
        // are very different, but reduces the difficulty of implementing a Connector

     * Returns the context object used to interact with the Kafka Connect runtime.
     * @return the context for this Connector.
    protected ConnectorContext context() {
        return context;

     * Start this Connector. This method will only be called on a clean Connector, i.e. it has
     * either just been instantiated and initialized or {@link #stop()} has been invoked.
     * @param props configuration settings
    public abstract void start(Map<String, String> props);

     * Reconfigure this Connector. Most implementations will not override this, using the default
     * implementation that calls {@link #stop()} followed by {@link #start(Map)}.
     * Implementations only need to override this if they want to handle this process more
     * efficiently, e.g. without shutting down network connections to the external system.
     * @param props new configuration settings
    public void reconfigure(Map<String, String> props) {

     * Returns the Task implementation for this Connector.
    public abstract Class<? extends Task> taskClass();

     * Returns a set of configurations for Tasks based on the current configuration,
     * producing at most count configurations.
     * @param maxTasks maximum number of configurations to generate
     * @return configurations for Tasks
    public abstract List<Map<String, String>> taskConfigs(int maxTasks);

     * Stop this connector.
    public abstract void stop();

     * Callback invoked when the connector is deleted, so connectors can perform any cleanup tasks as
     * they are removed.
    public void deleted() {


     * Validate the connector configuration values against configuration definitions.
     * @param connectorConfigs the provided configuration values
     * @return List of Config, each Config contains the updated configuration information given
     * the current configuration values.
    public Config validate(Map<String, String> connectorConfigs) {
        ConfigDef configDef = config();
        if (null == configDef) {
            throw new ConnectException(
                String.format("%s.config() must return a ConfigDef that is not null.", this.getClass().getName())
        List<ConfigValue> configValues = configDef.validate(connectorConfigs);
        return new Config(configValues);

     * Define the configuration for the connector.
     * @return The ConfigDef for this connector; may not be null.
    public abstract ConfigDef config();

Proposed Changes

Add a new void deleted() method to the Connector public API, with a default (empty) implementation. This method can then be overwritten by connectors that need to take any additional steps (remove assets, delete offsets, etc) before the connector is completely removed from the cluster.

Standalone mode

When the connector is running in standalone mode (, DELETE requests are handled synchronously in the same thread. Therefore, the void deleted() method will be executed as part of the request.

Distributed mode

DELETE requests for connectors running in distributed mode ( are handled asynchronously. When deleted, the connector configuration is removed from the backing store, and these changes are processed on the Herder's main poll thread.

Worker class

In both cases, herders invoke methods from the Worker class to start/stop the connector instance. What this KIP proposes is to overload the  Worker#stopAndAwaitConnector(...) method, passing a flag indicating that the connector is being stopped because it has been deleted, and have the worker call the new  deleted() method on the connector.

Compatibility, Deprecation, and Migration Plan

The proposed change is fully backward-compatible with existing Kafka Connect releases. The new method added to the public interface includes an empty (noop) implementation, so existing connectors don't need to implement it if not needed.

Test Plan

Integration tests will be added to make sure that the new method is invoked when a connector is deleted. Mainly:

Rejected Alternatives

In theory, users can monitor Kakfa Connect configuration topics to determine if/when a connector has been deleted. Reacting to this event outside of the connector's context is probably not very useful, as there might not be enough contextual information to perform any meaningful action. It is also better to keep these concerns encapsulated within the connector framework itself.