You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 3 Next »

NB: this is similar to [Clustering Design Notes] in many ways. The principal differences are:

  • Active state propagation between primary/backup only. Propagation to other proxies is pulled on demand.
  • Open AIS for more efficient cluster state management.
  • Transaction logging for fast persistent backup in case of more widespread failure.

The two designs should be reviewed & merged.

  • Clustering design
    • Summary

Note: in this document the term "resource" means "Queue or Exchange".

A broker cluster provides the illusion of a single broker: any cluster member can serve clients for any clustered resource.

For each clustered resource a cluster broker has exactly one of these roles:

  • Primary: primary owner of the Queue/Exchange.
  • Backup: becomes primary if primary fails.
  • Proxy: forwards to primary.

This application is partitioned by assigning groups of queues/exchanges to primary brokers. The same broker can be primary for one exchange, backup for another, proxy for another etc.

Primary/backup failover provides software/hardware fault tolerance.

Proxies provide load distribution and act as connection concentrators.

TODO: There are many possible configurations for these roles, need to identify some basic use cases and show corresponding configuration.

Wherever it makes sense we will use AMQP itself to communicate within the cluster, using the following extension points to carry cluster data over standard AMQP:

  • Field table parameters to various AMQP methods (declare() arguments etc.)
  • Field table in message headers.
  • System exchanges and queues.
  • Alternate implementations of Queues, Exchanges, Bindings.
    • Sessions

With failover a single client-broker "session" may involve multiple connects/disconnects and multiple broker and/or client processes.

The client generates a UUID to identify each new session.

Extension points in the existing protocol are used to create a session:


  1. AMQP methods to start a session.
  1. Client requests a timeout
    Unknown macro: { qpid.session.start}
  1. Broker may impose a lower the timeout.
    Unknown macro: {qpid.session.timeout}

The broker keeps the session for the timeout period if the client connection is unexpectedly closed, or if the client calls Connection.close() with reply-code=KEEP_SESSION (new code). Any other type of connection closure terminates the session.

If the broker is configured for HA then sessions will survive failover. If not, sessions will survive temporary client disconnect.


  1. Client resuming a disconnected session:
    Unknown macro: {qpid.session.resume}
  1. Broker returns the status of the session.
    Unknown macro: {qpid.session.resume}

TODO define <session status>. Gives the status of the session as known to the broker - things like resumed channel ids, outstanding/unacked requests etc.

If the session has timed out or does not exist broker closes the connection with an exception.

    • Client Failover
      • Finding Alternate Brokers

The known-hosts parameter provides the initial list of alternate brokers. Note each "host" can be "host:port".

Client can bind a temporary queue to predeclared exchange "amq.qpid" with routing-key="cluster". The broker will publish a message whenever the alternate broker list changes. The message has empty body and header:

qpid.known.hosts = longstr

Unknown macro: { shortstr host, shortstr host ... }


      • Choosing a Broker

Client chooses randomly among alternate brokers.

Sophisticated strategies for choosing brokers e.g. nearest-in-network, least-loaded etc may be considered in later versions.

Client resumes the session as described under "Sessions"

    • Clustered Resources

Each resource on a clustered broker falls into one of these categories:

  • Local: not visible to other cluster members.
  • Proxied: visible via all cluster members but not fault-tolerant.
  • Fault tolerant, transient: Mirrored to backup, not logged to disk.
  • Fault tolerant, durable: Mirrored and logged to disk.

Transient resources and transient messages survive failover to an active backup, but not failure of both primary and backup.

Durable fault-tolerant resources and persistent messages on durable, fault tolerant queues survive any failover including failure and restart of the entire cluster.

Local resources do not survive failure of their broker.

Proxied resources: when a broker joins a cluster it will create proxies for all clustered resources. Cluster brolkers automatically keep their proxy set up to date as brokers join & leave the cluster and as resource are created/destroyed on their primary broker.

      • Creating Clustered/Proxied Resources

