Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  • GET /connector-plugins: This endpoint will be updated to allow listing all plugins. A new query parameter "connectorsOnly" will be added and it will default to true so it's fully compatible with the current behavior. Users will be able to list all Connectors, Transformations, Converters, HeaderConverters and Predicates plugins by setting it to false. Classes that implement multiple plugin types will appear once for each type. For example SimpleHeaderConverter will be listed as a converter and as a header_converter.

For example GET /connector-plugins?connectorsOnly=false will return:

Code Block
languagejs
[
  {
    "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "type": "sink",
    "version": "3.0.0"
  },
  {
    "class": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "type": "converter",
    "version": "3.0.0"  
  },
  {
    "class": "org.apache.kafka.connect.transforms.Cast$Value",
    "type": "transformation",
    "version": "3.0.0"
  },
  {
    "class": "org.apache.kafka.connect.transforms.predicates.HasHeaderKey",
    "type": "predicate",
    "version": "3.0.0"  
  },
  {
    "class": "org.apache.kafka.connect.storage.SimpleHeaderConverter",
    "type": "header_converter",
    "version": "3.0.0"
  },
  {
    "class": "org.apache.kafka.connect.storage.SimpleHeaderConverter",
    "type": "converter",
    "version": "3.0.0"
  },   
  ...
]


  • GET /connector-plugins/<plugin>/config: This new endpoint will return the configuration definitions of the specified plugin. It will work with all plugins returned by /connector_plugins.

The plugin can be specified via its fully qualified class name or its Connect alias like in the existing /connector-plugins/<plugin>/config/validate endpoint. If a plugin does not override the config() method, the response is an empty array.


For example, accessing http://localhost:8083/connector-plugins/Cast$Value/config will return:

...

Code Block
languagejava
@Path("/worker-plugins")
public class WorkerPluginsResource {
    @GET
    @Path("/")
    public List<WorkerPluginInfo> listWorkerPlugins() {}
}


Versioned interface:

Add a default implementation for version():

Code Block
languagejava
/**
 * Connect requires some components implement this interface to define a version string.
 */
public interface Versioned {

    String UNDEFINED_VERSION = "undefined";
    /**
     * Get the version of this component.
     *
     * @return the version, formatted as a String. The version may not be {@code null} or empty.
     */
    default String version() {
        return UNDEFINED_VERSION;
    }
}


Converter interface:

Add a config() method to Converter with a default implementation. Also make it Configurable and Closeable so it's uniform with the other plugins:extend Versioned.

Code Block
languagejava
public interface Converter extends Configurable, CloseableVersioned {

[...]

    /**
     * Configuration specification for this set of converters.
     * @return the configuration specification; may not be null
     */
    default ConfigDef config() {
        return new ConfigDef();
    }
}

It's common for custom converters to implement both Converter and HeaderConverter. As the 2 methods to retrieve the ConfigDef will have exactly the same signature, it will still be possible to implement both interfaces.


HeaderConverter, Transformation and Predicate interface:

Update these 3 interfaces to extend Versioned.


All Converter, HeaderConverter, Transformation and Predicate built-in classes:

All the built-in implementations will override version() to return the current Kafka version.

Compatibility, Deprecation, and Migration Plan

...