The Dynamical Monitoring Framework consists of four parts:
- Pipeline Builder: dynamic pipeline modeling, DSL, parser, and compiler
- Pipeline Runner: provides a tool/CLI to read a pipeline definition, then parse, compile, and submit the pipeline
- Pipeline Manager: automatic pipeline scheduler
- Pipeline Designer: graphical pipeline designer
Features
- Compile the DSL configuration to the pipeline model
- Compile the pipeline model to a stream execution graph
- Submit the stream execution graph to an actual running environment (e.g. Storm)
- Support alerting and persistence for metric monitoring
- Extensible stream module management
- Automatically scan and register modules
- Pipeline runner CLI tool and shell script
- Decouple the pipeline compiler and scheduler into individual modules
- Stream Pipeline Scheduler
- Graph editor to define the streaming graph in the UI
- Raw-message-structure-oriented programming is a little ugly; we should define a generic message/event consisting of [payload: stream/timestamp/serializer/deserializer, data: message]
- Provide the stream schema inline and send it to metadata when submitting
- The UI should support specifying an executorId when defining a new stream
- Lacks an entity named StreamEntity for the end-to-end workflow of defining topology & policy
- Fix configuration conflicts
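The generic message/event envelope proposed above could be sketched as a small value class wrapping the payload metadata (stream id, timestamp, serializer/deserializer) around the raw message data. All names below are illustrative assumptions, not part of the actual framework:

```java
import java.util.Objects;

// Hypothetical sketch of the proposed generic event:
// [payload: stream/timestamp/serializer/deserializer, data: message]
public class StreamEvent {
    private final String streamId;           // which stream this event belongs to
    private final long timestamp;            // event time in epoch millis
    private final String serializerClass;    // FQCN of the serializer
    private final String deserializerClass;  // FQCN of the deserializer
    private final Object data;               // the raw message body

    public StreamEvent(String streamId, long timestamp,
                       String serializerClass, String deserializerClass,
                       Object data) {
        this.streamId = Objects.requireNonNull(streamId);
        this.timestamp = timestamp;
        this.serializerClass = serializerClass;
        this.deserializerClass = deserializerClass;
        this.data = data;
    }

    public String getStreamId() { return streamId; }
    public long getTimestamp() { return timestamp; }
    public Object getData() { return data; }
}
```

Such an envelope would let downstream modules route and replay events by stream id and timestamp without inspecting the raw message structure.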
Components
Dynamical Pipeline Builder
Pipeline ::= config + schema + dataflow
Configuration-based DSL
Tickets
Definition
pipeline {
config {
// pipeline configuration
}
schema{
${schemaName} {
${field} : ${type}
}
}
dataflow{
// 1. DAG Node definition
${ModuleType}.${streamId} {
// module configuration
}
// 2. DAG Connector definition
${streamId} -> ${streamId} {
// Connection configuration like grouping = "shuffle"
}
}
}
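A connector definition fans one or more upstream streams into a downstream module; the example later in this page uses the form `A|B|C -> target`. The following is a minimal, hypothetical sketch of how such a connector line could be tokenized (it is an illustration of the grammar above, not the framework's actual parser):

```java
import java.util.Arrays;
import java.util.List;

// Splits a connector line like "A|B -> C" into its upstream
// stream ids and the downstream module id.
public class ConnectorLine {
    public final List<String> sources;
    public final String target;

    private ConnectorLine(List<String> sources, String target) {
        this.sources = sources;
        this.target = target;
    }

    public static ConnectorLine parse(String line) {
        String[] sides = line.split("->");
        if (sides.length != 2) {
            throw new IllegalArgumentException(
                "expected '<sources> -> <target>': " + line);
        }
        // '|' fans multiple upstream streams into one downstream module
        List<String> sources =
            Arrays.asList(sides[0].trim().split("\\s*\\|\\s*"));
        return new ConnectorLine(sources, sides[1].trim());
    }
}
```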
Example
{
config {
alertExecutorConfigs {
defaultAlertExecutor {
"parallelism" : 1
"partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
"needValidation" : "true"
}
}
eagleProps {
"site" : "sandbox"
"dataSource": "HADOOP"
"dataJoinPollIntervalSec" : 30
"mailHost" : "atom.corp.ebay.com"
"mailSmtpPort":"25"
"mailDebug" : "true"
"eagleService": {
"host": "localhost"
"port": 38080
"username": "admin"
"password": "secret"
}
}
dynamicConfigSource {
"enabled" : true
"initDelayMillis" : 0
"delayMillis" : 30000
}
}
dataflow {
KafkaSource.JmxStreamOne {
parallism = 1000
topic = "metric_event_1"
zkConnection = "localhost:2181"
zkConnectionTimeoutMS = 15000
consumerGroupId = "Consumer"
fetchSize = 1048586
transactionZKServers = "localhost"
transactionZKPort = 2181
transactionZKRoot = "/consumers"
transactionStateUpdateMS = 2000
deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
}
KafkaSource.JmxStreamTwo {
parallism = 1000
topic = "metric_event_2"
zkConnection = "localhost:2181"
zkConnectionTimeoutMS = 15000
consumerGroupId = "Consumer"
fetchSize = 1048586
transactionZKServers = "localhost"
transactionZKPort = 2181
transactionZKRoot = "/consumers"
transactionStateUpdateMS = 2000
deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
}
KafkaSource.JmxStreamThree{
parallism = 1000
topic = "metric_event_3"
zkConnection = "localhost:2181"
zkConnectionTimeoutMS = 15000
consumerGroupId = "Consumer"
fetchSize = 1048586
transactionZKServers = "localhost"
transactionZKPort = 2181
transactionZKRoot = "/consumers"
transactionStateUpdateMS = 2000
deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
}
Console.printer {}
KafkaSink.metricStore {
topic = "metric_event_persist"
}
// KafkaSink.alertStore {
// "topic" = "alert_persist"
// "bootstrap.servers" = "localhost:6667"
// }
Alert.alerter {
upStreamNames = [JmxStreamOne,JmxStreamTwo,JmxStreamThree]
alertExecutorId = defaultAlertExecutor
}
// Aggregator.aggreator {
// upStreamNames = []
// analyzerId = ""
// cepQl = ""
// strategy = ""
// }
JmxStreamOne|JmxStreamTwo|JmxStreamThree -> alerter {
grouping = shuffle
}
JmxStreamOne|JmxStreamTwo|JmxStreamThree -> metricStore {
grouping = shuffle
}
JmxStreamOne|JmxStreamTwo|JmxStreamThree -> printer {
grouping = shuffle
}
}
}
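The resulting topology can be pictured as plain adjacency lists: each of the three Kafka source streams fans out to the alerter, the metric store, and the console printer. This is only an illustration of the example's wiring; the real runtime compiles it into a stream execution graph:

```java
import java.util.*;

// Adjacency-list view of the DAG produced by the example above.
public class ExampleDag {
    public static Map<String, List<String>> build() {
        Map<String, List<String>> edges = new LinkedHashMap<>();
        List<String> sinks = Arrays.asList("alerter", "metricStore", "printer");
        for (String source : Arrays.asList(
                "JmxStreamOne", "JmxStreamTwo", "JmxStreamThree")) {
            edges.put(source, new ArrayList<>(sinks));
        }
        return edges;
    }
}
```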
Finally, this compiles to the following DAG at runtime.
![](/confluence/download/attachments/61334083/Screen%20Shot%202016-01-17%20at%206.36.10%20PM.png?version=1&modificationDate=1453027019000&api=v2)
Dynamical Pipeline Runner
Dynamical Pipeline Manager
Dynamical Pipeline Designer
Requirements
- Users can define the monitoring DAG pipeline in the UI via the DSL
- Users can define the monitoring DAG pipeline in the UI with UI actions
Use Case
Hadoop JMX Metric Monitoring