Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Table of Contents

Overview

A Qpid cluster is a group of brokers co-operating to provide the
illusion of a single "virtual broker" with some extra qualities:

  • The cluster continues to provide service as long some members survive. Exact guarantee will depend on configuration.
  • If a client is disconnected unexpectedly it can fail-over to another cluster member, giving the impression of uninterupted service.

This design discusses clustering at the AMQP protocol layer,
i.e. members of a cluster have distinct AMQP addresses and AMQP
protocol commands are exchanged to negotiate reconnection and
failover. "Transparent" failover in this context means transparent to
the application, the AMQP client will be aware of disconnects and must
take action to fail over.

Ultimately we will propose extensions to AMQP spec but for the initial
implementation we can use existing extension points:

  • Field table parameters to various AMQP methods (declare() arguments etc.)
  • Field table in message headers.
  • System exchanges and queues.

Requirements

*Clients use standard AMQP* to talk to a cluster during normal
operation. They only need to use some extensions to get replica
information and during fail-over.

*Transparent failover*: In the event of failover, the sequence of
protocol commands sent and received by the client *excluding
failover-related commands* is identical to the sequence that would have
been sent/received if no failure had occured. Thus an AMQP client
library can hide failover-related commands from the application.

*Transactional failover*: In the event of a failover, any incomplete
transactions are rolled back. Any un-acknowledged non-transactional
commands may need to be re-transimtted.

**TODO**: Do we need to offer both? Transactional failover is a weaker
guarantee and only interesting if it offers better performance. It
allows persistence/replication to be deferred till prepare/commit
time.On the other hand in the normal sucessful case a similar amount
of data has to be written/replicated either way.

**TODO**: Define levels of reliability we want to provide - survive one
node failure, survive multiple node failures, survive total failure,
network partitions etc. Does durable/non-durable message distinction
mean anything in a reliable cluster? I.e. can we lose non-durable
messages on a node failure? Can we lose them on orderly shutdown or
total failure?

**TODO**: The requirements triangle. Concrete performance data.

Abstract Model and Terms

A quick re-cap of AMQP terminology and introduction to some new terms:

A broker is a container for 3 types of broker components: queues,
exchanges and bindings. Broker components represent resources
available to multiple clients, and are not affected by clients
connecting and disconnecting1

Footnote

Exclusive or auto-delete queues are deleted on disconnect, we'll return to this point.

. Persistent broker components are
unaffected by shut-down and re-start of a broker.

A client uses the components contained in a broker via the AMQP
protocol. The client components are connection, channel, consumer and
session2 session

Footnote

The "session" concept is not fully defined in AMQP 0-8 or 0-9 but is under discussion. This design note will define a session that may be proposed to AMQP.

. Client components represent the relationship between a client
and a broker.

A client's interaction with a unclustered individual broker 3 is
defined by AMQP 0-8/0-9: create a connection to the brokers address,

Footnote

An individual broker by this definition is really a broker behind a single AMQP address. Such a broker might in fact be a cluster using technologies like TCP failover/load balancing. This is outside the scope of this design, which focusses on clustering at the AMQP protocol layer, where cluster members have separate AMQP addresses.

is defined by AMQP 0-8/0-9: create a connection to the brokers address, create channels, exchange AMQP commands (which may create consumers),
disconnect. After a disconnect the client can reconnect to the same
broker address. Broker components created by the previous connection
are preserved but client components are not. In the event of a
disorderly disconnect the outcome of commands in flight can only be
determined by their effects on broker components.

A broker cluster (or just cluster) is a "virtual" broker implemented
by several member brokers (or just members.) A cluster has many AMQP
addresses - the addresses of all its members - all semantically
equivalent for client connection 4. The

Footnote

They may not be equivalent on other grounds, e.g. network distance from client, load etc.

. The cluster members co-operate to
ensure:

  • all broker components are available via any cluster member.
  • all broker components remain available if a single member fails. Service may degrade in multiple failures depending on configuration.
  • clients disconnected by a member failure or network failure can reconnect to another member and resume their session.

A session identifies a client-broker relationship that can outlive a
single connection. AMQP 0-9 provides some support for sessions in the
`resume` command.

Orderly closure of a connection by either peer ends the session. If
there is an unexpected disconnect, the session remains viable for some
(possibly long) timeout period and the client can reconnect to the
cluster and resume.

If a connection is in a session, events in the AMQP.0-8 spec that are
triggered by closing the connection (e.g. deleting auto-delete queues)
are instead trigged by the close (or timeout) of the session.

Note the session concept could also be used in the absence of
clustering to allow a client to disconnect and resume a long-running
session with the same broker. This is outside the scope of this
design.

Cluster State and Replication

Replication mechanisms

*Virtual Synchrony* protocols such as AIS or JGroups use multicast and
allow a cluster to maintain a consistent view across all members. We
will use such a protocol to replicate low-volume membership and wiring
changes.

