HCatalog provides a notification mechanism so that users can be notified of certain events occurring in the metastore. There are six event types: add_database, add_table, add_partition, drop_partition, drop_table, and drop_database. When any of these events occurs on the metastore, a message is sent to a message bus. Any user who wishes to be notified of these events can subscribe to the message bus. Once subscribed, the message bus delivers the messages to the subscriber.

To start receiving messages, you first need to create a connection to the message bus as demonstrated below:
Code Block |
---|
ConnectionFactory connFac = new ActiveMQConnectionFactory(amqurl); Connection conn = connFac.createConnection(); conn.start(); |
Then subscribe to the topic you are interested in. A subscriber on a message bus receives only the messages delivered on the topics it has subscribed to. The topic name corresponding to a particular table is stored in the table properties and can be retrieved using the following piece of code:
Code Block |
---|
HiveMetaStoreClient msc = new HiveMetaStoreClient(hiveConf); String topicName = msc.getTable("mydb", "myTbl").getParameters().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME); |
Use the topic name to subscribe to the topic as follows:
Code Block |
---|
Session session = conn.createSession(true, Session.SESSION_TRANSACTED); Destination hcatTopic = session.createTopic(topicName); MessageConsumer consumer = session.createConsumer(hcatTopic); consumer.setMessageListener(this); |
To start receiving messages, you need to implement the JMS interface MessageListener, which requires you to implement the method onMessage(Message msg). This method is called whenever a new message arrives on the message bus. The message contains a partition object representing the corresponding partition, which can be retrieved as follows:
Code Block |
---|
@Override public void onMessage(Message msg) { try { /* We are interested only in add_partition events on this table, so check the message type first. */ if (HCatConstants.HCAT_ADD_PARTITION_EVENT.equals(msg.getStringProperty(HCatConstants.HCAT_EVENT))) { /* Get the actual partition object that was added. */ Partition partition = (Partition) ((ObjectMessage) msg).getObject(); } } catch (JMSException e) { /* getStringProperty and getObject can throw JMSException, which onMessage cannot propagate. */ e.printStackTrace(); } } |
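
If a subscriber cares about more than one of the six event types, the event-type check in onMessage can be factored into a small dispatcher. The following is a minimal sketch with no JMS dependency, so it can be run on its own. The names `EventDispatcher`, `EventType`, `on`, and `dispatch` are hypothetical and not part of HCatalog, and the strings it parses are the event names listed at the top of this page, which are not necessarily the values HCatalog places in the HCAT_EVENT message property.

```java
import java.util.EnumMap;
import java.util.Locale;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical dispatcher: routes an event-type name to a registered handler. */
class EventDispatcher {

    /** The six event types named on this page (illustrative enum, not an HCatalog class). */
    enum EventType {
        ADD_DATABASE, ADD_TABLE, ADD_PARTITION, DROP_PARTITION, DROP_TABLE, DROP_DATABASE;

        /** Parses a name such as "add_partition"; returns null for unknown or missing names. */
        static EventType parse(String name) {
            if (name == null) {
                return null;
            }
            try {
                return valueOf(name.trim().toUpperCase(Locale.ROOT));
            } catch (IllegalArgumentException e) {
                return null;
            }
        }
    }

    private final Map<EventType, Consumer<Object>> handlers = new EnumMap<>(EventType.class);

    /** Registers the handler to run for one event type, replacing any earlier one. */
    void on(EventType type, Consumer<Object> handler) {
        handlers.put(type, handler);
    }

    /** Returns true if a handler ran; unknown or unregistered events are ignored. */
    boolean dispatch(String eventName, Object payload) {
        EventType type = EventType.parse(eventName);
        Consumer<Object> handler = (type == null) ? null : handlers.get(type);
        if (handler == null) {
            return false;
        }
        handler.accept(payload);
        return true;
    }
}
```

Inside onMessage, such a dispatcher would be called with an event name derived from the HCAT_EVENT property and the object carried by the ObjectMessage. How the actual property values map onto these names is left as an assumption here.
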
You need to have the JMS jar in your classpath to make this work, along with the jar of a JMS provider. HCatalog uses ActiveMQ as its JMS provider. In principle, any JMS provider can be used on the client side; however, ActiveMQ is recommended. It can be obtained from: http://activemq.apache.org/activemq-550-release.html