Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

This page is meant as a template for writing a KIP. To create a KIP choose Tools->Copy on this page and modify with your content and replace the heading with the next KIP number and a description of your issue. Replace anything in italics with your own description.

Status

Current state: Under DiscussionAccepted

Discussion thread: here

JIRA: here [Change the link from KAFKA-1 to your own ticket]13510

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

When starting a connector, users must provide the connector configuration. The configuration often also includes configurations for other plugins such as SMTs or converters. Today, Connect does not provide a way to see what plugins are available installed apart from connectors. This make it difficult for users building data pipeline to know which plugins are available and what is possible. Basically they have to know how the Connect runtime is set up. Even once they know the plugins that are available, they then have to go look at the plugins documentation or, in the worst case, look directly at the source code to find their configuration definitions.
Similarly, Kafka connect administrators want to know the plugins that are installed on each worker. This includes both REST Extensions and Config Providers plugins.
All plugins
Connector plugins should be discoverable via the REST API. The Their configuration definitions of connector-level plugins should also be easily retrieved. This would significantly ease the process of building pipelines and enable building tools and UIs that can manage Connect data pipelines.

Public Interfaces

  • GET /connector-plugins: This endpoint will be updated to allow listing all plugins. The response structure of the objects in the array remain unchanged. A new query parameter "connectorsOnly" will be added and it will default to true so it's fully compatible with the current behavior. Users will be able to list all Connectors, Transformations, Converters, HeaderConverters and Predicates plugins by setting it to false. Classes that implement multiple plugin types will appear once for each type. For example SimpleHeaderConverter will be listed as a converter and as a header_converter. Possible values for the "type" field are "sink", "source", "converter", "header_converter", "transformation" and "predicate".

...

Code Block
languagejs
[
  {
    "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "type": "sink",
    "version": "3.02.0"
  },
   {
    "class": "org.apache.kafka.connect.convertersfile.ByteArrayConverterFileStreamSourceConnector",
    "type": "convertersource",
      "version": "3.02.0"  
  },
   {
    "class": "org.apache.kafka.connect.transformsconverters.Cast$ValueByteArrayConverter",
    "type": "transformationconverter"
  },
    "version {
    "class": "3.0.0org.apache.kafka.connect.transforms.Cast$Value",
    "type": "transformation"
  },
  {
    "class": "org.apache.kafka.connect.transforms.predicates.HasHeaderKey",
    "type": "predicate",
    "version": "3.0.0"  
  },
  {
    "class": "org.apache.kafka.connect.storage.SimpleHeaderConverter",
    "type": "header_converter",
    "version": "3.0.0"
  },
  {
    "class": "org.apache.kafka.connect.storage.SimpleHeaderConverter",
    "type": "converter",
    "version": "3.0.0"
  },   
  ...
]

Currently only Connector plugins are versioned, so we won't include the version field for other plugins.

  • GET /connector-plugins/<plugin>/config: This new endpoint will return the configuration definitions of the specified plugin. It will work with all plugins returned by /connector_plugins.

The plugin can be specified via its fully qualified class name or its Connect alias like in the existing /connector-plugins/<plugin>/config/validate endpoint. If a plugin does not override the config() method, the response is an empty array.


For example, accessing http://localhost:8083/connector-plugins/org.apache.kafka.connect.transforms.Cast$Value/config will return:

Code Block
languagejs
[
  {
    "name": "spec",
    "type": "LIST",
    "required": true,
    "default_value": null,
    "importance": "HIGH",
    "documentation": "List of fields and the type to cast them to of the form field1:type,field2:type to cast fields of Maps or Structs. A single type to cast the entire value. Valid types are int8, int16, int32, int64, float32, float64, boolean, and string. Note that binary fields can only be cast to string.",
    "group": null,
    "width": "NONE",
    "display_name": "spec",
    "dependents": [],
    "order": -1
  }
]
  • GET /worker-plugins: This new endpoint will list RestExtension and ConfigProvider plugins installed in the worker. Possible types are rest_extension and config_provider.