*Primary/backup* one primary owner of a given object replicates
point-to-point to one or mor backups. On failure of primary one backup
takes over. Appropraite for commodity hardware where each node has
independent store.

*Shared store* such as GFS. Primary updates shared store, backups
recover from store. For hot backup the primary can also forward
point-to-point to backups. Appropriate for high-speed storage network.

**TODO**: Is GFS also appropriate for comoddity? Can we push more
reliability work down into the GFS layer?

*Proxy*: forwards traffic to the primary. Allows all objects to be
visible on all nodes. (Note backup-as-proxy can be optimized as a
special case to reduce traffic.)

For virtual synchrony we will use specialized multicast protocol such
as Totem or JGroups. For point-to-point communication we will use
AMQP. As far as possible we will use ordinary AMQP operations on
special system queues and exchanges rather than requiring protocol
extensions. Ultimately we will propose extensions for interoperability
but we should be able to prove the implementation without them.

Types of state

...

We have to consider several kinds of state:

  • Cluster Membership: Active cluster members (nodes) and data about them.
  • AMQP Wiring: Names and properties of queues, exchanges and bindings.
  • AMQP Content: Data in messages on queues.
  • Session: Conversation with a single client, including references.

Data must be replicated and stored such that:

  • A client knows which node(s) can be used for failover.
  • After a failover, the client can continue its session uninterruped.
  • No acknowledged messages or commands are lost.
  • No messges or commands are applied twice.

Cluster membership, wiring and session identities are low volume, and
will be replicated using virtual synchrony so the entire cluster has a
consistent picture.

Queue content is high volume so it is replicated point-to-point using
primary-backup to avoid flooding the network.

Session state is potentially high volume and only relevant to a single
client, so it is also replicated point-to-point.

How to choose the number and location of backup nodes for a given
queue or session is an open question. Note that the choice is
independent for every queue and session in principle, but in practice
they will probably be grouped in some way.

The Cluster Map: Membership and Wiring

Membership, wiring and session changes are low volume. They are
replicated to entire cluster symmetrically using a virtual synchrony
protocol such as openAIS or JGroups.

Wiring inclues

  • exchange names and properties
  • queue names, properties and bindings.

Membership data includes:

  • address of each node
  • state of health of each node
  • primary/backup for each queue/exchange
  • session names, primary/backup for each session.

Queue Content

Message content is too high volume to replicate to the entire cluster,
so each queue has a primary node and one or more backup nodes. Other
nodes can act as proxies. The client is unaware of the distinction, it
sees an identical picture regardless of what broker it connects to.

Note a single cluster node may contain a mix of primary, backup and
proxy queues.

*TODO*: Ordering issues with proxys and put-back messages (reject,
transaction rollback) or selectors.

Fragmented Shared Queues

A shared queue has reduced ordering requirements and increased
distribution requirements. Fragmenting a shared queue is a special
type of replication. The queue is broken into a set of disjoint
sub-queues each on a separate node to distribute load.

Each fragment (sub-queue) content is replicated to backups
just like a normal queue, independently of the other fragments.

The fragments collaberate to create the appearance of a single
queue. Fragments store incomging messges in the local queue, and serve
local consumers from the local queue whenever possible. When a
fragment does not have messages to satisfy its consumers it consumes
messages from other fragments in the group. Proxies to a fragmented
queue will consume from the "nearest" fragment if possible.

*TODO*: Proxies can play a more active role. Ordering guarantees, we can
provide "same producer to same consumer preserves order" since
messages from the same producer always go on the same fragment
queue. May break down in the presence of failover unless we remember
which fragment received messges from the client and proxy to the same
one on the failover replica.

Session State

Session state relates to a client-broker relationshipincludes:

  • open channels, channel attributes (qos, transactions etc.).
  • active consumers.
  • open references.
  • completed command history.
  • commands in flight.
  • open transactions
  • exclusive/private queues.

The broker a client is connected to is the primary or host for that
client's session. One session primary, one or more other brokers are session backups. The
client can fail over to one of the session backups if the primary
fails.

Note channels and consumers state can be rebuilt by the client .
TODO: is there any benefit to replicating this?

The backup brokers store:

  • content in open references.
  • requests sent but not yet responded to.
  • responses sent but not yet marked.

On resume we take the following steps:

  • client rebuilds channel/consumer state
  • client & broker re-send in-doubt messages
  • client & broker ignore duplicate in-doubt messages

These steps should be hidden in the client library. At this point
we can return control to the application where it left off.

    • Mapping of AMQP commands to replication mechanisms
      • queue.declare/bind/delete, exchange.declare/delete

Update cluster map. Local broker creates the initial queue as primary
and establishes a backup.

Private queue: backed up on the session backup.

