...

  • MirrorSourceConnector, MirrorSinkConnector, MirrorSourceTask, MirrorSinkTask classes.
  • MirrorCheckpointConnector, MirrorCheckpointTask.
  • MirrorHeartbeatConnector, MirrorHeartbeatTask.
  • MirrorConnectorConfig, MirrorTaskConfig classes.
  • ReplicationPolicy interface. DefaultReplicationPolicy and LegacyReplicationPolicy classes.
  • Heartbeat, checkpoint, and offset sync topics and associated schemas.
  • RemoteClusterUtils and MirrorClient classes for querying remote cluster reachability and lag, and for translating consumer offsets between clusters.
  • MirrorMaker driver class with main entry point for running MM2 cluster nodes.
  • MirrorMakerConfig used by MirrorMaker driver.
  • HeartbeatMessageFormatter, CheckpointMessageFormatter.
  • ./bin/connect-mirror-maker.sh and ./config/mirror-maker.properties sample configuration.

New metrics include:

  • replication-latency-ms(-avg/-min/-max): timespan between each record's timestamp and downstream ACK
  • record-bytes(-avg/-min/-max): size of each record being replicated
  • record-age-ms(-avg/-min/-max): age of each record when consumed
  • record-count: number of records replicated
  • checkpoint-latency-ms(-avg/-min/-max): timespan between consumer group commit and downstream checkpoint ACK
  • heartbeat-ms: timestamp of latest heartbeat creation time
  • heartbeat-age-ms: timespan between now and latest heartbeat creation time
  • heartbeat-lag-ms: timespan between latest heartbeat's upstream creation time and downstream ACK
  • replication-lag-ms: timespan between latest record's consumed-at time and downstream ACK
  • replication-lag-ms-p99, -p95, -p90, -p50: percentiles of replication-lag-ms

The MBean names for these metrics will be:

  • kafka.mirror.connect:type=MirrorSourceConnector,target=([.\w]+),topic=([.\w]+),partition=([.\d]+)
  • kafka.mirror.connect:type=MirrorCheckpointConnector,target=([.\w]+),source=([.\w]+),group=([.\w]+)
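The -avg/-min/-max metrics above are aggregates over per-record timespans. As a rough, hypothetical illustration (TimespanStats is not an MM2 class, and MM2 builds on Kafka's own metrics library rather than code like this), replication-latency-ms for one record is its downstream ACK time minus its timestamp, and the aggregate keeps a running min, max, and mean. The helper also builds one concrete MBean name of the kafka.mirror.connect:type=MirrorSourceConnector form; the alias, topic, and partition values passed to it are made-up examples.

```java
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

/** Hypothetical running aggregate for a timespan metric such as replication-latency-ms. */
final class TimespanStats {
    private long count;
    private long sum;
    private long min = Long.MAX_VALUE;
    private long max = Long.MIN_VALUE;

    /** Records one span, e.g. from a record's timestamp to its downstream ACK. */
    void record(long startMillis, long endMillis) {
        long span = endMillis - startMillis;
        count++;
        sum += span;
        min = Math.min(min, span);
        max = Math.max(max, span);
    }

    long count() { return count; }

    /** Mean span in millis; NaN before any sample has been recorded. */
    double avg() { return count == 0 ? Double.NaN : (double) sum / count; }

    long min() { return min; }

    long max() { return max; }

    /** Builds one concrete MBean name of the source-connector form for a topic-partition. */
    static ObjectName sourceMetricsName(String target, String topic, int partition) {
        try {
            return new ObjectName("kafka.mirror.connect:type=MirrorSourceConnector"
                    + ",target=" + target + ",topic=" + topic + ",partition=" + partition);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException("invalid MBean name component", e);
        }
    }
}
```

For example, recording spans of 250 ms and 50 ms gives min 50, max 250, and avg 150.0.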

Proposed Changes

Remote Topics, Partitions

...

The schema for the heartbeat topic contains:

  • target cluster (String): cluster where the heartbeat was emitted
  • source cluster (String): source cluster of the connector doing the emitting
  • timestamp (long): epoch millis when heartbeat was created
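To make the three heartbeat fields concrete, here is a minimal stdlib-only sketch that round-trips them through a length-prefixed binary layout and derives a heartbeat-age-ms style value. This is an assumption for illustration only: MM2 encodes heartbeats with Kafka's internal serdes, and HeartbeatCodec, its byte layout, and ageMillis are hypothetical, not the real wire format or API.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical codec for the heartbeat fields; NOT the real MM2 wire format. */
final class HeartbeatCodec {
    /** Plain value holder mirroring the heartbeat schema. */
    static final class HeartbeatRecord {
        final String sourceClusterAlias;
        final String targetClusterAlias;
        final long timestamp; // epoch millis when the heartbeat was created

        HeartbeatRecord(String sourceClusterAlias, String targetClusterAlias, long timestamp) {
            this.sourceClusterAlias = sourceClusterAlias;
            this.targetClusterAlias = targetClusterAlias;
            this.timestamp = timestamp;
        }
    }

    /** Layout: [int len][source bytes][int len][target bytes][long timestamp]. */
    static byte[] encode(HeartbeatRecord hb) {
        byte[] src = hb.sourceClusterAlias.getBytes(StandardCharsets.UTF_8);
        byte[] tgt = hb.targetClusterAlias.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + src.length + 4 + tgt.length + 8);
        buf.putInt(src.length).put(src).putInt(tgt.length).put(tgt).putLong(hb.timestamp);
        return buf.array();
    }

    static HeartbeatRecord decode(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        String src = readString(buf);
        String tgt = readString(buf);
        return new HeartbeatRecord(src, tgt, buf.getLong());
    }

    /** Time elapsed since the heartbeat was created, as in heartbeat-age-ms. */
    static long ageMillis(HeartbeatRecord hb, long nowMillis) {
        return nowMillis - hb.timestamp;
    }

    private static String readString(ByteBuffer buf) {
        byte[] b = new byte[buf.getInt()];
        buf.get(b);
        return new String(b, StandardCharsets.UTF_8);
    }
}
```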

Additionally, the connectors periodically emit checkpoints in the destination cluster, containing offsets for each consumer group in the source cluster. The connector will periodically query the source cluster for all committed offsets from all consumer groups, filter for those topics being replicated, and emit a message to a topic like us-west.checkpoints.internal in the destination cluster. The message will include the following fields:

...

  • consumer group id (String)
  • topic (String): includes source cluster prefix
  • partition (int)
  • upstream offset (long): latest committed offset in source cluster
  • downstream offset (long): latest committed offset translated to target cluster
  • metadata (String)
  • timestamp
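The emission steps described above (read committed offsets, keep only replicated topics, rename with the source cluster prefix, translate offsets) can be sketched as a pure function. Everything here is hypothetical: committed offsets arrive as a plain list, the topic filter and offset translator are passed in, CheckpointSketch is not an MM2 class, and skipping untranslatable offsets is an assumed policy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical sketch of one checkpoint-emission pass; not MM2's implementation. */
final class CheckpointSketch {
    /** A committed offset read from the source cluster (simplified input shape). */
    static final class Committed {
        final String group;
        final String topic;
        final int partition;
        final long offset;
        final String metadata;

        Committed(String group, String topic, int partition, long offset, String metadata) {
            this.group = group;
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.metadata = metadata;
        }
    }

    /** A checkpoint message with the fields listed above (timestamp omitted). */
    static final class Checkpoint {
        final String consumerGroupId;
        final String remoteTopic;    // includes the source cluster prefix
        final int partition;
        final long upstreamOffset;   // committed offset in the source cluster
        final long downstreamOffset; // translated offset in the target cluster
        final String metadata;

        Checkpoint(String consumerGroupId, String remoteTopic, int partition,
                   long upstreamOffset, long downstreamOffset, String metadata) {
            this.consumerGroupId = consumerGroupId;
            this.remoteTopic = remoteTopic;
            this.partition = partition;
            this.upstreamOffset = upstreamOffset;
            this.downstreamOffset = downstreamOffset;
            this.metadata = metadata;
        }
    }

    /** Maps an upstream offset to a downstream one; returns -1 if it cannot be translated. */
    interface OffsetTranslator {
        long translate(String topic, int partition, long upstreamOffset);
    }

    static List<Checkpoint> emit(String sourceAlias, List<Committed> committed,
                                 Predicate<String> isReplicated, OffsetTranslator translator) {
        List<Checkpoint> out = new ArrayList<>();
        for (Committed c : committed) {
            if (!isReplicated.test(c.topic)) {
                continue; // only topics being replicated get checkpoints
            }
            long downstream = translator.translate(c.topic, c.partition, c.offset);
            if (downstream < 0) {
                continue; // assumption: offsets that cannot be translated are skipped
            }
            // Assumes the <source>.<topic> naming convention for remote topics.
            out.add(new Checkpoint(c.group, sourceAlias + "." + c.topic, c.partition,
                    c.offset, downstream, c.metadata));
        }
        return out;
    }
}
```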

...

As with __consumer_offsets, the checkpoint topic is log-compacted to reflect only the latest offsets across consumer groups, based on a topic-partition-group composite key. The topic will be created automatically by the connector if it doesn't exist.
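Log compaction keeps only the newest record per key, so a consumer that reads the whole checkpoint topic ends up with one entry per topic-partition-group. The sketch below simulates that end state in memory; CompactionSketch and its "topic|partition|group" key string are hypothetical stand-ins, since the real key is a structured record encoded with Kafka's internal serdes.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of what log compaction leaves in the checkpoint topic. */
final class CompactionSketch {
    static final class Entry {
        final String group;
        final String topic;
        final int partition;
        final long downstreamOffset;

        Entry(String group, String topic, int partition, long downstreamOffset) {
            this.group = group;
            this.topic = topic;
            this.partition = partition;
            this.downstreamOffset = downstreamOffset;
        }

        /** Hypothetical topic-partition-group composite key. */
        String key() {
            return topic + "|" + partition + "|" + group;
        }
    }

    /** Replays records in log order; a later record with the same key replaces the earlier one. */
    static Map<String, Long> compact(List<Entry> log) {
        Map<String, Long> latest = new LinkedHashMap<>();
        for (Entry e : log) {
            latest.put(e.key(), e.downstreamOffset);
        }
        return latest;
    }
}
```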

Finally, an offset sync topic encodes cluster-to-cluster offset mappings for each topic-partition being replicated:

  • topic (String): remote topic name
  • partition (int)
  • upstream offset (long): an offset in the source cluster
  • downstream offset (long): an equivalent offset in the target cluster

The heartbeat, checkpoint, and offset sync records are encoded using Kafka's internal serdes, as with __consumer_offsets. Message formatters (HeartbeatMessageFormatter, CheckpointMessageFormatter) will be supplied for use with ConsoleConsumer and similar tools. Relevant classes are below.

Code Block
public class Checkpoint {
    public static final String TOPIC_KEY = "topic";
    public static final String PARTITION_KEY = "partition";
    public static final String CONSUMER_GROUP_ID_KEY = "group";
    public static final String UPSTREAM_OFFSET_KEY = "upstreamOffset";
    public static final String DOWNSTREAM_OFFSET_KEY = "offset";
    public static final String METADATA_KEY = "metadata";

--->%---

    public Checkpoint(String consumerGroupId, TopicPartition topicPartition, long upstreamOffset,
            long downstreamOffset, String metadata) ...

    public String consumerGroupId() ...

    public TopicPartition topicPartition() ...

    public long upstreamOffset() ...

    public long downstreamOffset() ...

    public String metadata() ...

    public OffsetAndMetadata offsetAndMetadata() ...
}

public class Heartbeat {
    public static final String SOURCE_CLUSTER_ALIAS_KEY = "sourceClusterAlias";
    public static final String TARGET_CLUSTER_ALIAS_KEY = "targetClusterAlias";
    public static final String TIMESTAMP_KEY = "timestamp";

--->%---

    public Heartbeat(String sourceClusterAlias, String targetClusterAlias, long timestamp) ...

    public String sourceClusterAlias() ...

    public String targetClusterAlias() ...

    public long timestamp() ...
}

public class OffsetSync {
    public static final String TOPIC_KEY = "topic";
    public static final String PARTITION_KEY = "partition";
    public static final String UPSTREAM_OFFSET_KEY = "upstreamOffset";
    public static final String DOWNSTREAM_OFFSET_KEY = "offset";

--->%---

    public OffsetSync(TopicPartition topicPartition, long upstreamOffset, long downstreamOffset) ...

    public TopicPartition topicPartition() ...

    public long upstreamOffset() ...

    public long downstreamOffset() ...
}
Remote Cluster Utils

A utility class RemoteClusterUtils will leverage the internal topics described above to assist in computing reachability, inter-cluster lag, and offset translation. It won't be possible to directly translate any given offset, since not all offsets will be captured in the checkpoint stream. But for a given consumer group, it will be possible to find high water marks that consumers can seek() to. This is useful for inter-cluster consumer migration, failover, etc.
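One way to see why only some offsets can be translated: the offset sync topic records occasional (upstream, downstream) pairs per partition, and a committed upstream offset can only be mapped relative to a sync at or before it. The sketch below assumes a simple rule (use the latest sync whose upstream offset does not exceed the committed offset, then step forward by the same distance). OffsetSyncIndex and its methods are hypothetical, and the real MM2 translation may be more conservative.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical per-partition index of offset syncs (upstream offset -> downstream offset). */
final class OffsetSyncIndex {
    private final TreeMap<Long, Long> syncs = new TreeMap<>();

    void add(long upstreamOffset, long downstreamOffset) {
        syncs.put(upstreamOffset, downstreamOffset);
    }

    /**
     * Translates an upstream committed offset to a downstream offset a consumer could seek() to.
     * Returns -1 if no sync at or before the given offset is known.
     */
    long translate(long upstreamOffset) {
        Map.Entry<Long, Long> sync = syncs.floorEntry(upstreamOffset);
        if (sync == null) {
            return -1L; // too far in the past: nothing to anchor the translation on
        }
        // Assumes records between syncs map one-to-one, which holds only if none were filtered or lost.
        return sync.getValue() + (upstreamOffset - sync.getKey());
    }
}
```

For example, with syncs (100 → 40) and (200 → 140), an upstream offset of 150 translates to 90, while 50 cannot be translated.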

The interface is as follows:

Code Block
// Calculates the number of hops between a client and an upstream Kafka cluster based on replication heartbeats.
// If the given cluster is not reachable, returns -1.
int replicationHops(Map<String, Object> properties, String upstreamClusterAlias)

// Find all heartbeat topics, e.g. A.B.heartbeats, visible from the client.
Set<String> heartbeatTopics(Map<String, Object> properties)

// Find all checkpoint topics, e.g. A.checkpoints.internal, visible from the client.
Set<String> checkpointTopics(Map<String, Object> properties)

// Find all upstream clusters reachable from the client.
Set<String> upstreamClusters(Map<String, Object> properties)

// Find the local offsets corresponding to the latest checkpoint from a specific upstream consumer group.
Map<TopicPartition, OffsetAndMetadata> translateOffsets(Map<String, Object> properties,
            String targetClusterAlias, String consumerGroupId, Duration timeout)

The utility class assumes DefaultReplicationPolicy is used for replication.
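Under the <source>.<topic> convention, a heartbeat topic's name records the path it took: seen from the local cluster, A.B.heartbeats came from A, which received it from B, so A is one hop away and B is two. Counting hops therefore amounts to peeling off prefixes. The sketch below is a hypothetical, stdlib-only approximation of replicationHops that takes topic names instead of client properties; HopCounter, the "." separator, and the "heartbeats" base name are assumptions, and aliases containing "." would confuse it.

```java
import java.util.Collection;

/** Hypothetical hop counting over remote heartbeat topic names. */
final class HopCounter {
    private static final String SEPARATOR = "."; // assumed DefaultReplicationPolicy separator

    /** Hops from the local cluster to upstreamAlias along one remote topic name, or -1. */
    static int hopsForTopic(String topic, String upstreamAlias) {
        int hops = 0;
        String current = topic;
        while (true) {
            int idx = current.indexOf(SEPARATOR);
            if (idx < 0) {
                return -1; // no prefix left, so the alias never appeared on this path
            }
            hops++;
            if (current.substring(0, idx).equals(upstreamAlias)) {
                return hops;
            }
            current = current.substring(idx + SEPARATOR.length());
        }
    }

    /** Minimum hops over all visible heartbeat topics; -1 if the cluster is unreachable. */
    static int replicationHops(Collection<String> heartbeatTopics, String upstreamAlias) {
        int best = -1;
        for (String topic : heartbeatTopics) {
            int hops = hopsForTopic(topic, upstreamAlias);
            if (hops > 0 && (best < 0 || hops < best)) {
                best = hops;
            }
        }
        return best;
    }
}
```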

Replication Policies and Filters

A ReplicationPolicy defines what a "remote topic" is and how to interpret it. This should generally be consistent across an organization. The interface is as follows:

Code Block
/** Defines which topics are "remote topics", e.g. "us-west.topic1". */
public interface ReplicationPolicy {

    /** How to rename remote topics; generally should be like us-west.topic1. */
    String formatRemoteTopic(String sourceClusterAlias, String topic);

    /** Source cluster alias of given remote topic, e.g. "us-west" for "us-west.topic1".
        Returns null if not a remote topic.
    */
    String topicSource(String topic);

    /** Name of topic on the source cluster, e.g. "topic1" for "us-west.topic1".
        Topics may be replicated multiple hops, so the immediately upstream topic
        may itself be a remote topic.
        Returns null if not a remote topic.
    */
    String upstreamTopic(String topic);

    /** The name of the original source-topic, which may have been replicated multiple hops.
        Returns the topic if it is not a remote topic.
    */
    String originalTopic(String topic);

    /** Internal topics are never replicated. */
    boolean isInternalTopic(String topic);
}

The DefaultReplicationPolicy specifies the <source>.<topic> convention described throughout this document. LegacyReplicationPolicy causes MM2 to mimic legacy MirrorMaker behavior.
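To make the <source>.<topic> convention concrete, here is a minimal policy written against a local copy of the ReplicationPolicy interface so it compiles on its own. It is an illustration, not the shipped DefaultReplicationPolicy: PrefixReplicationPolicy is a hypothetical name, and the isInternalTopic rule (names starting with "__" or ending in ".internal") is an assumption chosen to match the internal topic names used in this document.

```java
/** Local copy of the ReplicationPolicy interface, so this sketch is self-contained. */
interface ReplicationPolicy {
    String formatRemoteTopic(String sourceClusterAlias, String topic);
    String topicSource(String topic);
    String upstreamTopic(String topic);
    String originalTopic(String topic);
    boolean isInternalTopic(String topic);
}

/** Hypothetical policy implementing the <source>.<topic> naming convention. */
class PrefixReplicationPolicy implements ReplicationPolicy {
    private static final String SEPARATOR = ".";

    @Override
    public String formatRemoteTopic(String sourceClusterAlias, String topic) {
        return sourceClusterAlias + SEPARATOR + topic; // e.g. us-west.topic1
    }

    @Override
    public String topicSource(String topic) {
        int idx = topic.indexOf(SEPARATOR);
        return idx < 0 ? null : topic.substring(0, idx); // null if not a remote topic
    }

    @Override
    public String upstreamTopic(String topic) {
        String source = topicSource(topic);
        return source == null ? null : topic.substring(source.length() + SEPARATOR.length());
    }

    @Override
    public String originalTopic(String topic) {
        String upstream = upstreamTopic(topic);
        // Recurse through every hop until no cluster prefix remains.
        return upstream == null ? topic : originalTopic(upstream);
    }

    @Override
    public boolean isInternalTopic(String topic) {
        return topic.startsWith("__") || topic.endsWith(".internal");
    }
}
```

With this policy, us-west.us-east.topic1 has source us-west, upstream topic us-east.topic1, and original topic topic1.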

Additionally, several Filters control the behavior of MM2 at a high level, including which topics to replicate. The DefaultTopicFilter, DefaultGroupFilter, and DefaultConfigPropertyFilter classes honor whitelists and blacklists via regex patterns. 

Code Block
/** Defines which topics should be replicated. */
public interface TopicFilter {

    boolean shouldReplicateTopic(String topic);
}

/** Defines which consumer groups should be replicated. */
public interface GroupFilter {

    boolean shouldReplicateGroup(String group);
}

/** Defines which topic configuration properties should be replicated. */
public interface ConfigPropertyFilter {

    boolean shouldReplicateConfigProperty(String prop);
}
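As an illustration of whitelist/blacklist filtering, here is a regex-based TopicFilter sketch that restates the interface locally so it compiles standalone. RegexTopicFilter, its constructor, and the rule that the blacklist wins over the whitelist are assumptions, not the specified behavior of DefaultTopicFilter.

```java
import java.util.regex.Pattern;

/** Local copy of the TopicFilter interface, so this sketch is self-contained. */
interface TopicFilter {
    boolean shouldReplicateTopic(String topic);
}

/** Hypothetical filter: replicate topics matching the whitelist but not the blacklist. */
class RegexTopicFilter implements TopicFilter {
    private final Pattern whitelist;
    private final Pattern blacklist;

    RegexTopicFilter(String whitelistRegex, String blacklistRegex) {
        this.whitelist = Pattern.compile(whitelistRegex);
        this.blacklist = Pattern.compile(blacklistRegex);
    }

    @Override
    public boolean shouldReplicateTopic(String topic) {
        // matches() requires the whole topic name to match, not just a substring.
        return whitelist.matcher(topic).matches() && !blacklist.matcher(topic).matches();
    }
}
```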


Connectors

Both source and sink connectors are provided to enable complex flows between multiple Kafka clusters and across data centers via existing Kafka Connect clusters. Generally, each data center has a single Connect cluster and a primary Kafka cluster, K.

...

Code Block
name = MirrorSourceConnector
connector.class = org.apache.kafka.connect.mirror.MirrorSourceConnector
source.cluster.alias = primary
target.cluster.alias = backup
source.cluster.bootstrap.servers = localhost:9091
target.cluster.bootstrap.servers = localhost:9092
key.converter = org.apache.kafka.connect.converters.ByteArrayConverter
value.converter = org.apache.kafka.connect.converters.ByteArrayConverter

...

Code Block
clusters = primary, backup
cluster.primary.bootstrap.servers = localhost:9091
cluster.backup.bootstrap.servers = localhost:9092
connector.primary->backup.topics = .*
connector.primary->backup.emit.heartbeats = false

...

property                               default value   description
clusters                               required        comma-separated list of Kafka cluster "aliases"
cluster.<cluster>.bootstrap.servers    required        connection information for the specific cluster
cluster.<cluster>.x.y.z                n/a             passed to workers for a specific cluster
connector.<source>-><target>.x.y.z     n/a             passed to a specific connector
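The table implies a simple prefix scheme: cluster.<alias>.* properties describe a cluster, and connector.<source>-><target>.* properties apply to one replication flow. The sketch below shows one plausible way a driver could derive a flow's connector configuration from the top-level properties. FlowConfigSketch is hypothetical, and both the mapping of cluster properties onto source.cluster.* / target.cluster.* keys (borrowed from the MirrorSourceConnector example) and the precedence order are assumptions.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical resolution of per-flow connector config from top-level MM2 properties. */
final class FlowConfigSketch {
    /** Copies every key starting with prefix into out, replacing the prefix with newPrefix. */
    private static void copyWithPrefix(Map<String, String> props, String prefix,
                                       String newPrefix, Map<String, String> out) {
        for (Map.Entry<String, String> e : props.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                // trim() drops stray trailing spaces, which properties files keep in values.
                out.put(newPrefix + e.getKey().substring(prefix.length()), e.getValue().trim());
            }
        }
    }

    /** Builds the connector config for the source->target flow. */
    static Map<String, String> flowConfig(Map<String, String> props, String source, String target) {
        Map<String, String> config = new HashMap<>();
        config.put("source.cluster.alias", source);
        config.put("target.cluster.alias", target);
        copyWithPrefix(props, "cluster." + source + ".", "source.cluster.", config);
        copyWithPrefix(props, "cluster." + target + ".", "target.cluster.", config);
        // Flow-specific settings are applied last, so they override anything above.
        copyWithPrefix(props, "connector." + source + "->" + target + ".", "", config);
        return config;
    }
}
```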

...

Code Block
# mm2.properties
clusters = us-west, us-east
cluster.us-west.bootstrap.servers = host1:9092
cluster.us-east.bootstrap.servers = host2:9092

Optionally, you can override default MirrorMaker properties:

...

Code Block
PUT /connectors/us-west-source/config HTTP/1.1

{
    "name": "us-west-source",
    "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "source.cluster.alias": "us-west",
    "target.cluster.alias": "us-east",
    "source.cluster.bootstrap.servers": "us-west-host1:9091",
    "topics": ".*"
}
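The same PUT request could be built programmatically with the JDK's java.net.http client (Java 11 and later). In this sketch the ConnectRestSketch name is hypothetical and the worker URL (http://localhost:8083, Connect's usual default REST port) is an assumption; it only builds the request and leaves sending it to the caller.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical helper that builds a Connect REST "update connector config" request. */
final class ConnectRestSketch {
    /** Builds PUT {workerUrl}/connectors/{name}/config with a JSON body (not sent). */
    static HttpRequest putConnectorConfig(String workerUrl, String name, String jsonConfig) {
        return HttpRequest.newBuilder()
                .uri(URI.create(workerUrl + "/connectors/" + name + "/config"))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(jsonConfig))
                .build();
    }
}
```

To send it, pass the request to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()) against a running Connect worker.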

...