Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

This page is meant as a template for writing a KIP. To create a KIP choose Tools->Copy on this page and modify with your content and replace the heading with the next KIP number and a description of your issue. Replace anything in italics with your own description.

Status

Current state: Under Discussion

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

When starting a connector, users must provide the connector configuration. The configuration often also includes configurations for other plugins such as SMTs or converters. Today, Connect does not provide a way to see what plugins are available apart from connectors (via the /connector-plugins endpoint). This make it difficult for users building data pipeline to know which plugins are available and what is possible. Basically they have to know how the Connect runtime is set up. Even once they know the plugins that are available, they then have to go look at the plugins documentation or, in the worst case, look directly at the source code to find their configuration definitions.
All
Similarly, Kafka connect administrators want to know the plugins that are directly usable installed on each worker. This includes both REST Extensions and Config Providers plugins.

All plugins should be discoverable via the REST API. Their The configuration definitions of connector-level plugins should also be easily retrieved. This would significantly ease the process of building pipelines and enable building tools and UIs that can manage Connect data pipelines.

Public Interfaces

...

  • /connector-plugins: This endpoint will return all plugins that are be updated to allow listing all plugins. A new query parameter "connectors_only" will be added and it will default to true so it's fully compatible with the current behavior. Users will be able to list all Connectors, Transformations, Converters, HeaderConverters and Predicates . Plugins will be grouped by plugin type.plugins by setting it to false.

For example GET /connector-plugins?connectors_only=false will return:

Code Block
languagejs
{
  "converter": [
    {
      "class": "org.apache.kafka.connect.convertersfile.ByteArrayConverterFileStreamSinkConnector",
      "locationtype": "classpath"
    }
  ],
  "header_converter": [
    {
  sink",
    "classversion": "org3.apache.kafka.connect.storage.SimpleHeaderConverter",
      "location": "classpath"
  0.0"
  }
  ],
  "sink": [
    {
      "class": "org.apache.kafka.connect.fileconverters.FileStreamSinkConnectorByteArrayConverter",
      "locationtype": "classpathconverter"
    }
  ],
  "source": [
    {
      "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
      "location": "classpath"transforms.Cast",
      "versiontype": "3.0.0transformation"
    }
  ],   
  "transformation": [
    {
      "class": "comorg.github.jcustenborderapache.kafka.connect.transformtransforms.commonpredicates.BytesToString$KeyHasHeaderKey",
      "locationtype": "file:/Users/mickael/tmp/path2/kafka-connect-transform-common-0.1.0.14.jarpredicate"
    },
     {
      "class": "org.apache.kafka.connect.transformsstorage.DropHeadersSimpleHeaderConverter",
      "locationtype": "classpathheader_converter"
    }
  ],
  "predicate": [
     {
      "class": "org.apache.kafka.connect.transforms.predicates.HasHeaderKey",
      "location": "classpath"
    }
  ] 
}


  • /connector-plugins/<type><plugin>/<name>/configdefconfig: This new endpoint will return the configdef for configuration definitions of the specified plugin. Type can be either sink, source, transformation, predicate, converter or header_converter. Name must be a class name (fully qualified or not) or the Connect alias (class name without the Connector, Converter or Transform suffix) of the pluginIt will work with all plugins returned by /connector_plugins.
    For example, accessing http://localhost:8083/connector-plugins/

...

...

Code Block
languagejs
[
  {
    "name": "spec",
    "type": "LIST",
    "required": true,
    "default_value": null,
    "importance": "HIGH",
    "documentation": "List of fields and the type to cast them to of the form field1:type,field2:type to cast fields of Maps or Structs. A single type to cast the entire value. Valid types are int8, int16, int32, int64, float32, float64, boolean, and string. Note that binary fields can only be cast to string.",
    "group": null,
    "width": "NONE",
    "display_name": "spec",
    "dependents": [],
    "order": -1
  }
]


  • /worker-plugins: This

...

  • new endpoint will list RestExtension and ConfigProvider plugins installed in the worker.
Code Block
languagejs
[
  {
    "class": "org.apache.kafka.connect.rest.basic.auth.extension.BasicAuthSecurityRestExtension",
    "type": "rest_extension",
    "version": "3.0.0"
  },
  {
    "class": "org.apache.kafka.common.config.provider.DirectoryConfigProvider",
    "type": "config_provider"
  },
  ...
]


Proposed Changes

REST API:

  • A new

...

  • path will be added to ConnectorPluginsResource
Code Block
languagejava
@GET
@Path("/{plugin}/config")
public Map<String, List<PluginInfo>> listPluginsList<ConfigKeyInfo> getPluginConfig() {
    return doListPlugins();
}


  • Listing connector plugin will accept an optional query parameter "connectors_only"  that defaults to true
Code Block
languagejava
@GET
@Path("/{type}/{plugin}/configdef")
public List<ConfigKeyInfo>List<ConnectorPluginInfo> getConnectorConfigDef(final @PathParam("type") String type, final @PathParam("plugin") String plugin) {
    return doGetConfigDef(type, plugin);
listConnectorPlugins(@DefaultValue("true") @QueryParam("connectors_only") boolean connectorsOnly) {
    return getConnectorPlugins(connectorsOnly);
}


  • A new REST resource named WorkerPluginsResource will be defined with a single endpoints to list worker plugins
Code Block
languagejava
@Path("/worker-plugins")
public class WorkerPluginsResource {
    @GET
    @Path("/")
    public List<WorkerPluginInfo> listPlugins() {}}


Converter interface:

Add a config() method to Converter. Also make it Configurable and Closeable so it's uniform with the other plugins:

Code Block
languagejava
public interface Converter extends Configurable, Closeable {

[...]

    /**
     * Configuration specification for this set of converters.
     * @return the configuration specification; may not be null
     */
    default ConfigDef config() {
        return new ConfigDef();
    }
}

It's common for custom converters to implement both Converter and HeaderConverter. As the 2 methods to retrieve the ConfigDef will have exactly the same signature, it will still be possible to implement both interfaces.

Compatibility, Deprecation, and Migration Plan

...

  • /connector-plugins keeps its current behavior and will only expose the new behavior when a new query parameter is set. I propose to flip the query parameter value in the next major release to list all plugins by default.
  • The other changes are new endpoints that don't cause compatibility issues.

Rejected Alternatives

  • Support listing worker Add a new endpoint /plugins for listing all plugins: It 's unclear how useful worker plugins are for users creating connectors. I decided to focus the KIP on user plugins insteadwould be confusing to list both worker and connector plugins together. We'd then end up with 3 endpoints, /plugins, /worker-plugins and /connector-plugins which is as confusing!
  • Group connectors by type when listing them: This would break compatibility with the existing /connector-plugins behavior. As it's a very commonly used endpoint, it's preferred to keep compatibility.