Shared queue: local primary queue is the first primary fragment. Other
brokers that receive publishes for the queue can proxy to this
fragment or create their own local fragment (TODO: How do we decide?)
Consumes are always served from the local fragment if possible,
otherwise proxied to another fragment *(TODO: load balancing algorithms
to choose the appropriate fragment)*

      • message.transfer/basic.publish (client to broker)

Local broker evaluates the binding to determine which queue(s) receive
the message.

  • primary queues: update local queue, replicate to backup.
  • proxy queues: forward to primary
    (When the proxy is also a backup we can optimize out the replication step.)

If the message is delivered to more than one proxy queue on the same
node, we just relay the message once. Brokers must be able to
differentiate between normal message transfer and proxy/replication
transfer so that when the evaluate the binding they only apply the
message to local primary/backup queues respectively, and don't attempt
to re-forward messages.

TODO: there are a few options:

  • Use custom backup/proxy exchanges and pass an explicit list of queues to receive the message in the header table.
  • Use normal AQMP commands over a marked connectin/channel
  • Introduce new cluster commands.
      • message.transfer(broker to client), message.deliver
  • primary: replicate deliver to backup(s) deliver to client.
  • proxy: pass through to client.

Before sending a message to a client, the primary must be sure that
the session backup 'knows' about the delivery; i.e. in the event of
primary failure the backup knows about unacked messages and will be
able to handle an ack or reject for it, resend or requeue it.

If we can define a clear and deterministic algorithm for message
dispatch, and if we replicate all 'inputs' in order then that should
be sufficient.

Selectors slightly complicate the picture, as do multiple consumers
and flow control particularly for shared queues where the consumers
could be from different sessions.

In the case of an exclusive or private queue all the inputs come from
a single session. If all session requests are handled serially on both
primary and backup then dispatch should be deterministic; if separate
threads were used to process separate queues that would be lost as the
allocation of delivery tags would be dependent on the interleaving of
those threads.

One way of avoiding the need for deterministic dispatch would be for
the primary to send a message to the backup(s) to indicate an
allocation before the deliver is sent to the client. This could inform
the backup of the queue in question, the message id and the delivery
tag/request id. The big drawback is that it requires a round-trip to
the backup before each deliver and would really affect throughput.

This looks like an area that needs some specific focus. Can we
convince ourselves of a clear and deterministic dispatch algorithm,
are thereother solutions that would avoid requiring this without too
much synchronicity?

      • message.consume/basic.consume
  • proxy: forward consume. No replication, client will re-establish consumers.
  • primary: register consumer.
      • basic.ack/message.ok(from client)
  • proxy: forward
  • primary: mark message processed, replicate to backups.
      • basic.ack/message.ok(from broker)
  • proxy: forward to client
  • client: mark message processed.
      • basic.reject / message.reject

Similar to the processing of basic.ack. However here the message might
be requeued or might be moved to a dead letter queue. Ignoring the
dead letter queue in the first instance, the backup would merely
cancel the effect of the basic.allocate on receiving the basic.reject.

      • reference.open/apppend/close (client to broker)
  • proxy: replicate to session backup, forward to primary.
  • primary: process.
      • reference.open/apppend/close (broker to client) **
  • primary: send open/append/close.
  • proxy: replicate to session backup, forward to client.
      • All commands
  • proxy replicates required command history to session backup.
  • Client-Broker Protocol

Normal AMQP with the following extensions.

Initial connection:

  • Pass session name as 0-9 connection identifier or via arguments table.
  • Broker provides list of failover replicas in arguments table.

During connection:

  • Client can subscribe to a special "cluster exchange" for messages carrying updates to failover candidates.

On failure:

  • client chooses failover node randomly from most recent list.
  • cluster list my identify "preferred" failover candidates.

On re-connect:

  • 0-9 resume command identifies session.
  • Client rebuilds conversational state.
  • opens channels
  • creates consumers
  • establishes
  • replays unacknowledeged commands and continues session.

Note: the client sends conversational state data in messages to a
special system exchange. We cant simply use standard AMQP to rebuild
channel state, as we will end up with channels with a different command
numbering from the interrupted session. For transparency we also want
to distinguish reconnection from resumed "normal" operation.

At this point the session can continue.

  • Broker-Broker Protocol

Broker-broker communication uses extended AMQP over specially identified
connections and channels (identified in the connection negotiation
argument table.)

*Proxying*: acting as a proxy, a broker forwards commands from client to
primary and vice versa. The proxy is as transparent and stateless as
possible. A proxy must renumer channels and commands since a single
incoming connection may be proxied to more than one outbound
connection, so it does need to keep some state. This state is part of the
session state replicated to the session backup.

*Queue/fragment replication*: Depends on whether AMQP or GFS is used
to replicate content.

*AMQP*: For enqueue use AMQP transfer command to transfer content to
backup(s). For dequeue use AMQP get command to indicate message
removed - no data is transferfed for get over a replication channel.

