Note that this proposal is incomplete, and tries to explore the UX of the feature before establishing the technical requirements and limitations. As we discover what the technical limitations are, some of the UX may need to change, and some semantics of the feature may need to be softened.
To Co-Authors: None of the below contents is final or necessarily correct. Please feel free to edit this document directly and summarize your changes on the mailing list afterwards.
Status
Current state: Under Discussion
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Since before the implementation of MirrorMaker 1, there has been a desire to replicate data between Kafka clusters. This can be done for many different reasons including, but not limited to: disaster recovery, latency optimization, load shaping, security, and data protection. Currently the open source tooling for Kafka replication consists of MirrorMaker 1 and MirrorMaker 2, which both fall short in many modern use cases:
- They run in external processes which may experience outages when both Kafka clusters are otherwise healthy and accepting clients
- They represent an additional operational burden beyond just running Kafka
- They provide logical replication, in which a record is assigned a different offset in the target than it has in the source
- The replication is asynchronous, preventing the extension of strong consistency guarantees between Kafka clusters
Goals
- Replicate topics and other associated resources between intentionally separate Kafka clusters
- Provide physical replication semantics
- Preserve exactly-once semantics (immediately, in some configurations, or as a later extension of the feature?)
Public Interfaces
User Interface
- New AdminClient methods for managing Replication Links on both the source and destination clusters
- Links accept a configuration, including topics.regex and consumer.groups.regex
- Existing Consumers & Producers can access replicated topics without an upgrade
- Metrics allow observation of the state of each partition included in the replication link and progress of the replication flow (lag, throughput, etc)
- Links can be temporarily or permanently disconnected. During temporary disconnects, the replicated topic is not writable. After a permanent disconnect, the replicated topic is writable.
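The link behaviour listed above can be sketched as a small model. This is only an illustration under stated assumptions: the class `ReplicationLinkSketch`, the `LinkState` enum, and all method names are hypothetical and are not part of any existing or proposed AdminClient API. Only the configuration keys `topics.regex` and `consumer.groups.regex` come from this proposal. The sketch also assumes that the regexes must match the whole name, and that a replicated topic is read-only on the target while the link is connected.

```java
import java.util.regex.Pattern;

// Hypothetical model of a replication link; none of these names exist in Kafka.
final class ReplicationLinkSketch {
    enum LinkState { CONNECTED, TEMPORARILY_DISCONNECTED, PERMANENTLY_DISCONNECTED }

    private final Pattern topics;          // value of "topics.regex"
    private final Pattern consumerGroups;  // value of "consumer.groups.regex"
    private LinkState state = LinkState.CONNECTED;

    ReplicationLinkSketch(String topicsRegex, String consumerGroupsRegex) {
        this.topics = Pattern.compile(topicsRegex);
        this.consumerGroups = Pattern.compile(consumerGroupsRegex);
    }

    // Assumption: the regex must match the entire topic or group name.
    boolean replicatesTopic(String topic) {
        return topics.matcher(topic).matches();
    }

    boolean replicatesGroup(String group) {
        return consumerGroups.matcher(group).matches();
    }

    void setState(LinkState newState) {
        this.state = newState;
    }

    // A replicated topic becomes writable on the target only after a
    // permanent disconnect; topics outside the link are unaffected.
    boolean isTargetWritable(String topic) {
        return !replicatesTopic(topic) || state == LinkState.PERMANENTLY_DISCONNECTED;
    }

    public static void main(String[] args) {
        ReplicationLinkSketch link = new ReplicationLinkSketch("orders\\..*", "billing-.*");
        System.out.println("replicates orders.eu: " + link.replicatesTopic("orders.eu"));
        link.setState(LinkState.TEMPORARILY_DISCONNECTED);
        System.out.println("writable while paused: " + link.isTargetWritable("orders.eu"));
        link.setState(LinkState.PERMANENTLY_DISCONNECTED);
        System.out.println("writable after cut-over: " + link.isTargetWritable("orders.eu"));
    }
}
```

Whether a permanently disconnected link can ever be reconnected is left open here, as it is in the list above.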
Data Semantics
Cross-Cluster Replication is similar to Intra-Cluster replication, as both cross-cluster topics and intra-cluster replicas:
- Have the same configuration as their source
- Have the same offsets for records
- Have the same number of partitions
They are different in the following ways, as cross-cluster replicas:
- Are subject to the target cluster's ACL environment
- Are not included in acks=all or min-ISR requirements (optional? maybe acks=all is useful/required for EOS producers that want to make sure cross-cluster replication finishes)
- Are not eligible for fetch-from-follower on the source cluster
- Have a separate topic-id
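The semantics above can be read as invariants that a cross-cluster replica must satisfy relative to its source. The following is a minimal sketch of such a check, not a proposed implementation. `ReplicaInvariantSketch` and `TopicView` are hypothetical names, `java.util.UUID` stands in for Kafka's topic-id type, and each partition is modelled as a list whose index is the record offset. It further assumes that an asynchronous replica may lag, so its log must be a prefix of the source log, and it ignores retention and log start offsets.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical invariant check for physical replication; not a Kafka API.
final class ReplicaInvariantSketch {
    static final class TopicView {
        final UUID topicId;
        final Map<String, String> config;
        // partitions.get(p).get(offset) is the record payload at that offset.
        final List<List<String>> partitions;

        TopicView(UUID topicId, Map<String, String> config, List<List<String>> partitions) {
            this.topicId = topicId;
            this.config = config;
            this.partitions = partitions;
        }
    }

    // Returns a description of every violated invariant; empty means consistent.
    static List<String> violations(TopicView source, TopicView replica) {
        List<String> problems = new ArrayList<>();
        if (source.topicId.equals(replica.topicId)) {
            problems.add("replica must have a separate topic-id");
        }
        if (!source.config.equals(replica.config)) {
            problems.add("replica configuration must match the source");
        }
        if (source.partitions.size() != replica.partitions.size()) {
            problems.add("partition count must match the source");
            return problems; // per-partition comparison is meaningless here
        }
        for (int p = 0; p < source.partitions.size(); p++) {
            List<String> src = source.partitions.get(p);
            List<String> dst = replica.partitions.get(p);
            // Same offsets for records: the replica log is a prefix of the source log.
            if (dst.size() > src.size() || !src.subList(0, dst.size()).equals(dst)) {
                problems.add("partition " + p + " diverges from the source");
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        Map<String, String> config = Collections.singletonMap("retention.ms", "1000");
        TopicView source = new TopicView(UUID.randomUUID(), config,
                Arrays.asList(Arrays.asList("a", "b", "c"), Arrays.asList("x")));
        TopicView replica = new TopicView(UUID.randomUUID(), config,
                Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("x")));
        System.out.println(violations(source, replica));
    }
}
```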
Consumer Offsets
- Consumer offsets should be replicated to the target cluster synchronously, such that the offset arrives before the commit record (does this require 2PC? can this be configured in the replication link?)
- An exactly-once application which fails over from the source cluster to the target cluster should maintain its exactly-once semantics.
Remote and Local Logs
- Remote log replication should be controlled by the second tier's provider.
- Remote logs can be referenced if not replicated by the second tier's provider so that replicated Kafka topics reference the source remote log storage.
Networking
- The network path between Kafka clusters is assumed to be less reliable than the intra-cluster network, and to have stricter constraints
- Network connections for replication links can proceed either source→target or target→source to allow one cluster to be behind a NAT (can we support NAT punching as a first-class feature, to allow both clusters to be behind NAT?)
- Allow for simple traffic separation of cross-cluster connections and client/intra-cluster connections (do we need to connect to something other than the other cluster's advertised listeners?)
Briefly list any new interfaces that will be introduced as part of this proposal or any existing interfaces that will be removed or changed. The purpose of this section is to concisely call out the public contract that will come along with this feature.
A public interface is any change to the following:
- Binary log format
- The network protocol and api behavior
- Any class in the public packages under clients
  - org/apache/kafka/common/serialization
  - org/apache/kafka/common
  - org/apache/kafka/common/errors
  - org/apache/kafka/clients/producer
  - org/apache/kafka/clients/consumer (eventually, once stable)
- Configuration, especially client configuration
- Monitoring
- Command line tools and arguments
- Anything else that will likely break existing users in some way when they upgrade
Proposed Changes
Describe the new thing you want to do in appropriate detail. This may be fairly extensive and have large subsections of its own. Or it may be a few sentences. Use judgement based on the scope of the change.
Compatibility, Deprecation, and Migration Plan
- Both the source and target clusters should have a version which includes the Cross-Cluster Replication feature
- Clusters which support the Cross-Cluster Replication feature should negotiate on the mutually-supported replication semantics
- If one of the clusters is downgraded to a version which does not support Cross-Cluster Replication, the partner cluster should temporarily disable replication.
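One way to picture the negotiation described above is as a set intersection: each cluster advertises the replication semantics it supports, and the link runs with whatever both sides share. The sketch below is an assumption-laden illustration only. The `Semantics` values and the class `SemanticsNegotiationSketch` are hypothetical, and the proposal does not yet define how capabilities are advertised or which ones exist. A cluster without the feature is modelled as advertising an empty set, in which case the partner pauses replication.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical capability negotiation between two clusters; not a Kafka API.
final class SemanticsNegotiationSketch {
    // Illustrative capability names only; the proposal has not defined these.
    enum Semantics { ASYNC_PHYSICAL, SYNC_CONSUMER_OFFSETS, EXACTLY_ONCE }

    // The link may only rely on semantics that both clusters support.
    static Set<Semantics> negotiate(Set<Semantics> source, Set<Semantics> target) {
        EnumSet<Semantics> mutual = EnumSet.noneOf(Semantics.class);
        mutual.addAll(source);
        mutual.retainAll(target);
        return mutual;
    }

    // A downgraded partner advertises nothing, so replication is paused
    // (temporarily disabled) until it supports the feature again.
    static boolean shouldReplicate(Set<Semantics> source, Set<Semantics> target) {
        return !negotiate(source, target).isEmpty();
    }

    public static void main(String[] args) {
        Set<Semantics> source = EnumSet.allOf(Semantics.class);
        Set<Semantics> target = EnumSet.of(Semantics.ASYNC_PHYSICAL);
        Set<Semantics> downgraded = EnumSet.noneOf(Semantics.class);
        System.out.println("mutual: " + negotiate(source, target));
        System.out.println("replicate to downgraded partner: " + shouldReplicate(source, downgraded));
    }
}
```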
Test Plan
Describe in a few sentences how the KIP will be tested. We are mostly interested in system tests (since unit tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?
Rejected Alternatives
Propose improvements to MirrorMaker 2 or a new MirrorMaker 3
MirrorMaker's approach of using public clients to perform replication limits the guarantees that replication can provide. In order to strengthen these guarantees, we would need to add capabilities to the public clients, or rely on internal interfaces, neither of which is desirable.
Establish mechanisms for improving "stretched clusters" that have a heterogeneous network between nodes, aka "rack awareness"
The use case for a stretched cluster is different from that of cross-cluster replication, in that a stretched cluster shares ACLs, topic-ids, principals, secrets, etc. Cross-Cluster Replication is intended to be used across data protection domains, which currently require the use of distinct clusters.
Propose a layer above Kafka to provide virtual/transparent replication
This is currently possible to implement with the Kafka public APIs, but doesn't actually replicate the data. This makes it unsuitable for disaster recovery, latency optimization, and load-shaping use-cases where connectivity to the source topic/replicas may be lost.