...

Overview

A Qpid cluster is a group of brokers co-operating to provide the
illusion of a single "virtual broker" with some extra qualities:

...

More precisely we define transparent failover to mean this: In the
event of failover, the sequence of protocol commands sent and received
by the client on each channel excluding failover-related commands is
identical to the sequence that would have been sent/received if no
failure had occurred.

Given this definition the failover component of an AMQP client library can simply hide all failover-related commands from the application (indeed from the rest of the library) without breaking any semantics.
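The definition above can be read as a simple check on per-channel command traces. The following is a minimal sketch only: the command names in FAILOVER_COMMANDS are hypothetical placeholders, since this note does not yet define the failover-related commands.

```python
# Hypothetical set of failover-related commands; the real set is not
# defined in this note.
FAILOVER_COMMANDS = {"session.resume", "connection.redirect"}


def visible_commands(trace):
    """Drop failover-related commands from one channel's command trace."""
    return [cmd for cmd in trace if cmd not in FAILOVER_COMMANDS]


def is_transparent(trace_with_failover, trace_without_failure):
    """True if both traces match once failover commands are hidden."""
    return visible_commands(trace_with_failover) == visible_commands(
        trace_without_failure
    )


# Example traces for a single channel.
no_failure = ["queue.declare", "basic.publish", "basic.ack"]
with_failover = ["queue.declare", "session.resume", "basic.publish", "basic.ack"]
```

A client library's failover component would play the role of visible_commands, so the application only ever sees the filtered sequence.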

...

Table of Contents

...

Requirements

TODO: Define levels of reliability we want to provide - survive one
node failure, survive multiple node failures, survive total failure,
network partitions etc. Does durable/non-durable message distinction
mean anything in a reliable cluster? I.e. can we lose non-durable
messages on a node failure? Can we lose them on orderly shutdown or
total failure?

TODO: The requirements triangle. Concrete performance data.

Clients only need to use standard AMQP to talk to a cluster. They need
to understand some AMQP extensions to fail-over or resume sessions.
We will also use AMQP for point-to-point communication within the
cluster.

Ultimately we may propose extensions to AMQP spec but for the initial implementation we can use existing extension points:

...

A broker is a container for 3 types of broker components: queues, exchanges and bindings. Broker components represent resources available to multiple clients, and are not affected by clients connecting and disconnecting. (Footnote: Exclusive or auto-delete queues are deleted on disconnect; we'll return to this point.) Persistent broker components are unaffected by shut-down and re-start of a broker.

A client uses the components contained in a broker via the AMQP
protocol. The client components are connection, channel, consumer and
session. (Footnote: The "session" concept is not fully defined in AMQP
0-8 or 0-9 but is under discussion. This design note will define a
session that may be proposed to AMQP.) Client components represent the
relationship between a client and a broker.

TODO: Where do transactions fit in the model? They are also a kind of relationship component, but Dtx transactions may span more than just client/broker.

A client's interaction with an unclustered individual broker is
defined by AMQP 0-8/0-9: create a connection to the broker's address,
create channels, exchange AMQP commands (which may create consumers),
disconnect. After a disconnect the client can reconnect to the same
broker address. Broker components created by the previous connection
are preserved but client components are not. In the event of a
disorderly disconnect the outcome of commands in flight can only be
determined by their effects on broker components.

(Footnote: An individual broker by this definition is really a broker behind a single AMQP address. Such a broker might in fact be a cluster using technologies like TCP failover/load balancing. This is outside the scope of this design, which focusses on clustering at the AMQP protocol layer, where cluster members have separate AMQP addresses.)

A broker cluster (or just cluster) is a "virtual" broker implemented by several member brokers (or just members). A cluster has many AMQP addresses - the addresses of all its members - all semantically equivalent for client connection. (Footnote: They may not be equivalent on other grounds, e.g. network distance from client, load etc.) The cluster members co-operate to ensure:

...

If a connection is closed in an orderly manner by either end, the
session is also closed. However if there is an abrupt disconnect with
no Connection.close command, the session remains viable for some
(possibly long) timeout period. The client can reconnect to a failover
candidate and resume.

A session is like an extended connection: if you cut out the failover commands the conversation on a session is exactly the conversation the client would have had on a single connection to an individual broker with no failures.

If a connection is in a session, events in the AMQP.0-8 spec that are
triggered by closing the connection (e.g. deleting auto-delete queues)
are instead triggered by the close (or timeout) of the session.

Note the session concept could also be used in the absence of clustering to allow a client to disconnect and resume a long-running session with the same broker. This is outside the scope of this design.

Cluster State

...

We have to consider several kinds of state:

  • Wiring: existence of exchanges, queues and bindings between them.
  • Content: messages on queues and references under construction.
  • Cluster membership: list of members
  • Conversational state: Sessions, channels, prefetch windows, consumers, acks, etc.

Wiring and content need to be replicated in a fault tolerant way among the brokers so all brokers can provide access to all resources. Persistent data also needs to be stored persistently for recovery from total failure or deliberate shutdown.

Cluster membership needs to be replicated among brokers and passed to
the client. The client needs to know which members are candidates for
failover.

Conversational state relates to a client-broker relationship:

  • session identity.
  • open channels, channel attributes (qos).
  • consumers.
  • commands "in flight" - sent but not acknowledged.
  • acknowledged command history.

To resume successfully the conversational state must be re-established on both sides. There are several options about how much state is stored where. We'll outline the solution that minimizes broker-side replication, but it's not clear yet if this is the optimal choice.

To minimize conversational replication on the broker, the broker must
replicate at least:

  • session identities: to recognize the session on reconnect.
  • history of acknowledged commands: to ignore duplicates resent by client.
  • commands in flight: to resend to client and/or handle client responses.

Everything else can be re-established by the client:

  • re-open the channels.
  • re-set qos settings.
  • re-create consumers.
  • ignore incoming duplicates during reconnect.
  • re-send all unacknowledged commands.

Note that all of the above can be accomplished with normal AMQP commands.
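The minimal broker-side state listed above (session identity, acknowledged command history, commands in flight) could be modelled roughly as follows. This is an illustrative sketch, not the Qpid implementation: the class name, the integer command ids and the apply callback are all assumptions made for the example.

```python
class BrokerSessionState:
    """Minimal per-session state a broker would replicate (illustrative)."""

    def __init__(self, session_id):
        self.session_id = session_id  # recognizes the session on reconnect
        self.acknowledged = set()     # ids of client commands already applied
        self.in_flight = {}           # id -> command sent to client, not yet acked

    def receive(self, command_id, apply):
        """Apply a client command once; ignore duplicates resent after failover."""
        if command_id in self.acknowledged:
            return False
        apply()
        self.acknowledged.add(command_id)
        return True

    def send(self, command_id, command):
        """Record a command sent to the client until the client acknowledges it."""
        self.in_flight[command_id] = command

    def client_acked(self, command_id):
        """Forget a command once the client has acknowledged it."""
        self.in_flight.pop(command_id, None)

    def to_resend(self):
        """Commands to resend to the client after it resumes, in id order."""
        return [self.in_flight[k] for k in sorted(self.in_flight)]
```

Everything not held here (channels, qos, consumers) is assumed to be rebuilt by the client, as described above.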

The broker could replicate more conversational state to relieve the
client from re-creating it. It's not clear yet where the best tradeoff
lies since it's not possible to have 0 conversational state on the
broker. Minimizing the broker's role seems like a good approach since
replicating data affects the whole cluster, whereas keeping it on the
client affects only one client-broker connection.

Replication

The different types of state are replicated in different ways to
strike the best performance/reliability balance.

Cluster membership and Wiring

Membership changes are replicated to entire cluster symmetrically
using a virtual synchrony protocol.

Connected clients are also notified of changes to the set of fail-over candidates for that client. Clients are notified over AMQP by binding a queue to a special system exchange.

...

and Replication

Replication mechanisms

Virtual Synchrony protocols such as AIS or JGroups use multicast and allow a cluster to maintain a consistent view across all members. We will use such a protocol to replicate low-volume membership and wiring changes.

Primary/backup: one primary owner of a given object replicates point-to-point to one or more backups. On failure of the primary, one backup takes over. Appropriate for commodity hardware where each node has an independent store.

Shared store such as GFS. Primary updates shared store, backups recover from store. For hot backup the primary can also forward point-to-point to backups. Appropriate for high-speed storage network.

TODO: Is GFS also appropriate for commodity hardware? Can we push more reliability work down into the GFS layer?

Proxy: forwards traffic to the primary. Allows all objects to be visible on all nodes. (Note backup-as-proxy can be optimized as a special case to reduce traffic.)

For virtual synchrony we will use a specialized multicast protocol such as Totem or JGroups. For point-to-point communication we will use AMQP. As far as possible we will use ordinary AMQP operations on special system queues and exchanges rather than requiring protocol extensions. Ultimately we will propose extensions for interoperability but we should be able to prove the implementation without them.

Types of state

We have to consider several kinds of state:

  • Cluster Membership: Active cluster members (nodes) and data about them.
  • AMQP Wiring: Names and properties of queues, exchanges and bindings.
  • AMQP Content: Data in messages on queues.
  • Session: Conversation with a single client, including references.

Data must be replicated and stored such that:

  • A client knows which node(s) can be used for failover.
  • After a failover, the client can continue its session uninterrupted.
  • No acknowledged messages or commands are lost.
  • No messages or commands are applied twice.

Cluster membership, wiring and session identities are low volume, and will be replicated using virtual synchrony so the entire cluster has a consistent picture.

Queue content is high volume so it is replicated point-to-point using primary-backup to avoid flooding the network.

Session state is potentially high volume and only relevant to a single client, so it is also replicated point-to-point.
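The split described in the last three paragraphs can be summarized as a lookup from kind of state to replication mechanism. The sketch below is only a restatement of that split; the Mechanism enum, the REPLICATION table and the targets helper are names invented for illustration.

```python
from enum import Enum


class Mechanism(Enum):
    VIRTUAL_SYNCHRONY = "replicate to the whole cluster"
    POINT_TO_POINT = "replicate to chosen backups only"


# Low-volume state goes to every member; high-volume state only to backups.
REPLICATION = {
    "membership": Mechanism.VIRTUAL_SYNCHRONY,
    "wiring": Mechanism.VIRTUAL_SYNCHRONY,
    "session identity": Mechanism.VIRTUAL_SYNCHRONY,
    "queue content": Mechanism.POINT_TO_POINT,
    "session state": Mechanism.POINT_TO_POINT,
}


def targets(kind, members, backups):
    """Nodes that must receive an update of the given kind of state."""
    if REPLICATION[kind] is Mechanism.VIRTUAL_SYNCHRONY:
        return list(members)
    return list(backups)
```

For example, targets("wiring", ["A", "B", "C"], ["B"]) yields all three members, while targets("queue content", ["A", "B", "C"], ["B"]) yields only the backup.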

How to choose the number and location of backup nodes for a given queue or session is an open question. Note that the choice is independent for every queue and session in principle, but in practice they will probably be grouped in some way.

The Cluster Map: Membership and Wiring

Membership, wiring and session changes are low volume. They are replicated to entire cluster symmetrically using a virtual synchrony protocol such as openAIS or JGroups.

Wiring includes:

  • exchange names and properties
  • queue names, properties and bindings.

Membership data includes:

  • address of each node
  • state of health of each node
  • primary/backup for each queue/exchange
  • session names, primary/backup for each session.

Queue Content

Message content is too high volume to replicate to the entire cluster.
To limit the extent of replication each queue or reference plays
exactly one of the following roles in each broker:

  • Primary: content owner, all modifications are done by primary.
  • Proxy: Forwards client requests to the primary.
  • Backup: A proxy that also receives a replica of the content and can take over if the primary fails.
  • Fragment: special case for shared queues, see below.

So each queue has a primary node and one or more backup nodes. Other nodes can act as proxies. The client is unaware of the distinction; it sees an identical picture regardless of which broker it connects to.

Note a single cluster member may contain a mix of primary, backup and proxy queues.

TODO: mechanism for establishing primary, backup etc.

TODO: Ordering issues with proxies and put-back messages (reject, transaction rollback) or selectors.

Fragmented Shared Queues

A shared queue has reduced ordering requirements and increased
distribution requirements. "Fragmenting" a shared queue is a special
type of replication. The queue is broken into a set of disjoint
sub-queues (fragments), each on a separate node, to distribute load.

Each fragment's content is replicated to backups just like a normal queue, independently of the other fragments.

The appearance of a single queue is created by collaboration between
fragments. Fragments store incoming messages in the local queue, and
serve local consumers from the local queue whenever possible. When a
fragment does not have messages to satisfy its consumers it consumes
messages from other fragments in the group. Proxies to a fragmented
queue will consume from the "nearest" fragment if possible.
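The local-first behaviour of fragments can be sketched as below. This is a toy in-memory model under stated assumptions: the Fragment class is invented for the example, and the order of the peers list stands in for whatever "nearest" ends up meaning. Replication of each fragment to its backups is left out.

```python
from collections import deque


class Fragment:
    """One disjoint sub-queue of a fragmented shared queue (illustrative)."""

    def __init__(self, name):
        self.name = name
        self.messages = deque()
        self.peers = []  # other fragments of the same queue, nearest first

    def publish(self, message):
        """Incoming messages are stored in the local fragment."""
        self.messages.append(message)

    def consume(self):
        """Serve from the local fragment; fall back to peers only when empty."""
        if self.messages:
            return self.messages.popleft()
        for peer in self.peers:
            if peer.messages:
                return peer.messages.popleft()
        return None
```

With two fragments a and b that list each other as peers, a message published to a is still delivered to a consumer attached to b once b's own fragment is empty.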

TODO: Proxies can play a more active role. Ordering guarantees: we can
provide "same producer to same consumer preserves order" since
messages from the same producer always go on the same fragment
queue. May break down in the presence of failover unless we remember
which fragment received messages from the client and proxy to the same
one on the failover replica.

Conversational State

The minimum conversational state a broker needs to replicate for failover is:

  • session identities: to recognize the session on reconnect.
  • history of processed request-ids: to ignore duplicates resent by client.
  • all requests in flight: to resend to client and/or handle client responses.
  • open incoming references.

Session identities and processed request history are very low volume and could be replicated with virtual synchrony. However commands in flight and open incoming references are probably too large to be replicated this way.

Enter the session backup: the broker directly connected to a client is
the session primary. The session primary has one or more session backup
brokers. Conversational state is replicated only to the session
backups, not to the entire group. On failure the client must reconnect
to one of the session backups; other members will not accept the
session.

Private queues are a bit special: they're not exactly conversational state but they are closely tied to the session. Private queues will always be backed up by the same node as the session that created them.

Shared storage

In the case of high-performance, reliable shared storage (e.g. GFS)
queue content can be stored instead of replicated to backups. In that
case the conversations with content backups can be dehydrated; on
failover the backup can recover data from the shared store.

For commodity hardware cases we need a solution with no shared store as described above.

Session State

Session state relates to a client-broker relationship:

  • open channels, channel attributes (qos, transactions etc.).
  • active consumers.
  • open references.
  • completed command history.
  • commands in flight.

The broker a client is connected to is the primary or host for that client's session. One or more other brokers are session backups. The client can fail over to one of the session backups if the primary fails.

Note channel and consumer state can be rebuilt by the client using standard AMQP commands, so it doesn't need to be replicated.

TODO: is there any benefit to replicating this?

The backup brokers store:

  • content in open references.
  • requests sent but not yet responded to.
  • responses sent but not yet marked.

On resume we take the following steps:

  • client rebuilds channel/consumer state
  • client & broker re-send in-doubt messages
  • client & broker ignore duplicate in-doubt messages

These steps should be hidden in the client library. At this point we can return control to the application where it left off.
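A client library might hide the resume steps roughly as sketched here. The ResumingClient class, the tuple-shaped commands, the "resend" marker and the send callback are hypothetical; the open TODO below about frame numbers and method IDs is not addressed by this sketch.

```python
class ResumingClient:
    """Client-side state needed to resume a session (illustrative)."""

    def __init__(self):
        self.channels = {}   # channel id -> qos setting, kept by the client
        self.consumers = []  # (channel id, queue name) pairs
        self.unacked = {}    # command id -> command sent, not yet acknowledged
        self.seen = set()    # ids of broker commands already processed

    def resume(self, send):
        """Replay state over a new connection via a hypothetical send hook."""
        # Step 1: rebuild channel and consumer state.
        for channel, qos in self.channels.items():
            send(("channel.open", channel))
            send(("basic.qos", channel, qos))
        for channel, queue in self.consumers:
            send(("basic.consume", channel, queue))
        # Step 2: re-send in-doubt commands in their original order.
        for command_id in sorted(self.unacked):
            send(("resend", command_id, self.unacked[command_id]))

    def on_broker_command(self, command_id, handler):
        """Step 3: process each broker command once, dropping duplicates."""
        if command_id in self.seen:
            return False
        self.seen.add(command_id)
        handler()
        return True
```

The application never calls resume directly; the library would invoke it after reconnecting to a session backup and before handing control back.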

TODO: Big problem with this - how do/do we re-sync frame numbers and method IDs post reconnect? If so how do we reconcile "ordinary" AMQP commands used to reconstruct the session state from commands that are part of the "resumed" session? Or do we move all the state to the broker, since we're stuck with significant client state on the broker anyway?

Mapping of AMQP commands to replication mechanisms

queue.declare/bind/delete, exchange.declare/delete: update in cluster map.

message.transfer/basic.publish (client to broker):

  • on primary: update local queue, replicate to backup.
  • on proxy: forward to primary

(When the proxy is also a backup we can optimize out the replication step.)
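The client-to-broker publish path above can be sketched as a dispatch on the queue's role in the receiving broker. The Role enum and QueueNode class are invented for this example; direct method calls stand in for the AMQP forwarding and replication traffic, and the backup-as-proxy optimization is not shown.

```python
from enum import Enum


class Role(Enum):
    PRIMARY = 1
    BACKUP = 2
    PROXY = 3


class QueueNode:
    """One broker's view of a queue, playing exactly one role (illustrative)."""

    def __init__(self, role, primary=None, backups=()):
        self.role = role
        self.primary = primary        # where non-primaries forward publishes
        self.backups = list(backups)  # used only by the primary
        self.content = []

    def publish(self, message):
        if self.role is Role.PRIMARY:
            # Primary: update the local queue, then replicate to backups.
            self.content.append(message)
            for backup in self.backups:
                backup.replicate(message)
        else:
            # Proxy (or backup acting as proxy): forward to the primary.
            self.primary.publish(message)

    def replicate(self, message):
        """Called by the primary to copy content onto a backup."""
        self.content.append(message)
```

A client connected to the proxy sees the same result as one connected to the primary: the message ends up on the primary and its backups, and the proxy holds no content.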

message.transfer(broker to client), message.deliver:

  • primary: replicate delivery to backup(s), deliver to client.
  • proxy: pass through to client.

message.consume/basic.consume:

  • proxy: forward consume. No replication, client will re-establish consumers.
  • primary: register consumer.

basic.ack/message.ok(from client)

  • proxy: forward
  • primary: mark message processed, replicate to backups.

basic.ack/message.ok(from broker)

  • proxy: forward to client
  • client: mark message processed.

reference.open/append/close (client to broker)

  • proxy: replicate to session backup, forward to primary.
  • primary: process.

reference.open/append/close (broker to client)

  • primary: send open/append/close.
  • proxy: replicate to session backup, forward to client.

All commands:

  • proxy replicates required command history to backup.

Note that some commands

TODO: Would we want to use shared store for conversational state?

Client-Broker Protocol

TODO: How it looks in client protocol terms - system queues &
exchanges, connect argument tables.

...

Broker-broker communication uses normal AMQP over specially identified
connections and channels (identified in the connection negotiation
argument table.)

Proxying: Proxies simply forward methods between client and primary and create consumers on behalf of the client. TODO: any issues with message put-back and transactions?

Queue/fragment replication: Use AMQP transfer commands to transfer content
to backup(s). TODO: propagating transactions.

Session replication:

must replicate a command (and get confirmation it was

...

replicated) before responding. However this can be done in async

...

streams - forward commands to replica

Session replication: AMQP on special connections. Primary forwards all outgoing requests and incoming responses to session backup. Backup can track the primary request/response tables and retransmit messages.

Note the
dehydrated requests to the session backup should reference the content
backup so the backup can recover content in a failure. Alternatively
content could be sent to both - what's the tradeoff?

...

MQ: write to journal, write journal to DB, read from DB.
Consistent & reliable but slow.

...

For flow-to-disk, when queues are full, both store and reading are
critical.

So it looks like the same solution will work for durable and reliable.

Flow-to-disk has different requirements but it would be desirable to
re-use some or all of the durable/reliable solution. In particular if
flow-to-disk is combined with durable/reliable it would be wasteful
to write the message to disk a second time - instead it would seem
better to keep an in-memory index that allows messages to be read
quickly from the reliable/durable store.

...

For reliability and durability we will use a message journal per
queue. The broker writes enqueue and dequeue records to the end of the
active journal file. When the file reaches a fixed size it starts a
new one.

A cleanup agent (thread or process) removes, recycles or compacts journal files that have no live (undelivered) messages. (References complicate the book-keeping a little but don't alter the conceptual model.)

Recovery or restart reconstructs the queue contents from the
enqueue/dequeue records in the journal.
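Recovery from a per-queue journal amounts to replaying the records in order. The sketch below assumes a record is an (operation, message id, body) tuple, which is an invented format for illustration; file rollover and the cleanup agent are not modelled.

```python
def recover(journal_records):
    """Rebuild queue contents from enqueue/dequeue records, in journal order."""
    live = {}  # dicts preserve insertion order, so this keeps enqueue order
    for op, message_id, body in journal_records:
        if op == "enqueue":
            live[message_id] = body
        elif op == "dequeue":
            # A dequeue record cancels the earlier enqueue of the same message.
            live.pop(message_id, None)
    return list(live.values())


records = [
    ("enqueue", 1, "a"),
    ("enqueue", 2, "b"),
    ("dequeue", 1, None),
    ("enqueue", 3, "c"),
]
```

Here recover(records) returns the two messages that were enqueued but never dequeued, in their original order. A journal file whose messages have all been dequeued contributes nothing to the result, which is why the cleanup agent can remove it.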

...

If flow-to-disk is combined with reliability then messages are
automatically journalled on arrival, so flow-to-disk can simply delete
them from memory and use the in-memory index to read them for delivery.
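One way to picture the in-memory index is a map from message id to the position of the body in the journal. This is a sketch under assumptions: the JournalledQueue class is invented, bodies are written raw with no record framing, and any seekable binary file object (here io.BytesIO) stands in for the journal file.

```python
import io


class JournalledQueue:
    """Queue whose messages are journalled on arrival (illustrative)."""

    def __init__(self, journal):
        self.journal = journal  # any seekable binary file object
        self.index = {}         # message id -> (offset, length) in the journal
        self.memory = {}        # message id -> body still held in memory

    def enqueue(self, message_id, body):
        """Append the body to the journal and remember where it landed."""
        self.journal.seek(0, io.SEEK_END)
        offset = self.journal.tell()
        self.journal.write(body)
        self.index[message_id] = (offset, len(body))
        self.memory[message_id] = body

    def flow_to_disk(self, message_id):
        """Free memory only; the body is already in the journal."""
        self.memory.pop(message_id, None)

    def read(self, message_id):
        """Return the body from memory if present, else from the journal."""
        if message_id in self.memory:
            return self.memory[message_id]
        offset, length = self.index[message_id]
        self.journal.seek(offset)
        return self.journal.read(length)


queue = JournalledQueue(io.BytesIO())
queue.enqueue(1, b"hello")
queue.enqueue(2, b"world!")
queue.flow_to_disk(2)
```

After flow_to_disk(2) the second message is no longer held in memory, but read(2) still returns it by seeking into the journal, with no second write to disk.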

...

No write on fast consume: Optimization - if we can deliver (and get
ack) faster than we write then no need to write. How does this
interact with HA?

Async journalling: writing to client, writing to journal, acks from client, acks from journal are separate async streams? So if we get client ack before the journalling stream has written the journal we cancel the write? But what kind of ack info do we need? Need a diagram of interactions, failure points and responses at each point. Start simple and optimize, but don't rule out optimizations.

...

Is memory-only replication with no disk a viable option for high-speed
transient message flow? We will lose messages in total failure or
multiple failures where all backups fail, but we can survive single
failures and will run a lot faster than diskful.

Virtual synchrony

TODO: Wiring & membership via virtual synchrony

TODO: journaling, speed. Will file-per-q really help with disk burnout?

Configuration

Simplifying patterns

...

Possible ways to configure a cluster:

  • Virtual hosts as units of replication.
  • Backup rings: all primary components in a broker use the same backup broker and vice-versa. Backups form rings.
  • Broker component rings: all the components except sessions have the same backup broker. Session backups are chosen at random so a broker's load will be distributed rather than all falling on its backup.
  • Disk management issues?
  • Shared storage issues?

...

If a queue moves we have to redirect its consumers, can't redirect
entire channels! Channels in the same session may move between
connections. Or rather we depend on broker to proxy?

...

Should new backups acquire state from primary, from disk or possibly
both? Depends on GFS/SAN vs. commodity hw?

...

Issues: double failure in backup ring: A -> B -> C. Simultaneous
failure of A and B. C doesn't have the replica data to take over for A.
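The double-failure issue can be checked mechanically for any ring. In this sketch a ring is a list in which each node's single backup is the next node, wrapping around; the function name and this representation are assumptions made for the example.

```python
def surviving_replicas(ring, failed):
    """Map each failed node to the surviving holder of its replica, or None.

    Each node's backup is taken to be the next node in the ring.
    """
    result = {}
    for i, node in enumerate(ring):
        if node in failed:
            backup = ring[(i + 1) % len(ring)]
            result[node] = backup if backup not in failed else None
    return result
```

For the ring A -> B -> C, a single failure of A is covered by B. If A and B fail together, B's data survives on C but A's data maps to None: no surviving member holds it.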

...

The following are independently useful units of work that combine to
give the full story:

Proxy Queues: Useful in federation. Pure-AMQP proxies for exchanges might also be useful but are not needed for current purpose as we will use virtual synchrony to replicate wiring.

Fragmented queues: Over pure AMQP (no VS) useful by itself for unreliable
high volume shared queue federation.

...

Primary-backup replication: Over AMQP, no persistence. Still offers some
level of reliability in a simple primary-backup pair.

Persistence: Useful on its own for flow-to-disk and durable messages. Must meet the performance requirements of reliable journalling.

...