TODO: this use of get is strained, it starts to look like we may need a
separate replication class of commands.

*GFS*: Queue state is updated in journal files. On failover, the backup
reconstruct queue state from the journal.

*Session replication*: The broker must replicate a command (and get
confirmation it was replicated) before responding. For async clients
this can be done in a pair of asynchronous streams, i.e. we don't have
to wait for a response to command A before we forward command B.

Session data is replicated via AMQP on special connections. Primary
forwards all outgoing requests and incoming responses to the session
backup. Backup can track the primary request/response tables and
retransmit messages.

**TODO**: 0-9 references force us to have heavy session backup, because
message data on a reference is not associated with any queue and
therefore can't be backed up in a queue backup. If references are
removed in 0-10 revisit the need for session backups, we may be able
to comress session data enough to store it in the cluster map.

  • Persistence and Recovery
    • Competing failure modes:

*Tibco*: fast when running clean but performance over time has GC
"spikes" Single journal for all queues. "holes" in log have to be
garbage collected to re-use the log. 1 slow consumer affects everyone
because it causes fragmentation of the log.

*MQ*: write to journal, write journal to DB, read from DB.
Consistent & reliable but slow.

*Street homegrown solutions*: transient MQ with home grown
persistence. Can we get more design details for these solutions?

    • Persistence overview

There are 3 reasons to persist a message:

*Durable messages*: must be stored to disk across broker shutdowns or failures.

  • stored when received.
  • read during start-up.
  • must be removed after deliver.

*Reliability*: recover after a crash.

  • stored when received.
  • read during crash recovery.
  • must be removed after delivery.

*Flow-to-disk*: to reduce memory use for full queues.

  • stored when memory gets tight.
  • read when delivered.
  • must be removed after delivery.

Durable and reliable cases are very similar: storage time is
performance-critical (blocks response to sender) but reading is not
and cleanup can be done by an async thread or process.

For flow-to-disk, when queues are full, both store and reading are
critical.

So it looks like the same solution will work for durable and reliable.

Flow-to-disk has different requirements but it would be desirable to
re-use some or all of the durable/reliable solution. In particular if
flow-to-disk is combined with durable/reliablle it would be wasteful
to write the message to disk a second time - instead it would seem
better to keep an in-memory index that allows messages to be read
quickly from the reliable/durable store.

We also need to persist *wiring* (Queues/Exchanges/Bindings), but this
is much less performance critical. The entire wiring model is held in
memory so wiring is only read at startup, and updates are low volume
and not performance-critical. A simple database should suffice.

  • Journals
    • Overview

A journal is a sequential record of actions taken (e.g. messages
enqueued, responses sent.) sufficient to reconstruct the state of the
journalled entity (e.g. queue) in the case of failure and recovery.

*TODO*: *Journal indexing, async journal (thruput vs. latency), journal
as common API for backups and disk store?*

**TODO**: *Windows for error in journalling - how to make disk update and
network ack atomic? How do other technologies handle it?*

**TODO*: *References strike again: where do they go in a journal-per-queue?

**TODO*: *Journal per broker pros/cons

    • Use of journals

For reliability and durability we will use

  • Queue journal (per queue) records enqueue/dequeues and acknowledgements.
  • Session journal (per session) records references in progress

The broker writes enqueue and dequeue records to the end of the active
journal file. When the file reaches a fixed size it starts a new one.

A cleanup agent (thread or process) removes, recycles or compacts
journal files that have no live (undelivered) messages. (References
complicate the book-keeping a little but don't alter the conceptual
model.)

Recovery or restart reconstructs the queue contents from the
enqueue/dequeue records in the journal.

Flow-to-disk can re-use the journal framework, with a simple
extension: the broker keeps an in-memory index of live messages in the
journal.

If flow-to-disk is combined with reliability then messages are
automatically journalled on arrival, so flow-to-disk can simply delete
them from memory and use the in-memory index to read them for delivery.

Without reliability flow-to-disk is similar except that messages are only
journalled if memory gets tight.

*Disk thrashing*: Why do we think skipping disk heads around between
multiple journals will be better than seeking up and down a single
journal? Are we assuming that we only need to optimize the case
where long sequences of traffic tend to be for the same queue?

*No write on fast consume*: Optimization - if we can deliver (and get
ack) faster than we write then no need to write. How does this
interact with HA?

*Async journalling*: writing to client, writing to journal, acks from
client, acks from journal are separate async streams? So if we get
client ack before the journalling stream has written the journal we
cancel the write? But what kind of ack info do we need? Need a diagram
of interactions, failure points and responses at each point. Start
simple and optimize, but dont rule out optimizations.

    • What about diskless reliability?

Is memory+network replication with no disk a viable option for
high-speed transient message flow? May be faster, but can't support
durable messages/persistent queues.
We will lose messages in total
failure or multiple failures where all backups fail, but we can
survive single failures and will run a lot faster than diskful.

  • Virtual synchrony

