...

Discussion thread: here

JIRA: KAFKA-6776

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

 

Implementations would use the ResourceConfig to register one or more Jersey resources, and can use the WorkerConfig to read any plugin-specific configuration.

Code Block
interface ConnectRestPlugin extends Closeable{
    void register(ConnectRestPluginContext restPluginContext);
}
 
interface ConnectRestPluginContext{
    ResourceConfig resourceConfig();
    WorkerConfig workerConfig();
    ConnectClusterState clusterState();
}
 
class ConnectRestPluginContextImpl implements ConnectRestPluginContext{
    private final ResourceConfig resourceConfig;
    private final WorkerConfig workerConfig;
    private final ConnectClusterState clusterState;
 
    ConnectRestPluginContextImpl(ResourceConfig resourceConfig, WorkerConfig workerConfig, ConnectClusterState clusterState){
        this.resourceConfig = resourceConfig;
        this.workerConfig = workerConfig;
        this.clusterState = clusterState;
    }
 
    public ResourceConfig resourceConfig(){
        return this.resourceConfig;
    }
 
    public WorkerConfig workerConfig(){
        return this.workerConfig;
    }
 
    public ConnectClusterState clusterState(){
        return this.clusterState;
    }
}

 

We will also introduce another new public API, ConnectClusterState, which will initially expose some of the read-only methods from the Herder. The change would also include a default implementation, ConnectClusterStateImpl, in the Connect runtime that delegates to the underlying Herder. This will be useful for adding new resources such as health checks, monitoring, etc.

Code Block
interface ConnectClusterState{
    /**
     * Get a list of connectors currently running in this cluster. This is a full list of connectors in the cluster gathered
     * from the current configuration.
     */
    Collection<String> connectors();
 
    /**
     * Lookup the current status of a connector.
     * @param connName name of the connector
     */
    ConnectorStateInfo connectorStatus(String connName);
 
}
 
class ConnectClusterStateImpl implements ConnectClusterState{
    private final Herder herder;
 
    public ConnectClusterStateImpl(Herder herder){
        this.herder = herder;
    }
 
    @Override
    public Collection<String> connectors(){
        //delegate to herder
    }
 
    @Override
    public ConnectorStateInfo connectorStatus(String connName){
        //delegate to herder
    }
}
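As an illustration of how a health check resource might consume this API, the following is a minimal sketch rather than part of the proposal. `ClusterStateView` is a hypothetical stand-in for ConnectClusterState that returns a plain state string instead of a ConnectorStateInfo, and treating "RUNNING" as the only healthy state is an assumption made for the example.

```java
import java.util.Collection;
import java.util.Map;

public class HealthCheckSketch {

    /** Hypothetical stand-in for the proposed ConnectClusterState, reduced to what this sketch needs. */
    public interface ClusterStateView {
        Collection<String> connectors();

        // Stand-in for connectorStatus(); the proposal returns a ConnectorStateInfo instead of a String.
        String connectorState(String connName);
    }

    /** Returns true only if every connector in the cluster reports the RUNNING state. */
    public static boolean allRunning(ClusterStateView state) {
        for (String name : state.connectors()) {
            if (!"RUNNING".equals(state.connectorState(name))) {
                return false;
            }
        }
        return true;
    }

    /** Builds a fixed, in-memory view from a connector-name to state map (for illustration only). */
    public static ClusterStateView fixedState(Map<String, String> states) {
        return new ClusterStateView() {
            @Override
            public Collection<String> connectors() {
                return states.keySet();
            }

            @Override
            public String connectorState(String connName) {
                return states.get(connName);
            }
        };
    }

    public static void main(String[] args) {
        ClusterStateView healthy = fixedState(Map.of("sink-a", "RUNNING"));
        ClusterStateView degraded = fixedState(Map.of("sink-a", "RUNNING", "source-b", "FAILED"));
        System.out.println("healthy cluster ok: " + allRunning(healthy));
        System.out.println("degraded cluster ok: " + allRunning(degraded));
    }
}
```

Note that in this sketch a cluster with no connectors counts as healthy; a real health check may want to treat that case differently.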

Plugin Integration with Connect

The plugins would be registered in the RestServer.start(Herder herder) method before the default Connect resources are registered. Currently, when there are duplicate registrations, the first registration wins in Jersey, but this is an implementation detail of Jersey that Connect can't guarantee. Hence, it is recommended that plugins don't re-register the default Connect resources, as doing so could lead to unexpected errors. The close() method of each plugin would be invoked as part of stop() in the same class.
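The ordering described above can be sketched as follows. This is not the RestServer implementation: `Plugin` is a hypothetical stand-in for ConnectRestPlugin, a plain list of names stands in for Jersey's ResourceConfig, and the resource names are invented for the example.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class RestServerLifecycleSketch {

    /** Hypothetical stand-in for ConnectRestPlugin; the registry list stands in for ResourceConfig. */
    public interface Plugin extends Closeable {
        void register(List<String> registry);
    }

    private final List<Plugin> plugins;
    private final List<String> registry = new ArrayList<>();

    public RestServerLifecycleSketch(List<Plugin> plugins) {
        this.plugins = plugins;
    }

    /** Registers plugin resources first, then the default resources. */
    public void start() {
        for (Plugin plugin : plugins) {
            plugin.register(registry);
        }
        registry.add("default-connect-resources");
    }

    /** Closes every plugin; one failing close does not prevent the others from closing. */
    public void stop() {
        for (Plugin plugin : plugins) {
            try {
                plugin.close();
            } catch (IOException e) {
                System.err.println("Failed to close plugin: " + e.getMessage());
            }
        }
    }

    public List<String> registry() {
        return registry;
    }

    /** Runs one start/stop cycle with a single example plugin and returns the registration order. */
    public static List<String> demo() {
        Plugin example = new Plugin() {
            @Override
            public void register(List<String> registry) {
                registry.add("example-plugin-resource");
            }

            @Override
            public void close() {
                // nothing to release in this sketch
            }
        };
        RestServerLifecycleSketch server = new RestServerLifecycleSketch(List.of(example));
        server.start();
        server.stop();
        return server.registry();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```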

The new extension class and its dependencies would need to be on the plugin path. Hence, ConnectRestPlugin would be defined as a new plugin type to be loaded by the PluginClassLoader. The plugin would be looked up via the Java Service Provider mechanism instead of the Reflections scan that is used for other plugins. This avoids the class-loading cost currently incurred by scanning classes for other plugins.
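For readers unfamiliar with the Service Provider mechanism, the lookup could look roughly like the sketch below, which uses the standard java.util.ServiceLoader. `RestPlugin` is a hypothetical stand-in for ConnectRestPlugin, and passing a single class loader is a simplification; how Connect would wire this to the PluginClassLoader is not shown here.

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

public class PluginDiscoverySketch {

    /** Hypothetical stand-in for the proposed ConnectRestPlugin interface. */
    public interface RestPlugin extends Closeable {
        void register(Object context);
    }

    /**
     * Finds and instantiates every provider of pluginType that is visible to the given class loader.
     * Only classes named in provider-configuration files are loaded; no classpath scan takes place.
     */
    public static <T> List<T> discover(Class<T> pluginType, ClassLoader loader) {
        List<T> found = new ArrayList<>();
        for (T plugin : ServiceLoader.load(pluginType, loader)) {
            found.add(plugin);
        }
        return found;
    }

    public static void main(String[] args) {
        List<RestPlugin> plugins = discover(RestPlugin.class, PluginDiscoverySketch.class.getClassLoader());
        System.out.println("Discovered " + plugins.size() + " plugin(s)");
    }
}
```

With ServiceLoader, a plugin jar advertises its implementation by listing the implementing class name in a file under META-INF/services/ named after the fully qualified name of the service interface. When no such file is present, as in this standalone sketch, the lookup simply returns no plugins.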

Example

Consider the following example that defines a single plugin to add an authenticating filter and a health check resource.

Code Block
class ConfluentConnectPluginExampleConnectPlugin implements ConnectRestPlugin{
    public void register(ConnectRestPluginContext restPluginContext){
        // the ResourceConfig is obtained from the context passed to register()
        ResourceConfig resourceConfig = restPluginContext.resourceConfig();
        resourceConfig.register(new AuthenticationFilter(restPluginContext.workerConfig()));
        resourceConfig.register(new HealthCheckResource(restPluginContext.workerConfig(), restPluginContext.clusterState()));
    }
 
    public void close() {
        //close any resources
    }
}
 
class AuthenticationFilter implements ContainerRequestFilter {
 
  public AuthenticationFilter(WorkerConfig workerConfig){
    //set up filter
  }
     
  @Override
  public void filter(ContainerRequestContext requestContext) {
        //authentication logic
  }
 
}
 
@Path("/connect")
class HealthCheckResource {
 
    public HealthCheckResource(WorkerConfig workerConfig, ConnectClusterState clusterState){
        //initialize resource
    }
 
    @GET
    @Path("/health")
    public void healthcheck(){
        //check herder health
    }
}

The plugin illustrated above can then be configured in the worker's configuration as below:

 

Code Block
connect-worker.properties
# configure plugin
rest.plugins=iocom.confluentexample.ConfluentConnectPluginExampleConnectPlugin

 

Compatibility, Deprecation, and Migration Plan

...