Status

Current state: Accepted (2.3.0)

Discussion thread: here

JIRA: KAFKA-8231

PR: https://github.com/apache/kafka/pull/6584

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

As the title of this KIP suggests, the changes will affect the ConnectClusterState interface and its only current implementation, the ConnectClusterStateImpl class. Each new method will come with a default implementation that throws an UnsupportedOperationException; this will enable users who have written their own implementation of the interface to rely on future versions of AK with the new interface methods without having to implement these methods themselves. The ConnectClusterStateImpl class will be updated with working versions of each method that do not throw UnsupportedOperationExceptions.

Additional methods to add to the ConnectClusterState interface:

ConnectClusterState interface - additional methods (Java):
/**
 * Lookup the current configuration of a connector. This provides the current snapshot of configuration by querying the underlying
 * herder. A connector returned by previous invocation of {@link #connectors()} may no longer be available and could result in {@link
 * org.apache.kafka.connect.errors.NotFoundException}.
 *
 * @param connName name of the connector
 * @return the configuration of the connector for the connector name
 * @throws org.apache.kafka.connect.errors.NotFoundException if the requested connector can't be found
 * @throws java.lang.UnsupportedOperationException if the default implementation has not been overridden
 */
default Map<String, String> connectorConfig(String connName) {
  throw new UnsupportedOperationException();
}

/**
 * Get details about the setup of the Connect cluster.
 * @return a {@link ConnectClusterDetails} object containing information about the cluster
 */
ConnectClusterDetails clusterDetails();


A new ConnectClusterDetails interface, which contains immutable information about the Connect cluster, will be added as well:

ConnectClusterDetails - new interface (Java):
package org.apache.kafka.connect.health;

public interface ConnectClusterDetails {

  /**
   * Get the cluster ID of the Kafka cluster backing this Connect cluster.
   * @return the cluster ID of the Kafka cluster backing this Connect cluster
   **/
  String kafkaClusterId();
}

This interface will only provide the ID of the backing Kafka cluster for now, but may be expanded in the future to include the mode of the Connect worker (standalone, distributed, perhaps embedded(?)), the group ID of a distributed cluster, or other information.
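As a rough illustration of how an extension might consume this information, the sketch below chains clusterDetails() and kafkaClusterId(). It uses simplified local stand-ins for the two interfaces so that it compiles on its own; the ClusterIdExample class, the nested stand-in types, and the describeCluster helper are hypothetical and not part of the Connect API.

```java
public class ClusterIdExample {

    // Simplified stand-in for org.apache.kafka.connect.health.ConnectClusterDetails.
    interface ConnectClusterDetails {
        String kafkaClusterId();
    }

    // Simplified stand-in for ConnectClusterState; only clusterDetails() is mirrored here.
    interface ConnectClusterState {
        ConnectClusterDetails clusterDetails();
    }

    // Hypothetical helper an extension might use to label its output with the cluster ID.
    static String describeCluster(ConnectClusterState state) {
        return "kafka-cluster-id=" + state.clusterDetails().kafkaClusterId();
    }

    public static void main(String[] args) {
        // Both stand-ins have a single abstract method, so lambdas are enough for a demo.
        ConnectClusterDetails details = () -> "example-cluster-id";
        ConnectClusterState state = () -> details;
        System.out.println(describeCluster(state));
    }
}
```

In a real extension the ConnectClusterState instance would be supplied by the framework rather than built by hand as it is here.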

Proposed Changes

The basic idea here was to add as much information to the ConnectClusterState interface as is available via the Connect REST API; this includes all currently-available read-only methods of the Herder interface. However, due to lack of a convincing use case, information on task configurations will be left out for now.

For some of these methods, such as connectorConfig(...), communication with the underlying Herder will be necessary. The return values will reflect the most up-to-date information that the worker has available locally; the herder will not forward requests to the leader of the worker group. An exception will be thrown if the herder is expecting an impending rebalance. This aligns with the behavior of the current ConnectClusterStateImpl class.
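Because a lookup can fail, callers may want to guard calls to connectorConfig(...). The following is a minimal sketch under stated assumptions: the nested NotFoundException and ConnectClusterState types are local stand-ins for the real Connect classes, and findConfig is a hypothetical helper. The type of exception thrown ahead of a rebalance is not specified here, so the sketch simply lets any other exception propagate.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Optional;

public class ConnectorConfigLookup {

    // Stand-in for org.apache.kafka.connect.errors.NotFoundException.
    static class NotFoundException extends RuntimeException {
        NotFoundException(String message) {
            super(message);
        }
    }

    // Simplified stand-in for ConnectClusterState; only connectorConfig is mirrored here.
    interface ConnectClusterState {
        Map<String, String> connectorConfig(String connName);
    }

    // Hypothetical helper: treats a missing connector as an empty result.
    // Any other exception (for example, one raised before a rebalance) is not caught.
    static Optional<Map<String, String>> findConfig(ConnectClusterState state, String connName) {
        try {
            return Optional.of(state.connectorConfig(connName));
        } catch (NotFoundException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        Map<String, String> known = Collections.singletonMap("connector.class", "ExampleConnector");
        ConnectClusterState state = name -> {
            if (!"example".equals(name)) {
                throw new NotFoundException("Connector " + name + " not found");
            }
            return known;
        };
        System.out.println(findConfig(state, "example").isPresent());
        System.out.println(findConfig(state, "missing").isPresent());
    }
}
```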

...

Compatibility, Deprecation, and Migration Plan

Not applicable; all proposed changes are backwards compatible.

Users who have written their own ConnectClusterState implementations will have to implement these additional methods if they would like to develop against releases of AK that contain the changes here. Since this interface is already implemented by the Connect framework, this is unlikely to be a problem. However, should a developer of a custom ConnectClusterState encounter compile-time problems after migrating to a new release that includes these new interface methods, they will have several options available, including throwing UnsupportedOperationExceptions, providing empty implementations, or providing actual implementations.
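The three options for a custom implementation (throwing, returning an empty result, or providing a working version) could look roughly like the sketch below. It is illustrative only: the nested ConnectClusterState type is a simplified local stand-in that mirrors just connectorConfig, and the three implementing classes and the register method are hypothetical names.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class CustomClusterStateOptions {

    // Simplified stand-in for ConnectClusterState; only connectorConfig is mirrored here.
    interface ConnectClusterState {
        Map<String, String> connectorConfig(String connName);
    }

    // Option 1: satisfy the compiler but reject any call at runtime.
    static class ThrowingState implements ConnectClusterState {
        @Override
        public Map<String, String> connectorConfig(String connName) {
            throw new UnsupportedOperationException("connectorConfig is not supported");
        }
    }

    // Option 2: an empty implementation that reports no configuration at all.
    static class EmptyState implements ConnectClusterState {
        @Override
        public Map<String, String> connectorConfig(String connName) {
            return Collections.emptyMap();
        }
    }

    // Option 3: an actual implementation, here backed by a hypothetical in-memory map.
    static class InMemoryState implements ConnectClusterState {
        private final Map<String, Map<String, String>> configs = new HashMap<>();

        void register(String connName, Map<String, String> config) {
            configs.put(connName, new HashMap<>(config));
        }

        @Override
        public Map<String, String> connectorConfig(String connName) {
            Map<String, String> config = configs.get(connName);
            if (config == null) {
                // A real implementation would throw Connect's NotFoundException here.
                throw new IllegalArgumentException("Unknown connector: " + connName);
            }
            return Collections.unmodifiableMap(config);
        }
    }
}
```

Which option fits best depends on whether anything in the deployment actually calls the new methods on the custom implementation.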

Since this is a feature addition and not a bug fix, the targeted version is the upcoming 2.3 release.

Rejected Alternatives

Adding

...

default implementations for new methods

Adding non-default methods to the interface is technically a backwards incompatible change for anyone developing their own ConnectClusterState implementation. Adding defaults for these methods, even if they immediately throw exceptions, solves this problem. However, that would defeat the purpose of implementing the interface in the first place: users looking up the ConnectClusterState javadocs, for example, would expect those methods to be available. In order to reduce confusion for what is likely to be the most common case, no default methods will be added.

Exposing task configurations

Although information on task configurations is readily available from Herder instances, there has yet to be a convincing use case for exposing information on task configurations to REST extensions. This functionality can always be added later in a separate KIP; for now, we'll err on the side of caution and not add support for a feature that may not be used by anyone.

Query the Connect REST API from within the extension itself

...