...

Code Block
$ cd <home>/qpid/examples/examplefolder
$ example-...-.exe [hostname] [portnumber]

where hostname is the Qpid broker host name (default: localhost) and portnumber is the port on which the Qpid broker accepts connections (default: 5672).
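As a minimal sketch of how the two optional arguments and their defaults could be handled, the following self-contained program falls back to localhost and 5672 when an argument is missing. The class and method names (BrokerArgs, Host, Port) are hypothetical and are not taken from the example sources.

```csharp
using System;

// Hypothetical helper: resolves [hostname] [portnumber] with the documented defaults.
public static class BrokerArgs
{
    // First argument is the broker host name; defaults to localhost.
    public static string Host(string[] args)
    {
        return args.Length > 0 ? args[0] : "localhost";
    }

    // Second argument is the broker port; defaults to 5672.
    public static int Port(string[] args)
    {
        return args.Length > 1 ? int.Parse(args[1]) : 5672;
    }

    public static void Main(string[] args)
    {
        Console.WriteLine("Connecting to " + Host(args) + ":" + Port(args));
    }
}
```

Run without arguments, the sketch reports localhost and port 5672; any argument supplied on the command line overrides the corresponding default.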

Creating and Closing Sessions

...

If a broker is running, you should see the qpidd process in the output of the above
command.

3) Read the messages from the message queue by running the direct listener, as follows:

Code Block
$ cd <home>/qpid/examples/direct

...

Code Block
$ ./example-direct-Listener.exe [hostname] [portnumber]

or with mono:

Code Block
$ mono ./example-direct-Listener.exe [hostname] [portnumber]

This program waits for messages to be published; see the next step:

4) Publish a series of messages to the amq.direct exchange by running the direct producer, as follows:

Code Block
$ cd <home>/qpid/examples/direct

...

Code Block
$ ./example-direct-Producer.exe [hostname] [portnumber]

or with mono:

Code Block
$ mono ./example-direct-Producer.exe [hostname] [portnumber]

This program has no output; the messages are routed to the message queue, as instructed by the binding.

On the direct listener console, you should see the following output:

Code Block
Message: Message 0
Message: Message 1
Message: Message 2
Message: Message 3
Message: Message 4
Message: Message 5
Message: Message 6
Message: Message 7
Message: Message 8
Message: Message 9
Message: That's all, folks!
Shutting down listener for message_queue

Now we will examine the code for each of these programs. In each section, we will discuss only
the code that must be added to the skeleton shown in Section "Creating and Closing Sessions".

Reading Messages from the Queue

The program, listener.cs, is a message listener that receives messages from a queue.

To create a message listener, create a class that implements IMessageListener and provide the messageTransfer method with the code that should be executed when a message is received. Note that the provided listener implementation acknowledges all the messages at once, after the final message has been received. Acknowledgement could instead have been done on a per-message basis.

Code Block

public class MessageListener : IMessageListener
{
    private readonly ClientSession _session;
    private readonly RangeSet _range = new RangeSet();

    public MessageListener(ClientSession session)
    {
        _session = session;
    }

    public void messageTransfer(IMessage m)
    {
        // Read the unread remainder of the message body and decode it as text
        BinaryReader reader = new BinaryReader(m.Body, Encoding.UTF8);
        byte[] body = new byte[m.Body.Length - m.Body.Position];
        reader.Read(body, 0, body.Length);
        ASCIIEncoding enc = new ASCIIEncoding();
        string message = enc.GetString(body);
        Console.WriteLine("Message: " + message);
        // Add this message to the set of messages to be acknowledged
        _range.add(m.Id);
        if (message.Equals("That's all, folks!"))
        {
            // Acknowledge all the received messages
            _session.messageAccept(_range);
            // Wake up the main thread, which is waiting on the session
            lock (_session)
            {
                Monitor.Pulse(_session);
            }
        }
    }
}
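The per-message alternative mentioned above can be sketched as follows. To keep the sketch runnable without the Qpid client library, StubRangeSet and StubSession are hypothetical stand-ins that mimic only the add and messageAccept calls used by the listener; the integer message id is likewise an assumption standing in for m.Id. This illustrates the idea and is not the library's own code.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for RangeSet: only the add call is mimicked.
public class StubRangeSet
{
    public readonly List<int> Ids = new List<int>();
    public void add(int id) { Ids.Add(id); }
}

// Hypothetical stand-in for ClientSession: counts acknowledgement calls.
public class StubSession
{
    public int AcceptCalls;
    public void messageAccept(StubRangeSet range)
    {
        AcceptCalls++;
        Console.WriteLine("Accepted " + range.Ids.Count + " message(s)");
    }
}

public class PerMessageAckListener
{
    private readonly StubSession _session;

    public PerMessageAckListener(StubSession session) { _session = session; }

    // Called once per delivered message; id plays the role of m.Id (assumed int).
    public void messageTransfer(int id, string body)
    {
        Console.WriteLine("Message: " + body);
        // Acknowledge this message immediately instead of batching it
        StubRangeSet range = new StubRangeSet();
        range.add(id);
        _session.messageAccept(range);
    }
}

public static class PerMessageAckDemo
{
    // Delivers three messages and returns the number of acknowledgement calls.
    public static int Run()
    {
        StubSession session = new StubSession();
        PerMessageAckListener listener = new PerMessageAckListener(session);
        for (int i = 0; i < 3; i++)
        {
            listener.messageTransfer(i, "Message " + i);
        }
        return session.AcceptCalls;
    }

    public static void Main()
    {
        Run();
    }
}
```

Acknowledging each message separately costs one accept call per message, whereas the batched form in the listener above issues a single call for the whole set once the final message arrives.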

The main body of the program creates a listener for the subscription, attaches the listener to a message queue, and
subscribes to the queue to receive messages from it.

Code Block

lock (session)
{
  // Create a listener and subscribe it to the queue named "message_queue"
  IMessageListener listener = new MessageListener(session);
  session.attachMessageListener(listener, "message_queue");
  session.messageSubscribe("message_queue");
  // Receive messages until all messages are received
  Monitor.Wait(session);
}
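The hand-off between the main body and the listener relies on Monitor.Wait and Monitor.Pulse on a shared object. The self-contained sketch below reproduces that pattern with a plain object in place of the session and a worker thread in place of the listener. The names (WaitPulseDemo, gate) are hypothetical, and the five-second timeout is an addition for the demo; the example itself waits without one.

```csharp
using System;
using System.Threading;

public static class WaitPulseDemo
{
    // Returns true if the waiting thread was woken by Pulse before the timeout.
    public static bool Run()
    {
        object gate = new object(); // plays the role of the session object
        bool signalled;
        lock (gate)
        {
            // Start the "listener" while still holding the lock, as the main body does
            Thread worker = new Thread(() =>
            {
                // Blocks here until the main thread releases the lock inside Wait
                lock (gate)
                {
                    Monitor.Pulse(gate);
                }
            });
            worker.Start();
            // Wait releases the lock, then blocks until Pulse or the timeout
            signalled = Monitor.Wait(gate, TimeSpan.FromSeconds(5));
        }
        return signalled;
    }

    public static void Main()
    {
        Console.WriteLine(Run() ? "woken by Pulse" : "timed out");
    }
}
```

Because the waiting thread takes the lock before the worker starts and gives it up only inside Wait, the worker cannot call Pulse before the waiter is ready to receive it. The main body above has the same ordering: it holds the lock on the session while subscribing and releases it only in Monitor.Wait.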