*TODO*: Wiring & membership via virtual synchrony

*TODO*: journaling, speed. Will file-per-q really help with disk burnout?

  • Configuration
    • Simplifying patterns
      Possible ways to configure a cluster:
  • Virtual hosts as units of replication.
  • Backup rings: all primary components in a broker use the same backup broker and vice-versa. Backups form rings.
  • Broker component rinks: all the components except sessions have the same backup broker. Session backups are chosen at random so a brokers load will be distributed rather than all falling on its backup.
  • Disk management issues?
  • Shared storage issues?
    • Dynamic cluster configuration
  • Failover: the primary use case.
  • Add node: backup, proxy, primary case?
  • Redirect clients from loaded broker (pretend failure)
  • Move queue primary from loaded broker/closer to consumers?
  • Re-start after failover.

*Issue:* unit of failover/redirect is connection/channel but "working
set" of queues and exchanges is unrelated. Use virtual host as unit
for failover/relocation? It's also a queue namespace...

If a queue moves we have to redirect its consumers, can't redirect
entire channels! Channels in the same session may move between
connections. Or rather we depend on broker to proxy?

Backups: chained backups rather than multi-backup? Ring backup?
What about split brain, elections, quorums etc.

Should new backups acquire state from primary, from disk or possibly
both? Depends on GFS/SAN vs. commodity hw?

  • Transactions
    • Local transactions

AMQP offers local and distributed transactions, however in a cluster a
local transaction could involve queues that are distributed across
several nodes.

**TODO**: This complicates the model of a proxy as a simple forwarder. You
cannot simply forward a local transaction involving queues on two
separate primary brokers, the proxy has to be aware of the transaction.

**TODO** Can we use point-to-point local transactions or do we have to
turn this into a dtx? If dtx, who co-ordinates? Is every broker
potentially a transaction co-ordinator?

**TODO**: For distributed transactions, will the primary broker and its
backups act as a single clustered resource manager for the resource
set, or will a failure of one broker abort the transaction?

    • Distributed Transactions

The prepare needs to be replicated so that if one node fails before
completion another node can honour the guarantee to be able to commit
or abort. It is also possibe that the work of a transaction is
distributed across more than one node anyway.

I think broadcasting all dtx commands over the group communication
protocol seems like the most likely way to handle this.

The session in which the commands are initiated needs to be replicated
also to allow clean resumption on failover.

  • Open Questions

Issues: double failure in backup ring: A -> B -> C. Simultaneous
failure of A and B. C doesn't have the replica data to take over for A.

Java/C++ interworking - is there a requirement? Fail over from C++ to Java?
Common persistence formats?

  • Implementation breakdown.

The following are independently useful units of work that combine to
give the full story:

*Proxy Queues*: Useful in federation. Pure-AMQP proxies for exchanges
might also be useful but are not needed for current purpose as we will
use virtual synchrony to replicate wiring.

*Fragmented queues*: Over pure AMQP (no VS) useful by itself for unreliable
high volume shared queue federation.

*Virtual Synchrony Cluster*: Multicast membership and total ordering
protocol for brokers. Not useful alone, but useful with proxies and/or
fragments for dynamic federations.

*Primary-backup replication*: Over AMQP, no persistence. Still offers some
level of reliability in a simple primary-backup pair.

*Persistence*: Useful on its own for flow-to-disk and durable messages.
Must meet the performance requirements of reliable journalling.

Footnotes:

1 Exclusive or auto-delete queues are deleted on disconnect, we'll return to this point.

2 The "session" concept is not fully defined in AMQP 0-8 or 0-9 but is under discussion. This design note will define a session that may be proposed to AMQP.

3 An individual broker by this definition is really a broker behind a single AMQP address. Such a broker might in fact be a cluster using technologies like TCP failover/load balancing. This is outside the scope of this design, which focusses on clustering at the AMQP protocol layer, where cluster members have separate AMQP addresses.

backup. On failure of the primary the client fails-over to a backup as described below.

TODO: We could allow resume on non-session-backup node, by letting it download session state from a session backup.

The primary-backup protocol must guarantee that the backup has sufficient data to resume at all times without becoming a synchronous bottleneck.

In-flight commands

Both peers must store sent commands for possible resend and received commands to detect possible duplicates in a failover.

To keep session size finite a peer can:

  • forget sent commands when we know the other peer has received them.
  • forget received commands when we know the other peer will not resend them.

An algorithm to achieve this:

self_received(r):

if r.is_response: peer_received(sentr.responds_to_id) for s in sent0..r.process_mark: peer_received(s)

peer_received(s):

sent.erase(s) # forget s but also... # Peer will never resend commands <= s.process_mark. for r in received0..s.process_mark received.erase(r)

