...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Further Improvements and Proposals on Mirror Maker 2

Accepted

Under Discussion

Abandoned

Motivation

MirrorMaker has been used for years in large-scale production environments, but not without several problems:

...

  • Leverages the Kafka Connect framework and ecosystem.

  • Includes both source and sink connectors.

  • Includes a high-level driver that manages connectors in a dedicated cluster.
  • Detects new topics, partitions.
  • Automatically syncs topic configuration between clusters.

  • Manages downstream topic ACLs.

  • Supports "active/active" cluster pairs, as well as any number of active clusters.

  • Supports cross-datacenter replication, aggregation, and other complex topologies.

  • Provides new metrics including end-to-end replication latency across multiple data centers/clusters.

  • Emits offsets required to migrate consumers between clusters.

  • Tooling for offset translation.

  • MirrorMaker-compatible legacy mode.

  • No rebalancing.

Public Interfaces

This KIP subsumes the small interface change proposed in KIP-416: Notify SourceTask of ACK'd offsets, metadata.

New classes and interfaces include:

...

property | default value | description
name | required | name of the connector, e.g. "us-west->us-east"
topics | empty string | regex of topics to replicate, e.g. "topic1|topic2|topic3". Comma-separated lists are also supported.
topics.blacklist | [".*\.internal", ".*\.replica", "__consumer_offsets", …] or similar | topics to exclude from replication
groups | empty string | regex of groups to replicate, e.g. ".*"
groups.blacklist | empty string | groups to exclude from replication
source.cluster.alias | required | name of the cluster being replicated
target.cluster.alias | required | name of the downstream Kafka cluster
source.cluster.bootstrap.servers | required | upstream cluster to replicate
target.cluster.bootstrap.servers | required | downstream cluster
sync.topic.configs.enabled | true | whether or not to monitor source cluster for configuration changes
sync.topic.acls.enabled | true | whether to monitor source cluster ACLs for changes
admin.* | n/a | passed to AdminClient for local cluster
remote.admin.* | n/a | passed to remote AdminClient
rename.topics | true | connector should rename downstream topics according to "remote topic" convention
emit.heartbeats.enabled | true | connector should periodically emit heartbeats
emit.heartbeats.interval.seconds | 5 (seconds) | frequency of heartbeats
emit.checkpoints.enabled | true | connector should periodically emit consumer offset information
emit.checkpoints.interval.seconds | 5 (seconds) | frequency of checkpoints
refresh.topics.enabled | true | connector should periodically check for new topics
refresh.topics.interval.seconds | 5 (seconds) | frequency to check source cluster for new topics
refresh.groups.enabled | true | connector should periodically check for new consumer groups
refresh.groups.interval.seconds | 5 (seconds) | frequency to check source cluster for new consumer groups
readahead.queue.capacity | 500 (records) | number of records to let consumer get ahead of producer
replication.policy.class | org.apache.kafka.connect.mirror.DefaultReplicationPolicy | use LegacyReplicationPolicy to mimic legacy MirrorMaker
heartbeats.topic.retention.ms | 1 day | used when creating heartbeat topics for the first time
checkpoints.topic.retention.ms | 1 day | used when creating checkpoint topics for the first time
offset.syncs.topic.retention.ms | max long | used when creating offset sync topic for the first time
replication.factor | 2 | used when creating remote topics

The following are the SourceConnector's additional configuration properties.

...

property | default value | description
consumer.* | n/a | passed to the consumer
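To make the interaction between "topics" and "topics.blacklist" concrete, here is a minimal Python sketch. It is not MirrorMaker code: the function names are hypothetical, and it assumes that a topic must fully match the "topics" pattern and must not fully match any blacklist pattern. The actual matching rules in the connector may differ.

```python
import re


def compile_topics(spec: str) -> re.Pattern:
    """Build one pattern from a regex or a comma-separated list.

    Assumption: each comma-separated entry is treated as an alternative.
    A regex that itself contains a comma (e.g. "a{1,2}") is not handled here.
    """
    parts = [p.strip() for p in spec.split(",") if p.strip()]
    return re.compile("|".join(f"(?:{p})" for p in parts))


def should_replicate(topic: str, topics: str, blacklist: list) -> bool:
    """Return True if the topic is selected and not excluded."""
    if not compile_topics(topics).fullmatch(topic):
        return False
    return not any(re.fullmatch(pattern.strip(), topic) for pattern in blacklist)


# Blacklist entries taken from the default shown in the table above.
BLACKLIST = [r".*\.internal", r".*\.replica", "__consumer_offsets"]

for name in ("topic1", "topic4", "foo.internal"):
    print(name, should_replicate(name, "topic1|topic2|.*\\.internal", BLACKLIST))
```

With an empty "topics" value (the default), this sketch selects nothing, which is one plausible reading of the "empty string" default.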

In addition, internal client properties can be overridden as follows:

property | description
source.cluster.consumer.* | overrides for the source-cluster consumer
source.cluster.producer.* | overrides for the source-cluster producer
source.cluster.admin.* | overrides for the source-cluster admin
target.cluster.consumer.* | overrides for the target-cluster consumer
target.cluster.producer.* | overrides for the target-cluster producer
target.cluster.admin.* | overrides for the target-cluster admin
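The prefixed overrides above can be read as "strip the prefix and hand the remainder to the matching client". The following Python sketch illustrates that idea only. The helper name client_overrides is hypothetical, and the example keys (max.poll.records, compression.type) are ordinary Kafka client settings chosen for illustration.

```python
def client_overrides(props: dict, cluster_role: str, client: str) -> dict:
    """Collect settings for one internal client.

    cluster_role is "source" or "target"; client is "consumer",
    "producer", or "admin". Keys outside the prefix are ignored.
    """
    prefix = f"{cluster_role}.cluster.{client}."
    return {
        key[len(prefix):]: value
        for key, value in props.items()
        if key.startswith(prefix)
    }


connector_props = {
    "source.cluster.alias": "us-west",
    "source.cluster.consumer.max.poll.records": "1000",
    "target.cluster.producer.compression.type": "lz4",
}

print(client_overrides(connector_props, "source", "consumer"))
print(client_overrides(connector_props, "target", "producer"))
```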

And similarly for the SinkConnector:

property | default value | description
producer.* | n/a | passed to the producer

These properties are specified in the MirrorConnectorConfig class, which is shared by all MirrorMaker connectors (MirrorSourceConnector, MirrorCheckpointConnector, etc.). Some properties are relevant only to specific connectors, e.g. "emit.heartbeats.enabled" is only relevant to the MirrorHeartbeatConnector.

Example Configuration

A sample configuration file ./config/connect-mirror-source.properties is provided:

...

Code Block
--%<----


# emulate legacy mirror-maker
replication.policy.class = org.apache.kafka.connect.mirror.LegacyReplicationPolicy


# disable all new features
refresh.topics.enabled = false
refresh.groups.enabled = false
emit.checkpoints.enabled = false
emit.heartbeats.enabled = false
sync.topic.configs.enabled = false
sync.topic.acls.enabled = false


--%<----

MirrorMaker Clusters

...

Code Block
clusters = primary, backup
cluster.primary.bootstrap.servers = localhost:9091
cluster.backup.bootstrap.servers = localhost:9092

This can be run as follows:

...

Code Block
clusters = primary, backup
cluster.primary.bootstrap.servers = localhost:9091
cluster.backup.bootstrap.servers = localhost:9092 
connector.primary->backup.topics = .*
connector.primary->backup.emit.heartbeats.enabled = false

In this case, the driver will create two MirrorSourceConnectors, two MirrorCheckpointConnectors, and two MirrorHeartbeatConnectors, and the primary→backup connectors will start replicating all source topics at launch, with heartbeats disabled. Sub-configurations like connector.primary->backup.x.y.z will be applied to all connectors in that source→target flow (MirrorSourceConnector, MirrorCheckpointConnector), avoiding the need to configure each connector individually.
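The expansion described above can be sketched in Python as follows. This is an illustration, not the driver's implementation: parse_properties and expand_flows are hypothetical helpers, the parser handles only simple "key = value" lines, and applying the flow-level overrides to all three connector types is an assumption.

```python
from itertools import permutations

# Connector types named in the paragraph above.
CONNECTORS = (
    "MirrorSourceConnector",
    "MirrorCheckpointConnector",
    "MirrorHeartbeatConnector",
)

EXAMPLE = """
clusters = primary, backup
cluster.primary.bootstrap.servers = localhost:9091
cluster.backup.bootstrap.servers = localhost:9092
connector.primary->backup.topics = .*
connector.primary->backup.emit.heartbeats.enabled = false
"""


def parse_properties(text: str) -> dict:
    """Parse simple 'key = value' lines, skipping blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def expand_flows(props: dict) -> dict:
    """Return one config per (source, target, connector type)."""
    clusters = [c.strip() for c in props["clusters"].split(",")]
    configs = {}
    for source, target in permutations(clusters, 2):
        prefix = f"connector.{source}->{target}."
        shared = {"source.cluster.alias": source, "target.cluster.alias": target}
        # Flow-level overrides apply to every connector in this direction.
        shared.update(
            {k[len(prefix):]: v for k, v in props.items() if k.startswith(prefix)}
        )
        for connector in CONNECTORS:
            configs[(source, target, connector)] = dict(shared)
    return configs


configs = expand_flows(parse_properties(EXAMPLE))
for key in sorted(configs):
    print(key, configs[key])
```

Two clusters yield two directions and therefore six connector configurations. Only the primary->backup entries carry the "topics" and "emit.heartbeats.enabled" overrides.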

...

property | default value | description
clusters | required | comma-separated list of Kafka cluster "aliases"
cluster.<cluster>.bootstrap.servers | required | connection information for the specific cluster
cluster.<cluster>.x.y.z | n/a | passed to workers for a specific cluster
connector.<source>-><target>.x.y.z | n/a | passed to a specific connector

...

Code Block
# mm2.properties
clusters = us-west, us-east
cluster.us-west.bootstrap.servers = host1:9092
cluster.us-east.bootstrap.servers = host2:9092

...

You can also override default properties for specific clusters or connectors:

Code Block
cluster.us-west.offset.storage.topic = mm2-offsets
connector.us-west->us-east.emit.heartbeats.enabled = false

Second, launch one or more MirrorMaker cluster nodes:

...

  • MirrorSourceConnector
  • MirrorSinkConnector (coming soon)
  • MirrorCheckpointConnector
  • MirrorHeartbeatConnector

...

  • Command-line tools wrapping RemoteClusterUtils. Document cross-cluster migration procedure.
  • Broker and KafkaConsumer support for unambiguous remote topic names, e.g. with a reserved separator character.
  • CheckpointSinkConnector/Task to support upstream checkpointing when a single Connect cluster is used.
  • Additional metrics.
  • Support reconfiguration of running Tasks without stopping and restarting them.
  • Exactly-once semantics in Connect and MM2.
  • Improve offset translation, e.g. with timestamps or RecordBatch headers, to get closer to exact offset of latest commit.
  • Prevent duplicate streams from "diamond problem" (fan-out-fan-in).
  • REST API for start, stop, status of underlying Connectors.
  • Support multiple clusters/workers/herders in ConnectDistributed to obviate MM2-specific driver.
  • KIP-416: Notify SourceTask of ACK'd offsets, metadata

Rejected Alternatives

  • We could release this as an independent project, but we feel that cluster replication should be one of Kafka's fundamental features.

  • We could deprecate MirrorMaker but keep it around. However, we see this as a natural evolution of MirrorMaker, not an alternative solution.

  • We could update MirrorMaker rather than completely rewrite it. However, we'd end up recreating many features provided by the Connect framework, including the REST API, configuration, metrics, worker coordination, etc.

  • We could build on Uber's uReplicator, which solves some of the problems with MirrorMaker. However, uReplicator uses Apache Helix to provide features that overlap with Connect, e.g. REST API, live configuration changes, cluster management, coordination etc. A native MirrorMaker should only use native parts of Apache Kafka. That said, uReplicator is a major inspiration for the MM2 design.

  • We could provide a high-level REST API for remote-controlling a MirrorMaker cluster. However, this has overlapping concerns with Connect.
  • Instead of separate connectors for heartbeats and checkpoints, we could do everything in a single connector. However, this restricts how Converters and Transformations can be used. For example, you might want a replication pipeline that converts to JSON, but that doesn't necessarily mean you also want heartbeats to be JSON-encoded. It's possible to create a separate KafkaProducer within MirrorSourceConnector to emit Heartbeats and Checkpoints without going through the Connect transformation pipeline, but I don't want to introduce the additional configuration properties for a whole new producer.

...