Status
Current state: Under Discussion
Discussion thread here
JIRA: [TODO]
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
REST extensions for Kafka Connect were introduced via KIP-285: Connect Rest Extension Plugin. Part of the motivation for that KIP was to allow "complex extensions" to "provide filters that rewrite/validate the connector requests to enforce additional constraints on the connector configurations", and the KIP added a ConnectClusterState interface to provide information about the Connect cluster to these REST extensions. However, the information provided by the ConnectClusterState interface is currently limited: it includes only a list of connectors and the ability to query the health of a specific connector, which covers the state of the connector and the states of its tasks. Expanding the ConnectClusterState interface to include information such as the configurations of connectors and their tasks, and the ID of the current Kafka cluster, would make writing these complex extensions easier.
Public Interfaces
As the title of this KIP suggests, the changes will affect the ConnectClusterState interface and its only current implementation, the ConnectClusterStateImpl class.
Additional methods to add to the ConnectClusterState interface:
public interface ConnectClusterState {

    /**
     * Lookup the current configuration of a connector. This provides the current snapshot of configuration by querying the underlying
     * herder. A connector returned by previous invocation of {@link #connectors()} may no longer be available and could result in {@link
     * org.apache.kafka.connect.errors.NotFoundException}.
     *
     * @param connName name of the connector
     * @return the configuration of the connector for the connector name
     * @throws org.apache.kafka.connect.errors.NotFoundException if the requested connector can't be found
     */
    Map<String, String> connectorConfig(String connName);

    /**
     * Lookup the current task configurations of a connector. This provides the current snapshot of configuration by querying the underlying
     * herder. A connector returned by previous invocation of {@link #connectors()} may no longer be available and could result in {@link
     * org.apache.kafka.connect.errors.NotFoundException}.
     *
     * @param connName name of the connector
     * @return the configuration for each task ID
     * @throws org.apache.kafka.connect.errors.NotFoundException if the requested connector can't be found
     */
    Map<Integer, Map<String, String>> taskConfigs(String connName);

    /**
     * Get the cluster ID of the Kafka cluster backing this Connect cluster.
     *
     * @return the cluster ID of the Kafka cluster backing this connect cluster
     */
    String kafkaClusterId();
}
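The Javadoc above notes that a connector returned by connectors() may be gone by the time its configuration is requested. A minimal sketch of how a caller might tolerate that is shown here. The types ConnectorNotFoundException, ClusterStateSnapshotSource, and TaskConfigCollector are hypothetical stand-ins (for NotFoundException and for the relevant part of ConnectClusterState) so that the example compiles without Kafka Connect on the classpath.

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for org.apache.kafka.connect.errors.NotFoundException.
class ConnectorNotFoundException extends RuntimeException {
    ConnectorNotFoundException(String message) {
        super(message);
    }
}

// Hypothetical stand-in for the part of ConnectClusterState used below.
interface ClusterStateSnapshotSource {
    Collection<String> connectors();

    Map<Integer, Map<String, String>> taskConfigs(String connName);
}

final class TaskConfigCollector {
    private TaskConfigCollector() {}

    /** Collects task configs per connector, skipping connectors deleted mid-iteration. */
    static Map<String, Map<Integer, Map<String, String>>> collect(ClusterStateSnapshotSource state) {
        Map<String, Map<Integer, Map<String, String>>> result = new LinkedHashMap<>();
        for (String name : state.connectors()) {
            try {
                result.put(name, state.taskConfigs(name));
            } catch (ConnectorNotFoundException e) {
                // The connector was removed after connectors() returned; leave it out.
            }
        }
        return result;
    }
}
```

Whether skipping a vanished connector is the right reaction depends on the extension; a filter that validates a request for that specific connector would more likely propagate the error.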
Proposed Changes
The basic idea here is to add as much information to the ConnectClusterState interface as is available via the Connect REST API.
Right now, it is possible for request filters added via a REST extension to "intercept" new connector configurations that are submitted via the REST API (through a PUT request to /connectors/<name>/config, or a POST request to /connectors, for example) and validate and/or modify them.
However, some validation logic, like ensuring that certain connector configuration properties are never modified by unauthorized users, is difficult without knowledge of the current configuration of the connector.
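As a rough illustration of this kind of validation, the sketch below reports which protected properties a submitted configuration would change relative to the connector's current configuration. It assumes the proposed connectorConfig(String) method. ClusterStateView and ProtectedPropertyCheck are hypothetical names introduced only so the example compiles without Kafka on the classpath; a real extension would run this check from a request filter and decide there whether the caller is authorized.

```java
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-in for the one proposed ConnectClusterState method used here.
interface ClusterStateView {
    Map<String, String> connectorConfig(String connName);
}

final class ProtectedPropertyCheck {
    private ProtectedPropertyCheck() {}

    /**
     * Returns the protected keys whose values differ between the connector's
     * current configuration and the submitted one (added, removed, or changed).
     */
    static Set<String> modifiedProtectedKeys(ClusterStateView state,
                                             String connName,
                                             Map<String, String> submitted,
                                             Set<String> protectedKeys) {
        Map<String, String> current = state.connectorConfig(connName);
        Set<String> modified = new TreeSet<>();
        for (String key : protectedKeys) {
            if (!Objects.equals(current.get(key), submitted.get(key))) {
                modified.add(key);
            }
        }
        return modified;
    }
}
```

An empty result means the update leaves every protected property untouched. For a connector that does not exist yet, the proposed method is documented to throw NotFoundException, which a real filter would need to handle separately.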
Additionally, the Kafka cluster ID may be useful for the purpose of uniquely identifying a Connect cluster from within a REST extension, since users may be running multiple Kafka clusters and the group.id for a distributed Connect cluster may not be sufficient to identify a cluster.
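One way an extension might combine the two values into a single identifier is sketched below. It assumes the Kafka cluster ID comes from the proposed kafkaClusterId() method and that the worker's group.id can be read from the properties passed to the extension's configure(...) method. The class name ConnectClusterIdentity and the "/" separator are hypothetical choices made for illustration.

```java
import java.util.Map;

final class ConnectClusterIdentity {
    private ConnectClusterIdentity() {}

    /**
     * Builds an identifier from the Kafka cluster ID and the worker's group.id.
     * If no group.id is configured, only the Kafka cluster ID is used.
     */
    static String identify(String kafkaClusterId, Map<String, ?> workerConfig) {
        Object groupId = workerConfig.get("group.id");
        return groupId == null ? kafkaClusterId : kafkaClusterId + "/" + groupId;
    }
}
```

Two Connect clusters that share a group.id but run against different Kafka clusters would then yield different identifiers.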
Compatibility, Deprecation, and Migration Plan
Not applicable; all proposed changes are backwards compatible.
Rejected Alternatives
Query the Connect REST API from within the extension itself
For example, to compare a new configuration for a connector to its current configuration, make a GET request to /connectors/<connName>/ and read the current connector configuration from the response. Rejected because the Connect worker's REST interface may be restricted for security purposes, and requiring users to provide extra security configuration information to a REST extension would be painful and may not be possible in all circumstances. Implementation would also be more difficult for writers of the REST extension than necessary. Additionally, if this were a recommended means of collecting information about a Connect cluster from within a REST extension, the ConnectClusterState interface in its current form would be entirely redundant.
Create an AdminClient
To deduce the Kafka cluster ID, create an AdminClient based on the worker configuration properties provided to the extension in its configure(...) method, then get the cluster ID using that client. Rejected because, like above, implementation would be more difficult for writers of the REST extension than necessary.
Consume from the Connect configuration topic
To deduce the configurations of connectors and tasks, read directly from the config.storage.topic. Rejected because, like above, implementation would be more difficult for writers of the REST extension than necessary. Also, as with the rejected alternative of querying the Connect REST API from within the extension itself, if this were a recommended means of collecting information about a Connect cluster from within a REST extension, then the status information of connectors could likewise be read from the status.storage.topic, and the ConnectClusterState.connectorHealth(...) method would be partially if not entirely redundant.