
Status

Current state: Under Discussion

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

KIP-382 introduced MirrorMaker 2.0, leveraging the Connect framework to provide a more reliable way of mirroring between Kafka clusters. The KIP also introduced the dedicated MirrorMaker 2.0 cluster, which is capable of running multiple Connect clusters and replication flows inside a dedicated cluster.

Unfortunately, the dedicated cluster mode has some missing features. One issue can render such clusters dysfunctional: the missing Connect REST API. MM2 clusters exclude this feature, which makes follower->leader communication impossible within the cluster. Because of this, under certain workloads, the dedicated MM2 cluster can become unable to react to dynamic topic and group list changes. Another missing feature is lazy resolution of configuration provider references in Connector configs: in dedicated mode, MM2 resolves such references eagerly, making it impossible to provide host-specific configurations through the indirection of configuration providers.

The goal of this KIP is to solve these issues by enabling follower-to-leader communication in dedicated MirrorMaker 2.0 nodes, and by lazily evaluating configuration provider references in Connector configs created by the MM2 dedicated mode.

Failure scenario due to missing REST server feature

In MM2, dynamic configuration changes can occur based on TopicFilter and GroupFilter implementations. For example, the default TopicFilter implementation accepts topic name patterns to include in the replication, enabling new topics to be added to an already running replication flow. Similarly, dynamic config changes can be achieved by providing a TopicFilter and/or GroupFilter implementation capable of detecting changes at runtime. The MirrorSourceConnector periodically polls these filters, picking up any changes in the mirrored topics and/or groups.
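As an illustration, a pattern-based filter in the spirit of the default TopicFilter might look like the following standalone sketch. The class name and regexes are hypothetical, and the class only mirrors the shape of MM2's actual extension point (the org.apache.kafka.connect.mirror.TopicFilter interface) so that the example compiles without Kafka on the classpath:

```java
import java.util.regex.Pattern;

// Hypothetical sketch of a pattern-based topic filter. A real implementation
// would implement org.apache.kafka.connect.mirror.TopicFilter, which declares
// boolean shouldReplicateTopic(String topic); this standalone class only
// mirrors that shape.
public class RegexTopicFilter {
    private final Pattern include;
    private final Pattern exclude;

    public RegexTopicFilter(String includeRegex, String excludeRegex) {
        this.include = Pattern.compile(includeRegex);
        this.exclude = Pattern.compile(excludeRegex);
    }

    // MirrorSourceConnector polls the filter periodically, so an
    // implementation that re-reads external state here (a file, a config
    // service) effectively changes the mirrored topic list at runtime.
    public boolean shouldReplicateTopic(String topic) {
        return include.matcher(topic).matches() && !exclude.matcher(topic).matches();
    }

    public static void main(String[] args) {
        RegexTopicFilter filter = new RegexTopicFilter("orders\\..*", ".*\\.internal");
        System.out.println(filter.shouldReplicateTopic("orders.eu"));       // true
        System.out.println(filter.shouldReplicateTopic("orders.internal")); // false: excluded
    }
}
```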

The issue occurs when a rebalance results in the leader stopping its MirrorSourceConnector instance and moving it to a follower. After this, the leader is not capable of detecting topic or group filter configuration changes by itself. The follower running the MirrorSourceConnector can detect the config change, but as per the Connect workflow, it cannot apply the change to the current workload itself - it must notify the leader through the REST API. Since REST is not available in dedicated MM2 mode, the notification never reaches the leader, rendering the cluster unable to react to config changes.

Public Interfaces

  • MM2 nodes now start a Connect REST server for each replication flow. This server exposes exactly the same API as the Connect REST API, and reuses the same implementation.

  • MM2 worker id generation now uses the following pattern: <REST_HOST_NAME>/<SOURCE_CLUSTER_ALIAS>__<TARGET_CLUSTER_ALIAS>

  • MM2 now accepts mm.rest.host.name and mm.rest.protocol, which act as a centralized listener configuration for the REST servers, used when the listeners property is not configured through other means.

  • MM2 now accepts properties starting with the following prefix in the worker config: listeners.https.

  • MM2 now accepts properties starting with the following prefix as replication flow specific worker configuration:
    cluster_alias->cluster_alias.worker.

  • MM2 no longer resolves configuration provider references in ConnectorConfigurations, letting those references be resolved on each node of the cluster at Connector configuration time.

Proposed Changes

This KIP proposes 3 major changes:

  • Enabling the Connect REST server per replication flow in the MirrorMaker 2.0 node.

  • Extending the configuration method of MirrorMaker 2.0 to support a centralized way of REST configuration, and per-flow configurations.

  • Enabling lazy evaluation of configuration provider references in ConnectorConfigurations created by dedicated MirrorMaker 2.0 nodes.

Enabling the Connect REST server

Currently, for each replication flow, a dedicated MM2 node creates a separate Connect node, and each node joins a Connect cluster dedicated to a specific replication flow. This is done by instantiating a worker and a herder. These two components are the essential parts of a classic Connect node. To support the full feature set of a Connect node, MM2 would also need to provide the Connect REST API between its nested Connect nodes.

This can be done by also instantiating a REST server for each replication flow. This would ensure that every nested Connect node has its full capabilities, and enables Connect follower nodes to communicate with their leaders.

To also support worker identification in the Connect status topic, the worker id generation changes to:
<REST_HOST_NAME>/<SOURCE_CLUSTER_ALIAS>__<TARGET_CLUSTER_ALIAS>
For example, a node whose REST host name is mm2-node1.example.com, running the replication flow from cluster alias primary to backup, would get the worker id mm2-node1.example.com/primary__backup (host name and aliases here are illustrative).

To preserve the old behavior, REST servers should be able to start up without any additional configuration. This is ensured by letting the nested REST servers bind to any free port if no listeners are provided.

Extending the configuration method

Currently, MM2 does not support replication flow specific Connect node configurations. Based on this proposal, MM2 would contain a REST server for each replication flow. To ensure that these REST servers can be properly configured, MM2 needs to support centralized (generic) REST server configuration, and per-flow configuration.

To support the centralized configuration of REST servers, we propose the following:

  1. Introduce two new MM2 configurations. These act as fallbacks for the REST server listener configuration, and are only used if the listeners property is not defined for the worker through other means (either through per-cluster properties or per-flow properties). In most cases, there is no need for replication-flow-specific configurations. The port is set to 0 for the REST servers, so users do not need to specify a port for each server.

    1. mm.rest.host.name - the hostname all REST servers should bind on.

    2. mm.rest.protocol - the protocol all REST servers should use.

  2. Worker configs will also contain properties starting with the listeners.https. prefix. This prefix is used by the Connect REST server implementation, and the Connect client. Security configurations can be provided this way in a centralized manner, assuming that all REST servers should use the same configuration.
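Taken together, a centralized REST configuration might look like the following properties fragment (host name, paths, and password are placeholders):

```properties
# Fallback listener settings for every nested REST server; used only when
# no listeners property is supplied per cluster or per flow. The port is
# left to the server (bound to port 0, i.e. any free port).
mm.rest.host.name = mm2-node1.example.com
mm.rest.protocol = https

# TLS settings shared by all nested REST servers and their clients,
# provided through the listeners.https. prefix.
listeners.https.ssl.keystore.location = /etc/mm2/keystore.jks
listeners.https.ssl.keystore.password = changeit
```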

To support per-replication-flow REST configurations, we propose the following:

  1. Worker configs will also contain properties starting with the replication-flow-specific prefix cluster_alias->cluster_alias.worker.; the prefix is stripped before the remaining property is applied to that flow's worker.
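As a sketch, a per-flow worker property and the configuration the worker actually receives could look like this (cluster aliases and the listener value are illustrative):

```properties
# In the MM2 config:
primary->backup.worker.listeners = https://mm2-node1.example.com:8083

# After prefix stripping, the worker of the primary->backup flow sees:
# listeners = https://mm2-node1.example.com:8083
```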

Allowing configuration provider references to be lazily resolved in ConnectorConfigurations created by MirrorMaker 2.0

Currently, when a dedicated MM2 node derives the base of the ConnectorConfiguration from its MirrorMakerConfiguration, it eagerly resolves all configuration provider references (e.g. environment variable references). Because of this, the configurations persisted in Kafka contain the resolved values instead of the references.

This is undesirable, as it makes it impossible to provide host-specific configurations to the Connectors when using the dedicated mode of MM2: Connector configurations must be applicable on all hosts running members of the MM2 cluster. This is problematic for file-path-type configurations, for example SSL keystore and truststore locations.

To solve this, MirrorMakerConfiguration should keep track of the original, pre-resolution state of the configuration, and use that to derive the ConnectorConfigurations. With this, the configs saved into Kafka will contain references instead of resolved values, and this also enables Connect’s Worker to resolve the config provider references at a later point - matching the behavior of a vanilla Connect node.
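For example, with Kafka's built-in FileConfigProvider, a connector config could carry a reference like the one below (the provider name, file path, and key are illustrative); with lazy evaluation, the reference is persisted as-is and each node resolves it against its local file at Connector configuration time:

```properties
# Register a config provider (Kafka's built-in FileConfigProvider).
config.providers = file
config.providers.file.class = org.apache.kafka.common.config.provider.FileConfigProvider

# The reference is stored unresolved in the config topic, so the
# referenced file (and thus the truststore path) may differ per host.
ssl.truststore.location = ${file:/etc/mm2/secrets.properties:truststore.location}
```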

Compatibility, Deprecation, and Migration Plan

This KIP only extends the capabilities of MM2 nodes, and does not make any breaking changes. Old behavior is kept by providing sensible defaults - i.e. making sure that even without additional worker configurations, MM2 nodes can start up. Worker id generation is changed to support multiple MM2 nodes in a cluster.

Rejected Alternatives

Extending the Connect framework

The discussion on KIP-382 brought up an alternative solution: extending the capabilities of the Connect framework. Connect could gain the ability to run connectors against multiple Kafka clusters in a single Connect cluster. This is a generalization of the dedicated MM2 feature: a Connect node could run multiple workers and herders, connecting to different Kafka clusters. With this feature, REST API changes would be needed as well, since a single REST server would need to support multiple Connect clusters. This could be implemented by introducing an additional path parameter in the queries, identifying which nested Connect cluster is being queried.

This solution would require significant changes in Connect and its REST API, and it is unclear whether the usability improvements would be on par with the effort required.

Creating an internal-only derivation of the Connect REST API

For MM2 distributed mode to work, the only required endpoint is the internal task configuration endpoint, which is used by followers to push new configuration to the leader. A possible solution would be to create a REST server which only contains this endpoint, but prefixes the endpoint path with a replication flow ID. From an architectural and security viewpoint, this is beneficial as each MM2 node would only run a single REST server. From an implementation viewpoint, this is problematic, as the Connect REST Server would need refactoring to support this use-case, and newer features of the Connect REST API might be affected by the restructuring.

Explicitly documenting that dedicated MM2 does not support distributed mode

Another possible solution is to explicitly reject the distributed mode of MM2, and document that it is not supported. Since the dedicated mode of MM2 provides essential improvements over the vanilla Connect mode (namely in terms of configuration and operations), distributed mode should be fixed instead of abandoned.
