Problem
If a user wants to use Eagle for general metric/log-based monitoring, such as Hadoop JMX monitoring, the user currently has to set up a development environment, write code (even though we provide a high-level fluent DSL), and then rebuild, package, and submit the topology manually. This is a poor user experience and does not reuse Eagle's existing capabilities for more monitoring cases. A better approach is to let users define a monitoring topology dynamically in a very lightweight way, so we propose a dynamic monitoring framework.
Features
The Dynamic Monitoring Framework consists of four parts:
- Pipeline Builder: Declarative pipeline modeling, DSL, parser, and compiler
- Pipeline Runner: Tool/CLI to read, parse, compile, and submit a pipeline definition
- Pipeline Scheduler: Automatic pipeline scheduling
- Pipeline Designer: Graphical pipeline designer
- Compile DSL configuration to Pipeline model
...
- Compile Pipeline model to Stream Execution Graph
...
- Submit Stream Execution Graph to the actual running environment (e.g., Storm)
...
- Support Alert and Persistence for metric monitoring
...
- Extensible stream module management
...
- Automatically scan and register modules
...
- Pipeline runner CLI tool and shell script
...
- Decouple pipeline compiler and scheduler into individual modules
...
- Stream Pipeline Scheduler
...
- Graph editor to define streaming graph in UI
...
- JSON/Config & Scala Case Class Mapping (https://github.com/scala/pickling)
...
- Raw-message-structure-oriented programming is a little ugly; we should define a generic message/event consisting of [payload: stream/timestamp/serializer/deserializer, data: message] (see the sketch after this list)
...
- Provide the stream schema inline and send it to the metadata store when submitting
...
- The UI should support specifying executorId when defining a new stream
...
- Lack of an entity named StreamEntity for the end-to-end workflow of defining topology and policy
...
- Fix multi-configuration bug
...
- Fix configuration conflict
...
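As a hedged illustration of the generic message/event proposed in the list above, such an envelope might look like the following Scala sketch; all names here are illustrative, not an actual Eagle API:

```
// Illustrative sketch only: a generic event envelope that separates
// routing/serialization metadata (payload) from the message body (data).
case class StreamPayload(
  streamId: String,     // logical stream this event belongs to
  timestamp: Long,      // event time, epoch milliseconds
  serializer: String,   // fully-qualified serializer class name
  deserializer: String  // fully-qualified deserializer class name
)

case class StreamEvent[T](payload: StreamPayload, data: T)
```

Typing against an envelope of this shape, rather than the raw message structure, would also pair naturally with the JSON/Config-to-case-class mapping item above.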
Components
Dynamic Pipeline Builder
Pipeline ::= config + schema + dataflow
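Read literally, the `Pipeline ::= config + schema + dataflow` decomposition could be modeled as below; the type names are hypothetical and only illustrate the three-part structure:

```
import com.typesafe.config.Config

// Hypothetical model types for Pipeline ::= config + schema + dataflow
case class StreamSchema(fields: Map[String, String])            // field name -> type
case class Module(moduleType: String, streamId: String, config: Config)
case class Connector(from: String, to: String, config: Config)  // e.g. grouping = "shuffle"
case class DataFlow(nodes: Seq[Module], edges: Seq[Connector])
case class Pipeline(config: Config, schemas: Map[String, StreamSchema], dataflow: DataFlow)
```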
Configuration-based DSL
Definition
```
pipeline {
context {
// pipeline configuration
}
schema{
${schemaName} {
${field} : ${type}
}
}
dataflow{
// 1. DAG Node definition
${ModuleType}.${streamId} {
// module configuration
}
// 2. DAG Connector definition
${streamId} -> ${streamId} {
// Connection configuration like grouping = "shuffle"
}
}
}
```
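Since the definition above is essentially HOCON, one plausible way to compile it into the pipeline model is to walk the `dataflow` section with Typesafe Config and classify each entry as a DAG node (`${ModuleType}.${streamId}`) or a connector (`${streamId} -> ${streamId}`). The sketch below is illustrative, not Eagle's actual compiler:

```
import com.typesafe.config.{Config, ConfigObject}
import scala.collection.JavaConverters._

case class Node(moduleType: String, streamId: String, config: Config)
case class Edge(from: String, to: String, config: Config)

// Classify dataflow entries: keys containing "->" are connectors,
// everything else is a ModuleType object holding one or more streams.
def parseDataflow(dataflow: ConfigObject): (Seq[Node], Seq[Edge]) = {
  val entries = dataflow.entrySet().asScala.toSeq
  val (edgeEntries, nodeEntries) = entries.partition(_.getKey.contains("->"))

  val edges = edgeEntries.flatMap { e =>
    val Array(lhs, to) = e.getKey.split("->").map(_.trim)
    val cfg = e.getValue.asInstanceOf[ConfigObject].toConfig
    // "A|B|C -> x" fans several upstream streams into one downstream module
    lhs.split('|').map(from => Edge(from.trim, to, cfg))
  }

  val nodes = nodeEntries.flatMap { e =>
    val obj = e.getValue.asInstanceOf[ConfigObject]
    obj.keySet().asScala.toSeq.map(id => Node(e.getKey, id, obj.toConfig.getConfig(id)))
  }

  (nodes, edges)
}
```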
Example
(Gliffy diagram: Dynamic Topology Example)
```
{
config {
alertExecutorConfigs {
defaultAlertExecutor {
"parallelism" : 1
"partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
"needValidation" : "true"
}
}
eagleProps {
"site" : "sandbox"
"dataSource": "HADOOP"
"dataJoinPollIntervalSec" : 30
"mailHost" : "atom.corp.ebay.com"
"mailSmtpPort":"25"
"mailDebug" : "true"
"eagleService": {
"host": "localhost"
"port": 38080
"username": "admin"
"password": "secret"
}
}
dynamicConfigSource {
"enabled" : true
"initDelayMillis" : 0
"delayMillis" : 30000
}
}
dataflow {
KafkaSource.JmxStreamOne {
parallism = 1000
topic = "metric_event_1"
zkConnection = "localhost:2181"
zkConnectionTimeoutMS = 15000
consumerGroupId = "Consumer"
fetchSize = 1048586
transactionZKServers = "localhost"
transactionZKPort = 2181
transactionZKRoot = "/consumers"
transactionStateUpdateMS = 2000
deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
}
KafkaSource.JmxStreamTwo {
parallism = 1000
topic = "metric_event_2"
zkConnection = "localhost:2181"
zkConnectionTimeoutMS = 15000
consumerGroupId = "Consumer"
fetchSize = 1048586
transactionZKServers = "localhost"
transactionZKPort = 2181
transactionZKRoot = "/consumers"
transactionStateUpdateMS = 2000
deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
}
KafkaSource.JmxStreamThree{
parallism = 1000
topic = "metric_event_3"
zkConnection = "localhost:2181"
zkConnectionTimeoutMS = 15000
consumerGroupId = "Consumer"
fetchSize = 1048586
transactionZKServers = "localhost"
transactionZKPort = 2181
transactionZKRoot = "/consumers"
transactionStateUpdateMS = 2000
deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
}
Console.printer {}
KafkaSink.metricStore {
topic = "metric_event_persist"
}
// KafkaSink.alertStore {
// "topic" = "alert_persist"
// "bootstrap.servers" = "localhost:6667"
// }
Alert.alerter {
upStreamNames = [JmxStreamOne,JmxStreamTwo,JmxStreamThree]
alertExecutorId = defaultAlertExecutor
}
// Aggregator.aggregator {
// upStreamNames = []
// analyzerId = ""
// cepQl = ""
// strategy = ""
// }
JmxStreamOne|JmxStreamTwo|JmxStreamThree -> alerter {
grouping = shuffle
}
JmxStreamOne|JmxStreamTwo|JmxStreamThree -> metricStore {
grouping = shuffle
}
JmxStreamOne|JmxStreamTwo|JmxStreamThree -> printer {
grouping = shuffle
}
}
}
```
This finally compiles to the following DAG at runtime.
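In terms of the hypothetical Node/Edge model from the parsing sketch above, the example dataflow reduces to six nodes and nine shuffle-grouped edges: each of the three Kafka source streams fans out to the alerter, the metric store, and the console printer.

```
import com.typesafe.config.ConfigFactory

// The example above, expressed in the sketch model: 6 nodes, 9 shuffle edges.
val none    = ConfigFactory.empty()
val shuffle = ConfigFactory.parseString("grouping = shuffle")

val sources = Seq("JmxStreamOne", "JmxStreamTwo", "JmxStreamThree")
val sinks   = Seq("alerter", "metricStore", "printer")

val nodes =
  sources.map(Node("KafkaSource", _, none)) ++ Seq(
    Node("Alert", "alerter", none),
    Node("KafkaSink", "metricStore", none),
    Node("Console", "printer", none))

val edges = for (from <- sources; to <- sinks) yield Edge(from, to, shuffle)
```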
Dynamic Pipeline Runner
- Pipeline Runner CLI
...
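A hypothetical shape for the runner entry point, shown only to make the read-parse-compile-submit flow concrete; `parseDataflow` refers to the earlier sketch, and the submission step stands in for components this proposal only outlines:

```
import com.typesafe.config.ConfigFactory
import java.io.File

// Hypothetical CLI entry point: load a pipeline definition file, parse the
// dataflow section, then hand the result to whatever submitter targets the
// actual execution environment (e.g. Storm).
object PipelineRunnerCli {
  def main(args: Array[String]): Unit = {
    require(args.length == 1, "usage: pipeline-runner <pipeline.conf>")
    val definition = ConfigFactory.parseFile(new File(args(0))).resolve()
    val (nodes, edges) = parseDataflow(definition.getObject("dataflow"))
    println(s"compiled ${nodes.size} modules and ${edges.size} connectors")
    // submission to the actual execution environment would follow here
  }
}
```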
Dynamic Pipeline Scheduler
(Gliffy diagram)
Dynamic Pipeline Designer
Requirements
- Users can define the monitoring DAG pipeline in the UI using the DSL
- Users can define the monitoring DAG pipeline in the UI through UI actions
Use Case
Hadoop JMX Metric Monitoring
(Gliffy diagram)
Stream Topology Definition DSL
```
// #!/bin/bash
// exec scala "$0" "$@"
// !#
// # start
define ("metricStream_1") as ("name" -> 'string, "value"->'double, "timestamp"->'long) from
kafka(topic="metricStream_1",zk=conf"kafka.zk.hosts",deserializer="")
define ("metricStream_2") as ("name" -> 'string, "value"->'double, "timestamp"->'long) from
kafka(topic="metricStream_2")
define ("logStream_3") from kafka(topic="logStream_3")
// filter by function
filter ("logStream_3") by {(line,collector) => collector.collect(line)} as ("name" -> 'string, "value"->'double, "timestamp"->'long)
// "logStream_3" as ("name" -> 'string, "value"->'double, "timestamp"->'long)
// filter by pattern and rename stream
filter("logStream_3"->"logStream_3_parsed") by """(?<timestamp>\d{4}-\d{2}-\d{2})""".r as ("name" -> 'string, "value"->'double, "timestamp"-> datetime("YYYY-MM-DD"))
alert partitionBy "metricStream_1.metricType" parallism 1 by {sql"""
from metricStream_1[component=='dn' and metricType=="RpcActivityForPort50020.RpcQueueTimeNumOps"].time[3600]
select sum(value) group by host output every 1 hour insert into alertStream;
"""}
aggregate partitionBy "metricStream_1.metricType" parallism 2 by {sql"""
from metricStream_1[component=='dn' and metricType=="RpcActivityForPort50020.RpcQueueTimeNumOps"].time[3600]
select sum(value) group by host output every 1 hour insert into aggregatedMetricStream_1;
"""}
'alertStream ~> kafka("alert_topic",zk=conf"kafka.zk.hosts")
"alertStream" to mail(
from = "sender@eagle.incubator.apache.org",
to = "receiver@eagle.incubator.apache.org",
smtp = "localhost:25",
template =
<html>
<head>
<title>Alert Notification</title>
</head>
<body>
<h1>Message</h1>
<p>$message</p>
</body>
</html>
)
// split stream by logic
'aggregatedMetricStream_1 to kafka("aggregated_stream_dn") where "component == 'dn'" partitionBy "aggregatedMetricStream_1.metricType"
'aggregatedMetricStream_1 ~> druid("aggregated_stream_nn") where "component == 'nn'" partitionBy "aggregatedMetricStream_1.metricType"
// # end
```
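To give a feel for how these verbs could type-check as a Scala internal DSL, here is a rough, non-committal sketch of the `define`/`kafka` portion; the signatures are illustrative only, and the real design would cover `filter`, `alert`, `aggregate`, and `mail` as well:

```
// Illustrative signatures only: shows that the
//   define(...) as (...) from kafka(...)
// line can be plain Scala method chaining.
case class KafkaSourceDef(topic: String, zk: String = "", deserializer: String = "")

case class StreamDef(name: String,
                     fields: Seq[(String, Symbol)] = Nil,
                     source: Option[KafkaSourceDef] = None) {
  def as(schema: (String, Symbol)*): StreamDef = copy(fields = schema)
  def from(src: KafkaSourceDef): StreamDef = copy(source = Some(src))
}

def define(name: String): StreamDef = StreamDef(name)
def kafka(topic: String, zk: String = "", deserializer: String = ""): KafkaSourceDef =
  KafkaSourceDef(topic, zk, deserializer)

// usage, mirroring the script above:
val s1 = define("metricStream_1") as ("name" -> 'string, "value" -> 'double,
  "timestamp" -> 'long) from kafka(topic = "metricStream_1")
```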
Dynamic Topology Management
(Gliffy diagram)