Status

Current state: Under Discussion

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This KIP is an extension of KIP-883: Add isDeleted flag when stopping a connector, but focuses mainly on providing hooks for tasks to perform any additional cleanup required when they are stopped because the connector is being deleted. This KIP potentially supersedes KIP-419: Safely notify Kafka Connect SourceTask is stopped.

...

There are cases where we might want to clean up these resources when the connector that uses them is deleted, either to save costs (e.g. the external system charges per active account) or to free compute resources (e.g. database triggers that write updates to an audit table that will no longer be read should be removed, along with the audit table itself).

Public Interfaces


org.apache.kafka.connect.connector.Task
package org.apache.kafka.connect.connector;

import java.util.Map;

public interface Task {
    /**
     * Get the version of this task. Usually this should be the same as the corresponding {@link Connector} class's version.
     *
     * @return the version, formatted as a String
     */
    String version();

    /**
     * Start the Task
     * @param props initial configuration
     */
    void start(Map<String, String> props);

    /**
     * Stop this task.
     */
    void stop();

    /**
     * Stop this task, while also indicating that the task is being stopped because the connector was
     * deleted.
     * <p>
     * Tasks are not required to override this method, unless they need to perform additional cleanup
     * actions in cases where the connector has been deleted.
     *
     * @param connectorDeleted indicates if the connector has been deleted.
     */
    default void stop(boolean connectorDeleted) {
        stop();
    }
}

Proposed Changes

Add a void stop(boolean connectorDeleted) overload of the existing stop() method to the Task public API, with a default implementation that calls the existing void stop() method. Task implementations that need to take additional steps as part of the deletion process can then override the new method.
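As an illustration, the sketch below shows how a task might override the new method so that expensive external resources are released only when the connector is deleted. To stay self-contained and runnable without the Connect jars, it declares a minimal stand-in for the proposed Task interface (only stop() and stop(boolean)); ExternalAccountClient and AuditSourceTask are hypothetical names, not part of Kafka Connect.

```java
import java.util.ArrayList;
import java.util.List;

// Demo entry point: one task stopped normally, one stopped because its connector was deleted.
class OverrideDemo {
    public static void main(String[] args) {
        ExternalAccountClient client = new ExternalAccountClient();
        new AuditSourceTask(client).stop(false); // e.g. rebalance or restart
        new AuditSourceTask(client).stop(true);  // connector deleted
        System.out.println(client.log); // prints [connection closed, account deleted, connection closed]
    }
}

// Minimal stand-in for the proposed Task API (version() and start(...) omitted).
interface Task {
    void stop();

    default void stop(boolean connectorDeleted) {
        stop();
    }
}

// Hypothetical client for an external system that charges per active account.
class ExternalAccountClient {
    final List<String> log = new ArrayList<>();

    void close() {
        log.add("connection closed");
    }

    void deleteAccount() {
        log.add("account deleted");
    }
}

// Hypothetical task that deprovisions external resources only on connector deletion.
class AuditSourceTask implements Task {
    private final ExternalAccountClient client;

    AuditSourceTask(ExternalAccountClient client) {
        this.client = client;
    }

    @Override
    public void stop() {
        // Regular shutdown: release connections but keep provisioned resources.
        client.close();
    }

    @Override
    public void stop(boolean connectorDeleted) {
        if (connectorDeleted) {
            // Deletion only: remove resources nobody will use again,
            // while the connection is still open.
            client.deleteAccount();
        }
        stop();
    }
}
```

Keeping the regular shutdown path in stop() means restarts and rebalances behave exactly as before; only the deletion path does extra work.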

Worker class

Both StandaloneHerder and DistributedHerder invoke methods on the Worker class to start and stop connector and task instances. This KIP will overload the methods used to stop tasks in the org.apache.kafka.connect.runtime.Worker class so that they accept a flag indicating whether the connector has been deleted.
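A minimal sketch of how such an overload could pass the flag through to tasks is shown below. SimpleWorker, its startTask/stopTasks methods, and RecordingTask are hypothetical simplifications, not the actual Worker API, and Task is a minimal stand-in for the proposed interface. The existing single-argument method is kept and delegates with connectorDeleted = false, so current callers are unaffected.

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class WorkerDemo {
    public static void main(String[] args) {
        SimpleWorker worker = new SimpleWorker();
        RecordingTask t0 = new RecordingTask();
        RecordingTask t1 = new RecordingTask();
        worker.startTask("my-connector-0", t0);
        worker.startTask("my-connector-1", t1);
        worker.stopTasks(List.of("my-connector-0"));       // existing path, e.g. rebalance
        worker.stopTasks(List.of("my-connector-1"), true); // connector deleted
        System.out.println(t0.stoppedVia + " " + t1.stoppedVia); // prints stop(false) stop(true)
    }
}

// Minimal stand-in for the proposed Task API.
interface Task {
    void stop();

    default void stop(boolean connectorDeleted) {
        stop();
    }
}

// Hypothetical task double that records which stop variant was invoked.
class RecordingTask implements Task {
    String stoppedVia = "running";

    @Override
    public void stop() {
        stoppedVia = "stop()";
    }

    @Override
    public void stop(boolean connectorDeleted) {
        stoppedVia = "stop(" + connectorDeleted + ")";
    }
}

// Hypothetical, heavily simplified worker (no threads, offsets, or metrics).
class SimpleWorker {
    private final Map<String, Task> tasks = new LinkedHashMap<>();

    void startTask(String id, Task task) {
        tasks.put(id, task);
    }

    // Existing entry point: no deletion information available.
    void stopTasks(Collection<String> ids) {
        stopTasks(ids, false);
    }

    // Proposed overload: forward whether the owning connector was deleted.
    void stopTasks(Collection<String> ids, boolean connectorDeleted) {
        for (String id : ids) {
            Task task = tasks.remove(id);
            if (task != null) {
                task.stop(connectorDeleted);
            }
        }
    }
}
```

In this sketch, a herder handling a connector deletion would call stopTasks(ids, true), while rebalances and restarts keep using the single-argument form.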

Compatibility, Deprecation, and Migration Plan

The proposed change is fully backward-compatible with existing Kafka Connect releases. The new method added to the public interface has a default implementation that delegates to the existing stop() method, so existing tasks do not need to override it.
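To illustrate the compatibility argument, the sketch below uses a task written before this change, which only implements stop(). When a runtime calls the new stop(true) overload, the default method simply delegates to the existing stop(). LegacyTask and CompatDemo are hypothetical names, and Task is a minimal stand-in for the proposed interface.

```java
class CompatDemo {
    public static void main(String[] args) {
        LegacyTask task = new LegacyTask();
        // A runtime aware of the new overload calls stop(boolean) unconditionally.
        task.stop(true);
        System.out.println("stopped = " + task.stopped); // prints stopped = true
    }
}

// Minimal stand-in for the proposed Task API.
interface Task {
    void stop();

    default void stop(boolean connectorDeleted) {
        stop(); // default: behave exactly like the existing stop()
    }
}

// A task written before this KIP: it only knows about stop().
class LegacyTask implements Task {
    boolean stopped;

    @Override
    public void stop() {
        stopped = true;
    }
}
```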

Test Plan

Unit and integration tests will be added to verify that the new method is invoked when a connector is deleted. Mainly:

  • Add or update unit tests in the WorkerTest and WorkerConnectorTest classes.
  • Add integration tests in ConnectWorkerIntegrationTest and auxiliary classes (EmbeddedConnectClusterAssertions, SampleSourceConnector, SampleSinkConnector, etc.).

Rejected Alternatives

Add new destroy()/onDelete() method to the Connect API

Initially we thought about adding a new destroy() method to the Connector class, to be called from WorkerConnector#doShutdown() right after connector.stop() is executed. However, this raised questions about execution guarantees: for example, what the behavior should be if Connector#stop() never returned, or if it threw an exception. To keep things simple, an overloaded Connector#stop(boolean isDeleted) was introduced instead, so the expected behavior remains the same as with the current implementation. That is, the method is guaranteed to be called if the connector stops within CONNECTOR_GRACEFUL_SHUTDOWN_TIMEOUT_MS (default: 5 seconds).
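Applied to tasks, the same reasoning means deletion cleanup inside stop(boolean) is subject to the same bounded wait as any other stop, instead of needing separate guarantees for a follow-up destroy() call. The sketch below approximates that bounded wait with an ExecutorService and Future#get(timeout). The helper stopWithTimeout, the SlowTask class, and the 1-second timeout (instead of the 5-second default) are hypothetical choices for illustration, not the Worker's actual implementation; Task is again a minimal stand-in for the proposed interface.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class ShutdownDemo {
    public static void main(String[] args) {
        System.out.println(stopWithTimeout(new SlowTask(0), true, 1_000));      // fast stop: true
        System.out.println(stopWithTimeout(new SlowTask(10_000), true, 1_000)); // hung stop: false
    }

    // Hypothetical helper: call stop(connectorDeleted) and wait at most timeoutMs for it to return.
    static boolean stopWithTimeout(Task task, boolean connectorDeleted, long timeoutMs) {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "task-stopper");
            t.setDaemon(true); // a stuck stop() must not keep the JVM alive
            return t;
        });
        try {
            Future<?> result = executor.submit(() -> task.stop(connectorDeleted));
            result.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true; // stop(), including any deletion cleanup, completed in time
        } catch (TimeoutException | ExecutionException e) {
            return false; // stop() hung or threw; cleanup may be incomplete
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow(); // interrupts a stop() that is still running
        }
    }
}

// Minimal stand-in for the proposed Task API.
interface Task {
    void stop();

    default void stop(boolean connectorDeleted) {
        stop();
    }
}

// Hypothetical task whose stop() takes a configurable amount of time.
class SlowTask implements Task {
    private final long stopMillis;

    SlowTask(long stopMillis) {
        this.stopMillis = stopMillis;
    }

    @Override
    public void stop() {
        try {
            Thread.sleep(stopMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```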

Delete provisioned resources out-of-band

In theory, users can monitor the Kafka Connect configuration topics to determine if and when a connector has been deleted. Reacting to this event outside of the connector's context is probably not very useful, as there might not be enough contextual information to perform any meaningful action. It is also better to keep these concerns encapsulated within the connector framework itself.

...