Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Migrated to Confluence 5.3

...

Prerequisite - Install Qpid Messaging

QMF uses Qpid AMQP Messaging (QPid) as its means of communication. To use QMF, Qpid messaging must be installed somewhere in the network. Qpid can be downloaded as source from Apache, is packaged with a number of Linux distributions, and can be purchased from commercial vendors that use Qpid. Please see Download for information as to where to get Qpid Messaging.

...

Method

Arguments

Invoked when...

brokerConnected

broker

a connection to a broker is established

brokerDisconnected

broker

a connection to a broker is lost

newPackage

name

a new package is seen on the QMF bus

newClass

kind, classKey

a new class (event or object) is seen on the QMF bus

newAgent

agent

a new agent appears on the QMF bus

delAgent

agent

an agent disconnects from the QMF bus

objectProps

broker, object

the properties of an object are published

objectStats

broker, object

the statistics of an object are published

event

broker, event

an event is published

heartbeat

agent, timestamp

a heartbeat is published by an agent

brokerInfo

broker

information about a connected broker is available to be queried

methodResponse

broker, seq, response

the result of an asynchronous method call is received

Supplied with the API is a class called DebugConsole. This is a test Console instance that overrides all of the methods such that arriving asynchronous data is printed to the screen. This can be used to see all of the arriving asynchronous data.

...

No Format
brokerConnected: Broker connected at: localhost:5672
event: Thu Jan 29 19:53:19 2009 INFO  org.apache.qpid.broker:bind broker=localhost:5672 ...

Receiving Objects

To illustrate asynchronous handling of objects, a small console program is supplied. The entire program is shown below for convenience. We will then go through it part-by-part to explain its design.

This console program receives object updates and displays a set of statistics as they change. It focuses on broker queue objects.

No Format

# Import needed classes
from qmf.console import Session, Console
from time        import sleep

# Declare a dictionary to map object-ids to queue names
queueMap = {}

# Customize the Console class to receive object updates.
class MyConsole(Console):

  # Handle property updates
  def objectProps(self, broker, record):

    # Verify that we have received a queue object.  Exit otherwise.
    classKey = record.getClassKey()
    if classKey.getClassName() != "queue":
      return

    # If this object has not been seen before, create a new mapping from objectID to name
    oid = record.getObjectId()
    if oid not in queueMap:
      queueMap[oid] = record.name

  # Handle statistic updates
  def objectStats(self, broker, record):
    
    # Ignore updates for objects that are not in the map
    oid = record.getObjectId()
    if oid not in queueMap:
      return

    # Print the queue name and some statistics
    print "%s: enqueues=%d dequeues=%d" % (queueMap[oid], record.msgTotalEnqueues, record.msgTotalDequeues)

    # if the delete-time is non-zero, this object has been deleted.  Remove it from the map.
    if record.getTimestamps()[2] > 0:
      queueMap.pop(oid)

# Create an instance of the QMF session manager.  Set userBindings to True to allow
# this program to choose which objects classes it is interested in.
sess = Session(MyConsole(), manageConnections=True, rcvEvents=False, userBindings=True)

# Register to receive updates for broker:queue objects.
sess.bindClass("org.apache.qpid.broker", "queue")
broker = sess.addBroker()

# Suspend processing while the asynchronous operations proceed.
try:
  while True:
    sleep(1)
except:
  pass

# Disconnect the broker before exiting.
sess.delBroker(broker)

Before going through the code in detail, it is important to understand the differences between synchronous object access and asynchronous object access. When objects are obtained synchronously (using the getObjects function), the resulting proxy contains all of the object's attributes, both properties and statistics. When object data is published asynchronously, the properties and statistics are sent separately and only when the session first connects or when the content changes.

The script wishes to print the queue name with the updated statistics, but the queue name is only present with the properties. For this reason, the program needs to keep some state to correlate property updates with their corresponding statistic updates. This can be done using the ObjectId that uniquely identifies the object.

No Format

    # If this object has not been seen before, create a new mapping from objectID to name
    oid = record.getObjectId()
    if oid not in queueMap:
      queueMap[oid] = record.name

The above code fragment gets the object ID from the proxy and checks to see if it is in the map (i.e. has been seen before). If it is not in the map, a new map entry is inserted mapping the object ID to the queue's name.

No Format

    # if the delete-time is non-zero, this object has been deleted.  Remove it from the map.
    if record.getTimestamps()[2] > 0:
      queueMap.pop(oid)

This code fragment detects the deletion of a managed object. After reporting the statistics, it checks the timestamps of the proxy. getTimestamps returns a list of timestamps in the order:

  • Current - The timestamp of the sending of this update.
  • Create - The time of the object's creation
  • Delete - The time of the object's deletion (or zero if not deleted)

This code structure is useful for getting information about very-short-lived objects. It is possible that an object will be created, used, and deleted within an update interval. In this case, the property update will arrive first, followed by the statistic update. Both will indicate that the object has been deleted but a full accounting of the object's existence and final state is reported.

No Format

# Create an instance of the QMF session manager.  Set userBindings to True to allow
# this program to choose which objects classes it is interested in.
sess = Session(MyConsole(), manageConnections=True, rcvEvents=False, userBindings=True)

# Register to receive updates for broker:queue objects.
sess.bindClass("org.apache.qpid.broker", "queue")

The above code is illustrative of the way a console application can tune its use of the QMF bus. Note that rcvEvents is set to False. This prevents the reception of events. Note also the use of userBindings=True and the call to sess.bindClass. If userBindings is set to False (its default), the session will receive object updates for all classes of object. In the case above, the application is only interested in broker:queue objects and reduces its bus bandwidth usage by requesting updates to only that class. bindClass may be called as many times as desired to add classes to the list of subscribed classes.

Asynchronous Method Calls and Method Timeouts

Method calls can also be invoked asynchronously. This is useful if a large number of calls needs to be made in a short time because the console application will not need to wait for the complete round-trip delay for each call.

Method calls are synchronous by default. They can be made asynchronous by adding the keyword-argument _async=True to the method call.

In a synchronous method call, the return value is the method result. When a method is called asynchronously, the return value is a sequence number that can be used to correlate the eventual result to the request. This sequence number is passed as an argument to the methodResponse function in the Console interface.

It is important to realize that the methodResponse function may be invoked before the asynchronous call returns. Make sure your code is written to handle this possibility.

Discovering what Kinds of Objects are Available