NB: this is similar to Clustering Design Notes in many ways. The principal differences are:

  • Active state propagation between primary/backup only. Propagation to other proxies is pulled on demand.
  • Use OpenAIS protocol for more efficient cluster state management.
  • Transaction logging for fast persistent backup in case of more widespread failure.

The two designs should be reviewed & merged.

Clustering design

Summary

Note: in this document the term "resource" means "Queue or Exchange".

A broker cluster provides the illusion of a single broker: any cluster member can serve clients for any clustered resource.

For each clustered resource a cluster broker has exactly one of these roles:

  • Primary: primary owner of the Queue/Exchange.
  • Backup: becomes primary if primary fails.
  • Proxy: forwards to primary.

This application is partitioned by assigning groups of queues/exchanges to primary brokers. The same broker can be primary for one exchange, backup for another, proxy for another etc.

Primary/backup failover provides software/hardware fault tolerance. Proxies provide load distribution and act as connection concentrators.

TODO: There are many possible configurations for these roles, need to identify some basic use cases and show corresponding configuration.

Wherever it makes sense we will use AMQP itself to communicate within the cluster, using the following extension points to carry cluster data over standard AMQP:

  • Field table parameters to various AMQP methods (declare() arguments etc.)
  • Field table in message headers.
  • System exchanges and queues.
  • Alternate implementations of Queues, Exchanges, Bindings.

Sessions

With failover a single client-broker "session" may involve multiple connects/disconnects and multiple broker and/or client processes.

The client generates a UUID to identify each new session.

Extension points in the existing protocol are used to create a session:

<example>
# AMQP methods to start a session.

# Client requests a timeout.
client.start(server-properties={
  qpid.session.start:<uuid>,
  qpid.session.timeout:<in seconds> })

# Broker may impose a lower timeout.
server.start-ok(client-properties={qpid.session.timeout:<seconds>})
</example>
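
The negotiation above could be modelled as follows. This is a minimal Python sketch, not Qpid code: the function names, the broker-side cap `BROKER_MAX_TIMEOUT` and the use of a random (version 4) UUID are assumptions made for illustration. Only the `qpid.session.*` keys come from the text, and the UUID algorithm is still an open question.

```python
import uuid

# Hypothetical broker-side cap on session timeouts, in seconds.
BROKER_MAX_TIMEOUT = 60


def client_start_properties(requested_timeout):
    """Build the field table a client sends to start a new session."""
    return {
        "qpid.session.start": str(uuid.uuid4()),  # fresh UUID per session (assumed v4)
        "qpid.session.timeout": requested_timeout,
    }


def broker_start_ok(properties, max_timeout=BROKER_MAX_TIMEOUT):
    """Build the broker's reply; the broker may impose a lower timeout."""
    granted = min(properties["qpid.session.timeout"], max_timeout)
    return {"qpid.session.timeout": granted}


request = client_start_properties(requested_timeout=300)
reply = broker_start_ok(request)
print(reply["qpid.session.timeout"])
```

The client asks for 300 seconds and the broker grants its own maximum instead. A request below the cap would be granted unchanged.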

The broker keeps the session for the timeout period if the client connection is unexpectedly closed, or if the client calls Connection.close() with reply-code=KEEP_SESSION (new code). Any other type of connection closure terminates the session.

If the broker is configured for HA then sessions will survive failover. If not, sessions will survive temporary client disconnect.

<example>
# Client resuming a disconnected session:
client.start(server-properties={qpid.session.resume:<uuid>})

# Broker returns the status of the session.
server.start-ok(client-properties={qpid.session.resume:<session status>})
</example>
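
The keep, resume and time-out rules for sessions can be sketched as a small broker-side table. This is an illustrative Python sketch only: the class `SessionTable`, its method names and the returned status string `"resumed"` are assumptions, since the actual `<session status>` value is not yet defined.

```python
import time


class SessionTable:
    """Hypothetical broker-side table of sessions keyed by UUID."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._sessions = {}  # uuid -> {"timeout": seconds, "expires": time or None}

    def start(self, session_id, timeout):
        self._sessions[session_id] = {"timeout": timeout, "expires": None}

    def disconnect(self, session_id, keep):
        """Connection closed: keep the session for its timeout, or drop it."""
        if keep:
            entry = self._sessions[session_id]
            entry["expires"] = self._clock() + entry["timeout"]
        else:
            del self._sessions[session_id]

    def resume(self, session_id):
        """Reactivate a disconnected session; raise if unknown or timed out."""
        entry = self._sessions.get(session_id)
        if entry is None:
            raise LookupError("no such session")
        if entry["expires"] is None:
            raise RuntimeError("session is still active")
        if self._clock() > entry["expires"]:
            del self._sessions[session_id]
            raise LookupError("session timed out")
        entry["expires"] = None  # active again
        return "resumed"  # placeholder for the undefined <session status>


now = [0.0]  # fake clock so the example does not need to sleep
table = SessionTable(clock=lambda: now[0])
table.start("s1", timeout=30)
table.disconnect("s1", keep=True)  # unexpected close or KEEP_SESSION
now[0] = 10.0
print(table.resume("s1"))  # still within the timeout
```

In a real broker the two `LookupError` cases would translate into closing the connection with an exception.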

TODO define <session status>. Gives the status of the session as known to the broker - things like resumed channel ids, outstanding/unacked requests etc.

If the session has timed out or does not exist broker closes the connection with an exception.

Client Failover

Finding Alternate Brokers

The Connection.open-ok known-hosts parameter provides the initial list of alternate brokers. Note each "host" can be "host:port".

Client can bind a temporary queue to predeclared exchange "amq.qpid" with routing-key="cluster". The broker will publish a message whenever the alternate broker list changes. The message has empty body and header:

<example>
  qpid.known.hosts = longstr{ shortstr host, shortstr host ... };
</example>
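
One possible reading of this header is a long string (4-byte big-endian length prefix) whose content is a sequence of short strings (1-byte length prefix each). The Python sketch below encodes and decodes that layout. The layout itself is an assumption drawn from the notation above, and the function names are hypothetical.

```python
import struct


def encode_known_hosts(hosts):
    """Pack hosts as shortstrs inside one longstr (assumed layout)."""
    body = b""
    for host in hosts:
        raw = host.encode("utf-8")
        if len(raw) > 255:
            raise ValueError("shortstr is limited to 255 bytes")
        body += struct.pack("!B", len(raw)) + raw
    return struct.pack("!I", len(body)) + body


def decode_known_hosts(data):
    """Unpack a longstr of shortstrs into (host, port) pairs; port may be None."""
    (total,) = struct.unpack_from("!I", data, 0)
    pos, end = 4, 4 + total
    hosts = []
    while pos < end:
        length = data[pos]
        entry = data[pos + 1:pos + 1 + length].decode("utf-8")
        pos += 1 + length
        host, sep, port = entry.partition(":")
        hosts.append((host, int(port) if sep else None))
    return hosts


wire = encode_known_hosts(["broker1:5672", "broker2"])
print(decode_known_hosts(wire))
```

The decoder accepts both "host" and "host:port" entries, returning `None` for a missing port.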

Choosing a Broker

The client chooses randomly among the alternate brokers.

More sophisticated strategies for choosing brokers, e.g. nearest-in-network or least-loaded, may be considered in later versions.

The client then resumes the session as described under "Sessions".
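
Random broker selection on failover might look like the sketch below. It is written in Python under stated assumptions: `connect_with_failover` and the `try_connect` callback are hypothetical names, and a real client would follow a successful connect with a session resume.

```python
import random


def connect_with_failover(known_hosts, try_connect, rng=random):
    """Try alternate brokers in random order; return the first that accepts."""
    candidates = list(known_hosts)
    rng.shuffle(candidates)
    for host in candidates:
        if try_connect(host):
            return host
    raise ConnectionError("no alternate broker reachable")


# Hypothetical stand-in for a real connection attempt: only broker2 is up.
alive = {"broker2:5672"}
chosen = connect_with_failover(
    ["broker1:5672", "broker2:5672", "broker3:5672"],
    try_connect=lambda host: host in alive,
)
print(chosen)
```

Shuffling the whole list, rather than picking one host at random each time, means no broker is tried twice in the same failover attempt.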

Clustered Resources

Each resource on a clustered broker falls into one of these categories:

  • Local: not visible to other cluster members.
  • Proxied: visible via all cluster members but not fault-tolerant.
  • Fault tolerant, transient: Mirrored to backup, not logged to disk.
  • Fault tolerant, durable: Mirrored and logged to disk.

Transient resources and transient messages survive failover to an active backup, but not failure of both primary and backup.

Durable fault-tolerant resources and persistent messages on durable, fault tolerant queues survive any failover including failure and restart of the entire cluster.

Local resources do not survive failure of their broker.

Proxied resources: when a broker joins a cluster it will create proxies for all clustered resources. Cluster brokers automatically keep their proxy set up to date as brokers join & leave the cluster and as resources are created/destroyed on their primary broker.

Creating Clustered/Proxied Resources

A Qpid client can explicitly request creation of a clustered or proxy resource by adding entries to the "arguments" parameter of declare methods.

<example>
# Create a local proxy to a resource on <host>.
# Fail if the resource does not exist at host or host is invalid.
declare(arguments={qpid.cluster.proxy:<host>})

# Create a clustered resource on remote primary <host> and a local proxy.
# If host="" or the current broker then create a clustered resource
# with the local broker as primary.
declare(arguments={qpid.cluster.primary:<host>})
</example>

Clustered resources can be declared on any clustered broker from any other clustered broker. Declarations are forwarded to the primary.

For compatibility with non-Qpid clients a broker can be configured (via AMQP?) with a list of rules mapping queue/exchange names to primary hosts. If a normal declare() name matches such a rule the broker will either create a proxy to the resource or attempt to create the remote cluster resource based on rule configuration.

TODO details.
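
Since the rule format is still to be defined, the following Python sketch only illustrates the idea. It assumes rules are (glob pattern, primary host) pairs with first-match-wins semantics. The names `RULES` and `resolve_declare`, and the pattern syntax, are hypothetical.

```python
from fnmatch import fnmatchcase

# Hypothetical rule list: (name pattern, primary host). First match wins.
RULES = [
    ("orders.*", "broker1:5672"),
    ("audit.*", "broker2:5672"),
]


def resolve_declare(name, local_host, rules=RULES):
    """Decide how a plain declare() for `name` should be handled."""
    for pattern, primary in rules:
        if fnmatchcase(name, pattern):
            if primary == local_host:
                return ("primary", primary)  # resource lives on this broker
            return ("proxy", primary)        # create a local proxy to the primary
    return ("local", local_host)             # no rule: ordinary local resource


print(resolve_declare("orders.eu", local_host="broker2:5672"))
print(resolve_declare("audit.log", local_host="broker2:5672"))
print(resolve_declare("scratch", local_host="broker2:5672"))
```

A non-Qpid client issuing an ordinary declare() would see no difference between these three outcomes. Only the broker knows whether the resource is local, proxied or clustered.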

Proxies

Proxies use only AMQP to communicate. They can be used independently of clustering as federated connection concentrators. They use standard AMQP on both sides so they can interoperate with non-Qpid clients and brokers.

Pure-proxy brokers can be used as connection concentrators and to cross network boundaries - e.g. TCP to IGMP forwarding, running in DMZ etc.

The basic idea is to create proxy exchange/queue instances with special proxy implementations to give the illusion of a common queue/exchange setup on each broker. The behavior of these proxy objects is as follows:

TODO: namespace issue, see Questions section below.

Publish to proxy exchange

Local proxy exchange acts as client to remote exchange. Any message published to proxy exchange is forwarded to remote exchange.

Proxy exchanges have the same name as their remote counterpart so they can be used as if they were the remote counterpart.

Consume from proxy queue

Proxy queue consumes, gets and acknowledges messages from remote queue on behalf of its clients. It must not acknowledge any message until it has received acknowledgement from its client. It should use flow control to avoid consuming messages unless it has a client ready to consume them.
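
The two rules above (acknowledge upstream only after the client acknowledges, and consume only when a client is ready) can be sketched as follows. This is an illustrative Python model, not Qpid code: `ProxyQueue`, `FakeRemote` and their methods are hypothetical names, and the remote queue is replaced by an in-memory stand-in.

```python
class ProxyQueue:
    """Hypothetical proxy that relays messages and acknowledgements."""

    def __init__(self, remote):
        self.remote = remote      # object with fetch() and ack(tag)
        self.ready_clients = 0    # clients currently able to take a message
        self.unacked = set()      # tags delivered to clients, not yet acked

    def client_ready(self):
        self.ready_clients += 1

    def pump(self):
        """Consume from the remote queue only while a client is waiting."""
        delivered = []
        while self.ready_clients > 0:
            item = self.remote.fetch()
            if item is None:
                break
            tag, body = item
            self.ready_clients -= 1
            self.unacked.add(tag)
            delivered.append((tag, body))
        return delivered

    def client_ack(self, tag):
        """Acknowledge to the remote queue only after the client has acked."""
        self.unacked.remove(tag)
        self.remote.ack(tag)


class FakeRemote:
    """In-memory stand-in for the remote queue, for illustration only."""

    def __init__(self, messages):
        self.pending = list(enumerate(messages))
        self.acked = []

    def fetch(self):
        return self.pending.pop(0) if self.pending else None

    def ack(self, tag):
        self.acked.append(tag)


remote = FakeRemote(["m0", "m1"])
proxy = ProxyQueue(remote)
print(proxy.pump())   # no client ready, so nothing is consumed
proxy.client_ready()
print(proxy.pump())   # one message relayed, not yet acknowledged remotely
proxy.client_ack(0)
print(remote.acked)
```

Because the proxy never fetches without a waiting client, the second message stays on the remote queue where other consumers can still reach it.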

TODO: draw out the scenarios for avoiding "stuck message syndrome" where a proxy queue consumes a message but then has nobody to pass it on to.

Bind local queue to proxy exchange

Proxy exchange creates private queue on remote broker and binds to remote exchange with identical binding arguments. Proxy exchange consumes from private remote queue and enqueues messages on the local queue.

Bind proxy queue to local exchange

The proxy queue acts as client to the remote broker. All messages delivered to the proxy queue are sent to the remote queue via the remote default exchange and default (queue name) binding.

Bind proxy queue to proxy exchange

Given: remote Queue Q on remote broker Bq, remote Exchange X on remote broker Bx. Local broker is B.

If Bq=Bx then simply bind on the remote broker.

If Bq or Bx is a Qpid broker then create a proxy on the Qpid broker for the resource on the non-Qpid broker and bind it to the resource on the Qpid broker.

If neither Bq nor Bx is a Qpid broker: bind the proxy queue to the proxy exchange (combining the rules of the two preceding use cases). This means traffic from Bx will be routed through B and forwarded to Bq. This may or may not be a useful thing to do.

Resource lifecycle

Explicit resource delete commands are forwarded to the primary broker.

In a Qpid cluster, the cluster is notified via the cluster map (see below) so resources can be reclaimed on all proxy brokers.

If Qpid brokers act as proxies for non-Qpid brokers outside a cluster it may not be possible to immediately notify all the proxies. Therefore brokers must also clean up proxy resources if a NOT_FOUND exception indicates that a resource no longer exists, and should propagate the exception back to the client.

This could still leave dead proxies in idle brokers. Proxy brokers may periodically check the existence of resources they proxy for (using declare) and clean up if the proxied resource no longer exists.

Auto-delete queues: proxies for auto-delete queues should mimic their clients, in particular they must not keep the queue open for consume if they have no clients consuming. On the other hand they must not allow the queue to be deleted if they still have consuming clients.

Connection management

TODO: details:

  • Clients must be unaware of disconnect/reconnect by proxies.
  • Proxies shouldn't hold unused connections open for no reason.
  • Proxies shouldn't create/teardown connections too frequently.
  • Proxies should share connections where possible.

Cluster Communication

Broker sessions.

Sessions are considered resources by the brokers - each session has primary and backup brokers. The primary for a session is the broker where the session is created.

Unlike queues and exchanges, sessions cannot be proxied. In the event of failure sessions are transferred to another broker. After a failure a client can re-establish the session on any cluster broker.

A session is active from the time a client connects up to the failure of the connection, the broker or the client. If the broker sees a connection failure, it marks the session disconnected.

If a client attempts to resume the session somewhere in the cluster, the session is transferred to that broker and becomes active again. That broker becomes the primary broker for the session.

If a broker fails, all its sessions are transferred to the backup and marked disconnected. As above, a client can resume a session at any broker; the session will be transferred and activated.

Active sessions cannot be transferred - a client can only be connected to a session via one broker at a time.

Resource sets

A cluster broker can take different roles with respect to different resources (queues, exchanges, sessions). A "resource set" is a set of resources owned by the same broker. The resource set is the unit of transfer, for example if a primary broker fails its entire resource set is taken over by the backup.

TODO: Failover merges the primary's resource set with the backup. Do we also need to allow resource sets to be split so a primary broker can offload part of its resource set on another broker?

The cluster map.

The cluster map lists

  • all brokers that are members of the cluster.
  • all cluster resources with the primary and backup broker for each.

OpenAIS will be used to give all brokers a consistent view of the cluster map:

  • membership events (join, leave, failed.)
  • cluster map updates
  • detecting failed brokers.
  • resource changes (create, delete, transfer)

All other traffic between brokers will use AMQP. The majority of traffic can be done using the proxy techniques described above. Other communication is via system exchange and queues.

TODO: is there a need for any other protocols?

Choosing primaries and backups

Options:

  • pre-configured static resources with fixed primary/backup.
  • client/admin specifies primary (and backup?) on resource creation.
  • dynamic: cluster automatically (re)assigns roles as brokers join.

Probably need a combination: a way to set up an initial static assignment, a way for admins to intervene and adjust assignments and a way for the cluster to respond dynamically to changes. At a minimum it must be able to respond to a node failure.

For example: brokers could be arranged in a ring with each broker being the backup for the preceding one. To add a new broker you choose a point in the ring to insert it and the adjacent brokers adjust their responsibilities accordingly. This implies we can transfer "backupship" as well as ownership.
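
The ring arrangement can be sketched in a few lines of Python. This is only an illustration of the idea: the function names are hypothetical, and the ring is represented as a plain list of broker names.

```python
def backup_map(ring):
    """Each broker is backed up by the next broker around the ring."""
    return {b: ring[(i + 1) % len(ring)] for i, b in enumerate(ring)}


def insert_broker(ring, new_broker, after):
    """Return a new ring with `new_broker` placed directly after `after`."""
    i = ring.index(after)
    return ring[:i + 1] + [new_broker] + ring[i + 1:]


ring = ["A", "B", "C"]
print(backup_map(ring))
ring = insert_broker(ring, "D", after="A")
print(backup_map(ring))
```

Inserting D after A changes only two responsibilities: D takes over from B as backup for A, and B becomes backup for D. The other pairings are untouched.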

Broker Failover

Transient resources and transient messages survive failover to an active backup, but not failure of both primary and backup.

Durable fault-tolerant resources and persistent messages on durable, fault tolerant queues survive any failover including failure and restart of the entire cluster.

In active/active clustering the primary broker forwards all commands and messages (persistent and transient) to the backup. The backup keeps an in-memory representation of the state of the primary. It still acts as a proxy for resources owned by the primary; it does not act on the primary's behalf while the primary is alive.

Suppose A is backed up by B, and B is backed up by C. If broker A fails, its primary and backup resource sets are transferred to broker B. B becomes primary for those resources formerly owned by A and becomes backup for those resources formerly backed up by A. C becomes backup for those resources formerly owned by A.
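
The failover scenario can be traced with a small Python sketch. It assumes brokers are arranged in a ring where each broker is backed up by the next one, and that the cluster map is a dictionary from resource name to a (primary, backup) pair. Both representations, and the name `fail_broker`, are assumptions made for illustration.

```python
def fail_broker(cluster_map, ring, failed):
    """Return (new_map, new_ring) after `failed` leaves the ring.

    cluster_map maps resource name -> (primary, backup); each broker in
    `ring` is backed up by the next one around the ring.
    """
    heir = ring[(ring.index(failed) + 1) % len(ring)]  # the failed broker's backup
    new_ring = [b for b in ring if b != failed]
    n = len(new_ring)
    # Next broker around the shrunken ring, or None if only one broker is left.
    successor = {
        b: (new_ring[(i + 1) % n] if n > 1 else None)
        for i, b in enumerate(new_ring)
    }
    new_map = {}
    for resource, (primary, _old_backup) in cluster_map.items():
        if primary == failed:
            primary = heir  # the backup takes over as primary
        new_map[resource] = (primary, successor[primary])
    return new_map, new_ring


ring = ["A", "B", "C"]
cluster_map = {"q1": ("A", "B"), "q2": ("B", "C"), "q3": ("C", "A")}
new_map, new_ring = fail_broker(cluster_map, ring, failed="A")
print(new_map)
```

After A fails, B is primary for q1 with C as its new backup, and B also replaces A as backup for q3. How C catches up with the state of q1 is not modelled here.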

A clustered broker also writes a transaction log of commands related to durable resources and persistent messages. Thus if both primary and backup fail, or if the primary fails without any backup, another broker can be elected to take over by recovering the state of the resources from the log files.

TODO: possible optimization - forward commands without messages to the backup and store messages persistently. The backup keeps a model of the state but only loads messages if actually required by failover. This trades normal-running performance and network load against recovery time.

TODO: Elections - detail scenarios and strategy, research what OpenAIS has to offer.

TODO: Transfer of state to the third broker C in the failover scenario. I.e. how does C become the backup for resources transferred from A to B? How does it catch up with the state of those resources?

Logs and persistence.

Use GFS to share storage between brokers. Two types of storage to consider:

Transaction logs: Optimized for writing. A flat file with new entries always appended at the end. A single broker writes to each log file (minimise lock contention.) Not optimal for reading - need to read the entire sequence of log events to reconstruct the state of a broker. E.g. may include reading commands to create resources that subsequently get deleted. When the log reaches a certain size, start a new log file.

State snapshot: Image of the state of a set of resources at some point in time. More efficient for reading, more compact but slower to write because data structures must be reorganized.

A log consolidator reads from transaction logs and constructs a state snapshot, deleting transaction logs as they are incorporated into the snapshot. So to do a full recovery you first load the snapshot, then apply the logs.
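
The relationship between snapshot and logs can be illustrated with a toy model in Python. The log entry format (tuples such as `("enqueue", queue, message)`) and all function names are hypothetical. A real log would be a file on shared storage, not an in-memory list.

```python
def apply_entry(state, entry):
    """Apply one logged command to a {queue name: [messages]} state."""
    op = entry[0]
    if op == "create":
        state.setdefault(entry[1], [])
    elif op == "delete":
        state.pop(entry[1], None)
    elif op == "enqueue":
        state[entry[1]].append(entry[2])
    elif op == "dequeue":
        state[entry[1]].remove(entry[2])
    else:
        raise ValueError(f"unknown log entry: {entry!r}")


def consolidate(snapshot, logs):
    """Fold transaction logs into a new snapshot; the logs can then be deleted."""
    state = {name: list(msgs) for name, msgs in snapshot.items()}
    for log in logs:
        for entry in log:
            apply_entry(state, entry)
    return state


def recover(snapshot, remaining_logs):
    """Full recovery: load the snapshot, then apply logs not yet consolidated."""
    return consolidate(snapshot, remaining_logs)


log1 = [("create", "q1"), ("enqueue", "q1", "m1"), ("create", "tmp"), ("delete", "tmp")]
log2 = [("enqueue", "q1", "m2"), ("dequeue", "q1", "m1")]
snapshot = consolidate({}, [log1])  # log1 is now redundant
print(snapshot)
print(recover(snapshot, [log2]))
```

The snapshot drops the short-lived "tmp" queue entirely, which is why it is more compact than the log it replaces.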

Questions

Lots more detail needed in failover. There are many ways you could configure these components, need to identify what makes sense, how much flexibility to offer and what simplifying assumptions we can make.

Need some profiling to see how various options perform in real life.

Split brain and quorums: How to achieve agreement that a broker is failed, e.g. using quorum device. Does OpenAIS solve this for us or is there more to it?

Elections & state transfer: much more detail needed.

Naming problem for proxying and clustering:

  • Which names are local, which must be consistent across the cluster?
  • How/when to deal with clashes of local & remote names?
  • Use explicitly separate namespaces for proxy/cluster resources?
  • Use virtual hosts? All cluster/proxy resources live in a special virtual host?

Relationship between FT storage and durable storage?

Proxies forward requests to primary only or also to hot backup?

UUID algorithm?