Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block

#include <qpid/client/Connection.h>
#include <qpid/client/SubscriptionManager.h>
#include <qpid/client/Session.h>
#include <qpid/client/Message.h>
#include <qpid/client/MessageListener.h>
#include <qpid/client/QueueOptions.h>

#include <iostream>

using namespace qpid::client;
using namespace qpid::framing;
using namespace qpid::sys;
using namespace std;

/**
 * Command-line options for this example.
 *
 * Derives from both qpid::Options (option parsing / usage printing) and
 * qpid::client::ConnectionSettings, so a parsed Args object can be passed
 * straight to Connection::open(), as main() does.
 */
struct Args : public qpid::Options,
              public qpid::client::ConnectionSettings
{
    /// Set to true when --help is given on the command line.
    bool help;

    /// Registers every supported option and binds it to its target variable.
    Args() : qpid::Options("Simple latency test options"), help(false)
    {
        using namespace qpid;
        // host, port, username, password, mechanism and tcpNoDelay are not
        // declared in this struct; presumably they are members inherited
        // from ConnectionSettings -- verify against the qpid headers.
        addOptions()
            ("help", optValue(help), "Print this usage statement")
            ("broker,b", optValue(host, "HOST"), "Broker host to connect to")
            ("port,p", optValue(port, "PORT"), "Broker port to connect to")
            ("username", optValue(username, "USER"), "user name for broker log in.")
            ("password", optValue(password, "PASSWORD"), "password for broker log in.")
            ("mechanism", optValue(mechanism, "MECH"), "SASL mechanism to use when authenticating.")
            ("tcp-nodelay", optValue(tcpNoDelay), "Turn on tcp-nodelay");
    }
};

/**
 * Sends messages to, and receives them back from, a single queue that is
 * declared with LVQ ordering (see setup()).
 *
 * Implements MessageListener so that it can be registered with its own
 * SubscriptionManager and get received() called for each delivery.
 */
class Listener : public MessageListener
{
  private:
    Session session;                    // session used for declare and transfer
    SubscriptionManager subscriptions;  // drives delivery to received()
    std::string queue;                  // queue name, taken from the session id name
    Message request;                    // outgoing message, reused by every send()
    QueueOptions args;                  // queue arguments (ordering mode, LVQ key)
  public:
    /// Binds the listener to @p session; the queue is named after the session.
    Listener(Session& session);
    /// Declares the queue and prepares the outgoing message's routing key.
    void setup();
    /// Publishes one message whose payload and LVQ key header are both @p kv.
    void send(std::string kv);
    /// Callback invoked for each message delivered from the queue.
    void received(Message& message);
    /// Subscribes to the queue and runs the subscription manager.
    /// Declared only: the body is provided by the out-of-class definition,
    /// so an inline body here would be a redefinition.
    void start();
};

/**
 * Builds a listener on top of an existing session.
 *
 * The subscription manager is attached to the same session, and the queue
 * name is derived from the name part of the session's id.
 */
Listener::Listener(Session& sess)
    : session(sess),
      subscriptions(sess),
      queue(session.getId().getName())
{
}

/**
 * Declares the queue used by this example and points the outgoing message
 * at it.
 */
void Listener::setup()
{
    // Select LVQ ordering before the arguments are sent with the declare.
    args.setOrdering(LVQ);

    // Exclusive, auto-deleted queue named after the session.
    session.queueDeclare(arg::queue = queue,
                         arg::exclusive = true,
                         arg::autoDelete = true,
                         arg::arguments = args);

    // The queue name doubles as the routing key for every message sent.
    request.getDeliveryProperties().setRoutingKey(queue);
}

void Listener::start()
{
    subscriptions.subscribe(*this, queue, SubscriptionSettings(FlowControl::unlimited(), ACCEPT_MODE_NONE));    
    subscriptions.run();
}

/**
 * Publishes one message to the queue.
 *
 * @param kv  Used both as the message body and as the value of the LVQ key
 *            header, so messages with an equal @p kv share the same key.
 */
void Listener::send(std::string kv)
{
    // Ask the queue options for the header name that carries the LVQ key
    // (getLVQKey presumably writes that name into `key`), then tag the
    // message with kv under it.
    std::string key;
    args.getLVQKey(key);
    request.getHeaders().setString(key, kv);

    request.setData(kv);

    cout << "Sending Data:" << kv << std::endl;

    // Asynchronous transfer: does not wait for the broker to complete it.
    async(session).messageTransfer(arg::content=request);
}

/**
 * MessageListener callback: prints the payload of each delivered message
 * and cancels the subscription once the payload "last" is seen.
 */
void Listener::received(Message& response)
{
    const std::string& payload = response.getData();

    cout << "Receiving Data:" << payload << std::endl;

    // Anything other than the sentinel needs no further handling.
    if (payload != "last") {
        return;
    }
    subscriptions.cancel(queue);
}

/**
 * Entry point: parses options, connects to the broker, publishes a fixed
 * sequence of keyed messages to an LVQ queue and then consumes the queue.
 *
 * @return 0 on success (or after printing usage), 1 if an exception was
 *         caught while talking to the broker.
 */
int main(int argc, char** argv)
{
    Args opts;
    opts.parse(argc, argv);

    if (opts.help) {
        std::cout << opts << std::endl;
        return 0;
    }

    // Payloads in send order; "key1" appears twice on purpose and "last"
    // is the sentinel that makes the listener cancel its subscription.
    static const char* const payloads[] = {
        "key1", "key2", "key3", "key1", "last"
    };
    const unsigned payloadCount = sizeof(payloads) / sizeof(payloads[0]);

    Connection connection;
    try {
        connection.open(opts);
        Session session = connection.newSession();

        Listener listener(session);
        listener.setup();
        for (unsigned i = 0; i < payloadCount; ++i) {
            listener.send(payloads[i]);
        }
        listener.start();

        connection.close();
        return 0;
    } catch (const std::exception& error) {
        std::cout << error.what() << std::endl;
    }
    return 1;
}