Notification mechanism is introduced in Hcatalog so that a user can be notified of certain events occurring in Metastore if she desires so. Events are of six types: add_database, add_table, add_partition, drop_partition, drop_table, drop_database. When any of these events occur on Metastore message is sent to a Message Bus. Any user who wish to get notification for these messages can subscribe to a message bus. Once subscribed, message bus will deliver the messages to the subscriber.
To start receiving message you first need to create a connection to messagebus as demonstrated below:
ConnectionFactory connFac = new ActiveMQConnectionFactory(amqurl); Connection conn = connFac.createConnection(); conn.start();
Then subscribe to a topic you are interested in. While subscribing on a message bus, user need to subscribe to a particular topic to receive the messages which are being delivered on that topic. The topic name corresponding to a particular table is stored in table properties and can be retrieved using following piece of code:
HiveMetaStoreClient msc = new HiveMetaStoreClient(hiveConf); String topicName = msc.getTable("mydb", "myTbl").getParameters().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME);
Use the topic name to subscribe to a topic as follows:
Session session = conn.createSession(true, Session.SESSION_TRANSACTED); Destination hcatTopic = session.createTopic(topicName); MessageConsumer consumer = session.createConsumer(hcatTopic); consumer.setMessageListener(this);
To start receiving messages user need to implement a jms interface MessageListener which will make you implement a method onMessage(Message msg). This method will be called whenever a new message arrives on MessageBus. The message contains a partition object representing the corresponding partition, which can be retrieved as following:
@Override public void onMessage(Message msg){ // We are interested in only add_partition events on this table. // So, check message type first. if(msg.getStringProperty(HCatConstants.HCAT_EVENT).equals(HCatConstants.HCAT_ADD_PARTITION_EVENT)){ // Retrieve HCatEventMessage, using MessagingUtils. HCatEventMessage hcatMessage = MessagingUtils.getMessage(msg); //Get the partition-keys for all partitions added. List<Map<String, String>> partitionList = ((AddPartitionMessage)hcatMessage).getPartitions(); } }
You need to have jms jar in your classpath to make this work. You additionally need to have a jms provider’s jar in your classpath as well. Hcatalog uses ActiveMQ as a jms provider. In principle any JMS provider can be used in client side. However, ActiveMQ is recommended. It can be obtained from: http://activemq.apache.org/activemq-550-release.html
Event Message Formats
While HCatalog Event-string formats are pluggable, the strings are in JSON by default. Each event conveys only just enough information to identify the Database/Table/Partition that's been added/deleted in HCatalog. An event-consumer may use the identifiers specified in the event to query HCatalog for further information.
HCatalog sends events for 6 metastore operations. Here's a listing of the supported events, and their corresponding event formats:
1. Creation of Database:
Event type-string: "CREATE_DATABASE"
Example JSON Format:
{ "timestamp" : 1360272556, "eventType" : "CREATE_DATABASE", "server" : "hcatserver.mydomain.net", "servicePrincipal" : "hcat/hcatserver@MYDOMAIN.NET", "db" : "mydb" }
2. Dropping a Database:
Event type-string: "DROP_DATABASE"
Example JSON Format:
{ "timestamp" : 1360272556, "eventType" : "DROP_DATABASE", "server" : "hcatserver.mydomain.net", "servicePrincipal" : "hcat/hcatserver@MYDOMAIN.NET", "db" : "mydb" }
3. Creation of a Table:
Event type-string: "CREATE_TABLE"
Example JSON Format:
{ "timestamp" : 1360272556, "eventType" : "CREATE_TABLE", "server" : "hcatserver.mydomain.net", "servicePrincipal" : "hcat/hcatserver@MYDOMAIN.NET", "db" : "mydb", "table" : "mytbl" }
4. Dropping a Table:
Event type-string: "DROP_TABLE"
Example JSON Format:
{ "timestamp" : 1360272556, "eventType" : "DROP_TABLE", "server" : "hcatserver.mydomain.net", "servicePrincipal" : "hcat/hcatserver@MYDOMAIN.NET", "db" : "mydb", "table" : "mytbl" }
5. Adding (an atomic set of) partitions:
Event type-string: "ADD_PARTITION"
Example JSON Format:
{ "timestamp" : 1360272556, "eventType" : "ADD_PARTITION", "server" : "hcatserver.mydomain.net", "servicePrincipal" : "hcat/hcatserver@MYDOMAIN.NET", "db" : "mydb", "table" : "mytbl", "partitions": [ { "partKey1" : "partVal1A", "partKey2" : "partVal2A" }, { "partKey1" : "partVal1B", "partKey2" : "partVal2B" }, { "partKey1" : "partVal1C", "partKey2" : "partVal2C" } ] }
6. Dropping (a set of) partitions:
Event type-string: "DROP_PARTITION"
Example JSON Format:
{ "timestamp" : 1360272556, "eventType" : "DROP_PARTITION", "server" : "hcatserver.mydomain.net", "servicePrincipal" : "hcat/hcatserver@MYDOMAIN.NET", "db" : "mydb", "table" : "mytbl", "partitions": [ { "partKey1" : "partVal1A", "partKey2" : "partVal2A" }, { "partKey1" : "partVal1B", "partKey2" : "partVal2B" }, { "partKey1" : "partVal1C", "partKey2" : "partVal2C" } ] }
All the JMS messages are sent as TextMessage instances. Apart from the message-body, each message conveys 3 string properties, using the following keys:
- HCatConstants.HCAT_EVENT: The event-type string (E.g. "CREATE_TABLE", "ADD_PARTITIONS", etc.)
- HCatConstants.HCAT_MESSAGE_VERSION: The version-string for the messages (E.g. "0.1", etc.)
- HCatConstants.HCAT_FORMAT: An identifier for the message format (E.g. "json", by default.)