Status

Current state: Accepted

Discussion thread: here

JIRA: KAFKA-13510

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

When starting a connector, users must provide the connector configuration. The configuration often also includes configurations for other plugins such as SMTs or converters. Today, Connect does not provide a way to see what plugins are installed apart from connectors. This make it difficult for users building data pipeline to know which plugins are available and what is possible. Basically they have to know how the Connect runtime is set up. Even once they know the plugins that are available, they then have to go look at the plugins documentation or, in the worst case, look directly at the source code to find their configuration definitions.

Connector plugins should be discoverable via the REST API. Their configuration definitions should also be easily retrieved. This would significantly ease the process of building pipelines and enable building tools and UIs that can manage Connect data pipelines.

Public Interfaces

For example GET /connector-plugins?connectorsOnly=false will return:

[
  {
    "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "type": "sink",
    "version": "3.2.0"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "type": "source",
    "version": "3.2.0"
  },   {
    "class": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "type": "converter"
  },
  {
    "class": "org.apache.kafka.connect.transforms.Cast$Value",
    "type": "transformation"
  },
  {
    "class": "org.apache.kafka.connect.transforms.predicates.HasHeaderKey",
    "type": "predicate"
  },
  {
    "class": "org.apache.kafka.connect.storage.SimpleHeaderConverter",
    "type": "header_converter"
  },
  {
    "class": "org.apache.kafka.connect.storage.SimpleHeaderConverter",
    "type": "converter"
  },   
  ...
]

Currently only Connector plugins are versioned, so we won't include the version field for other plugins.

The plugin can be specified via its fully qualified class name or its Connect alias like in the existing /connector-plugins/<plugin>/config/validate endpoint. If a plugin does not override the config() method, the response is an empty array.


For example, accessing http://localhost:8083/connector-plugins/org.apache.kafka.connect.transforms.Cast$Value/config will return:

[
  {
    "name": "spec",
    "type": "LIST",
    "required": true,
    "default_value": null,
    "importance": "HIGH",
    "documentation": "List of fields and the type to cast them to of the form field1:type,field2:type to cast fields of Maps or Structs. A single type to cast the entire value. Valid types are int8, int16, int32, int64, float32, float64, boolean, and string. Note that binary fields can only be cast to string.",
    "group": null,
    "width": "NONE",
    "display_name": "spec",
    "dependents": [],
    "order": -1
  }
]


Proposed Changes

REST API:

@GET
@Path("/{plugin}/config")
public List<ConfigKeyInfo> getPluginConfig() {}


@GET
@Path("/")
public List<ConnectorPluginInfo> listConnectorPlugins(@DefaultValue("true") @QueryParam("connectorsOnly") boolean connectorsOnly) {}


Converter interface:

Add a config() method to Converter with a default implementation.

public interface Converter {

[...]

    /**
     * Configuration specification for this set of converters.
     * @return the configuration specification; may not be null
     */
    default ConfigDef config() {
        return new ConfigDef();
    }
}

It's common for custom converters to implement both Converter and HeaderConverter. As the 2 methods to retrieve the ConfigDef will have exactly the same signature, it will still be possible to implement both interfaces.

Compatibility, Deprecation, and Migration Plan

I propose to flip the query parameter value to list all plugins by default in the next major release.

Rejected Alternatives