...

Broker-J is 100% pure Java.  It can be run standalone or embedded within another Java application.

Model

A tree of manageable categories, all of which extend the interface ConfiguredObject, underpins the Broker.  A ConfiguredObject has zero or more attributes, zero or more children and zero or more context variable name/value pairs.  A ConfiguredObject may be persisted to a configuration store so that its state can be restored when the Broker is restarted.
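The shape described above can be pictured with a small sketch.  SketchObject and its members are hypothetical names invented for illustration; the real ConfiguredObject interface is considerably richer and is not reproduced here.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for a configured object (not the Broker-J interface).
class SketchObject {
    final String category;                                        // e.g. "Broker", "Queue"
    final Map<String, Object> attributes = new LinkedHashMap<>(); // zero or more attributes
    final Map<String, String> context = new LinkedHashMap<>();    // zero or more context variables
    final List<SketchObject> children = new ArrayList<>();        // zero or more children

    SketchObject(String category, String name) {
        this.category = category;
        attributes.put("name", name);
    }

    // Attaches a child and returns it so that a tree can be built fluently.
    SketchObject addChild(SketchObject child) {
        children.add(child);
        return child;
    }

    // Counts this object plus all of its descendants.
    int size() {
        int n = 1;
        for (SketchObject c : children) {
            n += c.size();
        }
        return n;
    }
}
```

A three-level tree built this way (a root, one child and one grandchild) reports a size of 3.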

...

Category Specializations

Some categories have specialisations.  An example is the category Queue.  It has specialisations corresponding to the queue types supported by the Broker, e.g. StandardQueue, PriorityQueue etc.

Attributes

Each ConfiguredObject instance has zero or more attributes.  Attributes have a name and a value, which can be a Java primitive value or an instance of any class for which an AttributeValueConverter exists.  This mechanism allows attribute values to be Lists, Sets, Maps, or arbitrary structured types (ManagedAttributeValues).

...

Attributes can have default values.  The default value applies if the user omits to supply a value when the object is created.  Defaults themselves can be defined in terms of context variable references.

Context Variables

Each ConfiguredObject instance has zero or more context variable assignments.  These are simply name/value pairs where both name and value are strings.

...

Context variables are useful for extracting environment-specific information from configuration, for instance path stems or port numbers.
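As a rough illustration of how an attribute default might be expressed in terms of a context variable, the sketch below substitutes references of the form ${name} from a map of context variables.  The ${name} syntax, the class ContextSketch and the variable name work.dir are assumptions made for this example, not a statement of how the Broker resolves references.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper; the reference syntax ${name} is an assumption of this sketch.
class ContextSketch {
    private static final Pattern REF = Pattern.compile("\\$\\{([^}]+)\\}");

    // Replaces each ${name} with its context value; unknown names are left untouched.
    static String interpolate(String template, Map<String, String> context) {
        Matcher m = REF.matcher(template);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String value = context.get(m.group(1));
            String replacement = value != null ? value : m.group(0);
            m.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        m.appendTail(out);
        return out.toString();
    }

    // Uses the value supplied by the user if present, otherwise the interpolated default.
    static String attributeValue(String supplied, String defaultTemplate, Map<String, String> context) {
        return supplied != null ? supplied : interpolate(defaultTemplate, context);
    }
}
```

With work.dir set to /var/broker, a default of ${work.dir}/store resolves to /var/broker/store, while a value supplied by the user is returned unchanged.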

Lifecycle

ConfiguredObjects have a lifecycle.

...

ConfiguredObject#delete() causes the object to be deleted.

AbstractConfiguredObject

Most configured object implementations extend AbstractConfiguredObject (ACO).  ACO provides the mechanics behind the configured object implementations: attributes, context variables, state and lifecycle, and a listener mechanism: ConfigurationChangeListener.

Threading

The threading model used by the model must be understood before changes can be made safely.

...

The mutating methods (#setAttributes(), #start(), #stop() etc) within AbstractConfiguredObject are already implemented to adhere to these rules.

Configuration Persistence

ConfiguredObject categories such as SystemConfig and VirtualHostNode take responsibility for managing the storage of their children.  This is marked up in the model with the @ManagedObject annotation (#managesChildren).  These objects utilise a DurableConfigurationStore to persist their durable children to storage.  ConfigurationChangeListeners are used to trigger the update of the storage each time a ConfiguredObject is changed.
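The listener-driven persistence described above might look roughly like the following.  SketchChangeListener, SketchStore and SketchDurableObject are hypothetical stand-ins; the real ConfigurationChangeListener and DurableConfigurationStore have different, larger interfaces.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical listener: notified every time an attribute of an object changes.
interface SketchChangeListener {
    void attributeSet(String objectId, String attribute, Object newValue);
}

// Hypothetical store: keeps the latest record for each object id in memory.
class SketchStore implements SketchChangeListener {
    final Map<String, Map<String, Object>> records = new HashMap<>();

    @Override
    public void attributeSet(String objectId, String attribute, Object newValue) {
        records.computeIfAbsent(objectId, k -> new HashMap<>()).put(attribute, newValue);
    }
}

// Hypothetical durable object: every mutation is reported to the listener.
class SketchDurableObject {
    private final String id;
    private final Map<String, Object> attributes = new HashMap<>();
    private final SketchChangeListener listener;

    SketchDurableObject(String id, SketchChangeListener listener) {
        this.id = id;
        this.listener = listener;
    }

    void setAttribute(String name, Object value) {
        attributes.put(name, value);
        listener.attributeSet(id, name, value); // triggers the update of the store
    }
}
```

The object itself knows nothing about storage; it only reports changes, which keeps the choice of store with the parent that manages it.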

AMQP Transport Layer

At the high level, the transport layer

...

The transport is responsible for TLS.  The TLS configuration is owned by the Port, Keystore and Truststore model objects.  If so configured, it is the transport's responsibility to manage the TLS connection.

TCP/IP

This layer is implemented from first principles using Java NIO.

...

In addition to the NonBlockingConnections being scheduled when signalled by the Selector, the Broker may need to awaken them at other times.  For instance, if a message arrives on a queue that is suitable for a consumer, the NonBlockingConnection associated with that consumer must be awoken.  The mechanism that does this is the NetworkConnectionScheduler#schedule method, which adds the connection to the work queue.  This is wired to the protocol engine via a listener.

Threading

The only threads that execute NonBlockingConnections are those of the NetworkConnectionScheduler.  Furthermore, it is imperative that no NonBlockingConnection is executed by more than one thread at once.  It is the job of ConnectorProcessor to organise this exclusivity.  Updates made by a NonBlockingConnection must be published safely so they can be read consistently by the other threads in the pool.
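One common way to obtain this kind of exclusivity is to let a connection sit in the work queue at most once, and to re-queue it after a run if more work was requested in the meantime.  The sketch below shows that pattern only; SketchScheduler and SketchConnection are hypothetical and are not a description of how the Broker's own classes achieve it.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical connection: 'claimed' is true while it is queued or being run.
class SketchConnection {
    final AtomicBoolean claimed = new AtomicBoolean();
    final AtomicBoolean workRequested = new AtomicBoolean();
    int runs; // plain field: only touched by the single thread running the connection

    void doWork() {
        runs++;
    }
}

// Hypothetical scheduler: pool threads would call runOne() in a loop.
class SketchScheduler {
    private final Queue<SketchConnection> workQueue = new ConcurrentLinkedQueue<>();

    // Requests work; the connection is queued only if not already queued or running.
    void schedule(SketchConnection c) {
        c.workRequested.set(true);
        if (c.claimed.compareAndSet(false, true)) {
            workQueue.add(c);
        }
    }

    // Runs one queued connection; returns false if the queue was empty.
    boolean runOne() {
        SketchConnection c = workQueue.poll();
        if (c == null) {
            return false;
        }
        c.workRequested.set(false);
        c.doWork();
        c.claimed.set(false); // release; the atomic write also publishes 'runs'
        // A request that arrived while running was not queued, so re-queue here.
        if (c.workRequested.get() && c.claimed.compareAndSet(false, true)) {
            workQueue.add(c);
        }
        return true;
    }
}
```

Because the connection is only ever in the queue while claimed, two pool threads cannot poll and execute it at the same time, and repeated schedule calls coalesce into a single run.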

There is a NetworkConnectionScheduler associated with each AMQP Port and each VirtualHost.  When a connection is made to the Broker, the initial exchanges between peer and broker (protocol headers, authentication etc) take place on the thread pool of the NetworkConnectionScheduler of the Port.  Once the connection has indicated which VirtualHost it wishes to connect to, responsibility for the NonBlockingConnection shifts to the NetworkConnectionScheduler of the VirtualHost.

TLS

The TCP/IP transport layer responds to the TLS configuration provided by the Port, Keystore and Truststore model objects.  It does this using the NonBlockingConnectionDelegates.

  • The NonBlockingConnectionUndecidedDelegate is used to implement the Plain/TLS port unification feature (that is, support for plain and TLS on the same port).  It sniffs the initial incoming bytes to determine whether the peer is trying to negotiate a TLS connection or not.  Once the determination is made, one of the following delegates is substituted in its place.
  • NonBlockingConnectionTLSDelegate is responsible for TLS connections.  It feeds the bytes through an SSLEngine.
  • NonBlockingConnectionPlainDelegate is used for non-TLS connections.
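The sniffing step can be sketched as follows.  A TLS connection opens with a handshake record whose first byte is 0x16 followed by a major version byte of 0x03, whereas a plain AMQP connection opens with the ASCII bytes "AMQP".  PortUnificationSketch is a hypothetical name, and this minimal check ignores cases (such as legacy SSLv2 hellos) that a production implementation may need to handle.

```java
// Hypothetical sketch of deciding between TLS and plain from the first bytes received.
class PortUnificationSketch {
    enum Kind { TLS, PLAIN, UNDECIDED }

    static Kind sniff(byte[] initial) {
        if (initial.length < 2) {
            return Kind.UNDECIDED; // not enough bytes yet; wait for more
        }
        // TLS record header: content type 0x16 (handshake), then major version 0x03.
        boolean tlsRecord = initial[0] == 0x16 && initial[1] == 0x03;
        return tlsRecord ? Kind.TLS : Kind.PLAIN;
    }
}
```

An UNDECIDED result corresponds to staying with the undecided delegate until more bytes arrive.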

Idle timeout

All versions of the AMQP protocol support the idea of the peers regularly passing null data to keep a wire that would otherwise be silent (during quiet times) busy.  This is called idle timeout or heartbeating.  It is configured during connection establishment.  If a peer detects that the other has stopped sending this data, it can infer that the network connection has failed or the peer has otherwise become inoperable, and close the connection.  Sending the null data is the responsibility of the ServerIdleWriteTimeoutTicker.  Detecting the absence of data from the peer is the responsibility of the ServerIdleReadTimeoutTicker.  When the Selector blocks awaiting activity, the timeout is the minimum timeout value of all Tickers.
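The "minimum of all Tickers" rule can be sketched as below.  SketchTicker, IdleTicker and TickerSketch are hypothetical names for this example, and the method shown is an assumed shape rather than the Broker's own ticker interface.

```java
import java.util.List;

// Hypothetical ticker: reports how long until it next needs attention.
interface SketchTicker {
    long timeToNextTick(long now); // milliseconds, may be <= 0 if already due
}

// Hypothetical idle ticker: due once 'timeoutMillis' has passed since the last activity.
class IdleTicker implements SketchTicker {
    private final long timeoutMillis;
    private final long lastActivity;

    IdleTicker(long timeoutMillis, long lastActivity) {
        this.timeoutMillis = timeoutMillis;
        this.lastActivity = lastActivity;
    }

    @Override
    public long timeToNextTick(long now) {
        return lastActivity + timeoutMillis - now;
    }
}

class TickerSketch {
    // The longest the selector may block is the smallest time-to-next-tick of any ticker.
    static long selectTimeout(List<SketchTicker> tickers, long now) {
        long timeout = Long.MAX_VALUE; // no tickers: nothing bounds the wait
        for (SketchTicker t : tickers) {
            timeout = Math.min(timeout, t.timeToNextTick(now));
        }
        return timeout;
    }
}
```

For a read ticker with 20 seconds remaining and a write ticker with 10 seconds remaining, the selector would wait no longer than 10 seconds.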

Websocket

The AMQP 1.0 specification defines AMQP 1.0 over web sockets.  The earlier versions of the protocol did not do this, but the implementation within the Broker actually supports any version.

...

The Port, Keystore and Truststore model objects are used to configure the websocket connection according to the TLS requirements.

AMQP Protocol Engines

The ProtocolEngine:

  • accepts bytes from the transport (ProtocolEngine#received).  
  • exposes a public method (ProtocolEngine#processPendingIterator) which is used by the transport layer to pull pending tasks that produce bytes for the wire from the engine.

The engine never pushes bytes onto the transport.

Accepting bytes

The transport references an instance of the MultiVersionProtocolEngine.  Internally the MultiVersionProtocolEngine delegates to other ProtocolEngine implementations.  It switches from one implementation to another during the connection's life.
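One plausible trigger for such a switch is the 8-byte protocol header that every AMQP connection begins with: the ASCII bytes "AMQP" followed by four version bytes.  The sketch below recognises three well-known headers and records which implementation would be selected.  HeaderSketch and SwitchingEngine are hypothetical names, and the sketch does not claim to mirror the internals of MultiVersionProtocolEngine.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical header detection based on the published AMQP protocol headers.
class HeaderSketch {
    static String detect(byte[] header) {
        if (header.length < 8
                || !"AMQP".equals(new String(header, 0, 4, StandardCharsets.US_ASCII))) {
            return "unknown";
        }
        byte[] tail = Arrays.copyOfRange(header, 4, 8);
        if (Arrays.equals(tail, new byte[] {0, 1, 0, 0})) return "1.0";
        if (Arrays.equals(tail, new byte[] {1, 1, 0, 10})) return "0-10";
        if (Arrays.equals(tail, new byte[] {0, 0, 9, 1})) return "0-9-1";
        return "unknown";
    }
}

// Hypothetical engine that starts undecided and switches once the header is seen.
class SwitchingEngine {
    private String delegate = "undecided";

    void received(byte[] bytes) {
        if (delegate.equals("undecided")) {
            delegate = HeaderSketch.detect(bytes);
        }
        // A real engine would now pass the bytes to the selected implementation.
    }

    String delegate() {
        return delegate;
    }
}
```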

...

  • AMQPConnection_0_8Impl#received ultimately delegates to methods such as AMQPConnection_0_8Impl#receiveConnectionStartOk

  • AMQPConnection_0_10Impl#received ultimately delegates to methods such as ServerConnectionDelegate#connectionStartOk
  • AMQPConnection_1_0Impl#received ultimately delegates to AMQPConnection_1_0Impl#receiveOpen

Producing bytes

As already stated, the transport pulls tasks from the protocol engine.  These tasks produce bytes.  To do this, the transport calls the pending iterator, which provides a stream of tasks that generate bytes for the wire.  The transport keeps pulling until the output exceeds the buffer.  It then tries to write the buffered bytes to the wire.  If it writes more than half to the wire, it continues to pull more tasks from the engine.  The cycle continues until the transport cannot take more bytes (back pressure at the TCP/IP layer), or the pending iterator yields no more tasks.  This arrangement means that the transport always retains control of the backlog of bytes to be written to the wire.
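The pull cycle can be sketched as a loop.  In this simplification each task is represented only by the number of bytes it produces, and the wire is modelled as a function that says how many of the offered bytes it accepted; PullLoopSketch and its parameters are hypothetical.

```java
import java.util.Iterator;
import java.util.function.IntUnaryOperator;

// Hypothetical model of the transport pulling byte-producing tasks from the engine.
class PullLoopSketch {
    // Returns the total number of bytes written to the wire in this output cycle.
    static int pump(Iterator<Integer> pendingTasks, int bufferSize, IntUnaryOperator wire) {
        int buffered = 0;
        int totalWritten = 0;
        while (true) {
            // Pull tasks until the buffered output exceeds the buffer size.
            while (buffered <= bufferSize && pendingTasks.hasNext()) {
                buffered += pendingTasks.next();
            }
            if (buffered == 0) {
                break; // nothing buffered and the iterator yielded no more tasks
            }
            int offered = buffered;
            int written = wire.applyAsInt(offered);
            buffered -= written;
            totalWritten += written;
            if (written * 2 <= offered) {
                break; // half or less was accepted: back pressure, stop pulling
            }
        }
        return totalWritten;
    }
}
```

When the wire accepts everything, the loop drains the iterator; when the wire accepts only a small part of what is offered, the loop stops and the remaining tasks stay with the engine until the next cycle.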

The protocol engines' pending iterators are responsible for maintaining fairness within the connection.  They do this by maintaining state between invocations.  For instance, if a connection had sessions A, B and C, all with tasks to produce, and on this output cycle the network stopped accepting bytes after A's tasks, then on the next output cycle B would be considered first, even if A had subsequently acquired more work.  This fairness pattern is repeated through each layer of the protocol.
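The A, B, C example can be sketched as a round-robin that remembers where it stopped.  FairIteratorSketch is a hypothetical name, and the budget parameter stands in for the point at which the network stops accepting bytes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical round-robin over sessions that keeps its position between output cycles.
class FairIteratorSketch {
    private final List<String> sessions;
    private int next; // index of the session to consider first on the next cycle

    FairIteratorSketch(List<String> sessions) {
        this.sessions = new ArrayList<>(sessions);
    }

    // Serves at most 'budget' sessions, starting where the previous cycle stopped.
    List<String> cycle(int budget) {
        List<String> served = new ArrayList<>();
        int n = sessions.size();
        for (int i = 0; i < n && served.size() < budget; i++) {
            served.add(sessions.get((next + i) % n));
        }
        if (n > 0) {
            next = (next + served.size()) % n;
        }
        return served;
    }
}
```

If the first cycle serves only A, the second cycle starts at B regardless of whether A has acquired more work in the meantime.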

Queues

<todo>

Management

The Broker exposes two management layers:

  • AMQP management
  • HTTP management

AMQP management

AMQP management is defined by the AMQP Management specification, which is currently a Working Draft at OASIS.

HTTP management

HTTP, REST and Web Management

AMQP Management

Terms