Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Wiki Markup
h1. Tutorial
This tutorial consists of a series of examples using the three most commonly used exchange types - Direct, Fanout and Topic
exchanges. These examples show how to write applications that use the most common messaging paradigms. 

* direct
In the direct examples, a message producer writes to the direct exchange, specifying a routing key. A message consumer reads messages from a named queue. This illustrates clean separation of concerns - message producers need to know only the exchange and the routing key, message consumers need to know only which queue to use on the broker. 
* fanout
The fanout examples use a fanout exchange and do not use routing keys. Each binding specifies that all messages for a given exchange should be delivered to a given queue.
* pub-sub
In the publish/subscribe examples, a publisher application writes messages to an exchange, specifying a multi-part key. A subscriber application subscribes to messages that match the relevant parts of these keys, using a private queue for each subscription.
* request-response
In the request/response examples, a simple service accepts requests from clients and sends responses back to them. Clients create their own private queues and corresponding routing keys. When a client sends a request to the server, it specifies its own routing key in the reply-to field of the request. The server uses the client's reply-to field as the routing key for the response.

h2. Running the Examples
Before running the examples, you need to unzip the file 

Tutorial

This tutorial consists of a series of examples using the three most commonly used exchange types - Direct, Fanout and Topic
exchanges. These examples show how to write applications that use the most common messaging paradigms.

  • direct
    In the direct examples, a message producer writes to the direct exchange, specifying a routing key. A message consumer reads messages from a named queue. This illustrates clean separation of concerns - message producers need to know only the exchange and the routing key, message consumers need to know only which queue to use on the broker.
  • fanout
    The fanout examples use a fanout exchange and do not use routing keys. Each binding specifies that all messages for a given exchange should be delivered to a given queue.
  • pub-sub
    In the publish/subscribe examples, a publisher application writes messages to an exchange, specifying a multi-part key. A subscriber application subscribes to messages that match the relevant parts of these keys, using a private queue for each subscription.
  • request-response
    In the request/response examples, a simple service accepts requests from clients and sends responses back to them. Clients create their own private queues and corresponding routing keys. When a client sends a request to the server, it specifies its own routing key in the reply-to field of the request. The server uses the client's reply-to field as the routing key for the response.

Running the Examples

...

Qpid.NET-net-2.0-M4.zip, the following tree is created:

...


{code
}
<home>
  |-qpid
     |-lib (contains the required dlls)
     |-examples
          |- direct
          |    |-example-direct-Listener.exe
          |    |-example-direct-Producer.exe
          |- fanout
          |    |-example-fanout-Listener.exe
          |    |-example-fanout-Producer.exe
          |- pub-sub
          |    |-example-pub-sub-Listener.exe
          |    |-example-pub-sub-Publisher.exe
          |- request-response
               |-example-request-response-Client.exe
               |-example-request-response-Server.exe
{code}

Make sure your PATH contains the directory <home>/qpid/lib

...

 
The examples can be run by executing the provided exe files:

...


{code
}
$ cd <home>/qpid/examples/examplefolder
$ example-...-.exe [hostname] [portnumber]
{code}
where [hostname] is the qpid broker host name (default is localhost) and [portnumber] is the port number on which the qpid broker is accepting connection (default is 5672).

...

Creating and Closing Sessions

All of the examples have been written using the Apache Qpid .NEt 0.10 API. The examples use the same skeleton code to initialize the program, create a session, and clean up before exiting:

Code Block
 

h2. Creating and Closing Sessions

All of the examples have been written using the Apache Qpid .NEt 0.10 API. The examples use the same skeleton code to initialize the program, create a session, and clean up before exiting:

{code}
using System;
using System.IO;
using System.Text;
using System.Threading;
using org.apache.qpid.client;
using org.apache.qpid.transport;

...

        private static void Main(string[] args)
        {
            string host = args.Length > 0 ? args[0] : "localhost";
            int port = args.Length > 1 ? Convert.ToInt32(args[1]) : 5672;
            Client connection = new Client();
            try
            {
                connection.connect(host, port, "test", "guest", "guest");
                ClientSession session = connection.createSession(50000);

                //--------- Main body of program --------------------------------------------

                connection.close();
            }
            catch (Exception e)
            {
                Console.WriteLine("Error: \n" + e.StackTrace);
            }
        }
...

Writing Direct Applications

...

{code}

h2. Writing Direct Applications

This section describes two programs that implement direct messaging using a Direct exchange:
• org.apache.qpid.example.direct.Producer (from example-direct-producer) publishes messages to the amq.direct exchange, using the routing key routing_key.

...


•org.apache.qpid.example.direct.Listener (from example-direct-Listener) uses a message listener to receive messages from the queue named message_queue.

...



h3. Running the Direct Examples

...



1) Make sure your PATH contains the directory <home>/qpid/lib

...

 

2) Make sure that a qpid broker is running:

...


{code
}
$ ps -eaf | grep qpidd
{code}
If a broker is running, you should see the qpidd process in the output of the above

...


command.

...



3) Read the messages from the message queue using direct listener, as follows:

...


{code
}
$ cd <home>/qpid/examples/direct
{code}
With cygwin:

...

 
{code
}
$ ./example-direct-Listener.exe [hostname] [portnumber]
{code}
or with mono:

...


{code
}
$ mono ./example-direct-Listener.exe [hostname] [portnumber]
{code}

This program is waiting for messages to be published, sex next step:

...

 

