Versions:

Reliable Broker Cluster

This document is in two parts.

Concepts discusses outlines concepts and assumptions.

Implementation breaks down the implementation work and describes a “first make it work, then make it faster” approach to implementing a correct and scalable cluster.

Concepts

A Reliable Broker Cluster is a group of brokers collaborating to present the illusion of a single broker. All brokers maintain the full state of the virtual cluster. If any broker fails, clients can fail-over to any other broker to resume sessions.

In active-active mode, clients can connect to any broker. In the event of a failure, clients can fail-over to any other broker in the cluster.

In active-passive mode, there is exactly one active broker. All clients connect to the active broker, other brokers are passive back-ups. If the active broker fails, one of the passive brokers becomes active and all clients fail over to it. If a client tries to connect to a passive broker it is re-directed to the active broker.

It appears that the synchronization necessary for active-active and active-passive modes are so similar that it makes sense to implement active-passive as a special case of active-active rather than a separate mechanism. The brokers can run identical synchronization code in both cases.

Virtual Synchrony

The cluster will use the CPG (Closed Process Group) protocol, part of the Open AIS suite. CPG is a virtual synchrony protocol. CPG members multicast to the group. This document refers to CPG multicast messages as “events” to avoid confusion with AMQP messages.

CPG guarantees that each group member gets an identically ordered stream of events. To get consistent behavior each member acts on events in the order they are received from the cluster. Members send their own events to the cluster but do not act on them until they are self-delivered back from the cluster.

Broker State

Broker state can be divided into:

Consumer state lies between shared and conversational. On the one hand, consumers belong to sessions. On the other hand, if two sessions subscribe to the same queue, the existence of the other subscription affects the output to both clients. You could consider subscriptions to be “dequeue wiring”. Both choices seem to be possible, with different implications for the implementation.

Each broker maintains state for the entire “virtual broker” represented by the cluster. The broker directly connected to a client has “live” conversational state for that client. Other brokers have “shadow” state. Only the directly connected broker sends data to the client, all brokers update their shadow state so that they are ready to accept any failed-over session at all times.

Synchronizing the cluster

There are two aspects to synchronizing the cluster:

We will use CPG to get consistent ordering. All input events are multicast to the cluster, and processed only when received from the cluster - even on the node that initially multicast the event. That ensures all nodes process events in a consistent order.

Input includes:

Enqueues vs. Dequeues

Enqueues

Incoming transfers are are replicated as they are received from clients. They must be replicated before sending a completed control for the transfer.

Dequeue events

Currently the broker sometimes dequeues messages based on writability of client connections. This introduces a non-deterministic element in the cluster as other brokers can't know what the connection state is.

When a broker wants to send a message to a local client, it issues a “dequeue event” via CPG indicating the queue and consumer to receive the message. No message is actually dequeued until the dequeue event returns via CPG so it is ordered with respect to other changes to the queue. If the queue is not empty when the event returns, the broker can dequeue and send to the client, unless the queue is empty. In that case the broker may send another dequeue or do nothing depending on the state of consumers and queues.

Brokers receiving a foreign dequeue notice from CPG will dequeue the message (if the queue is not empty) and update the shadow session for the outgoing message.transfer. They do not actually send the message.

Implicit dequeues

Some dequeues are predictable based on client commands alone. For example if a messages is added to an empty queue with only one consumer, and that consumer has sufficient credit for the message then the dequeue is implied and can be done immediately by all brokers as part of processing the message.transfer.

Explicit dequeue events are required only when a broker makes a decision that is not determined entirely by cluster input events to date, for example because a client connection becomes writable.

Error Handling

An error is recoverable if an isolated broker could kill the problematic sessions or connections but continue to provide service. A core dump is an example of an unrecoverable error.

Handling unrecoverable errors is straightforward: the affected broker leaves the cluster – probably because its process was killed.

There are two types of recoverable error

Unpredictable errors must be handled in such a way that the cluster does not become inconsistent. In a situation where one broker experiences an unpredictable error and the others do not, we want the broker in error to shut down and leave the cluster so its clients can fail over to healthy brokers.

However some unpredictable errors may happen identically on all brokers. For example brokers with identical journal configuration may all run out of disk space at the same point. We don't want to kill the entire cluster in this case.

To deal with this case, a broker that experiences an unpredictable error updating shared state stalls. I.e.

A broker that receives a stall notice and successfully passes or has already passed the point of the stall sends a “shoot yourself” notice referencing the stall notice and continues processing.

The outcomes for stalled broker(s) are:

  1. Receive a “shoot yourself” - exit the cluster so clients can fail over to a healthy broker.

  2. Receive an identical stall notice from every cluster member - resume processing, first the stored events received while stalled then continue with new events.

  3. Receive non-identical stall notices from every cluster member – if the broker's stall notice “wins” then resume else leave the cluster.

In the event of multiple non-identical stalls around the same time, we need a deterministic way to pick the “winner”.

  1. Organize notices into groups of identical notices.

  2. If there is a single largest group, it wins.

  3. If there are multiple largest groups the group with the oldest (in cluster order) notice wins.

