

Status

Current state: Accepted

Discussion thread: here

JIRA: KAFKA-7500

...

  • Topics are created with default configuration. Often they'll need to be repartitioned manually.

  • ACL and configuration changes are not synced across mirrored clusters, making it difficult to manage multiple clusters.

  • Records are repartitioned with DefaultPartitioner. Semantic partitioning may be lost.

  • Any configuration change means the cluster must be bounced. This includes adding new topics to the whitelist, which may be a frequent operation.

  • There's no mechanism to migrate producers or consumers between mirrored clusters.

  • There's no support for exactly-once delivery. Records may be duplicated during replication.

  • Clusters can't be made mirrors of each other, i.e. no support for active/active pairs.

  • Rebalancing causes latency spikes, which may trigger further rebalances.

For these reasons, MirrorMaker is insufficient for many use cases, including backup, disaster recovery, and fail-over scenarios. Several other Kafka replication tools have been created to address some of these limitations, but Apache Kafka has no adequate replication strategy to date. Moreover, the lack of a native solution makes it difficult to build generic tooling for multi-cluster environments.

...

  • Leverages the Kafka Connect framework and ecosystem.

  • Includes both source and sink connectors.

  • Includes a high-level driver that manages connectors in a dedicated cluster.
  • Detects new topics and partitions.
  • Automatically syncs topic configuration between clusters.

  • Manages downstream topic ACLs.

  • Supports "active/active" cluster pairs, as well as any number of active clusters.

  • Supports cross-datacenter replication, aggregation, and other complex topologies.

  • Provides new metrics including end-to-end replication latency across multiple data centers/clusters.

  • Emits offsets required to migrate consumers between clusters.

  • Tooling for offset translation.

  • MirrorMaker-compatible legacy mode.

  • No rebalancing.

Public Interfaces

New classes and interfaces include:

  • MirrorSourceConnector, MirrorSinkConnector, MirrorSourceTask, MirrorSinkTask classes.

  • MirrorCheckpointConnector, MirrorCheckpointTask.
  • MirrorHeartbeatConnector, MirrorHeartbeatTask.
  • MirrorConnectorConfig, MirrorTaskConfig classes.
  • ReplicationPolicy interface. DefaultReplicationPolicy and LegacyReplicationPolicy classes.

  • Heartbeat and checkpoint topics and associated schemas.

  • RemoteClusterUtils class for querying remote cluster reachability and lag, and for translating consumer offsets between clusters.

  • MirrorMaker driver class with main entry point for running MM2 cluster nodes.

  • MirrorMakerConfig used by MirrorMaker driver.
  • HeartbeatMessageFormatter, CheckpointMessageFormatter
  • ./bin/connect-mirror-maker.sh and ./config/mirror-maker.properties sample configuration.

Proposed Changes

Remote Topics, Partitions

This design introduces the concept of remote topics, which are replicated topics referencing a source cluster via a naming convention, e.g. "us-west.topic1", where topic1 is a source topic and us-west is a source cluster alias. Any partitions in a remote topic are remote partitions and refer to the same partitions in the source topic. Specifically:

  • Partitioning and order of records are preserved between source and remote topics.

  • Remote topics must have the same number of partitions as their source topics.

  • A remote topic has a single source topic.

  • A remote partition has a single source partition.

  • Remote partition i is a replica of source partition i.
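The naming and partition rules above can be sketched as a few pure functions. RemoteTopicNaming is a hypothetical helper, not one of the classes proposed here. It assumes "." separates a cluster alias from the rest of the name, and that neither aliases nor source topic names contain ".".

```java
import java.util.Optional;

// Hypothetical helper illustrating the remote topic convention (not MM2 code).
// Assumes "." separates aliases from the topic and that neither contains ".".
final class RemoteTopicNaming {
    private static final String SEPARATOR = ".";

    private RemoteTopicNaming() {}

    // Source topic -> remote topic, e.g. ("us-west", "topic1") -> "us-west.topic1".
    static String formatRemoteTopic(String sourceClusterAlias, String topic) {
        return sourceClusterAlias + SEPARATOR + topic;
    }

    // Alias of the immediate upstream cluster (leftmost prefix), or empty for a local topic.
    static Optional<String> sourceClusterAlias(String topic) {
        int i = topic.indexOf(SEPARATOR);
        return i < 0 ? Optional.empty() : Optional.of(topic.substring(0, i));
    }

    // Topic name one hop upstream, e.g. "us-west.us-east.topic1" -> "us-east.topic1".
    // A local topic is returned unchanged.
    static String upstreamTopic(String topic) {
        int i = topic.indexOf(SEPARATOR);
        return i < 0 ? topic : topic.substring(i + 1);
    }

    // Remote partition i is a replica of source partition i, so the mapping is the identity.
    static int sourcePartition(int remotePartition) {
        return remotePartition;
    }
}
```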

MM2 replicates topics from a source cluster to corresponding remote topics in the destination cluster. In this way, replication never results in merged or out-of-order partitions. Moreover, the topic renaming policy enables "active/active" replication by default:

[Figure: "active/active" replication]

This is not supported by legacy MirrorMaker out-of-the-box (without custom handlers): records would be replicated back and forth indefinitely, and the topics in either cluster would be merged inconsistently between clusters. The remote naming convention avoids these problems by keeping local and remote records in separate partitions. In either cluster above, the plain topic1 contains only locally produced records. Likewise, B.topic1 contains only records produced in cluster B.

To consume local records only, a consumer subscribes to topic1 as normal. To consume records that have been replicated from a remote cluster, consumers subscribe to a remote topic, e.g. B.topic1.

This convention extends to any number of clusters.

Aggregation

Downstream consumers can aggregate across multiple clusters by subscribing to the local topic and all corresponding remote topics, e.g. topic1, us-west.topic1, us-east.topic1… or by using a regex subscription.

Topics are never merged or repartitioned within the connector. Instead, merging is left to the consumer. This is in contrast to MirrorMaker, which is often used to merge topics from multiple clusters into a single aggregate cluster.

Likewise, a consumer can always elect not to aggregate by simply subscribing to the local topic only, e.g. topic1.

This approach eliminates the need for special purpose aggregation clusters without losing any power or flexibility.
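A regex subscription that aggregates a topic across clusters might look like the following sketch. AggregationPattern is hypothetical and assumes the "." separator convention. The resulting Pattern could be passed to KafkaConsumer#subscribe(Pattern). Here it is only applied to a list of topic names, so the block runs without a cluster.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical sketch: match a local topic and all of its remote copies,
// e.g. topic1, us-west.topic1, us-west.us-east.topic1.
final class AggregationPattern {
    private AggregationPattern() {}

    // Zero or more "alias." prefixes followed by exactly the given topic name.
    static Pattern localAndRemote(String topic) {
        return Pattern.compile("(?:[^.]+\\.)*" + Pattern.quote(topic));
    }

    // Filters a topic listing down to the local topic and its remote copies.
    static List<String> matching(String topic, List<String> allTopics) {
        Pattern pattern = localAndRemote(topic);
        return allTopics.stream()
            .filter(t -> pattern.matcher(t).matches())
            .collect(Collectors.toList());
    }
}
```

Because the whole name must match, topics such as topic10 or us-west.topic2 are not picked up.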

Cycle detection

It is possible to configure two clusters to replicate each other ("active/active"), in which case all records produced to either cluster can be seen by consumers in both clusters. To prevent infinite recursion, a topic whose prefix already contains a cluster's alias is never replicated to that cluster. For example, topics that already contain "us-west" in the prefix won't be replicated to the us-west cluster.

This rule applies across all topics regardless of topology. A cluster can be replicated to many downstream clusters, which themselves can be replicated, yielding topics like us-west.us-east.topic1 and so on. The same cluster alias will not appear twice in a topic name due to cycle detection.

In this way, any topology of clusters is supported, not just DAGs.
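The cycle-detection rule can be expressed as a check on a topic's alias prefixes. This is a minimal sketch that assumes "." is the separator and that neither aliases nor source topic names contain ".". The CycleDetection class is hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of cycle detection: never replicate a topic to a cluster
// whose alias already appears among the topic's prefixes.
final class CycleDetection {
    private CycleDetection() {}

    // Every "."-separated component except the last is a cluster alias.
    static List<String> aliasPrefixes(String topic) {
        String[] parts = topic.split("\\.");
        return Arrays.asList(parts).subList(0, parts.length - 1);
    }

    static boolean shouldReplicateTo(String topic, String targetClusterAlias) {
        return !aliasPrefixes(topic).contains(targetClusterAlias);
    }
}
```

In an active/active pair, us-west's topic1 arrives in us-east as us-west.topic1. It is then blocked from flowing back to us-west, which is what stops the infinite loop.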

Config, ACL Sync

MM2 monitors source topics and propagates configuration changes to remote topics. Any missing partitions will be automatically created.

Remote topics should never be written to, except by the source or sink connector. Producers must only write to local topics (e.g. topic1), which may then be replicated elsewhere. To enforce this policy, MM2 propagates ACL policies to downstream topics using the following rules:

  • Principals that can READ the source topic can READ the remote topic.

  • No one can WRITE to the remote topic except MM2.
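Here is a minimal sketch of the two ACL rules. It models ACLs as a map from principal to allowed operations rather than Kafka's AclBinding objects. The class name and principal strings such as "User:mm2" are hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of ACL propagation to a remote topic, with ACLs
// modeled as principal -> set of operation names.
final class AclSyncSketch {
    private AclSyncSketch() {}

    static Map<String, Set<String>> remoteTopicAcls(Map<String, Set<String>> sourceTopicAcls,
                                                    String mirrorMakerPrincipal) {
        Map<String, Set<String>> remote = new HashMap<>();
        // Rule 1: principals that can READ the source topic can READ the remote topic.
        sourceTopicAcls.forEach((principal, ops) -> {
            if (ops.contains("READ")) {
                remote.computeIfAbsent(principal, p -> new HashSet<>()).add("READ");
            }
        });
        // Rule 2: no one can WRITE to the remote topic except MM2,
        // so WRITE grants on the source topic are deliberately not copied.
        remote.computeIfAbsent(mirrorMakerPrincipal, p -> new HashSet<>()).add("WRITE");
        return remote;
    }
}
```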

Internal Topics

MM2 emits a heartbeat topic in each source cluster, which is replicated to demonstrate connectivity through the connectors. Downstream consumers can use this topic to verify that a) the connector is running and b) the corresponding source cluster is available. Heartbeats will be propagated by source and sink connectors so that chains like backup.us-west.us-east.heartbeat are possible. Cycle detection prevents infinite recursion.

[Figure: heartbeats are replicated between clusters]

The schema for the heartbeat topic contains:

  • cluster (String): cluster where the heartbeat was emitted
  • connector (String): name of the connector that produced the heartbeat
  • timestamp (long): epoch millis when heartbeat was created

Additionally, the connectors periodically emit checkpoints in the destination cluster, containing offsets for each consumer group in the source cluster. The connector will periodically query the source cluster for all committed offsets from all consumer groups, filter for those topics being replicated, and emit a message to a topic like us-west.checkpoints.internal in the destination cluster. The message will include the following fields:

  • source cluster alias (String)

  • consumer group id (String)

  • topic (String)

  • partition (int)

  • offset (long): latest committed offset in source cluster

  • metadata (String)
  • timestamp (long)

A Checkpoint class exposes these fields via accessor methods (see the Checkpoint class below).

As with __consumer_offsets, the checkpoint topic is log-compacted to reflect only the latest offsets across consumer groups, based on a topic-partition-group composite key. The topic will be created automatically by the connector if it doesn't exist.
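Log compaction keeps the last record per key. Replaying a checkpoint log into a map keyed on topic, partition and group therefore gives the state that a compacted topic converges to. This is a hypothetical sketch using a subset of the checkpoint fields. The "/"-joined key format is an assumption, chosen because "/" is not a legal character in Kafka topic names.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of checkpoint records and of what log compaction retains.
final class CheckpointCompaction {
    static final class Checkpoint {
        final String consumerGroupId;
        final String topic;
        final int partition;
        final long offset; // latest committed offset in the source cluster

        Checkpoint(String consumerGroupId, String topic, int partition, long offset) {
            this.consumerGroupId = consumerGroupId;
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }

        // Topic-partition-group composite key (format assumed for illustration).
        String compactionKey() {
            return topic + "/" + partition + "/" + consumerGroupId;
        }
    }

    private CheckpointCompaction() {}

    // A later record replaces an earlier one with the same key.
    static Map<String, Checkpoint> compact(List<Checkpoint> log) {
        Map<String, Checkpoint> latest = new LinkedHashMap<>();
        for (Checkpoint c : log) {
            latest.put(c.compactionKey(), c);
        }
        return latest;
    }
}
```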

The heartbeat and checkpoint records are encoded using Kafka's internal serdes, as with __consumer_offsets. A MessageFormatter will be supplied for use with ConsoleConsumer etc.


New metrics include:

  • heartbeat-ms: timestamp of latest heartbeat creation time
  • heartbeat-age-ms: timespan between now and latest heartbeat creation time
  • heartbeat-lag-ms: timespan between latest heartbeat's upstream creation time and downstream ACK
  • replication-lag-ms: timespan between latest record's consumed-at time and downstream ACK
  • replication-lag-ms-p99, -p95, -p90, -p50: percentiles of replication-lag-ms

The MBean name for these metrics will be: kafka.connect:type=mirror-maker-metrics,source=([.\w]+),target=([.\w]+)
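The metric definitions above reduce to simple timestamp arithmetic. This sketch assumes epoch-millisecond timestamps and nearest-rank percentiles. The proposal does not say how the -p99, -p95, -p90 and -p50 variants are computed. MirrorMetricsSketch is a hypothetical name.

```java
import java.util.Arrays;

// Hypothetical sketch of the heartbeat and replication-lag metric arithmetic.
final class MirrorMetricsSketch {
    private MirrorMetricsSketch() {}

    // heartbeat-age-ms: now minus the latest heartbeat's creation time.
    static long heartbeatAgeMs(long nowMs, long latestHeartbeatCreatedMs) {
        return nowMs - latestHeartbeatCreatedMs;
    }

    // heartbeat-lag-ms: downstream ACK time minus upstream creation time.
    static long heartbeatLagMs(long upstreamCreatedMs, long downstreamAckMs) {
        return downstreamAckMs - upstreamCreatedMs;
    }

    // Nearest-rank percentile of replication-lag samples, e.g. p = 99 for -p99.
    static long percentile(long[] lagSamplesMs, double p) {
        if (lagSamplesMs.length == 0) {
            throw new IllegalArgumentException("no samples");
        }
        long[] sorted = lagSamplesMs.clone();
        Arrays.sort(sorted);
        int rank = (int) Math.ceil(p * sorted.length / 100.0); // 1-based rank
        rank = Math.min(Math.max(rank, 1), sorted.length);
        return sorted[rank - 1];
    }
}
```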

Relevant classes below.

Code Block
public class Checkpoint {
    public static final String TOPIC_KEY = "topic";
    public static final String PARTITION_KEY = "partition";
    public static final String OFFSET_KEY = "offset";
    public static final String METADATA_KEY = "metadata";
    public static final String CONSUMER_GROUP_ID_KEY = "group";
    public static final String CONSUMER_OFFSETS_KEY = "offsets";
    public static final String SOURCE_CLUSTER_ALIAS_KEY = "cluster";
    public static final String TIMESTAMP_KEY = "timestamp";
    public static final Schema SCHEMA = new Schema(/* ... */);
    // ...
    public String sourceClusterAlias() ...
    public String consumerGroupId() ...
    public TopicPartition sourceTopicPartition() ...
    public OffsetAndMetadata sourceOffsetAndMetadata() ...
}

public class Heartbeat {
    // ...
    public static final Schema SCHEMA = new Schema(/* ... */);
    // ...
    public String sourceClusterAlias() ...
    public String connectorName() ...
    public long timestamp() ...
}

Remote Cluster Utils

A utility class RemoteClusterUtils will leverage the internal topics described above to assist in computing reachability, inter-cluster lag, and offset translation. It won't be possible to directly translate any given offset, since not all offsets will be captured in the checkpoint stream. But for a given consumer group, it will be possible to find high water marks that consumers can seek() to. This is useful for inter-cluster consumer migration, failover, etc.

The interface is as follows (subject to change):

Code Block
// Calculates the number of hops between a client and an upstream Kafka cluster based on replication heartbeats.
// If the given cluster is not reachable, returns -1.
int replicationHops(AdminClient client, String upstreamClusterAlias)

// Find all heartbeat topics, e.g. A.B.heartbeat, visible from the client.
List<String> heartbeatTopics(AdminClient client)

// Find all checkpoint topics, e.g. A.checkpoints.internal, visible from the client.
List<String> checkpointTopics(AdminClient client)

// Find all upstream clusters reachable from the client
List<String> upstreamClusters(AdminClient client)

// Construct a consumer that is subscribed to all heartbeat topics.
KafkaConsumer<Heartbeat> heartbeatConsumer(Map<?, ?> consumerConfig)

// Construct a consumer that is subscribed to all checkpoint topics.
KafkaConsumer<Checkpoint> checkpointConsumer(Map<?, ?> consumerConfig)

// Find the local offsets corresponding to the latest checkpoint from a specific upstream consumer group.
Map<TopicPartition, Long> translateOffsets(Map<?, ?> targetConsumerConfig, String sourceClusterAlias, String targetClusterAlias, String remoteGroupId)

The utility class assumes DefaultReplicationPolicy is used for replication.
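The idea behind replicationHops() can be sketched offline. An upstream cluster's distance is its 1-based position among a heartbeat topic's alias prefixes, minimized over all visible heartbeat topics. This hypothetical version takes a list of topic names instead of an AdminClient. It assumes DefaultReplicationPolicy-style names that end in "heartbeat".

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical offline sketch of replicationHops(); not the proposed implementation.
final class ReplicationHopsSketch {
    static final String HEARTBEAT_TOPIC = "heartbeat"; // assumed base topic name

    private ReplicationHopsSketch() {}

    // Returns the minimum number of hops to upstreamClusterAlias, or -1 if unreachable.
    static int replicationHops(List<String> visibleTopics, String upstreamClusterAlias) {
        int best = -1;
        for (String topic : visibleTopics) {
            List<String> parts = Arrays.asList(topic.split("\\."));
            if (!parts.get(parts.size() - 1).equals(HEARTBEAT_TOPIC)) {
                continue; // not a heartbeat topic
            }
            // Leftmost alias is the nearest upstream cluster (1 hop), and so on.
            int index = parts.subList(0, parts.size() - 1).indexOf(upstreamClusterAlias);
            if (index >= 0 && (best < 0 || index + 1 < best)) {
                best = index + 1;
            }
        }
        return best;
    }
}
```

Consider backup.us-west.us-east.heartbeat. Here backup is 1 hop away, us-west 2 hops and us-east 3 hops, unless a shorter heartbeat chain to that cluster is also visible.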

Replication Policies

ReplicationPolicy classes specify the behavior of MM2 at a high level, including which topics to replicate. The interface is as follows:

Code Block
public interface ReplicationPolicy {

    // Whether to replicate the given topic.
    boolean shouldReplicateTopic(String topic);

    // Whether checkpoints should be emitted for the given topic.
    // If false, migrated consumers will lose offsets for this topic.
    boolean shouldCheckpointTopic(String topic);

    // Whether checkpoints should be emitted for the given consumer group.
    // If false, the consumer group will lose all offsets when migrated.
    boolean shouldReplicateConsumerGroup(String groupId);

    // Whether remote topic configurations should be updated to match this source topic's configuration.
    boolean shouldSyncTopicConfiguration(String topic);

    // Whether remote topic ACLs should be updated with this principal.
    boolean shouldReplicatePrincipalACL(String principal);

    // Whether remote topic ACLs should be updated to match this source topic's ACL.
    boolean shouldSyncTopicACL(String topic);

    // e.g. us-west.topic1
    String formatRemoteTopic(String topic);
}

The DefaultReplicationPolicy honors whitelists and blacklists for topics and consumer groups via regex patterns. LegacyReplicationPolicy causes MM2 to mimic legacy MirrorMaker behavior.
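A regex-based policy in the spirit of DefaultReplicationPolicy might look like this sketch. It is not the actual class. Several choices are assumptions: the constructor parameters, reusing the topic filter for checkpoints, config sync and ACL sync, and replicating every principal. Cycle detection is left out for brevity. The interface is redeclared so the block compiles on its own.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Redeclared from the interface above so this block is self-contained.
interface ReplicationPolicy {
    boolean shouldReplicateTopic(String topic);
    boolean shouldCheckpointTopic(String topic);
    boolean shouldReplicateConsumerGroup(String groupId);
    boolean shouldSyncTopicConfiguration(String topic);
    boolean shouldReplicatePrincipalACL(String principal);
    boolean shouldSyncTopicACL(String topic);
    String formatRemoteTopic(String topic);
}

// Hypothetical whitelist/blacklist policy; not DefaultReplicationPolicy itself.
final class RegexReplicationPolicySketch implements ReplicationPolicy {
    private final String sourceClusterAlias;
    private final Pattern topics;
    private final List<Pattern> topicsBlacklist;
    private final Pattern groups;

    RegexReplicationPolicySketch(String sourceClusterAlias, String topicsRegex,
                                 List<String> blacklistRegexes, String groupsRegex) {
        this.sourceClusterAlias = sourceClusterAlias;
        this.topics = Pattern.compile(topicsRegex);
        this.topicsBlacklist = blacklistRegexes.stream().map(Pattern::compile).collect(Collectors.toList());
        this.groups = Pattern.compile(groupsRegex);
    }

    // Whitelisted and not blacklisted.
    @Override public boolean shouldReplicateTopic(String topic) {
        return topics.matcher(topic).matches()
            && topicsBlacklist.stream().noneMatch(p -> p.matcher(topic).matches());
    }
    @Override public boolean shouldCheckpointTopic(String topic) { return shouldReplicateTopic(topic); }
    @Override public boolean shouldReplicateConsumerGroup(String groupId) { return groups.matcher(groupId).matches(); }
    @Override public boolean shouldSyncTopicConfiguration(String topic) { return shouldReplicateTopic(topic); }
    @Override public boolean shouldReplicatePrincipalACL(String principal) { return true; }
    @Override public boolean shouldSyncTopicACL(String topic) { return shouldReplicateTopic(topic); }
    @Override public String formatRemoteTopic(String topic) { return sourceClusterAlias + "." + topic; }
}
```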

...

Properties common to the SourceConnector(s) and SinkConnector (property, default value: description):

  • name (required): name of the connector, e.g. "us-west->us-east"
  • topics (default: empty string): regex of topics to replicate, e.g. "topic1|topic2|topic3"
  • topics.blacklist (default: [".*\.internal", ".*\.replica", "__consumer_offsets", …]): topics to exclude from replication
  • groups (default: empty string): regex of groups to replicate, e.g. ".*"
  • source.cluster.alias (required): name of the cluster being replicated
  • target.cluster.alias (required): name of the downstream Kafka cluster
  • source.cluster.bootstrap.servers (required): upstream cluster to replicate
  • target.cluster.bootstrap.servers (required): downstream cluster
  • sync.topic.configs (default: true): whether or not to monitor source cluster for configuration changes
  • sync.topic.acls (default: true): whether to monitor source cluster ACLs for changes
  • admin.* (default: n/a): passed to AdminClient for local cluster
  • remote.admin.* (default: n/a): passed to remote AdminClient
  • rename.topics (default: true): connector should rename downstream topics according to "remote topic" convention
  • emit.heartbeats (default: true): connector should periodically emit heartbeats
  • emit.heartbeats.interval.seconds (default: 5 seconds): frequency of heartbeats
  • emit.checkpoints (default: true): connector should periodically emit consumer offset information
  • emit.checkpoints.interval.seconds (default: 5 seconds): frequency of checkpoints
  • refresh.topics (default: true): connector should periodically check for new topics
  • refresh.topics.interval.seconds (default: 5 seconds): frequency to check source cluster for new topics
  • refresh.groups (default: true): connector should periodically check for new consumer groups
  • refresh.groups.interval.seconds (default: 5 seconds): frequency to check source cluster for new consumer groups
  • readahead.queue.capacity (default: 500 records): number of records to let consumer get ahead of producer
  • replication.policy.class (default: org.apache.kafka.connect.mirror.DefaultReplicationPolicy): use LegacyReplicationPolicy to mimic legacy MirrorMaker
  • heartbeat.retention.ms (default: 1 day): used when creating heartbeat topics for the first time
  • checkpoint.retention.ms (default: 1 day): used when creating checkpoint topics for the first time
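The following sketch shows how the required properties and defaults listed above might be resolved. MirrorConfigSketch is hypothetical and covers only a subset of the properties. Values are kept as strings, as in a Connect config map.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: validate required properties and fill in a subset of defaults.
final class MirrorConfigSketch {
    static final List<String> REQUIRED = List.of(
        "name", "source.cluster.alias", "target.cluster.alias",
        "source.cluster.bootstrap.servers", "target.cluster.bootstrap.servers");

    static final Map<String, String> DEFAULTS = Map.of(
        "topics", "",
        "groups", "",
        "sync.topic.configs", "true",
        "sync.topic.acls", "true",
        "rename.topics", "true",
        "emit.heartbeats", "true",
        "emit.heartbeats.interval.seconds", "5",
        "emit.checkpoints", "true",
        "emit.checkpoints.interval.seconds", "5",
        "replication.policy.class", "org.apache.kafka.connect.mirror.DefaultReplicationPolicy");

    private MirrorConfigSketch() {}

    static Map<String, String> resolve(Map<String, String> props) {
        for (String key : REQUIRED) {
            if (!props.containsKey(key)) {
                throw new IllegalArgumentException("missing required property: " + key);
            }
        }
        Map<String, String> resolved = new HashMap<>(DEFAULTS);
        resolved.putAll(props); // explicit values override defaults
        return resolved;
    }
}
```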

The following are the SourceConnector's additional configuration properties:

  • consumer.* (default: n/a): passed to the consumer
  • consumer.poll.timeout.ms (default: 500 ms): poll timeout

And similarly for the SinkConnector:

  • producer.* (default: n/a): passed to the producer
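The prefixed properties (admin.*, remote.admin.*, consumer.*, producer.*) are passed through to the underlying clients with the prefix removed. Below is a hypothetical helper showing that idea. Kafka's AbstractConfig#originalsWithPrefix offers similar behavior.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: select properties with a prefix and strip the prefix.
final class PrefixedProps {
    private PrefixedProps() {}

    static Map<String, String> withPrefix(Map<String, String> props, String prefix) {
        Map<String, String> out = new HashMap<>();
        props.forEach((key, value) -> {
            if (key.startsWith(prefix)) {
                out.put(key.substring(prefix.length()), value);
            }
        });
        return out;
    }
}
```

Note that consumer.poll.timeout.ms also starts with "consumer.". A real implementation would presumably exclude connector-level properties like it before handing the map to the consumer.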

These properties are specified in the MirrorConnectorConfig class, which is shared by all MirrorMaker connectors (MirrorSourceConnector, MirrorCheckpointConnector, etc.). Some properties are relevant only to specific connectors, e.g. "emit.heartbeats" is only relevant to the MirrorHeartbeatConnector.

...

The high-level configuration file required by the MirrorMaker driver supports the following properties (property, default value: description):

  • clusters (required): comma-separated list of Kafka cluster "aliases"
  • cluster.cluster.broker.list (required): connection information for the specific cluster
  • cluster.cluster.x.y.z (default: n/a): passed to workers for a specific cluster
  • connector.source->target.x.y.z (default: n/a): passed to a specific connector


Walkthrough: Running MirrorMaker 2.0

...

Code Block
PUT /connectors/us-west-source/config HTTP/1.1

{
    "name": "us-west-source",
    "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "source.cluster.alias": "us-west",
    "target.cluster.alias": "us-east",
    "source.cluster.brokers.list": "us-west-host1:9091",
    "topics": ".*"
}
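The PUT request above could be issued from Java 11+ with java.net.http. This sketch assumes the worker URL http://localhost:8083 (Kafka Connect's default REST port). The request is only built here. Sending it would use HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()).

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical sketch: build the PUT /connectors/{name}/config request for a Connect worker.
final class ConnectorConfigRequest {
    private ConnectorConfigRequest() {}

    static HttpRequest putConfig(String workerUrl, String connectorName, String configJson) {
        return HttpRequest.newBuilder()
            .uri(URI.create(workerUrl + "/connectors/" + connectorName + "/config"))
            .header("Content-Type", "application/json")
            .PUT(HttpRequest.BodyPublishers.ofString(configJson))
            .build();
    }
}
```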

Running MirrorMaker in legacy mode

After legacy MirrorMaker is deprecated, the existing ./bin/kafka-mirror-maker.sh script will be updated to run MM2 in legacy mode:

$ ./bin/kafka-mirror-maker.sh --consumer consumer.properties --producer producer.properties

Compatibility, Deprecation, and Migration Plan

A new version of ./bin/kafka-mirror-maker.sh will be implemented to run MM2 in "legacy mode", i.e. with new features disabled and supporting existing options and configuration properties. Existing deployments will be able to upgrade to MM2 seamlessly.

The existing MirrorMaker source will be removed from the Kafka core project. MM2 will be added to the connect project under a new module "mirror" and package "org.apache.kafka.connect.mirror".

Deprecation will occur in three phases:


  • Phase 1 (targeting next Apache Kafka release): All MirrorMaker 2.0 Java code is added to ./connect/mirror/.
  • Phase 2 (subsequent release): Legacy MirrorMaker Scala code is deprecated, but kept in place. Sample MM2 scripts and configuration files are added to ./bin/ and ./config/.
  • Phase 3 (subsequent release): Legacy MirrorMaker Scala code is removed from Apache Kafka. A new ./bin/kafka-mirror-maker.sh script is provided which replaces and emulates the legacy script.

Future Work

  • Command-line tools wrapping RemoteClusterUtils. Document cross-cluster migration procedure.
  • Broker and KafkaConsumer support for unambiguous remote topic names, e.g. with a reserved separator character.
  • CheckpointSinkConnector/Task to support upstream checkpointing when a single Connect cluster is used.
  • Additional metrics.
  • Support reconfiguration of running Tasks without stop, start.
  • Exactly-once semantics in Connect and MM2.
  • Improve offset translation, e.g. with timestamps or RecordBatch headers, to get closer to exact offset of latest commit.
  • Prevent duplicate streams from "diamond problem" (fan-out-fan-in).
  • REST API for start, stop, status of underlying Connectors.
  • Support multiple clusters/workers/herders in ConnectDistributed to obviate MM2-specific driver.
  • KIP-416: Notify SourceTask of ACK'd offsets, metadata

...

Rejected Alternatives

  • We could release this as an independent project, but we feel that cluster replication should be one of Kafka's fundamental features.

  • We could deprecate MirrorMaker but keep it around. However, we see this as a natural evolution of MirrorMaker, not an alternative solution.

  • We could update MirrorMaker rather than completely rewrite it. However, we'd end up recreating many features provided by the Connect framework, including the REST API, configuration, metrics, worker coordination, etc.

  • We could build on Uber's uReplicator, which solves some of the problems with MirrorMaker. However, uReplicator uses Apache Helix to provide features that overlap with Connect, e.g. REST API, live configuration changes, cluster management, coordination etc. A native MirrorMaker should only use native parts of Apache Kafka. That said, uReplicator is a major inspiration for the MM2 design.

  • We could provide a high-level REST API for remote-controlling a MirrorMaker cluster. However, this has overlapping concerns with Connect.
  • Instead of separate connectors for heartbeats and checkpoints, we could do everything in a single connector. However, this restricts how Converters and Transformations can be used. For example, you might want a replication pipeline that converts to JSON, but that doesn't necessarily mean you also want heartbeats to be JSON-encoded. It's possible to create a separate KafkaProducer within MirrorSourceConnector to emit Heartbeats and Checkpoints without going through the Connect transformation pipeline, but I don't want to introduce the additional configuration properties for a whole new producer.

...