
Reliable Broker Cluster

This document describes cluster design and implementation as of 19 June 2009.

Overview

A Qpid Reliable Broker Cluster, or just cluster, is a group of brokers collaborating to present the illusion of a single "virtual broker" with some extra qualities:

...

with multiple addresses. The cluster is active-active, that is to say each member broker maintains the full state of the clustered broker. If any member fails, clients can fail-over to

...

This design discusses clustering at the AMQP protocol layer, i.e. members of a cluster have distinct AMQP addresses and AMQP protocol commands are exchanged to negotiate reconnection and failover. "Transparent" failover in this context means transparent to the application; the AMQP client will be aware of disconnects and must take action to fail over.

More precisely, we define transparent failover to mean this: in the event of failover, the sequence of protocol commands sent and received by the client on each channel, excluding failover-related commands, is identical to the sequence that would have been sent/received if no failure had occurred.

Given this definition the failover component of an AMQP client library can simply hide all failover-related commands from the application (indeed from the rest of the library) without breaking any semantics.
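The definition above can be read as a simple check on command traces. The following is a minimal sketch, not part of the design: the command names in FAILOVER_COMMANDS are hypothetical placeholders, since this note does not fix the set of failover-related commands.

```python
# Hypothetical failover-related command names, used only for illustration.
FAILOVER_COMMANDS = {"session.resume", "connection.redirect"}


def visible_commands(trace):
    """Return the commands on one channel with failover commands hidden."""
    return [cmd for cmd in trace if cmd not in FAILOVER_COMMANDS]


def is_transparent(trace_with_failover, trace_without_failure):
    """True if both traces look identical once failover commands are removed."""
    return visible_commands(trace_with_failover) == visible_commands(
        trace_without_failure
    )


# Example traces for a single channel.
no_failure = ["queue.declare", "basic.publish", "basic.ack"]
with_failover = ["queue.declare", "session.resume", "basic.publish", "basic.ack"]
transparent = is_transparent(with_failover, no_failure)
```

A client library that applies something like visible_commands before handing commands to the application would satisfy the definition, provided the remaining sequence really is unchanged.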

Requirements

TODO: Define levels of reliability we want to provide - survive one node failure, survive multiple node failures, survive total failure, network partitions etc. Does durable/non-durable message distinction mean anything in a reliable cluster? I.e. can we lose non-durable messages on a node failure? Can we lose them on orderly shutdown or total failure?

TODO: The requirements triangle. Concrete performance data.

Clients only need to use standard AMQP to talk to a cluster. They need to understand some AMQP extensions to fail-over or resume sessions. We will also use AMQP for point-to-point communication within the cluster.

Ultimately we may propose extensions to AMQP spec but for the initial implementation we can use existing extension points:

  • Field table parameters to various AMQP methods (declare() arguments etc.)
  • Field table in message headers.
  • System exchanges and queues.

Abstract Model and Terms

A quick re-cap of AMQP terminology and introduction to some new terms:

A broker is a container for 3 types of broker components: queues, exchanges and bindings. Broker components represent resources available to multiple clients, and are not affected by clients connecting and disconnecting. (Footnote: Exclusive or auto-delete queues are deleted on disconnect; we'll return to this point.) Persistent broker components are unaffected by shut-down and re-start of a broker.

A client uses the components contained in a broker via the AMQP protocol. The client components are connection, channel, consumer and session. (Footnote: The "session" concept is not fully defined in AMQP 0-8 or 0-9 but is under discussion. This design note will define a session that may be proposed to AMQP.) Client components represent the relationship between a client and a broker.

TODO: Where do transactions fit in the model? They are also a kind of relationship components but Dtx transactions may span more than just client/broker.

A client's interaction with an unclustered individual broker is defined by AMQP 0-8/0-9: create a connection to the broker's address, create channels, exchange AMQP commands (which may create consumers), disconnect. After a disconnect the client can reconnect to the same broker address. Broker components created by the previous connection are preserved but client components are not. In the event of a disorderly disconnect the outcome of commands in flight can only be determined by their effects on broker components.

(Footnote: An individual broker by this definition is really a broker behind a single AMQP address. Such a broker might in fact be a cluster using technologies like TCP failover/load balancing. This is outside the scope of this design, which focusses on clustering at the AMQP protocol layer, where cluster members have separate AMQP addresses.)

A broker cluster (or just cluster) is a "virtual" broker implemented by several member brokers (or just members). A cluster has many AMQP addresses - the addresses of all its members - all semantically equivalent for client connection. (Footnote: They may not be equivalent on other grounds, e.g. network distance from client, load etc.) The cluster members co-operate to ensure:

  • all broker components are available via any cluster member.
  • all broker components remain available if a member fails, provided at least one member remains active.
  • clients disconnected by member failure or network failure can reconnect to another member and resume their session.

A session is an identity for a collection of client components (i.e. a client-broker relationship) that can outlive a single connection. AMQP 0-9 provides some support for sessions in the `resume` command.

If a connection is closed in an orderly manner by either end, the session is also closed. However if there is an abrupt disconnect with no Connection.close command, the session remains viable for some (possibly long) timeout period. The client can reconnect to a failover candidate and resume.

A session is like an extended connection: if you cut out the failover commands the conversation on a session is exactly the conversation the client would have had on a single connection to an individual broker with no failures.

If a connection is in a session, events in the AMQP 0-8 spec that are triggered by closing the connection (e.g. deleting auto-delete queues) are instead triggered by the close (or timeout) of the session.

Note the session concept could also be used in the absence of clustering to allow a client to disconnect and resume a long-running session with the same broker. This is outside the scope of this design.

Cluster State and Replication

Replication mechanisms

Virtual Synchrony protocols such as AIS or JGroups use multicast and allow a cluster to maintain a consistent view across all members. We will use such a protocol to replicate low-volume membership and wiring changes.

Primary/backup: one primary owner of a given object replicates point-to-point to one or more backups. On failure of the primary, one backup takes over. Appropriate for commodity hardware where each node has an independent store.

Shared store: such as GFS. The primary updates the shared store, backups recover from the store. For hot backup the primary can also forward point-to-point to backups. Appropriate for a high-speed storage network.

TODO: Is GFS also appropriate for commodity hardware? Can we push more reliability work down into the GFS layer?

Proxy: forwards traffic to the primary. Allows all objects to be visible on all nodes. (Note backup-as-proxy can be optimized as a special case to reduce traffic.)

For virtual synchrony we will use a specialized multicast protocol such as Totem or JGroups. For point-to-point communication we will use AMQP. As far as possible we will use ordinary AMQP operations on special system queues and exchanges rather than requiring protocol extensions. Ultimately we will propose extensions for interoperability but we should be able to prove the implementation without them.

Types of state

We have to consider several kinds of state:

  • Cluster Membership: Active cluster members (nodes) and data about them.
  • AMQP Wiring: Names and properties of queues, exchanges and bindings.
  • AMQP Content: Data in messages on queues.
  • Session: Conversation with a single client, including references.

Data must be replicated and stored such that:

  • A client knows which node(s) can be used for failover.
  • After a failover, the client can continue its session uninterrupted.
  • No acknowledged messages or commands are lost.
  • No messages or commands are applied twice.

Cluster membership, wiring and session identities are low volume, and will be replicated using virtual synchrony so the entire cluster has a consistent picture.

Queue content is high volume so it is replicated point-to-point using primary-backup to avoid flooding the network.

Session state is potentially high volume and only relevant to a single client, so it is also replicated point-to-point.

How to choose the number and location of backup nodes for a given queue or session is an open question. Note that the choice is independent for every queue and session in principle, but in practice they will probably be grouped in some way.

The Cluster Map: Membership and Wiring

Membership, wiring and session changes are low volume. They are replicated to the entire cluster symmetrically using a virtual synchrony protocol such as openAIS or JGroups.

Wiring includes:

  • exchange names and properties
  • queue names, properties and bindings.

Membership data includes:

  • address of each node
  • state of health of each node
  • primary/backup for each queue/exchange
  • session names, primary/backup for each session.

Queue Content

Message content is too high volume to replicate to the entire cluster, so each queue has a primary node and one or more backup nodes. Other nodes can act as proxies. The client is unaware of the distinction; it sees an identical picture regardless of which broker it connects to.

Note a single cluster node may contain a mix of primary, backup and proxy queues.

TODO: Ordering issues with proxies and put-back messages (reject, transaction rollback) or selectors.

Fragmented Shared Queues

A shared queue has reduced ordering requirements and increased distribution requirements. Fragmenting a shared queue is a special type of replication. The queue is broken into a set of disjoint sub-queues each on a separate node to distribute load.

Each fragment (sub-queue) content is replicated to backups just like a normal queue, independently of the other fragments.

The fragments collaborate to create the appearance of a single queue. Fragments store incoming messages in the local queue, and serve local consumers from the local queue whenever possible. When a fragment does not have messages to satisfy its consumers it consumes messages from other fragments in the group. Proxies to a fragmented queue will consume from the "nearest" fragment if possible.
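The local-first behaviour of fragments can be sketched as follows. This is a minimal illustration under stated assumptions: the Fragment class and make_fragmented_queue helper are hypothetical names, everything runs in one process, and replication of each fragment to its backups is left out.

```python
from collections import deque


class Fragment:
    """One sub-queue of a fragmented shared queue (illustrative only)."""

    def __init__(self, name):
        self.name = name
        self.messages = deque()
        self.peers = []  # the other fragments of the same shared queue

    def publish(self, message):
        # Incoming messages are always stored in the local fragment.
        self.messages.append(message)

    def consume(self):
        # Serve local consumers from the local fragment whenever possible.
        if self.messages:
            return self.messages.popleft()
        # Otherwise consume from another fragment in the group.
        for peer in self.peers:
            if peer.messages:
                return peer.messages.popleft()
        return None


def make_fragmented_queue(names):
    """Create one fragment per name and link each to all the others."""
    fragments = [Fragment(n) for n in names]
    for fragment in fragments:
        fragment.peers = [p for p in fragments if p is not fragment]
    return fragments
```

In this sketch a fragment simply takes from the first peer that has messages; choosing the "nearest" fragment would need a distance measure that this note does not define.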

TODO: Proxies can play a more active role. Ordering guarantees: we can provide "same producer to same consumer preserves order" since messages from the same producer always go on the same fragment queue. May break down in the presence of failover unless we remember which fragment received messages from the client and proxy to the same one on the failover replica.

Session State

Session state relates to a client-broker relationship:

  • open channels, channel attributes (qos, transactions etc.).
  • active consumers.
  • open references.
  • completed command history.
  • commands in flight.

The broker a client is connected to is the primary or host for that client's session. One or more other brokers are session backups. The client can fail over to one of the session backups if the primary fails.

Note that channel and consumer state can be rebuilt by the client using standard AMQP commands, so it doesn't need to be replicated.

TODO: is there any benefit to replicating this?

The backup brokers store:

  • content in open references.
  • requests sent but not yet responded to.
  • responses sent but not yet marked.

On resume we take the following steps:

  • client rebuilds channel/consumer state
  • client & broker re-send in-doubt messages
  • client & broker ignore duplicate in-doubt messages

These steps should be hidden in the client library. At this point we can return control to the application where it left off.
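The re-send and ignore-duplicates steps can be sketched like this. It is a sketch under a strong assumption: every command carries an identifier that stays valid across the reconnect, which this note does not yet settle. SessionEndpoint and its methods are hypothetical names.

```python
class SessionEndpoint:
    """One end of a session (client or broker), illustrative only."""

    def __init__(self):
        self.next_id = 0
        self.in_doubt = {}  # command id -> command sent but not confirmed
        self.seen = set()   # ids of commands already applied at this end
        self.applied = []   # commands applied, in order

    def send(self, command):
        """Record a command as in doubt and return (id, command) to transmit."""
        command_id = self.next_id
        self.next_id += 1
        self.in_doubt[command_id] = command
        return command_id, command

    def confirm(self, command_id):
        """The peer confirmed this command, so it is no longer in doubt."""
        self.in_doubt.pop(command_id, None)

    def receive(self, command_id, command):
        """Apply a command once; return False for a duplicate."""
        if command_id in self.seen:
            return False
        self.seen.add(command_id)
        self.applied.append(command)
        return True

    def resend_in_doubt(self):
        """Commands to re-send after resume, in their original order."""
        return sorted(self.in_doubt.items())
```

After a resume, each side would call resend_in_doubt and feed the results to the peer's receive, which drops anything it has already applied.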

TODO: Big problem with this - how do/do we re-sync frame numbers and method IDs post reconnect? If so how do we reconcile "ordinary" AMQP commands used to reconstruct the session state from commands that are part of the "resumed" session? Or do we move all the state to the broker, since we're stuck with significant client state on the broker anyway?

Mapping of AMQP commands to replication mechanisms

queue.declare/bind/delete, exchange.declare/delete: update in cluster map.

message.transfer/basic.publish (client to broker):

  • on primary: update local queue, replicate to backup.
  • on proxy: forward to primary

(When the proxy is also a backup we can optimize out the replication step.)
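The publish path for primary and proxy nodes can be sketched as below. The Node class and its attribute names are hypothetical, calls between nodes stand in for AMQP traffic, and the proxy-is-also-backup optimization is not modelled.

```python
class Node:
    """A cluster node holding a mix of primary, backup and proxy queues."""

    def __init__(self, name):
        self.name = name
        self.content = {}      # queue name -> messages stored on this node
        self.primary_for = {}  # queue name -> list of backup nodes
        self.proxy_for = {}    # queue name -> primary node for that queue

    def publish(self, queue, message):
        if queue in self.primary_for:
            # Primary: update the local queue, then replicate to backups.
            self.content.setdefault(queue, []).append(message)
            for backup in self.primary_for[queue]:
                backup.store_replica(queue, message)
        elif queue in self.proxy_for:
            # Proxy: forward to the primary, keep no local copy.
            self.proxy_for[queue].publish(queue, message)
        else:
            raise KeyError(f"{self.name} is neither primary nor proxy for {queue}")

    def store_replica(self, queue, message):
        # Backup: keep a copy so this node can take over if the primary fails.
        self.content.setdefault(queue, []).append(message)
```

A client publishing through the proxy sees the same result as one publishing directly to the primary, which is the property the mapping above is meant to preserve.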

message.transfer(broker to client), message.deliver:

  • primary: replicate deliver to backup(s), deliver to client.
  • proxy: pass through to client.

message.consume/basic.consume:

  • proxy: forward consume. No replication, client will re-establish consumers.
  • primary: register consumer.

basic.ack/message.ok (from client):

  • proxy: forward
  • primary: mark message processed, replicate to backups.

basic.ack/message.ok(from broker)

  • proxy: forward to client
  • client: mark message processed.

reference.open/append/close (client to broker):

  • proxy: replicate to session backup, forward to primary.
  • primary: process.

reference.open/append/close (broker to client):

  • primary: send open/append/close.
  • proxy: replicate to session backup, forward to client.

All commands:

  • proxy replicates required command history to backup.

Note that some commands

Client-Broker Protocol

TODO: How it looks in client protocol terms - system queues & exchanges, connect argument tables.

Membership updates, choosing a replica, reconnection, building conversational state => client stores representation of state unacked messages & duplicates, hiding failover.

Broker-Broker Protocol

Broker-broker communication uses normal AMQP over specially identified connections and channels (identified in the connection negotiation argument table.)

Proxying: Proxies simply forward methods between client and primary and create consumers on behalf of the client. TODO: any issues with message put-back and transactions?

Queue/fragment replication: Use AMQP transfer commands to transfer content to backup(s). TODO: propagating transactions.

Session replication:

must replicate a command (and get confirmation it was replicated) before responding. However this can be done in async streams - forward commands to replica

Session replication: AMQP on special connections. Primary forwards all outgoing requests and incoming responses to session backup. Backup can track the primary request/response tables and retransmit messages.

Note the dehydrated requests to the session backup should reference the content backup so the backup can recover content in a failure. Alternatively content could be sent to both - what's the tradeoff?

Persistence and Recovery

Competing failure modes:

Tibco: fast when running clean but performance over time has GC "spikes". Single journal for all queues. "Holes" in the log have to be garbage collected to re-use the log. One slow consumer affects everyone because it causes fragmentation of the log.

MQ: write to journal, write journal to DB, read from DB. Consistent & reliable but slow.

Street homegrown solutions: transient MQ with home grown persistence. Can we get more design details for these solutions?

Persistence overview

There are 3 reasons to persist a message:

Durable messages: must be stored to disk across broker shutdowns or failures.

  • stored when received.
  • read during start-up.
  • must be removed after delivery.

Reliability: recover after a crash.

  • stored when received.
  • read during crash recovery.
  • must be removed after delivery.

Flow-to-disk: to reduce memory use for full queues.

  • stored when memory gets tight.
  • read when delivered.
  • must be removed after delivery.

Durable and reliable cases are very similar: storage time is performance-critical (blocks response to sender) but reading is not and cleanup can be done by an async thread or process.

For flow-to-disk, when queues are full, both store and reading are critical.

So it looks like the same solution will work for durable and reliable.

Flow-to-disk has different requirements but it would be desirable to re-use some or all of the durable/reliable solution. In particular if flow-to-disk is combined with durable/reliable it would be wasteful to write the message to disk a second time - instead it would seem better to keep an in-memory index that allows messages to be read quickly from the reliable/durable store.

We also need to persist wiring (Queues/Exchanges/Bindings), but this is much less performance critical. The entire wiring model is held in memory so wiring is only read at startup, and updates are low volume and not performance-critical. A simple database should suffice.

Message Journals

For reliability and durability we will use a message journal per queue. The broker writes enqueue and dequeue records to the end of the active journal file. When the file reaches a fixed size it starts a new one.

A cleanup agent (thread or process) removes, recycles or compacts journal files that have no live (undelivered) messages. (References complicate the book-keeping a little but don't alter the conceptual model.)

Recovery or restart reconstructs the queue contents from the enqueue/dequeue records in the journal.
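Recovery from a journal of enqueue and dequeue records can be sketched as a single replay pass. The record layout (operation, message id, body) is an assumption made for illustration; this note does not specify the journal format.

```python
def recover_queue(journal_records):
    """Rebuild queue contents by replaying enqueue/dequeue records in order.

    Each record is an assumed (operation, message_id, body) tuple.
    """
    live = {}  # message id -> body; dicts keep insertion order
    for operation, message_id, body in journal_records:
        if operation == "enqueue":
            live[message_id] = body
        elif operation == "dequeue":
            # A dequeue record means the message was delivered and is no
            # longer live.
            live.pop(message_id, None)
    return list(live.values())


records = [
    ("enqueue", 1, "a"),
    ("enqueue", 2, "b"),
    ("dequeue", 1, None),
    ("enqueue", 3, "c"),
]
recovered = recover_queue(records)
```

The same pass tells a cleanup agent which journal files hold no live messages: a file is reclaimable once every enqueue in it has a matching dequeue.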

Flow-to-disk can re-use the journal framework, with a simple extension: the broker keeps an in-memory index of live messages in the journal.

If flow-to-disk is combined with reliability then messages are automatically journalled on arrival, so flow-to-disk can simply delete them from memory and use the in-memory index to read them for delivery.

Without reliability flow-to-disk is similar except that messages are only journalled if memory gets tight.
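The in-memory index idea can be sketched for the case where flow-to-disk is combined with reliability, so every message is journalled on arrival. FlowToDiskQueue is a hypothetical name and an io.BytesIO buffer stands in for the journal file; dequeue records and cleanup are omitted.

```python
import io


class FlowToDiskQueue:
    """Messages are journalled on arrival; memory copies may be dropped."""

    def __init__(self):
        self.journal = io.BytesIO()  # stands in for the active journal file
        self.index = {}              # message id -> (offset, length) in journal
        self.memory = {}             # message id -> body still held in memory

    def enqueue(self, message_id, body):
        # Append the body to the end of the journal and remember where it is.
        self.journal.seek(0, io.SEEK_END)
        offset = self.journal.tell()
        self.journal.write(body)
        self.index[message_id] = (offset, len(body))
        self.memory[message_id] = body

    def release_memory(self, message_id):
        # Memory is tight: drop the in-memory copy, the journal copy remains.
        self.memory.pop(message_id, None)

    def deliver(self, message_id):
        # Use the in-memory copy if present, otherwise read via the index.
        body = self.memory.pop(message_id, None)
        offset, length = self.index.pop(message_id)
        if body is None:
            self.journal.seek(offset)
            body = self.journal.read(length)
        return body
```

Without reliability, the difference would be that enqueue only keeps the message in memory and the journal write moves into release_memory.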

Disk thrashing: Why do we think skipping disk heads around between multiple journals will be better than seeking up and down a single journal? Are we assuming that we only need to optimize the case where long sequences of traffic tend to be for the same queue?

No write on fast consume: Optimization - if we can deliver (and get ack) faster than we write then no need to write. How does this interact with HA?

Async journalling: writing to client, writing to journal, acks from client, acks from journal are separate async streams? So if we get client ack before the journalling stream has written the journal we cancel the write? But what kind of ack info do we need? Need a diagram of interactions, failure points and responses at each point. Start simple and optimize, but don't rule out optimizations.

What about persistence-free reliability?

Is memory-only replication with no disk a viable option for high-speed transient message flow? We will lose messages in total failure or multiple failures where all backups fail, but we can survive single failures and will run a lot faster than diskful.

Virtual synchrony

TODO: Wiring & membership via virtual synchrony

TODO: journaling, speed. Will file-per-q really help with disk burnout?

Configuration

Simplifying patterns

Possible ways to configure a cluster:

  • Virtual hosts as units of replication.
  • Backup rings: all primary components in a broker use the same backup broker and vice-versa. Backups form rings.
  • Broker component rings: all the components except sessions have the same backup broker. Session backups are chosen at random so a broker's load will be distributed rather than all falling on its backup.
  • Disk management issues?
  • Shared storage issues?

Dynamic cluster configuration

  • Failover: the primary use case.
  • Add node: backup, proxy, primary case?
  • Redirect clients from loaded broker (pretend failure)
  • Move queue primary from loaded broker/closer to consumers?
  • Re-start after failover.

Issue: unit of failover/redirect is connection/channel but "working set" of queues and exchanges is unrelated. Use virtual host as unit for failover/relocation? It's also a queue namespace...

If a queue moves we have to redirect its consumers, can't redirect entire channels! Channels in the same session may move between connections. Or rather we depend on broker to proxy?

Backups: chained backups rather than multi-backup? Ring backup? What about split brain, elections, quorums etc.

Should new backups acquire state from primary, from disk or possibly both? Depends on GFS/SAN vs. commodity hw?

Open Questions

Issues: double failure in backup ring: A -> B -> C. Simultaneous failure of A and B. C doesn't have the replica data to take over for A.
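The double-failure issue can be made concrete with a small model of a backup ring. This is a sketch assuming each broker has exactly one backup, the next broker in the ring; surviving_copies is a hypothetical helper.

```python
def surviving_copies(ring, failed):
    """For each broker's primary data, list the brokers that still hold a copy.

    ring: broker names in ring order; each broker is backed up by the next one.
    failed: set of broker names that have failed.
    """
    result = {}
    for position, primary in enumerate(ring):
        backup = ring[(position + 1) % len(ring)]
        result[primary] = [b for b in (primary, backup) if b not in failed]
    return result


ring = ["A", "B", "C"]
single_failure = surviving_copies(ring, {"A"})
double_failure = surviving_copies(ring, {"A", "B"})
```

With A and B both down, the entry for A is empty: C holds B's data but nothing of A's, which is the problem described above.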

Java/C++ interworking - is there a requirement? Fail over from C++ to Java? Common persistence formats?

Implementation breakdown.

The following are independently useful units of work that combine to give the full story:

Proxy Queues: Useful in federation. Pure-AMQP proxies for exchanges might also be useful but are not needed for current purpose as we will use virtual synchrony to replicate wiring.

Fragmented queues: Over pure AMQP (no VS) useful by itself for unreliable high volume shared queue federation.

Virtual Synchrony Cluster: Multicast membership and total ordering protocol for brokers. Not useful alone, but useful with proxies and/or fragments for dynamic federations.

Primary-backup replication: Over AMQP, no persistence. Still offers some level of reliability in a simple primary-backup pair.

Persistence: Useful on its own for flow-to-disk and durable messages. Must meet the performance requirements of reliable journalling.

any other member.

New members can be added to a cluster while it is running. An established member volunteers to provide a state update to the new member. Both updater and updatee queue up cluster activity during the update and process it when the update is complete.

The cluster uses the CPG (Closed Process Group) protocol to replicate state. CPG was part of the OpenAIS package; it is now part of the corosync package. To avoid confusion with AMQP messages we will refer to CPG multicast messages as events.

CPG is a virtual synchrony protocol. Members multicast events to the group and CPG ensures that each member receives all the events in the same sequence. Since all members get an identical sequence of events, they can all update their state consistently. To achieve consistency, events must be processed in the order that CPG presents them. In particular members wait for their own events to be re-delivered by CPG before acting on them.
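The rule that members act on events only when they are re-delivered can be sketched with an in-process stand-in for the group. Group and Member are hypothetical classes for illustration; they do not reflect the actual CPG API.

```python
class Group:
    """Stand-in for CPG: every member receives every event in one order."""

    def __init__(self):
        self.members = []
        self.pending = []  # events multicast but not yet delivered

    def join(self, member):
        self.members.append(member)

    def multicast(self, sender, event):
        self.pending.append((sender.name, event))

    def deliver_all(self):
        # Deliver each event to all members, including the sender, in order.
        while self.pending:
            sender_name, event = self.pending.pop(0)
            for member in self.members:
                member.on_deliver(sender_name, event)


class Member:
    def __init__(self, name, group):
        self.name = name
        self.group = group
        self.state = []  # events applied so far, in delivery order
        group.join(self)

    def local_input(self, event):
        # Do not apply the event yet: multicast it and wait for re-delivery.
        self.group.multicast(self, event)

    def on_deliver(self, sender_name, event):
        self.state.append((sender_name, event))
```

Because a member never applies its own input ahead of the delivered sequence, all members end up with identical state regardless of where the input arrived.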

Implementation Approach

The cluster implementation is highly decoupled from the broker. There's no cluster-specific code in the general broker, just a few hooks that the cluster uses to modify broker behavior.

The basic idea is that the cluster treats the broker as a black box and assumes that, provided it is fed identical input, it will produce identical results. The cluster::Connection class intercepts data arriving for broker connections and sends that data as a CPG event. As data events are delivered by CPG, they are fed to the original broker::Connection objects. Thus each member sees all the data arriving at all the members in the same sequence, so we get the same set of declares, enqueues, dequeues etc. happening on each member.

This approach replicates all broker state: sessions, connections, consumers, wiring etc. Each broker can have both direct connections and shadow connections. A shadow connection represents a connection on another broker in the cluster. Members use shadow connections to simulate the actions of other brokers, so that all members arrive at the same state. Output for shadow connections is just discarded; brokers only send data to their directly-connected clients.

This approach assumes that the behavior of the broker is deterministic, that is, completely determined by the input data fed to the broker. There are a number of cases where this does not hold and the cluster has to take steps to ensure consistency:

  • Allocating messages: the stand-alone broker allocates messages based on the writability of client connections.
  • Client connection disconnects.
  • Timers: any action triggered by a timer may happen at an unpredictable point with respect to CPG events.

Allocating messages

The cluster allocates messages to consumers using CPG events rather than writability of client connections. A cluster connection that has potentially got data to write sends a do-output event to itself, allowing it to dequeue N messages. The messages are not actually dequeued until the do-output event is re-delivered in sequence with other events. The value of N is dynamically estimated in an attempt to match it to the rate of writing messages to directly connected clients. All the other members have a shadow connection which allows them to de-queue the same set of messages as the directly connected member.
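The effect of a do-output event on direct and shadow connections can be sketched as follows. ReplicaConnection is a hypothetical class; the sketch takes N as given and does not model how it is estimated.

```python
from collections import deque


class ReplicaConnection:
    """One member's view of a client connection (direct or shadow)."""

    def __init__(self, queue_contents, is_direct):
        self.queue = deque(queue_contents)  # this member's copy of the queue
        self.is_direct = is_direct
        self.sent_to_client = []

    def on_do_output(self, n):
        """Called when the do-output event is delivered in sequence."""
        # Every member dequeues the same messages at the same point in the
        # event stream, so the queue copies stay identical.
        count = min(n, len(self.queue))
        batch = [self.queue.popleft() for _ in range(count)]
        # Only the directly connected member writes to the client; output
        # on a shadow connection is discarded.
        if self.is_direct:
            self.sent_to_client.extend(batch)
        return batch
```

Feeding the same do-output event to a direct connection and its shadows leaves all of them with the same remaining queue contents, while only the direct one produces client output.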

Client disconnects

When a client disconnects, the directly-connected broker sends a deliver-close event via CPG. It does not actually destroy the connection until that event is re-delivered. This ensures that the direct connection and all the shadows are destroyed at the same point in the event sequence.

Actions initiated by a timer

The cluster needs to do some extra work at any points where the broker takes action based on a timer (e.g. message expiry, management, producer flow control). See the source code for details of how each is handled.

Error Handling

There are two types of recoverable error:

  • Predictable errors occur in the same way on all brokers as a predictable consequence of cluster events. For example binding a queue to a non-existent exchange.
  • Unpredictable errors may not occur on all brokers. For example running out of journal space to store a message, or an IO error from the journal.

Unpredictable errors must be handled in such a way that the cluster does not become inconsistent. In a situation where one broker experiences an unpredictable error and the others do not, we want the broker in error to shut down and leave the cluster so its clients can fail over to healthy brokers.

When an error occurs on a cluster member it sends an error-check event to the cluster and stalls processing. If it receives a matching error-check from all other cluster members, it continues. If the error did not occur on some members, those members send an error-check with "no error" status. In this case members that did experience an error shut themselves down as they can no longer consistently update their state. The members that did not have the error continue, and clients can fail over to them.
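The outcome of an error-check round can be sketched as a pure function over the members' reports. resolve_error_check is a hypothetical name, and the sketch assumes all reports for one error have already been collected.

```python
def resolve_error_check(reports):
    """Decide what each member does after an error-check round.

    reports: member name -> True if that member experienced the error,
             False if it reported "no error".
    """
    errors = list(reports.values())
    if all(errors) or not any(errors):
        # Same outcome everywhere: state is still consistent, all continue.
        return {member: "continue" for member in reports}
    # Mixed outcome: members that hit the error can no longer update their
    # state consistently and must leave the cluster.
    return {
        member: ("shutdown" if had_error else "continue")
        for member, had_error in reports.items()
    }
```

A predictable error produces the first case, since it occurs on every member; an unpredictable error such as a journal IO failure on one broker produces the second.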

Transactions

Transactions are conversational state, allowing a session to collect changes for the shared state and then apply them all at once or not at all.

For TX transactions each broker creates an identical transaction. They all succeed or fail identically since they're all being fed identical input (see Error Handling above for what happens if a broker doesn't reach the same conclusion).

DTX transactions are not yet supported by the cluster.

Persistence and Asynchronous Journaling

Each cluster member has an independent store, each recording identical state.

A cluster can be configured so that if the cluster is reduced to a single member (the "last man standing") that member can have transient data queues persisted.

Recovery: after a total cluster shutdown, the state of the new cluster is determined by the store of the first broker started. The second and subsequent brokers will get their state from the cluster, not the store.

At time of writing there is a bug that requires the stores of all but the first broker to be deleted manually before starting the cluster.

Limitations of current design

There are several limitations of the current design.

Concurrency: all CPG events are serialized into a single stream and handled by a single thread. This means clustered brokers have limited ability to make use of multiple CPUs. Some of this work is pipelined, so there is some parallelism, but it is limited.

Maintainability:  decoupling the cluster code from the broker and assuming the broker behaves deterministically makes it very easy for developers working on the stand-alone broker to unintentionally break the cluster, for example by adding a feature that depends on timers. This has been the case in particular for management, since the initial cluster code assumed only the queue & exchange state needed to be replicated, whereas in fact all the management state must also be replicated and periodic management actions must be co-ordinated.

Non-replicated state: The current design replicates all state. In some cases however, queues are intended only for directly connected clients, for example management queues, the failover-exchange queues. It would be good to be able to define replicated and non-replicated queues and exchanges in these cases.

Scalability: The current cluster design only addresses reliability. Adding more brokers to a cluster will not increase the cluster's throughput since all brokers are doing all the work. A better approach would move some of the work to be done only by the directly-connected broker, and allow messages to "bypass" the cluster when both producer and consumer are connected to the same member.