
Reliable Broker Cluster

This document describes cluster design and implementation as of 19 June 2009.

Overview

A Reliable Broker Cluster, or just cluster, is a group of brokers collaborating to present the illusion of a single broker with multiple addresses.

NB: this is similar to Old Clustering Design Note in many ways. The principal differences are:

  • Active state propagation between primary/backup only. Propagation to other proxies is pulled on demand.
  • Use Open AIS protocol for more efficient cluster state management.
  • Transaction logging for fast persistent backup in case of more widespread failure.

The two designs should be reviewed & merged.

Clustering design

Summary

Note: in this document the term "resource" means "Queue or Exchange".

A broker cluster provides the illusion of a single broker: any cluster member can serve clients for any clustered resource.

For each clustered resource a cluster broker has exactly one of these roles:

  • Primary: primary owner of the Queue/Exchange.
  • Backup: becomes primary if the primary fails. <- comment - I would expect that you would want to be able to configure the number of backups used in a cluster per resource (1, 2, 3; don't know if there is any benefit beyond 3). ->
  • Proxy: forwards to primary. <- assume by default, everyone that is not primary is a proxy in a cluster ->

This application is partitioned by assigning groups of queues/exchanges to primary brokers. The same broker can be primary for one exchange, backup for another, proxy for another etc.

Primary/backup failover provides software/hardware fault tolerance.

Proxies provide load distribution and act as connection concentrators.

TODO: There are many possible configurations for these roles, need to identify some basic use cases and show corresponding configuration.

Wherever it makes sense we will use AMQP itself to communicate within the cluster, using the following extension points to carry cluster data over standard AMQP: <- comment - thus AMQP for 1-to-1 and AIS for 1-to-n where n >= 3 ->

  • Field table parameters to various AMQP methods (declare() arguments etc.)
  • Field table in message headers.
  • System exchanges and queues.
  • Alternate implementations of Queues, Exchanges, Bindings.

Sessions

With failover a single client-broker "session" may involve multiple connects/disconnects and multiple broker and/or client processes.

The client generates a UUID to identify each new session.

Extension points in the existing protocol are used to create a session:

Code Block

# AMQP methods to start a session.

# Client requests a timeout
client.start(server-properties={
  qpid.session.start:<uuid>
  qpid.session.timeout:<in seconds> })

# Broker may impose a lower timeout.
server.start-ok(client-properties={qpid.session.timeout:<seconds>})

The broker keeps the session for the timeout period if the client connection is unexpectedly closed, or if the client calls Connection.close() with reply-code=KEEP_SESSION (new code). Any other type of connection closure terminates the session.

If the broker is configured for HA then sessions will survive failover. If not, sessions will survive temporary client disconnect.

Code Block

# Client resuming a disconnected session:
client.start(server-properties={qpid.session.resume:<uuid>})

# Broker returns the status of the session.
server.start-ok(client-properties={qpid.session.resume:<session status>})

TODO define <session status>. Gives the status of the session as known to the broker - things like resumed channel ids, outstanding/unacked requests etc.

If the session has timed out or does not exist, the broker closes the connection with an exception.

Client Failover

Finding Alternate Brokers

The Connection.open-ok known-hosts parameter provides the initial list of alternate brokers. Note each "host" can be "host:port".

A client can bind a temporary queue to the predeclared exchange "amq.qpid" with routing-key="cluster". The broker will publish a message whenever the alternate broker list changes. The message has an empty body and the header:

Code Block

  qpid.known.hosts = longstr{ shortstr host, shortstr host ... };
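
As an illustration of how a client might unpack this header, here is a minimal Python sketch assuming the layout above (a longstr holding a sequence of AMQP shortstr entries, each a 1-byte length followed by that many bytes of "host" or "host:port"); it is not part of any Qpid client API.

Code Block

# Illustrative only: decode the qpid.known.hosts value assuming the
# longstr/shortstr layout shown above.
def decode_known_hosts(longstr: bytes) -> list:
    hosts, i = [], 0
    while i < len(longstr):
        n = longstr[i]                         # shortstr length byte
        hosts.append(longstr[i + 1:i + 1 + n].decode('utf-8'))
        i += 1 + n
    return hosts

# Example: two alternate brokers, each encoded as "host:port".
packed = bytes([13]) + b'broker-a:5672' + bytes([13]) + b'broker-b:5672'
print(decode_known_hosts(packed))              # ['broker-a:5672', 'broker-b:5672']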

Choosing a Broker

The client chooses randomly among the alternate brokers.

More sophisticated strategies for choosing brokers, e.g. nearest-in-network or least-loaded, may be considered in later versions.

The client resumes the session as described under "Sessions".
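
The reconnect logic sketched below is illustrative only; connect_and_resume() is a hypothetical callable standing in for whatever the client library does to open a connection and perform the qpid.session.resume handshake.

Code Block

# Sketch of client failover: shuffle the known hosts, try each one, resume the
# session identified by its UUID on the first broker that accepts.
import random

def failover(known_hosts, session_uuid, connect_and_resume):
    candidates = list(known_hosts)
    random.shuffle(candidates)                 # choose randomly among alternates
    for host in candidates:
        try:
            return connect_and_resume(host, session_uuid)
        except ConnectionError:
            continue                           # broker unreachable, try the next one
    raise RuntimeError("no alternate broker accepted the session resume")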

Clustered Resources

Each resource on a clustered broker falls into one of these categories:

  • Local: not visible to other cluster members.
  • Proxied: visible via all cluster members but not fault-tolerant.
  • Fault tolerant, transient: Mirrored to backup, not logged to disk.
  • Fault tolerant, durable: Mirrored and logged to disk.

Transient resources and transient messages survive failover to an active backup, but not failure of both primary and backup.

Durable fault-tolerant resources and persistent messages on durable, fault tolerant queues survive any failover including failure and restart of the entire cluster.

Local resources do not survive failure of their broker.

Proxied resources: when a broker joins a cluster it will create proxies for all clustered resources. Cluster brokers automatically keep their proxy set up to date as brokers join & leave the cluster and as resources are created/destroyed on their primary broker.

Creating Clustered/Proxied Resources

A Qpid client can explicitly request creation of a clustered or proxy resource by adding entries to the "arguments" parameter of declare methods.

Code Block

# Create a local proxy to a resource on <host>.
# Fail if the resource does not exist at host or host is invalid.
declare(arguments={qpid.cluster.proxy:<host>})

# Create a clustered resource on remote primary <host> and a local proxy.
# If host="" or the current broker then create a clustered resource
# with the local broker as primary.
declare(arguments={qpid.cluster.primary:<host>})

Clustered resources can be declared on any clustered broker from any other clustered broker. Declarations are forwarded to the primary.

For compatibility with non-Qpid clients a broker can be configured (via AMQP?) with a list of rules mapping queue/exchange names to primary hosts. If a normal declare() name matches such a rule the broker will either create a proxy to the resource or attempt to create the remote cluster resource, based on rule configuration. TODO details.

Proxies

Proxies use only AMQP to communicate. They can be used independently of clustering as federated connection concentrators. They use standard AMQP on both sides so they can interoperate with non-Qpid clients and brokers.

Pure-proxy brokers can be used as connection concentrators and to cross network boundaries - e.g. TCP to IGMP forwarding, running in DMZ etc.

The basic idea is to create proxy exchange/queue instances with special proxy implementations to give the illusion of a common queue/exchange setup on each broker. The behavior of these proxy objects is as follows:

TODO: namespace issue, see Questions section below.

Publish to proxy exchange

Local proxy exchange acts as client to remote exchange. Any message published to proxy exchange is forwarded to remote exchange.

Proxy exchanges have the same name as their remote counterpart so they can be used as if they were the remote counterpart.

Comment - To maintain QoS I assume the async reply as defined by the transport SIG would not be issued from the proxy to the publisher until the message has been successfully enqueued by the Primary.

Consume from proxy queue

Proxy queue consumes, gets and acknowledges messages from the remote queue on behalf of its clients. It must not acknowledge any message until it has received acknowledgement from its client. It should use flow control to avoid consuming messages unless it has a client ready to consume them.

TODO: draw out the scenarios for avoiding "stuck message syndrome" where a proxy queue consumes a message but then has nobody to pass it on to.

Comment - To maintain QoS I assume the dequeue from the Primary would not be issued by the proxy until the proxy has successfully received the reply sequence from the consumer.
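
A minimal sketch of that deferred-acknowledgement rule follows; the upstream/downstream session objects and their deliver()/ack() methods are assumptions for illustration, not Qpid classes.

Code Block

# The proxy remembers which upstream delivery produced each downstream delivery
# and only acknowledges the primary after the client has acknowledged.
class ProxyQueue:
    def __init__(self, upstream, downstream):
        self.upstream = upstream        # session consuming from the primary
        self.downstream = downstream    # session delivering to the local client
        self.pending = {}               # client delivery-tag -> upstream delivery-tag

    def on_upstream_message(self, upstream_tag, message):
        client_tag = self.downstream.deliver(message)
        self.pending[client_tag] = upstream_tag   # do NOT ack upstream yet

    def on_client_ack(self, client_tag):
        upstream_tag = self.pending.pop(client_tag)
        self.upstream.ack(upstream_tag)           # now safe to ack the primary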

Bind local queue to proxy exchange

Proxy exchange creates private queue on remote broker and binds to remote exchange with identical binding arguments. Proxy exchange consumes from private remote queue and enqueues messages on the local queue.

Bind proxy queue to local exchange

The proxy queue acts as client to the remote broker. All messages delivered to the proxy queue are sent to the remote queue via the remote default exchange and default (queue name) binding.

Bind proxy queue to proxy exchange

Given: remote Queue Q on remote broker Bq, remote Exchange X on remote broker Bx. Local broker is B.

If Bq=Bx then simply bind on the remote broker.

If Bq or Bx is a Qpid broker then create a proxy on the Qpid broker for the resource on the non-qpid broker and bind it to the resource on the Qpid broker.

If neither Bq nor Bx is a Qpid broker: bind the proxy queue to the proxy exchange (combining the rules of the two preceding use cases). This means traffic from Bx will be routed through B and forwarded to Bq. This may or may not be a useful thing to do.

Resource lifecycle

Explicit resource delete commands are forwarded to the primary broker.

In a Qpid cluster, the cluster is notified via the cluster map (see below) so resources can be reclaimed on all proxy brokers.

If Qpid brokers act as proxies for non-Qpid brokers outside a cluster it may not be possible to immediately notify all the proxies. Therefore brokers must also clean up proxy resources if a NOT_FOUND exception indicates that a resource no longer exists, and should propagate the exception back to the client.

This could still leave dead proxies in idle brokers. Proxy brokers may periodically check the existence of resources they proxy for (using declare) and clean up if the proxied resource no longer exists.

Auto-delete queues: proxies for auto-delete queues should mimic their clients, in particular they must not keep the queue open for consume if they have no clients consuming. On the other hand they must not allow the queue to be deleted if they still have consuming clients.
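
As a sketch of that rule (names are illustrative, not the actual proxy implementation), the proxy can simply reference-count its local consumers and hold a remote consumer only while the count is non-zero:

Code Block

class AutoDeleteQueueProxy:
    def __init__(self, remote_queue):
        self.remote = remote_queue      # assumed handle to the remote auto-delete queue
        self.local_consumers = 0

    def add_local_consumer(self):
        if self.local_consumers == 0:
            self.remote.start_consuming()    # keeps the remote queue alive
        self.local_consumers += 1

    def remove_local_consumer(self):
        self.local_consumers -= 1
        if self.local_consumers == 0:
            self.remote.cancel_consuming()   # allow the remote queue to auto-delete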

Connection management

TODO: details:

  • Clients must be unaware of disconnect/reconnect by proxies.
  • Proxies shouldn't hold unused connections open for no reason.
  • Proxies shouldn't create/teardown connections too frequently.
  • Proxies should share connections where possible.

Cluster Communication

Broker sessions.

Sessions are considered resources by the brokers - each session has primary and backup brokers. The primary for a session is the broker where the session is created.

Unlike queues and exchanges, sessions cannot be proxied. In the event of failure sessions are transferred to another broker. After a failure a client can re-establish the session on any cluster broker.

<- comment - how does the client know which node in the cluster to reconnect to, and how is the list of nodes in the cluster published to the client? Assume it uses the system exchange. ->

A session is active from the time a client connects up to the failure of the connection, the broker or the client. If the broker sees a connection failure, it marks the session disconnected.

If a client attempts to resume the session somewhere in the cluster, the session is transferred to that broker and becomes active again. That broker becomes the primary broker for the session.

If a broker fails, all its sessions are transferred to the backup and marked disconnected. As above, a client can resume a session at any broker; the session will be transferred and activated.

Active sessions cannot be transferred - a client can only be connected to a session via one broker at a time.

Resource sets

A cluster broker can take different roles with respect to different resources (queues, exchanges, sessions). A "resource set" is a set of resources owned by the same broker. The resource set is the unit of transfer, for example if a primary broker fails its entire resource set is taken over by the backup.

TODO: Failover merges the primary's resource set with the backup's. Do we also need to allow resource sets to be split so a primary broker can offload part of its resource set onto another broker? - I would say yes; on failure another backup needs to be identified because one node has been lost, and all backups held on the node that was primary also need to be replicated to another node.

The cluster map.

The cluster map lists

  • all brokers that are members of the cluster.
  • all cluster resources with the primary and backup broker for each.

OpenAIS will be used to give all brokers a consistent view of the cluster map:

  • membership events (join, leave, failed.)
  • cluster map updates
  • detecting failed brokers.
  • resource changes (create, delete, transfer)
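
A possible in-memory representation of the cluster map, shown only to make the above concrete (field names and the reassignment rule are assumptions, not the actual implementation):

Code Block

from dataclasses import dataclass, field

@dataclass
class ResourceEntry:
    name: str       # queue/exchange/session name
    primary: str    # broker currently owning the resource
    backup: str     # broker that takes over if the primary fails

@dataclass
class ClusterMap:
    members: set = field(default_factory=set)      # broker ids in the cluster
    resources: dict = field(default_factory=dict)  # name -> ResourceEntry

    def on_member_failed(self, failed, new_backup):
        """Simplified role reassignment when a member leaves or fails."""
        self.members.discard(failed)
        for r in self.resources.values():
            if r.primary == failed:
                r.primary, r.backup = r.backup, new_backup  # backup takes over
            elif r.backup == failed:
                r.backup = new_backup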

All other traffic between brokers will use AMQP. The majority of traffic can be done using the proxy techniques described above. Other communication is via system exchange and queues.

comment - if a high number of backups is configured it might be more efficient to run AMQP over AIS for this distribution; however, this can be seen as an optimization and done later.

TODO: is there a need for any other protocols?

Choosing primaries and backups

Options:

  • pre-configured static resources with fixed primary/backup.
  • client/admin specifies primary (and backup?) on resource creation.
  • dynamic: cluster automatically (re)assigns roles as brokers join.

Probably need a combination: a way to set up an initial static assignment, a way for admins to intervene and adjust assignments and a way for the cluster to respond dynamically to changes. At a minimum it must be able to respond to a node failure.

For example: brokers could be arranged in a ring with each broker being the backup for the preceding one. To add a new broker you choose a point in the ring to insert it and the adjacent brokers adjust their responsibilities accordingly. This implies we can transfer "backupship" as well as ownership.

comment - I believe in any scheme we need to be able to transfer "backupship" as well as "ownership", since when a node in the cluster fails, both may be present on that node.
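
For illustration, a minimal sketch of the ring arrangement (broker i is backed up by broker i+1, wrapping around):

Code Block

def backup_of(broker, ring):
    """Each broker is backed up by its successor in the ring ordering."""
    i = ring.index(broker)
    return ring[(i + 1) % len(ring)]

ring = ["broker-a", "broker-b", "broker-c"]
assert backup_of("broker-a", ring) == "broker-b"
assert backup_of("broker-c", ring) == "broker-a"   # the ring wraps around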

Broker Failover

Transient resources and transient messages survive failover to an active backup, but not failure of both primary and backup.

Durable fault-tolerant resources and persistent messages on durable, fault tolerant queues survive any failover including failure and restart of the entire cluster.

In active/active clustering the primary broker forwards all commands and messages (persistent and transient) to the backup. The backup keeps an in-memory representation of the state of the primary. It still acts as a proxy for resources owned by the primary; it does not act on the primary's behalf while the primary is alive.

Suppose A is backed up by B and B is backed up by C. If broker A fails its primary and backup resource sets are transferred to broker B. B becomes primary for those resources formerly owned by A and becomes backup for those resources formerly backed up by A. C becomes backup for those resources formerly owned by A.

A clustered broker also writes a transaction log of commands related to durable resources and persistent messages. Thus if both primary and backup fail, or if the primary fails without any backup, another broker can be elected to take over by recovering the state of the resources from the log files.

TODO: possible optimization - forward commands without messages to the backup; store messages persistently. The backup keeps a model of the state but only loads messages if actually required by failover. Tradeoff: normal-running performance/network load versus recovery time.

TODO: Elections - detail scenarios and strategy, research what openAIS has to offer.

TODO: Transfer of state to the third broker C in the failover scenario. I.e. how does C become the backup for resources transferred from A to B? How does it catch up with the state of those resources?

Logs and persistence.

Use GFS or shared storage offered by the platform to share storage between brokers. Two types of storage to consider:

Transaction logs: Optimized for writing. A flat file with new entries always appended at the end. A single broker writes to each log file (minimise lock contention.) Not optimal for reading - need to read the entire sequence of log events to reconstruct the state of a broker. E.g. may include reading commands to create resources that subsequently get deleted. When the log reaches a certain size, start a new log file.

State snapshot: Image of the state of a set of resources at some point in time. More efficient for reading, more compact but slower to write because data structures must be reorganized.

A log consolidator reads from transaction logs and constructs a state snapshot, deleting transaction logs as they are incorporated into the snapshot. So to do a full recovery you first load the snapshot, then apply the logs.
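
A sketch of the recovery order this implies (load the snapshot, then replay any logs written after it); the file formats and names here are assumptions, not the actual store layout:

Code Block

import json

def recover(snapshot_path, log_paths):
    with open(snapshot_path) as f:
        state = json.load(f)                    # compact image, fast to read
    for path in log_paths:                      # logs newer than the snapshot
        with open(path) as f:
            for line in f:                      # append-only, one entry per line
                apply_entry(state, json.loads(line))
    return state

def apply_entry(state, entry):
    # A real implementation would dispatch on the entry type (enqueue, dequeue,
    # resource create/delete, ...); here we simply record each entry.
    state.setdefault("replayed", []).append(entry)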

Questions

Lots more detail needed in failover. There are many ways you could configure these components, need to identify what makes sense, how much flexibility to offer and what simplifying assumptions we can make.

Need some profiling to see how various options perform in real life.

Split brain and quorums: How to achieve agreement that a broker is failed, e.g. using quorum device. Does open AIS solve this for us or is there more to it?

Elections & state transfer: much more detail needed.

Naming problem for proxying and clustering:

  • Which names are local, which must be consistent across the cluster?
  • How/when to deal with clashes of local & remote names?
  • Use explicitly separate namespaces for proxy/cluster resources?
  • Use virtual hosts? All cluster/proxy resources live in a special virtual host?

Relationship between FT storage and durable storage?

Proxies forward requests to primary only or also to hot backup?

A cluster is a group of brokers collaborating to present the illusion of a single broker with multiple addresses. The cluster is active-active, that is to say each member broker maintains the full state of the clustered broker. If any member fails, clients can fail over to any other member.

New members can be added to a cluster while it is running. An established member volunteers to provide a state update to the new member. Both updater and updatee queue up cluster activity during the update and process it when the update is complete.

The cluster uses the CPG (Closed Process Group) protocol to replicate state. CPG was part of the OpenAIS package; it is now part of the corosync package. To avoid confusion with AMQP messages we will refer to CPG multicast messages as events.

CPG is a virtual synchrony protocol. Members multicast events to the group and CPG ensures that each member receives all the events in the same sequence. Since all members get an identical sequence of events, they can all update their state consistently. To achieve consistency, events must be processed in the order that CPG presents them. In particular members wait for their own events to be re-delivered by CPG before acting on them.
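
The self-delivery rule can be illustrated with a small sketch (the multicast callable stands in for the CPG send; this is not the actual cluster code):

Code Block

class Member:
    def __init__(self, member_id, multicast):
        self.id = member_id
        self.multicast = multicast   # assumed: sends an event to the whole group
        self.state = []

    def request(self, action):
        # Never apply the action locally here; that would break consistency.
        self.multicast({"origin": self.id, "action": action})

    def on_delivery(self, event):
        # Every member, including the originator, applies events in the single
        # total order chosen by CPG.
        self.state.append(event["action"])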

Implementation Approach

The cluster implementation is highly decoupled from the broker. There's no cluster-specific code in the general broker, just a few hooks that the cluster uses to modify broker behavior.

The basic idea is that the cluster treats the broker as a black box and assumes that, provided it is fed identical input, it will produce identical results. The cluster::Connection class intercepts data arriving for broker Connections and sends that data as a CPG event. As data events are delivered by CPG, they are fed to the original broker::Connection objects. Thus each member sees all the data arriving at all the members in the same sequence, so we get the same set of declares, enqueues, dequeues etc. happening on each member.

This approach replicates all broker state: sessions, connections, consumers, wiring etc. Each broker can have both direct connections and shadow connections. A shadow connection represents a connection on another broker in the cluster. Members use shadow connections to simulate the actions of other brokers, so that all members arrive at the same state. Output for shadow connections is simply discarded; brokers only send data to their directly-connected clients.
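
The direct/shadow distinction can be sketched as follows (class and method names are illustrative, not the real cluster::Connection interface):

Code Block

class ClusterConnection:
    def __init__(self, broker_connection, is_shadow, socket=None):
        self.broker = broker_connection   # applies frames, updating broker state
        self.is_shadow = is_shadow
        self.socket = socket              # only set for direct connections

    def on_cpg_delivery(self, data):
        output = self.broker.process(data)   # identical state change on every member
        if not self.is_shadow:
            self.socket.send(output)         # only the directly connected member replies
        # shadow connections simply discard the output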

This approach assumes that the behavior of the broker is deterministic, i.e. that it is completely determined by the input data fed to the broker. There are a number of cases where this does not hold and the cluster has to take steps to ensure consistency:

  • Allocating messages: the stand-alone broker allocates messages based on the writability of client connections.
  • Client connection disconnects.
  • Timers: any action triggered by a timer may happen at an unpredictable point with respect to CPG events.

Allocating messages

The cluster allocates messages to consumers using CPG events rather than writability of client connections. A cluster connection that has potentially got data to write sends a do-output event to itself, allowing it to dequeue N messages. The messages are not actually dequeued until the do-output event is re-delivered in sequence with other events. The value of N is dynamically estimated in an attempt to match it to the rate of writing messages to directly connected clients. All the other members have a shadow connection which allows them to de-queue the same set of messages as the directly connected member.
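
A sketch of the do-output mechanism described above (the multicast function and the connection/queue objects are placeholders; N is shown as a fixed estimate rather than the dynamically computed value):

Code Block

class OutputTask:
    def __init__(self, connection_id, multicast):
        self.connection_id = connection_id
        self.multicast = multicast       # assumed CPG send function

    def have_output(self, n_estimate):
        # Nothing is dequeued here; the intent is announced to the whole group.
        self.multicast({"type": "do-output",
                        "conn": self.connection_id,
                        "n": n_estimate})

def on_do_output(event, queue, connections):
    # Delivered in the same order on every member, so all members dequeue the
    # same messages; shadow connections just drop what they would have written.
    conn = connections[event["conn"]]
    for _ in range(event["n"]):
        if not queue:
            break
        conn.write(queue.pop(0))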

Client disconnects

When a client disconnects, the directly-connected broker sends a deliver-close event via CPG. It does not actually destroy the connection until that event is re-delivered. This ensures that the direct connection and all the shadows are destroyed at the same point in the event sequence.

Actions initiated by a timer

The cluster needs to do some extra work at any point where the broker takes action based on a timer (e.g. message expiry, management, producer flow control). See the source code for details of how each case is handled.

Error Handling

There are two types of recoverable error:

  • Predictable errors occur in the same way on all brokers as a predictable consequence of cluster events. For example binding a queue to a non-existent exchange.
  • Unpredictable errors may not occur on all brokers. For example running out of journal space to store a message, or an IO error from the journal.

Unpredictable errors must be handled in such a way that the cluster does not become inconsistent. In a situation where one broker experiences an unpredictable error and the others do not, we want the broker in error to shut down and leave the cluster so its clients can fail over to healthy brokers.

When an error occurs on a cluster member it sends an error-check event to the cluster and stalls processing. If it receives a matching error-check from all other cluster members, it continues. If the error did not occur on some members, those members send an error-check with "no error" status. In this case the members that did experience the error shut themselves down, as they can no longer consistently update their state. The members that did not have the error continue; clients can fail over to them.
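
The agreement rule can be summarised in a small sketch (statuses are simplified to "error"/"ok" and timeouts are omitted):

Code Block

def resolve_error(my_status, statuses_from_others):
    """Decide whether this member continues or shuts down after an error-check round."""
    if all(s == "error" for s in statuses_from_others):
        return "continue"      # predictable error: every member saw it, carry on
    if my_status == "error":
        return "shutdown"      # others were fine; leave so clients can fail over
    return "continue"          # we were fine; keep serving and accept failovers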

Transactions

Transactions are conversational state, allowing a session to collect changes for the shared state and then apply them all at once or not at all.

For TX transactions each broker creates an identical transaction, they all succeed or fail identically since they're all being fed identical input (see Error Handling above for what happens if a broker doesn't reach the same conclusion.)

DTX transactions are not yet supported by the cluster.

Persistence and Asynchronous Journaling

Each cluster member has an independent store, each recording identical state.

A cluster can be configured so that if the cluster is reduced to a single member  (the "last man standing") that member can have transient data queues persisted.

Recovery: after a total cluster shutdown, the state of the new cluster is determined by the store of the first broker started. The second and subsequent brokers will get their state from the cluster, not the store.

At the time of writing there is a bug that requires the stores of all but the first broker to be deleted manually before starting the cluster.

Limitations of current design

There are several limitations of the current design.

Concurrency: all CPG events are serialized into a single stream and handled by a single thread. This means clustered brokers have limited ability to make use of multiple CPUs. Some of this work is pipelined, so there is some parallelism, but it is limited.

Maintainability:  decoupling the cluster code from the broker and assuming the broker behaves deterministically makes it very easy for developers working on the stand-alone broker to unintentionally break the cluster, for example by adding a feature that depends on timers. This has been the case in particular for management, since the initial cluster code assumed only the queue & exchange state needed to be replicated, whereas in fact all the management state must also be replicated and periodic management actions must be co-ordinated.

Non-replicated state: The current design replicates all state. In some cases however, queues are intended only for directly connected clients, for example management queues, the failover-exchange queues. It would be good to be able to define replicated and non-replicated queues and exchanges in these cases.

Scalability: The current cluster design only addresses reliability. Adding more brokers to a cluster will not increase the cluster's throughput since all brokers are doing all the work. A better approach would move some of the work to be done only by the directly-connected broker, and allow messages to "bypass" the cluster when both producer and consumer are connected to the same member.