4) Publish a series of messages to the amq.direct exchange by running direct producer, as follows:

...



{code
}
$ cd <home>/qpid/examples/direct
{code}
With cygwin:

...

 
{code
}
$ ./example-direct-Producer.exe  [hostname] [portnumber]
{code}
or with mono:

...


{code
}
$ mono ./example-direct-Producer.exe [hostname] [portnumber]

...

{code}

This program has no output; the messages are routed to the message queue, as instructed by the binding.

...



On the direct listener console, you should see the following output:

...


{code
}
Message: Message 0
Message: Message 1
Message: Message 2
Message: Message 3
Message: Message 4
Message: Message 5
Message: Message 6
Message: Message 7
Message: Message 8
Message: Message 9
Message: That's all, folks!
{code}

Now we will examine the code for each of these programs. In each section, we will discuss only

...


the code that must be added to the skeleton shown in Section "Creating and Closing Sessions". 

h3.

...

 Reading Messages from the Queue

...



The program , listener.cs, is a message listener that receives messages from a queue.

...

To create a message listener, create a class derived from IMessageListener, and override the messageTransfer method, providing the code that should be executed when a message is received. Note that the provided listener implementation acknowledges all the messages once they are received. This could have been done on a per message basis.

Code Block

public class MessageListener : IMessageListener
{
        private readonly ClientSession _session;
        private readonly RangeSet _range = new RangeSet();


First it creates a queue named message_queue, then binds it to the amq.direct exchange using the binding key routing_key.

{code}
//--------- Main body of program --------------------------------------------
// Create a queue named "message_queue", and route all messages whose
// routing key is "routing_key" to this newly created queue.
session.queueDeclare("message_queue");
session.exchangeBind("message_queue", "amq.direct", "routing_key");    
{code}

The queue created by this program continues to exist after the program exits, and any message whose routing key matches the key specified in the binding will be routed to the corresponding queue by the broker. Note that the queue could have been be deleted using the following code: 

{code}
session.queueDelete("message_queue");
{code}

To create a message listener, create a class derived from IMessageListener, and override the messageTransfer method, providing the code that should be executed when a message is received. 

public class MessageListener : IMessageListener
{
  ......
  public void messageTransfer(IMessage m)
  {
  .....
}
{code}

The main body of the program creates a listener for the subscription; attaches the listener to a message queue; and
subscribe to the queue to receive messages from the queue.

{code}
lock (session)
{
  // Create a listener and subscribe it to the queue named "message_queue"
  IMessageListener listener = new MessageListener(session);
  session.attachMessageListener(listener, "message_queue");        public MessageListener(ClientSession session)
        {            
  session.messageSubscribe("message_queue");
  // Receive messages until all messages are received
  _session = session;
        }

        public void messageTransfer(IMessage m)
        {
            Monitor.Wait(session);
}
{code}

The MessageListener's messageTransfer() function is called whenever a message is received. In this example the message is printed and tested to see if it is the final message. Once the final message is received, the messages are acknowledged. 

{code}
BinaryReader reader = new BinaryReader(m.Body, Encoding.UTF8);
            byte[] body = new byte[m.Body.Length - m.Body.Position];
            reader.Read(body, 0, body.Length);
             0, body.Length);
ASCIIEncoding enc = new ASCIIEncoding();
            string message = enc.GetString(body);
            Console.WriteLine("Message: " + message);
            // Add this message to the list of message to be acknowledged 
             be acknowledged 
_range.add(m.Id);       
            if( message.Equals("That's all, folks!") )
{
  // Acknowledge all the received messages 
  _session.messageAccept(_range);     
  lock(_session)
  {
      Monitor.Pulse(_session);
    {
                // Acknowledge all the received messages 
                _session.messageAccept(_range);     
                lock(_session)}
}
{code}

h3. Publishing Messages to a Direct Exchange

The second program in the direct example, Producer.cs, publishes messages to the amq.direct exchange using the routing key routing_key.

First, create a message and set a routing key. The same routing key will be used for each message we send, so you only need to set this property once.
           
{code}
IMessage message = new  {
                    Monitor.Pulse(_session);
                }
            }
        }
    }
}

The main body of the program creates a listener for the subscription; attaches the listener to a message queue; and
subscribe to the queue to receive messages from the queue.

Code Block

lock (session)
{
  // Create a listener and subscribe it to the queue named "message_queue"
  IMessageListener listener = new MessageListener(session);
  session.attachMessageListener(listener, "message_queue"Message();
// The routing key is a message property. We will use the same
// routing key for each message, so we'll set this property
// just once. (In most simple cases, there is no need to set
// other message properties.)
message.DeliveryProperties.setRoutingKey("routing_key"); 
{code}

Now send some messages:

{code}
// Asynchronous transfer sends messages as quickly as
// possible without waiting for confirmation.
for (int i = 0; i < 10; i++)
{
  message.clearData();
  message.appendData(Encoding.UTF8.GetBytes("Message " + i));                  
  session.messageTransfer("amq.direct", message);                    
}
{code}

Send a final synchronous message to indicate termination:

{code}
// And send a syncrhonous final message to indicate termination.
message.clearData();
message.appendData(Encoding.UTF8.GetBytes("That's    
  session.messageSubscribe("message_queue");
  // Receive messages until all messages are received
  Monitor.Wait(session);
}all, folks!"));
session.messageTransfer("amq.direct", "routing_key", message); 
session.sync();
{code}

h2. Writing Fanout Applications