You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 4 Next »

Understanding LVQ

Last Value Queues are useful when you are only interested in the latest value entered into a queue. LVQ semantics are typically used for things like stock symbol updates, where all you care about is the latest value.

Qpid C++ M4 or later supports two types of LVQ semantics:

  • LVQ
  • LVQ_NO_BROWSE

LVQ semantics:

LVQ uses a header for a key; if the key matches, it replaces the message in-place in the queue, except
a.) if the message with the matching key has been acquired
b.) if the message with the matching key has been browsed
In these two cases the message is placed into the queue in FIFO order; if another message with the same key is received, the 'un-accessed' message with the same key will be replaced.

These two exceptions protect the consumer from missing the last update where a consumer or browser accesses a message and an update comes with the same key.

[localhost tests]$ ./echotest --mode create_lvq
[localhost tests]$ ./echotest --mode write
Sending Data: key1=key1.0x7fffdf3f3180
Sending Data: key2=key2.0x7fffdf3f3180
Sending Data: key3=key3.0x7fffdf3f3180
Sending Data: key1=key1.0x7fffdf3f3180
Sending Data: last=last
[localhost tests]$ ./echotest --mode browse
Receiving Data:key1.0x7fffdf3f3180
Receiving Data:key2.0x7fffdf3f3180
Receiving Data:key3.0x7fffdf3f3180
Receiving Data:last
[localhost tests]$ ./echotest --mode write
Sending Data: key1=key1.0x7fffe4c7fa10
Sending Data: key2=key2.0x7fffe4c7fa10
Sending Data: key3=key3.0x7fffe4c7fa10
Sending Data: key1=key1.0x7fffe4c7fa10
Sending Data: last=last
[localhost tests]$ ./echotest --mode consume
Receiving Data:key1.0x7fffdf3f3180
Receiving Data:key2.0x7fffdf3f3180
Receiving Data:key3.0x7fffdf3f3180
Receiving Data:last
Receiving Data:key1.0x7fffe4c7fa10
Receiving Data:key2.0x7fffe4c7fa10
Receiving Data:key3.0x7fffe4c7fa10
Receiving Data:last

An example



LVQ_NO_BROWSE semantics:

LVQ_NO_BROWSE uses a header for a key; if the key matches, it replaces the message in-place in the queue, except
a.) if the message with the matching key has been acquired
In this case the message is placed into the queue in FIFO order; if another message with the same key is received, the 'un-accessed' message with the same key will be replaced.

Note: in this case browsed messages are not invalidated, so updates can be missed.

An example

[localhost tests]$ ./echotest --mode create_lvq_no_browse
[localhost tests]$ ./echotest --mode write
Sending Data: key1=key1.0x7fffce5fb390
Sending Data: key2=key2.0x7fffce5fb390
Sending Data: key3=key3.0x7fffce5fb390
Sending Data: key1=key1.0x7fffce5fb390
Sending Data: last=last
[localhost tests]$ ./echotest --mode write
Sending Data: key1=key1.0x7fff346ae440
Sending Data: key2=key2.0x7fff346ae440
Sending Data: key3=key3.0x7fff346ae440
Sending Data: key1=key1.0x7fff346ae440
Sending Data: last=last
[localhost tests]$ ./echotest --mode browse
Receiving Data:key1.0x7fff346ae440
Receiving Data:key2.0x7fff346ae440
Receiving Data:key3.0x7fff346ae440
Receiving Data:last
[localhost tests]$ ./echotest --mode browse
Receiving Data:key1.0x7fff346ae440
Receiving Data:key2.0x7fff346ae440
Receiving Data:key3.0x7fff346ae440
Receiving Data:last
[localhost tests]$ ./echotest --mode write
Sending Data: key1=key1.0x7fff606583e0
Sending Data: key2=key2.0x7fff606583e0
Sending Data: key3=key3.0x7fff606583e0
Sending Data: key1=key1.0x7fff606583e0
Sending Data: last=last
[localhost tests]$ ./echotest --mode consume
Receiving Data:key1.0x7fff606583e0
Receiving Data:key2.0x7fff606583e0
Receiving Data:key3.0x7fff606583e0
Receiving Data:last
[localhost tests]$


Example source

/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */

#include <qpid/client/AsyncSession.h>
#include <qpid/client/Connection.h>
#include <qpid/client/SubscriptionManager.h>
#include <qpid/client/Session.h>
#include <qpid/client/Message.h>
#include <qpid/client/MessageListener.h>
#include <qpid/client/QueueOptions.h>

#include <iostream>

using namespace qpid::client;
using namespace qpid::framing;
using namespace qpid::sys;
using namespace qpid;
using namespace std;

// Actions this example program can perform; one is chosen with the "mode"
// command-line option. The enumerator order must stay in step with the
// modeNames table, because the stream operators convert between the two
// by position.
enum Mode { CREATE_LVQ, CREATE_LVQ_NO_BROWSE, WRITE, BROWSE, CONSUME };
// Textual name of each Mode enumerator, indexed by the enumerator's value.
// These are the strings accepted by (and printed for) the "mode" option.
const char* modeNames[] = {
    "create_lvq", "create_lvq_no_browse", "write", "browse", "consume"
};

// istream/ostream ops so Options can read/display Mode.
// Reads one whitespace-delimited token from `in` and converts it to a Mode.
//
// in    stream the token is extracted from
// mode  receives the enumerator whose value equals the token's position in
//       modeNames
//
// Returns `in` so that extractions can be chained.
// Throws Exception("Invalid mode: <token>") when the token is not found in
// modeNames.
istream& operator>>(istream& in, Mode& mode) {
    string s;
    in >> s;
    int i = find(modeNames, modeNames+5, s) - modeNames;
    if (i >= 5) throw Exception("Invalid mode: "+s);
    // The table position is the enumerator value. The previous statement,
    // "mode = Mode;", named the type rather than a value and did not compile.
    mode = static_cast<Mode>(i);
    return in;
}

