...

This KIP is an extension of KIP-883: Add isDeleted flag when stopping a connector, but focuses mainly on tasks: it provides a flag indicating that the connector has been deleted when a task is stopped because of that deletion. The Task implementation can then use this flag to perform any additional cleanup steps if needed. This KIP is potentially related to (and supersedes) KIP-419: Safely notify Kafka Connect SourceTask is stopped.

As connectors interact with external systems, they sometimes need to provision external resources. Imagine a Sink connector that creates new queues in a Messaging System before writing messages to them, or a Source connector that activates an account before polling a source system, or any other use case that requires extra setup. A more concrete example (and one that concerns us in particular) is a source connector that audits database changes by creating an "audit" table and setting up database triggers to capture row-level updates into the audit table.

In cases like these we might want to clean up the resources when the connector that provisioned them is deleted. There can be many reasons why cleanup is desirable: to save costs (e.g. the external system charges per active account) or compute resources (triggers writing database updates to an audit table that will no longer be polled should be removed, and so should the audit table), among many others.

Public Interfaces


org.apache.kafka.connect.connector.Task
package org.apache.kafka.connect.connector;

import java.util.Map;

public interface Task {
    /**
     * Get the version of this task. Usually this should be the same as the corresponding {@link Connector} class's version.
     *
     * @return the version, formatted as a String
     */
    String version();

    /**
     * Start the Task
     * @param props initial configuration
     */
    void start(Map<String, String> props);

    /**
     * Stop this task.
     */
    void stop();

    /**
     * Stop this task, while also indicating that the task is being stopped because the connector was
     * deleted.
     * <p>
     * Tasks are not required to override this method, unless they need to perform additional cleanup
     * actions in cases where the connector has been deleted.
     *
     * @param connectorDeleted indicates if the connector has been deleted.
     */
    default void stop(boolean connectorDeleted) {
        stop();
    }
}

...

Add a void stop(boolean connectorDeleted) overload to the Task public API, with a default implementation that calls the existing void stop() method. This new method can then be overridden by connectors that need to take additional steps as part of the deletion process.
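
As an illustration of how a task might use the overload, here is a minimal sketch based on the audit-table example from the motivation. It is not part of the proposal: TaskSketch is a local stand-in that mirrors the proposed Task interface so the snippet compiles without Kafka on the classpath, and AuditTableTask, the "audit.table" property and the recorded action strings are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Local stand-in mirroring the proposed Task interface (assumption: same
// method shapes as in this KIP), so the sketch needs no Kafka dependency.
interface TaskSketch {
    String version();
    void start(Map<String, String> props);
    void stop();
    default void stop(boolean connectorDeleted) {
        stop();
    }
}

// Hypothetical task that provisions an audit table and removes it only
// when the connector itself has been deleted.
class AuditTableTask implements TaskSketch {
    // Records what the task did; a real task would talk to the database.
    final List<String> actions = new ArrayList<>();
    private String auditTable;

    @Override
    public String version() {
        return "0.0.1";
    }

    @Override
    public void start(Map<String, String> props) {
        // "audit.table" is an illustrative property name, not a real config.
        auditTable = props.getOrDefault("audit.table", "audit");
        actions.add("start");
    }

    @Override
    public void stop() {
        // Regular shutdown: release connections, keep provisioned resources.
        actions.add("close connections");
    }

    @Override
    public void stop(boolean connectorDeleted) {
        stop();
        if (connectorDeleted) {
            // Extra cleanup that only makes sense once the connector is gone.
            actions.add("drop triggers for " + auditTable);
            actions.add("drop table " + auditTable);
        }
    }
}
```

Tasks that do not override stop(boolean) keep their current behaviour, because the default implementation simply delegates to stop().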

...

Herder classes

Both StandaloneHerder and DistributedHerder invoke methods from the Worker class to start/stop the connector instance. This KIP will overload the methods used to stop tasks in the org.apache.kafka.connect.runtime.Worker class, passing a flag indicating that the connector has been deleted.
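
The overloading pattern described here could look roughly like the sketch below. SketchWorker, its stopTask methods, StoppableTask and RecordingTask are hypothetical names chosen for illustration; they are not the actual Worker methods, and the real class also deals with threads, offsets and metrics, which are left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal stand-in for the two stop variants of the proposed Task API.
interface StoppableTask {
    void stop();
    default void stop(boolean connectorDeleted) {
        stop();
    }
}

// Hypothetical, heavily simplified worker that only tracks tasks by id.
class SketchWorker {
    private final Map<String, StoppableTask> tasks = new HashMap<>();

    void addTask(String taskId, StoppableTask task) {
        tasks.put(taskId, task);
    }

    // Existing-style entry point: callers unaware of deletion keep working
    // and implicitly pass connectorDeleted = false.
    void stopTask(String taskId) {
        stopTask(taskId, false);
    }

    // New-style overload: forwards the flag to the task being stopped.
    void stopTask(String taskId, boolean connectorDeleted) {
        StoppableTask task = tasks.remove(taskId);
        if (task != null) {
            task.stop(connectorDeleted);
        }
    }
}

// Test double that records which stop variant was invoked.
class RecordingTask implements StoppableTask {
    final List<String> calls = new ArrayList<>();

    @Override
    public void stop() {
        calls.add("stop()");
    }

    @Override
    public void stop(boolean connectorDeleted) {
        calls.add("stop(" + connectorDeleted + ")");
    }
}
```

Keeping the no-flag method as a thin wrapper around the overload means existing call sites do not need to change.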

For the StandaloneHerder, the tasks will be stopped with the connectorDeleted flag set to true as part of the StandaloneHerder#deleteConnectorConfig(...) method. In the case of the DistributedHerder, the connectorDeleted flag will be computed during the RebalanceListener#onRevoked(...) callback, by checking if the tasks being revoked are for a connector that has been deleted (the connector configuration no longer exists in the ClusterConfigState store).
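
The check described for the distributed case can be sketched as follows. This is an assumption-laden illustration rather than the DistributedHerder code: a plain Set of connector names stands in for the ClusterConfigState store, and RevokedTask and DeletionCheck are hypothetical helper types.

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical identifier for a revoked task: owning connector plus index.
final class RevokedTask {
    final String connector;
    final int index;

    RevokedTask(String connector, int index) {
        this.connector = connector;
        this.index = index;
    }

    String id() {
        return connector + "-" + index;
    }
}

final class DeletionCheck {
    private DeletionCheck() {
    }

    // For every revoked task, the flag is true when the owning connector no
    // longer has a configuration (i.e. it was deleted), false otherwise.
    static Map<String, Boolean> connectorDeletedFlags(
            Set<String> configuredConnectors,
            Collection<RevokedTask> revokedTasks) {
        Map<String, Boolean> flags = new LinkedHashMap<>();
        for (RevokedTask task : revokedTasks) {
            flags.put(task.id(), !configuredConnectors.contains(task.connector));
        }
        return flags;
    }
}
```

With this shape, a task revoked by an ordinary rebalance gets false (its connector still has a configuration), while a task revoked because its connector was removed gets true.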

Compatibility, Deprecation, and Migration Plan

...

Integration tests will be added to make sure that the new flag is used when a connector is deleted. Mainly:

  • Add/update unit tests to WorkerTest and WorkerConnectorTest classes.
  • Add integration tests in ConnectWorkerIntegrationTest and auxiliary classes (EmbeddedConnectClusterAssertions, SampleSourceConnector, SampleSinkConnector, etc.)

Rejected Alternatives

Add new destroy()/onDelete() method to the Connect API

...

task for a deleted connector.

Delete provisioned resources out-of-band

In theory, users can monitor Kafka Connect configuration topics to determine if/when a connector has been deleted. Reacting to this event outside of the connector's context is probably not very useful, as there might not be enough contextual information to perform any meaningful action. There is also some value in keeping these concerns encapsulated within the connector framework itself.

...