
Status

Current state: Under Discussion

Discussion thread: here

JIRA: KAFKA-14354

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The base class for all Source and Sink connectors is the org.apache.kafka.connect.connector.Connector abstract class. This class defines a few abstract methods (e.g. void start(Map<String, String> props) and void stop()) that are then implemented by Connectors and are invoked as part of their lifecycle.

As connectors interact with external systems, they sometimes need to provision external resources. Imagine a Sink connector that creates new queues in the messaging system it writes to; an Object Store Sink connector that creates new buckets before writing messages to them; or a Source connector that activates an account before requesting data from a source system, among other use cases. A more concrete example (and one that concerns us in particular) is a source connector that audits database changes by creating an "audit" table and setting up database triggers to insert row-level updates into that table. In many cases, creating these resources is an idempotent operation. In other cases, the targeted system can be queried beforehand, and resources are provisioned only if they do not exist.
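The check-then-create pattern described above can be sketched as follows. This is a hypothetical illustration, not code from any real connector: AuditTableProvisioner and the in-memory "external system" stand in for a real client (database, broker, object store) that would be queried during the connector's start() call.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: provision a resource only if it does not already exist,
// so that restarting the connector is a no-op. The Set stands in for the
// external system's catalog of existing resources.
public class AuditTableProvisioner {

    // In-memory stand-in for the external system.
    static final Set<String> externalSystem = new HashSet<>();

    /** Create the audit table only if it is missing; returns true if created. */
    public static boolean provision(String tableName) {
        if (externalSystem.contains(tableName)) {
            return false; // already provisioned; restart changes nothing
        }
        externalSystem.add(tableName); // e.g. CREATE TABLE + CREATE TRIGGER
        return true;
    }

    public static void main(String[] args) {
        System.out.println(provision("audit")); // first start: resource created
        System.out.println(provision("audit")); // restart: nothing to do
    }
}
```

The same idea applies whether the resource is a queue, a bucket, or an account: either the create operation itself is idempotent, or the connector queries first and creates conditionally.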

There are cases where we might want to clean up these resources when the connector that uses them is deleted. This can be to save costs (e.g. the external system charges per active account) or compute resources (triggers writing database updates to an audit table that will no longer be read from should be removed, as should the audit table itself). Taking other actions, like deleting the connector offsets (as discussed in KIP-875), might also be considered as part of this cleanup process.

The Connector API in Kafka Connect does not provide any hooks to indicate when a connector has been deleted, so it is currently not possible to react to this event. The stop() method in its current form cannot be used for this purpose, as a connector can be stopped and restarted at any point: the void start(Map&lt;String, String&gt; props) method is invoked when the connector is first started or when it is restarted by the Worker (for example, when the connector configuration changes), and the void stop() method is likewise invoked whenever the connector is stopped by the Worker, not only on deletion.



Public Interfaces


Code Block (Java): org.apache.kafka.connect.connector.Connector
package org.apache.kafka.connect.connector;

public abstract class Connector implements Versioned {
    
    /**
     * Stop this connector.
     */
    public abstract void stop();      

    /**
     * Stop this connector, and also indicate if the connector has been deleted.
     * <p>
     * Connectors are not required to override this method, unless they need to perform some cleanup
     * actions in cases where the connector has been deleted.
     *
     * @param isDeleted indicates if the connector has been deleted.
     */
    public void stop(boolean isDeleted) {
        stop();
    }       
}

Proposed Changes

Add an overloaded void stop(boolean isDeleted) method to the Connector public API, with a default implementation that calls the existing void stop() method. This new method can then be overridden by connectors that need to take additional steps (remove assets, delete offsets, etc.) as part of the deletion process.
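A connector opting in to the new callback might look like the sketch below. The abstract class here mirrors only the relevant slice of the proposal (the real org.apache.kafka.connect.connector.Connector also implements Versioned and has further lifecycle methods); QueueSinkConnector and its cleanup logic are hypothetical.

```java
// Minimal local mirror of the proposed API, for illustration only.
abstract class Connector {
    /** Stop this connector. */
    public abstract void stop();

    /** Proposed overload: the default implementation preserves existing behavior. */
    public void stop(boolean isDeleted) {
        stop();
    }
}

// Hypothetical connector that cleans up its external queue only on deletion.
public class QueueSinkConnector extends Connector {
    private boolean queueDeleted = false;

    @Override
    public void stop() {
        // Release in-memory state only; external resources stay in place so
        // the connector can be restarted.
    }

    @Override
    public void stop(boolean isDeleted) {
        stop();
        if (isDeleted) {
            // Deleted, not merely restarted: remove external resources,
            // e.g. client.deleteQueue(...) in a real implementation.
            queueDeleted = true;
        }
    }

    public boolean queueWasDeleted() {
        return queueDeleted;
    }

    public static void main(String[] args) {
        QueueSinkConnector c = new QueueSinkConnector();
        c.stop(false); // restart: queue kept
        System.out.println(c.queueWasDeleted());
        c.stop(true);  // deletion: queue removed
        System.out.println(c.queueWasDeleted());
    }
}
```

Connectors that do not care about deletion simply never override the overload and keep today's semantics.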

Worker class

Both StandaloneHerder and DistributedHerder invoke methods from the Worker class to start/stop connector instances. This KIP will overload the Worker#stopAndAwaitConnector(...) method, passing a flag indicating that the connector has been deleted. This flag will be passed to the new Connector#stop(isDeleted) method, so connectors can implement any additional logic needed.
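How the flag could flow from the Worker down to the connector is sketched below. This is a heavily simplified, hypothetical stand-in: the real Worker#stopAndAwaitConnector(...) also handles thread management and shutdown timeouts, which are omitted here.

```java
// Hypothetical sketch of the Worker-side overload; not the real Worker class.
public class WorkerSketch {
    interface StoppableConnector {
        void stop(boolean isDeleted);
    }

    private final StoppableConnector connector;
    boolean lastStopWasDeletion;

    WorkerSketch(StoppableConnector connector) {
        this.connector = connector;
    }

    /** Existing entry point: preserved by delegating with isDeleted = false. */
    public void stopAndAwaitConnector(String connName) {
        stopAndAwaitConnector(connName, false);
    }

    /** New overload: herders pass true when the connector is being deleted. */
    public void stopAndAwaitConnector(String connName, boolean isDeleted) {
        lastStopWasDeletion = isDeleted;
        connector.stop(isDeleted); // flag reaches the Connector implementation
    }

    public static void main(String[] args) {
        WorkerSketch worker = new WorkerSketch(
            isDeleted -> System.out.println("stop(isDeleted=" + isDeleted + ")"));
        worker.stopAndAwaitConnector("my-connector");       // restart path
        worker.stopAndAwaitConnector("my-connector", true); // deletion path
    }
}
```

Routing existing callers through the old signature keeps current behavior untouched; only the deletion code path in the herders needs to pass true.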

Compatibility, Deprecation, and Migration Plan

The proposed change is fully backward-compatible with existing Kafka Connect releases. The new method added to the public interface includes a default implementation that delegates to the existing stop() method, so existing connectors do not need to override it.

Test Plan

Tests will be added to make sure that the new method is invoked when a connector is deleted. Mainly:

  • Add/update unit tests to WorkerTest and WorkerConnectorTest classes.
  • Add integration tests in ConnectWorkerIntegrationTest and auxiliary classes (EmbeddedConnectClusterAssertions, SampleSourceConnector, SampleSinkConnector, etc.)

Rejected Alternatives

Add new destroy()/onDelete() method to the Connect API

Initially we thought about adding a new destroy() method to the Connector class. The idea was to call this method in WorkerConnector#doShutdown(), right after connector.stop() is executed. This, however, presented some questions around execution guarantees: for example, what the behavior would be if the Connector#stop() method never returned, or if it threw an exception. To keep things simple, an overloaded Connector#stop(boolean isDeleted) method was introduced instead, so the expected behavior remains the same as with the current implementation. That is, the method is guaranteed to be called if the connector stops within CONNECTOR_GRACEFUL_SHUTDOWN_TIMEOUT_MS (default: 5 seconds).

Delete provisioned resources out-of-band

In theory, users can monitor Kafka Connect configuration topics to determine if/when a connector has been deleted. Reacting to this event outside of the connector's context is probably not very useful, as there might not be enough contextual information to perform any meaningful action. It is also better to keep these concerns encapsulated within the connector framework itself.