You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 9 Next »

This page provides a high level description of the architecture of Broker-J.

Broker-J is messaging broker that implements the AMQP protocols (version 0-8, 0-9, 0-91, 0-10 and 1.0).  Any AMQP compliant messaging library can be used with the Broker.  The Broker supports on the fly message translation from one AMQP protocol to another, meaning it is possible to use the Broker to allow clients that use different AMQP protocol version to exchange messages.

The Broker has a highly pluggable architecture that allows alternative implementations to be substituted for any concern.  For instance, you can simply build a module delegating to your own storage or own authentication provider linking to your enterprise authentication backend.

Broker-J is 100% pure Java.  It can be run standalone or embedded within another Java applications.

Model

A tree of manageable categories, that are extend of the interface ConfiguredObject, underpins the Broker.   A ConfiguredObject has zero or more attributes, zero or more children and zero or more context variable name/value pairs.  A ConfiguredObject may be persisted to a configuration store so its state can be restored when the Broker is restarted.

The manageable categories are arranged into a tree structure.  SystemConfig is at the root and has a single descendent Broker.  The Broker itself has children: Port, AuthenticationProvider, VirtualHostNode amongst others.   VirtualHostNode has a child VirtualHost.  It is the VirtualHost that has categories directly involved in messaging such as Queue.  The diagram below illustrates the category hierarchy but many categories are elided for brevity.

Category Specializations

Some categories have specialisations.  An example is the category Queue.  It has specialisations corresponding to the queue types supported by the Broker e.g. StandardQueue, PrirorityQueue etc.

Attributes

Each ConfiguredObject instance has zero or more attributes.   Attributes have a name and a value which can be a Java primitive value or an instance of any class for which an AttributeValueConverter exist.  This mechanism allows attribute values to be Lists, Sets, Maps, or arbitrary structured types ManagedAttributeValues.

Attributes are marked up in the code with method annotations @ManagedAttribute which defines things whether the attribute is mandatory or mutable.  Attributes can also be marked a secure which indicates restrictions no how the attribute is used (used for attributes that that store passwords or private-keys).

Attributes can have default values.  The default value applies if the user omits to supply a value when the object is created.  Defaults themselves can be defined in terms of context variable references.

Context Variables

Each ConfiguredObject instance has zero or more context variable assignments.  These are simply name/value pairs where both name and value are strings.