The weakest rules for interop between peers A and B are:

  • A MAY forget a sent command when A knows B received it.
  • A MUST NOT re-send a command after B could know that A knows B received it.
  • A MUST remember received commands till A knows that B knows A received it.

Or in protocol terms:

  • A MAY forget sent command N when it receives a response to N.
  • A MUST NOT resend N after sending a response to a response to N.
  • A MUST remember received command N until it has both sent M responding to N and received a response to M.

Resuming a channel

When a channel is first opened, the broker provides a session-id. If there is a failure, the client can connect to the session backup broker and resume the channel as follows (sync code is just for illustration)

TODO does it matter if the new channel number is different from the old?

  1. Client client_resume:

    send(command=channel_resume, command_id=0, session_id=resume_id, process_mark=pre_crash_process_mark) ok = receive(command=channel_ok) self_received(ok) # Clean up to peers process mark. resend() continue_session_as_normal()

  1. Both sides resend():
    1. Resend in-flight messages. for s in sent: # Careful not to respond to a command we haven't received yet. if s.is_response: until(received.contains(s.resonds_to_id)): self_received(receive()) send(s); # Original command ids and process_mark
  1. Broker broker_received_channel_resume(r):

    session=sessionsr.session_id self_received(r) # Up to date with peers process mark. send(command=channel_ok, command_id=0, process_mark=session.process_mark) resend() continue_session_as_normal()

Replicating session state.

TODO: Need to minimize primary synchronously waiting on backup, while ensuring that the primary always knows that the backup is in a state that satisfies the clients expectations for failover. See recent email thread betwween me & gordon

Mapping of AMQP commands to replication mechanisms

queue.declare/bind/delete, exchange.declare/delete

Update cluster map. Local broker creates the initial queue as primary and establishes a backup.

Private queue: backed up on the session backup.

Shared queue: local primary queue is the first primary fragment. Other brokers that receive publishes for the queue can proxy to this fragment or create their own local fragment (TODO: How do we decide?) Consumes are always served from the local fragment if possible, otherwise proxied to another fragment (TODO: load balancing algorithms to choose the appropriate fragment)

message.transfer/basic.publish (client to broker)

Local broker evaluates the binding to determine which queue(s) receive the message.

  • primary queues: update local queue, replicate to backup.
  • proxy queues: forward to primary
    (When the proxy is also a backup we can optimize out the replication step.)

If the message is delivered to more than one proxy queue on the same node, we just relay the message once. Brokers must be able to differentiate between normal message transfer and proxy/replication transfer so that when the evaluate the binding they only apply the message to local primary/backup queues respectively, and don't attempt to re-forward messages.

TODO: there are a few options:

  • Use custom backup/proxy exchanges and pass an explicit list of queues to receive the message in the header table.
  • Use normal AQMP commands over a marked connectin/channel
  • Introduce new cluster commands.

message.transfer(broker to client), message.deliver

  • primary: replicate deliver to backup(s) deliver to client.
  • proxy: pass through to client.

Before sending a message to a client, the primary must be sure that the session backup 'knows' about the delivery; i.e. in the event of primary failure the backup knows about unacked messages and will be able to handle an ack or reject for it, resend or requeue it.

If we can define a clear and deterministic algorithm for message dispatch, and if we replicate all 'inputs' in order then that should be sufficient.

Selectors slightly complicate the picture, as do multiple consumers and flow control particularly for shared queues where the consumers could be from different sessions.

In the case of an exclusive or private queue all the inputs come from a single session. If all session requests are handled serially on both primary and backup then dispatch should be deterministic; if separate threads were used to process separate queues that would be lost as the allocation of delivery tags would be dependent on the interleaving of those threads.

One way of avoiding the need for deterministic dispatch would be for the primary to send a message to the backup(s) to indicate an allocation before the deliver is sent to the client. This could inform the backup of the queue in question, the message id and the delivery tag/request id. The big drawback is that it requires a round-trip to the backup before each deliver and would really affect throughput.

This looks like an area that needs some specific focus. Can we convince ourselves of a clear and deterministic dispatch algorithm, are thereother solutions that would avoid requiring this without too much synchronicity?

message.consume/basic.consume

  • proxy: forward consume. No replication, client will re-establish consumers.
  • primary: register consumer.

basic.ack/message.ok(from client)

  • proxy: forward
  • primary: mark message processed, replicate to backups.

basic.ack/message.ok(from broker)

  • proxy: forward to client
  • client: mark message processed.

basic.reject / message.reject

Similar to the processing of basic.ack. However here the message might be requeued or might be moved to a dead letter queue. Ignoring the dead letter queue in the first instance, the backup would merely cancel the effect of the basic.allocate on receiving the basic.reject.

reference.open/apppend/close (client to broker)

  • proxy: replicate to session backup, forward to primary.
  • primary: process.

