Info |
---|
This is a tutorial about how to create a new eagle application step by step, though it is using cassandra query monitoring as example, but it could be extended to any log-based monitoring cases. |
Table of Contents
1. Flow
...
Stream into
...
Kafka
Raw Query Log Sample:
Code Block | ||
---|---|---|
| ||
host:/192.168.6.233|source:/192.168.6.227|user:cassandra|timestamp:1454965365762|category:QUERY|type:CQL_SELECT|ks:dg_keyspace|cf:customer_details|operation:Select id,name,email,contact from customer_details|masked_columns:[email,contact]|other_columns:[id,name] |
Parsed Query Log and Flow into Kafka
Create Kafka topic: cassandra_querylog_sandbox
Code Block | ||
---|---|---|
| ||
$KAFKA_HOME/bin/kafka-topics.sh --create --topic cassandra_querylog_sandbox --zookeeper localhost:2181 --partitions 1 --replication-factor 1 |
Parsed Query Log and Flow into Kafka
Code Block | ||
---|---|---|
| ||
{
"host": "/192.168.6.227",
| ||
Code Block | ||
| ||
{
"host": "/192.168.6.227",
"source": "/192.168.6.227",
"user": "jaspa",
"timestamp": 1455574202864,
"category": "QUERY",
"type": "CQL_SELECT",
"ks": "dg_keyspace",
"cf": "customer_details",
"operation": "CQL_SELECT",
"masked_columns": "bank|ccno|email|ip|name|sal|ssn|tel|url",
"other_columns": "id|npi"
} |
...
2.
...
Startup Eagle Server
Code Block | ||
---|---|---|
| ||
$EAGLE_HOME/bin/eagle-service.sh start |
3. Create New Monitoring Application
- Site: sandbox
- Application:
- Group: Cassandra
- Name: cassandraQueryLog
- Source Stream:
- Name: cassandraQueryLogStream
- Executor: cassandraQueryLogExecutor
- Schema:
cf | string | query column family |
ks | string | query keyspace |
host | string | the host that current metric comes form |
type | string | query type |
user | string | query user |
category | string | query category |
timestamp | long | query timestamp |
masked_columns | string | query masked_columns |
operation | string | query operation |
other_columns | string | query other_columns |
source | string | source host |
Here is the script for defining the metadata
Code Block | ||||
---|---|---|---|---|
| ||||
# EAGLE_SERVICE_HOST, default is `hostname -f` export EAGLE_SERVICE_HOST=localhost # EAGLE_SERVICE_PORT, default is 9099 export EAGLE_SERVICE_PORT=9099 # EAGLE_SERVICE_USER export EAGLE_SERVICE_USER=admin # EAGLE_SERVICE_PASSWORD export EAGLE_SERVICE_PASSWD=secret curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \ "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=SiteApplicationService" \ -d ' [ { "tags":{ "site":"sandbox", "application":"cassandraQueryLog" }, "enabled": true, "config": "{}" } ] ' curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \ "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=ApplicationDescService" \ -d ' [ { "tags":{ "application":"cassandraQueryLog" }, "description":"cassandra Query Log Monitoring", "alias":"QueryLogMonitor", "groupName":"Cassandra", "config":"{}", "features":["common","metadata"] } ] ' ## AlertStreamService echo "" echo "Importing AlertStreamService for HDFS... " curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \ "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamService" \ -d ' [ { "tags":{ "application":"cassandraQueryLog", "streamName":"cassandraQueryLogStream" }, "description":"cassandra query log data source stream" } ] ' ## AlertExecutorService: what alert streams are consumed by alert executor echo "" echo "Importing AlertExecutorService for HDFS... " curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \ "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertExecutorService" \ -d ' [ { "tags":{ "application":"cassandraQueryLog", "alertExecutorId":"cassandraQueryLogExecutor", "streamName":"cassandraQueryLogStream" }, "description":"executor for cassandra query log stream" } ] ' ## AlertStreamSchemaService: schema for event from alert stream echo "" echo "Importing AlertStreamSchemaService for HDFS... " curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \ "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" \ -d ' [ { "tags": { "application": "cassandraQueryLog", "streamName": "cassandraQueryLogStream", "attrName": "host" }, "attrDescription": "the host that current metric comes form", "attrType": "string", "category": "", "attrValueResolver": "" }, { "tags": { "application": "cassandraQueryLog", # EAGLE_SERVICE_HOST, default is `hostname -f` export EAGLE_SERVICE_HOST=localhost # EAGLE_SERVICE_PORT, default is 9099 export EAGLE_SERVICE_PORT=9099 # EAGLE_SERVICE_USER export EAGLE_SERVICE_USER=admin # EAGLE_SERVICE_PASSWORD export EAGLE_SERVICE_PASSWD=secret curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \ "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=SiteApplicationService" \ -d ' [ { "tagsstreamName":{ "cassandraQueryLogStream", "siteattrName": "sandboxsource", }, "applicationattrDescription": "cassandraQueryLogsource host", }"attrType": "string", "enabledcategory": true"", "configattrValueResolver": "{}" }, ] '{ curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \ "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=ApplicationDescService" \ -d ' [ { "tags": { "application": "cassandraQueryLog", "streamName": "cassandraQueryLogStream", "tagsattrName":{ "user" }, "applicationattrDescription": "cassandraQueryLogquery user", }"attrType": "string", "desccategory": "cassandra Query Log Monitoring", "aliasattrValueResolver": "QueryLogMonitor" }, { "grouptags":"Cassandra", { "configapplication": "{}cassandraQueryLog", "featuresstreamName":[ "commoncassandraQueryLogStream","metadata"] } ] ' ## AlertStreamService echo "attrName": "timestamp" echo "Importing AlertStreamService for HDFS... " curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \ "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamService" \ -d ' [ }, "attrDescription": "query timestamp", "attrType": "long", "category": "", "attrValueResolver": "" }, { "tags": { "dataSourceapplication": "cassandraQueryLog", "streamName": "cassandraQueryLogStream", }, "desc":"cassandra query log data source stream" attrName": "category" } ] ' ## AlertExecutorService: what alert streams are consumed by alert executor echo "" echo "Importing AlertExecutorService for HDFS... " curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \ "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertExecutorService" \ -d ' [}, "attrDescription": "query category", "attrType": "string", "category": "", "attrValueResolver": "" }, { "tags": { "dataSourceapplication": "cassandraQueryLog", "alertExecutorIdstreamName": "cassandraQueryLogExecutorcassandraQueryLogStream", "streamNameattrName": "cassandraQueryLogStreamtype" }, "descattrDescription":"executor for cassandra query log stream" "query type", "attrType": "string", } ] ' ## AlertStreamSchemaService: schema for event from alert stream echo "" echo "Importing AlertStreamSchemaService for HDFS... " curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \ "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" \ -d ' [ { "category": "", "attrValueResolver": "" }, { "tags": { "application": "cassandraQueryLog", "streamName": "cassandraQueryLogStream", "tagsattrName": { "ks" }, "attrDescription": "query keyspace", "dataSourceattrType": "cassandraQueryLogstring", "streamNamecategory": "cassandraQueryLogStream", "attrNameattrValueResolver": "host" }, },{ "attrDescriptiontags": "the host that current metric comes form", { "attrTypeapplication": "stringcassandraQueryLog", "categorystreamName": "cassandraQueryLogStream", "attrValueResolverattrName": "cf" }, {}, "tagsattrDescription": { "query column family", "dataSourceattrType": "cassandraQueryLogstring", "streamNamecategory": "cassandraQueryLogStream", "attrNameattrValueResolver": "source" }, },{ "attrDescriptiontags": { "source host", "attrTypeapplication": "stringcassandraQueryLog", "categorystreamName": "cassandraQueryLogStream", "attrValueResolverattrName": "operation" }, {}, "tagsattrDescription": { "query operation", "dataSourceattrType": "cassandraQueryLogstring", "streamNamecategory": "cassandraQueryLogStream", "attrNameattrValueResolver": "user" }, },{ "attrDescriptiontags": { "query user", "attrTypeapplication": "stringcassandraQueryLog", "categorystreamName": "cassandraQueryLogStream", "attrValueResolverattrName": "masked_columns" }, {}, "tagsattrDescription": { "query masked_columns", "dataSourceattrType": "cassandraQueryLogstring", "streamNamecategory": "cassandraQueryLogStream", "attrNameattrValueResolver": "timestamp" }, },{ "attrDescriptiontags": { "query timestamp", "attrTypeapplication": "longcassandraQueryLog", "categorystreamName": "cassandraQueryLogStream", "attrValueResolverattrName": "other_columns" }, {}, "tagsattrDescription": { "query other_columns", "dataSourceattrType": "cassandraQueryLogstring", "streamNamecategory": "cassandraQueryLogStream", "attrNameattrValueResolver": "category" }, ] ' ## Finished echo "attrDescription": echo "query category", "attrType": "string", "category": "", "attrValueResolver": "" }, { "tags": { "dataSource": "cassandraQueryLog", "streamName": "cassandraQueryLogStream", "attrName": "type" }, "attrDescription": "query type", "attrType": "string", "category": "", "attrValueResolver": "" }, { "tags"Finished initialization for eagle topology" |
After successfully loading above metadata, the new application of Cassandra Query Monitoring Application could be found in Eagle Web after refreshing the page as following:
4. Start Monitoring Topology
Create a new topology configuration file named "cassandra-querylog-sandbox.conf" to connect source kafka topic: cassandra_querylog_sandbox as following:
Code Block language js title cassandra-querylog-sandbox.conf # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. { "envContextConfig" : { "env" : "storm", "
...
mode" : "
...
cluster",
...
"
...
topologyName" : "
...
cassandrawQueryLogMonitorTopology",
...
"
...
stormConfigFile" : "
...
cassandraw-querlog-storm.yaml", "parallelismConfig" :
...
{
...
"
...
cassandraQueryLogStream" :
...
1,
...
"
...
cassandraQueryLogExecutor*" :
...
1 } }, "dataSourceConfig": { "
...
topic" : "cassandra_querylog_sandbox",
...
"
...
zkConnection" : "sandbox.hortonworks.com:2181",
...
"zkConnectionTimeoutMS" : 15000, "consumerGroupId"
...
: "
...
eagle.consumer", "fetchSize" : 1048586, "
...
deserializerClass" : "
...
org.apache.eagle.datastream.storm.JsonMessageDeserializer",
...
"
...
transactionZKServers" : "
...
sandbox.hortonworks.com", "transactionZKPort" : 2181, "
...
transactionZKRoot" : "
...
/consumers", "transactionStateUpdateMS" :
...
2000 }, "alertExecutorConfigs" : { "
...
cassandraQueryLogExecutor" :
...
{ "
...
parallelism" :
...
1, "
...
partitioner" : "
...
org.apache.eagle.policy.DefaultPolicyPartitioner" "
...
needValidation" : "true" } }, "eagleProps" : { "site" :
...
"sandbox", "
...
application":
...
"cassandraQueryLog", "dataJoinPollIntervalSec" :
...
30, "
...
mailHost" : "
...
mailHost.com",
...
"mailSmtpPort":"25", "
...
mailDebug" : "
...
true", "balancePartitionEnabled" : true, #"
...
partitionRefreshIntervalInMin" :
...
60, #"kafkaStatisticRangeInMin" :
...
60,
...
"
...
eagleService":
...
{ "
...
host": "
...
localhost",
...
"
...
port":
...
9099,
...
"
...
username": "admin"
...
,
...
...
"
...
password":
...
"secret"
...
} "
...
readHdfsUserCommandPatternFrom" : "
...
file" }, "dynamicConfigSource" :
...
{ "
...
enabled" :
...
true, "initDelayMillis" :
...
0, "
...
delayMillis" :
...
30000
...
} }
Start monitoring topology
Code Block language bash ./bin/kafka-stream-monitor.sh cassandraQueryLogStream cassandraQueryLogExecutor cassandra-querylog-sandbox.conf
5. Validate with Sample Policy
a. Define sample policy with eagle
Code Block | ||
---|---|---|
| ||
from cassandraQueryLogStream[(ks == 'dg_keyspace') and (cf == 'customer_details')] select * insert into outputStream; |
b. Trigger alert with sample event
Code Block | ||
---|---|---|
| ||
$ $KAFKA_HOME/bin/kafka-console-producer.sh --topic cassandra_querylog_sandbox --broker-list sandbox.hortonworks.com:6667
{"host": "/192.168.6.227","source": "/192.168.6.227","user": "jaspa", "timestamp": 1455574202864, "category": "QUERY", "type": "CQL_SELECT", "ks": "dg_keyspace","cf": "customer_details","operation": "CQL_SELECT","masked_columns": "bank|ccno|email|ip|name|sal|ssn|tel|url","other_columns": "id|npi"} |
c. Review generated alert
3. Start Monitoring Topology