When resolving an attribute's value, if the attribute's value (or attribute's default) contains a context variable reference (e.g.${foo}), the variable is first resolved using the ConfiguredObject's own context variables. If the ConfiguedObject has no definition for the context variable, the entity's parent is tried, then its grandparent and so forth, all the way until the SystemContext is reached.  If the SystemContext provides no value, the JVM's system properties are consulted.

A context variable's value can be defined in terms of other context variables. 

Context variables are useful for extracting environment specific information from configuration for instance path stems or port numbers.

Lifecycle

ConfiguredObjects have a lifecycle.

A ConfiguredObject is created exactly once by a call its parent's #createChild() method.  This brings the object into existence.  It goes through a number of phases during creation (ConfiguredObject#create)

  1. resolution (where the attribute values are resolved and assigned to the object)
  2. creation validation (ensuring business rules are adhered to)
  3. registration with parents
  4. implementation specific creation (#onCreate)
  5. implementation specific opening (#onOpen)

When the Broker is restarted objects that exist in the configuration store are said to be recovered.  During recovery, they follow the opening (ConfiguredObject#open)

  1. resolution (where the attribute values are resolved and assigned to the object)
  2. validation (ensuring business rules are adhered to)
  3. implementation specific opening (#onOpen)

Some ConfiguredObjects support starting (ConfiguredObject#start()) and stopping (ConfiguredObject#stop()) but this have not yet been extended to all objects.

ConfiguredObject#delete() caused the object to be deleted.

AbstractConfiguredObject

Most configured object implementations extent AbstractConfiguredObject (ACO).  ACO provides the mechanics behind the configured implementations: attributes, context variables, state and lifecycle, and a listener mechanism: ConfigurationChangeListener.

Threading

The threading model used by the model must be understood before changes can be made safely.

The Broker and VirtualHost ConfiguredObject instances have a task executor backed by single configuration thread. Whenever the a configuration object needs to be changed, that change MUST be made by the nearest ancestor's configuration thread.  This approach ensures avoids the need to employ locking.  Any thread is allowed to observe the state of a ConfiguredObject at any time.  For this reasons, changes must be published safely so they can be read consistently by the observing threads.

The implementations of the mutating methods (#setAttributes(), #start(), #stop() etc) within AbstractConfiguredObject are already implemented to adhere to these rules.  

Configuration Persistence

ConfiguredObject categories such as SystemConfig and VirtualhostNode take responsibility for managing the storage of their children.  This is marked up in the model with the @ManagedObject annotation (#managesChildren). These objects utilise a DurableConfigurationStore to persist their durable children to storage.  ConfigurationChangeListener are used to trigger the update of the storage each time a ConfiguredObject is changed.

AMQP Transport Layer

There are two AMQP Transport Layers in Broker-J.

  • Traditional TCP/IP connections
  • Websocket

We'll consider the two layers separately.

TCP/IP

This layer is implemented from first principles using Java NIO.

It is non-blocking in nature.

It uses a Selector to monitor all connected sockets (and the accepting socket) for work.  Once work is detected (i.e. the selector returns) the connection work is serviced by threads drawn from an IO thread pool.  An eat-what-you-kill pattern is used to reduce dispatch latency.  This works in the following way.   The worker thread that performed the select, after adding all the ready connections to the work queue, adds the selector task to the work queue and then starts to process the work queue itself (this is the eat what you kill bit).  This approach potentially avoids the dispatch latency between the thread that performed select and another thread from the IO thread pool.   The Selector is the responsibility of the SelectorThread class.

Connections to peers are represented by a NonBlockingConnection.  The SelectorThread causes the NonBlockingConnections that require IO work to be executed (NonBlockingConnection#doWork).  On each work cycle, the NonBlockingConnection first goes through a write phase where pending work is pulled from the protocol engine producing bytes for the wire in the process.  If all the pending work is sent completely (i.e. the network buffer is not exhausted), the next phase is a read phase, where the bytes are consumed from the channel and fed into the protocol engine.  Finally there is a further write phase to send any new bytes resulting from the input we have just read.   The write/read/write sequence is organised so in order that the Broker first evacuates as much state from memory as possible (thus freeing memory) before reading new bytes from the wire.

In addition to the NonBlockingConnection being scheduled when bytes arrive for it from the wire, the Broker may need to awaken them at other times.  For instance, if a message arrives on a queue that is suitable for a consumer, the NonBlockingConnection associated with that consumer must added on to the work queue.   The mechanism that does this is NetworkConnectionScheduler#schedule method.  This is wired to the protocol engine via a listener.

Threading

The only threads that execute NonBlockingConnnections are those of the NetworkConnectionScheduler.  Furthermore, it is imperative that no NonBlockingConnnection is executed by more than one thread at once.  It is the job of ConnectorProcessor to organise this exclusivity.   Updates made by NonBlockingConnnection must be published safely so they can be read consistently by the other threads in the pool.

There is a NetworkConnectionScheduler associated with each AMQP Port and each VirtiualHost.  When a connection is made to the Broker, the initial interactions between peer and broker (exchange of protocol headers, authentication) take place on the thread pool of the NetworkConnectionScheduler of the Port.  Once the connection has indicate which VirtualHost it wishes to connect to, responsibility for the NonBlockingConnection shifts to the NetworkConnectionScheduler of the VirtualHost.

AMQP Protocol Engines

Queues

HTTP, REST and Web Management

AMQP Management

Terms

 

 

 

 

  • No labels