Info |
---|
This tutorial walks through creating a new Eagle application step by step. It uses Cassandra query monitoring as the example, but the same approach extends to any log-based monitoring case. |
1. Flow Stream into Kafka
Raw Query Log Sample:
Code Block | ||
---|---|---|
| ||
host:/192.168.6.233|source:/192.168.6.227|user:cassandra|timestamp:1454965365762|category:QUERY|type:CQL_SELECT|ks:dg_keyspace|cf:customer_details|operation:Select id,name,email,contact from customer_details|masked_columns:[email,contact]|other_columns:[id,name] |
Create Kafka topic: cassandra_querylog_sandbox
Code Block | ||
---|---|---|
| ||
$KAFKA_HOME/bin/kafka-topics.sh --create --topic cassandra_querylog_sandbox --zookeeper localhost:2181 --partitions 1 --replication-factor 1 |
Parse the query log into JSON and send it to Kafka:
Code Block | ||
---|---|---|
| ||
{
"host": "/192.168.6.227",
"source": "/192.168.6.227",
"user": "jaspa",
"timestamp": 1455574202864,
"category": "QUERY",
"type": "CQL_SELECT",
"ks": "dg_keyspace",
"cf": "customer_details",
"operation": "CQL_SELECT",
"masked_columns": "bank|ccno|email|ip|name|sal|ssn|tel|url",
"other_columns": "id|npi"
} |
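The tutorial does not show how the raw line is turned into the JSON event, so the following is only a minimal sketch of one possible conversion. The function name `raw_to_json` is hypothetical, and the mapping rules are assumptions: fields are split on `|`, each field is split on its first `:`, bracketed lists such as `[email,contact]` become pipe-joined strings, and `timestamp` is the only numeric field. Note that the parsed sample above carries `"operation": "CQL_SELECT"`, whereas this sketch simply passes the raw operation text through.

```shell
#!/bin/sh
# Hypothetical helper: convert one raw pipe-delimited query log line into JSON.
# Assumptions (not confirmed by the tutorial): values contain no '|' or '"',
# bracketed lists become pipe-joined strings, and only "timestamp" is numeric.
raw_to_json() {
  awk -F'|' '{
    out = "{"
    for (i = 1; i <= NF; i++) {
      sep = index($i, ":")              # split on the first colon only
      key = substr($i, 1, sep - 1)
      val = substr($i, sep + 1)
      if (val ~ /^\[.*\]$/) {           # [a,b] becomes a|b
        val = substr(val, 2, length(val) - 2)
        gsub(/,/, "|", val)
      }
      if (key != "timestamp") val = "\"" val "\""
      out = out (i > 1 ? "," : "") "\"" key "\":" val
    }
    print out "}"
  }'
}

# Example: convert the raw sample line shown earlier.
printf '%s\n' 'host:/192.168.6.233|source:/192.168.6.227|user:cassandra|timestamp:1454965365762|category:QUERY|type:CQL_SELECT|ks:dg_keyspace|cf:customer_details|operation:Select id,name,email,contact from customer_details|masked_columns:[email,contact]|other_columns:[id,name]' | raw_to_json
```

The output of `raw_to_json` could then be piped into `$KAFKA_HOME/bin/kafka-console-producer.sh --topic cassandra_querylog_sandbox --broker-list sandbox.hortonworks.com:6667`, the same producer command used in step 5.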
2. Start Eagle Server
Code Block | ||
---|---|---|
| ||
$EAGLE_HOME/bin/eagle-service.sh start |
3. Create New Monitoring Application
- Site: sandbox
- Application:
- Group: Cassandra
- Name: cassandraQueryLog
- Source Stream:
- Name: cassandraQueryLogStream
- Executor: cassandraQueryLogExecutor
- Schema:
Attribute | Type | Description |
---|---|---|
cf | string | query column family |
ks | string | query keyspace |
host | string | the host that current metric comes from |
type | string | query type |
user | string | query user |
category | string | query category |
timestamp | long | query timestamp |
masked_columns | string | query masked_columns |
operation | string | query operation |
other_columns | string | query other_columns |
source | string | source host |
The following script defines the metadata:
Code Block | ||||
---|---|---|---|---|
| ||||
# EAGLE_SERVICE_HOST, default is `hostname -f`
export EAGLE_SERVICE_HOST=localhost
# EAGLE_SERVICE_PORT, default is 9099
export EAGLE_SERVICE_PORT=9099
# EAGLE_SERVICE_USER
export EAGLE_SERVICE_USER=admin
# EAGLE_SERVICE_PASSWD
export EAGLE_SERVICE_PASSWD=secret
curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
"http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=SiteApplicationService" \
-d '
[
{
"tags":{
"site":"sandbox",
"application":"cassandraQueryLog"
},
"enabled": true,
"config": "{}"
}
]
'
curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
"http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=ApplicationDescService" \
-d '
[
{
"tags":{
"application":"cassandraQueryLog"
},
"description":"cassandra Query Log Monitoring",
"alias":"QueryLogMonitor",
"groupName":"Cassandra",
"config":"{}",
"features":["common","metadata"]
}
]
'
## AlertStreamService
echo ""
echo "Importing AlertStreamService for cassandraQueryLog... "
curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
"http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamService" \
-d '
[
{
"tags":{
"application":"cassandraQueryLog",
"streamName":"cassandraQueryLogStream"
},
"description":"cassandra query log data source stream"
}
]
'
## AlertExecutorService: what alert streams are consumed by alert executor
echo ""
echo "Importing AlertExecutorService for cassandraQueryLog... "
curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
"http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertExecutorService" \
-d '
[
{
"tags":{
"application":"cassandraQueryLog",
"alertExecutorId":"cassandraQueryLogExecutor",
"streamName":"cassandraQueryLogStream"
},
"description":"executor for cassandra query log stream"
}
]
'
## AlertStreamSchemaService: schema for event from alert stream
echo ""
echo "Importing AlertStreamSchemaService for cassandraQueryLog... "
curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
"http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" \
-d '
[
{
"tags": {
"application": "cassandraQueryLog",
"streamName": "cassandraQueryLogStream",
"attrName": "host"
},
"attrDescription": "the host that current metric comes from",
"attrType": "string",
"category": "",
"attrValueResolver": ""
},
{
"tags": {
"application": "cassandraQueryLog",
"streamName": "cassandraQueryLogStream",
"attrName": "source"
},
"attrDescription": "source host",
"attrType": "string",
"category": "",
"attrValueResolver": ""
},
{
"tags": {
"application": "cassandraQueryLog",
"streamName": "cassandraQueryLogStream",
"attrName": "user"
},
"attrDescription": "query user",
"attrType": "string",
"category": "",
"attrValueResolver": ""
},
{
"tags": {
"application": "cassandraQueryLog",
"streamName": "cassandraQueryLogStream",
"attrName": "timestamp"
},
"attrDescription": "query timestamp",
"attrType": "long",
"category": "",
"attrValueResolver": ""
},
{
"tags": {
"application": "cassandraQueryLog",
"streamName": "cassandraQueryLogStream",
"attrName": "category"
},
"attrDescription": "query category",
"attrType": "string",
"category": "",
"attrValueResolver": ""
},
{
"tags": {
"application": "cassandraQueryLog",
"streamName": "cassandraQueryLogStream",
"attrName": "type"
},
"attrDescription": "query type",
"attrType": "string",
"category": "",
"attrValueResolver": ""
},
{
"tags": {
"application": "cassandraQueryLog",
"streamName": "cassandraQueryLogStream",
"attrName": "ks"
},
"attrDescription": "query keyspace",
"attrType": "string",
"category": "",
"attrValueResolver": ""
},
{
"tags": {
"application": "cassandraQueryLog",
"streamName": "cassandraQueryLogStream",
"attrName": "cf"
},
"attrDescription": "query column family",
"attrType": "string",
"category": "",
"attrValueResolver": ""
},
{
"tags": {
"application": "cassandraQueryLog",
"streamName": "cassandraQueryLogStream",
"attrName": "operation"
},
"attrDescription": "query operation",
"attrType": "string",
"category": "",
"attrValueResolver": ""
},
{
"tags": {
"application": "cassandraQueryLog",
"streamName": "cassandraQueryLogStream",
"attrName": "masked_columns"
},
"attrDescription": "query masked_columns",
"attrType": "string",
"category": "",
"attrValueResolver": ""
},
{
"tags": {
"application": "cassandraQueryLog",
"streamName": "cassandraQueryLogStream",
"attrName": "other_columns"
},
"attrDescription": "query other_columns",
"attrType": "string",
"category": "",
"attrValueResolver": ""
}
]
'
## Finished
echo ""
echo "Finished initialization for eagle topology" |
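The schema entries posted to AlertStreamSchemaService differ only in `attrName`, `attrType`, and `attrDescription`, so the payload could also be generated instead of written by hand. The following is a minimal sketch under that assumption; the helper names `schema_entry` and `schema_payload` and the `name|type|description` row format are hypothetical, not part of Eagle.

```shell
#!/bin/sh
# Hypothetical helper: print one AlertStreamSchemaService entity as JSON.
# Usage: schema_entry <attrName> <attrType> <attrDescription>
schema_entry() {
  printf '{"tags":{"application":"cassandraQueryLog","streamName":"cassandraQueryLogStream","attrName":"%s"},' "$1"
  printf '"attrDescription":"%s","attrType":"%s","category":"","attrValueResolver":""}' "$3" "$2"
}

# Build the JSON array from "name|type|description" rows read on stdin.
schema_payload() {
  first=1
  printf '['
  while IFS='|' read -r name type desc; do
    [ -n "$name" ] || continue          # skip blank rows
    [ "$first" -eq 1 ] || printf ','
    first=0
    schema_entry "$name" "$type" "$desc"
  done
  printf ']\n'
}

# Example: two of the attributes from the schema table.
schema_payload <<'EOF'
host|string|the host that current metric comes from
timestamp|long|query timestamp
EOF
```

The generated array could be sent with the same `curl` call as in the script by replacing the inline body with `-d @-`, which makes `curl` read the request body from standard input.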
After the metadata above loads successfully, the new Cassandra Query Monitoring application appears in Eagle Web once the page is refreshed.
4. Start Monitoring Topology
Create a new topology configuration file named "cassandra-querylog-sandbox.conf" that connects to the source Kafka topic cassandra_querylog_sandbox, as follows:
Code Block | ||
---|---|---|
| ||
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
{
  "envContextConfig" : {
    "env" : "storm",
    "mode" : "cluster",
    "topologyName" : "cassandrawQueryLogMonitorTopology",
    "stormConfigFile" : "cassandraw-querlog-storm.yaml",
    "parallelismConfig" : {
      "cassandraQueryLogStream" : 1,
      "cassandraQueryLogExecutor*" : 1
    }
  },
  "dataSourceConfig": {
    "topic" : "cassandra_querylog_sandbox",
    "zkConnection" : "sandbox.hortonworks.com:2181",
    "zkConnectionTimeoutMS" : 15000,
    "consumerGroupId" : "eagle.consumer",
    "fetchSize" : 1048586,
    "deserializerClass" : "org.apache.eagle.datastream.storm.JsonMessageDeserializer",
    "transactionZKServers" : "sandbox.hortonworks.com",
    "transactionZKPort" : 2181,
    "transactionZKRoot" : "/consumers",
    "transactionStateUpdateMS" : 2000
  },
  "alertExecutorConfigs" : {
    "cassandraQueryLogExecutor" : {
      "parallelism" : 1,
      "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner",
      "needValidation" : "true"
    }
  },
  "eagleProps" : {
    "site" : "sandbox",
    "application": "cassandraQueryLog",
    "dataJoinPollIntervalSec" : 30,
    "mailHost" : "mailHost.com",
    "mailSmtpPort":"25",
    "mailDebug" : "true",
    "balancePartitionEnabled" : true,
    #"partitionRefreshIntervalInMin" : 60,
    #"kafkaStatisticRangeInMin" : 60,
    "eagleService": {
      "host": "localhost",
      "port": 9099,
      "username": "admin",
      "password": "secret"
    },
    "readHdfsUserCommandPatternFrom" : "file"
  },
  "dynamicConfigSource" : {
    "enabled" : true,
    "initDelayMillis" : 0,
    "delayMillis" : 30000
  }
} |
Start monitoring topology
Code Block | ||
---|---|---|
| ||
./bin/kafka-stream-monitor.sh cassandraQueryLogStream cassandraQueryLogExecutor cassandra-querylog-sandbox.conf |
5. Validate with Sample Policy
a. Define a sample policy in Eagle
Code Block | ||
---|---|---|
| ||
from cassandraQueryLogStream[(ks == 'dg_keyspace') and (cf == 'customer_details')] select * insert into outputStream; |
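The policy above selects an event only when both conditions hold: `ks` equals `dg_keyspace` and `cf` equals `customer_details`. As a rough local approximation (plain `grep`, not Eagle's policy engine), the sketch below checks whether a candidate event would satisfy those two conditions before it is sent. The function name `matches_sample_policy` is hypothetical, and the check assumes flat, single-line JSON.

```shell
#!/bin/sh
# Hypothetical helper: succeed if a JSON event would satisfy the sample policy
# (ks == 'dg_keyspace') and (cf == 'customer_details').
# Assumption: the event is flat, single-line JSON, so a regex check is enough.
matches_sample_policy() {
  printf '%s\n' "$1" | grep -Eq '"ks" *: *"dg_keyspace"' &&
    printf '%s\n' "$1" | grep -Eq '"cf" *: *"customer_details"'
}

# Example: a trimmed-down event carrying only the two fields the policy reads.
event='{"ks": "dg_keyspace","cf": "customer_details"}'
if matches_sample_policy "$event"; then
  echo "event matches the sample policy"
else
  echo "event does not match the sample policy"
fi
```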
b. Trigger alert with sample event
Code Block | ||
---|---|---|
| ||
$ $KAFKA_HOME/bin/kafka-console-producer.sh --topic cassandra_querylog_sandbox --broker-list sandbox.hortonworks.com:6667
{"host": "/192.168.6.227","source": "/192.168.6.227","user": "jaspa", "timestamp": 1455574202864, "category": "QUERY", "type": "CQL_SELECT", "ks": "dg_keyspace","cf": "customer_details","operation": "CQL_SELECT","masked_columns": "bank|ccno|email|ip|name|sal|ssn|tel|url","other_columns": "id|npi"} |
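For repeated tests it can help to generate the sample event rather than paste it, so that each event carries a fresh timestamp and the keyspace and column family can be varied. The following is a minimal sketch; the function name `sample_event` is hypothetical, the remaining field values are copied from the sample event above, and `date +%s` is assumed to be available (GNU and BSD `date` both support it).

```shell
#!/bin/sh
# Hypothetical helper: print a sample query event as single-line JSON.
# Usage: sample_event <keyspace> <column_family>
# Assumption: `date +%s` is available on this system.
sample_event() {
  ts="$(date +%s)000"   # epoch seconds padded to milliseconds
  printf '{"host": "/192.168.6.227","source": "/192.168.6.227","user": "jaspa", '
  printf '"timestamp": %s, "category": "QUERY", "type": "CQL_SELECT", ' "$ts"
  printf '"ks": "%s","cf": "%s","operation": "CQL_SELECT",' "$1" "$2"
  printf '"masked_columns": "bank|ccno|email|ip|name|sal|ssn|tel|url","other_columns": "id|npi"}\n'
}

sample_event dg_keyspace customer_details   # satisfies both policy conditions
sample_event dg_keyspace orders             # hypothetical table; cf does not match
```

Either line of output could be piped into the `kafka-console-producer.sh` command shown above in place of the hand-typed event.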
c. Review generated alert