This tutorial shows, step by step, how to create a new Eagle application that monitors the Cassandra query log. Cassandra query monitoring serves as the example, but the same approach extends to any log-based monitoring case.
1. Flow
...
Stream into
...
Kafka
Raw Query Log Sample:
```
host:/192.168.6.233|source:/192.168.6.227|user:cassandra|timestamp:1454965365762|category:QUERY|type:CQL_SELECT|ks:dg_keyspace|cf:customer_details|operation:Select id,name,email,contact from customer_details|masked_columns:[email,contact]|other_columns:[id,name]
```
...
```json
{
  "host": "/192.168.6.227",
  "source": "/192.168.6.227",
  "user": "jaspa",
  "timestamp": 1455574202864,
  "category": "QUERY",
  "type": "CQL_SELECT",
  "ks": "dg_keyspace",
  "cf": "customer_details",
  "operation": "CQL_SELECT",
  "masked_columns": "bank|ccno|email|ip|name|sal|ssn|tel|url",
  "other_columns": "id|npi"
}
```
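The two samples above suggest a simple mapping from the pipe-delimited raw line to a flat JSON event. The following is a minimal sketch of that mapping, not the converter Eagle itself uses. `parse_raw_query_log` is a hypothetical helper, and two choices in it are assumptions: the timestamp is converted to an integer, and bracketed column lists such as `[email,contact]` are re-joined with `|` to mirror the JSON sample.

```python
import json


def parse_raw_query_log(line: str) -> dict:
    """Turn one pipe-delimited raw query log line into a flat event dict.

    Assumes no field value contains a '|' character.
    """
    event = {}
    for field in line.strip().split("|"):
        # Split on the first ':' only, so values that contain ':' stay intact.
        key, _, value = field.partition(":")
        event[key] = value

    # Assumption: downstream consumers expect a numeric timestamp.
    event["timestamp"] = int(event["timestamp"])

    # Assumption: "[a,b]" column lists become "a|b", as in the JSON sample.
    for column_field in ("masked_columns", "other_columns"):
        names = event.get(column_field, "").strip("[]")
        event[column_field] = "|".join(
            name.strip() for name in names.split(",") if name.strip()
        )
    return event


raw = (
    "host:/192.168.6.233|source:/192.168.6.227|user:cassandra|"
    "timestamp:1454965365762|category:QUERY|type:CQL_SELECT|"
    "ks:dg_keyspace|cf:customer_details|"
    "operation:Select id,name,email,contact from customer_details|"
    "masked_columns:[email,contact]|other_columns:[id,name]"
)
event = parse_raw_query_log(raw)
print(json.dumps(event))
```

Each JSON line produced this way can then be written to the Kafka topic that the monitoring topology reads from.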
2. Startup Eagle
...
Server
```bash
$EAGLE_HOME/bin/eagle-service.sh start
```
3. Create New Monitoring Application
- Site: sandbox
- Application:
  - Group: Cassandra
  - Name: cassandraQueryLog
- Source Stream:
  - Name: cassandraQueryLogStream
  - Executor: cassandraQueryLogExecutor
  - Schema:
...
```bash
# EAGLE_SERVICE_HOST, default is `hostname -f`
export EAGLE_SERVICE_HOST=localhost
# EAGLE_SERVICE_PORT, default is 9099
export EAGLE_SERVICE_PORT=9099
# EAGLE_SERVICE_USER
export EAGLE_SERVICE_USER=admin
# EAGLE_SERVICE_PASSWD
export EAGLE_SERVICE_PASSWD=secret

## SiteApplicationService: enable the application on the site
curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
 "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=SiteApplicationService" \
 -d '
 [
   {
     "tags": {"site": "sandbox", "application": "cassandraQueryLog"},
     "enabled": true,
     "config": "{}"
   }
 ]
 '

## ApplicationDescService: describe the application
curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
 "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=ApplicationDescService" \
 -d '
 [
   {
     "tags": {"application": "cassandraQueryLog"},
     "description": "cassandra Query Log Monitoring",
     "alias": "QueryLogMonitor",
     "groupName": "Cassandra",
     "config": "{}",
     "features": ["common", "metadata"]
   }
 ]
 '

## AlertStreamService
echo ""
echo "Importing AlertStreamService for cassandraQueryLog... "
curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
 "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamService" \
 -d '
 [
   {
     "tags": {"application": "cassandraQueryLog", "streamName": "cassandraQueryLogStream"},
     "description": "cassandra query log data source stream"
   }
 ]
 '

## AlertExecutorService: what alert streams are consumed by alert executor
echo ""
echo "Importing AlertExecutorService for cassandraQueryLog... "
curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
 "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertExecutorService" \
 -d '
 [
   {
     "tags": {
       "application": "cassandraQueryLog",
       "alertExecutorId": "cassandraQueryLogExecutor",
       "streamName": "cassandraQueryLogStream"
     },
     "description": "executor for cassandra query log stream"
   }
 ]
 '

## AlertStreamSchemaService: schema for event from alert stream
echo ""
echo "Importing AlertStreamSchemaService for cassandraQueryLog... "
curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
 "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" \
 -d '
 [
   {
     "tags": {"application": "cassandraQueryLog", "streamName": "cassandraQueryLogStream", "attrName": "host"},
     "attrDescription": "the host that current metric comes from",
     "attrType": "string", "category": "", "attrValueResolver": ""
   },
   {
     "tags": {"application": "cassandraQueryLog", "streamName": "cassandraQueryLogStream", "attrName": "source"},
     "attrDescription": "source host",
     "attrType": "string", "category": "", "attrValueResolver": ""
   },
   {
     "tags": {"application": "cassandraQueryLog", "streamName": "cassandraQueryLogStream", "attrName": "user"},
     "attrDescription": "query user",
     "attrType": "string", "category": "", "attrValueResolver": ""
   },
   {
     "tags": {"application": "cassandraQueryLog", "streamName": "cassandraQueryLogStream", "attrName": "timestamp"},
     "attrDescription": "query timestamp",
     "attrType": "long", "category": "", "attrValueResolver": ""
   },
   {
     "tags": {"application": "cassandraQueryLog", "streamName": "cassandraQueryLogStream", "attrName": "category"},
     "attrDescription": "query category",
     "attrType": "string", "category": "", "attrValueResolver": ""
   },
   {
     "tags": {"application": "cassandraQueryLog", "streamName": "cassandraQueryLogStream", "attrName": "type"},
     "attrDescription": "query type",
     "attrType": "string", "category": "", "attrValueResolver": ""
   },
   {
     "tags": {"application": "cassandraQueryLog", "streamName": "cassandraQueryLogStream", "attrName": "ks"},
     "attrDescription": "query keyspace",
     "attrType": "string", "category": "", "attrValueResolver": ""
   },
   {
     "tags": {"application": "cassandraQueryLog", "streamName": "cassandraQueryLogStream", "attrName": "cf"},
     "attrDescription": "query column family",
     "attrType": "string", "category": "", "attrValueResolver": ""
   },
   {
     "tags": {"application": "cassandraQueryLog", "streamName": "cassandraQueryLogStream", "attrName": "operation"},
     "attrDescription": "query operation",
     "attrType": "string", "category": "", "attrValueResolver": ""
   },
   {
     "tags": {"application": "cassandraQueryLog", "streamName": "cassandraQueryLogStream", "attrName": "masked_columns"},
     "attrDescription": "query masked_columns",
     "attrType": "string", "category": "", "attrValueResolver": ""
   },
   {
     "tags": {"application": "cassandraQueryLog", "streamName": "cassandraQueryLogStream", "attrName": "other_columns"},
     "attrDescription": "query other_columns",
     "attrType": "string", "category": "", "attrValueResolver": ""
   }
 ]
 '

## Finished
echo ""
echo "Finished initialization for eagle topology"
```
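The AlertStreamSchemaService payload above repeats the same structure for every attribute, so it can be generated from a short attribute list instead of being written by hand. The sketch below only builds the JSON body; it does not call the Eagle service. `build_schema_entities` is a hypothetical helper, and the tag key is written here as `application`, which is an assumption to adjust if your Eagle version expects a different key.

```python
import json

APPLICATION = "cassandraQueryLog"
STREAM = "cassandraQueryLogStream"

# (attribute name, attribute type, description), as listed in the script above.
ATTRIBUTES = [
    ("host", "string", "the host that current metric comes from"),
    ("source", "string", "source host"),
    ("user", "string", "query user"),
    ("timestamp", "long", "query timestamp"),
    ("category", "string", "query category"),
    ("type", "string", "query type"),
    ("ks", "string", "query keyspace"),
    ("cf", "string", "query column family"),
    ("operation", "string", "query operation"),
    ("masked_columns", "string", "query masked_columns"),
    ("other_columns", "string", "query other_columns"),
]


def build_schema_entities(attributes, application=APPLICATION, stream=STREAM):
    """Build one schema entity per stream attribute."""
    return [
        {
            # Assumption: "application" is the tag key your Eagle version expects.
            "tags": {
                "application": application,
                "streamName": stream,
                "attrName": name,
            },
            "attrDescription": description,
            "attrType": attr_type,
            "category": "",
            "attrValueResolver": "",
        }
        for name, attr_type, description in attributes
    ]


entities = build_schema_entities(ATTRIBUTES)
payload = json.dumps(entities, indent=2)
print(payload)
```

The printed payload can be saved to a file and passed to `curl` with `-d @file` in place of the inline JSON body.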
...
After the metadata above loads successfully, the new Cassandra query monitoring application appears in Eagle Web once you refresh the page.
4. Start Monitoring Topology
Create a new topology configuration file named "cassandra-querylog-sandbox.conf" that connects to the source Kafka topic cassandra_querylog_sandbox:
```
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
{
  "envContextConfig" : {
    "env" : "storm",
    "mode" : "cluster",
    "topologyName" : "cassandrawQueryLogMonitorTopology",
    "stormConfigFile" : "cassandraw-querlog-storm.yaml",
    "parallelismConfig" : {
      "cassandraQueryLogStream" : 1,
      "cassandraQueryLogExecutor*" : 1
    }
  },
  "dataSourceConfig": {
    "topic" : "cassandra_querylog_sandbox",
    "zkConnection" : "sandbox.hortonworks.com:2181",
    "zkConnectionTimeoutMS" : 15000,
    "consumerGroupId" : "eagle.consumer",
    "fetchSize" : 1048586,
    "deserializerClass" : "org.apache.eagle.datastream.storm.JsonMessageDeserializer",
    "transactionZKServers" : "sandbox.hortonworks.com",
    "transactionZKPort" : 2181,
    "transactionZKRoot" : "/consumers",
    "transactionStateUpdateMS" : 2000
  },
  "alertExecutorConfigs" : {
    "cassandraQueryLogExecutor" : {
      "parallelism" : 1,
      "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner",
      "needValidation" : "true"
    }
  },
  "eagleProps" : {
    "site" : "sandbox",
    "application": "cassandraQueryLog",
    "dataJoinPollIntervalSec" : 30,
    "mailHost" : "mailHost.com",
    "mailSmtpPort":"25",
    "mailDebug" : "true",
    "balancePartitionEnabled" : true,
    #"partitionRefreshIntervalInMin" : 60,
    #"kafkaStatisticRangeInMin" : 60,
    "eagleService": {
      "host": "localhost",
      "port": 9099,
      "username": "admin",
      "password": "secret"
    },
    "readHdfsUserCommandPatternFrom" : "file"
  },
  "dynamicConfigSource" : {
    "enabled" : true,
    "initDelayMillis" : 0,
    "delayMillis" : 30000
  }
}
```
Start the monitoring topology:
```bash
./bin/kafka-stream-monitor.sh cassandraQueryLogStream cassandraQueryLogExecutor cassandra-querylog-sandbox.conf
```
5. Validate with Sample Policy
a. Define a sample policy with Eagle
```
from cassandraQueryLogStream[(ks == 'dg_keyspace') and (cf == 'customer_details')] select * insert into outputStream;
```
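The filter in the sample policy can be read as a plain predicate over the event fields: an event matches when its keyspace is `dg_keyspace` and its column family is `customer_details`. The sketch below restates that condition in Python as an illustration only; it is not how Eagle evaluates policies, and `matches_sample_policy` is a hypothetical helper.

```python
def matches_sample_policy(event: dict) -> bool:
    """Return True when the event satisfies the sample policy's filter."""
    return (
        event.get("ks") == "dg_keyspace"
        and event.get("cf") == "customer_details"
    )


# An event on the monitored table matches the filter.
matching = {"ks": "dg_keyspace", "cf": "customer_details", "user": "jaspa"}

# A hypothetical event on another column family does not.
other = {"ks": "dg_keyspace", "cf": "orders", "user": "jaspa"}

print(matches_sample_policy(matching))
print(matches_sample_policy(other))
```

Checking a test event against this predicate before sending it helps confirm that it should trigger the policy.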
b. Trigger alert with sample event
```
$ $KAFKA_HOME/bin/kafka-console-producer.sh --topic cassandra_querylog_sandbox --broker-list sandbox.hortonworks.com:6667
{"host": "/192.168.6.227","source": "/192.168.6.227","user": "jaspa", "timestamp": 1455574202864, "category": "QUERY", "type": "CQL_SELECT", "ks": "dg_keyspace","cf": "customer_details","operation": "CQL_SELECT","masked_columns": "bank|ccno|email|ip|name|sal|ssn|tel|url","other_columns": "id|npi"}
```
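The console producer sends each line of standard input as one message, so the sample event has to stay on a single line. A minimal sketch of serializing an event that way follows; `to_producer_line` is a hypothetical helper, and the event values are copied from the sample above.

```python
import json


def to_producer_line(event: dict) -> str:
    """Serialize an event as single-line JSON, one Kafka message per line."""
    # json.dumps without indent emits no raw newlines; any newline inside a
    # string value is escaped as \n.
    return json.dumps(event)


sample_event = {
    "host": "/192.168.6.227",
    "source": "/192.168.6.227",
    "user": "jaspa",
    "timestamp": 1455574202864,
    "category": "QUERY",
    "type": "CQL_SELECT",
    "ks": "dg_keyspace",
    "cf": "customer_details",
    "operation": "CQL_SELECT",
    "masked_columns": "bank|ccno|email|ip|name|sal|ssn|tel|url",
    "other_columns": "id|npi",
}

line = to_producer_line(sample_event)
print(line)
```

The printed line can be pasted into the running console producer or piped to its standard input.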
c. Review generated alert