Status
Current state: Under Discussion
Discussion thread: here
JIRA: KAFKA-7500
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
MirrorMaker has been used for years in large-scale production environments, but not without several problems:
Topics are created with default configuration. Often they'll need to be repartitioned manually.
ACL and configuration changes are not synced across mirrored clusters, making it difficult to manage multiple clusters.
Records are repartitioned with DefaultPartitioner. Semantic partitioning may be lost.
Any configuration change means the cluster must be bounced. This includes adding new topics to the whitelist, which may be a frequent operation.
There's no mechanism to migrate producers or consumers between mirrored clusters.
There's no support for exactly-once delivery. Records may be duplicated during replication.
Clusters can't be made mirrors of each other, i.e. no support for active/active pairs.
For these reasons, MirrorMaker is insufficient for many use cases, including backup, disaster recovery, and failover scenarios. Several other Kafka replication tools have been created to address some of these limitations, but Apache Kafka has no adequate replication strategy to date. Moreover, the lack of a native solution makes it difficult to build generic tooling for multi-cluster environments.
I propose to replace MirrorMaker with a new multi-cluster, cross-datacenter replication engine based on the Connect framework, MirrorMaker 2.0 (MM2). The new engine will be fundamentally different from the legacy MirrorMaker in many ways, but will provide a drop-in replacement for existing deployments.
Highlights of the design include:
Leverages the Connect framework and ecosystem.
Includes both source and sink connectors.
Detects new topics, partitions.
Automatically syncs topic configuration between clusters.
Manages downstream topic ACLs.
Supports "active/active" cluster pairs, as well as any number of active clusters.
Supports cross-datacenter replication, aggregation, and other complex topologies.
Provides new metrics including end-to-end replication latency across multiple data centers/clusters.
Emits offsets required to migrate consumers between clusters.
Provides tooling for offset translation.
Offers a MirrorMaker-compatible legacy mode.
Public Interfaces
New interfaces include:
KafkaSourceConnector, KafkaSinkConnector, KafkaSourceTask, KafkaSinkTask classes.
New configuration properties in addition to those supported by legacy MirrorMaker.
Heartbeat and checkpoint topics and associated schemas; however, these will be considered internal.
RemoteClusterUtils class for querying remote cluster reachability and lag, and for translating consumer offsets between clusters.
Scripts for launching MM2 workers, including support for a legacy mode compatible with MirrorMaker.
Proposed Changes
Remote Topics, Partitions
This design introduces the concept of remote topics, which are replicated topics referencing a source cluster via a naming convention, e.g. "us-west.topic1", where topic1 is a source topic and us-west is a source cluster alias. Any partitions in a remote topic are remote partitions and refer to the same partitions in the source topic. Specifically:
Partitioning and order of records are preserved between source and remote topics.
Remote topics must have the same number of partitions as their source topics.
A remote topic has a single source topic.
A remote partition has a single source partition.
Remote partition i is a replica of source partition i.
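The naming convention and the partition mapping above can be sketched in a few lines. This is only an illustration, not the proposed implementation: the function names are hypothetical, and the "." separator is assumed from the "us-west.topic1" example.

```python
from typing import Tuple

SEPARATOR = "."  # assumed from the "us-west.topic1" example


def remote_topic(source_alias: str, topic: str) -> str:
    """Name of the remote topic that mirrors `topic` from cluster `source_alias`."""
    return f"{source_alias}{SEPARATOR}{topic}"


def remote_partition(source_alias: str, topic: str, partition: int) -> Tuple[str, int]:
    """Remote partition i replicates source partition i, so the index is unchanged."""
    return remote_topic(source_alias, topic), partition


print(remote_topic("us-west", "topic1"))         # us-west.topic1
print(remote_partition("us-west", "topic1", 3))  # ('us-west.topic1', 3)
```

Because the partition index never changes, records keep the partition they had in the source topic, and per-partition ordering carries over.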
MM2 replicates topics from a source cluster to corresponding remote topics in the destination cluster. In this way, replication never results in merged or out-of-order partitions. Moreover, the topic renaming policy enables "active/active" replication by default:
"active/active" replication
This is not possible with MirrorMaker today: records would be replicated back and forth indefinitely, and the topics in each cluster would be merged inconsistently. The remote naming convention avoids these problems by keeping local and remote records in separate partitions. In either cluster above, the normal topic1 contains only locally produced records. Likewise, B.topic1 contains only records produced in cluster B.
To consume local records only, a consumer subscribes to topic1 as normal. To consume records that have been replicated from a remote cluster, consumers subscribe to a remote topic, e.g. B.topic1.
This convention extends to any number of clusters.
Aggregation
Downstream consumers can aggregate across multiple clusters by subscribing to the local topic and all corresponding remote topics, e.g. topic1, us-west.topic1, us-east.topic1… or by using a regex subscription.
Topics are never merged or repartitioned within the connector. Instead, merging is left to the consumer. This is in contrast to MirrorMaker, which is often used to merge topics from multiple clusters into a single aggregate cluster.
Likewise, a consumer can always elect not to aggregate by simply subscribing to the local topic only, e.g. topic1.
This approach eliminates the need for special purpose aggregation clusters without losing any power or flexibility.
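As a rough illustration of the regex subscription mentioned above, the sketch below builds a pattern that matches a local topic plus any remote copies of it. The helper name is hypothetical, and it assumes cluster aliases contain no "." characters; a local topic whose own name ends in ".topic1" would also match, so a real deployment would need a stricter pattern.

```python
import re


def aggregate_pattern(topic: str) -> "re.Pattern":
    """Match `topic` itself and any alias-prefixed copy, e.g. us-west.topic1."""
    # Zero or more "<alias>." prefixes, followed by the literal topic name.
    return re.compile(r"(?:[^.]+\.)*" + re.escape(topic))


pattern = aggregate_pattern("topic1")
candidates = ["topic1", "us-west.topic1", "us-east.topic1", "topic10", "us-west.topic2"]
matched = [t for t in candidates if pattern.fullmatch(t)]
print(matched)  # ['topic1', 'us-west.topic1', 'us-east.topic1']
```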
Cycle detection
It is possible to configure two clusters to replicate each other ("active/active"), in which case all records produced to either cluster can be seen by consumers in both clusters. To prevent infinite recursion, topics that already contain "us-west" in the prefix won't be replicated to the us-west cluster.
This rule applies across all topics regardless of topology. A cluster can be replicated to many downstream clusters, which themselves can be replicated, yielding topics like us-west.us-east.topic1 and so on. The same cluster alias will not appear twice in a topic name due to cycle detection.
In this way, any topology of clusters is supported, not just DAGs.
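The cycle-detection rule can be expressed as a small predicate. The following is a sketch under stated assumptions rather than the connector's actual logic: the function names are hypothetical, and the set of known cluster aliases is passed in explicitly so that ordinary topic names containing dots are not mistaken for prefixes.

```python
from typing import List, Set


def prefix_aliases(topic: str, known_aliases: Set[str]) -> List[str]:
    """Leading dot-separated segments of `topic` that are known cluster aliases."""
    aliases = []
    rest = topic
    while "." in rest:
        head, tail = rest.split(".", 1)
        if head not in known_aliases:
            break  # the remainder is the original topic name
        aliases.append(head)
        rest = tail
    return aliases


def should_replicate(topic: str, target_alias: str, known_aliases: Set[str]) -> bool:
    """A topic is not replicated to a cluster whose alias already appears in its prefix."""
    return target_alias not in prefix_aliases(topic, known_aliases)


clusters = {"us-west", "us-east"}
print(should_replicate("us-west.topic1", "us-west", clusters))  # False
print(should_replicate("us-west.topic1", "us-east", clusters))  # True
```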
Config, ACL Sync
MM2 monitors source topics and propagates configuration changes to remote topics. Any missing partitions will be automatically created.
Remote topics should never be written to, except by the source or sink connector. Producers must only write to local topics (e.g. topic1), which may then be replicated elsewhere. To enforce this policy, MM2 propagates ACL policies to downstream topics using the following rules:
Principals that can READ the source topic can READ the remote topic.
No one can WRITE to the remote topic except MM2.
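The two ACL rules above amount to a simple mapping from source ACLs to remote ACLs. The sketch below uses a hypothetical `Acl` record and a hypothetical MM2 principal name; it does not use the real AdminClient ACL types, and it models only the READ and WRITE operations.

```python
from dataclasses import dataclass
from typing import Iterable, List


@dataclass(frozen=True)
class Acl:
    principal: str
    operation: str  # "READ" or "WRITE" in this simplified model
    topic: str


def remote_acls(source_acls: Iterable[Acl], remote_topic: str, mm2_principal: str) -> List[Acl]:
    """Derive ACLs for a remote topic from the ACLs of its source topic."""
    # Rule 1: principals that can READ the source topic can READ the remote topic.
    acls = [Acl(a.principal, "READ", remote_topic)
            for a in source_acls if a.operation == "READ"]
    # Rule 2: source WRITE grants are dropped; only MM2 may WRITE to the remote topic.
    acls.append(Acl(mm2_principal, "WRITE", remote_topic))
    return acls


source = [Acl("User:alice", "READ", "topic1"), Acl("User:bob", "WRITE", "topic1")]
for acl in remote_acls(source, "us-west.topic1", "User:mm2"):
    print(acl)
```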
Connectors
Both source and sink connectors will be implemented to enable complex flows between multiple Kafka clusters and across data centers. Generally, each data center will have a single MM2 cluster and a primary Kafka cluster, K.
KafkaSourceConnector workers replicate a set of topics from a single source cluster into the primary cluster.
KafkaSinkConnector workers consume from the primary cluster and replicate topics to a single sink cluster.
Both connectors contain a pair of producers and consumers to replicate records, and a pair of AdminClients to propagate configuration/ACL changes.
[Figure: KafkaSourceConnector and workers]
By combining SourceConnectors and SinkConnectors, a single MM2 cluster can manage replication across multiple Kafka clusters.
Connectors (C) replicate data between Kafka clusters.
For cross-datacenter replication (XDCR), each data center should have a single MM2 cluster which pulls records from the other data centers via source connectors. Replication may fan out within each data center via sink connectors.
[Figure: cross-datacenter replication of multiple clusters]
Connectors can be configured to replicate specific topics via a whitelist or regex. For example, clusters A1 and B1 above might contain only a subset of topics from K1, K2, or K3.
Internal Topics
The connectors emit a heartbeat topic like us-west.heartbeat, with time-stamped messages. Downstream consumers can use this topic to verify that a) the connector is running and b) the corresponding source cluster is available. Heartbeats will be propagated by source and sink connectors, so that chains like backup.us-west.us-east.heartbeat are possible.
[Figure: heartbeats are replicated between clusters]
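A downstream consumer might use heartbeat timestamps roughly as follows. This is a hypothetical sketch: it assumes each heartbeat carries an epoch-millisecond timestamp, and the tolerance of three missed intervals is an arbitrary illustrative choice, not part of this proposal. The 5-second interval matches the default for emit.heartbeats.interval.seconds.

```python
def replication_latency_ms(heartbeat_ts_ms: int, received_ts_ms: int) -> int:
    """Time between a heartbeat being emitted upstream and being observed downstream."""
    return received_ts_ms - heartbeat_ts_ms


def is_stale(last_heartbeat_ts_ms: int, now_ms: int,
             interval_seconds: int = 5, missed_allowed: int = 3) -> bool:
    """True if more than `missed_allowed` heartbeat intervals passed with no heartbeat."""
    return now_ms - last_heartbeat_ts_ms > interval_seconds * 1000 * missed_allowed


print(replication_latency_ms(1_000, 1_250))  # 250
print(is_stale(0, 15_000))                   # False
print(is_stale(0, 15_001))                   # True
```

A stale heartbeat alone does not distinguish a stopped connector from an unavailable source cluster; it only signals that one of the two conditions holds.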
Additionally, the connectors periodically emit checkpoints in the destination cluster, containing offsets for each consumer group in the source cluster. The connector will periodically query the source cluster for all committed offsets from all consumer groups, filter for those topics being replicated, and emit a message to a topic like us-west.checkpoints.internal in the destination cluster. The message will include the following fields:
consumer group ID
topic
partition
latest committed offset in source cluster
latest committed offset translated to destination cluster's offsets
As with __consumer_offsets, the checkpoint topic is log-compacted to reflect only the latest offsets across consumer groups. The topic will be created automatically by the connector if it doesn't exist.
A utility class (RemoteClusterUtils) and command-line tool will be provided to assist in translation of offsets between clusters using these checkpoints. It won't be possible to directly translate any given offset, since not all offsets will be captured in the checkpoint stream. But for a given consumer group, it will be possible to find high water marks that consumers can seek() to. This is useful for inter-cluster consumer migration, failover, etc.
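To make the checkpoint-based translation concrete, here is a minimal sketch. The `Checkpoint` record mirrors the five fields listed above, but the field names, the compaction key of (group, topic, partition), and the `translate` helper are assumptions for illustration; this is not the RemoteClusterUtils API.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, Optional, Tuple

Key = Tuple[str, str, int]  # (consumer group ID, topic, partition), an assumed key


@dataclass(frozen=True)
class Checkpoint:
    group: str
    topic: str
    partition: int
    upstream_offset: int    # latest committed offset in the source cluster
    downstream_offset: int  # the same position in destination-cluster offsets


def compact(checkpoints: Iterable[Checkpoint]) -> Dict[Key, Checkpoint]:
    """Emulate log compaction: keep only the most recent checkpoint per key."""
    latest: Dict[Key, Checkpoint] = {}
    for cp in checkpoints:
        latest[(cp.group, cp.topic, cp.partition)] = cp
    return latest


def translate(latest: Dict[Key, Checkpoint], group: str, topic: str,
              partition: int) -> Optional[int]:
    """Destination offset the group can seek() to, or None if no checkpoint exists."""
    cp = latest.get((group, topic, partition))
    return None if cp is None else cp.downstream_offset


stream = [
    Checkpoint("group1", "topic1", 0, upstream_offset=100, downstream_offset=90),
    Checkpoint("group1", "topic1", 0, upstream_offset=250, downstream_offset=240),
]
latest = compact(stream)
print(translate(latest, "group1", "topic1", 0))  # 240
print(translate(latest, "group2", "topic1", 0))  # None
```

The lookup returns only the most recent checkpointed position, which matches the limitation described above: arbitrary offsets cannot be translated, only the positions captured in the checkpoint stream.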
Configuration Properties
Properties common to the SourceConnector and SinkConnector:
property | default value | description |
name | n/a | name of the connector, e.g. "us-west-mirror-maker" |
topics | ".*" | regex of topics to replicate, e.g. "topic1|topic2|topic3" |
topics.blacklist | [".*\.internal", ".*\.replica", "__consumer_offsets", …] | topics to exclude from replication |
source.cluster.alias | n/a | name of the cluster being replicated |
sync.topic.configs | true | whether or not to monitor source cluster for configuration changes |
sync.topic.acls | true | whether to monitor source cluster ACLs for changes |
admin.* | n/a | passed to AdminClient for local cluster |
remote.admin.* | n/a | passed to remote AdminClient |
emit.heartbeats | true | connector should periodically emit heartbeats |
emit.heartbeats.interval.seconds | 5 (seconds) | frequency of heartbeats |
emit.checkpoints | true | connector should periodically emit consumer offset information |
emit.checkpoints.interval.seconds | 5 (seconds) | frequency of checkpoints |
update.topics | true | connector should periodically check for new topics |
update.topics.interval.seconds | 5 (seconds) | frequency to check source cluster for new topics |
update.groups | true | connector should periodically check for new consumer groups |
update.groups.interval.seconds | 5 (seconds) | frequency to check source cluster for new consumer groups |
readahead.queue.capacity | 500 (records) | number of records to let consumer get ahead of producer |
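The interaction between topics and topics.blacklist can be sketched as a simple filter. This is an illustration only: the function name is hypothetical, the blacklist below contains just the three default patterns that are spelled out in the table (the remaining defaults are elided there), and it assumes each pattern must match the whole topic name.

```python
import re
from typing import Iterable, List

# Only the defaults listed explicitly in the table; the rest are elided in the source.
LISTED_BLACKLIST = [r".*\.internal", r".*\.replica", r"__consumer_offsets"]


def select_topics(all_topics: Iterable[str], topics: str = ".*",
                  blacklist: Iterable[str] = tuple(LISTED_BLACKLIST)) -> List[str]:
    """Topics that match the `topics` regex and none of the blacklist patterns."""
    include = re.compile(topics)
    exclude = [re.compile(p) for p in blacklist]
    return [t for t in all_topics
            if include.fullmatch(t) and not any(p.fullmatch(t) for p in exclude)]


cluster_topics = ["topic1", "__consumer_offsets", "us-west.checkpoints.internal", "topic2.replica"]
print(select_topics(cluster_topics))  # ['topic1']
print(select_topics(["topic1", "topic2", "topic4"], topics="topic1|topic2|topic3"))  # ['topic1', 'topic2']
```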
The following are the SourceConnector's supported configuration properties.
property | default value | description |
consumer.* | n/a | passed to the consumer |
consumer.group.id | something like "mm2-source-us-west" | passed to the consumer |
consumer.poll.timeout.ms | 500 (ms) | poll timeout |
And similarly for the SinkConnector:
property | default value | description |
producer.* | n/a | passed to the producer |
Depending on the use case, the connectors may be configured with a Single Message Transform (SMT) to prevent topic renaming. This is useful to emulate legacy MirrorMaker behavior.
Example Configuration
For demo purposes, a file ./config/connect-mirror-source.properties will be created as follows:
name=local-mirror-source
topics=.*
source.cluster.alias=upstream
connector.class=org.apache.kafka.connect.mirror.KafkaSourceConnector
tasks.max=1
# for demo, use local Kafka cluster for all producers, consumers
bootstrap.servers=localhost:9092
# use ByteArrayConverter to ensure that records are not re-encoded
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
This will enable running standalone MM2 nodes as follows:
$ ./bin/connect-standalone.sh ./config/connect-standalone.properties ./config/connect-mirror-source.properties
Exactly-once Delivery
This proposal assumes that Connect will eventually support exactly-once delivery for SourceConnectors. Until then, our SourceConnector cannot guarantee exactly-once delivery and may introduce duplicate records. We can already support exactly-once in the SinkConnector.
Compatibility, Deprecation, and Migration Plan
A new version of ./bin/kafka-mirror-maker.sh will be implemented to run MM2 in "legacy mode", i.e. with new features disabled and supporting existing options and configuration properties. Existing deployments will be able to upgrade to MM2 seamlessly.
The existing MirrorMaker source will be removed from the Kafka core project. MM2 will be added to the connect project.
Rejected Alternatives
We could skip the SinkConnector. Technically it's possible to have one MM2 cluster per Kafka cluster, and then configure them all to replicate each other via SourceConnectors only. But we also want to support scenarios where a bunch of different Kafka clusters in the same data center are managed by a single MM2 cluster through a single pane of glass. Having a single MM2 cluster per data center is much easier to manage.
We could release this as an independent project, but we feel that cluster replication should be one of Kafka's fundamental features.
We could deprecate MirrorMaker but keep it around. However, we see this as a natural evolution of MirrorMaker, not an alternative solution.
We could update MirrorMaker rather than completely rewrite it. However, we'd end up recreating many features provided by the Connect framework, including the REST API, configuration, metrics, worker coordination, etc.
We could build on Uber's uReplicator, which solves some of the problems with MirrorMaker. However, uReplicator uses Apache Helix to provide features that overlap with Connect, e.g. REST API, live configuration changes, cluster management, coordination etc. A native MirrorMaker should only use native parts of Apache Kafka. That said, uReplicator is a major inspiration for the MM2 design.