This ensures that a single recoverable failure, or even multiple failures will never bring down the entire cluster and we continue with the largest cluster possible without causing inconsistency.

Transactions

Transactions are conversational state, allowing a session to collect changes for the shared state and then apply them all at once or not at all.

For maximum safety and simplicity all cluster members participate in the transaction.

For TX transactions each broker creates an identical transaction, they all succeed or fail identically since they're all being fed identical input (see Error Handling above for what happens if a broker doesn't reach the same conclusion.)

For DTX transactions, the originating broker creates a new transaction or joins an existing one. All other brokers join the same transaction as the originating broker. (For the case of a new transaction either transaction ids must be predictable or an extra cluster event is needed to propagate the txid.) The DTX system ensures that all brokers succeed or fail together.

For TX transactions it is possible to fail-over mid transaction since all brokers have the same transaction state. For DTX transactions a broker failure will roll bach

An alternative approach previously discussed is that only the origin broker has a transaction, transactions.

Persistence and Asynchronous Journaling

There are several possible combinations of cluster & persistence:

In a cluster with mixed transient-persistent brokers, all the persistent brokers need to write their journals before we let the client see a completed message. So we need to replicate the incoming transfer to the cluster and then wait for both it and a “journaled” notice from all persistent members

TODO:

Joining a running cluster

If a member fails the cluster can continue, but to restore reliability we need to add a new member to the running cluster.

A new cluster member must:

TODO: issues to address here

Stalling the cluster

New members joining may need time to “catch up”. Ideally they can do this fast enough that the rest of the cluster does not have to be affected, however under heavy load they may need to request the cluster stall or slow down to catch up within a reasonable time-frame.

TODO:

Long stall till newcomer catches up or series of short stalls, may cause less disruption to clients.

Starting and Restarting cluster

All Transient

Cluster members can simply be started. The first broker establishes a cluster, the remaining brokers join it. An all-transient cluster retains no state if it is shut-down or killed.

All Persistent

In a persistent cluster that is shut-down or experiences total failure, there may be differences between the databases of the members. Therefore we need a “start up” phase where the members decide who has the most recent database and update all the databases to match.

Possible implementation:

  1. Start all the members in “persistent start-up mode”. Brokers do nothing at this point.

  2. When all are started, admin gives a “persistent update” command. The member with the most up-to-date database multicasts a brain dump to the group, all members get up to date.

  3. Once complete, the cluster switches to active mode.

Mixed transient-persistent

Start persistent members in “persistent start-up mode” as above, transient members in “transient start-up mode”. All members update as above.

Implementation notes

Initial prototype

Very little disturbance to existing code. Correct because everything is serialized thru CPG.

Not scalable: processing everything in the CPG dispatch thread will not scale on multiple CPUs.

Concurrency

The current stand-alone broker has two roles for threads:

Only one thread (read or write) is allowed to work on behalf of a connection at a time so conversational state is not locked. Shared state is locked for thread safety on a per-queue and per-exchange basis.

The prototype adds a third thread role – cluster – and modify the work done in read and write threads as follows:

The problem is this serializes most broker activity and will not scale on multiple CPUs.

To restore concurrency similar to the current model we could re-dispatch cluster events to threads on a per-(shadow-)connection or per-session basis.

This creates non-determinism, since threads in different brokers may complete at different rates. That means we can't actually modify the shared model or issue completions directly in the worker threads. The changes must be serialized so that they are executed only when the corresponding cluster event is received and in the same order.

Designated queue owners.

If queues are assigned “owners” then the owner may be able to delay replication of dequeue. Instead of replicating the transfer, we only need to replicate before completing the accept.

Note: for failover we must also replicate before issuing a known-complete for sent transfers.

TODO: active-passive only or does this work for active active. E.g. pass an “ownership token” if multiple brokers want to dequeue from the queue. Is passing such a token any better than full active-active replication of dequeues?

Global indexes.

Cluster events will need to refer to various entites: connections, sessions, consumers, input events etc. All are uniquely identifiable but some have awkward, variable-length identifires. E.g. a session is uniquely identified by session-name+authentication-principal.

The cluster will assign a globally unique 64-bit integer index to all such entities with a single monotonically increasing wrap-around counter. Every broker maintains its own counter, indexes are associated with entities in the order they are first mentioned in the CPG event stream. This order is the same for all cluster members.

This has the slightly unusual property that you don't know an object's cluster index until after you re-receive the input event where it is introduced. However since these Ids are for use in cluster events, they can only ever refer to objects introduced in an earlier event, so there's no need to know an index until after the event that introduces it is delivered.

Cluster events themselves are identified by a separate monotonically increasing 64 bit wrap-around counter.

Reducing cluster traffic.

Not all frames affect shared state, e.g. session controls. Some of these can be omitted.

TODO: Evaluate percentage of traffic that could be omitted, is it significant?

Choosing input events

Prototype replicates each frame.

Alternative: replicate raw connection read buffers. Read buffers typically contain multiple frames so this may give better throughput in the cluster.

Disadvantage: this does not allow selective replication of frames.