Comment: Add group reservations, unclean link recovery, and non-goals.

...

  • Replicate topics and other associated resources between intentionally separate Kafka clusters
  • Offsets for replicated records should be the same as in the origin Kafka cluster
  • Preserve exactly-once semantics for replicated records

Non-Goals

  • Implement multi-leader partitions that accept writes on multiple brokers.
  • Perform distributed consensus to elect partition leaders across multiple clusters
  • Provide transparent failover of single clients between Kafka clusters

Public Interfaces

User Interface

  • New AdminClient methods for managing Replication Links on both the source and destination clusters
  • A single cluster can participate in arbitrarily many links, and can act as both a source and a destination simultaneously.
  • Links accept a configuration, including topics.regex and consumer.groups.regex, and a mode selector that accepts either Asynchronous or Synchronous
    • Can a link change its topics.regex or consumer.groups.regex? What happens if a topic or group is created or deleted while the link is in-sync? Does the link change state?
    • Can a link change from synchronous to asynchronous?
  • Existing Consumers & Producers can access replicated topics without an upgrade or reconfiguration.
  • Metrics allow observation of the state of each partition included in the replication link and progress of the replication flow (lag, throughput, etc)
  • During and shortly after network partitions, the link will be out-of-sync while it catches up with the source ISR
  • Links can be manually disconnected, after which the destination topic becomes writable.
    • Can we manually reverse a link while keeping consistency?
    • Can we reconnect a link, truncating the target if it has diverged?
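
As an illustration of the link configuration described above, the following is a minimal sketch, not the proposed AdminClient API. The property names topics.regex and consumer.groups.regex come from this page; the "mode" key, the LinkConfig and ReplicationMode names, and the whole-name (full match) regex semantics are assumptions made for the example.

```python
import re
from dataclasses import dataclass
from enum import Enum


class ReplicationMode(Enum):
    # The two values accepted by the mode selector.
    ASYNCHRONOUS = "Asynchronous"
    SYNCHRONOUS = "Synchronous"


@dataclass(frozen=True)
class LinkConfig:
    """Hypothetical in-memory form of a replication link configuration."""
    topics_regex: str            # mirrors the topics.regex setting
    consumer_groups_regex: str   # mirrors the consumer.groups.regex setting
    mode: ReplicationMode

    def includes_topic(self, topic: str) -> bool:
        # Assumption: the regex must match the whole topic name.
        return re.fullmatch(self.topics_regex, topic) is not None

    def includes_group(self, group: str) -> bool:
        # Assumption: the regex must match the whole group name.
        return re.fullmatch(self.consumer_groups_regex, group) is not None


def parse_link_config(props: dict) -> LinkConfig:
    """Build a LinkConfig from string properties.

    Raises KeyError for a missing property and ValueError for an unknown mode.
    The "mode" property name is an assumption of this sketch.
    """
    return LinkConfig(
        topics_regex=props["topics.regex"],
        consumer_groups_regex=props["consumer.groups.regex"],
        mode=ReplicationMode(props["mode"]),
    )
```

Making the configuration immutable is a deliberate choice in this sketch: whether a live link may change its regexes or its mode is still an open question above, so a change is modelled as building a new configuration rather than mutating one in place.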

Data Semantics

Relation to Intra-Cluster replication

Cross-Cluster Replication is similar to Intra-Cluster replication, as both cross-cluster topics and intra-cluster replicas:

...

  • Are subject to the target cluster's ACL environment
  • Are not eligible for fetches from source cluster consumers
  • Have a separate topic-id

Replication Mode

  • Asynchronous mode allows replication of a record to proceed after the source cluster has ack'd the record to the producer.
    • Maintains existing latency of source producers, but provides only single-topic consistency guarantees
  • Synchronous mode requires the source cluster to delay acks (or delay commits?) for a producer until after the record has been replicated to all ISRs for all attached synchronous replication links.
    • Also applies to consumer group offsets submitted by the consumer or transactional producer 
    • Increases latency of source producers, in exchange for multi-topic & consumer-offset consistency guarantees
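
The difference between the two modes can be sketched as a single acknowledgement rule. This is only an illustration under stated assumptions: the function name can_ack and the use of per-link high watermarks (the offset one past the last record replicated to all ISRs of that link's target) are hypothetical, and the question of delaying acks versus delaying commits is left open as above.

```python
from typing import Iterable


def can_ack(record_offset: int,
            source_high_watermark: int,
            sync_link_high_watermarks: Iterable[int]) -> bool:
    """Return True when the source may acknowledge the record to its producer.

    A record at offset o is treated as replicated once o < high watermark.
    Asynchronous links are not passed in at all, because they never delay acks.
    """
    # Existing intra-cluster rule: the record must be committed on the source.
    if record_offset >= source_high_watermark:
        return False
    # Synchronous rule: every attached synchronous link must have caught up.
    return all(record_offset < hw for hw in sync_link_high_watermarks)
```

With no synchronous links attached, the rule reduces to the existing source-only behaviour, which is why asynchronous mode keeps producer latency unchanged. A single lagging synchronous link is enough to hold back the ack.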

Network Partition Behavior

We must support network partition tolerance, which requires choosing between consistency and availability. Availability in the table below means that the specified client can operate, at the expense of consistency with another client. Consistency means that the specified client will be unavailable, in order to be consistent with another client.

Mode                                     | Asynchronous Mode                                      | Synchronous Mode
Link State                               | Startup          | Out-of-sync      | Disconnected     | Startup          | Out-of-sync      | Disconnected
Source Consumers                         | Available (1)    | Available (1)    | Available (1,2)  | Available (1)    | Available (1)    | Available (1,2)
Source Non-Transactional Producers       | Available (1)    | Available (1)    | Available (1,2)  | Available (1)    | Available (1)    | Available (1,2)
Source Transactional Producers           | Available (3)    | Available (4)    | Available (2)    | Available (3)    | Consistent (5)   | Available (2)
Target Consumers in Non-Replicated Group | Available (6)    | Available (6)    | Available (2)    | Consistent (7)   | Consistent (8)   | Available (2)
Target Consumers in Replicated Group     | Consistent (9)   | Consistent (9)   | Available (2)    | Consistent (7,9) | Consistent (9)   | Available (2)
Target Producers                         | Consistent (10)  | Consistent (10)  | Available (2)    | Consistent (10)  | Consistent (10)  | Available (2)

Numbers in parentheses refer to the notes below.

  1. Source clients not involved in transactions will always prioritize availability over consistency
  2. When the link is permanently disconnected, clients on each cluster are not required to be consistent and can show whatever the state was when the link was disconnected.
  3. Transactional clients are available during link startup, to allow creating links on transactional topics without downtime.
  4. Transactional clients are available when asynchronous links are out-of-sync, because the source cluster is allowed to ack transactional produces while async replication is offline or behind.
  5. Transactional clients are consistent when synchronous links are out-of-sync, because the destination topic is readable by target consumers. Transactional produces to a topic with an out-of-sync synchronous replication link should time out or fail.
  6. Consumers of an asynchronous replicated topic will see partial states while the link is catching up
  7. While starting up a synchronous replicated topic, consumers will be unable to read the partial topic. This allows source transactional produces to proceed while the link is starting (3)
  8. Consumers of a synchronous replicated topic should always see the same contents as the source topic
  9. Consumers within a replicated consumer group will be unable to commit offsets, as the offsets are read-only while the link is active.
  10. Producers targeting the replicated topic will fail because the topic is not writable until it is disconnected. Allowing a parallel write to the replicated topic would be inconsistent.
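
For readers who want to check a particular combination, the table above can be encoded as a lookup. This is only a restatement of the table as data; the names BEHAVIOR, LINK_STATES, and behavior are invented for the example, and the footnote numbers are omitted.

```python
A, C = "Available", "Consistent"

# Column order within each mode, matching the table.
LINK_STATES = ("Startup", "Out-of-sync", "Disconnected")

# client -> mode -> one entry per link state, in LINK_STATES order
BEHAVIOR = {
    "Source Consumers":
        {"Asynchronous": (A, A, A), "Synchronous": (A, A, A)},
    "Source Non-Transactional Producers":
        {"Asynchronous": (A, A, A), "Synchronous": (A, A, A)},
    "Source Transactional Producers":
        {"Asynchronous": (A, A, A), "Synchronous": (A, C, A)},
    "Target Consumers in Non-Replicated Group":
        {"Asynchronous": (A, A, A), "Synchronous": (C, C, A)},
    "Target Consumers in Replicated Group":
        {"Asynchronous": (C, C, A), "Synchronous": (C, C, A)},
    "Target Producers":
        {"Asynchronous": (C, C, A), "Synchronous": (C, C, A)},
}


def behavior(client: str, mode: str, link_state: str) -> str:
    """Look up whether a client favours availability or consistency."""
    return BEHAVIOR[client][mode][LINK_STATES.index(link_state)]
```

Read this way, two patterns stand out: every client is available once the link is disconnected, and on the source side only transactional producers on an out-of-sync synchronous link give up availability.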

Unclean Link Recovery

After a link is disconnected, the history of the two topics is allowed to diverge arbitrarily, as each leader will accept writes which are uncorrelated. There should be a way to reconnect a link, and reconcile the two histories by choosing one to overwrite the other.

This process can be manually initiated, and involves choosing which direction the link should flow, and what the last shared offset was before the divergence occurred. Topics on the destination will be truncated to the last shared offset, and cross-cluster replication will be restarted.

For a single link, we can remember what the last offset received by the target was prior to the disconnect, and truncate from there.

For example, a link A→B is set up, and A and B are then partitioned. The link A→B is disconnected, after which A and B diverge. Once the A/B network partition resolves, the operator chooses B as the new "source". A link B→A is set up, and A is truncated to the last common offset before replication starts.
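
The single-link case can be sketched with logs modelled as Python lists, where a record's index is its offset. The function name recover_link is hypothetical, and the sketch assumes that last_shared_offset names the last record both clusters hold in common, so that record is kept and everything after it on the new target is discarded.

```python
def recover_link(new_source_log: list, new_target_log: list,
                 last_shared_offset: int) -> list:
    """Return the new target's log after unclean link recovery.

    Records are never compared: the caller supplies last_shared_offset,
    for example the last offset the old target received before the disconnect.
    Pass -1 when the two logs share no records.
    """
    keep = last_shared_offset + 1  # number of records retained on the target
    if keep < 0 or keep > len(new_source_log) or keep > len(new_target_log):
        raise ValueError("last_shared_offset is outside one of the logs")
    # Step 1: truncate the target to the last shared offset.
    truncated = new_target_log[:keep]
    # Step 2: restart replication from the new source at the next offset.
    return truncated + new_source_log[keep:]
```

In the A→B example, if A holds r0, r1, r2, a3, a4 and B holds r0, r1, r2, b3 with offset 2 as the last shared offset, reversing the link leaves A holding r0, r1, r2, b3. The records a3 and a4 are lost, which is why the operation is called unclean.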

For multiple links, and links that didn't carry traffic, the last common offset depends on where the replicated data came from. Perhaps we can remember the provenance (cluster-id/topic-id) of ranges of offsets, and find the most recent offset which originated in the same topic amongst the clusters? Or force the operator/management layer to decide the truncation offset. Either option is preferable to comparing hashes of messages or performing byte-for-byte equality checks.

For example, two links A→B and A→C are set up, and A is then partitioned from B and C. B is chosen as the new leader, so the links A→B and A→C should be disconnected, and a link B→C connected. B and C will then need to perform unclean link recovery, determining the last offset that they both got from A prior to the disconnect. C then truncates to that common offset, and begins replicating from B.
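
The provenance idea floated above could look roughly like this. It is a sketch of one possible bookkeeping scheme, not a settled design: the ProvenanceRange type and last_common_offset function are invented for the example, and the sketch relies on the stated goal that replicated records keep the offsets they had in the origin cluster.

```python
from typing import NamedTuple, Optional, Sequence


class ProvenanceRange(NamedTuple):
    start: int       # first offset in the range (inclusive)
    end: int         # one past the last offset in the range (exclusive)
    cluster_id: str  # cluster where these records were originally written
    topic_id: str    # topic-id of the origin topic in that cluster


def last_common_offset(log_x: Sequence[ProvenanceRange],
                       log_y: Sequence[ProvenanceRange]) -> Optional[int]:
    """Most recent offset that both logs received from the same origin topic.

    Returns None when the two logs share no offsets of common origin.
    """
    best = None
    for x in log_x:
        for y in log_y:
            if (x.cluster_id, x.topic_id) != (y.cluster_id, y.topic_id):
                continue  # different origin: these ranges cannot be shared
            overlap_end = min(x.end, y.end)
            if max(x.start, y.start) < overlap_end:
                candidate = overlap_end - 1
                if best is None or candidate > best:
                    best = candidate
    return best
```

In the A→B, A→C example, suppose B received offsets 0 to 119 from A and C received offsets 0 to 99 from A before the disconnect, and each then accepted local writes. The function returns 99, so C would truncate to offset 99 and begin replicating from B at offset 100. Only range metadata is compared, never record contents.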

Remote and Local Logs

  • Remote log replication should be controlled by the second tier's provider.
  • If remote logs are not replicated by the second tier's provider, they can be referenced instead, so that replicated Kafka topics point at the source remote log storage.

Namespace Reservation

When a link is established, or reconfigured, it will reserve topic and consumer group namespaces in the destination cluster. These reservations will prevent coincidentally named topics and consumer groups from being created which would conflict with topics/consumer groups that could be created later in the source cluster.

  • When a client tries to create a resource which has a namespace reservation, creating that resource should fail (with a new error? with the most closely related existing error?)
  • When a link is established or reconfigured, and the necessary reservation conflicts with existing topics or existing reservations, those resources will become managed by the replication link.
  • If a link is reconfigured such that a managed topic is no longer included, that topic will become disconnected, as if the link had been disconnected for all topics.
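
The three reservation rules above can be sketched for topics as follows; consumer groups would follow the same pattern. Everything here is illustrative: the DestinationNamespace class and ReservationConflict error are hypothetical names (the actual error to return is still an open question), and whole-name regex matching is an assumption.

```python
import re


class ReservationConflict(Exception):
    """Hypothetical error for creating a resource inside a reserved namespace."""


class DestinationNamespace:
    def __init__(self, existing_topics):
        self.topics = set(existing_topics)
        self.reservations = {}  # link name -> compiled topics regex
        self.managed = {}       # topic name -> link that manages it

    def reserve(self, link: str, topics_regex: str) -> None:
        """Establish or reconfigure the reservation held by a link."""
        pattern = re.compile(topics_regex)
        self.reservations[link] = pattern
        # Topics this link managed but no longer matches become disconnected.
        for topic, owner in list(self.managed.items()):
            if owner == link and not pattern.fullmatch(topic):
                del self.managed[topic]
        # Existing topics inside the reservation become managed by the link.
        for topic in self.topics:
            if pattern.fullmatch(topic):
                self.managed[topic] = link

    def create_topic(self, topic: str) -> None:
        """Client-initiated creation; fails inside any reserved namespace."""
        for link, pattern in self.reservations.items():
            if pattern.fullmatch(topic):
                raise ReservationConflict(f"{topic} is reserved by {link}")
        self.topics.add(topic)
```

A short usage example: after reserving the pattern orders\..* for a link on a cluster that already has a topic orders.eu, that topic is managed by the link and creating orders.us fails, while creating metrics succeeds. Reconfiguring the link to a different pattern releases orders.eu and lifts the block on orders.us.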

Networking

  • The network path between Kafka clusters is assumed to have less uptime, lower bandwidth, and higher latency than the intra-cluster network, and to have stricter routing constraints.
  • Network connections for replication links can proceed either source→target or target→source to allow one cluster to be behind a NAT (can we support NAT punching as a first-class feature, to allow both clusters to be behind NAT?)
  • Allow for simple traffic separation of cross-cluster connections and client/intra-cluster connections (do we need to connect to something other than the other cluster's advertised listeners?)

...