...
property | default value | description |
name | required | name of the connector, e.g. "us-west->us-east" |
topics | empty string | regex of topics to replicate, e.g. "topic1|topic2|topic3". Comma-separated lists are also supported. |
topics.blacklist | [".*\.internal", ".*\.replica", "__consumer_offsets"] or similar | topics to exclude from replication |
groups | empty string | regex of groups to replicate, e.g. ".*" |
groups.blacklist | empty string | groups to exclude from replication |
source.cluster.alias | required | name of the cluster being replicated |
target.cluster.alias | required | name of the downstream Kafka cluster |
source.cluster.bootstrap.servers | required | upstream cluster to replicate |
target.cluster.bootstrap.servers | required | downstream cluster |
sync.topic.configs.enabled | true | whether to monitor source cluster for configuration changes |
sync.topic.acls.enabled | true | whether to monitor source cluster ACLs for changes |
admin.* | n/a | passed to AdminClient for local cluster |
remote.admin.* | n/a | passed to remote AdminClient |
rename.topics | true | connector should rename downstream topics according to "remote topic" convention |
emit.heartbeats.enabled | true | connector should periodically emit heartbeats |
emit.heartbeats.interval.seconds | 5 (seconds) | frequency of heartbeats |
emit.checkpoints.enabled | true | connector should periodically emit consumer offset information |
emit.checkpoints.interval.seconds | 5 (seconds) | frequency of checkpoints |
refresh.topics.enabled | true | connector should periodically check for new topics |
refresh.topics.interval.seconds | 5 (seconds) | frequency to check source cluster for new topics |
refresh.groups.enabled | true | connector should periodically check for new consumer groups |
refresh.groups.interval.seconds | 5 (seconds) | frequency to check source cluster for new consumer groups |
readahead.queue.capacity | 500 (records) | number of records to let consumer get ahead of producer |
replication.policy.class | org.apache.kafka.connect.mirror.DefaultReplicationPolicy | use LegacyReplicationPolicy to mimic legacy MirrorMaker |
heartbeats.topic.retention.ms | 1 day | used when creating heartbeat topics for the first time |
checkpoints.topic.retention.ms | 1 day | used when creating checkpoint topics for the first time |
offset.syncs.topic.retention.ms | max long | used when creating offset sync topic for the first time |
replication.factor | 12 | used when creating remote topics |
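Taken together, the required properties above form a minimal connector configuration. The following sketch combines them with a topic blacklist (cluster aliases and hostnames are hypothetical):

Code Block |
---|
name = us-west->us-east
source.cluster.alias = us-west
target.cluster.alias = us-east
source.cluster.bootstrap.servers = us-west-host1:9091
target.cluster.bootstrap.servers = us-east-host1:9092
topics = .*
topics.blacklist = .*\.internal, __consumer_offsets |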
The following are the SourceConnector's additional configuration properties:
property | default value | description |
consumer.poll.timeout.ms | 500 (ms) | poll timeout |
And similarly for the SinkConnector:
property | default value | description |
producer.* | n/a | passed to the producer |
In addition, internal client properties can be overridden as follows:
property | default value | description |
source.cluster.consumer.* | n/a | overrides for the source-cluster consumer |
source.cluster.producer.* | n/a | overrides for the source-cluster producer |
source.cluster.admin.* | n/a | overrides for the source-cluster admin |
target.cluster.consumer.* | n/a | overrides for the target-cluster consumer |
target.cluster.producer.* | n/a | overrides for the target-cluster producer |
target.cluster.admin.* | n/a | overrides for the target-cluster admin |
These properties are specified in the MirrorConnectorConfig class, which is shared by all MirrorMaker connectors (MirrorSourceConnector, MirrorCheckpointConnector, etc.). Some properties are only relevant to specific connectors, e.g. "emit.heartbeats" is only relevant to the MirrorHeartbeatConnector.
Example Configuration
A sample configuration file ./config/connect-mirror-source.properties is provided:
Code Block |
---|
name = local-mirror-source
topics = .*
connector.class = org.apache.kafka.connect.mirror.MirrorSourceConnector
tasks.max = 1

# for demo, source and target clusters are the same
source.cluster.alias = upstream
source.cluster.bootstrap.servers = localhost:9092
target.cluster.bootstrap.servers = localhost:9092

# use ByteArrayConverter to ensure that records are not re-encoded
key.converter = org.apache.kafka.connect.converters.ByteArrayConverter
value.converter = org.apache.kafka.connect.converters.ByteArrayConverter |
...
Code Block |
---|
# emulate legacy mirror-maker
replication.policy.class = org.apache.kafka.mirror.LegacyReplicationPolicy

# disable all new features
rename.topics = false
refresh.topics.enabled = false
refresh.groups.enabled = false
emit.checkpoints.enabled = false
emit.heartbeats.enabled = false
sync.topic.configs.enabled = false
sync.topic.acls.enabled = false |
MirrorMaker Clusters
...
Code Block |
---|
clusters = primary, backup
cluster.primary.bootstrap.servers = localhost:9091
cluster.backup.bootstrap.servers = localhost:9092 |
This can be run as follows:
...
Code Block |
---|
name = MirrorSourceConnector
connector.class = org.apache.kafka.connect.mirror.MirrorSourceConnector
source.cluster.alias = primary
target.cluster.alias = backup
source.cluster.bootstrap.servers = localhost:9091
target.cluster.bootstrap.servers = localhost:9092
key.converter.class = org.apache.kafka.connect.converters.ByteArrayConverter
value.converter.class = org.apache.kafka.connect.converters.ByteArrayConverter |
...
Code Block |
---|
clusters = primary, backup
primary.bootstrap.servers = localhost:9091
backup.bootstrap.servers = localhost:9092
primary->backup.topics = .*
primary->backup.emit.heartbeats.enabled = false |
In this case, two MirrorSourceConnectors, two MirrorCheckpointConnectors, and two MirrorHeartbeatConnectors will be created by the driver and the primary→backup connectors will start replicating all source topics at launch, with heartbeats disabled. Sub-configurations like primary->backup.x.y.z will be applied to all connectors in that source→target flow (MirrorSourceConnector, MirrorCheckpointConnector), avoiding the need to configure each connector individually.
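To illustrate this precedence, a flow-scoped property can override a top-level default for a single source→target flow only (the interval values below are illustrative, not recommendations):

Code Block |
---|
# top-level default, applies to all flows
emit.heartbeats.interval.seconds = 5

# flow-scoped override, applies only to connectors in the primary->backup flow
primary->backup.emit.heartbeats.interval.seconds = 1 |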
...
Code Block |
---|
us-west.offset.storage.topic = mm2-offsets
us-west->us-east.emit.heartbeats.enabled = false
|
Second, launch one or more MirrorMaker cluster nodes:
...
Code Block |
---|
# connector.properties
name = local-mirror-source
connector.class = org.apache.kafka.connect.mirror.MirrorSourceConnector
tasks.max = 1
topics = .*
source.cluster.alias = upstream
source.cluster.bootstrap.servers = host1:9092
target.cluster.bootstrap.servers = host2:9092

# use ByteArrayConverter to ensure that in and out records are exactly the same
key.converter = org.apache.kafka.connect.converters.ByteArrayConverter
value.converter = org.apache.kafka.connect.converters.ByteArrayConverter |
...
- MirrorSourceConnector
- MirrorSinkConnector (coming soon)
- MirrorCheckpointConnector
- MirrorHeartbeatConnector
...
Code Block |
---|
PUT /connectors/us-west-source/config HTTP/1.1
{
"name": "us-west-source",
"connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"source.cluster.alias": "us-west",
"target.cluster.alias": "us-east",
"source.cluster.bootstrap.servers": "us-west-host1:9091",
"topics": ".*"
}
|
...
- Command-line tools wrapping RemoteClusterUtils. Document cross-cluster migration procedure.
- Broker and KafkaConsumer support for unambiguous remote topic names, e.g. with a reserved separator character.
- CheckpointSinkConnector/Task to support upstream checkpointing when a single Connect cluster is used.
- Additional metrics.
- Support reconfiguration of running Tasks without stop, start.
- Exactly-once semantics in Connect and MM2.
- Prevent duplicate streams from "diamond problem" (fan-out-fan-in).
- Support multiple clusters/workers/herders in ConnectDistributed to obviate MM2-specific driver.
- KIP-416: Notify SourceTask of ACK'd offsets, metadata.
Rejected Alternatives
We could release this as an independent project, but we feel that cluster replication should be one of Kafka's fundamental features.
We could deprecate MirrorMaker but keep it around. However, we see this as a natural evolution of MirrorMaker, not an alternative solution.
We could update MirrorMaker rather than completely rewrite it. However, we'd end up recreating many features provided by the Connect framework, including the REST API, configuration, metrics, worker coordination, etc.
We could build on Uber's uReplicator, which solves some of the problems with MirrorMaker. However, uReplicator uses Apache Helix to provide features that overlap with Connect, e.g. REST API, live configuration changes, cluster management, coordination etc. A native MirrorMaker should only use native parts of Apache Kafka. That said, uReplicator is a major inspiration for the MM2 design.
We could provide a high-level REST API for remote-controlling a MirrorMaker cluster. However, this has overlapping concerns with Connect.
Instead of separate connectors for heartbeats and checkpoints, we could do everything in a single connector. However, this restricts how Converters and Transformations can be used. For example, you might want a replication pipeline that converts to JSON, but that doesn't necessarily mean you also want heartbeats to be JSON-encoded. It's possible to create a separate KafkaProducer within MirrorSourceConnector to emit Heartbeats and Checkpoints without going through the Connect transformation pipeline, but I don't want to introduce the additional configuration properties for a whole new producer.
...