Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Now we will examine the code for each of these programs. In each section, we will discuss only
the code that must be added to the skeleton shown in Section "Creating and Closing Sessions".

Consuming from a Fanout Exchange

The first program in the fanout example, Listener.cs, creates a private queue, binds it to the amq.fanout exchange, and waits for messages to arrive on the queue, printing them out as they arrive. It uses a Listener that is identical to the one used in the direct example:

Code Block

  /// <summary>
  /// Message listener that prints every message it receives and records the
  /// message id for later acknowledgement. When the terminating message
  /// "That's all, folks!" arrives, it accepts all recorded messages and pulses
  /// the session monitor so that a thread waiting on the session can resume.
  /// </summary>
  public class MessageListener : IMessageListener
    {
        private readonly ClientSession _session;

        // Ids of the messages received so far that still have to be accepted.
        private readonly RangeSet _range = new RangeSet();

        /// <summary>
        /// Creates a listener bound to the given session.
        /// </summary>
        /// <param name="session">
        /// Session used to accept messages; it is also the monitor object that
        /// is pulsed once the terminating message has been received.
        /// </param>
        public MessageListener(ClientSession session)
        {
            _session = session;
        }

        /// <summary>
        /// Handles one incoming message: prints its body, records its id and,
        /// if it is the terminating message, accepts everything received.
        /// </summary>
        /// <param name="m">The received message.</param>
        public void messageTransfer(IMessage m)
        {
            // The reader is deliberately not disposed: disposing a BinaryReader
            // also closes the underlying stream, and m.Body is presumably
            // owned by the message rather than by this listener.
            BinaryReader reader = new BinaryReader(m.Body, Encoding.UTF8);

            // ReadBytes keeps reading until the requested count is reached or
            // the stream ends; a single Read call may return fewer bytes.
            int remaining = (int)(m.Body.Length - m.Body.Position);
            byte[] body = reader.ReadBytes(remaining);

            // Decode as UTF-8, the encoding the example producer writes with.
            string message = Encoding.UTF8.GetString(body);
            Console.WriteLine("Message: " + message);

            // Add this message to the set of messages to be acknowledged
            _range.add(m.Id);
            if (message.Equals("That's all, folks!"))
            {
                // Acknowledge all the received messages
                _session.messageAccept(_range);

                // Wake up the thread blocked in Monitor.Wait on the session.
                lock (_session)
                {
                    Monitor.Pulse(_session);
                }
            }
        }
    }

The listener creates a private queue to receive its messages and binds it to the fanout exchange:

Code Block

// The session name doubles as the name of this client's private queue.
string myQueue = session.Name;
const string exchangeName = "amq.fanout";
// NOTE(review): a fanout exchange presumably ignores the binding key - confirm.
const string bindingKey = "my-key";

// Declare the queue as exclusive and auto-delete, then bind it to the exchange.
session.queueDeclare(myQueue, Option.EXCLUSIVE, Option.AUTO_DELETE);
session.exchangeBind(myQueue, exchangeName, bindingKey);

Now we create a listener and subscribe it to the queue:

Code Block

lock (session)
{
    Console.WriteLine("Listening");

    // Attach a new listener to the queue, then start the subscription.
    IMessageListener queueListener = new MessageListener(session);
    session.attachMessageListener(queueListener, myQueue);
    session.messageSubscribe(myQueue);

    // Block here until the session monitor is pulsed, which signals that
    // the last message has been received.
    Monitor.Wait(session);
}

Publishing Messages to the Fanout Exchange

The second program in this example, Producer.cs, publishes messages to the amq.fanout exchange.

Code Block

// Unlike topic exchanges and direct exchanges, a fanout exchange does not
// need a routing key, so only the exchange name is given on each transfer.
const string exchangeName = "amq.fanout";

// One message object is reused for every transfer.
IMessage message = new Message();

// Asynchronous transfer: the messages are sent as quickly as possible
// without waiting for confirmation.
for (int count = 0; count < 10; ++count)
{
    byte[] payload = Encoding.UTF8.GetBytes("Message " + count);
    message.clearData();
    message.appendData(payload);
    session.messageTransfer(exchangeName, message);
}

// Send the final message that indicates termination, then synchronize
// with the session so that this last transfer is synchronous.
byte[] farewell = Encoding.UTF8.GetBytes("That's all, folks!");
message.clearData();
message.appendData(farewell);
session.messageTransfer(exchangeName, message);
session.sync();

Writing Publish/Subscribe Applications