...
Now we will examine the code for each of these programs. In each section, we will discuss only
the code that must be added to the skeleton shown in Section "Creating and Closing Sessions".
Consuming from a Fanout Exchange
The first program in the fanout example, Listener.cs, creates a private queue, binds it to the amq.fanout exchange, and waits for messages to arrive on the queue, printing them out as they arrive. It uses a Listener that is identical to the one used in the direct example:
Code Block |
---|
/// <summary>
/// Message listener that prints the body of every message it receives and,
/// once the terminating message arrives, accepts (acknowledges) all messages
/// received so far and pulses the session monitor.
/// </summary>
public class MessageListener : IMessageListener
{
    // Body text the producer sends as its last message.
    private const string EndOfStreamText = "That's all, folks!";

    private readonly ClientSession _session;

    // Ids of the messages received but not yet acknowledged.
    private readonly RangeSet _range = new RangeSet();

    /// <summary>Creates a listener bound to the given session.</summary>
    /// <param name="session">
    /// Session used to accept the messages; it is also the monitor object
    /// that is pulsed when the terminating message has been received.
    /// </param>
    public MessageListener(ClientSession session)
    {
        _session = session;
    }

    /// <summary>
    /// Prints the message body and records the message id for acknowledgement.
    /// When the body equals the terminating text, all recorded ids are accepted
    /// and the session monitor is pulsed.
    /// </summary>
    /// <param name="m">The message delivered to this listener.</param>
    public void messageTransfer(IMessage m)
    {
        // Not disposed on purpose: disposing a BinaryReader also closes the
        // underlying stream, which this listener does not own.
        BinaryReader reader = new BinaryReader(m.Body, Encoding.UTF8);

        // ReadBytes keeps reading until the requested count or end of stream is
        // reached, whereas a single Read call may return fewer bytes.
        int remaining = (int)(m.Body.Length - m.Body.Position);
        byte[] body = reader.ReadBytes(remaining);

        // The producer encodes the text as UTF-8, so decode it the same way;
        // ASCII decoding would corrupt any non-ASCII character.
        string message = Encoding.UTF8.GetString(body);
        Console.WriteLine("Message: " + message);

        // Add this message to the set of messages to be acknowledged.
        _range.add(m.Id);

        if (message.Equals(EndOfStreamText))
        {
            // Acknowledge all the received messages.
            _session.messageAccept(_range);

            // Pulse the session monitor to signal that the last message arrived.
            lock (_session)
            {
                Monitor.Pulse(_session);
            }
        }
    }
}
|
The listener creates a private queue to receive its messages and binds it to the fanout exchange:
Code Block |
---|
// Use the session name as the name of this listener's queue.
string myQueue = session.Name;
// Declare the queue with the EXCLUSIVE and AUTO_DELETE options.
session.queueDeclare(myQueue, Option.EXCLUSIVE, Option.AUTO_DELETE);
// Bind the queue to the amq.fanout exchange.
// NOTE(review): a fanout exchange presumably ignores the "my-key" binding key - confirm.
session.exchangeBind(myQueue, "amq.fanout", "my-key");
|
Now we create a listener and subscribe it to the queue:
Code Block |
---|
// Hold the session monitor while subscribing: the listener must acquire this
// same lock before it can pulse, so the pulse cannot be issued before this
// thread has reached Monitor.Wait.
lock (session)
{
Console.WriteLine("Listening");
// Create a listener and subscribe it to my queue.
IMessageListener listener = new MessageListener(session);
session.attachMessageListener(listener, myQueue);
session.messageSubscribe(myQueue);
// Receive messages until all messages are received.
// Monitor.Wait releases the lock and blocks until the session monitor is pulsed.
Monitor.Wait(session);
}
|
Publishing Messages to the Fanout Exchange
The second program in this example, Producer.cs, publishes messages to the amq.fanout exchange.
Code Block |
---|
// No routing key is set on the message: unlike topic and direct
// exchanges, a fanout exchange does not need one.
IMessage message = new Message();

// Send ten numbered messages asynchronously, i.e. as fast as possible
// and without waiting for each transfer to be confirmed.
for (int count = 0; count < 10; ++count)
{
    byte[] payload = Encoding.UTF8.GetBytes("Message " + count);
    message.clearData();
    message.appendData(payload);
    session.messageTransfer("amq.fanout", message);
}

// Send the final message that tells listeners to stop, then synchronize
// the session so that this last transfer is sent synchronously.
message.clearData();
message.appendData(Encoding.UTF8.GetBytes("That's all, folks!"));
session.messageTransfer("amq.fanout", message);
session.sync();
|