reference.open/apppend/close (broker to client) **

  • primary: send open/append/close.
  • proxy: replicate to session backup, forward to client.

All commands

  • proxy replicates required command history to session backup.

Client-Broker Protocol

Normal AMQP with the following extensions.

Initial connection:

  • Pass session name as 0-9 connection identifier or via arguments table.
  • Broker provides list of failover replicas in arguments table.

During connection:

  • Client can subscribe to a special "cluster exchange" for messages carrying updates to failover candidates.

On failure:

  • client chooses failover node randomly from most recent list.
  • cluster list my identify "preferred" failover candidates.

On re-connect:

  • 0-9 resume command identifies session.
  • Client rebuilds conversational state.
  • opens channels
  • creates consumers
  • establishes
  • replays unacknowledeged commands and continues session.

Note: the client sends conversational state data in messages to a special system exchange. We cant simply use standard AMQP to rebuild channel state, as we will end up with channels with a different command numbering from the interrupted session. For transparency we also want to distinguish reconnection from resumed "normal" operation.

At this point the session can continue.

Broker-Broker Protocol

Broker-broker communication uses extended AMQP over specially identified connections and channels (identified in the connection negotiation argument table.)

Proxying: acting as a proxy, a broker forwards commands from client to primary and vice versa. The proxy is as transparent and stateless as possible. A proxy must renumer channels and commands since a single incoming connection may be proxied to more than one outbound connection, so it does need to keep some state. This state is part of the session state replicated to the session backup.

Queue/fragment replication: Depends on whether AMQP or GFS is used to replicate content.

AMQP: For enqueue use AMQP transfer command to transfer content to backup(s). For dequeue use AMQP get command to indicate message removed - no data is transferfed for get over a replication channel.

TODO: this use of get is strained, it starts to look like we may need a separate replication class of commands.

GFS: Queue state is updated in journal files. On failover, the backup reconstruct queue state from the journal.

Session replication: The broker must replicate a command (and get confirmation it was replicated) before responding. For async clients this can be done in a pair of asynchronous streams, i.e. we don't have to wait for a response to command A before we forward command B.

Session data is replicated via AMQP on special connections. Primary forwards all outgoing requests and incoming responses to the session backup. Backup can track the primary request/response tables and retransmit messages.

TODO: 0-9 references force us to have heavy session backup, because message data on a reference is not associated with any queue and therefore can't be backed up in a queue backup. If references are removed in 0-10 revisit the need for session backups, we may be able to comress session data enough to store it in the cluster map.

Persistence and Recovery

Competing failure modes:

Tibco: fast when running clean but performance over time has GC "spikes" Single journal for all queues. "holes" in log have to be garbage collected to re-use the log. 1 slow consumer affects everyone because it causes fragmentation of the log.

MQ: write to journal, write journal to DB, read from DB. Consistent & reliable but slow.

Street homegrown solutions: transient MQ with home grown persistence. Can we get more design details for these solutions?

Persistence overview

There are 3 reasons to persist a message:

Durable messages: must be stored to disk across broker shutdowns or failures.

  • stored when received.
  • read during start-up.
  • must be removed after deliver.

Reliability: recover after a crash.

  • stored when received.
  • read during crash recovery.
  • must be removed after delivery.

Flow-to-disk: to reduce memory use for full queues.

  • stored when memory gets tight.
  • read when delivered.
  • must be removed after delivery.

Durable and reliable cases are very similar: storage time is performance-critical (blocks response to sender) but reading is not and cleanup can be done by an async thread or process.

For flow-to-disk, when queues are full, both store and reading are critical.

So it looks like the same solution will work for durable and reliable.

Flow-to-disk has different requirements but it would be desirable to re-use some or all of the durable/reliable solution. In particular if flow-to-disk is combined with durable/reliablle it would be wasteful to write the message to disk a second time - instead it would seem better to keep an in-memory index that allows messages to be read quickly from the reliable/durable store.

We also need to persist wiring (Queues/Exchanges/Bindings), but this is much less performance critical. The entire wiring model is held in memory so wiring is only read at startup, and updates are low volume and not performance-critical. A simple database should suffice.

Journals

Overview

A journal is a sequential record of actions taken (e.g. messages enqueued, responses sent.) sufficient to reconstruct the state of the journalled entity (e.g. queue) in the case of failure and recovery.

TODO: Journal indexing, async journal (thruput vs. latency), journal as common API for backups and disk store?

TODO: Windows for error in journalling - how to make disk update and network ack atomic? How do other technologies handle it?

TODO: References strike again: where do they go in a journal-per-queue?

TODO: Journal per broker pros/cons

Use of journals

For reliability and durability we will use

  • Queue journal (per queue) records enqueue/dequeues and acknowledgements.
  • Session journal (per session) records references in progress

The broker writes enqueue and dequeue records to the end of the active journal file. When the file reaches a fixed size it starts a new one.

