Tutorial

This tutorial consists of a series of examples using the three most commonly used exchange types - Direct, Fanout and Topic
exchanges. These examples show how to write applications that use the most common messaging paradigms.

  • direct
    In the direct examples, a message producer writes to the direct exchange, specifying a routing key. A message consumer reads messages from a named queue. This illustrates clean separation of concerns - message producers need to know only the exchange and the routing key, message consumers need to know only which queue to use on the broker.
  • fanout
    The fanout examples use a fanout exchange and do not use routing keys. Each binding specifies that all messages for a given exchange should be delivered to a given queue.
  • pub-sub
    In the publish/subscribe examples, a publisher application writes messages to an exchange, specifying a multi-part key. A subscriber application subscribes to messages that match the relevant parts of these keys, using a private queue for each subscription.
  • request-response
    In the request/response examples, a simple service accepts requests from clients and sends responses back to them. Clients create their own private queues and corresponding routing keys. When a client sends a request to the server, it specifies its own routing key in the reply-to field of the request. The server uses the client's reply-to field as the routing key for the response.
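
The publish/subscribe description above relies on multi-part keys. The following is a minimal sketch of how such matching can work. It assumes the usual AMQP topic-exchange convention: key parts are separated by dots, `*` stands for exactly one part, and `#` stands for zero or more parts. The `TopicMatcher` class and the example keys are hypothetical illustrations and are not part of the Qpid API.

```csharp
using System;

// Hypothetical helper, not part of the Qpid API: checks whether a routing key
// matches a binding key under the AMQP topic-exchange wildcard rules.
public static class TopicMatcher
{
    public static bool Matches(string bindingKey, string routingKey)
    {
        string[] pattern = bindingKey.Split('.');
        string[] words = routingKey.Split('.');
        return Match(pattern, 0, words, 0);
    }

    private static bool Match(string[] pattern, int p, string[] words, int w)
    {
        if (p == pattern.Length)
        {
            // Pattern exhausted: match only if the key is exhausted too.
            return w == words.Length;
        }
        if (pattern[p] == "#")
        {
            // '#' may absorb zero or more parts: try every possible split.
            for (int next = w; next <= words.Length; next++)
            {
                if (Match(pattern, p + 1, words, next)) return true;
            }
            return false;
        }
        if (w == words.Length)
        {
            return false;   // no part left in the key for this pattern part
        }
        if (pattern[p] == "*" || pattern[p] == words[w])
        {
            return Match(pattern, p + 1, words, w + 1);
        }
        return false;
    }
}

public static class TopicDemo
{
    public static void Main()
    {
        Console.WriteLine(TopicMatcher.Matches("usa.#", "usa.news"));         // prints True
        Console.WriteLine(TopicMatcher.Matches("*.news", "europe.news"));     // prints True
        Console.WriteLine(TopicMatcher.Matches("*.news", "europe.weather"));  // prints False
    }
}
```

A subscriber interested in all news, for instance, would bind its private queue with a key such as `*.news` and would then receive only the messages whose second part is `news`.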

Running the Examples

Before running the examples, you need to unzip the file Qpid.NET-net-2.0-M4.zip. The following tree is created:

Code Block

<home>
  |-qpid
     |-lib (contains the required dlls)
     |-examples
          |- direct
          |    |-example-direct-Listener.exe
          |    |-example-direct-Producer.exe
          |- fanout
          |    |-example-fanout-Listener.exe
          |    |-example-fanout-Producer.exe
          |- pub-sub
          |    |-example-pub-sub-Listener.exe
          |    |-example-pub-sub-Publisher.exe
          |- request-response
               |-example-request-response-Client.exe
               |-example-request-response-Server.exe

Make sure your PATH contains the directory <home>/qpid/lib.

The examples can be run by executing the provided exe files:

Code Block

$ cd <home>/qpid/examples/examplefolder
$ example-...-.exe [hostname] [portnumber]

where [hostname] is the qpid broker host name (default is localhost) and [portnumber] is the port number on which the qpid broker accepts connections (default is 5672).

Creating and Closing Sessions

All of the examples have been written using the Apache Qpid .NET 0.10 API. The examples use the same skeleton code to initialize the program, create a session, and clean up before exiting:

Code Block

using System;
using System.IO;
using System.Text;
using System.Threading;
using org.apache.qpid.client;
using org.apache.qpid.transport;

...

        private static void Main(string[] args)
        {
            string host = args.Length > 0 ? args[0] : "localhost";
            int port = args.Length > 1 ? Convert.ToInt32(args[1]) : 5672;
            Client connection = new Client();
            try
            {
                connection.connect(host, port, "test", "guest", "guest");
                ClientSession session = connection.createSession(50000);

                //--------- Main body of program --------------------------------------------

                connection.close();
            }
            catch (Exception e)
            {
                Console.WriteLine("Error: \n" + e.StackTrace);
            }
        }
...

Writing Direct Applications

This section describes two programs that implement direct messaging using a Direct exchange:

  • org.apache.qpid.example.direct.Producer (from example-direct-Producer) publishes messages to the amq.direct exchange, using the routing key routing_key.
  • org.apache.qpid.example.direct.Listener (from example-direct-Listener) uses a message listener to receive messages from the queue named message_queue.

Running the Direct Examples

1) Make sure your PATH contains the directory <home>/qpid/lib

2) Make sure that a qpid broker is running:

Code Block

$ ps -eaf | grep qpidd

If a broker is running, you should see the qpidd process in the output of the above command.

3) Read the messages from the message queue using the direct listener, as follows:

Code Block

$ cd <home>/qpid/examples/direct

With cygwin:

Code Block

$ ./example-direct-Listener.exe [hostname] [portnumber]

or with mono:

Code Block

$ mono ./example-direct-Listener.exe [hostname] [portnumber]

This program is waiting for messages to be published; see the next step:

4) Publish a series of messages to the amq.direct exchange by running the direct producer, as follows:

Code Block

$ cd <home>/qpid/examples/direct

With cygwin:

Code Block

$ ./example-direct-Producer.exe [hostname] [portnumber]

or with mono:

Code Block

$ mono ./example-direct-Producer.exe [hostname] [portnumber]

This program has no output; the messages are routed to the message queue, as instructed by the binding.

5) Go to the window where you are running your listener. You should see the following output:

Code Block

Message: Message 0
Message: Message 1
Message: Message 2
Message: Message 3
Message: Message 4
Message: Message 5
Message: Message 6
Message: Message 7
Message: Message 8
Message: Message 9
Message: That's all, folks!

Now we will examine the code for each of these programs. In each section, we will discuss only the code that must be added to the skeleton shown in Section "Creating and Closing Sessions".

Reading Messages from the Queue

The program Listener.cs is a message listener that receives messages from a queue. First it creates a queue named message_queue, then binds it to the amq.direct exchange using the binding key routing_key.

Code Block

//--------- Main body of program --------------------------------------------
// Create a queue named "message_queue", and route all messages whose
// routing key is "routing_key" to this newly created queue.
session.queueDeclare("message_queue");
session.exchangeBind("message_queue", "amq.direct", "routing_key");

The queue created by this program continues to exist after the program exits, and any message whose routing key matches the key specified in the binding will be routed to the corresponding queue by the broker. Note that the queue could be deleted using the following code:

Code Block

session.queueDelete("message_queue");
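
The effect of the queue declaration and binding described above can be pictured with a small in-memory model. This is only a sketch of the routing rule (exact match between routing key and binding key), not of the broker or of the Qpid API. The `DirectExchangeModel` class and its methods are hypothetical.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory model of a direct exchange. It only illustrates
// the routing rule and does not talk to a broker.
public class DirectExchangeModel
{
    // binding key -> names of the queues bound with that key
    private readonly Dictionary<string, List<string>> bindings =
        new Dictionary<string, List<string>>();

    // queue name -> messages delivered to that queue
    private readonly Dictionary<string, List<string>> queues =
        new Dictionary<string, List<string>>();

    public void QueueDeclare(string queue)
    {
        if (!queues.ContainsKey(queue)) queues[queue] = new List<string>();
    }

    public void Bind(string queue, string bindingKey)
    {
        QueueDeclare(queue);   // make sure the queue exists before binding it
        if (!bindings.ContainsKey(bindingKey)) bindings[bindingKey] = new List<string>();
        bindings[bindingKey].Add(queue);
    }

    // Deliver to every queue whose binding key equals the routing key exactly.
    // Returns the number of queues that received the message.
    public int Publish(string routingKey, string body)
    {
        List<string> targets;
        if (!bindings.TryGetValue(routingKey, out targets)) return 0;
        foreach (string queue in targets) queues[queue].Add(body);
        return targets.Count;
    }

    public List<string> Messages(string queue)
    {
        return queues[queue];
    }
}

public static class DirectDemo
{
    public static void Main()
    {
        DirectExchangeModel exchange = new DirectExchangeModel();
        exchange.QueueDeclare("message_queue");
        exchange.Bind("message_queue", "routing_key");

        exchange.Publish("routing_key", "Message 0");   // delivered
        exchange.Publish("other_key", "Message 1");     // no binding, not delivered

        Console.WriteLine(exchange.Messages("message_queue").Count);   // prints 1
    }
}
```

In the model, as in the tutorial, the producer only needs the routing key and the consumer only needs the queue name; the binding is what connects the two.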

To create a message listener, create a class that implements IMessageListener and define its messageTransfer method, providing the code that should be executed when a message is received.

Code Block

public class MessageListener : IMessageListener
{
  ......
  public void messageTransfer(IMessage m)
  {
  .....
  }
}

The main body of the program creates a listener for the subscription, attaches the listener to a message queue, and subscribes to the queue to receive messages from it.

Code Block

lock (session)
{
  // Create a listener and subscribe it to the queue named "message_queue"
  IMessageListener listener = new MessageListener(session);
  session.attachMessageListener(listener, "message_queue");
  session.messageSubscribe("message_queue");
  // Receive messages until all messages are received
  Monitor.Wait(session);
}

The MessageListener's messageTransfer() function is called whenever a message is received. In this example the message is printed and tested to see if it is the final message. Once the final message is received, the messages are acknowledged.

Code Block

BinaryReader reader = new BinaryReader(m.Body, Encoding.UTF8);
byte[] body = new byte[m.Body.Length - m.Body.Position];
reader.Read(body, 0, body.Length);
ASCIIEncoding enc = new ASCIIEncoding();
string message = enc.GetString(body);
Console.WriteLine("Message: " + message);
// Add this message to the list of messages to be acknowledged
_range.add(m.Id);
if( message.Equals("That's all, folks!") )
{
  // Acknowledge all the received messages
  _session.messageAccept(_range);
  lock(_session)
  {
      Monitor.Pulse(_session);
  }
}
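
The listener relies on a wait/pulse handshake: the main thread blocks in Monitor.Wait, and the message callback calls Monitor.Pulse once the final message arrives. The following self-contained sketch shows that handshake without a broker. It assumes that messages are delivered on a thread other than the main thread. The `WaitPulseDemo` class, the `gate` object and the `finished` flag are hypothetical and do not appear in the example programs.

```csharp
using System;
using System.Threading;

// Hypothetical stand-alone demonstration of the Monitor.Wait / Monitor.Pulse
// handshake. A plain thread stands in for the message delivery thread.
public static class WaitPulseDemo
{
    private static readonly object gate = new object();
    private static bool finished;
    private static int received;

    // Stand-in for messageTransfer(): called once per incoming message.
    private static void OnMessage(string message)
    {
        received++;
        if (message == "That's all, folks!")
        {
            lock (gate)
            {
                finished = true;
                Monitor.Pulse(gate);   // wake the waiting main thread
            }
        }
    }

    // Simulates the delivery of ten messages followed by the final one.
    private static void Deliver()
    {
        for (int i = 0; i < 10; i++) OnMessage("Message " + i);
        OnMessage("That's all, folks!");
    }

    public static int Run()
    {
        received = 0;
        finished = false;
        Thread delivery = new Thread(Deliver);
        lock (gate)
        {
            delivery.Start();
            // Wait releases the lock while blocked and reacquires it on wake-up.
            while (!finished) Monitor.Wait(gate);
        }
        delivery.Join();
        return received;
    }

    public static void Main()
    {
        Console.WriteLine(Run());   // prints 11
    }
}
```

The `finished` flag is an addition of this sketch. It lets the waiting thread re-check the condition after waking up, which is the usual way to use Monitor.Wait.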

Publishing Messages to a Direct Exchange

The second program in the direct example, Producer.cs, publishes messages to the amq.direct exchange using the routing key routing_key.

First, create a message and set a routing key. The same routing key will be used for each message we send, so you only need to set this property once.

Code Block

IMessage message = new Message();
// The routing key is a message property. We will use the same
// routing key for each message, so we'll set this property
// just once. (In most simple cases, there is no need to set
// other message properties.)
message.DeliveryProperties.setRoutingKey("routing_key");

Now send some messages:

Code Block

// Asynchronous transfer sends messages as quickly as
// possible without waiting for confirmation.
for (int i = 0; i < 10; i++)
{
  message.clearData();
  message.appendData(Encoding.UTF8.GetBytes("Message " + i));
  session.messageTransfer("amq.direct", message);
}
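
The producer turns each text into bytes with Encoding.UTF8, and the listener reads the remaining bytes of the message body and decodes them. The sketch below shows that round trip in isolation. It assumes the message body behaves like a seekable System.IO.Stream, as the listener's use of Length and Position suggests; a MemoryStream stands in for it here. The `BodyCodec` class is hypothetical.

```csharp
using System;
using System.IO;
using System.Text;

// Hypothetical helper showing the encode/decode round trip for a message body.
public static class BodyCodec
{
    // Producer side: turn the text into the bytes placed in the message body.
    public static byte[] Encode(string text)
    {
        return Encoding.UTF8.GetBytes(text);
    }

    // Listener side: read everything from the current position to the end of
    // the stream and decode it with the same encoding the producer used.
    public static string Decode(Stream body)
    {
        byte[] buffer = new byte[body.Length - body.Position];
        int offset = 0;
        while (offset < buffer.Length)
        {
            int read = body.Read(buffer, offset, buffer.Length - offset);
            if (read == 0) break;   // end of stream reached early
            offset += read;
        }
        return Encoding.UTF8.GetString(buffer, 0, offset);
    }

    public static void Main()
    {
        // A MemoryStream stands in for the message body (assumption).
        Stream body = new MemoryStream(Encode("Message 3"));
        Console.WriteLine("Message: " + Decode(body));   // prints "Message: Message 3"
    }
}
```

The listener shown earlier decodes with ASCIIEncoding, which gives the same result for these messages because they contain only ASCII characters. Decoding with the encoding used by the producer also preserves non-ASCII text.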

Send a final synchronous message to indicate termination:

Code Block

// And send a synchronous final message to indicate termination.
message.clearData();
message.appendData(Encoding.UTF8.GetBytes("That's all, folks!"));
session.messageTransfer("amq.direct", "routing_key", message);
session.sync();

Writing Fanout Applications

This section describes two programs that illustrate the use of a Fanout exchange.

  • Listener.cs creates a unique private queue for each instance of the listener and binds that queue to the fanout exchange. All messages sent to the fanout exchange are delivered to each listener's queue.
  • Producer.cs publishes messages to the fanout exchange. It does not use a routing key, which is not needed by the fanout exchange.
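
The fanout behaviour described above can be sketched with a small in-memory model in which every bound queue receives its own copy of each message and no routing key is involved. The `FanoutExchangeModel` class is hypothetical; it models only the delivery rule and does not use the Qpid API.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory model of a fanout exchange.
public class FanoutExchangeModel
{
    private readonly List<Queue<string>> boundQueues = new List<Queue<string>>();

    // Each listener gets its own private queue bound to the exchange.
    public Queue<string> BindPrivateQueue()
    {
        Queue<string> queue = new Queue<string>();
        boundQueues.Add(queue);
        return queue;
    }

    // No routing key: every bound queue receives its own copy of the message.
    public void Publish(string body)
    {
        foreach (Queue<string> queue in boundQueues) queue.Enqueue(body);
    }
}

public static class FanoutDemo
{
    public static void Main()
    {
        FanoutExchangeModel exchange = new FanoutExchangeModel();
        Queue<string> first = exchange.BindPrivateQueue();
        Queue<string> second = exchange.BindPrivateQueue();

        exchange.Publish("Message 0");
        exchange.Publish("That's all, folks!");

        Console.WriteLine(first.Count);    // prints 2
        Console.WriteLine(second.Count);   // prints 2
    }
}
```

Because each listener owns its queue, starting a further listener adds another copy of every subsequent message rather than splitting the messages between listeners.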

Running the Fanout Examples

1) Make sure your PATH contains the directory <home>/qpid/lib

2) Make sure that a qpid broker is running:

Code Block

$ ps -eaf | grep qpidd

If a broker is running, you should see the qpidd process in the output of the above
command.

3) In separate windows, start two or more fanout listeners as follows:

Code Block

$ cd <home>/qpid/examples/fanout

With cygwin:

Code Block

$ ./example-fanout-Listener.exe [hostname] [portnumber]

or with mono:

Code Block

$ mono ./example-fanout-Listener.exe [hostname] [portnumber]

The listener creates a private queue, binds it to the amq.fanout exchange, and waits for messages to arrive on the queue. When the listener starts, you will see the following message:

Code Block

Listening

This program is waiting for messages to be published; see the next step:

4) In a separate window, publish a series of messages to the amq.fanout exchange by running fanout producer, as follows:

Code Block

$ cd <home>/qpid/examples/fanout

With cygwin:

Code Block

$ ./example-fanout-Producer.exe  [hostname] [portnumber]

or with mono:

Code Block

$ mono ./example-fanout-Producer.exe [hostname] [portnumber]

This program has no output; the messages are routed to each listener's queue, as prescribed by the bindings.

5) Go to the windows where you are running listeners. You should see the following output for each listener:

Code Block

Message: Message 0
Message: Message 1
Message: Message 2
Message: Message 3
Message: Message 4
Message: Message 5
Message: Message 6
Message: Message 7
Message: Message 8
Message: Message 9
Message: That's all, folks!

Now we will examine the code for each of these programs. In each section, we will discuss only
the code that must be added to the skeleton shown in Section "Creating and Closing Sessions".