...
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Further Improvements and Proposals on Mirror Maker 2
Accepted
- KIP-545: support automated consumer offset sync across clusters in MM 2.0
- KIP-597: MirrorMaker2 internal topics Formatters
- KIP-690: Add additional configuration to control MirrorMaker 2 internal topics naming convention
- KIP-710: Full support for distributed mode in dedicated MirrorMaker 2.0 clusters
- KIP-716: Allow configuring the location of the offset-syncs topic with MirrorMaker2
- KIP-720: Deprecate MirrorMaker v1
Under Discussion
Abandoned
- KIP-656: MirrorMaker2 Exactly-once Semantics (superseded by KIP-618: Exactly-Once Support for Source Connectors and KIP-710: Full support for distributed mode in dedicated MirrorMaker 2.0 clusters)
- KIP-712: Shallow Mirroring
Motivation
MirrorMaker has been used for years in large-scale production environments, but not without several problems:
- Topics are created with default configuration. Often they'll need to be repartitioned manually.
- ACL and configuration changes are not synced across mirrored clusters, making it difficult to manage multiple clusters.
- Records are repartitioned with DefaultPartitioner. Semantic partitioning may be lost.
- Any configuration change means the cluster must be bounced. This includes adding new topics to the whitelist, which may be a frequent operation.
- There's no mechanism to migrate producers or consumers between mirrored clusters.
- There's no support for exactly-once delivery. Records may be duplicated during replication.
- Clusters can't be made mirrors of each other, i.e. no support for active/active pairs.
- Rebalancing causes latency spikes, which may trigger further rebalances.
For these reasons, MirrorMaker is insufficient for many use cases, including backup, disaster recovery, and fail-over scenarios. Several other Kafka replication tools have been created to address some of these limitations, but Apache Kafka has no adequate replication strategy to date. Moreover, the lack of a native solution makes it difficult to build generic tooling for multi-cluster environments.
I propose to replace MirrorMaker with a new multi-cluster, cross-data-center replication engine based on the Connect framework, MirrorMaker 2.0 (MM2). The new engine will be fundamentally different from the legacy MirrorMaker in many ways, but will provide a drop-in replacement for existing deployments.
Highlights of the design include:
- Leverages the Kafka Connect framework and ecosystem.
- Includes both source and sink connectors.
- Includes a high-level driver that manages connectors in a dedicated cluster.
- Detects new topics, partitions.
- Automatically syncs topic configuration between clusters.
- Manages downstream topic ACL.
- Supports "active/active" cluster pairs, as well as any number of active clusters.
- Supports cross-datacenter replication, aggregation, and other complex topologies.
- Provides new metrics including end-to-end replication latency across multiple data centers/clusters.
- Emits offsets required to migrate consumers between clusters.
- Tooling for offset translation.
- MirrorMaker-compatible legacy mode.
- No rebalancing.
Public Interfaces
This KIP subsumes the small interface change proposed in KIP-416: Notify SourceTask of ACK'd offsets, metadata.
New classes and interfaces include:
- MirrorSourceConnector, MirrorSinkConnector, MirrorSourceTask, MirrorSinkTask classes.
- MirrorCheckpointConnector, MirrorCheckpointTask.
- MirrorHeartbeatConnector, MirrorHeartbeatTask.
- MirrorConnectorConfig, MirrorTaskConfig classes.
- ReplicationPolicy interface. DefaultReplicationPolicy and LegacyReplicationPolicy classes.
- Heartbeat, checkpoint, offset sync topics and associated schemas.
- RemoteClusterUtils and MirrorClient classes for querying remote cluster reachability and lag, and for translating consumer offsets between clusters.
- MirrorMaker driver class with main entry point for running MM2 cluster nodes.
- MirrorMakerConfig used by MirrorMaker driver.
- HeartbeatMessageFormatter, CheckpointMessageFormatter.
- ./bin/connect-mirror-maker.sh and ./config/mirror-maker.properties sample configuration.
New metrics include:
- replication-latency-ms(-avg/-min/-max): timespan between each record's timestamp and downstream ACK
- record-bytes(-avg/-min/-max): size of each record being replicated
- record-age-ms(-avg/-min/-max): age of each record when consumed
- record-count: number of records replicated
- checkpoint-latency-ms(-avg/-min/-max): timespan between consumer group commit and downstream checkpoint ACK
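As a rough illustration of how the (-avg/-min/-max) variants relate to per-record measurements, the sketch below accumulates replication latency as the difference between a record's timestamp and the time its downstream ACK is observed. The class and method names are hypothetical and are not part of MM2; real metrics would presumably be registered through Kafka's own metrics facilities.

```java
// Hypothetical helper, not part of MM2: tracks count/avg/min/max of a latency series.
final class LatencyStatsSketch {
    private long count = 0;
    private long sum = 0;
    private long min = Long.MAX_VALUE;
    private long max = Long.MIN_VALUE;

    // recordTimestampMs: timestamp carried by the replicated record.
    // ackTimestampMs: time at which the downstream ACK was observed.
    void recordAck(long recordTimestampMs, long ackTimestampMs) {
        long latency = ackTimestampMs - recordTimestampMs;
        count++;
        sum += latency;
        min = Math.min(min, latency);
        max = Math.max(max, latency);
    }

    long count() { return count; }

    // NaN until at least one ACK has been recorded.
    double avg() { return count == 0 ? Double.NaN : (double) sum / count; }

    long min() { return min; }

    long max() { return max; }
}
```

The same accumulator shape would apply to record-age-ms and checkpoint-latency-ms, with different start and end timestamps.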
...
A utility class RemoteClusterUtils will leverage the internal topics described above to assist in computing reachability, inter-cluster lag, and offset translation. It won't be possible to directly translate any given offset, since not all offsets will be captured in the checkpoint stream. But for a given consumer group, it will be possible to find high water marks that consumers can seek() to. This is useful for inter-cluster consumer migration, failover, etc.
The interface is as follows:
```java
// Calculates the number of hops between a client and an upstream Kafka cluster based on replication heartbeats.
// If the given cluster is not reachable, returns -1.
int replicationHops(Map<String, Object> properties, String upstreamClusterAlias)

// Find all heartbeat topics, e.g. A.B.heartbeat, visible from the client.
Set<String> heartbeatTopics(Map<String, Object> properties)

// Find all checkpoint topics, e.g. A.checkpoint.internal, visible from the client.
Set<String> checkpointTopics(Map<String, Object> properties)

// Find all upstream clusters reachable from the client
Set<String> upstreamClusters(Map<String, Object> properties)

// Find the local offsets corresponding to the latest checkpoint from a specific upstream consumer group.
Map<TopicPartition, OffsetAndMetadata> translateOffsets(Map<String, Object> properties, String targetClusterAlias, String consumerGroupId, Duration timeout)
```
The utility class assumes DefaultReplicationPolicy is used for replication.
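To illustrate the idea of seeking to a translated high water mark, the following self-contained sketch models a checkpoint as a (group, topic, partition, upstream offset, downstream offset) tuple and picks, for one consumer group, the most recent downstream offset per partition. The Checkpoint class, its fields, and the "topic-partition" string key are assumptions made for illustration; the actual checkpoint schema and the translateOffsets implementation may differ.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class CheckpointTranslationSketch {
    // Hypothetical, simplified checkpoint record (not the real MM2 schema).
    static final class Checkpoint {
        final String group;
        final String topic;       // topic name as seen on the downstream cluster
        final int partition;
        final long upstreamOffset;
        final long downstreamOffset;

        Checkpoint(String group, String topic, int partition,
                   long upstreamOffset, long downstreamOffset) {
            this.group = group;
            this.topic = topic;
            this.partition = partition;
            this.upstreamOffset = upstreamOffset;
            this.downstreamOffset = downstreamOffset;
        }
    }

    // Returns "topic-partition" -> downstream offset taken from the last checkpoint
    // seen for the group. Checkpoints are assumed to be in emission order, so later
    // entries overwrite earlier ones.
    static Map<String, Long> translate(List<Checkpoint> checkpoints, String group) {
        Map<String, Long> result = new HashMap<>();
        for (Checkpoint c : checkpoints) {
            if (c.group.equals(group)) {
                result.put(c.topic + "-" + c.partition, c.downstreamOffset);
            }
        }
        return result;
    }
}
```

A consumer migrating to the downstream cluster would then seek() each assigned partition to the corresponding offset in the returned map. Because only checkpointed offsets are available, the consumer may re-read some records.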
MirrorClient
The RemoteClusterUtils class wraps a low-level MirrorClient, with the following exposed methods:
```java
MirrorClient(Map<String, Object> props) ...

void close() ...

ReplicationPolicy replicationPolicy() ...

int replicationHops(String upstreamClusterAlias) ...

Set<String> heartbeatTopics() ...

Set<String> checkpointTopics() ...

// Finds upstream clusters, which may be multiple hops away, based on incoming heartbeats.
Set<String> upstreamClusters() ...

// Find all remote topics on the cluster
Set<String> remoteTopics() ...

// Find all remote topics from the given source
Set<String> remoteTopics(String source) ...

// Find the local offsets corresponding to the latest checkpoint from a specific upstream consumer group.
Map<TopicPartition, OffsetAndMetadata> remoteConsumerOffsets(String consumerGroupId, String remoteClusterAlias, Duration timeout)
```
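The hop count can be sketched by walking a heartbeat topic name back through its source prefixes. This example assumes that each replication hop prefixes the topic with the source cluster alias and a "." separator, so that a heartbeat topic originating on A and passing through B would appear locally as B.A.heartbeats. The base topic name heartbeats, the separator, the prefix ordering, and the helper names are all assumptions for illustration, not the MirrorClient implementation. Aliases containing the separator are not handled.

```java
import java.util.Collection;
import java.util.regex.Pattern;

final class ReplicationHopsSketch {
    static final String SEPARATOR = ".";           // assumed separator
    static final String BASE_TOPIC = "heartbeats"; // assumed base heartbeat topic name

    // Counts hops from the local cluster back to upstreamAlias along one heartbeat topic.
    // Returns -1 if the topic is not a remote heartbeat topic or the alias is absent.
    static int hopsForTopic(String topic, String upstreamAlias) {
        String suffix = SEPARATOR + BASE_TOPIC;
        if (!topic.endsWith(suffix)) {
            return -1; // local or unrelated topic
        }
        String prefixes = topic.substring(0, topic.length() - suffix.length());
        String[] aliases = prefixes.split(Pattern.quote(SEPARATOR));
        for (int i = 0; i < aliases.length; i++) {
            if (aliases[i].equals(upstreamAlias)) {
                return i + 1; // outermost prefix is the most recent hop
            }
        }
        return -1;
    }

    // Minimum hop count over all visible heartbeat topics, or -1 if unreachable.
    static int replicationHops(Collection<String> heartbeatTopics, String upstreamAlias) {
        int best = -1;
        for (String topic : heartbeatTopics) {
            int hops = hopsForTopic(topic, upstreamAlias);
            if (hops != -1 && (best == -1 || hops < best)) {
                best = hops;
            }
        }
        return best;
    }
}
```

Taking the minimum matters when the same upstream cluster is reachable along several paths of different lengths.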
...
...
Replication Policies and Filters
...
property | default value | description |
name | required | name of the connector, e.g. "us-west->us-east" |
topics | empty string | regex of topics to replicate, e.g. "topic1|topic2|topic3". Comma-separated lists are also supported. |
topics.blacklist | [".*\.internal", ".*\.replica", "__consumer_offsets", …] or similar | topics to exclude from replication |
groups | empty string | regex of groups to replicate, e.g. ".*" |
groups.blacklist | empty string | groups to exclude from replication |
source.cluster.alias | required | name of the cluster being replicated |
target.cluster.alias | required | name of the downstream Kafka cluster |
source.cluster.bootstrap.servers | required | upstream cluster to replicate |
target.cluster.bootstrap.servers | required | downstream cluster |
sync.topic.configs.enabled | true | whether or not to monitor source cluster for configuration changes |
admin.* | n/a | passed to AdminClient for local cluster |
remote.admin.* | n/a | passed to remote AdminClient |
rename.topics | true | connector should rename downstream topics according to "remote topic" convention |
sync.topic.acls.enabled | true | whether to monitor source cluster ACLs for changes |
emit.heartbeats.enabled | true | connector should periodically emit heartbeats |
emit.heartbeats.interval.seconds | 5 (seconds) | frequency of heartbeats |
emit.checkpoints.enabled | true | connector should periodically emit consumer offset information |
emit.checkpoints.interval.seconds | 5 (seconds) | frequency of checkpoints |
refresh.topics.enabled | true | connector should periodically check for new topics |
refresh.topics.interval.seconds | 5 (seconds) | frequency to check source cluster for new topics |
refresh.groups.enabled | true | connector should periodically check for new consumer groups |
refresh.groups.interval.seconds | 5 (seconds) | frequency to check source cluster for new consumer groups |
readahead.queue.capacity | 500 (records) | number of records to let consumer get ahead of producer |
replication.policy.class | org.apache.kafka.connect.mirror.DefaultReplicationPolicy | use LegacyReplicationPolicy to mimic legacy MirrorMaker |
heartbeats.topic.retention.ms | 1 day | used when creating heartbeat topics for the first time |
checkpoints.topic.retention.ms | 1 day | used when creating checkpoint topics for the first time |
offset.syncs.topic.retention.ms | max long | used when creating offset sync topic for the first time |
replication.factor | 2 | used when creating remote topics |
In addition, internal client properties can be overridden as follows:
property | description |
source.cluster.consumer.* | overrides for the source-cluster consumer |
source.cluster.producer.* | overrides for the source-cluster producer |
source.cluster.admin.* | overrides for the source-cluster admin |
target.cluster.consumer.* | overrides for the target-cluster consumer |
target.cluster.producer.* | overrides for the target-cluster producer |
target.cluster.admin.* | overrides for the target-cluster admin |
The following are the SourceConnector's additional configuration properties:
property | default value | description |
consumer.* | n/a | passed to the consumer |
And similarly for the SinkConnector:
property | default value | description |
producer.* | n/a | passed to the producer |
These properties are specified in the MirrorConnectorConfig class, which is shared by all MirrorMaker connectors (MirrorSourceConnector, MirrorCheckpointConnector, etc.). Some properties are relevant only to specific connectors, e.g. "emit.heartbeats.enabled" is only relevant to the MirrorHeartbeatConnector.
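How the topics and topics.blacklist properties might combine can be sketched as follows. This is a minimal illustration that treats a comma-separated value as a list of regular expressions. The class name and the exact matching rules (whole-name match, blacklist takes precedence) are assumptions, not the connector's confirmed behavior.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical filter, not an MM2 class.
final class TopicFilterSketch {
    private final List<Pattern> include;
    private final List<Pattern> exclude;

    TopicFilterSketch(String topics, String blacklist) {
        this.include = compile(topics);
        this.exclude = compile(blacklist);
    }

    // Splits on commas and compiles each non-empty entry as a regex.
    private static List<Pattern> compile(String commaSeparated) {
        List<Pattern> patterns = new ArrayList<>();
        for (String part : commaSeparated.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                patterns.add(Pattern.compile(trimmed));
            }
        }
        return patterns;
    }

    private static boolean matchesAny(List<Pattern> patterns, String topic) {
        for (Pattern p : patterns) {
            if (p.matcher(topic).matches()) {
                return true;
            }
        }
        return false;
    }

    // A topic is replicated only if it is included and not blacklisted.
    boolean shouldReplicate(String topic) {
        return matchesAny(include, topic) && !matchesAny(exclude, topic);
    }
}
```

With this reading, an empty topics value (the default) selects nothing, so a connector replicates no topics until the property is set.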
Example Configuration
A sample configuration file ./config/connect-mirror-source.properties is provided:
...
```properties
--%<----
# emulate legacy mirror-maker
replication.policy.class = org.apache.kafka.connect.mirror.LegacyReplicationPolicy

# disable all new features
refresh.topics.enabled = false
refresh.groups.enabled = false
emit.checkpoints.enabled = false
emit.heartbeats.enabled = false
sync.topic.configs.enabled = false
sync.topic.acls.enabled = false
--%<----
```
MirrorMaker Clusters
...
```properties
clusters = primary, backup
cluster.primary.bootstrap.servers = localhost:9091
cluster.backup.bootstrap.servers = localhost:9092
```
This can be run as follows:
...
```properties
clusters = primary, backup
cluster.primary.bootstrap.servers = localhost:9091
cluster.backup.bootstrap.servers = localhost:9092
connector.primary->backup.topics = .*
connector.primary->backup.emit.heartbeats.enabled = false
```
In this case, the driver will create two MirrorSourceConnectors, two MirrorCheckpointConnectors, and two MirrorHeartbeatConnectors, and the primary→backup connectors will start replicating all source topics at launch, with heartbeats disabled. Sub-configurations like connector.primary->backup.x.y.z will be applied to all connectors in that source→target flow (MirrorSourceConnector, MirrorCheckpointConnector), avoiding the need to configure each connector individually.
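The way a driver might expand such a file can be sketched in plain Java. The example below enumerates every ordered source->target pair from clusters and collects the connector.source->target.* overrides for one flow. The class and method names are hypothetical, and this is not the MirrorMakerConfig implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of MM2.
final class DriverConfigSketch {
    // Every ordered pair of distinct cluster aliases is a potential replication flow.
    static List<String> flows(Map<String, String> props) {
        List<String> aliases = new ArrayList<>();
        for (String alias : props.getOrDefault("clusters", "").split(",")) {
            if (!alias.trim().isEmpty()) {
                aliases.add(alias.trim());
            }
        }
        List<String> flows = new ArrayList<>();
        for (String source : aliases) {
            for (String target : aliases) {
                if (!source.equals(target)) {
                    flows.add(source + "->" + target);
                }
            }
        }
        return flows;
    }

    // Strips the "connector.<flow>." prefix, leaving the properties that would be
    // handed to each connector in that flow.
    static Map<String, String> connectorOverrides(Map<String, String> props, String flow) {
        String prefix = "connector." + flow + ".";
        Map<String, String> overrides = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : props.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                overrides.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return overrides;
    }
}
```

For the two-cluster example, this yields the flows primary->backup and backup->primary, which is consistent with two of each connector type being created.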
...
property | default value | description |
clusters | required | comma-separated list of Kafka cluster "aliases" |
cluster.cluster.bootstrap.servers | required | connection information for the specific cluster |
cluster.cluster.x.y.z | n/a | passed to workers for a specific cluster |
connector.source->target.x.y.z | n/a | passed to a specific connector |
...
```properties
# mm2.properties
clusters = us-west, us-east
cluster.us-west.bootstrap.servers = host1:9092
cluster.us-east.bootstrap.servers = host2:9092
```
...
You can also override default properties for specific clusters or connectors:
```properties
cluster.us-west.offset.storage.topic = mm2-offsets
connector.us-west->us-east.emit.heartbeats.enabled = false
```
Second, launch one or more MirrorMaker cluster nodes:
...
- MirrorSourceConnector
- MirrorSinkConnector (coming soon)
- MirrorCheckpointConnector
- MirrorHeartbeatConnector
...
- Command-line tools wrapping RemoteClusterUtils. Document cross-cluster migration procedure.
- Broker and KafkaConsumer support for unambiguous remote topic names, e.g. with a reserved separator character.
- CheckpointSinkConnector/Task to support upstream checkpointing when a single Connect cluster is used.
- Additional metrics.
- Support reconfiguration of running Tasks without stop, start.
- Exactly-once semantics in Connect and MM2.
- Improve offset translation, e.g. with timestamps or RecordBatch headers, to get closer to exact offset of latest commit.
- Prevent duplicate streams from "diamond problem" (fan-out-fan-in).
- REST API for start, stop, status of underlying Connectors.
- Support multiple clusters/workers/herders in ConnectDistributed to obviate MM2-specific driver.
- KIP-416: Notify SourceTask of ACK'd offsets, metadata
Rejected Alternatives
- We could release this as an independent project, but we feel that cluster replication should be one of Kafka's fundamental features.
- We could deprecate MirrorMaker but keep it around. However, we see this as a natural evolution of MirrorMaker, not an alternative solution.
- We could update MirrorMaker rather than completely rewrite it. However, we'd end up recreating many features provided by the Connect framework, including the REST API, configuration, metrics, worker coordination, etc.
- We could build on Uber's uReplicator, which solves some of the problems with MirrorMaker. However, uReplicator uses Apache Helix to provide features that overlap with Connect, e.g. REST API, live configuration changes, cluster management, coordination etc. A native MirrorMaker should only use native parts of Apache Kafka. That said, uReplicator is a major inspiration for the MM2 design.
- We could provide a high-level REST API for remote-controlling a MirrorMaker cluster. However, this has overlapping concerns with Connect.
- Instead of separate connectors for heartbeats and checkpoints, we could do everything in a single connector. However, this restricts how Converters and Transformations can be used. For example, you might want a replication pipeline that converts to JSON, but that doesn't necessarily mean you also want heartbeats to be JSON-encoded. It's possible to create a separate KafkaProducer within MirrorSourceConnector to emit Heartbeats and Checkpoints without going through the Connect transformation pipeline, but I don't want to introduce the additional configuration properties for a whole new producer.
...