Problem
If a user wants to use Eagle for general metric- or log-based monitoring, such as Hadoop JMX monitoring, the user has to set up a development environment, write code (even though we provide a high-level fluent DSL), and then re-build, package, and submit the topology manually. This is bad for the user experience, and it does not reuse Eagle's existing capabilities for more monitoring cases. A better approach is to let the user define a monitoring topology dynamically in a very lightweight way, so we propose a dynamic monitoring framework.
Features
The Dynamic Monitoring Framework consists of four parts:
- Pipeline Builder: Declarative pipeline modeling, DSL, parser and compiler
- Pipeline Runner: Tool/CLI to read, parse, compile and submit a pipeline definition (see the sketch after this list)
- Pipeline Scheduler: Automatic pipeline scheduler
- Pipeline Designer: Graphical pipeline designer
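The sketch below shows how these parts might fit together end to end. All type and method names here (`Pipeline`, `PipelineParser`, `PipelineCompiler`, `StreamExecutionGraph`, `PipelineRunner`) are hypothetical placeholders for illustration, not Eagle's actual API:

```
// Hypothetical sketch of the builder/runner flow; names are illustrative only.
case class Pipeline(config: Map[String, Any], schema: Map[String, Any], dataflow: String)

trait StreamExecutionGraph

object PipelineParser {
  // DSL configuration text -> Pipeline model
  def parse(dslText: String): Pipeline = ???
}

object PipelineCompiler {
  // Pipeline model -> stream execution graph
  def compile(pipeline: Pipeline): StreamExecutionGraph = ???
}

object PipelineRunner {
  // Submit the compiled graph to the actual running environment, e.g. Storm
  def submit(graph: StreamExecutionGraph): Unit = ???

  def main(args: Array[String]): Unit = {
    val dslText = scala.io.Source.fromFile(args(0)).mkString
    submit(PipelineCompiler.compile(PipelineParser.parse(dslText)))
  }
}
```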
- Compile DSL configuration to Pipeline model
- Compile Pipeline model to Stream Execution Graph
- Submit Stream Execution Graph to the actual running environment, e.g. Storm
- Support alert and persistence for metric monitoring
- Extensible stream module management
- Automatically scan and register modules
- Pipeline runner CLI tool and shell script
- Decouple pipeline compiler and scheduler into individual modules
- Stream Pipeline Scheduler
- Graph editor to define the streaming graph in the UI
- JSON/Config & Scala case class mapping (https://github.com/scala/pickling)
- Raw-message-structure-oriented programming is a little ugly; we should define a generic message/event consisting of [payload: stream/timestamp/serializer/deserializer, data: message] (see the sketch after this list)
- Provide stream schema inline and send it to metadata when submitting
- UI should support specifying an executorId when defining a new stream
- Add an entity named StreamEntity for the end-to-end workflow of defining topology & policy
- Fix multi-configuration bug
- Fix configuration conflict
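As a sketch of the generic message/event envelope proposed above (the type names `StreamEvent` and `StreamPayload` are hypothetical):

```
// Hypothetical envelope: payload carries stream metadata plus (de)serializers,
// data carries the message itself.
case class StreamPayload(
  stream: String,                   // logical stream name
  timestamp: Long,                  // event time in milliseconds
  serializer: Any => Array[Byte],   // encodes the message for transport
  deserializer: Array[Byte] => Any  // decodes the message from transport
)

case class StreamEvent(payload: StreamPayload, data: Any)
```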
Components
Dynamic Pipeline Builder
Pipeline ::= config + schema + dataflow
Configuration-based DSL
Tickets
Definition
```
pipeline {
  context {
    // pipeline configuration
  }
  schema {
    ${schemaName} {
      ${field} : ${type}
    }
  }
  dataflow {
    // 1. DAG node definition
    ${ModuleType}.${streamId} {
      // module configuration
    }
    // 2. DAG connector definition
    ${streamId} -> ${streamId} {
      // connection configuration, e.g. grouping = "shuffle"
    }
  }
}
```
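For instance, a schema block for a JMX metric stream could look like the following. The stream name and field list here are illustrative assumptions, not schemas shipped with Eagle:

```
schema {
  jmxMetricStream {
    host : string
    timestamp : long
    metric : string
    value : double
  }
}
```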
Example
(Gliffy diagram: Dynamic Topology Example)
```
{
config {
alertExecutorConfigs {
defaultAlertExecutor {
"parallelism" : 1
"partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
"needValidation" : "true"
}
}
eagleProps {
"site" : "sandbox"
"dataSource": "HADOOP"
"dataJoinPollIntervalSec" : 30
"mailHost" : "atom.corp.ebay.com"
"mailSmtpPort":"25"
"mailDebug" : "true"
"eagleService": {
"host": "localhost"
"port": 38080
"username": "admin"
"password": "secret"
}
}
dynamicConfigSource {
"enabled" : true
"initDelayMillis" : 0
"delayMillis" : 30000
}
}
dataflow {
KafkaSource.JmxStreamOne {
parallism = 1000
topic = "metric_event_1"
zkConnection = "localhost:2181"
zkConnectionTimeoutMS = 15000
consumerGroupId = "Consumer"
fetchSize = 1048586
transactionZKServers = "localhost"
transactionZKPort = 2181
transactionZKRoot = "/consumers"
transactionStateUpdateMS = 2000
deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
}
KafkaSource.JmxStreamTwo {
parallism = 1000
topic = "metric_event_2"
zkConnection = "localhost:2181"
zkConnectionTimeoutMS = 15000
consumerGroupId = "Consumer"
fetchSize = 1048586
transactionZKServers = "localhost"
transactionZKPort = 2181
transactionZKRoot = "/consumers"
transactionStateUpdateMS = 2000
deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
}
KafkaSource.JmxStreamThree{
parallism = 1000
topic = "metric_event_3"
zkConnection = "localhost:2181"
zkConnectionTimeoutMS = 15000
consumerGroupId = "Consumer"
fetchSize = 1048586
transactionZKServers = "localhost"
transactionZKPort = 2181
transactionZKRoot = "/consumers"
transactionStateUpdateMS = 2000
deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
}
Console.printer {}
KafkaSink.metricStore {
topic = "metric_event_persist"
}
// KafkaSink.alertStore {
// "topic" = "alert_persist"
// "bootstrap.servers" = "localhost:6667"
// }
Alert.alerter {
upStreamNames = [JmxStreamOne,JmxStreamTwo,JmxStreamThree]
alertExecutorId = defaultAlertExecutor
}
// Aggregator.aggregator {
// upStreamNames = []
// analyzerId = ""
// cepQl = ""
// strategy = ""
// }
JmxStreamOne|JmxStreamTwo|JmxStreamThree -> alerter {
grouping = shuffle
}
JmxStreamOne|JmxStreamTwo|JmxStreamThree -> metricStore {
grouping = shuffle
}
JmxStreamOne|JmxStreamTwo|JmxStreamThree -> printer {
grouping = shuffle
}
}
}
```
This example finally compiles at runtime to a DAG in which the three JMX source streams (JmxStreamOne, JmxStreamTwo, JmxStreamThree) fan in to the alerter, the metric store, and the console printer.
Dynamic Pipeline Runner
- Pipeline Runner CLI
Dynamic Pipeline Scheduler
Stream Topology Definition DSL
```
init[storm](args)
// =====================================
"logStream" := stream from Seq(
"55.3.244.1 GET /index.html 15824 0.043",
"55.3.244.1 GET /index.html 15824 0.043",
"55.3.244.1 GET /index.html 15824 0.043",
"55.3.244.1 GET /index.html 15824 0.043",
"55.3.244.1 GET /index.html 15824 0.043",
"55.3.244.1 GET /index.html 15824 0.043"
) as ("line"->'string) parallism 10
"parserStream" := $"logStream" grok {
pattern("line"->"(?<ip>\\d+\\.\\d+\\.\\d+\\.\\d+)\\s+(?<method>\\w+)\\s+(?<path>[\\w/\\.]+)\\s+(?<bytes>\\d+)\\s+(?<time>[\\d\\.]+)")
}
$"parserStream" > stdout parallism 1
// =====================================
submit
```
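The example above creates a `logStream` from six sample access-log lines, applies a grok pattern that parses each line into `ip`, `method`, `path`, `bytes` and `time` fields on a `parserStream`, prints the parsed stream to stdout, and finally submits the topology to the environment chosen in `init` (Storm here).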
(Gliffy diagram)
Dynamic Pipeline Designer
Requirements
- Users can define the monitoring DAG pipeline in the UI via the DSL
- Users can define the monitoring DAG pipeline in the UI via UI actions
Use Case
Hadoop JMX Metric Monitoring
(Gliffy diagram)