A qpid client can explicitly request creation of cluster or proxy
resource by adding to the "arguments" parameter of declare methods.


  1. Create a local proxy to a resource on <host>.
  2. Fail if the resource does not exist at host or host is invalid.
    Unknown macro: {qpid.cluster.proxy}
  1. Create a clustered resource on remote primary <host> and a local proxy.
  2. If host="" or the current broker then create a clustered resource
  3. with the local broker as primary.
    declare(arguments={qpid.cluster.primary = <host>

Clustered resources can be declared on any clustered broker from any other clustered broker. Declarations are forwarded to the primary.

For compatiblility with non-Qpid clients a broker can be configured (via AMQP?) with a list of rules mapping queue/exchange names to names to primary hosts. If a normal declare() name matches such a rule the broker will either create a proxy to the resource or attempt to create the remote cluster resource based on rule configuration. TODO details.

    • Proxies

Proxies use only AMQP to communicate. They can be used indepedently of clustering as federated connection concentrators. They use standard AMQP on both sides so they can interoperate with non-Qpid clients and brokers.

Pure-proxy brokers can be used as connection concentrators and to cross network boundaries - e.g. TCP to IGMP forwarding, running in DMZ etc.

The basic idea is to create proxy exchange/queue instances with special proxy implementations to give the illusion of a common queue/exchange setup on each broker. The behavior of these proxy objects is as follows:

TODO: namespace issue, see Questions section below.

      • Publish to proxy exchange

Local proxy exchange acts as client to remote exchange. Any message published to proxy exchange is forwarded to remote exchange.

Proxy exchanges have the same name as their remote counterpart so they they can be used as if they were the remote counterpart.

      • Consume from proxy queue

Proxy queue consumes, gets and acknowledges messages from remote queue on behalf of its clients. It must not acknowledge any message until it has received acknolwedgement from its client. It should use flow control to avoid consuming messages unless it has a client ready to consume them.

TODO: draw out the scenarios for avoiding "stuck message syndrome" where a proxy queue consumes a message but then has nobody to pass it on to.

      • Bind local queue to proxy exchange

Proxy exchange creates private queue on remote broker and binds to remote exchange with identical binding arguments. Proxy exchange consumes from private remote queue and enqueues messages on the local queue.

      • Bind proxy queue to local exchange

The proxy queue acts as client to the remote broker. All messages delivered to the proxy queue are sent to the remote queue via the remote default exchange and default (queue name) binding.

      • Bind proxy queue to proxy exchange

Given: remote Queue Q on remote broker Bq, remote Exchange X on remote broker Bx. Local broker is B.

If Bq=Bx then simply bind on the remote broker.

If Bq or Bx is a Qpid broker then create a proxy on the Qpid broker for the resource on the non-qpid broker and bind it to the resource on the Qpid broker.

If neither Bq nor Bx is a qpid broker: bind the proxy queue to the proxy exchange (combine rules of two preceeding use cases.) This means traffic from Bx will be routed thru B and forwared to Bq. May or may not be a useful thing to do.

      • Resource lifecycle
        Explicit resource delete commands are forwarded to the primary broker.

In a Qpid cluster, the cluster is notified via the cluster map (see below) so resources can be reclaimed on all proxy brokers.

If Qpid brokers act as proxies for non-Qpid brokers outside a cluster it may not be possible to immediately notify all the proxies. Therefore brokers must also clean up proxy resources if a NOT_FOUND exception indicates that a resource no longer exists, and should propagate the exception back to the client.

This could still leave dead proxies in idle brokers. Proxy brokers may periodically check the existence of resources they proxy for (using declare) and clean up if the proxied resource no longer exists.

Auto-delete queues: proxies for auto-delete queues should mimic their clients, in particular they must not keep the queue open for consume if they have no clients consuming. On the other hand they must not allow the queue to be deleted if they still have consuming clients.

      • Connection management
        TODO: details:
  • Clients must be unaware of disconnect/reconnect by proxies.
  • Proxies shouldn't hold unused connections open for no reason.
  • Proxies shouldn't create/teardown connections too frequently.
  • Proxies should share connections where possible.
    • Cluster Communication
      • Broker sessions.

Sessions are considered resources by the brokers - each session has primary and backup brokers . The primary for a session is the broker where the session is created.

Unlike queues and exchanges, sessions cannot be proxied. In the event of failure sessions are tranferred to another broker. After a failure a client can re-establish the session on any cluster broker.

A session is active from the time a client connects up to the failure of the connection, the broker or the client. If the broker sees a connection failure, it marks the session disconnnected.

If a client attempts to resume the session somewhere in the cluster, the session is transferred to that broker and become active again. That broker becomes the primary broker for the session.

If a broker fails, all its sessions are tranferred to the backup and marked disconnected. As above a client can resume a session at any broker, the session will be transferred and activated.

Active sessions cannot be transferred - a client can only be connected to a session via one broker at a time.

      • Resource sets

A cluster broker can take different roles with respect to different resources (queues, exchanges, sessions). A "resource set" is a set of resources owned by the same broker. The resource set is the unit of transfer, for example if a primary broker fails its entire resource set is taken over by the backup.

TODO: Failover merges the primary's resource set with the backup. Do we also need to allow resource sets to be split so a primary broker can offload part of its resource set on another broker?

      • The cluster map.

The cluster map lists

  • all brokers that are members of the cluster.
  • all cluster resources with the primary and backup broker for each.

OpenAIS will be used to give all brokers a consistent view of the cluster map:

  • membership events (join, leave, failed.)
  • cluster map updates
  • detecting failed brokers.
  • resource changes (create, delete, transfer)

All other traffic between brokers will use AMQP. The majority of traffic can be done using the proxy techniques described above. Other communication is via system exchange and queues.

TODO: is there a need for any other protocols?

      • Choosing primaries and backups


  • pre-configured static resources with fixed primary/backup.
  • client/admin specifies primary (and backup?) on resource creation.
  • dynamic: cluster automatically (re)assigns roles as brokers join.

Probably need a combination: a way to set up an initial static assignment, a way for admins to intervene and adjust assignments and a way for the cluster to respond dynamically to changes. At a minimum it must be able to respond to a node failure.

For example: brokers could be arranged in a ring with each broker being the backup fro the preceeding one. To add a new broker you choose a point in the ring to insert it and the adjacent brokers adjust their responsibilities accordingly. Implies we can transfer "backupship" as well as ownership.

      • Broker Failover

Transient resources and transient messages survive failover to an active backup, but not failure of both primary and backup.

Durable fault-tolerant resources and persistent messages on durable, fault tolerant queues survive any failover including failure and restart of the entire cluster.

In active/active clustering the primary broker forwards all commands and messages (persistent and transient) to the backup. The backup keeps an in-memory representation of the state of the primary. It still acts as a proxy for resourced owned by the primary, it does not act on the primaries behalf while it is alive.

Supposed A is backed up by B, B is backed up by C. If broker A fails its primary and backup resource sets are transferred to broker B. B becomes primary for those resources formerly owned by A and becomes backup for those resources formerly backed up by A. C becomes backup for those resources formerly owned by A.

A clustered broker also writes a transaction log of commands related to durable resources and persistent messages. Thus if both primary and backup fail, or if the primary fails without any backup, another broker can be elected to take over by recovering the state of the resources from the log files.

TODO: possible optimization - forward commands without messages to backup store messages persistently. Backup keeps model of state but only loads messages if actually required by failover. Tradeoff normal-running performance/network load with recovery time.

TODO: Elections - detail scenarios and strategy, research what openAIS has to offer.

TODO: Transfer of state to the third broker C in the failover scenario. I.e. how does C become the backup for resources transferred from A to B? How does it catch up with the state of those resources?

    • Logs and persistence.

Use GFS to share storage between brokers. Two types of storage to consider:

*Transaction logs: Optimized for *writing. A flat file with new entries always appended at the end. A single broker writes to each log file (minimise lock contention.) Not optimal for reading - need to read the entire sequence of log events to reconstruct the state of a broker. E.g. may include reading commands to create resources that subsequently get deleted. When the log reaches a certain size, start a new log file.

*State snapshot*: Image of the state of a set of resources at some point in time. More efficient for reading, more compact but slower to write because data structures must be reorganized.

A *log consolidator* reads from transaction logs and constructs a state snashot, deleting transaction logs as they are incorporated into the snapshot. So to do a full recovery you first load the snapshot, then apply the logs.

    • Questions

Lots more detail needed in failover. There are many ways you could configure these components, need to identify what makes sense, how much flexibility to offer and what simplifying assumptions we can make.

Need some profiling to see how various options perform in real life.

Split brain and quorums: How to achieve agreement that a broker is failed, e.g. using quorum device. Does open AIS solve this for us or is there more to it?

Elections & state tranfer: much more detail needed.

Naming problem for proxying and clustering. - Which names are local, which must be consistent across the cluster? - How/when to deal with clashes of local & remote names? - Use explicitly separate namespaces for proxy/cluster resources? - Use virtual hosts? All cluster/proxy resources live in a special virtual host? Relationship between FT storage and durable storage?

Proxies forward requests to primary only or also to hot backup?

UUID algorithm?

  • No labels