...
MirrorSourceConnector, MirrorSinkConnector, MirrorSourceTask, MirrorSinkTask classes.
- MirrorCheckpointConnector, MirrorCheckpointTask.
- MirrorHeartbeatConnector, MirrorHeartbeatTask.
- MirrorConnectorConfig, MirrorTaskConfig classes.
ReplicationPolicy interface. DefaultReplicationPolicy and LegacyReplicationPolicy classes.
Heartbeat and checkpoint topics and associated schemas.
RemoteClusterUtils class for querying remote cluster reachability and lag, and for translating consumer offsets between clusters.
MirrorMaker driver class with main entry point for running MM2 cluster nodes.
- MirrorMakerConfig used by MirrorMaker driver.
- HeartbeatMessageFormatter, CheckpointMessageFormatter classes.
- ./bin/connect-mirror-maker.sh and ./config/mirror-maker.properties sample configuration.
...
"active/active" replication
This is not supported by legacy MirrorMaker out-of-the-box (without custom handlers) -- records would be replicated back and forth indefinitely, and the topics in either cluster would be merged inconsistently between clusters. The remote naming convention avoids these problems by keeping local and remote records in separate partitions. In either cluster above, the normal topic1 contains only locally produced records. Likewise, B.topic1 contains only records produced in cluster B.
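For example, a consumer in cluster A that wants the merged view of locally produced and replicated records can simply subscribe to both topics. The snippet below is a hypothetical consumer, not part of MM2 itself; the bootstrap address and group name are placeholders.
Code Block |
---|
// Hypothetical downstream consumer in cluster A: reads local records from "topic1"
// and records replicated from cluster B from "B.topic1", with no replication loop.
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "a-cluster:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "merged-view");

KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props,
    new ByteArrayDeserializer(), new ByteArrayDeserializer());
consumer.subscribe(Arrays.asList("topic1", "B.topic1"));
while (true) {
    for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofSeconds(1))) {
        System.out.printf("%s-%d@%d%n", record.topic(), record.partition(), record.offset());
    }
} |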
...
Principals that can READ the source topic can READ the remote topic.
No one can WRITE to the remote topic except MM2.
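For illustration, the resulting downstream rule could be expressed with the AdminClient ACL API roughly as follows. This is a hypothetical sketch, not MM2 code; the principal, host, and topic names are placeholder assumptions.
Code Block |
---|
// Hypothetical sketch of the READ rule MM2 would sync for a remote topic:
// a principal that can READ topic1 in us-west can READ us-west.topic1 downstream.
// Uses org.apache.kafka.common.acl.* and org.apache.kafka.common.resource.*.
static void syncReadAcl(AdminClient targetAdmin) throws Exception {
    AclBinding readRemoteTopic = new AclBinding(
        new ResourcePattern(ResourceType.TOPIC, "us-west.topic1", PatternType.LITERAL),
        new AccessControlEntry("User:analytics", "*", AclOperation.READ, AclPermissionType.ALLOW));
    // No downstream WRITE binding is created; only MM2 itself writes to the remote topic.
    targetAdmin.createAcls(Collections.singleton(readRemoteTopic)).all().get();
} |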
Internal Topics
MM2 emits a heartbeat topic in each source cluster (e.g. us-west.heartbeat) with time-stamped messages, which is replicated to demonstrate connectivity through the connectors. Downstream consumers can use this topic to verify that a) the connector is running and b) the corresponding source cluster is available. Heartbeats will get propagated by source and sink connectors such that chains like backup.us-west.us-east.heartbeat are possible. Cycle detection prevents infinite recursion.
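As a rough illustration of how the dotted naming convention supports this, both the hop count and the cycle check can be derived from a heartbeat topic's name. The helpers below are purely hypothetical and are not the connector's actual implementation.
Code Block |
---|
// Hypothetical helpers: a heartbeat topic like "backup.us-west.us-east.heartbeat"
// encodes the chain of cluster aliases the heartbeat has been replicated through.
static int hops(String heartbeatTopic) {
    return heartbeatTopic.split("\\.").length - 1;   // "us-west.heartbeat" -> 1
}

static boolean wouldCreateCycle(String heartbeatTopic, String targetClusterAlias) {
    // Refuse to replicate a heartbeat into a cluster that already appears in its chain.
    return Arrays.asList(heartbeatTopic.split("\\.")).contains(targetClusterAlias);
} |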
...
heartbeats are replicated between clusters
The schema for the heartbeat topic contains:
- cluster (String): cluster where the heartbeat was emitted
- connector (String): name of Connector which produced the heartbeat
- timestamp (long): epoch millis when heartbeat was created
Additionally, the connectors periodically emit checkpoints in the destination cluster, containing offsets for each consumer group in the source cluster. The connector will periodically query the source cluster for all committed offsets from all consumer groups, filter for those topics being replicated, and emit a message to a topic like us-west.checkpoints.internal in the destination cluster. The message will include the following fields:
- source cluster alias (String)
- consumer group id (String)
- topic (String)
- partition (int)
- offset (int): latest committed offset in source cluster
- latest committed offset translated to the destination cluster's offsets
- metadata (String)
- timestamp
A Checkpoint class exposes these fields via accessor methods; see the relevant classes below.
As with __consumer_offsets, the checkpoint topic is log-compacted to reflect only the latest offsets across consumer groups. The topic will be created automatically by the connector if it doesn't exist.
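For example, automatic creation of the compacted checkpoint topic could look roughly like the sketch below. This is not the connector's actual code; the topic name follows the convention above, while the partition count and replication factor are illustrative assumptions.
Code Block |
---|
// Hypothetical sketch: create the log-compacted checkpoint topic if it doesn't exist.
static void ensureCheckpointTopic(AdminClient targetAdmin) throws Exception {
    NewTopic checkpoints = new NewTopic("us-west.checkpoints.internal", 1, (short) 3)
        .configs(Collections.singletonMap("cleanup.policy", "compact"));
    try {
        targetAdmin.createTopics(Collections.singleton(checkpoints)).all().get();
    } catch (ExecutionException e) {
        if (!(e.getCause() instanceof TopicExistsException)) {
            throw e;   // an already-existing topic is fine; anything else is not
        }
    }
} |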
The heartbeat and checkpoint records are encoded using Kafka's internal serdes, as with __consumer_offsets. A MessageFormatter will be supplied for use with ConsoleConsumer etc.
Relevant classes below.
Code Block |
---|
public class Checkpoint {
public static final String TOPIC_KEY = "topic";
public static final String PARTITION_KEY = "partition";
public static final String OFFSET_KEY = "offset";
public static final String METADATA_KEY = "metadata";
public static final String CONSUMER_GROUP_ID_KEY = "group";
public static final String CONSUMER_OFFSETS_KEY = "offsets";
public static final String SOURCE_CLUSTER_ALIAS_KEY = "cluster";
public static final String TIMESTAMP_KEY = "timestamp";
public static final Schema SCHEMA = new Schema(
--->%---
public String sourceClusterAlias() ...
public String consumerGroupId() ...
public TopicPartition sourceTopicPartition() ...
public OffsetAndMetadata sourceOffsetAndMetadata() ...
}
--->%---
public class Heartbeat {
--->%---
public static final Schema SCHEMA = new Schema(
--->%---
public String sourceClusterAlias() ...
public String connectorName() ...
public long timestamp() ...
} |
Remote Cluster Utils
A utility class RemoteClusterUtils will leverage the internal topics described above to assist in computing reachability, inter-cluster lag, and offset translation. It won't be possible to directly translate any given offset, since not all offsets will be captured in the checkpoint stream. But for a given consumer group, it will be possible to find high water marks that consumers can seek() to. This is useful for inter-cluster consumer migration, failover, etc.
The interface is as follows (subject to change):
Code Block |
---|
// Calculates the number of hops between a client and an upstream Kafka cluster based on replication heartbeats.
// If the given cluster is not reachable, returns -1.
int replicationHops(AdminClient client, String upstreamClusterAlias)
// Find all heartbeat topics, e.g. A.B.heartbeat, visible from the client.
List<String> heartbeatTopics(AdminClient client)
// Find all checkpoint topics, e.g. A.checkpoint.internal, visible from the client.
List<String> checkpointTopics(AdminClient client)
// Find all upstream clusters reachable from the client
List<String> upstreamClusters(AdminClient client)
// Construct a consumer that is subscribed to all heartbeat topics.
KafkaConsumer<Heartbeat> heartbeatConsumer(Map<?, ?> consumerConfig)
// Construct a consumer that is subscribed to all checkpoint topics.
KafkaConsumer<Checkpoint> checkpointConsumer(Map<?, ?> consumerConfig)
// Construct a consumer that is subscribed to checkpoints from a specific upstream cluster.
KafkaConsumer<Checkpoint> checkpointConsumer(Map<?, ?> consumerConfig, String sourceClusterAlias)
// Construct a consumer that is subscribed to checkpoints from a specific upstream consumer group.
KafkaConsumer<Checkpoint> checkpointConsumer(Map<?, ?> consumerConfig, String sourceClusterAlias, String remoteGroupId)
// Find the local offsets corresponding to the latest checkpoint from a specific upstream consumer group.
Map<TopicPartition, Long> translateOffsets(Map<?, ?> targetConsumerConfig, String sourceClusterAlias, String targetClusterAlias, String remoteGroupId) |
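For example, a consumer group being migrated or failed over to the target cluster could seek to the translated offsets as sketched below, assuming translateOffsets is exposed as a static method and the group has already stopped consuming in the source cluster; the cluster aliases, addresses, and group name are placeholders.
Code Block |
---|
// Sketch: resume consumer group "my-group" in "us-east" near where it left off in "us-west".
Map<String, Object> targetConsumerConfig = new HashMap<>();
targetConsumerConfig.put("bootstrap.servers", "us-east:9092");
targetConsumerConfig.put("group.id", "my-group");

Map<TopicPartition, Long> translated = RemoteClusterUtils.translateOffsets(
    targetConsumerConfig, "us-west", "us-east", "my-group");

KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(targetConsumerConfig,
    new ByteArrayDeserializer(), new ByteArrayDeserializer());
consumer.assign(translated.keySet());
translated.forEach(consumer::seek);   // seek each partition to its translated offset |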
Replication Policies
ReplicationPolicy classes specify the behavior of MM2 at a high level, including which topics to replicate. The interface is as follows:
Code Block |
---|
public interface ReplicationPolicy {
boolean shouldReplicateTopic(String topic);
boolean shouldCheckpointTopic(String topic);
boolean shouldReplicateConsumerGroup(String groupId);
boolean shouldSyncTopicConfiguration(String topic);
boolean shouldReplicatePrincipalACL(String principal);
boolean shouldSyncTopicACL(String topic);
String formatRemoteTopic(String topic); // e.g. us-west.topic1
} |
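As a concrete illustration, a hypothetical policy that whitelists topics and groups by regex and applies the remote-topic naming convention might look like the following. This is only a sketch of how the interface could be implemented, not DefaultReplicationPolicy itself.
Code Block |
---|
// Hypothetical ReplicationPolicy implementation (uses java.util.regex.Pattern).
public class RegexReplicationPolicy implements ReplicationPolicy {
    private final String sourceClusterAlias;
    private final Pattern topics;
    private final Pattern groups;

    public RegexReplicationPolicy(String sourceClusterAlias, String topicRegex, String groupRegex) {
        this.sourceClusterAlias = sourceClusterAlias;
        this.topics = Pattern.compile(topicRegex);
        this.groups = Pattern.compile(groupRegex);
    }

    @Override public boolean shouldReplicateTopic(String topic) {
        // skip internal topics, mirror everything else that matches the whitelist
        return topics.matcher(topic).matches() && !topic.endsWith(".internal");
    }

    @Override public boolean shouldCheckpointTopic(String topic) {
        return shouldReplicateTopic(topic);
    }

    @Override public boolean shouldReplicateConsumerGroup(String groupId) {
        return groups.matcher(groupId).matches();
    }

    @Override public boolean shouldSyncTopicConfiguration(String topic) {
        return shouldReplicateTopic(topic);
    }

    @Override public boolean shouldReplicatePrincipalACL(String principal) {
        return true;
    }

    @Override public boolean shouldSyncTopicACL(String topic) {
        return shouldReplicateTopic(topic);
    }

    @Override public String formatRemoteTopic(String topic) {
        return sourceClusterAlias + "." + topic;   // e.g. us-west.topic1
    }
} |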
...
property | default value | description |
name | required | name of the connector, e.g. "us-west->us-east" |
topics | empty string | regex of topics to replicate, e.g. "topic1|topic2|topic3" |
topics.blacklist | [".*\.internal", ".*\.replica", "__consumer_offsets", …] | topics to exclude from replication |
groups | empty string | regex of groups to replicate, e.g. ".*" |
source.cluster.alias | required | name of the cluster being replicated |
target.cluster.alias | required | name of the downstream Kafka cluster |
source.cluster.bootstrap.servers | required | upstream cluster to replicate |
target.cluster.bootstrap.servers | required | downstream cluster |
sync.topic.configs | true | whether or not to monitor source cluster for configuration changes |
sync.topic.acls | true | whether to monitor source cluster ACLs for changes |
admin.* | n/a | passed to AdminClient for local cluster |
remote.admin.* | n/a | passed to remote AdminClient |
rename.topics | true | connector should rename downstream topics according to "remote topic" convention |
emit.heartbeats | true | connector should periodically emit heartbeats |
emit.heartbeats.interval.seconds | 5 (seconds) | frequency of heartbeats |
emit.checkpoints | true | connector should periodically emit consumer offset information |
emit.checkpoints.interval.seconds | 5 (seconds) | frequency of checkpoints |
refresh.topics | true | connector should periodically check for new topics |
refresh.topics.interval.seconds | 5 (seconds) | frequency to check source cluster for new topics |
refresh.groups | true | connector should periodically check for new consumer groups |
refresh.groups.interval.seconds | 5 (seconds) | frequency to check source cluster for new consumer groups |
readahead.queue.capacity | 500 (records) | number of records to let consumer get ahead of producer |
replication.policy.class | org.apache.kafka.connect.mirror.DefaultReplicationPolicy | use LegacyReplicationPolicy to mimic legacy MirrorMaker |
heartbeat.retention.ms | 1 day | used when creating heartbeat topics for the first time |
checkpoint.retention.ms | 1 day | used when creating checkpoint topics for the first time |
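For example, a minimal configuration for a single source->target flow using the properties above might look like the following; the cluster aliases, bootstrap addresses, and regexes are placeholders.
Code Block |
---|
# hypothetical example, not a complete reference configuration
name = us-west->us-east
topics = topic1|topic2|topic3
groups = .*
source.cluster.alias = us-west
target.cluster.alias = us-east
source.cluster.bootstrap.servers = us-west-kafka:9092
target.cluster.bootstrap.servers = us-east-kafka:9092
sync.topic.configs = true
sync.topic.acls = true
emit.heartbeats.interval.seconds = 5
emit.checkpoints.interval.seconds = 5
replication.policy.class = org.apache.kafka.connect.mirror.DefaultReplicationPolicy |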
The following are the SourceConnector's additional configuration properties.
...
- Phase 1 (targeting next Apache Kafka release): All MirrorMaker 2.0 Java code is added to ./connect/mirror/.
- Phase 2 (subsequent release): Legacy MirrorMaker Scala code is deprecated, but kept in place. Sample MM2 scripts and configuration files are added to ./bin/ and ./config/.
- Phase 3 (subsequent release): Legacy MirrorMaker Scala code is removed from Apache Kafka. A new ./bin/kafka-mirror-maker.sh script is provided which replaces and emulates the legacy script.
Rejected Alternatives
- We could release this as an independent project, but we feel that cluster replication should be one of Kafka's fundamental features.
- We could deprecate MirrorMaker but keep it around. However, we see this as a natural evolution of MirrorMaker, not an alternative solution.
- We could update MirrorMaker rather than completely rewrite it. However, we'd end up recreating many features provided by the Connect framework, including the REST API, configuration, metrics, worker coordination, etc.
- We could build on Uber's uReplicator, which solves some of the problems with MirrorMaker. However, uReplicator uses Apache Helix to provide features that overlap with Connect, e.g. REST API, live configuration changes, cluster management, coordination etc. A native MirrorMaker should only use native parts of Apache Kafka. That said, uReplicator is a major inspiration for the MM2 design.
- We could provide a high-level REST API for remote-controlling a MirrorMaker cluster. However, this has overlapping concerns with Connect.
- Instead of separate connectors for heartbeats and checkpoints, we could do everything in a single connector. However, this restricts how Converters and Transformations can be used. For example, you might want a replication pipeline that converts to JSON, but that doesn't necessarily mean you also want heartbeats to be JSON-encoded. It's possible to create a separate KafkaProducer within MirrorSourceConnector to emit Heartbeats and Checkpoints without going through the Connect transformation pipeline, but I don't want to introduce the additional configuration properties for a whole new producer.