// Writes the textual name of `mode` (its entry in modeNames) to `out` and
// returns `out` so that insertions can be chained.
ostream& operator<<(ostream& out, Mode mode) {
    const char* const label = modeNames[mode];
    out << label;
    return out;
}

// Command-line options for the example. Combines qpid::Options with
// qpid::client::ConnectionSettings, so an Args object is both parsed from
// argv and passed directly to Connection::open() in main().
struct Args : public qpid::Options,
public qpid::client::ConnectionSettings
{
// Set by the "help" option; initialised to false.
bool help;
// Action selected by the "mode" option; initialised to BROWSE.
Mode mode;

Args() : qpid::Options("Simple latency test optins"), help(false), mode(BROWSE)

// NOTE(review): the constructor body below was mangled by the wiki renderer
// ("Unknown macro") and is cut off inside the description string of the
// "mode" option. Restore it from the original source file before compiling.
Unknown macro: { using namespace qpid; addOptions() ("help", optValue(help), "Print this usage statement") ("broker,b", optValue(host, "HOST"), "Broker host to connect to") ("port,p", optValue(port, "PORT"), "Broker port to connect to") ("username", optValue(username, "USER"), "user name for broker log in.") ("password", optValue(password, "PASSWORD"), "password for broker log in.") ("mechanism", optValue(mechanism, "MECH"), "SASL mechanism to use when authenticating.") ("tcp-nodelay", optValue(tcpNoDelay), "Turn on tcp-nodelay") ("mode", optValue(mode, "'see below'"), "Action mode." "ncreate_lvq}

};

// Listener: MessageListener implementation used by every mode of the example.
// NOTE(review): the class body was lost to the wiki renderer ("Unknown macro"
// residue below). The out-of-line member definitions that follow use members
// named subscriptions, queue, request, args and session, so the missing body
// presumably declares those - confirm against the original source file.
class Listener : public MessageListener

Unknown macro: { private}

// Queue setup step taking a `browse` flag.
// NOTE(review): the body was lost to the wiki renderer. The surviving fragment
// shows only that a queue ordering is chosen from `browse` through
// args.setOrdering(...); the remaining statements must be restored from the
// original source file.
void Listener::setup(bool browse)

Unknown macro: { // set queue mode args.setOrdering(browse?LVQ_NO_BROWSE}

void Listener::browse()
{
subscriptions.subscribe(*this, queue, SubscriptionSettings(FlowControl::unlimited(), ACCEPT_MODE_NONE, ACQUIRE_MODE_NOT_ACQUIRED));
subscriptions.run();
}

void Listener::consume()
{
subscriptions.subscribe(*this, queue, SubscriptionSettings(FlowControl::unlimited(), ACCEPT_MODE_NONE, ACQUIRE_MODE_PRE_ACQUIRED));
subscriptions.run();
}

// Publishes one message routed to `queue`.
//
// kv  value stored in the message header named by args.getLVQKey(); it is
//     also the first part of the message body.
//
// The body is `kv` followed by "." and this object's address, except when
// `kv` is "last", in which case the body is just "last". The key and the body
// are echoed to stdout before the transfer is issued on the async session.
void Listener::send(std::string kv)
{
    request.getDeliveryProperties().setRoutingKey(queue);

    // Tag the message with the header the LVQ matches on.
    std::string lvqHeader;
    args.getLVQKey(lvqHeader);
    request.getHeaders().setString(lvqHeader, kv);

    // Build the payload once and reuse it for both the message and the log line.
    std::ostringstream payloadStream;
    payloadStream << kv;
    const bool isLastMarker = (kv == "last");
    if (!isLastMarker) {
        payloadStream << "." << hex << this;
    }
    const std::string payload = payloadStream.str();
    request.setData(payload);

    cout << "Sending Data: " << kv << "=" << payload << std::endl;
    async(session).messageTransfer(arg::content=request);
}

// MessageListener callback: prints the body of each delivered message to
// stdout.
void Listener::received(Message& response)
{
    const std::string& payload = response.getData();
    cout << "Receiving Data:" << payload << std::endl;

    // Disabled: cancel the subscription once the "last" marker arrives.
    //   if (response.getData() == "last") { subscriptions.cancel(queue); }
}

// Program entry point: parses the command line into Args, opens a broker
// connection with those settings, creates a session and a Listener on it,
// and dispatches on opts.mode.
// Returns 0 after the connection has been closed; the final `return 1`
// follows the std::exception handler.
// NOTE(review): three bodies below were lost to the wiki renderer ("Unknown
// macro" residue) - the help branch, the switch cases and the catch handler.
// Restore them from the original source file before compiling.
int main(int argc, char** argv)
{
Args opts;
opts.parse(argc, argv);

if (opts.help)

// NOTE(review): body of the help branch is missing here.
Unknown macro: { std}

Connection connection;
try {
// Args derives from ConnectionSettings, so it is passed to open() directly.
connection.open(opts);
Session session = connection.newSession();
Listener listener(session);

switch (opts.mode)

// NOTE(review): the case labels and their bodies are missing here.
Unknown macro: { case CONSUME}

connection.close();
return 0;
} catch(const std::exception& error)

// NOTE(review): body of the exception handler is missing here.
Unknown macro: { std}

return 1;
}


  • No labels