Problem
If a user wants to use Eagle for general metric/log-based monitoring, such as Hadoop JMX monitoring, they currently have to set up a development environment, write code (even though we provide a high-level fluent DSL), and then rebuild, package, and submit the topology manually. This is a poor user experience and makes it hard to reuse Eagle's existing capabilities for new monitoring cases. A better approach is to let users define a monitoring topology dynamically in a very lightweight way, so we propose a dynamical monitoring framework.
Features
The Dynamical Monitoring Framework consists of four parts:
- Pipeline Builder: Declarative Pipeline Modeling, DSL, Parser and Compiler
- Pipeline Runner
- Pipeline Manager
- Pipeline Designer
Dynamical Pipeline Builder
...
- Pipeline Runner: Provide a tool/CLI to read a pipeline definition, then parse, compile and submit the pipeline
- Pipeline Scheduler: Automatic pipeline scheduler
- Pipeline Designer: Graphic-based pipeline designer
- Compile DSL configuration to Pipeline model (see the sketch below)
...
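A minimal sketch of how this step could work, assuming the DSL is HOCON parsed with Typesafe Config (the pipeline definition in the Components section below has that shape). PipelineModel, ModuleSpec and ConnectorSpec are illustrative names, not Eagle's actual classes:

    import com.typesafe.config.{Config, ConfigFactory}
    import scala.collection.JavaConverters._

    // Illustrative model types -- not Eagle's actual class names.
    case class ModuleSpec(moduleType: String, streamId: String, conf: Config)
    case class ConnectorSpec(from: String, to: String, conf: Config)
    case class PipelineModel(conf: Config, modules: Seq[ModuleSpec], connectors: Seq[ConnectorSpec])

    object PipelineParser {
      // Quote a key so it is treated as a single HOCON path element.
      private def quote(key: String) = "\"" + key + "\""

      def parse(dsl: String): PipelineModel = {
        val root     = ConfigFactory.parseString(dsl)
        val dataflow = root.getObject("dataflow").toConfig
        // Keys containing "->" define connectors, everything else defines modules.
        val (connectorKeys, moduleKeys) =
          root.getObject("dataflow").keySet().asScala.toSeq.partition(_.contains("->"))

        // "${ModuleType}.${streamId} { ... }" parses as a nested HOCON object.
        val modules = for {
          moduleType <- moduleKeys
          streams    =  dataflow.getObject(quote(moduleType))
          streamId   <- streams.keySet().asScala.toSeq
        } yield ModuleSpec(moduleType, streamId, streams.toConfig.getConfig(quote(streamId)))

        // "${from} -> ${to} { ... }" becomes a connector definition.
        val connectors = connectorKeys.map { key =>
          val Array(from, to) = key.split("->").map(_.trim)
          ConnectorSpec(from, to, dataflow.getConfig(quote(key)))
        }
        PipelineModel(root.getConfig("config"), modules, connectors)
      }
    }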
- Compile Pipeline model to Stream Execution Graph
...
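Continuing the sketch above, lowering the model to an execution graph could look like this; StreamDAG, StreamNode and StreamEdge are again illustrative types, not Eagle's real stream-processing classes:

    // Illustrative execution-graph types.
    case class StreamNode(id: String, moduleType: String, parallelism: Int)
    case class StreamEdge(from: String, to: String, grouping: String)
    case class StreamDAG(nodes: Map[String, StreamNode], edges: Seq[StreamEdge])

    object PipelineCompiler {
      def compile(model: PipelineModel): StreamDAG = {
        val nodes = model.modules.map { m =>
          // "parallism" is spelled as in the example DSL below.
          val parallelism = if (m.conf.hasPath("parallism")) m.conf.getInt("parallism") else 1
          m.streamId -> StreamNode(m.streamId, m.moduleType, parallelism)
        }.toMap
        // "a|b|c -> d" fans three upstream nodes into one downstream node.
        val edges = model.connectors.flatMap { c =>
          val grouping =
            if (c.conf.hasPath("grouping")) c.conf.getString("grouping") else "shuffle"
          c.from.split("\\|").map(_.trim).map(StreamEdge(_, c.to, grouping)).toSeq
        }
        StreamDAG(nodes, edges)
      }
    }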
- Submit Stream Execution Graph to the actual running environment, e.g. Storm (see the sketch below)
...
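Submission itself can go through Storm's standard API. A sketch, assuming a separate builder (not shown) has already mapped the execution graph onto spouts and bolts:

    import backtype.storm.{Config => StormConfig, LocalCluster, StormSubmitter}
    import backtype.storm.generated.StormTopology

    object PipelineSubmitter {
      // `topology` would be produced from the StreamDAG by a builder that maps
      // source modules to spouts and the remaining modules to bolts.
      def submit(name: String, topology: StormTopology, local: Boolean): Unit = {
        val conf = new StormConfig
        conf.setNumWorkers(4) // illustrative setting
        if (local) new LocalCluster().submitTopology(name, conf, topology)
        else StormSubmitter.submitTopology(name, conf, topology)
      }
    }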
- Support Alert and Persistence for metric monitoring
...
- Extensible stream module management
...
- Automatically scan and register modules (see the registry sketch below)
...
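One possible shape for extensible module management, sketched with Java's standard ServiceLoader for classpath scanning; the ModuleProvider SPI is hypothetical:

    import java.util.ServiceLoader
    import scala.collection.JavaConverters._
    import scala.collection.mutable

    // Hypothetical SPI that each stream module (KafkaSource, Alert, KafkaSink, ...) implements.
    trait ModuleProvider {
      def moduleType: String // name used in the DSL, e.g. "KafkaSource"
      def create(conf: com.typesafe.config.Config): AnyRef
    }

    object ModuleRegistry {
      private val providers = mutable.Map.empty[String, ModuleProvider]

      // Scan the classpath (META-INF/services entries) and register every provider found.
      def scanAndRegister(): Unit =
        ServiceLoader.load(classOf[ModuleProvider]).iterator().asScala
          .foreach(p => providers(p.moduleType) = p)

      def apply(moduleType: String): ModuleProvider =
        providers.getOrElse(moduleType, sys.error(s"Unknown module type: $moduleType"))
    }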
- Pipeline runner CLI tool and shell script
...
- Decouple pipeline compiler and scheduler into individual modules
...
- Stream Pipeline Scheduler
...
- Graph editor to define streaming graph in UI
...
- JSON/Config & Scala Case Class Mapping (https://github.com/scala/pickling)
...
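A sketch of the round trip with scala/pickling, assuming the 0.10.x API from the project's README; the KafkaSourceConf case class is illustrative:

    import scala.pickling.Defaults._
    import scala.pickling.json._

    // Illustrative config case class to round-trip through JSON.
    case class KafkaSourceConf(topic: String, zkConnection: String, parallism: Int)

    object PicklingExample extends App {
      val conf = KafkaSourceConf("metric_event_1", "localhost:2181", 1000)
      val pkl  = conf.pickle                   // case class -> JSON pickle
      println(pkl.value)                       // the JSON string
      val back = pkl.unpickle[KafkaSourceConf] // JSON -> case class
      assert(back == conf)
    }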
- Raw-message-structure-oriented programming is a little ugly; we should define a generic message/event consisting of [payload: stream/timestamp/serializer/deserializer, data: message] (see the sketch below)
...
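A hedged sketch of such a generic event type, following the payload/data split proposed above; all names are illustrative:

    // Illustrative generic stream event: metadata payload plus the raw message body.
    case class StreamPayload(
      stream: String,                   // logical stream id, e.g. "JmxStreamOne"
      timestamp: Long,                  // event time in epoch millis
      serializerClass: Option[String],  // how to write the message out
      deserializerClass: Option[String] // how the message was read
    )
    case class StreamEvent[T](payload: StreamPayload, data: T)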
- Provide stream schema inline and send it to metadata when submitting
...
- UI should support specifying executorId when defining a new stream
...
- Currently lacking an entity named StreamEntity for the end-to-end workflow of defining topology & policy (see the sketch below)
...
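A rough sketch of what such a StreamEntity could carry; every field name here is a guess for illustration:

    // Hypothetical sketch of the proposed StreamEntity.
    case class StreamEntity(
      streamId: String,            // e.g. "JmxStreamOne"
      executorId: String,          // alert executor bound to this stream, e.g. "defaultAlertExecutor"
      schema: Map[String, String], // field name -> type, mirroring the DSL schema section
      site: String,                // deployment site, e.g. "sandbox"
      dataSource: String           // e.g. "HADOOP"
    )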
- Fix multi-configuration bug
...
- Fix configuration conflict
...
Components
Dynamical Pipeline Builder
Pipeline ::= config + schema + dataflow
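Expressed as an (illustrative) Scala type, the production above is a three-part aggregate:

    import com.typesafe.config.Config

    // Pipeline ::= config + schema + dataflow, as an illustrative case class.
    case class Pipeline(
      config: Config,                           // runtime configuration (eagleProps, executors, ...)
      schema: Map[String, Map[String, String]], // schemaName -> (field -> type)
      dataflow: Config                          // module and connector definitions
    )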
Configuration-based DSL
Tickets
Definition
    pipeline {
        config {
            // pipeline configuration
        }
        schema {
            ${schemaName} {
                ${field} : ${type}
            }
        }
        dataflow {
            // 1. DAG Node definition
            ${ModuleType}.${streamId} {
                // module configuration
            }
            // 2. DAG Connector definition
            ${streamId} -> ${streamId} {
                // connection configuration, e.g. grouping = "shuffle"
            }
        }
    }
Example
[Gliffy Diagram: Dynamic Topology Example]
    {
        config {
            alertExecutorConfigs {
                defaultAlertExecutor {
                    "parallelism" : 1
                    "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
                    "needValidation" : "true"
                }
            }
            eagleProps {
                "site" : "sandbox"
                "dataSource" : "HADOOP"
                "dataJoinPollIntervalSec" : 30
                "mailHost" : "atom.corp.ebay.com"
                "mailSmtpPort" : "25"
                "mailDebug" : "true"
                "eagleService" : {
                    "host" : "localhost"
                    "port" : 38080
                    "username" : "admin"
                    "password" : "secret"
                }
            }
            dynamicConfigSource {
                "enabled" : true
                "initDelayMillis" : 0
                "delayMillis" : 30000
            }
        }
        dataflow {
            KafkaSource.JmxStreamOne {
                parallism = 1000
                topic = "metric_event_1"
                zkConnection = "localhost:2181"
                zkConnectionTimeoutMS = 15000
                consumerGroupId = "Consumer"
                fetchSize = 1048586
                transactionZKServers = "localhost"
                transactionZKPort = 2181
                transactionZKRoot = "/consumers"
                transactionStateUpdateMS = 2000
                deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
            }
            KafkaSource.JmxStreamTwo {
                parallism = 1000
                topic = "metric_event_2"
                zkConnection = "localhost:2181"
                zkConnectionTimeoutMS = 15000
                consumerGroupId = "Consumer"
                fetchSize = 1048586
                transactionZKServers = "localhost"
                transactionZKPort = 2181
                transactionZKRoot = "/consumers"
                transactionStateUpdateMS = 2000
                deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
            }
            KafkaSource.JmxStreamThree {
                parallism = 1000
                topic = "metric_event_3"
                zkConnection = "localhost:2181"
                zkConnectionTimeoutMS = 15000
                consumerGroupId = "Consumer"
                fetchSize = 1048586
                transactionZKServers = "localhost"
                transactionZKPort = 2181
                transactionZKRoot = "/consumers"
                transactionStateUpdateMS = 2000
                deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
            }
            Console.printer {}
            KafkaSink.metricStore {
                topic = "metric_event_persist"
            }
            // KafkaSink.alertStore {
            //     "topic" = "alert_persist"
            //     "bootstrap.servers" = "localhost:6667"
            // }
            Alert.alerter {
                upStreamNames = [JmxStreamOne, JmxStreamTwo, JmxStreamThree]
                alertExecutorId = defaultAlertExecutor
            }
            // Aggregator.aggreator {
            //     upStreamNames = []
            //     analyzerId = ""
            //     cepQl = ""
            //     strategy = ""
            // }
            JmxStreamOne|JmxStreamTwo|JmxStreamThree -> alerter {
                grouping = shuffle
            }
            JmxStreamOne|JmxStreamTwo|JmxStreamThree -> metricStore {
                grouping = shuffle
            }
            JmxStreamOne|JmxStreamTwo|JmxStreamThree -> printer {
                grouping = shuffle
            }
        }
    }
Finally, this compiles to the following DAG at runtime.
...
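Using the illustrative StreamDAG types sketched in the Tickets section, the example compiles to a graph of this shape: each of the three Kafka sources fans out to the alerter, the metric store sink, and the console printer.

    // Shape of the compiled DAG for the example above.
    val dag = StreamDAG(
      nodes = Map(
        "JmxStreamOne"   -> StreamNode("JmxStreamOne",   "KafkaSource", 1000),
        "JmxStreamTwo"   -> StreamNode("JmxStreamTwo",   "KafkaSource", 1000),
        "JmxStreamThree" -> StreamNode("JmxStreamThree", "KafkaSource", 1000),
        "alerter"        -> StreamNode("alerter",     "Alert",     1),
        "metricStore"    -> StreamNode("metricStore", "KafkaSink", 1),
        "printer"        -> StreamNode("printer",     "Console",   1)
      ),
      edges = for {
        src <- Seq("JmxStreamOne", "JmxStreamTwo", "JmxStreamThree")
        dst <- Seq("alerter", "metricStore", "printer")
      } yield StreamEdge(src, dst, "shuffle")
    )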
Dynamical Pipeline Runner
- Pipeline Runner CLI
...
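A minimal sketch of what the runner CLI entry point could look like, wiring together the hypothetical parse/compile/submit steps sketched earlier; the argument handling is deliberately simple and not Eagle's actual interface:

    import scala.io.Source

    // Hypothetical CLI entry point: read a pipeline definition file, then
    // parse -> compile -> submit.
    object PipelineRunnerCLI {
      def main(args: Array[String]): Unit = args match {
        case Array(path) =>
          val source = Source.fromFile(path)
          val dsl    = try source.mkString finally source.close()
          val model  = PipelineParser.parse(dsl)       // DSL -> pipeline model
          val dag    = PipelineCompiler.compile(model) // model -> execution graph
          println(s"Submitting pipeline with ${dag.nodes.size} nodes, ${dag.edges.size} edges")
          // PipelineSubmitter.submit(...) would go here once the DAG is mapped to a topology.
        case _ =>
          System.err.println("usage: pipeline-runner <pipeline.conf>")
          sys.exit(1)
      }
    }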
Dynamical Pipeline Scheduler
...
[Gliffy Diagram]
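The scheduler could be a polling loop over pipeline metadata. A sketch under the assumption that pending pipeline definitions are fetched from a metadata store; fetchPending is hypothetical, not an Eagle API:

    import java.util.concurrent.{Executors, TimeUnit}

    // Hypothetical scheduler: periodically poll for newly defined pipelines
    // and hand each one to the runner. fetchPending() stands in for a real
    // metadata-store query.
    object PipelineScheduler {
      def fetchPending(): Seq[String] = Seq.empty // pipeline DSL strings awaiting deployment

      def start(pollIntervalSec: Long): Unit = {
        val pool = Executors.newSingleThreadScheduledExecutor()
        val task = new Runnable {
          def run(): Unit = fetchPending().foreach { dsl =>
            val dag = PipelineCompiler.compile(PipelineParser.parse(dsl))
            println(s"Scheduling pipeline with ${dag.nodes.size} nodes")
          }
        }
        pool.scheduleAtFixedRate(task, 0, pollIntervalSec, TimeUnit.SECONDS)
      }
    }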
Dynamical Pipeline Designer
Requirements
- The user should be able to define the monitoring DAG pipeline in the UI via the DSL
- The user should be able to define the monitoring DAG pipeline in the UI via UI actions
Use Case
Hadoop JMX Metric Monitoring
...