Info |
---|
This tutorial shows, step by step, how to create a new Eagle application. It uses Cassandra query log monitoring as the example, but the same steps can be extended to any log-based monitoring case. |
1. Flow Stream into Kafka
Raw Query Log Sample:
```
host:/192.168.6.233|source:/192.168.6.227|user:cassandra|timestamp:1454965365762|category:QUERY|type:CQL_SELECT|ks:dg_keyspace|cf:customer_details|operation:Select id,name,email,contact from customer_details|masked_columns:[email,contact]|other_columns:[id,name]
```
Create Kafka topic: cassandra_querylog_sandbox
```bash
$KAFKA_HOME/bin/kafka-topics.sh --create --topic cassandra_querylog_sandbox --zookeeper localhost:2181 --partitions 1 --replication-factor 1
```
Parse the Query Log into JSON and Flow It into Kafka
```json
{
  "host": "/192.168.6.227",
  "source": "/192.168.6.227",
  "user": "jaspa",
  "timestamp": 1455574202864,
  "category": "QUERY",
  "type": "CQL_SELECT",
  "ks": "dg_keyspace",
  "cf": "customer_details",
  "operation": "CQL_SELECT",
  "masked_columns": "bank|ccno|email|ip|name|sal|ssn|tel|url",
  "other_columns": "id|npi"
}
```
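The tutorial does not show how raw log lines become the JSON events above. One possible approach is sketched below as a hypothetical shell function, `querylog_to_json`, which is not part of Eagle. It splits each raw line on `|`, splits each field on its first `:`, emits `timestamp` as a number and every other field as a string, and rewrites bracketed column lists such as `[email,contact]` into the pipe-separated form used in the parsed sample. It assumes values contain no `|`, `"` or `\` characters.

```bash
# Hypothetical helper (not shipped with Eagle): convert raw Cassandra query log
# lines (key:value pairs separated by '|') into single-line JSON objects.
# Assumption: values contain no '|', '"' or '\' characters (nothing is escaped).
querylog_to_json() {
  awk -F'|' '{
    out = "{"
    for (i = 1; i <= NF; i++) {
      pos = index($i, ":")            # split each field on its FIRST colon only
      if (pos == 0) continue          # skip fields that are not key:value
      key = substr($i, 1, pos - 1)
      val = substr($i, pos + 1)
      if (val ~ /^\[.*\]$/) {         # [a,b,c] -> a|b|c, as in the parsed sample
        val = substr(val, 2, length(val) - 2)
        gsub(/,/, "|", val)
      }
      sep = (out == "{") ? "" : ", "
      if (key == "timestamp")         # numeric field: emit unquoted
        out = out sep "\"" key "\": " val
      else
        out = out sep "\"" key "\": \"" val "\""
    }
    print out "}"
    fflush()                          # flush per line so piped streaming works
  }'
}
```

For example, assuming the raw log is written to a hypothetical file /var/log/cassandra/querylog.log, the converted events could be streamed into the topic with `tail -F /var/log/cassandra/querylog.log | querylog_to_json | $KAFKA_HOME/bin/kafka-console-producer.sh --topic cassandra_querylog_sandbox --broker-list sandbox.hortonworks.com:6667`.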
2. Start Eagle Server
```bash
$EAGLE_HOME/bin/eagle-service.sh start
```
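The metadata script in step 3 needs the service to be up first. A hypothetical helper such as `wait_for_eagle` (an assumption, not a script shipped with Eagle) can poll the service with curl until any HTTP response comes back. The default host, port and /eagle-service/ path are taken from the metadata script and topology configuration in this tutorial. Adjust them if your deployment differs.

```bash
# Hypothetical helper: poll the Eagle service until it answers HTTP requests.
# Assumes the service is at ${EAGLE_SERVICE_HOST:-localhost}:${EAGLE_SERVICE_PORT:-9099}
# and that ANY HTTP response from /eagle-service/ means the server is up
# (curl without --fail exits 0 on any HTTP status).
wait_for_eagle() {
  local url="http://${EAGLE_SERVICE_HOST:-localhost}:${EAGLE_SERVICE_PORT:-9099}/eagle-service/"
  local tries="${1:-30}"   # number of attempts
  local delay="${2:-2}"    # seconds to wait between attempts
  local i
  for ((i = 1; i <= tries; i++)); do
    if curl -s -o /dev/null --max-time 5 "$url"; then
      echo "Eagle service is reachable at $url"
      return 0
    fi
    sleep "$delay"
  done
  echo "Eagle service not reachable at $url after $tries attempts" >&2
  return 1
}
```

For example, `$EAGLE_HOME/bin/eagle-service.sh start && wait_for_eagle 60 2` waits up to about two minutes before you continue with step 3.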
3. Create New Monitoring Application
- Site: sandbox
- Application:
- Group: Cassandra
- Name: cassandraQueryLog
- Source Stream:
- Name: cassandraQueryLogStream
- Executor: cassandraQueryLogExecutor
- Schema:

Attribute | Type | Description |
---|---|---|
cf | string | query column family |
ks | string | query keyspace |
host | string | the host that the current metric comes from |
type | string | query type |
user | string | query user |
category | string | query category |
timestamp | long | query timestamp |
masked_columns | string | query masked_columns |
operation | string | query operation |
other_columns | string | query other_columns |
source | string | source host |
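Each row of this schema becomes one AlertStreamSchemaService entity in the metadata script. You can write the eleven entries by hand, or generate the JSON array from `name|type|description` lines with a hypothetical helper such as `schema_payload` (an illustration, not an Eagle tool). As in the script, `category` and `attrValueResolver` are left empty. Descriptions are assumed to contain no `"` characters.

```bash
# Hypothetical helper: build an AlertStreamSchemaService JSON array from
# "name|type|description" lines read on stdin.
# Usage: schema_payload <application> <streamName> < schema.txt
schema_payload() {
  local app="$1" stream="$2"
  awk -F'|' -v app="$app" -v stream="$stream" '
    NF == 3 {
      # first entry opens the array; later entries are comma-separated
      printf "%s{ \"tags\": { \"application\": \"%s\", \"streamName\": \"%s\", \"attrName\": \"%s\" }, \"attrDescription\": \"%s\", \"attrType\": \"%s\", \"category\": \"\", \"attrValueResolver\": \"\" }", (n++ ? ",\n  " : "[\n  "), app, stream, $1, $3, $2
    }
    END { print (n ? "\n]" : "[]") }'
}
```

The output can be posted directly, because curl's `-d @-` reads the request body from stdin: `schema_payload cassandraQueryLog cassandraQueryLogStream < schema.txt | curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" -d @-`. Here schema.txt is a hypothetical file holding the rows of the table above.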
The following script registers this metadata with the Eagle service through its REST API:
```bash
# EAGLE_SERVICE_HOST, default is `hostname -f`
export EAGLE_SERVICE_HOST=localhost
# EAGLE_SERVICE_PORT, default is 9099
export EAGLE_SERVICE_PORT=9099
# EAGLE_SERVICE_USER
export EAGLE_SERVICE_USER=admin
# EAGLE_SERVICE_PASSWORD
export EAGLE_SERVICE_PASSWD=secret

## SiteApplicationService: enable application cassandraQueryLog on site sandbox
curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
 "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=SiteApplicationService" \
 -d '
 [
   { "tags":{ "site":"sandbox", "application":"cassandraQueryLog" }, "enabled": true, "config": "{}" }
 ]
 '

## ApplicationDescService: application description, alias and group
curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
 "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=ApplicationDescService" \
 -d '
 [
   {
     "tags":{ "application":"cassandraQueryLog" },
     "description":"cassandra Query Log Monitoring",
     "alias":"QueryLogMonitor",
     "groupName":"Cassandra",
     "config":"{}",
     "features":["common","metadata"]
   }
 ]
 '

## AlertStreamService: alert streams generated from data source
echo ""
echo "Importing AlertStreamService for cassandraQueryLog... "
curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
 "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamService" \
 -d '
 [
   { "tags":{ "application":"cassandraQueryLog", "streamName":"cassandraQueryLogStream" }, "description":"cassandra query log data source stream" }
 ]
 '

## AlertExecutorService: what alert streams are consumed by alert executor
echo ""
echo "Importing AlertExecutorService for cassandraQueryLog... "
curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
 "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertExecutorService" \
 -d '
 [
   { "tags":{ "application":"cassandraQueryLog", "alertExecutorId":"cassandraQueryLogExecutor", "streamName":"cassandraQueryLogStream" }, "description":"executor for cassandra query log stream" }
 ]
 '

## AlertStreamSchemaService: schema for event from alert stream
echo ""
echo "Importing AlertStreamSchemaService for cassandraQueryLog... "
curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
 "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" \
 -d '
 [
   { "tags": { "application": "cassandraQueryLog", "streamName": "cassandraQueryLogStream", "attrName": "host" }, "attrDescription": "the host that the current metric comes from", "attrType": "string", "category": "", "attrValueResolver": "" },
   { "tags": { "application": "cassandraQueryLog", "streamName": "cassandraQueryLogStream", "attrName": "source" }, "attrDescription": "source host", "attrType": "string", "category": "", "attrValueResolver": "" },
   { "tags": { "application": "cassandraQueryLog", "streamName": "cassandraQueryLogStream", "attrName": "user" }, "attrDescription": "query user", "attrType": "string", "category": "", "attrValueResolver": "" },
   { "tags": { "application": "cassandraQueryLog", "streamName": "cassandraQueryLogStream", "attrName": "timestamp" }, "attrDescription": "query timestamp", "attrType": "long", "category": "", "attrValueResolver": "" },
   { "tags": { "application": "cassandraQueryLog", "streamName": "cassandraQueryLogStream", "attrName": "category" }, "attrDescription": "query category", "attrType": "string", "category": "", "attrValueResolver": "" },
   { "tags": { "application": "cassandraQueryLog", "streamName": "cassandraQueryLogStream", "attrName": "type" }, "attrDescription": "query type", "attrType": "string", "category": "", "attrValueResolver": "" },
   { "tags": { "application": "cassandraQueryLog", "streamName": "cassandraQueryLogStream", "attrName": "ks" }, "attrDescription": "query keyspace", "attrType": "string", "category": "", "attrValueResolver": "" },
   { "tags": { "application": "cassandraQueryLog", "streamName": "cassandraQueryLogStream", "attrName": "cf" }, "attrDescription": "query column family", "attrType": "string", "category": "", "attrValueResolver": "" },
   { "tags": { "application": "cassandraQueryLog", "streamName": "cassandraQueryLogStream", "attrName": "operation" }, "attrDescription": "query operation", "attrType": "string", "category": "", "attrValueResolver": "" },
   { "tags": { "application": "cassandraQueryLog", "streamName": "cassandraQueryLogStream", "attrName": "masked_columns" }, "attrDescription": "query masked_columns", "attrType": "string", "category": "", "attrValueResolver": "" },
   { "tags": { "application": "cassandraQueryLog", "streamName": "cassandraQueryLogStream", "attrName": "other_columns" }, "attrDescription": "query other_columns", "attrType": "string", "category": "", "attrValueResolver": "" }
 ]
 '

## Finished
echo ""
echo "Finished initialization for eagle topology"
```
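The script above repeats the same curl invocation for every entity service. A small hypothetical wrapper, `eagle_post`, could factor that out. It reuses the endpoint, header and credential variables exactly as they appear in the script, and only the serviceName parameter and the JSON payload vary between calls.

```bash
# Hypothetical helper: POST a JSON entity array to one Eagle entity service.
# Relies on EAGLE_SERVICE_USER, EAGLE_SERVICE_PASSWD, EAGLE_SERVICE_HOST and
# EAGLE_SERVICE_PORT exported as in the metadata script.
# Usage: eagle_post <serviceName> '<json array>'
eagle_post() {
  local service="$1" payload="$2"
  curl -u "${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD}" -X POST -H 'Content-Type:application/json' \
    "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=${service}" \
    -d "$payload"
}
```

For example, the AlertStreamService step becomes `eagle_post AlertStreamService '[ { "tags":{ "application":"cassandraQueryLog", "streamName":"cassandraQueryLogStream" }, "description":"cassandra query log data source stream" } ]'`.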
After the metadata above is loaded successfully, refresh the Eagle web UI. The new Cassandra Query Monitoring application appears as shown below:
4. Start Monitoring Topology
Create a new topology configuration file named "cassandra-querylog-sandbox.conf" that connects to the source Kafka topic "cassandra_querylog_sandbox":
```js
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

{
  "envContextConfig" : {
    "env" : "storm",
    "mode" : "cluster",
    "topologyName" : "cassandraQueryLogMonitorTopology",
    "stormConfigFile" : "cassandra-querylog-storm.yaml",
    "parallelismConfig" : {
      "cassandraQueryLogStream" : 1,
      "cassandraQueryLogExecutor*" : 1
    }
  },
  "dataSourceConfig": {
    "topic" : "cassandra_querylog_sandbox",
    "zkConnection" : "sandbox.hortonworks.com:2181",
    "zkConnectionTimeoutMS" : 15000,
    "consumerGroupId" : "eagle.consumer",
    "fetchSize" : 1048586,
    "deserializerClass" : "org.apache.eagle.datastream.storm.JsonMessageDeserializer",
    "transactionZKServers" : "sandbox.hortonworks.com",
    "transactionZKPort" : 2181,
    "transactionZKRoot" : "/consumers",
    "transactionStateUpdateMS" : 2000
  },
  "alertExecutorConfigs" : {
    "cassandraQueryLogExecutor" : {
      "parallelism" : 1,
      "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner",
      "needValidation" : "true"
    }
  },
  "eagleProps" : {
    "site" : "sandbox",
    "application": "cassandraQueryLog",
    "dataJoinPollIntervalSec" : 30,
    "mailHost" : "mailHost.com",
    "mailSmtpPort":"25",
    "mailDebug" : "true",
    "balancePartitionEnabled" : true,
    #"partitionRefreshIntervalInMin" : 60,
    #"kafkaStatisticRangeInMin" : 60,
    "eagleService": {
      "host": "localhost",
      "port": 9099,
      "username": "admin",
      "password": "secret"
    },
    "readHdfsUserCommandPatternFrom" : "file"
  },
  "dynamicConfigSource" : {
    "enabled" : true,
    "initDelayMillis" : 0,
    "delayMillis" : 30000
  }
}
```
Start the monitoring topology:

```bash
./bin/kafka-stream-monitor.sh cassandraQueryLogStream cassandraQueryLogExecutor cassandra-querylog-sandbox.conf
```
5. Validate with Sample Policy
a. Define a sample policy in Eagle
```
from cassandraQueryLogStream[(ks == 'dg_keyspace') and (cf == 'customer_details')] select * insert into outputStream;
```
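The policy matches events whose ks is exactly 'dg_keyspace' and whose cf is exactly 'customer_details'. Its `select *` then forwards every attribute of a matching event to outputStream. Before sending a test event, you can run a rough local check such as the hypothetical function below to confirm that a flat, single-line JSON event satisfies both conditions. It is a grep text match, not a JSON parser, and it is not how Eagle evaluates policies.

```bash
# Hypothetical local pre-check (not part of Eagle): approximates the sample
# policy filter (ks == 'dg_keyspace') and (cf == 'customer_details') on one
# flat JSON event line using text matching only.
matches_sample_policy() {
  local event="$1"
  # Both conditions must hold, mirroring the policy's "and".
  grep -Eq '"ks"[[:space:]]*:[[:space:]]*"dg_keyspace"' <<<"$event" &&
    grep -Eq '"cf"[[:space:]]*:[[:space:]]*"customer_details"' <<<"$event"
}
```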
b. Trigger an alert with a sample event
```
$ $KAFKA_HOME/bin/kafka-console-producer.sh --topic cassandra_querylog_sandbox --broker-list sandbox.hortonworks.com:6667
{"host": "/192.168.6.227","source": "/192.168.6.227","user": "jaspa", "timestamp": 1455574202864, "category": "QUERY", "type": "CQL_SELECT", "ks": "dg_keyspace","cf": "customer_details","operation": "CQL_SELECT","masked_columns": "bank|ccno|email|ip|name|sal|ssn|tel|url","other_columns": "id|npi"}
```
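The sample event carries a fixed timestamp (1455574202864 ms, in February 2016). To send the same event stamped with the current time instead, a hypothetical helper can print it with a supplied millisecond timestamp. The portable `date +%s` multiplied by 1000 is used rather than the GNU-only `%3N` format.

```bash
# Hypothetical helper: print the tutorial's sample event with a caller-supplied
# timestamp (milliseconds since the epoch), as one JSON object per line.
sample_event() {
  local ts="$1"
  printf '{"host": "/192.168.6.227","source": "/192.168.6.227","user": "jaspa", "timestamp": %s, "category": "QUERY", "type": "CQL_SELECT", "ks": "dg_keyspace","cf": "customer_details","operation": "CQL_SELECT","masked_columns": "bank|ccno|email|ip|name|sal|ssn|tel|url","other_columns": "id|npi"}\n' "$ts"
}
```

For example: `sample_event "$(( $(date +%s) * 1000 ))" | $KAFKA_HOME/bin/kafka-console-producer.sh --topic cassandra_querylog_sandbox --broker-list sandbox.hortonworks.com:6667`.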
c. Review the generated alert