A cleanup agent (thread or process) removes, recycles or compacts journal files that have no live (undelivered) messages. (References complicate the book-keeping a little but don't alter the conceptual model.)

Recovery or restart reconstructs the queue contents from the enqueue/dequeue records in the journal.

Flow-to-disk can re-use the journal framework, with a simple extension: the broker keeps an in-memory index of live messages in the journal.

If flow-to-disk is combined with reliability then messages are automatically journalled on arrival, so flow-to-disk can simply delete them from memory and use the in-memory index to read them for delivery.

Without reliability flow-to-disk is similar except that messages are only journalled if memory gets tight.

Disk thrashing: Why do we think skipping disk heads around between multiple journals will be better than seeking up and down a single journal? Are we assuming that we only need to optimize the case where long sequences of traffic tend to be for the same queue?

No write on fast consume: Optimization - if we can deliver (and get ack) faster than we write then no need to write. How does this interact with HA?

Async journalling: writing to client, writing to journal, acks from client, acks from journal are separate async streams? So if we get client ack before the journalling stream has written the journal we cancel the write? But what kind of ack info do we need? Need a diagram of interactions, failure points and responses at each point. Start simple and optimize, but dont rule out optimizations.

What about diskless reliability?

Is memory+network replication with no disk a viable option for high-speed transient message flow? May be faster, but can't support durable messages/persistent queues. We will lose messages in total failure or multiple failures where all backups fail, but we can survive single failures and will run a lot faster than diskful.

Virtual synchrony

TODO: Wiring & membership via virtual synchrony

TODO: journaling, speed. Will file-per-q really help with disk burnout?

Configuration

Simplifying patterns Possible ways to configure a cluster:

  • Virtual hosts as units of replication.
  • Backup rings: all primary components in a broker use the same backup broker and vice-versa. Backups form rings.
  • Broker component rinks: all the components except sessions have the same backup broker. Session backups are chosen at random so a brokers load will be distributed rather than all falling on its backup.
  • Disk management issues?
  • Shared storage issues?

Dynamic cluster configuration

  • Failover: the primary use case.
  • Add node: backup, proxy, primary case?
  • Redirect clients from loaded broker (pretend failure)
  • Move queue primary from loaded broker/closer to consumers?
  • Re-start after failover.

Issue: unit of failover/redirect is connection/channel but "working set" of queues and exchanges is unrelated. Use virtual host as unit for failover/relocation? It's also a queue namespace...

If a queue moves we have to redirect its consumers, can't redirect entire channels! Channels in the same session may move between connections. Or rather we depend on broker to proxy?

Backups: chained backups rather than multi-backup? Ring backup? What about split brain, elections, quorums etc.

Should new backups acquire state from primary, from disk or possibly both? Depends on GFS/SAN vs. commodity hw?

Transactions

Local transactions

AMQP offers local and distributed transactions, however in a cluster a local transaction could involve queues that are distributed across several nodes.

TODO: This complicates the model of a proxy as a simple forwarder. You cannot simply forward a local transaction involving queues on two separate primary brokers, the proxy has to be aware of the transaction.

TODO Can we use point-to-point local transactions or do we have to turn this into a dtx? If dtx, who co-ordinates? Is every broker potentially a transaction co-ordinator?

TODO: For distributed transactions, will the primary broker and its backups act as a single clustered resource manager for the resource set, or will a failure of one broker abort the transaction?

Distributed Transactions

The prepare needs to be replicated so that if one node fails before completion another node can honour the guarantee to be able to commit or abort. It is also possibe that the work of a transaction is distributed across more than one node anyway.

I think broadcasting all dtx commands over the group communication protocol seems like the most likely way to handle this.

The session in which the commands are initiated needs to be replicated also to allow clean resumption on failover.

Open Questions

Issues: double failure in backup ring: A -> B -> C. Simultaneous failure of A and B. C doesn't have the replica data to take over for A.

Java/C++ interworking - is there a requirement? Fail over from C++ to Java? Common persistence formats?

Implementation breakdown.

The following are independently useful units of work that combine to give the full story:

Proxy Queues: Useful in federation. Pure-AMQP proxies for exchanges might also be useful but are not needed for current purpose as we will use virtual synchrony to replicate wiring.

Fragmented queues: Over pure AMQP (no VS) useful by itself for unreliable high volume shared queue federation.

Virtual Synchrony Cluster: Multicast membership and total ordering protocol for brokers. Not useful alone, but useful with proxies and/or fragments for dynamic federations.

Primary-backup replication: Over AMQP, no persistence. Still offers some level of reliability in a simple primary-backup pair.

Persistence: Useful on its own for flow-to-disk and durable messages. Must meet the performance requirements of reliable journalling.

...

Footnotes Display
4 They may not be equivalent on other grounds, e.g. network distance from client, load etc.