This tutorial shows, step by step, how to create a new Eagle application. It uses Cassandra query monitoring as the example, but the same approach extends to any log-based monitoring case.

1. Flow Stream into Kafka

Raw Query Log Sample:

Code Block
languagetext
host:/192.168.6.233|source:/192.168.6.227|user:cassandra|timestamp:1454965365762|category:QUERY|type:CQL_SELECT|ks:dg_keyspace|cf:customer_details|operation:Select  id,name,email,contact from customer_details|masked_columns:[email,contact]|other_columns:[id,name]

...

Create Kafka topic: cassandra_querylog_sandbox

Code Block
languagebash
$KAFKA_HOME/bin/kafka-topics.sh --create --topic cassandra_querylog_sandbox --zookeeper localhost:2181 --partitions 1 --replication-factor 1

Parse the query log and flow it into Kafka

Code Block
languagejs
{
    "host": "/192.168.6.227",
    "source": "/192.168.6.227",
    "user": "jaspa",
    "timestamp": 1455574202864,
    "category": "QUERY",
    "type": "CQL_SELECT",
    "ks": "dg_keyspace",
    "cf": "customer_details",
    "operation": "CQL_SELECT",
    "masked_columns": "bank|ccno|email|ip|name|sal|ssn|tel|url",
    "other_columns": "id|npi"
}
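
The conversion from the raw pipe-delimited line to the JSON event is not shown in this tutorial. The following is a minimal Python sketch of one way it could be done. The helper name `parse_query_log` and the choice to join column lists with `|` (mirroring the parsed sample above) are assumptions, not Eagle's actual parser. Unlike the parsed sample, the sketch keeps the raw statement in `operation`.

```python
import json

def parse_query_log(line):
    """Parse one raw 'key:value|key:value' query log line into a dict (hypothetical helper)."""
    event = {}
    for field in line.strip().split("|"):
        # Split on the first ':' only, so values such as '/192.168.6.233' stay intact.
        key, _, value = field.partition(":")
        event[key] = value
    # The stream schema declares timestamp as long.
    event["timestamp"] = int(event["timestamp"])
    # Turn '[email,contact]' into 'email|contact' (assumed format, as in the parsed sample).
    for name in ("masked_columns", "other_columns"):
        columns = event.get(name, "").strip("[]")
        event[name] = "|".join(c.strip() for c in columns.split(",") if c.strip())
    return event

raw = ("host:/192.168.6.233|source:/192.168.6.227|user:cassandra|timestamp:1454965365762|"
       "category:QUERY|type:CQL_SELECT|ks:dg_keyspace|cf:customer_details|"
       "operation:Select  id,name,email,contact from customer_details|"
       "masked_columns:[email,contact]|other_columns:[id,name]")
print(json.dumps(parse_query_log(raw)))
```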

...

2.

...

Start Eagle Server

 

Code Block
languagebash
$EAGLE_HOME/bin/eagle-service.sh start


3. Create New Monitoring Application

  • Site: sandbox
  • Application
    • Group: Cassandra
    • Name: cassandraQueryLog
  • Source Stream: 
    • Name: cassandraQueryLogStream
    • Executor: cassandraQueryLogExecutor
    • Schema:
      Name            Type    Description
      cf              string  query column family
      ks              string  query keyspace
      host            string  the host that current metric comes from
      type            string  query type
      user            string  query user
      category        string  query category
      timestamp       long    query timestamp
      masked_columns  string  query masked_columns
      operation       string  query operation
      other_columns   string  query other_columns
      source          string  source host

 

Here is the script that defines the metadata:

Code Block
languagebash
titleCassandraQueryLogMonitorMetadata.sh
# EAGLE_SERVICE_HOST, default is `hostname -f`
export EAGLE_SERVICE_HOST=localhost
# EAGLE_SERVICE_PORT, default is 9099
export EAGLE_SERVICE_PORT=9099
# EAGLE_SERVICE_USER
export EAGLE_SERVICE_USER=admin
# EAGLE_SERVICE_PASSWD
export EAGLE_SERVICE_PASSWD=secret
curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
 "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=SiteApplicationService" \
  -d '
  [
     {
        "tags":{
           "site":"sandbox",
           "application":"cassandraQueryLog"
        },
        "enabled": true,
        "config": "{}"
     }
  ]
  '
curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
 "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=ApplicationDescService" \
  -d '
  [
     {
        "tags":{
           "application":"cassandraQueryLog"
        },
        "description":"cassandra Query Log Monitoring",
        "alias":"QueryLogMonitor",
        "groupName":"Cassandra",
        "config":"{}",
        "features":["common","metadata"]
     }
  ]
  '
## AlertStreamService
echo ""
echo "Importing AlertStreamService for Cassandra query log... "
curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
 "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamService" \
 -d '
 [
    {
       "tags":{
          "application":"cassandraQueryLog",
          "streamName":"cassandraQueryLogStream"
       },
       "description":"cassandra query log data source stream"
    }
 ]
 '
## AlertExecutorService: what alert streams are consumed by alert executor
echo ""
echo "Importing AlertExecutorService for Cassandra query log... "
curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
 "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertExecutorService" \
 -d '
 [
    {
       "tags":{
          "application":"cassandraQueryLog",
          "alertExecutorId":"cassandraQueryLogExecutor",
          "streamName":"cassandraQueryLogStream"
       },
       "description":"executor for cassandra query log stream"
    }
 ]
 '
## AlertStreamSchemaService: schema for event from alert stream
echo ""
echo "Importing AlertStreamSchemaService for Cassandra query log... "
curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
"http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" \
 -d '
 [
    {
       "tags": {
          "application": "cassandraQueryLog",
          "streamName": "cassandraQueryLogStream",
          "attrName": "host"
       },
       "attrDescription": "the host that current metric comes from",
       "attrType": "string",
       "category": "",
       "attrValueResolver": ""
    },
    {
       "tags": {
          "application": "cassandraQueryLog",
          "streamName": "cassandraQueryLogStream",
          "attrName": "source"
       },
       "attrDescription": "source host",
       "attrType": "string",
       "category": "",
       "attrValueResolver": ""
    },
    {
       "tags": {
          "application": "cassandraQueryLog",
          "streamName": "cassandraQueryLogStream",
          "attrName": "user"
       },
       "attrDescription": "query user",
       "attrType": "string",
       "category": "",
       "attrValueResolver": ""
    },
    {
       "tags": {
          "application": "cassandraQueryLog",
          "streamName": "cassandraQueryLogStream",
          "attrName": "timestamp"
       },
       "attrDescription": "query timestamp",
       "attrType": "long",
       "category": "",
       "attrValueResolver": ""
    },
    {
       "tags": {
          "application": "cassandraQueryLog",
          "streamName": "cassandraQueryLogStream",
          "attrName": "category"
       },
       "attrDescription": "query category",
       "attrType": "string",
       "category": "",
       "attrValueResolver": ""
    },
    {
       "tags": {
          "application": "cassandraQueryLog",
          "streamName": "cassandraQueryLogStream",
          "attrName": "type"
       },
       "attrDescription": "query type",
       "attrType": "string",
       "category": "",
       "attrValueResolver": ""
    },
    {
       "tags": {
          "application": "cassandraQueryLog",
          "streamName": "cassandraQueryLogStream",
          "attrName": "ks"
       },
       "attrDescription": "query keyspace",
       "attrType": "string",
       "category": "",
       "attrValueResolver": ""
    },
    {
       "tags": {
          "application": "cassandraQueryLog",
          "streamName": "cassandraQueryLogStream",
          "attrName": "cf"
       },
       "attrDescription": "query column family",
       "attrType": "string",
       "category": "",
       "attrValueResolver": ""
    },
    {
       "tags": {
          "application": "cassandraQueryLog",
          "streamName": "cassandraQueryLogStream",
          "attrName": "operation"
       },
       "attrDescription": "query operation",
       "attrType": "string",
       "category": "",
       "attrValueResolver": ""
    },
    {
       "tags": {
          "application": "cassandraQueryLog",
          "streamName": "cassandraQueryLogStream",
          "attrName": "masked_columns"
       },
       "attrDescription": "query masked_columns",
       "attrType": "string",
       "category": "",
       "attrValueResolver": ""
    },
    {
       "tags": {
          "application": "cassandraQueryLog",
          "streamName": "cassandraQueryLogStream",
          "attrName": "other_columns"
       },
       "attrDescription": "query other_columns",
       "attrType": "string",
       "category": "",
       "attrValueResolver": ""
    }
 ]
 '
## Finished
echo ""
echo "Finished initialization for eagle topology"

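The AlertStreamSchemaService payload repeats the same structure for every attribute, so it could also be generated from the schema table rather than written by hand. The following Python sketch illustrates this. The `SCHEMA` list and the `build_schema_entities` helper are hypothetical names introduced here; the entity fields are copied from the script above.

```python
import json

# (name, type, description) rows copied from the schema in step 3.
SCHEMA = [
    ("host", "string", "the host that current metric comes from"),
    ("source", "string", "source host"),
    ("user", "string", "query user"),
    ("timestamp", "long", "query timestamp"),
    ("category", "string", "query category"),
    ("type", "string", "query type"),
    ("ks", "string", "query keyspace"),
    ("cf", "string", "query column family"),
    ("operation", "string", "query operation"),
    ("masked_columns", "string", "query masked_columns"),
    ("other_columns", "string", "query other_columns"),
]

def build_schema_entities(application, stream_name, schema):
    """Build one schema entity per attribute (hypothetical helper)."""
    return [
        {
            "tags": {
                "application": application,
                "streamName": stream_name,
                "attrName": name,
            },
            "attrDescription": description,
            "attrType": attr_type,
            "category": "",
            "attrValueResolver": "",
        }
        for name, attr_type, description in schema
    ]

entities = build_schema_entities("cassandraQueryLog", "cassandraQueryLogStream", SCHEMA)
print(json.dumps(entities, indent=2))
```

The printed JSON could be saved to a file and posted with the same curl command as in the script, using curl's `-d @<file>` form instead of the inline payload.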
 

After the metadata above loads successfully, refresh the page and the new Cassandra Query Monitoring application appears in Eagle Web.

4. Start Monitoring Topology

  1. Create a new topology configuration file named "cassandra-querylog-sandbox.conf" that connects to the source Kafka topic cassandra_querylog_sandbox, as follows:

    Code Block
    languagejs
    titlecassandra-querylog-sandbox.conf
    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements.  See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License.  You may obtain a copy of the License at
    #
    #    http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    {
      "envContextConfig" : {
        "env" : "storm",
        "mode" : "cluster",
        "topologyName" : "cassandrawQueryLogMonitorTopology",
        "stormConfigFile" : "cassandraw-querlog-storm.yaml",
        "parallelismConfig" : {
          "cassandraQueryLogStream" : 1,
          "cassandraQueryLogExecutor*" : 1
        }
      },
      "dataSourceConfig": {
        "topic" : "cassandra_querylog_sandbox",
        "zkConnection" : "sandbox.hortonworks.com:2181",
        "zkConnectionTimeoutMS" : 15000,
        "consumerGroupId" : "eagle.consumer",
        "fetchSize" : 1048586,
        "deserializerClass" : "org.apache.eagle.datastream.storm.JsonMessageDeserializer",
        "transactionZKServers" : "sandbox.hortonworks.com",
        "transactionZKPort" : 2181,
        "transactionZKRoot" : "/consumers",
        "transactionStateUpdateMS" : 2000
      },
      "alertExecutorConfigs" : {
         "cassandraQueryLogExecutor" : {
           "parallelism" : 1,
           "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
           "needValidation" : "true"
         }
      },
      "eagleProps" : {
        "site" : "sandbox",
        "application": "cassandraQueryLog",
        "dataJoinPollIntervalSec" : 30,
        "mailHost" : "mailHost.com",
        "mailSmtpPort":"25",
        "mailDebug" : "true",
        "balancePartitionEnabled" : true,
        #"partitionRefreshIntervalInMin" : 60,
        #"kafkaStatisticRangeInMin" : 60,
        "eagleService": {
          "host": "localhost",
          "port": 9099,
          "username": "admin",
          "password": "secret"
        }
        "readHdfsUserCommandPatternFrom" : "file"
      },
      "dynamicConfigSource" : {
        "enabled" : true,
        "initDelayMillis" : 0,
        "delayMillis" : 30000
      }
    }
    
    
  2. Start monitoring topology

    Code Block
    languagebash
    ./bin/kafka-stream-monitor.sh cassandraQueryLogStream cassandraQueryLogExecutor cassandra-querylog-sandbox.conf

5. Validate with Sample Policy

a. Define a sample policy with Eagle


Code Block
languagetext
from cassandraQueryLogStream[(ks == 'dg_keyspace') and (cf == 'customer_details')] select * insert into outputStream;
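
To make clear which events this policy selects, the filter condition can be expressed as a plain predicate. The Python sketch below only illustrates the condition `(ks == 'dg_keyspace') and (cf == 'customer_details')`. The function name `matches_sample_policy` is hypothetical, and this is not how Eagle evaluates policies internally.

```python
def matches_sample_policy(event):
    """Return True when the event satisfies the sample policy's filter (illustration only)."""
    return event.get("ks") == "dg_keyspace" and event.get("cf") == "customer_details"

# An event on the monitored keyspace and column family passes the filter.
hit = {"ks": "dg_keyspace", "cf": "customer_details", "type": "CQL_SELECT"}
# An event on another column family does not (the name 'orders' is made up for this example).
miss = {"ks": "dg_keyspace", "cf": "orders", "type": "CQL_SELECT"}

print(matches_sample_policy(hit), matches_sample_policy(miss))
```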

b. Trigger an alert with a sample event

 

Code Block
languagebash
$ $KAFKA_HOME/bin/kafka-console-producer.sh --topic cassandra_querylog_sandbox --broker-list sandbox.hortonworks.com:6667
 
{"host": "/192.168.6.227", "source": "/192.168.6.227", "user": "jaspa", "timestamp": 1455574202864, "category": "QUERY", "type": "CQL_SELECT", "ks": "dg_keyspace", "cf": "customer_details", "operation": "CQL_SELECT", "masked_columns": "bank|ccno|email|ip|name|sal|ssn|tel|url", "other_columns": "id|npi"}
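
The console producer sends each input line as a separate message, so the sample event has to stay on a single line. A small Python sketch of producing such a line from the event fields follows. The helper name `to_producer_line` is hypothetical, and the values are those of the sample event above.

```python
import json

def to_producer_line(event):
    """Serialize an event as single-line JSON suitable for pasting into the console producer."""
    return json.dumps(event, separators=(",", ":"))

sample_event = {
    "host": "/192.168.6.227",
    "source": "/192.168.6.227",
    "user": "jaspa",
    "timestamp": 1455574202864,
    "category": "QUERY",
    "type": "CQL_SELECT",
    "ks": "dg_keyspace",
    "cf": "customer_details",
    "operation": "CQL_SELECT",
    "masked_columns": "bank|ccno|email|ip|name|sal|ssn|tel|url",
    "other_columns": "id|npi",
}

print(to_producer_line(sample_event))
```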

c. Review the generated alert