...
Developers would only be required to implement the ConnectRestExtension interface to provide an extension. Its register method is passed an implementation of ConnectRestExtensionContext, whose configurable() returns an implementation of javax.ws.rs.core.Configurable. Using this, developers can register new JAX-RS resources such as filters, new endpoints, etc.
Code Block |
---|
interface ConnectRestExtension extends Closeable {
    void register(ConnectRestExtensionContext restPluginContext);
} |
As mentioned above, even though developers are only required to implement ConnectRestExtension, they will also be using the following new public interfaces.
ConnectRestExtensionContext
This is a context interface that composes and provides access to:
- Configurable - register JAX RS resources
- workerConfig - access user provided worker configs
- clusterState - A new interface that helps provide some cluster state information
Code Block |
---|
interface ConnectRestExtensionContext {
    Configurable configurable();
    WorkerConfig workerConfig();
    ConnectClusterState clusterState();
} |
ConnectClusterState
Provides methods to list the connectors running in the cluster and to look up the status of a connector.
...
Users will be able to create a plugin by implementing the ConnectRestExtension interface. The key method to implement is the register method, which takes a ConnectRestExtensionContext. Passing a context object allows us to extend the interface easily in the future with new parameters. The Connect runtime would also provide a default implementation of the ConnectRestExtensionContext interface. One or more implementations can be configured via the rest.extension.classes configuration as a comma-separated list of class names.
Code Block |
---|
interface ConnectRestExtension extends Closeable {
    void register(ConnectRestExtensionContext restPluginContext);
}
interface ConnectRestExtensionContext {
    Configurable configurable();
    WorkerConfig workerConfig();
    ConnectClusterState clusterState();
}
class ConnectRestExtensionContextImpl implements ConnectRestExtensionContext {
    private final Configurable configurable;
    private final WorkerConfig workerConfig;
    private final ConnectClusterState clusterState;
    ConnectRestExtensionContextImpl(Configurable configurable, WorkerConfig workerConfig, ConnectClusterState clusterState) {
        this.configurable = configurable;
        this.workerConfig = workerConfig;
        this.clusterState = clusterState;
    }
    @Override
    public Configurable configurable() {
        return this.configurable;
    }
    @Override
    public WorkerConfig workerConfig() {
        return this.workerConfig;
    }
    @Override
    public ConnectClusterState clusterState() {
        return this.clusterState;
    }
} |
We will be introducing another new public API, ConnectClusterState, which will at present provide some of the read-only methods from the Herder. The change would also include a default implementation, ConnectClusterStateImpl, in the Connect runtime that will delegate to the underlying Herder. This will be useful when you want to add new resources such as health checks, monitoring, etc.
Code Block |
---|
interface ConnectClusterState {
    /**
     * Get a list of connectors currently running in this cluster. This is a full list of connectors in the cluster gathered
     * from the current configuration.
     */
    Collection<String> connectors();
    /**
     * Lookup the current status of a connector.
     * @param connName name of the connector
     */
    ConnectorStateInfo connectorStatus(String connName);
}
class ConnectClusterStateImpl implements ConnectClusterState {
    private final Herder herder;
    public ConnectClusterStateImpl(Herder herder) {
        this.herder = herder;
    }
    @Override
    public Collection<String> connectors() {
        // delegate to herder
    }
    @Override
    public ConnectorStateInfo connectorStatus(String connName) {
        // delegate to herder
    }
} |
Rest Extension Integration with Connect
The plugins would be registered in the RestServer.start(Herder herder) method after registering the default Connect resources. The Connect runtime would provide an implementation of the Configurable interface that would do the following:
- It is constructed with the ResourceConfig available in the RestServer.
- It checks whether a resource is already registered. If not, it delegates to the ResourceConfig. If the resource is already registered, it logs a warning message.
- For non-register methods, it just delegates to the ResourceConfig instance.
Code Block |
---|
class ConnectRestConfigurable implements Configurable {
    ResourceConfig resourceConfig;
    public ConnectRestConfigurable(ResourceConfig resourceConfig) {
        this.resourceConfig = resourceConfig;
    }
    // implement methods and delegate to resourceConfig
} |
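The duplicate check described above can be sketched without the JAX-RS dependency. This is only an illustration, not the proposed implementation: SimpleConfigurable, SimpleResourceConfig and GuardedConfigurable are hypothetical stand-ins for javax.ws.rs.core.Configurable, Jersey's ResourceConfig and the Connect wrapper, and the real Configurable interface has many more register overloads.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.logging.Logger;

// Hypothetical stand-in for the small part of javax.ws.rs.core.Configurable used here.
interface SimpleConfigurable {
    SimpleConfigurable register(Object component);
    boolean isRegistered(Object component);
}

// Hypothetical stand-in for Jersey's ResourceConfig: it only remembers what was registered.
class SimpleResourceConfig implements SimpleConfigurable {
    private final Set<Object> components = new LinkedHashSet<>();

    @Override
    public SimpleConfigurable register(Object component) {
        components.add(component);
        return this;
    }

    @Override
    public boolean isRegistered(Object component) {
        return components.contains(component);
    }

    int size() {
        return components.size();
    }
}

// Wrapper handed to extensions: delegates new registrations and warns on duplicates.
class GuardedConfigurable implements SimpleConfigurable {
    private static final Logger LOG = Logger.getLogger(GuardedConfigurable.class.getName());
    private final SimpleConfigurable delegate;

    GuardedConfigurable(SimpleConfigurable delegate) {
        this.delegate = delegate;
    }

    @Override
    public SimpleConfigurable register(Object component) {
        if (delegate.isRegistered(component)) {
            // Already present: skip the registration and only log a warning.
            LOG.warning("Component already registered, ignoring: " + component);
        } else {
            delegate.register(component);
        }
        return this;
    }

    @Override
    public boolean isRegistered(Object component) {
        return delegate.isRegistered(component);
    }
}
```

Because the wrapper never forwards a duplicate, the outcome does not depend on how the underlying container resolves conflicting registrations.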
Currently, when there are duplicate registrations, the first registration wins in Jersey, but this is an implementation detail of Jersey and Connect can't guarantee it. Hence, it is recommended that plugins don't re-register the default Connect resources, since this could potentially lead to unexpected errors. The close method of the plugins would be invoked as part of stop() in the same class.
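The register and close lifecycle can be illustrated with a simplified model. This sketch assumes that extensions are registered after the default resources and closed when the server stops. SketchExtension, HealthSketchExtension and SketchRestServer are hypothetical names, the context is reduced to a plain list of resource names, and "ConnectorsResource" is used only as an illustrative label for a default resource.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ConnectRestExtension; the context is reduced to a list of resource names.
interface SketchExtension extends Closeable {
    void register(List<String> resources);
}

// Example extension that records whether close() was called.
class HealthSketchExtension implements SketchExtension {
    boolean closed = false;

    @Override
    public void register(List<String> resources) {
        resources.add("HealthCheckResource");
    }

    @Override
    public void close() {
        closed = true;
    }
}

// Hypothetical, simplified model of the REST server lifecycle.
class SketchRestServer {
    final List<String> resources = new ArrayList<>();
    private final List<SketchExtension> extensions;

    SketchRestServer(List<SketchExtension> extensions) {
        this.extensions = extensions;
    }

    void start() {
        resources.add("ConnectorsResource"); // default Connect resources go first
        for (SketchExtension extension : extensions) {
            extension.register(resources);   // extensions register afterwards
        }
    }

    void stop() {
        for (SketchExtension extension : extensions) {
            try {
                extension.close();
            } catch (IOException e) {
                // A real server would log this and keep closing the remaining extensions.
            }
        }
    }
}
```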
The new extension class and its dependencies would need to be part of the plugin path. Hence ConnectRestExtension would be defined as a new plugin to be loaded by the PluginClassLoader. The plugin would be looked up using Java's Service Provider API instead of the Reflections scan that is used for other plugins. This avoids the class loader cost that is incurred today when scanning classes for other plugins. Hence the implementation must provide a `META-INF/services/org.apache.kafka.connect.rest.ConnectRestExtension` file as part of the jar, containing the fully qualified name of the implementation class.
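For readers unfamiliar with the Service Provider API, the lookup can be sketched with java.util.ServiceLoader. This is a minimal illustration, not the Connect runtime code: SketchRestExtension and SketchExtensionLoader are hypothetical names, and the interface stands in for org.apache.kafka.connect.rest.ConnectRestExtension.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

// Hypothetical stand-in for org.apache.kafka.connect.rest.ConnectRestExtension.
interface SketchRestExtension {
    String name();
}

class SketchExtensionLoader {
    // Finds implementations listed in META-INF/services/<fully qualified interface name>
    // that are visible to the given class loader. No scan of all classes is involved.
    static List<SketchRestExtension> load(ClassLoader pluginLoader) {
        List<SketchRestExtension> found = new ArrayList<>();
        for (SketchRestExtension extension : ServiceLoader.load(SketchRestExtension.class, pluginLoader)) {
            found.add(extension);
        }
        return found;
    }
}
```

ServiceLoader instantiates each listed class through its public no-argument constructor, so an extension discovered this way needs one. When no provider file is present, the returned list is simply empty.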
Example
Consider the following example that defines a single plugin to add an authenticating filter and a health check resource.
Code Block |
---|
class ExampleConnectRestExtension implements ConnectRestExtension {
    @Override
    public void register(ConnectRestExtensionContext restPluginContext) {
        restPluginContext.configurable().register(new AuthenticationFilter(restPluginContext.workerConfig()));
        restPluginContext.configurable().register(new HealthCheckResource(restPluginContext.workerConfig(), restPluginContext.clusterState()));
    }
    @Override
    public void close() {
        // close any resources
    }
}
class AuthenticationFilter implements ContainerRequestFilter {
    public AuthenticationFilter(WorkerConfig workerConfig) {
        // set up filter
    }
    @Override
    public void filter(ContainerRequestContext requestContext) {
        // authentication logic
    }
}
@Path("/connect")
class HealthCheckResource {
    public HealthCheckResource(WorkerConfig workerConfig, ConnectClusterState clusterState) {
        // initialize resource
    }
    @GET
    @Path("/health")
    public void healthcheck() {
        // check herder health
    }
} |
The plugin illustrated above can then be configured in the worker's configuration as follows:
Code Block |
---|
# configure plugin
rest.extension.classes=com.example.ExampleConnectRestExtension |
For the RestExtension to be loaded, the jar should be packaged with the following file that lists the fully qualified name of the ConnectRestExtension implementation
META-INF/services/org.apache.kafka.connect.rest.ConnectRestExtension
com.example.ExampleConnectRestExtension
Reference Implementation
The KIP proposes to include a reference implementation that allows users to authenticate incoming Basic Auth headers against a properties file containing a list of usernames and passwords.
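The core check such a filter would perform can be sketched as follows. This is not the reference implementation itself, only a minimal illustration of validating a Basic Auth header against java.util.Properties. The class name BasicAuthSketch and the assumption that the properties map each username to a plain-text password are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Base64;
import java.util.Properties;

class BasicAuthSketch {
    private final Properties credentials; // assumed layout: username=password

    BasicAuthSketch(Properties credentials) {
        this.credentials = credentials;
    }

    // Returns true only if the header is "Basic <base64(user:password)>" and matches an entry.
    boolean isAuthorized(String authorizationHeader) {
        if (authorizationHeader == null || !authorizationHeader.startsWith("Basic ")) {
            return false;
        }
        String decoded;
        try {
            byte[] raw = Base64.getDecoder().decode(authorizationHeader.substring(6).trim());
            decoded = new String(raw, StandardCharsets.UTF_8);
        } catch (IllegalArgumentException e) {
            return false; // payload is not valid Base64
        }
        int colon = decoded.indexOf(':');
        if (colon < 0) {
            return false; // no user:password separator
        }
        String user = decoded.substring(0, colon);
        String password = decoded.substring(colon + 1);
        String expected = credentials.getProperty(user);
        if (expected == null) {
            return false; // unknown user
        }
        // MessageDigest.isEqual avoids leaking the mismatch position through timing.
        return MessageDigest.isEqual(
                expected.getBytes(StandardCharsets.UTF_8),
                password.getBytes(StandardCharsets.UTF_8));
    }
}
```

In a JAX-RS filter, a false result would typically translate into aborting the request with a 401 response.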
Compatibility, Deprecation, and Migration Plan
...
- Creating configs specific to the plugin and just passing them to the plugin based on a prefix. It was considered much easier to make the complete WorkerConfig available. Also, in many cases the plugins would need to know more than just their own configs to implement their actions.
- Passing the Herder to the plugin was considered but it was rejected since the Herder API is not public and we don't want to expose the complete Herder capabilities to the plugin.
- Providing the ability to add only Filters instead of any kind of Jersey resource was considered, but it was rejected because it was too limiting: one could not add new resource endpoints or a Jersey provider.
- Registering the plugin after the default Connector resources are registered is rejected because we don't want to allow users to remove resources at the moment.
- Having the Connect REST plugin on the class path is rejected for the same reason that custom interfaces like Converters and Connectors are defined as plugins and loaded via the plugin path.
...