This tutorial shows how to create a new Eagle application step by step. It uses Cassandra query monitoring as the example, but the same approach extends to any log-based monitoring case.

1. Flow

...

Stream into

...

Kafka

 Raw Query Log Sample:

Code Block
languagetext
host:/192.168.6.233|source:/192.168.6.227|user:cassandra|timestamp:1454965365762|category:QUERY|type:CQL_SELECT|ks:dg_keyspace|cf:customer_details|operation:Select  id,name,email,contact from customer_details|masked_columns:[email,contact]|other_columns:[id,name]

Create Kafka topic: cassandra_querylog_sandbox

Code Block
languagebash
$KAFKA_HOME/bin/kafka-topics.sh --create --topic cassandra_querylog_sandbox --zookeeper localhost:2181 --partitions 1 --replication-factor 1

Parsed Query Log and Flow into Kafka

Code Block
languagejs
{
    "host": "/192.168.6.227",
    "source": "/192.168.6.227",
    "user": "jaspa",
    "timestamp": 1455574202864,
    "category": "QUERY",
    "type": "CQL_SELECT",
    "ks": "dg_keyspace",
    "cf": "customer_details",
    "operation": "CQL_SELECT",
    "masked_columns": "bank|ccno|email|ip|name|sal|ssn|tel|url",
    "other_columns": "id|npi"
}
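
The step from the raw log line to the parsed JSON event can be sketched in Python as follows. This is only an illustration, not the parser Eagle ships: the function name `parse_raw_query_log` is hypothetical, and it assumes that fields are separated by `|`, that each field is `key:value`, that bracketed lists such as `[email,contact]` become pipe-joined strings, and that no value contains a `|` itself. It does not reproduce every detail of the parsed sample above (for instance, the sample's `operation` value).

```python
import json


def parse_raw_query_log(line: str) -> dict:
    """Parse one pipe-delimited raw query log line into a flat dict."""
    event = {}
    for field in line.strip().split("|"):
        # Split on the first ':' only, so a value may contain further colons.
        key, _, value = field.partition(":")
        key = key.strip()
        value = value.strip()
        if key == "timestamp":
            # Epoch milliseconds are kept as an integer.
            event[key] = int(value)
        elif value.startswith("[") and value.endswith("]"):
            # "[email,contact]" becomes "email|contact".
            items = [item.strip() for item in value[1:-1].split(",") if item.strip()]
            event[key] = "|".join(items)
        else:
            event[key] = value
    return event


raw = (
    "host:/192.168.6.233|source:/192.168.6.227|user:cassandra|"
    "timestamp:1454965365762|category:QUERY|type:CQL_SELECT|ks:dg_keyspace|"
    "cf:customer_details|operation:Select  id,name,email,contact from customer_details|"
    "masked_columns:[email,contact]|other_columns:[id,name]"
)
event = parse_raw_query_log(raw)
print(json.dumps(event, indent=4))
```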

2. Startup Eagle

...

Server

Code Block
languagebash
$EAGLE_HOME/bin/eagle-service.sh start

3. Create New Monitoring Application

  • Site: sandbox
  • Application
    • Group: Cassandra
    • Name: cassandraQueryLog
  • Source Stream: 
    • Name: cassandraQueryLogStream
    • Executor: cassandraQueryLogExecutor
    • Schema:

...

Code Block
languagebash
titleCassandraQueryLogMonitorMetadata.sh
# EAGLE_SERVICE_HOST, default is `hostname -f`
export EAGLE_SERVICE_HOST=localhost
# EAGLE_SERVICE_PORT, default is 9099
export EAGLE_SERVICE_PORT=9099
# EAGLE_SERVICE_USER
export EAGLE_SERVICE_USER=admin
# EAGLE_SERVICE_PASSWD
export EAGLE_SERVICE_PASSWD=secret
curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
 "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=SiteApplicationService" \
  -d '
  [
     {
        "tags":{
           "site":"sandbox",
           "application":"cassandraQueryLog"
        },
        "enabled": true,
        "config": "{}"
     }
  ]
  '
curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
 "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=ApplicationDescService" \
  -d '
  [
     {
        "tags":{
           "application":"cassandraQueryLog"
        },
        "description":"Cassandra Query Log Monitoring",
        "alias":"QueryLogMonitor",
        "groupName":"Cassandra",
        "config":"{}",
        "features":["common","metadata"]
     }
  ]
  '
## AlertStreamService
echo ""
echo "Importing AlertStreamService for Cassandra query log... "
curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
 "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamService" \
 -d '
 [
    {
       "tags":{
          "application":"cassandraQueryLog",
          "streamName":"cassandraQueryLogStream"
       },
       "description":"cassandra query log data source stream"
    }
 ]
 '
## AlertExecutorService: what alert streams are consumed by alert executor
echo ""
echo "Importing AlertExecutorService for Cassandra query log... "
curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
 "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertExecutorService" \
 -d '
 [
    {
       "tags":{
          "application":"cassandraQueryLog",
          "alertExecutorId":"cassandraQueryLogExecutor",
          "streamName":"cassandraQueryLogStream"
       },
       "description":"executor for cassandra query log stream"
    }
 ]
 '
## AlertStreamSchemaService: schema for event from alert stream
echo ""
echo "Importing AlertStreamSchemaService for Cassandra query log... "
curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
"http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" \
 -d '
 [
    {
       "tags": {
          "application": "cassandraQueryLog",
          "streamName": "cassandraQueryLogStream",
          "attrName": "host"
       },
       "attrDescription": "the host that current metric comes from",
       "attrType": "string",
       "category": "",
       "attrValueResolver": ""
    },
    {
       "tags": {
          "application": "cassandraQueryLog",
          "streamName": "cassandraQueryLogStream",
          "attrName": "source"
       },
       "attrDescription": "source host",
       "attrType": "string",
       "category": "",
       "attrValueResolver": ""
    },
    {
       "tags": {
          "application": "cassandraQueryLog",
          "streamName": "cassandraQueryLogStream",
          "attrName": "user"
       },
       "attrDescription": "query user",
       "attrType": "string",
       "category": "",
       "attrValueResolver": ""
    },
    {
       "tags": {
          "application": "cassandraQueryLog",
          "streamName": "cassandraQueryLogStream",
          "attrName": "timestamp"
       },
       "attrDescription": "query timestamp",
       "attrType": "long",
       "category": "",
       "attrValueResolver": ""
    },
    {
       "tags": {
          "application": "cassandraQueryLog",
          "streamName": "cassandraQueryLogStream",
          "attrName": "category"
       },
       "attrDescription": "query category",
       "attrType": "string",
       "category": "",
       "attrValueResolver": ""
    },
    {
       "tags": {
          "application": "cassandraQueryLog",
          "streamName": "cassandraQueryLogStream",
          "attrName": "type"
       },
       "attrDescription": "query type",
       "attrType": "string",
       "category": "",
       "attrValueResolver": ""
    },
    {
       "tags": {
          "application": "cassandraQueryLog",
          "streamName": "cassandraQueryLogStream",
          "attrName": "ks"
       },
       "attrDescription": "query keyspace",
       "attrType": "string",
       "category": "",
       "attrValueResolver": ""
    },
    {
       "tags": {
          "application": "cassandraQueryLog",
          "streamName": "cassandraQueryLogStream",
          "attrName": "cf"
       },
       "attrDescription": "query column family",
       "attrType": "string",
       "category": "",
       "attrValueResolver": ""
    },
    {
       "tags": {
          "application": "cassandraQueryLog",
          "streamName": "cassandraQueryLogStream",
          "attrName": "operation"
       },
       "attrDescription": "query operation",
       "attrType": "string",
       "category": "",
       "attrValueResolver": ""
    },
    {
       "tags": {
          "application": "cassandraQueryLog",
          "streamName": "cassandraQueryLogStream",
          "attrName": "masked_columns"
       },
       "attrDescription": "query masked_columns",
       "attrType": "string",
       "category": "",
       "attrValueResolver": ""
    },
    {
       "tags": {
          "application": "cassandraQueryLog",
          "streamName": "cassandraQueryLogStream",
          "attrName": "other_columns"
       },
       "attrDescription": "query other_columns",
       "attrType": "string",
       "category": "",
       "attrValueResolver": ""
    }
 ]
 '
## Finished
echo ""
echo "Finished initialization for eagle topology"
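
The eleven AlertStreamSchemaService entries in the script differ only in attribute name, type, and description, so the payload can also be generated rather than written by hand. The following is a minimal sketch under stated assumptions: `build_stream_schema` and `FIELDS` are hypothetical names introduced here, and the `application` tag key is an assumption that should be replaced with whichever key your Eagle version expects.

```python
import json

APPLICATION = "cassandraQueryLog"
STREAM = "cassandraQueryLogStream"

# (attrName, attrType, attrDescription) for each attribute of the stream.
FIELDS = [
    ("host", "string", "the host that current metric comes from"),
    ("source", "string", "source host"),
    ("user", "string", "query user"),
    ("timestamp", "long", "query timestamp"),
    ("category", "string", "query category"),
    ("type", "string", "query type"),
    ("ks", "string", "query keyspace"),
    ("cf", "string", "query column family"),
    ("operation", "string", "query operation"),
    ("masked_columns", "string", "query masked_columns"),
    ("other_columns", "string", "query other_columns"),
]


def build_stream_schema(application, stream, fields):
    """Build one schema entity per stream attribute."""
    return [
        {
            # Assumption: the service identifies entries by these three tags.
            "tags": {
                "application": application,
                "streamName": stream,
                "attrName": name,
            },
            "attrDescription": description,
            "attrType": attr_type,
            "category": "",
            "attrValueResolver": "",
        }
        for name, attr_type, description in fields
    ]


schema = build_stream_schema(APPLICATION, STREAM, FIELDS)
payload = json.dumps(schema, indent=2)
print(payload)
```

The printed JSON can be saved to a file and passed to the same curl call with `-d @file` in place of the inline body.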

...

After the metadata above loads successfully, the new Cassandra Query Monitoring application can be found in Eagle Web after refreshing the page.

4. Start Monitoring Topology

  1. Create a new topology configuration file named "cassandra-querylog-sandbox.conf" that connects to the source Kafka topic cassandra_querylog_sandbox, as follows:

    Code Block
    languagejs
    titlecassandra-querylog-sandbox.conf
    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements.  See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License.  You may obtain a copy of the License at
    #
    #    http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    {
      "envContextConfig" : {
        "env" : "storm",
        "mode" : "cluster",
        "topologyName" : "cassandraQueryLogMonitorTopology",
        "stormConfigFile" : "cassandra-querylog-storm.yaml",
        "parallelismConfig" : {
          "cassandraQueryLogStream" : 1,
          "cassandraQueryLogExecutor*" : 1
        }
      },
      "dataSourceConfig": {
        "topic" : "cassandra_querylog_sandbox",
        "zkConnection" : "sandbox.hortonworks.com:2181",
        "zkConnectionTimeoutMS" : 15000,
        "consumerGroupId" : "eagle.consumer",
        "fetchSize" : 1048586,
        "deserializerClass" : "org.apache.eagle.datastream.storm.JsonMessageDeserializer",
        "transactionZKServers" : "sandbox.hortonworks.com",
        "transactionZKPort" : 2181,
        "transactionZKRoot" : "/consumers",
        "transactionStateUpdateMS" : 2000
      },
      "alertExecutorConfigs" : {
         "cassandraQueryLogExecutor" : {
           "parallelism" : 1,
           "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner",
           "needValidation" : "true"
         }
      },
      "eagleProps" : {
        "site" : "sandbox",
        "application": "cassandraQueryLog",
        "dataJoinPollIntervalSec" : 30,
        "mailHost" : "mailHost.com",
        "mailSmtpPort":"25",
        "mailDebug" : "true",
        "balancePartitionEnabled" : true,
        #"partitionRefreshIntervalInMin" : 60,
        #"kafkaStatisticRangeInMin" : 60,
        "eagleService": {
          "host": "localhost",
          "port": 9099,
          "username": "admin",
          "password": "secret"
        },
        "readHdfsUserCommandPatternFrom" : "file"
      },
      "dynamicConfigSource" : {
        "enabled" : true,
        "initDelayMillis" : 0,
        "delayMillis" : 30000
      }
    }
    
    
  2. Start monitoring topology

    Code Block
    languagebash
    ./bin/kafka-stream-monitor.sh cassandraQueryLogStream cassandraQueryLogExecutor cassandra-querylog-sandbox.conf

5. Validate with Sample Policy

a. Define a sample policy in Eagle

Code Block
languagetext
from cassandraQueryLogStream[(ks == 'dg_keyspace') and (cf == 'customer_details')] select * insert into outputStream;
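
To predict which events this policy will match before sending them, the filter condition can be restated in plain Python. This is only an equivalent of the condition for illustration, not how Eagle evaluates policies; the function name `matches_sample_policy` is hypothetical.

```python
def matches_sample_policy(event: dict) -> bool:
    """Return True when the event satisfies the sample policy's filter."""
    # Same condition as the policy: ks == 'dg_keyspace' and cf == 'customer_details'.
    return event.get("ks") == "dg_keyspace" and event.get("cf") == "customer_details"


sample_event = {
    "host": "/192.168.6.227",
    "user": "jaspa",
    "type": "CQL_SELECT",
    "ks": "dg_keyspace",
    "cf": "customer_details",
}
# Same event against a different column family, which the filter should reject.
other_event = dict(sample_event, cf="orders")

print(matches_sample_policy(sample_event))
print(matches_sample_policy(other_event))
```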

b. Trigger alert with sample event

 

Code Block
languagebash
$ $KAFKA_HOME/bin/kafka-console-producer.sh --topic cassandra_querylog_sandbox --broker-list sandbox.hortonworks.com:6667
 
{"host": "/192.168.6.227", "source": "/192.168.6.227", "user": "jaspa", "timestamp": 1455574202864, "category": "QUERY", "type": "CQL_SELECT", "ks": "dg_keyspace", "cf": "customer_details", "operation": "CQL_SELECT", "masked_columns": "bank|ccno|email|ip|name|sal|ssn|tel|url", "other_columns": "id|npi"}
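
The console producer sends each input line as one message, so the sample event has to stay on a single line. If a current timestamp is preferred over the fixed one, a small helper can generate the line. This is a sketch with hypothetical function names (`build_sample_event`, `to_producer_line`); the field values are copied from the sample event.

```python
import json
import time


def build_sample_event(now_ms=None):
    """Return the sample event, stamped with the given or current time in milliseconds."""
    timestamp = int(time.time() * 1000) if now_ms is None else now_ms
    return {
        "host": "/192.168.6.227",
        "source": "/192.168.6.227",
        "user": "jaspa",
        "timestamp": timestamp,
        "category": "QUERY",
        "type": "CQL_SELECT",
        "ks": "dg_keyspace",
        "cf": "customer_details",
        "operation": "CQL_SELECT",
        "masked_columns": "bank|ccno|email|ip|name|sal|ssn|tel|url",
        "other_columns": "id|npi",
    }


def to_producer_line(event):
    """Serialize the event as compact single-line JSON."""
    return json.dumps(event, separators=(",", ":"))


print(to_producer_line(build_sample_event()))
```

The printed line can be pasted into the running console producer.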

c. Review generated alert
