
...

Current state: Under Discussion

Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]

JIRA: here [Change the link from KAFKA-1 to your own ticket]

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

  1. They run in external processes which may experience outages when both Kafka clusters are otherwise healthy and accepting clients
  2. They represent an additional operational burden beyond just running Kafka
  3. They provide logical replication, in which the same record may be assigned different offsets in the source and the target
  4. The replication is asynchronous, preventing the extension of strong consistency guarantees between Kafka clusters
  5. Replication does not preserve offsets of individual records
  6. Replication does not preserve exactly-once-semantics for records & consumer offsets

Goals

  • Replicate topics and other associated resources between intentionally separate Kafka clusters
  • Provide physical replication semantics
  • Offsets for replicated records should be the same as in the origin Kafka
  • Preserve exactly-once semantics for replicated records (immediately, in some configurations, or as a later extension of the feature?)

Non-Goals

  • Implement multi-leader partitions that accept writes on multiple brokers.
  • Perform distributed consensus to elect partition leaders across multiple clusters
  • Provide transparent failover of single clients between Kafka clusters

Public Interfaces

User Interface

  • New AdminClient methods for managing Replication Links on both the source and destination clusters
  • A single cluster can participate in arbitrarily many links, and can act as both a source and a destination simultaneously.
  • Links accept a configuration, including topics.regex and consumer.groups.regex, and a mode selector that accepts either Asynchronous or Synchronous
    • Can a link change its topics.regex or consumer.groups.regex?
    • What happens if a topic or group is created or deleted while the link is in-sync? Does the link change state?
    • Can a link change from synchronous to asynchronous?
  • Existing Consumers & Producers can access replicated topics without an upgrade or reconfiguration.
  • Metrics allow observation of the state of each partition included in the replication link and progress of the replication flow (lag, throughput, etc)
  • During and shortly after network partitions, the link will be out-of-sync while it catches up with the source ISR
  • Links can be temporarily or permanently disconnected. During temporary disconnects, the replicated topic is not writable. After a permanent disconnect, the replicated topic is writable.
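
As an illustration of the configuration surface described above, a link accepting topic and group regexes plus a mode selector might look like the following. Every key name here is a placeholder for discussion, not a proposed final interface:

```properties
# Hypothetical replication-link configuration; all key names are illustrative.
link.name=dc1-to-dc2
topics.regex=orders\..*
consumer.groups.regex=orders-app-.*
# Mode selector: asynchronous or synchronous
mode=synchronous
# Which side initiates the connection (source-to-target or target-to-source),
# e.g. to allow one cluster to sit behind a NAT
connection.direction=target-to-source
```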

...

  • manually disconnected, after which the destination topic becomes writable.
    • Can we manually reverse a link while keeping consistency?
    • Can we reconnect a link, truncating the target if it has diverged?

Data Semantics

Relation to Intra-Cluster replication

Cross-Cluster Replication is similar to Intra-Cluster replication, as both cross-cluster topics and intra-cluster replicas:

...

  • Are subject to the target cluster's ACL environment
  • Are not included in acks=all or min-ISR requirements (optional? maybe acks=all is useful/required for EOS producers that want to make sure cross-cluster replication finishes)
  • Are not eligible for fetch-from-follower on the source cluster
  • Have a separate topic-id

Consumer Offsets

  • Are eligible for fetches from source cluster consumers
  • Have a separate topic-id

Replication Mode

  • Asynchronous mode allows replication of a record to proceed after the source cluster has ack'd the record to the producer.
    • Maintains existing latency of source producers, but provides only single-topic consistency guarantees
  • Synchronous mode requires the source cluster to delay acks (or delay commits?) for a producer until after the record has been replicated to all ISRs for all attached synchronous replication links.
    • Also applies to consumer group offsets submitted by the consumer or transactional producer 
    • Increases latency of source producers, in exchange for multi-topic & consumer-offset consistency guarantees
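
The ack rule for synchronous links can be made concrete with a small sketch: the source broker may only ack a record once every attached synchronous link has replicated past it, while asynchronous links never delay the ack. This is illustrative model code, not proposed broker internals; all names are invented:

```java
import java.util.List;

enum LinkMode { ASYNCHRONOUS, SYNCHRONOUS }

// Hypothetical view of a link: its mode and how far it has replicated.
record ReplicationLink(LinkMode mode, long replicatedEndOffset) {}

class AckPolicy {
    // A record at recordOffset may be ack'd once every *synchronous* link has
    // replicated past it; asynchronous links never delay the ack.
    static boolean mayAck(long recordOffset, List<ReplicationLink> links) {
        return links.stream()
                .filter(l -> l.mode() == LinkMode.SYNCHRONOUS)
                .allMatch(l -> l.replicatedEndOffset() > recordOffset);
    }

    public static void main(String[] args) {
        var links = List.of(
                new ReplicationLink(LinkMode.ASYNCHRONOUS, 0),   // lagging async link
                new ReplicationLink(LinkMode.SYNCHRONOUS, 100)); // sync link caught up to 100
        System.out.println(mayAck(42, links));   // true: the sync link is past offset 42
        System.out.println(mayAck(150, links));  // false: the sync link has not reached 150
    }
}
```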

Network Partition Behavior

We must support network partition tolerance, which requires choosing between consistency and availability. In the table below, Available means that the specified client can continue operating, at the expense of consistency with clients of the other cluster; Consistent means that the specified client becomes unavailable in order to remain consistent with clients of the other cluster.

| Client | Async: Startup | Async: Out-of-sync | Async: Disconnected | Sync: Startup | Sync: Out-of-sync | Sync: Disconnected |
| --- | --- | --- | --- | --- | --- | --- |
| Source Consumers | Available [1] | Available [1] | Available [1][2] | Available [1] | Available [1] | Available [1][2] |
| Source Non-Transactional Producers | Available [1] | Available [1] | Available [1][2] | Available [1] | Available [1] | Available [1][2] |
| Source Transactional Producers | Available [3] | Available [4] | Available [2] | Available [3] | Consistent [5] | Available [2] |
| Target Consumers in Non-Replicated Group | Available [6] | Available [6] | Available [2] | Consistent [7] | Consistent [8] | Available [2] |
| Target Consumers in Replicated Group | Consistent [9] | Consistent [9] | Available [2] | Consistent [7][9] | Consistent [9] | Available [2] |
| Target Producers | Consistent [10] | Consistent [10] | Available [2] | Consistent [10] | Consistent [10] | Available [2] |

  1. Source clients not involved in transactions will always prioritize availability over consistency
  2. When the link is permanently disconnected, clients on each cluster are not required to be consistent and can show whatever the state was when the link was disconnected.
  3. Transactional clients are available during link startup, to allow creating links on transactional topics without downtime.
  4. Transactional clients are available when asynchronous links are out-of-sync, because the source cluster is allowed to ack transactional produces while async replication is offline or behind.
  5. Transactional clients are consistent when synchronous links are out-of-sync, because the destination topic is readable by target consumers. Transactional produces to a topic with an out-of-sync synchronous replication link should timeout or fail.
  6. Consumers of an asynchronous replicated topic will see partial states while the link is catching up
  7. While starting up a synchronous replicated topic, consumers will be unable to read the partial topic. This allows source transactional produces to proceed while the link is starting (3)
  8. Consumers of a synchronous replicated topic should always see the same contents as the source topic
  9. Consumers within a replicated consumer group will be unable to commit offsets, as the offsets are read-only while the link is active.
  10. Producers targeting the replicated topic will fail because the topic is not writable until it is disconnected. Allowing a parallel-write to the replicated topic would be inconsistent.

Unclean Link Recovery

After a link is disconnected, the history of the two topics is allowed to diverge arbitrarily, as each leader will accept writes which are uncorrelated. There should be a way to reconnect a link, and reconcile the two histories by choosing one to overwrite the other.

This process can be manually initiated, and involves choosing which direction the link should flow and what the last shared offset was before the divergence occurred. Topics on the destination will be truncated to the last shared offset, and cross-cluster replication will be restarted.

For a single link, we can remember what the last offset received by the target was prior to the disconnect, and truncate from there.

For example, a link A→B is set up, A and B are partitioned, and the link A→B is disconnected, and then the A/B network partition resolves. A and B diverge, and the operator chooses B as the new "source". A link B→A is set up, and A is truncated to the last common offset before replication starts.

For multiple links, and links that didn't carry traffic, the last common offset depends on where the replicated data came from. Perhaps we can remember the provenance (cluster-id/topic-id) of ranges of offsets, and find the most recent offset which originated in the same topic amongst the clusters? Or we could force the operator/management layer to decide the truncation offset. Either would be preferable to comparing hashes of messages or performing byte-for-byte equality checks.

For example two links A→B and A→C are set up, A is partitioned from B and C, B is chosen as the new leader, the links A→B and A→C should be disconnected, and a link B→C connected. B and C will then need to perform unclean link recovery, determining the last offset that they both got from A prior to the disconnect. C then truncates to that common offset, and begins replicating from B.
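
The provenance idea above can be sketched as follows: if each cluster records the origin cluster-id of contiguous offset ranges, the recovery point is the offset at which the two logs' provenance first diverges. This is a model of the idea only, with invented types, not a proposed on-disk format:

```java
import java.util.List;
import java.util.Objects;

// Sketch: provenance of a contiguous offset range [start, end) in a topic,
// recorded as the origin cluster-id that produced those records.
record ProvenanceRange(String originClusterId, long start, long end) {}

class UncleanRecovery {
    // Returns the first offset at which the two logs' provenance diverges:
    // everything below it came from the same origin ranges, so the lagging
    // cluster can truncate to this offset and resume replication.
    static long lastCommonOffset(List<ProvenanceRange> a, List<ProvenanceRange> b) {
        long common = 0;
        int i = 0;
        while (i < a.size() && i < b.size()) {
            ProvenanceRange ra = a.get(i), rb = b.get(i);
            if (!Objects.equals(ra.originClusterId(), rb.originClusterId())
                    || ra.start() != rb.start()) {
                break;
            }
            common = Math.min(ra.end(), rb.end());
            if (ra.end() != rb.end()) break; // ranges diverge partway through
            i++;
        }
        return common;
    }

    public static void main(String[] args) {
        // B and C both replicated from A; B got further before the partition,
        // and each then accepted local writes under its own cluster-id.
        var b = List.of(new ProvenanceRange("A", 0, 120), new ProvenanceRange("B", 120, 200));
        var c = List.of(new ProvenanceRange("A", 0, 100), new ProvenanceRange("C", 100, 150));
        System.out.println(lastCommonOffset(b, c)); // 100: C truncates to 100, then replicates from B
    }
}
```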

Remote and Local Logs

  • Remote log replication should be controlled by the second tier's provider.
  • Remote logs that are not replicated by the second tier's provider can instead be referenced, so that replicated Kafka topics point at the source cluster's remote log storage.

Namespace Reservation

When a link is established, or reconfigured, it will reserve topic and consumer group namespaces in the destination cluster. These reservations will prevent coincidentally named topics and consumer groups from being created which would conflict with topics/consumer groups that could be created later in the source cluster.

  • When a client tries to create a resource which has a namespace reservation, creating that resource should fail (with a new error? with the most closely related existing error?)
  • When a link is established or reconfigured, and the necessary reservation conflicts with existing topics or existing reservations, those resources will become managed by the replication link.
  • If a link is reconfigured such that a managed topic is no longer included, that topic will become disconnected, similar to if the link was disconnected for all topics
  • Consumer offsets should be replicated to the target cluster synchronously, such that the offset arrives before the commit record (does this require 2PC? can this be configured in the replication link?)
  • An exactly-once application which fails over from source to target clusters should maintain its exactly-once semantics.
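
A minimal sketch of the reservation check a destination broker might apply on topic creation, assuming reservations are stored as the link's topics.regex patterns. All names are invented for illustration, and the error surfaced to the client remains an open question:

```java
import java.util.List;
import java.util.regex.Pattern;

// Sketch of a namespace-reservation check on locally-initiated topic creation;
// whether the failure is a new error code or an existing one is an open question.
class NamespaceReservations {
    private final List<Pattern> reserved;

    NamespaceReservations(List<String> reservedRegexes) {
        this.reserved = reservedRegexes.stream().map(Pattern::compile).toList();
    }

    // A CreateTopics request must fail if the name falls inside a namespace
    // reserved by any replication link.
    boolean creationAllowed(String topicName) {
        return reserved.stream().noneMatch(p -> p.matcher(topicName).matches());
    }

    public static void main(String[] args) {
        var r = new NamespaceReservations(List.of("orders\\..*"));
        System.out.println(r.creationAllowed("orders.emea")); // false: reserved by the link
        System.out.println(r.creationAllowed("inventory"));   // true: outside the reservation
    }
}
```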

Networking

  • The network path between Kafka clusters is assumed to be less reliable than the intra-cluster network, with less uptime, lower bandwidth, and higher latency, and to have more strict routing constraints.
  • Network connections for replication links can proceed either source→target or target→source to allow one cluster to be behind a NAT (can we support NAT punching as a first-class feature, to allow both clusters to be behind NAT?)
  • Allow for simple traffic separation of cross-cluster connections and client/intra-cluster connections (do we need to connect to something other than the other cluster's advertised listeners?)

...

  • Binary log format

  • The network protocol and api behavior

  • Any class in the public packages under clients

    • org/apache/kafka/common/serialization

    • org/apache/kafka/common

    • org/apache/kafka/common/errors

    • org/apache/kafka/clients/producer

    • org/apache/kafka/clients/consumer (eventually, once stable)

  • Configuration, especially client configuration

  • Monitoring

  • Command line tools and arguments

  • Anything else that will likely break existing users in some way when they upgrade

User Stories

Disaster Recovery (multi-zone Asynchronous)

  1. I administrate multiple Kafka clusters in different availability zones
  2. I have a performance-sensitive application that reads in all zones but writes to only one zone at a time. For example, an application that runs consumers in zones A and B to keep caches warm but disables producers in zone B while zone A is running.
  3. I set up an asynchronous Cross-Cluster replication link for my topics and consumer groups from cluster A to cluster B. While the link is being created, applications in zone A are performant, and zone B can warm its caches with historical data as it is replicated.
  4. I do the same with cluster A and cluster C (and others that may exist)
  5. When zone A goes offline, I want the application in zone B to start writing. I manually disconnect the A→B cross-cluster link, and trigger the application in zone B to begin writing.
  6. What happens to cluster C? Can we connect B→C quickly? What happens if C is ahead of B and truncating C breaks the cluster C consumers?
  7. When zone A recovers, I see that the history has diverged between zone A and B. I manually delete the topics in zone A and re-create the replication link in the opposite direction.

Proposed Changes

Describe the new thing you want to do in appropriate detail. This may be fairly extensive and have large subsections of its own. Or it may be a few sentences. Use judgement based on the scope of the change.

...

  • Both the source and target clusters should have a version which includes the Cross-Cluster Replication feature
  • Clusters which support the Cross-Cluster Replication feature should negotiate on the mutually-supported replication semantics
  • If one of the clusters is downgraded to a version which does not support Cross-Cluster Replication, the replication link should fall out-of-sync.

Test Plan

Describe in few sentences how the KIP will be tested. We are mostly interested in system tests (since unit-tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?

...