Code Block
languagejs
[
  {
    "class": "org.apache.kafka.connect.rest.basic.auth.extension.BasicAuthSecurityRestExtension",
    "type": "rest_extension",
    "version": "3.0.0"
  },
  {
    "class": "org.apache.kafka.common.config.provider.DirectoryConfigProvider",
    "type": "config_provider"
  },
  ...
]


Proposed Changes

REST API:

  • A new path will be added to ConnectorPluginsResource to retrieve the plugin configuration definitions

...

  • Listing connector plugin will accept an optional query parameter "connectorsOnly"  that defaults to true
Code Block
languagejava
@GET
@Path("/")
public List<ConnectorPluginInfo> listConnectorPlugins(@DefaultValue("true") @QueryParam("connectorsOnly") boolean connectorsOnly) {}
  • A new REST resource named WorkerPluginsResource will be defined with a single endpoints to list worker plugins
Code Block
languagejava
@Path("/worker-plugins")
public class WorkerPluginsResource {
    @GET
    @Path("/")
    public List<WorkerPluginInfo> listWorkerPlugins() {}
}

Versioned interface:

Add a default implementation for version():

Code Block
languagejava
/**
 * Connect requires some components implement this interface to define a version string.
 */
public interface Versioned {

    String UNDEFINED_VERSION = "undefined";
    /**
     * Get the version of this component.
     *
     * @return the version, formatted as a String. The version may not be {@code null} or empty.
     */
    default String version() {
        return UNDEFINED_VERSION;
    }
}


Converter interface:

Add a config() method to Converter with a default implementation. Also make it extend Versioned.

Code Block
languagejava
public interface Converter extends Versioned {

[...]

    /**
     * Configuration specification for this set of converters.
     * @return the configuration specification; may not be null
     */
    default ConfigDef config() {
        return new ConfigDef();
    }
}

It's common for custom converters to implement both Converter and HeaderConverter. As the 2 methods to retrieve the ConfigDef will have exactly the same signature, it will still be possible to implement both interfaces.HeaderConverter, Transformation and Predicate interface:

Update these 3 interfaces to extend Versioned.

All Converter, HeaderConverter, Transformation and Predicate built-in classes:

All the built-in implementations will override version() to return the current Kafka version.

Compatibility, Deprecation, and Migration Plan

  • /connector-plugins keeps its current behavior and will only expose the new behavior when a new query parameter is set.
  • When accessing /connector-plugins/<plugin>/config on existing converters that don't implement the config() method, an empty array will be returned. If a converter is also implementing HeaderConverter, and hence already have a config() method, it will be automatically used and the config will be returned.When accessing
  • /connector-plugins, existing plugins that don't implement version(), will have the version set to undefined.The other changes are new endpoints that don/<plugin>/config is a new endpoint that doesn't cause compatibility issues.

I propose to flip the query parameter value to list all plugins by default in the next major release.

Rejected Alternatives

  • Add a new endpoint /plugins for listing all plugins: It would be confusing to list both worker and connector plugins together. We'd then end up with 3 endpoints, /plugins, /worker-plugins and /connector-plugins which is as confusing!
  • Group connectors by type when listing them: This would break compatibility with the existing /connector-plugins behavior. As it's a very commonly used endpoint, it's preferred to keep compatibility.
  • Add a new endpoint /worker-plugins to list worker plugins (Rest Extensions and Config Providers): The use case is to allow administrators to check the plugins installed in each worker. Connect shouldn't expose worker internal details to all users and it's not clear what information would be useful for admins. Also Connect already has a /admin endpoint which should be reused for admin tasks.
  • Make all plugins implement Versioned. Initially we wanted to make all plugins consistent, but this either force having a default implementation for version() which would allow Connectors to not implement it, or force introducing another interface (PossiblyVersioned) to version other plugins which did not make a lot of sense since version does not have any contract today.