This tutorial shows, step by step, how to create a new Eagle application. It uses Cassandra query log monitoring as the example, but the same approach can be extended to any log-based monitoring case.

1. Stream the Query Log into Kafka

 Raw Query Log Sample:

Code Block
languagetext
host:/192.168.6.233|source:/192.168.6.227|user:cassandra|timestamp:1454965365762|category:QUERY|type:CQL_SELECT|ks:dg_keyspace|cf:customer_details|operation:Select  id,name,email,contact from customer_details|masked_columns:[email,contact]|other_columns:[id,name]

Parse the Query Log and Send It into Kafka

Create the Kafka topic cassandra_querylog_sandbox:

Code Block
languagebash
$KAFKA_HOME/bin/kafka-topics.sh --create --topic cassandra_querylog_sandbox --zookeeper localhost:2181 --partitions 1 --replication-factor 1

Each raw line is then parsed into a flat JSON message like the following and published to that topic:
Code Block
languagejs
{
    "host": "/192.168.6.227",
    "source": "/192.168.6.227",
    "user": "jaspa",
    "timestamp": 1455574202864,
    "category": "QUERY",
    "type": "CQL_SELECT",
    "ks": "dg_keyspace",
    "cf": "customer_details",
    "operation": "CQL_SELECT",
    "masked_columns": "bank|ccno|email|ip|name|sal|ssn|tel|url",
    "other_columns": "id|npi"
}
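The tutorial does not show the step that turns a raw line into a JSON message. The following is a minimal shell sketch. It assumes the raw format is always key:value pairs separated by "|", that values never contain "|", double quotes or backslashes, and that bracketed lists such as [email,contact] become pipe-separated strings, as in the parsed sample. The function name parse_querylog is hypothetical and is not part of Eagle or Cassandra.

```shell
# Hypothetical helper (not part of Eagle): convert raw Cassandra query-log
# lines of the form key:value|key:value|... into one flat JSON object per line.
# Assumes values contain no '|', '"' or '\' characters.
parse_querylog() {
  awk '
  {
    n = split($0, fields, "|")
    out = ""
    for (i = 1; i <= n; i++) {
      p = index(fields[i], ":")        # split on the FIRST colon only
      if (p == 0) continue             # skip fragments without a key
      key = substr(fields[i], 1, p - 1)
      val = substr(fields[i], p + 1)
      if (val ~ /^\[.*\]$/) {          # [email,contact] -> email|contact
        val = substr(val, 2, length(val) - 2)
        gsub(/,/, "|", val)
      }
      if (key == "timestamp")          # keep epoch millis numeric
        item = "\"" key "\":" val
      else
        item = "\"" key "\":\"" val "\""
      out = (out == "" ? item : out "," item)
    }
    print "{" out "}"
    fflush()                           # emit each line immediately when streaming
  }'
}

# Example (the log path is a placeholder):
# tail -F /path/to/querylog | parse_querylog | \
#   $KAFKA_HOME/bin/kafka-console-producer.sh --topic cassandra_querylog_sandbox \
#   --broker-list sandbox.hortonworks.com:6667
```

The sketch copies the operation field verbatim. Its output therefore will not match the parsed sample exactly, because that sample appears to come from a different event.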

2. Start the Eagle Server

 

Code Block
languagebash
$EAGLE_HOME/bin/eagle-service.sh start
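The service may take a while to accept requests after the start command returns. The following polling helper is a sketch. It assumes the service listens on the default port 9099 used later in this tutorial and serves its web UI under /eagle-service. The name wait_for_url is hypothetical, and the helper treats any HTTP response, even an error status, as "up".

```shell
# Hypothetical helper: poll a URL until the server answers, or give up.
# Any HTTP response counts as "up"; only connection failures are retried.
# Usage: wait_for_url URL [max_attempts] [sleep_seconds]
wait_for_url() {
  local url="$1" attempts="${2:-30}" pause="${3:-2}" i=1
  while [ "$i" -le "$attempts" ]; do
    if curl -s -o /dev/null --max-time 5 "$url"; then
      echo "up after $i attempt(s): $url"
      return 0
    fi
    sleep "$pause"
    i=$((i + 1))
  done
  echo "gave up after $attempts attempt(s): $url" >&2
  return 1
}

# Example (assumes the default host and port):
# wait_for_url "http://localhost:9099/eagle-service/" 60 2
```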


3. Create New Monitoring Application

  • Site: sandbox
  • Application
    • Group: Cassandra
    • Name: cassandraQueryLog
  • Source Stream: 
    • Name: cassandraQueryLogStream
    • Executor: cassandraQueryLogExecutor
    • Schema:
      • cf (string): query column family
      • ks (string): query keyspace
      • host (string): the host that the current metric comes from
      • type (string): query type
      • user (string): query user
      • category (string): query category
      • timestamp (long): query timestamp
      • masked_columns (string): query masked_columns
      • operation (string): query operation
      • other_columns (string): query other_columns
      • source (string): source host

 

The following script registers this metadata through the Eagle REST API:

Code Block
languagebash
titleCassandraQueryLogMonitorMetadata.sh
# EAGLE_SERVICE_HOST, default is `hostname -f`
export EAGLE_SERVICE_HOST=localhost
# EAGLE_SERVICE_PORT, default is 9099
export EAGLE_SERVICE_PORT=9099
# EAGLE_SERVICE_USER
export EAGLE_SERVICE_USER=admin
# EAGLE_SERVICE_PASSWORD
export EAGLE_SERVICE_PASSWD=secret

## SiteApplicationService: enable application cassandraQueryLog on site sandbox
curl -u "${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD}" -X POST -H 'Content-Type:application/json' \
 "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=SiteApplicationService" \
  -d '
  [
     {
        "tags":{
           "site":"sandbox",
           "application":"cassandraQueryLog"
        },
        "enabled": true,
        "config": "{}"
     }
  ]
  '

## ApplicationDescService: application description, alias and group
curl -u "${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD}" -X POST -H 'Content-Type:application/json' \
 "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=ApplicationDescService" \
  -d '
  [
     {
        "tags":{
           "application":"cassandraQueryLog"
        },
        "description":"cassandra Query Log Monitoring",
        "alias":"QueryLogMonitor",
        "groupName":"Cassandra",
        "config":"{}",
        "features":["common","metadata"]
     }
  ]
  '

## AlertStreamService: the source stream
echo ""
echo "Importing AlertStreamService for cassandraQueryLog... "
curl -u "${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD}" -X POST -H 'Content-Type:application/json' \
 "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamService" \
 -d '
 [
    {
       "tags":{
          "application":"cassandraQueryLog",
          "streamName":"cassandraQueryLogStream"
       },
       "description":"cassandra query log data source stream"
    }
 ]
 '

## AlertExecutorService: what alert streams are consumed by alert executor
echo ""
echo "Importing AlertExecutorService for cassandraQueryLog... "
curl -u "${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD}" -X POST -H 'Content-Type:application/json' \
 "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertExecutorService" \
 -d '
 [
    {
       "tags":{
          "application":"cassandraQueryLog",
          "alertExecutorId":"cassandraQueryLogExecutor",
          "streamName":"cassandraQueryLogStream"
       },
       "description":"executor for cassandra query log stream"
    }
 ]
 '

## AlertStreamSchemaService: schema for event from alert stream
echo ""
echo "Importing AlertStreamSchemaService for cassandraQueryLog... "
curl -u "${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD}" -X POST -H 'Content-Type:application/json' \
 "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" \
 -d '
 [
    {
       "tags": {
          "application": "cassandraQueryLog",
          "streamName": "cassandraQueryLogStream",
          "attrName": "host"
       },
       "attrDescription": "the host that the current metric comes from",
       "attrType": "string",
       "category": "",
       "attrValueResolver": ""
    },
    {
       "tags": {
          "application": "cassandraQueryLog",
          "streamName": "cassandraQueryLogStream",
          "attrName": "source"
       },
       "attrDescription": "source host",
       "attrType": "string",
       "category": "",
       "attrValueResolver": ""
    },
    {
       "tags": {
          "application": "cassandraQueryLog",
          "streamName": "cassandraQueryLogStream",
          "attrName": "user"
       },
       "attrDescription": "query user",
       "attrType": "string",
       "category": "",
       "attrValueResolver": ""
    },
    {
       "tags": {
          "application": "cassandraQueryLog",
          "streamName": "cassandraQueryLogStream",
          "attrName": "timestamp"
       },
       "attrDescription": "query timestamp",
       "attrType": "long",
       "category": "",
       "attrValueResolver": ""
    },
    {
       "tags": {
          "application": "cassandraQueryLog",
          "streamName": "cassandraQueryLogStream",
          "attrName": "category"
       },
       "attrDescription": "query category",
       "attrType": "string",
       "category": "",
       "attrValueResolver": ""
    },
    {
       "tags": {
          "application": "cassandraQueryLog",
          "streamName": "cassandraQueryLogStream",
          "attrName": "type"
       },
       "attrDescription": "query type",
       "attrType": "string",
       "category": "",
       "attrValueResolver": ""
    },
    {
       "tags": {
          "application": "cassandraQueryLog",
          "streamName": "cassandraQueryLogStream",
          "attrName": "ks"
       },
       "attrDescription": "query keyspace",
       "attrType": "string",
       "category": "",
       "attrValueResolver": ""
    },
    {
       "tags": {
          "application": "cassandraQueryLog",
          "streamName": "cassandraQueryLogStream",
          "attrName": "cf"
       },
       "attrDescription": "query column family",
       "attrType": "string",
       "category": "",
       "attrValueResolver": ""
    },
    {
       "tags": {
          "application": "cassandraQueryLog",
          "streamName": "cassandraQueryLogStream",
          "attrName": "operation"
       },
       "attrDescription": "query operation",
       "attrType": "string",
       "category": "",
       "attrValueResolver": ""
    },
    {
       "tags": {
          "application": "cassandraQueryLog",
          "streamName": "cassandraQueryLogStream",
          "attrName": "masked_columns"
       },
       "attrDescription": "query masked_columns",
       "attrType": "string",
       "category": "",
       "attrValueResolver": ""
    },
    {
       "tags": {
          "application": "cassandraQueryLog",
          "streamName": "cassandraQueryLogStream",
          "attrName": "other_columns"
       },
       "attrDescription": "query other_columns",
       "attrType": "string",
       "category": "",
       "attrValueResolver": ""
    }
 ]
 '

## Finished
echo ""
echo "Finished initialization for eagle topology"

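In the script above, the eleven AlertStreamSchemaService entries differ only in attribute name, type and description. Instead of editing them by hand, a helper could generate an equivalent one-line payload from the schema table. The sketch below assumes one "name type description" row per line and descriptions without double quotes. The name schema_payload is hypothetical and is not an Eagle tool.

```shell
# Hypothetical helper: build an AlertStreamSchemaService payload from rows of
#   name type description...
# read on stdin; the description is everything after the second field.
schema_payload() {
  local app="$1" stream="$2"
  awk -v app="$app" -v stream="$stream" '
  NF >= 2 {
    name = $1; type = $2
    desc = $0
    sub(/^[ \t]*[^ \t]+[ \t]+[^ \t]+[ \t]*/, "", desc)   # drop name and type
    # "[" before the first entry, "," before every later one
    printf "%s{\"tags\":{\"application\":\"%s\",\"streamName\":\"%s\",\"attrName\":\"%s\"},", (n++ ? "," : "["), app, stream, name
    printf "\"attrDescription\":\"%s\",\"attrType\":\"%s\",\"category\":\"\",\"attrValueResolver\":\"\"}", desc, type
  }
  END { print (n ? "]" : "[]") }'
}

# Example (schema.txt is a hypothetical file holding the rows of the schema table):
# schema_payload cassandraQueryLog cassandraQueryLogStream < schema.txt | \
#   curl -u "${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD}" -X POST -H 'Content-Type:application/json' \
#   "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" -d @-
```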
 

Once the metadata has loaded successfully, refresh the Eagle web UI. The new Cassandra query log monitoring application should then appear there.


4. Start Monitoring Topology

  1. Create a new topology configuration file named "cassandra-querylog-sandbox.conf" that reads from the source Kafka topic cassandra_querylog_sandbox:

    Code Block
    languagejs
    titlecassandra-querylog-sandbox.conf
    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements.  See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License.  You may obtain a copy of the License at
    #
    #    http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.

    {
      "envContextConfig" : {
        "env" : "storm",
        "mode" : "cluster",
        "topologyName" : "cassandraQueryLogMonitorTopology",
        "stormConfigFile" : "cassandra-querylog-storm.yaml",
        "parallelismConfig" : {
          "cassandraQueryLogStream" : 1,
          "cassandraQueryLogExecutor*" : 1
        }
      },
      "dataSourceConfig": {
        "topic" : "cassandra_querylog_sandbox",
        "zkConnection" : "sandbox.hortonworks.com:2181",
        "zkConnectionTimeoutMS" : 15000,
        "consumerGroupId" : "eagle.consumer",
        "fetchSize" : 1048586,
        "deserializerClass" : "org.apache.eagle.datastream.storm.JsonMessageDeserializer",
        "transactionZKServers" : "sandbox.hortonworks.com",
        "transactionZKPort" : 2181,
        "transactionZKRoot" : "/consumers",
        "transactionStateUpdateMS" : 2000
      },
      "alertExecutorConfigs" : {
        "cassandraQueryLogExecutor" : {
          "parallelism" : 1,
          "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner",
          "needValidation" : "true"
        }
      },
      "eagleProps" : {
        "site" : "sandbox",
        "application": "cassandraQueryLog",
        "dataJoinPollIntervalSec" : 30,
        "mailHost" : "mailHost.com",
        "mailSmtpPort":"25",
        "mailDebug" : "true",
        "balancePartitionEnabled" : true,
        #"partitionRefreshIntervalInMin" : 60,
        #"kafkaStatisticRangeInMin" : 60,
        "eagleService": {
          "host": "localhost",
          "port": 9099,
          "username": "admin",
          "password": "secret"
        },
        "readHdfsUserCommandPatternFrom" : "file"
      },
      "dynamicConfigSource" : {
        "enabled" : true,
        "initDelayMillis" : 0,
        "delayMillis" : 30000
      }
    }
    
  2. Start the monitoring topology, passing the stream name, the executor id and the configuration file:

    Code Block
    languagebash
    ./bin/kafka-stream-monitor.sh cassandraQueryLogStream cassandraQueryLogExecutor cassandra-querylog-sandbox.conf
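The stream name, the executor id and the Kafka topic must agree across the metadata script, the configuration file and the command line. The following hypothetical preflight check could be run before the command above. It is only a plain text search for each quoted name, not a HOCON parser, and the name check_topology_conf is illustrative.

```shell
# Hypothetical preflight check: verify that every given name appears, in double
# quotes, somewhere in the topology config file. Text search only.
check_topology_conf() {
  local conf="$1" name missing=0
  shift
  [ -r "$conf" ] || { echo "cannot read $conf" >&2; return 2; }
  for name in "$@"; do
    if ! grep -qF "\"$name\"" "$conf"; then
      echo "missing from $conf: $name" >&2
      missing=1
    fi
  done
  return "$missing"
}

# Example:
# check_topology_conf cassandra-querylog-sandbox.conf \
#   cassandraQueryLogStream cassandraQueryLogExecutor cassandra_querylog_sandbox &&
#   ./bin/kafka-stream-monitor.sh cassandraQueryLogStream cassandraQueryLogExecutor cassandra-querylog-sandbox.conf
```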

5. Validate with Sample Policy

a. Define a sample policy in Eagle


Code Block
languagetext
from cassandraQueryLogStream[(ks == 'dg_keyspace') and (cf == 'customer_details')] select * insert into outputStream;

b. Trigger an alert with a sample event

 

Code Block
languagebash
echo '{"host": "/192.168.6.227", "source": "/192.168.6.227", "user": "jaspa", "timestamp": 1455574202864, "category": "QUERY", "type": "CQL_SELECT", "ks": "dg_keyspace", "cf": "customer_details", "operation": "CQL_SELECT", "masked_columns": "bank|ccno|email|ip|name|sal|ssn|tel|url", "other_columns": "id|npi"}' \
  | $KAFKA_HOME/bin/kafka-console-producer.sh --topic cassandra_querylog_sandbox --broker-list sandbox.hortonworks.com:6667

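The following sketch roughly approximates the sample policy's filter locally. It can be used to check whether a test event should fire the policy before sending it. It only text-matches flat JSON with one object per line, and it mirrors only the two equality conditions. Eagle evaluates the actual policy itself. The name matches_sample_policy is hypothetical.

```shell
# Hypothetical local approximation of the sample policy filter
#   cassandraQueryLogStream[(ks == 'dg_keyspace') and (cf == 'customer_details')]
# Reads one JSON object per line on stdin and prints the lines that match both
# equality conditions. A text match on flat JSON, not a policy engine.
matches_sample_policy() {
  grep -E '"ks" *: *"dg_keyspace"' | grep -E '"cf" *: *"customer_details"'
}
```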
c. Review the generated alert


 
