...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The base class for all Source and Sink connectors is the org.apache.kafka.connect.connector.Connector abstract class. This class defines a few abstract methods (e.g. void start(Map<String, String> props) and void stop()) that are implemented by connectors and invoked as part of their lifecycle.

This KIP is an extension of KIP-883: Add isDeleted flag when stopping a connector, but focuses mainly on providing hooks that let tasks perform any additional cleanup required when a task is stopped because its connector was deleted. This KIP potentially supersedes KIP-419: Safely notify Kafka Connect SourceTask is stopped.

As connectors interact with external systems, they sometimes need to provision external resources. For example, a Sink connector might create new queues in a messaging system before writing messages to them, or a Source connector might activate an account before sending requests to a source system. A more concrete example (and one that concerns us in particular) is a source connector that audits database changes by creating an "audit" table and setting up database triggers that insert row-level updates into that table.

There are cases where we might want to clean up these resources when the connector that uses them is deleted. This can save costs (e.g. when the external system charges per active account) or compute resources (triggers that write database updates to an audit table that will no longer be read from should be removed, and so should the audit table itself). Other actions, such as deleting the connector offsets (as discussed in KIP-875), might also be considered part of this cleanup process.

The Connector API in Kafka Connect does not provide any hooks to indicate when a connector has been deleted, so it is currently not possible to react to this event. The stop() method in its current form cannot be used for this purpose, because a Connector can be stopped and restarted at any point (e.g. when its configuration changes).

Public Interfaces


org.apache.kafka.connect.connector.Task

```java
package org.apache.kafka.connect.connector;

import java.util.Map;

public interface Task {
    /**
     * Get the version of this task. Usually this should be the same as the corresponding {@link Connector} class's version.
     *
     * @return the version, formatted as a String
     */
    String version();

    /**
     * Start the Task
     * @param props initial configuration
     */
    void start(Map<String, String> props);

    /**
     * Stop this task.
     */
    void stop();

    /**
     * Stop this task, while also indicating whether the task is being stopped because the connector was
     * deleted.
     * <p>
     * Tasks are not required to override this method, unless they need to perform additional cleanup
     * actions in cases where the connector has been deleted.
     *
     * @param connectorDeleted indicates if the connector has been deleted.
     */
    default void stop(boolean connectorDeleted) {
        stop();
    }
}
```
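The default implementation keeps existing tasks source- and behaviour-compatible. The minimal sketch below illustrates this. It declares a local Task interface that mirrors the proposed one, so it compiles without Kafka on the classpath. It also defines a hypothetical LegacyTask that implements only stop(). Calling either stop(true) or stop(false) on such a task still ends up in its stop() method.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Local stand-in mirroring the proposed org.apache.kafka.connect.connector.Task,
// so this sketch compiles without Kafka on the classpath.
interface Task {
    String version();
    void start(Map<String, String> props);
    void stop();

    // Proposed overload: by default it simply delegates to stop().
    default void stop(boolean connectorDeleted) {
        stop();
    }
}

// Hypothetical task written before this KIP: it only implements stop().
class LegacyTask implements Task {
    final List<String> events = new ArrayList<>();

    @Override public String version() { return "1.0"; }
    @Override public void start(Map<String, String> props) { events.add("start"); }
    @Override public void stop() { events.add("stop"); }
}

class DefaultDelegationDemo {
    // Starts a legacy task, stops it with the given flag, and returns its event log.
    static List<String> run(boolean connectorDeleted) {
        LegacyTask task = new LegacyTask();
        task.start(Map.of());
        task.stop(connectorDeleted);
        return task.events;
    }

    public static void main(String[] args) {
        System.out.println(run(true));   // prints [start, stop]
        System.out.println(run(false));  // prints [start, stop]
    }
}
```

A legacy task cannot tell the two cases apart, which is exactly the pre-KIP behaviour. Only tasks that choose to override the overload observe the flag.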

Proposed Changes

Add a void stop(boolean connectorDeleted) overload to the Task public API, with a default implementation that calls the existing void stop() method. Tasks that need to take additional steps as part of the deletion process (remove assets, delete offsets, etc.) can then override this new method.
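Below is a sketch of how a task might use the overload for the audit-table example from the Motivation. It is not a definitive implementation. AuditSourceTask, AuditDatabase and its methods are hypothetical names introduced for illustration. A real task would issue the corresponding DDL over JDBC, whereas the in-memory RecordingAuditDatabase only records which calls were made. The Task interface is again a local stand-in for the proposed one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Local stand-in mirroring the proposed Task interface.
interface Task {
    String version();
    void start(Map<String, String> props);
    void stop();

    default void stop(boolean connectorDeleted) {
        stop();
    }
}

// Hypothetical handle on the audited database (not a real library API).
interface AuditDatabase {
    void dropTriggers();
    void dropAuditTable();
    void close();
}

// In-memory fake that records which operations were requested.
class RecordingAuditDatabase implements AuditDatabase {
    final List<String> calls = new ArrayList<>();

    @Override public void dropTriggers() { calls.add("dropTriggers"); }
    @Override public void dropAuditTable() { calls.add("dropAuditTable"); }
    @Override public void close() { calls.add("close"); }
}

// Hypothetical source task for the audit-table use case.
class AuditSourceTask implements Task {
    private final AuditDatabase db;

    AuditSourceTask(AuditDatabase db) { this.db = db; }

    @Override public String version() { return "0.1"; }
    @Override public void start(Map<String, String> props) { /* set up triggers, begin polling */ }

    // Ordinary stop (restart, reconfiguration): only release resources.
    @Override public void stop() {
        db.close();
    }

    // Stop caused by connector deletion: remove provisioned objects, then release resources.
    @Override public void stop(boolean connectorDeleted) {
        if (connectorDeleted) {
            db.dropTriggers();   // nobody will read the audit rows any more
            db.dropAuditTable();
        }
        stop();
    }
}
```

Every task of the connector receives the flag. Cleanup of shared objects such as a single audit table should therefore probably be idempotent (e.g. using IF EXISTS in the DDL), or coordinated with the connector-level hook from KIP-883.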

...

Both StandaloneHerder and DistributedHerder invoke methods from the Worker class to start and stop the connector instance and its tasks. This KIP will overload the methods used to stop tasks in the org.apache.kafka.connect.runtime.Worker class, adding a flag that indicates whether the connector has been deleted. This flag will be passed to the new Task#stop(connectorDeleted) method, so tasks can implement any additional logic needed.
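The propagation described above can be modelled roughly as follows. This is a deliberately simplified sketch under stated assumptions: SketchWorker, SketchHerder and their methods are hypothetical and are not the real org.apache.kafka.connect.runtime.Worker or herder APIs. The sketch only shows the existing stop path defaulting to false, while the deletion path passes true through to each task.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Local stand-in for the proposed Task interface, trimmed to the stop methods.
interface Task {
    void stop();

    default void stop(boolean connectorDeleted) {
        stop();
    }
}

// Hypothetical, simplified model of the worker side (not the real Worker API).
class SketchWorker {
    private final Map<String, List<Task>> tasksByConnector = new HashMap<>();

    void addTask(String connector, Task task) {
        tasksByConnector.computeIfAbsent(connector, c -> new ArrayList<>()).add(task);
    }

    // Existing behaviour: stop tasks without signalling deletion.
    void stopTasks(String connector) {
        stopTasks(connector, false);
    }

    // Proposed overload: forwards the deletion flag to every task of the connector.
    void stopTasks(String connector, boolean connectorDeleted) {
        List<Task> tasks = tasksByConnector.remove(connector);
        if (tasks == null) {
            return;
        }
        for (Task task : tasks) {
            task.stop(connectorDeleted);
        }
    }
}

// Hypothetical herder-side calls for the two situations the KIP distinguishes.
class SketchHerder {
    private final SketchWorker worker;

    SketchHerder(SketchWorker worker) { this.worker = worker; }

    // Reconfiguration: tasks are stopped (and, in the real runtime, later restarted).
    void onConnectorReconfigured(String connector) { worker.stopTasks(connector); }

    // Deletion: tasks are stopped with connectorDeleted = true.
    void onConnectorDeleted(String connector) { worker.stopTasks(connector, true); }
}
```

Keeping the one-argument stopTasks method and delegating it with false leaves existing call sites unchanged. Only the deletion path needs to opt in to the new flag.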

Compatibility, Deprecation, and Migration Plan

...