Code Block
[
  {
    "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "type": "sink",
    "version": "3.2.0"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "type": "source",
    "version": "3.2.0"
  },
  {
    "class": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "type": "converter",
    "version": "3.2.0"
  },
  {
    "class": "org.apache.kafka.connect.transforms.Cast$Value",
    "type": "transformation",
    "version": "3.2.0"
  },
  {
    "class": "org.apache.kafka.connect.transforms.predicates.HasHeaderKey",
    "type": "predicate",
    "version": "3.2.0"
  },
  {
    "class": "",
    "type": "header_converter",
    "version": "3.2.0"
  },
  {
    "class": "",
    "type": "converter",
    "version": "3.2.0"
  }
]

Currently only Connector plugins are versioned, so we won't include the version field for other plugins.

  • GET /connector-plugins/<plugin>/config: This new endpoint will return the configuration definitions of the specified plugin. It will work with all plugins returned by /connector-plugins.


Code Block
public List<ConnectorPluginInfo> listConnectorPlugins(@DefaultValue("true") @QueryParam("connectorsOnly") boolean connectorsOnly) {}
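
From the client side, the two endpoints above could be called roughly as follows. This is only a sketch: the ConnectPluginRequests helper is hypothetical, the base URL assumes a worker on the default Connect REST port, and it assumes that connectorsOnly=false is what requests the full plugin list and that a plugin is addressed by its class name. The requests are built but not sent.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Kafka Connect: it only builds requests for the
// endpoints described in this proposal and does not send them.
final class ConnectPluginRequests {

    // Assumption: a worker listening on the default Connect REST port.
    static final String BASE_URL = "http://localhost:8083";

    // Lists plugins; connectorsOnly=false is assumed to return all plugin types.
    static HttpRequest listPlugins(boolean connectorsOnly) {
        URI uri = URI.create(BASE_URL + "/connector-plugins?connectorsOnly=" + connectorsOnly);
        return HttpRequest.newBuilder(uri).GET().build();
    }

    // Requests the configuration definitions of one plugin, identified by class name.
    static HttpRequest pluginConfig(String pluginClass) {
        // Percent-encode the name so characters such as '$' in nested class names are URL-safe.
        String encoded = URLEncoder.encode(pluginClass, StandardCharsets.UTF_8);
        URI uri = URI.create(BASE_URL + "/connector-plugins/" + encoded + "/config");
        return HttpRequest.newBuilder(uri).GET().build();
    }

    public static void main(String[] args) {
        System.out.println(listPlugins(false).uri());
        System.out.println(pluginConfig("org.apache.kafka.connect.transforms.Cast$Value").uri());
    }
}
```

The returned HttpRequest objects can be passed to a java.net.http.HttpClient to perform the actual calls against a running worker.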

Versioned interface:

Add a default implementation for version():

Code Block
/**
 * Connect requires some components implement this interface to define a version string.
 */
public interface Versioned {

    String UNDEFINED_VERSION = "undefined";

    /**
     * Get the version of this component.
     *
     * @return the version, formatted as a String. The version may not be {@code null} or empty.
     */
    default String version() {
        return UNDEFINED_VERSION;
    }
}
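
To illustrate what the default implementation means for plugin authors, here is a minimal sketch. The Versioned interface is redeclared locally so the example compiles without Kafka on the classpath, and LegacyPlugin and UpdatedPlugin are hypothetical classes invented for the illustration.

```java
// Local stand-in mirroring the proposed interface so the sketch compiles without Kafka.
interface Versioned {
    String UNDEFINED_VERSION = "undefined";

    default String version() {
        return UNDEFINED_VERSION;
    }
}

// Hypothetical plugin written before the change: it never declares version(),
// so it inherits the default implementation.
class LegacyPlugin implements Versioned {
}

// Hypothetical plugin that opts in by overriding version().
class UpdatedPlugin implements Versioned {
    @Override
    public String version() {
        return "3.2.0";
    }
}

final class VersionedDemo {
    public static void main(String[] args) {
        System.out.println(new LegacyPlugin().version());
        System.out.println(new UpdatedPlugin().version());
    }
}
```

A class that does not override version() keeps compiling and reports UNDEFINED_VERSION, while a class that overrides it reports its own value.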

Converter interface:

Add a config() method to Converter with a default implementation. Also make it extend Versioned.

Code Block
public interface Converter extends Versioned {

    /**
     * Configuration specification for this set of converters.
     * @return the configuration specification; may not be null
     */
    default ConfigDef config() {
        return new ConfigDef();
    }
}

It's common for custom converters to implement both Converter and HeaderConverter. As the two methods to retrieve the ConfigDef will have exactly the same signature, it will still be possible to implement both interfaces.

HeaderConverter, Transformation and Predicate interfaces:

Update these 3 interfaces to extend Versioned.
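
For a class that implements both Converter and HeaderConverter, the shared config() and version() methods could resolve as in the sketch below. All types are simplified local stand-ins rather than the real Kafka interfaces: ConfigDef is reduced to a list of setting names, HeaderConverter is assumed to declare config() without a default, and DualConverter with its "encoding" setting is hypothetical.

```java
import java.util.List;

// Simplified stand-in for Kafka's ConfigDef: just a list of setting names.
final class ConfigDef {
    final List<String> names;

    ConfigDef(String... names) {
        this.names = List.of(names);
    }
}

interface Versioned {
    default String version() {
        return "undefined";
    }
}

// Stand-in for the proposed Converter: config() gets a default implementation.
interface Converter extends Versioned {
    default ConfigDef config() {
        return new ConfigDef();
    }
}

// Stand-in for HeaderConverter: config() is assumed to be abstract here.
interface HeaderConverter extends Versioned {
    ConfigDef config();
}

// Hypothetical custom converter implementing both interfaces. Its single config()
// satisfies both, because the two declarations have the same signature.
class DualConverter implements Converter, HeaderConverter {
    @Override
    public ConfigDef config() {
        return new ConfigDef("encoding");
    }
}

final class DualConverterDemo {
    public static void main(String[] args) {
        DualConverter converter = new DualConverter();
        System.out.println(((Converter) converter).config().names);
        System.out.println(((HeaderConverter) converter).config().names);
        System.out.println(converter.version());
    }
}
```

Because version() reaches the class from the same Versioned default through both interfaces, no explicit override is needed to resolve it.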

All Converter, HeaderConverter, Transformation and Predicate built-in classes:

All the built-in implementations will override version() to return the current Kafka version.

Compatibility, Deprecation, and Migration Plan

  • /connector-plugins keeps its current behavior and will only expose the new behavior when the new connectorsOnly query parameter is set to false.
  • When accessing /connector-plugins/<plugin>/config on existing converters that don't implement the config() method, an empty array will be returned. If a converter also implements HeaderConverter, and hence already has a config() method, that method will be used automatically and its config will be returned.
  • When accessing /connector-plugins, existing plugins that don't implement version() will have the version set to undefined.
  • /connector-plugins/<plugin>/config is a new endpoint that doesn't cause compatibility issues.
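
The compatibility of /connector-plugins can be pictured with a small filtering sketch. It is not the actual Connect implementation: PluginListing and PluginInfo are hypothetical, and the sketch assumes that connectorsOnly=true (the default) limits the result to plugins of type sink or source.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch of how a listing could honor the connectorsOnly parameter.
final class PluginListing {

    // Simplified plugin descriptor holding only the fields needed for filtering.
    static final class PluginInfo {
        final String className;
        final String type;

        PluginInfo(String className, String type) {
            this.className = className;
            this.type = type;
        }
    }

    // connectorsOnly=true keeps the existing behavior (connectors only);
    // connectorsOnly=false returns every plugin type.
    static List<PluginInfo> list(List<PluginInfo> all, boolean connectorsOnly) {
        if (!connectorsOnly) {
            return all;
        }
        return all.stream()
                .filter(p -> p.type.equals("sink") || p.type.equals("source"))
                .collect(Collectors.toList());
    }

    // Sample data using class names that appear in this proposal.
    static List<PluginInfo> sample() {
        return List.of(
                new PluginInfo("org.apache.kafka.connect.file.FileStreamSinkConnector", "sink"),
                new PluginInfo("org.apache.kafka.connect.file.FileStreamSourceConnector", "source"),
                new PluginInfo("org.apache.kafka.connect.converters.ByteArrayConverter", "converter"),
                new PluginInfo("org.apache.kafka.connect.transforms.Cast$Value", "transformation"));
    }

    public static void main(String[] args) {
        System.out.println(list(sample(), true).size());
        System.out.println(list(sample(), false).size());
    }
}
```

Clients that never send the parameter take the connectorsOnly=true path and see the same connector-only list as before.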


Rejected Alternatives

  • Add a new endpoint /plugins for listing all plugins: It would be confusing to list both worker and connector plugins together. We'd then end up with 3 endpoints, /plugins, /worker-plugins and /connector-plugins, which is just as confusing.
  • Group connectors by type when listing them: This would break compatibility with the existing /connector-plugins behavior. As it's a very commonly used endpoint, it's preferred to keep compatibility.
  • Add a new endpoint /worker-plugins to list worker plugins (Rest Extensions and Config Providers): The use case is to allow administrators to check the plugins installed in each worker. Connect shouldn't expose worker internal details to all users, and it's not clear what information would be useful for admins. Also, Connect already has an /admin endpoint, which should be reused for admin tasks.
  • Make all plugins implement Versioned: Initially we wanted to make all plugins consistent, but this would force either a default implementation for version(), which would allow Connectors to not implement it, or another interface (PossiblyVersioned) to version other plugins, which did not make much sense since version() does not have any contract today.