...

This design discusses clustering at the AMQP protocol layer,
i.e. members of a cluster have distinct AMQP addresses and AMQP
protocol commands are exchanged to negotiate reconnection and
failover. "Transparent" failover in this context means transparent to
the application; the AMQP client library will be aware of disconnects
and must take action to fail over.

...

Given this definition the failover component of an AMQP client library
can simply hide all failover-related commands from the application
(indeed from the rest of the library) without breaking any semantics.

...

Ultimately we may propose extensions to the AMQP spec but for the initial
implementation we can use existing extension points:

  • Field table parameters to various AMQP methods (declare() arguments etc.)
  • Field table in message headers.
  • System exchanges and queues.

Abstract Model and Terms

A quick re-cap of AMQP terminology and introduction to some new terms:

A broker is a container for 3 types of broker components: queues,
exchanges and bindings. Broker components represent resources
available to multiple clients, and are not affected by clients
connecting and disconnecting. Persistent broker components are
unaffected by shut-down and re-start of a broker.

A _client_ uses the components contained in a broker via the AMQP
protocol. The _client components_ are _connection_, _channel_, _consumer_ and
_session_ (the "session" concept is not fully defined in AMQP 0-8 or 0-9 but is under discussion; this design note will define a session that may be proposed to AMQP). Client components represent the relationship between a client
and a broker.

TODO: _Where do transactions fit in the model? They are also a kind of
relationship component but Dtx transactions may span more than just
client/broker._

A client's interaction with an unclustered _individual broker_ (an individual broker by this definition is really a broker behind a single AMQP address; such a broker might in fact be a cluster using technologies like TCP failover/load balancing, which is outside the scope of this design, which focusses on clustering at the AMQP protocol layer, where cluster members have separate AMQP addresses) is
defined by AMQP 0-8/0-9: create a connection to the broker's _address_,
create channels, exchange AMQP commands (which may create consumers),
disconnect. After a disconnect the client can reconnect to the same
broker address. Broker components created by the previous connection
are preserved but client components are not. In the event of a
disorderly disconnect the outcome of commands in flight can only be
determined by their effects on broker components.

A _broker cluster_ (or just _cluster_) is a "virtual" broker implemented by several _member brokers_ (or just _members_). A cluster has many AMQP addresses - the addresses of all its members - all semantically equivalent for client connection (they may not be equivalent on other grounds, e.g. network distance from client, load etc.). The cluster members co-operate to ensure:

  • all broker components are available via any cluster member.
  • all broker components remain available if a member fails, provided at least one member remains active.
  • clients disconnected by member failure or network failure can reconnect to another member and resume their session.

A session is an identity for a collection of client components (i.e. a
client-broker relationship) that can outlive a single connection. AMQP
0-9 provides some support for sessions in the `resume` command.

...

A session is like an extended connection: if you cut out the failover
commands the conversation on a session is exactly the conversation the
client would have had on a single connection to an individual broker
with no failures.

If a connection is in a session, events in the AMQP 0-8 spec that are
triggered by closing the connection (e.g. deleting auto-delete queues)
are instead triggered by the close (or timeout) of the session.

Note the session concept could also be used in the absence of
clustering to allow a client to disconnect and resume a long-running
session with the same broker. This is outside the scope of this
design.

Cluster State

We have to consider several kinds of state:

  • Wiring: existence of exchanges, queues and bindings between them.
  • Content: messages on queues and references under construction.
  • Cluster membership: list of members
  • Conversational state: Sessions, channels, prefetch windows, consumers, acks, etc.

Wiring and content need to be replicated in a fault tolerant way
among the brokers so all brokers can provide access to all
resources. Persistent data also needs to be stored persistently for
recovery from total failure or deliberate shutdown.

...

Conversational state relates to a client-broker relationship:

  • session identity.
  • open channels, channel attributes (qos).
  • consumers.
  • commands "in flight" - sent but not acknowledged.
  • acknowledged command history.

To resume successfully the conversational state must be re-established
on both sides. There are several options about how much state is
stored where. We'll outline the solution that minimizes broker-side
replication, but it's not clear yet if this is the optimal choice.
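
As a rough illustration of the conversational state listed above, here is a minimal Python sketch of what a client might hold for one session. The class name `SessionState`, its fields and the `commands_to_replay` helper are all hypothetical; the design does not yet fix how state is split between client and broker.

```python
from dataclasses import dataclass, field


@dataclass
class SessionState:
    """Hypothetical client-side record of one session's conversational state."""
    session_id: str
    channels: dict = field(default_factory=dict)    # channel number -> attributes (qos)
    consumers: set = field(default_factory=set)     # consumer tags
    in_flight: dict = field(default_factory=dict)   # command id -> command, sent but not acknowledged
    acknowledged: set = field(default_factory=set)  # ids of acknowledged commands

    def send(self, command_id, command):
        # Remember the command until the broker acknowledges it.
        self.in_flight[command_id] = command

    def acknowledge(self, command_id):
        # Move a command from "in flight" to the acknowledged history.
        if self.in_flight.pop(command_id, None) is not None:
            self.acknowledged.add(command_id)

    def commands_to_replay(self, seen_by_broker):
        # After reconnecting, resend only the in-flight commands the new
        # broker has no record of (assumes the broker reports what it saw).
        return [cmd for cid, cmd in self.in_flight.items()
                if cid not in seen_by_broker]
```

In this sketch the client carries most of the state, which matches the option that minimizes broker-side replication; a broker-heavy design would move `in_flight` to the session backup instead.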

...

Connected clients are also notified of changes to the set of fail-over
candidates for that client. Clients are notified over AMQP by binding
a queue to a special system exchange.

...

Message content is too high volume to replicate to the entire cluster.
To limit the extent of replication each queue or reference plays
exactly one of the following roles in each broker:

  • Primary: content owner, all modifications are done by primary.
  • Proxy: Forwards client requests to the primary.
  • Backup: A proxy that also receives a replica of the content and can take over if the primary fails.
  • Fragment: special case for shared queues, see below.

A single cluster member may contain a mix of primary, backup and proxy
queues. (TODO: mechanism for establishing primary, backup etc.)
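
The primary, proxy and backup roles can be sketched as follows. This is only an illustration under assumptions: the names `Role` and `QueueView` and the callback `forward_to_primary` are hypothetical, fragments are left out, and the mechanism for assigning roles is still a TODO in this design.

```python
from enum import Enum


class Role(Enum):
    PRIMARY = "primary"
    PROXY = "proxy"
    BACKUP = "backup"


class QueueView:
    """One broker's view of a single queue (hypothetical structure)."""

    def __init__(self, role):
        self.role = role
        self.content = []  # only populated for PRIMARY and BACKUP

    def publish(self, message, forward_to_primary):
        # The primary owns the content; every other role forwards to it.
        if self.role is Role.PRIMARY:
            self.content.append(message)
            return "stored"
        forward_to_primary(message)
        return "forwarded"

    def replicate(self, message):
        # Only a backup receives a replica of the content.
        if self.role is not Role.BACKUP:
            raise ValueError("only backups receive replicas")
        self.content.append(message)

    def take_over(self):
        # A backup already holds the content, so it can become primary.
        if self.role is not Role.BACKUP:
            raise ValueError("only a backup can take over")
        self.role = Role.PRIMARY
```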

...

TODO: Ordering issues with proxies and put-back messages (reject,
transaction rollback) or selectors.

...

Each fragment's content is replicated to backups independently just
like a normal queue.

The appearance of a single queue is created by collaboration between
fragments. Fragments store incoming messages in the local queue, and
serve local consumers from the local queue whenever possible. Only
when a fragment cannot satisfy its consumers does it consume messages
from other fragments in the group.
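
The "local first, then other fragments" rule might look like the sketch below. The `Fragment` class and its `peers` list are hypothetical, and the real exchange between fragments would happen over AMQP rather than by direct method calls.

```python
from collections import deque


class Fragment:
    """Hypothetical fragment of a shared queue held by one broker."""

    def __init__(self, name):
        self.name = name
        self.local = deque()  # messages received by this fragment
        self.peers = []       # other fragments of the same shared queue

    def enqueue(self, message):
        # Incoming messages always go on the local queue.
        self.local.append(message)

    def next_message(self):
        # Serve local consumers from the local queue whenever possible.
        if self.local:
            return self.local.popleft()
        # Only when the local queue is empty, consume from another fragment.
        for peer in self.peers:
            if peer.local:
                return peer.local.popleft()
        return None
```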

Proxies to a fragmented queue will consume from the "nearest" fragment
if possible.

TODO: Proxies can play a more active role. Ordering guarantees, we can
provide "same producer to same consumer preserves order" since
messages from the same producer always go on the same fragment
queue. May break down in the presence of failover unless we remember
which fragment received messages from the client and proxy to the same
one on the failover replica.

...

Session identities and processed request history are very low volume
and could be replicated with virtual synchrony. However commands in flight
and open incoming references are probably too large to be replicated this
way.

Enter the session backup: the broker directly connected to a client is
the session primary. The session primary has one or more session backup
brokers. Conversational state is replicated only to the session
backups, not to the entire group. On failure the client must reconnect
to one of the session backups, other members will not accept the
session.
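
A minimal sketch of this rule, assuming a hypothetical `Broker` class with `replicate_session` and `resume` methods (neither is part of AMQP or of this design): only brokers that hold a replica of the session accept a resume, so the client walks its candidate list until one does.

```python
class Broker:
    """Hypothetical cluster member that may act as a session backup."""

    def __init__(self, name):
        self.name = name
        self.sessions = {}  # session id -> replicated conversational state

    def replicate_session(self, session_id, state):
        # Called by the session primary for each of its session backups.
        self.sessions[session_id] = dict(state)

    def resume(self, session_id):
        # Members that are not a backup for this session refuse it.
        if session_id not in self.sessions:
            raise LookupError(f"{self.name} does not hold session {session_id}")
        return self.sessions[session_id]


def fail_over(session_id, candidates):
    """Try each fail-over candidate in turn until one accepts the session."""
    for broker in candidates:
        try:
            return broker, broker.resume(session_id)
        except LookupError:
            continue
    raise ConnectionError("no session backup reachable")
```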

Private queues are a bit special: they're not exactly conversational state but they are closely tied to the session. Private queues will always be backed up by the same node as the session that created them.

Shared storage

In the case of high-performance, reliable shared storage (e.g. GFS)
queue content can be stored instead of replicated to backups. In that
case the conversations with content backups can be dehydrated; on
failover the backup can recover data from shared store.

For commodity hardware cases we need a solution with no shared store
as described above.

TODO: Would we want to use shared store for conversational state?

...

Membership updates, choosing a replica, reconnection,
building conversational state => client stores representation of state
unacked messages & duplicates, hiding failover.

...

Proxying: Proxies simply forward methods between client and primary
and create consumers on behalf of the client. TODO: _any issues with
message put-back and transactions?_

Queue/fragment replication: Use AMQP transfer commands to transfer content
to backup(s). TODO: propagating transactions.

...

Session replication: AMQP on special connections. Primary forwards all
outgoing requests and incoming responses to session backup. Backup can
track the primary request/response tables and retransmit messages.

Note the
dehydrated requests to the session backup should reference the content
backup so the backup can recover content in a failure. Alternatively
content could be sent to both - what's the tradeoff?

Persistence and Recovery

Use cases for persistence

Durable messages: must be stored to disk across broker shutdowns.

Flow-to-disk: to cope with memory overflow on full queues.

Reliability: recover after a crash.

We need a common solution - it would make no sense to be writing the
same message to disk multiple times for different purposes. The
solution may be tunable to offer better performance for individual
cases but should cover the case where all 3 are required.

Competing failure modes:

Tibco: fast when running clean but performance over time has GC
"spikes". Single journal for all queues. "Holes" in the log have to be
garbage collected to re-use the log. One slow consumer affects everyone
because it causes fragmentation of the log.

MQ: write to journal, write journal to DB, read from DB.
Consistent & reliable but slow.

Street homegrown solutions: transient MQ with home grown
persistence. Can we get more design details for these solutions?

Persistence approach

Try to avoid the problems with:

  • journal-per-queue
  • multiple segment files per journal
  • GC to "snapshot" (per queue? per broker? think about queue migration.)
  • Disk thrashing - how do we distribute the parts to avoid it?

Need to think about how we consolidate journal into a "snapshot" or
"database" to reclaim space.

Persistence overview

There are 3 reasons to persist a message:

Durable messages: must be stored to disk across broker shutdowns or failures.

  • stored when received.
  • read during start-up.
  • must be removed after delivery.

Reliability: recover after a crash.

  • stored when received.
  • read during crash recovery.
  • must be removed after delivery.

Flow-to-disk: to reduce memory use for full queues.

  • stored when memory gets tight.
  • read when delivered.
  • must be removed after delivery.

Durable and reliable cases are very similar: storage time is performance-critical (blocks response to sender) but reading is not and cleanup can be done by an async thread or process.

For flow-to-disk, when queues are full, both store and reading are
critical.

So it looks like the same solution will work for durable and reliable.

Flow-to-disk has different requirements but it would be desirable to
re-use some or all of the durable/reliable solution. In particular if
flow-to-disk is combined with durable/reliable it would be wasteful
to write the message to disk a second time - instead it would seem
better to keep an in-memory index that allows messages to be read
quickly from the reliable/durable store.

We also need to persist wiring (Queues/Exchanges/Bindings), but this is much less performance critical. The entire wiring model is held in memory so wiring is only read at startup, and updates are low volume and not performance-critical. A simple database should suffice.

Message Journals

For reliability and durability we will use a message journal per
queue. The broker writes enqueue and dequeue records to the end of the
active journal file. When the file reaches a fixed size it starts a
new one.

A cleanup agent (thread or process) removes, recycles or compacts journal files that have no live (undelivered) messages. (References complicate the book-keeping a little but don't alter the conceptual model.)

Recovery or restart reconstructs the queue contents from the
enqueue/dequeue records in the journal.
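
The journal model can be sketched in memory as below. Everything here is an assumption made for illustration: the `Journal` class, the tuple record layout, and the use of Python lists in place of fixed-size files on disk. The cleanup shown only drops the oldest closed files, which is one simple way to make sure a dequeue record is never kept without its enqueue record being handled consistently on recovery.

```python
class Journal:
    """Hypothetical per-queue journal; each inner list stands in for a file."""

    def __init__(self, max_records_per_file=4):
        self.max_records = max_records_per_file
        self.files = [[]]  # the last entry is the active journal file

    def _append(self, record):
        # Start a new file once the active one reaches its fixed size.
        if len(self.files[-1]) >= self.max_records:
            self.files.append([])
        self.files[-1].append(record)

    def enqueue(self, msg_id, body):
        self._append(("enq", msg_id, body))

    def dequeue(self, msg_id):
        self._append(("deq", msg_id, None))

    def recover(self):
        # Replay all records in order to rebuild the queue contents.
        live = {}
        for journal_file in self.files:
            for kind, msg_id, body in journal_file:
                if kind == "enq":
                    live[msg_id] = body
                else:
                    live.pop(msg_id, None)
        return list(live.items())

    def cleanup(self):
        # Remove the oldest closed files that hold no live messages.
        live_ids = {msg_id for msg_id, _ in self.recover()}
        removed = 0
        while len(self.files) > 1:  # never remove the active file
            oldest = self.files[0]
            if any(kind == "enq" and msg_id in live_ids
                   for kind, msg_id, _ in oldest):
                break
            self.files.pop(0)
            removed += 1
        return removed
```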

Flow-to-disk can re-use the journal framework, with a simple extension: the broker keeps an in-memory index of live messages in the journal.

If flow-to-disk is combined with reliability then messages are
automatically journalled on arrival, so flow-to-disk can simply delete
them from memory and use the in-memory index to read them for delivery.

Without reliability flow-to-disk is similar except that messages are only journalled if memory gets tight.
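
A sketch of the in-memory index for the case where flow-to-disk is combined with reliability. The class `FlowToDiskQueue`, the `memory_limit` parameter and the (offset, length) index layout are hypothetical, and "memory gets tight" is simplified to a fixed count of message bodies held in memory.

```python
import io
from collections import deque


class FlowToDiskQueue:
    """Hypothetical queue that journals every message and indexes it in memory."""

    def __init__(self, journal, memory_limit):
        self.journal = journal            # seekable binary file-like object
        self.memory_limit = memory_limit  # max message bodies kept in memory
        self.order = deque()              # message ids in delivery order
        self.in_memory = {}               # msg id -> body
        self.index = {}                   # msg id -> (offset, length) in journal

    def enqueue(self, msg_id, body):
        # Messages are journalled on arrival, so the write happens only once.
        self.journal.seek(0, io.SEEK_END)
        offset = self.journal.tell()
        self.journal.write(body)
        self.index[msg_id] = (offset, len(body))
        self.order.append(msg_id)
        # Keep the body in memory only while there is room for it.
        if len(self.in_memory) < self.memory_limit:
            self.in_memory[msg_id] = body

    def deliver(self):
        msg_id = self.order.popleft()
        offset, length = self.index.pop(msg_id)
        body = self.in_memory.pop(msg_id, None)
        if body is None:
            # Not in memory: use the index to read it back from the journal.
            self.journal.seek(offset)
            body = self.journal.read(length)
        return msg_id, body
```

For example, with `FlowToDiskQueue(io.BytesIO(), memory_limit=1)` the second message enqueued is held only in the journal and is read back through the index when it is delivered.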

Disk thrashing: Why do we think skipping disk heads around between multiple journals will be better than seeking up and down a single journal? Are we assuming that we only need to optimize the case where long sequences of traffic tend to be for the same queue?

No write on fast consume: Optimization - if we can deliver (and get
ack) faster than we write then no need to write. How does this
interact with HA?

Async journalling: writing to client, writing to journal, acks from
client, acks from journal are separate async streams? So if we get
client ack before the journalling stream has written the journal we
cancel the write? But what kind of ack info do we need? Need a diagram
of interactions, failure points and responses at each point. Start
simple and optimize, but don't rule out optimizations.

...

Is memory-only replication with no disk a viable option for high-speed
transient message flow? We will lose messages in total failure or
multiple failures where all backups fail, but we can survive single
failures and will run a lot faster than diskful.

Virtual synchrony

...

TODO: Wiring & membership via virtual synchrony

...

TODO: journaling, speed. Will file-per-q really help with disk burnout

...

Configuration

...

Simplifying patterns

Possible ways to configure a cluster:

  • Virtual hosts as units of replication.
  • Backup rings: all primary components in a broker use the same backup broker and vice-versa. Backups form rings.
  • Broker component rings: all the components except sessions have the same backup broker. Session backups are chosen at random so a broker's load will be distributed rather than all falling on its backup.
  • Disk management issues?
  • Shared storage issues?
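
As an illustration of the backup ring pattern, the sketch below assigns each broker the next one in an ordered member list as its backup. The function name `backup_ring` and the idea of deriving the ring from list order are assumptions; the design does not say how rings are formed.

```python
def backup_ring(brokers):
    """Map each broker to its backup: the next broker in the ring."""
    if len(brokers) < 2:
        raise ValueError("a backup ring needs at least two brokers")
    return {broker: brokers[(i + 1) % len(brokers)]
            for i, broker in enumerate(brokers)}


def ring_after_failure(brokers, failed):
    """Recompute the ring once a failed member has been removed."""
    return backup_ring([b for b in brokers if b != failed])
```

With three brokers "a", "b" and "c" this gives a -> b, b -> c, c -> a; if "b" fails, the recomputed ring is a -> c, c -> a.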

Dynamic cluster configuration

  • Failover: the primary use case.
  • Add node: backup, proxy, primary case?
  • Redirect clients from loaded broker (pretend failure)
  • Move queue primary from loaded broker/closer to consumers?
  • Re-start after failover.

Issue: unit of failover/redirect is connection/channel but "working
set" of queues and exchanges is unrelated. Use virtual host as unit
for failover/relocation? It's also a queue namespace...

...

Backups: chained backups rather than multi-backup? Ring backup?
What about split brain, elections, quorums etc.

...

Java/C++ interworking - is there a requirement? Fail over from C++ to Java?
Common persistence formats?

...

Proxy Queues: Useful in federation. Pure-AMQP proxies for exchanges
might also be useful but are not needed for current purpose as we will
use virtual synchrony to replicate wiring.

...

Virtual Synchrony Cluster: Multicast membership and total ordering
protocol for brokers. Not useful alone, but useful with proxies and/or
fragments for dynamic federations.

...

Persistence: Useful on its own for flow-to-disk and durable messages.
Must meet the performance requirements of reliable journalling.

...