
The Dynamical Monitoring Framework consists of four parts (a rough sketch of how they relate follows the list):

  • Pipeline Builder: dynamic pipeline modeling, DSL, parser, and compiler
  • Pipeline Runner: tool/CLI to read a pipeline definition, then parse, compile, and submit it
  • Pipeline Manager: automatic pipeline scheduler
  • Pipeline Designer: graphical pipeline designer
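
A minimal Scala sketch of how these parts might fit together; all type and method names here are illustrative assumptions, not the framework's actual API:

// Illustrative only: names are assumptions, not the real framework classes.
case class PipelineModel(raw: String)                                       // parsed DSL (opaque here)
case class ExecutionGraph(nodes: Seq[String], edges: Seq[(String, String)])

trait PipelineBuilder {                          // DSL -> pipeline model -> stream execution graph
  def parse(dsl: String): PipelineModel
  def compile(model: PipelineModel): ExecutionGraph
}
trait PipelineRunner  { def submit(graph: ExecutionGraph): Unit }            // e.g. submit to Storm
trait PipelineManager { def schedule(graph: ExecutionGraph): Unit }          // automatic scheduling
// The Pipeline Designer is a UI layer that emits the same DSL consumed by the PipelineBuilder.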

Features 

  • Compile the DSL configuration into a pipeline model
  • Compile the pipeline model into a stream execution graph
  • Submit the stream execution graph to an actual running environment such as Storm
  • Support alerting and persistence for metric monitoring
  • Extensible stream module management
  • Automatically scan and register modules
  • Pipeline runner CLI tool and shell script
  • Decouple the pipeline compiler and scheduler into separate modules
  • Stream pipeline scheduler
  • Graph editor to define the streaming graph in the UI
  • Programming against the raw message structure is awkward; define a generic message/event consisting of [payload: stream/timestamp/serializer/deserializer, data: message] (see the sketch after this list)
  • Provide the stream schema inline and send it to metadata when submitting
  • The UI should support specifying an executorId when defining a new stream
  • Add an entity named StreamEntity to support the end-to-end workflow of defining topology and policy
  • Fix the multi-configuration bug
  • Fix configuration conflicts
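
The generic message/event item above might look something like this sketch; field and class names are assumptions, not existing code:

// Sketch of a generic event envelope: routing/serialization metadata in the payload, raw message in data.
case class StreamPayload(stream: String,          // logical stream id, e.g. "JmxStreamOne"
                         timestamp: Long,         // event time, epoch millis
                         serializer: String,      // fully-qualified serializer class name
                         deserializer: String)    // fully-qualified deserializer class name
case class StreamEvent(payload: StreamPayload, data: Map[String, Any])   // data = the message body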

Components

Dynamical Pipeline Builder

Pipeline ::= config + schema + dataflow
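
Read as a model, that decomposition might be sketched as follows; the class and field names are assumptions rather than the actual implementation:

// Sketch of the pipeline model implied by "config + schema + dataflow"; names are illustrative.
case class StreamSchema(name: String, fields: Map[String, String])            // field -> type
case class StreamNode(moduleType: String, streamId: String, config: Map[String, Any])
case class Connector(from: String, to: String, grouping: String = "shuffle")
case class Pipeline(config: Map[String, Any],
                    schema: Seq[StreamSchema],
                    dataflow: (Seq[StreamNode], Seq[Connector]))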

Configuration-based DSL


Definition

pipeline {
	context {
		// pipeline configuration
	}
	schema{
		${schemaName} {
			${field} : ${type}
		}
	}
	dataflow{
		// 1. DAG Node definition
		${ModuleType}.${streamId} {
			// module configuration
		}		
 
		// 2. DAG Connector definition
		${streamId} -> ${streamId} {
			// Connection configuration like grouping = "shuffle"
		}
	}
}

Example

{
 config {
  alertExecutorConfigs {
   defaultAlertExecutor  {
    "parallelism" : 1
    "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
    "needValidation" : "true"
   }
  }
  eagleProps  {
   "site" : "sandbox"
   "dataSource": "HADOOP"
   "dataJoinPollIntervalSec" : 30
   "mailHost" : "atom.corp.ebay.com"
   "mailSmtpPort":"25"
   "mailDebug" : "true"
   "eagleService": {
    "host": "localhost"
    "port": 38080
    "username": "admin"
    "password": "secret"
   }
  }
  dynamicConfigSource  {
   "enabled" : true
   "initDelayMillis" : 0
   "delayMillis" : 30000
  }
 }

 dataflow {
  KafkaSource.JmxStreamOne {
   parallism = 1000
   topic = "metric_event_1"
   zkConnection = "localhost:2181"
   zkConnectionTimeoutMS = 15000
   consumerGroupId = "Consumer"
   fetchSize = 1048586
   transactionZKServers = "localhost"
   transactionZKPort = 2181
   transactionZKRoot = "/consumers"
   transactionStateUpdateMS = 2000
   deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
  }

  KafkaSource.JmxStreamTwo {
   parallism = 1000
   topic = "metric_event_2"
   zkConnection = "localhost:2181"
   zkConnectionTimeoutMS = 15000
   consumerGroupId = "Consumer"
   fetchSize = 1048586
   transactionZKServers = "localhost"
   transactionZKPort = 2181
   transactionZKRoot = "/consumers"
   transactionStateUpdateMS = 2000
   deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
  }

  KafkaSource.JmxStreamThree{
   parallism = 1000
   topic = "metric_event_3"
   zkConnection = "localhost:2181"
   zkConnectionTimeoutMS = 15000
   consumerGroupId = "Consumer"
   fetchSize = 1048586
   transactionZKServers = "localhost"
   transactionZKPort = 2181
   transactionZKRoot = "/consumers"
   transactionStateUpdateMS = 2000
   deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
  }

  Console.printer {}

  KafkaSink.metricStore {
   topic = "metric_event_persist"
  }

//  KafkaSink.alertStore {
//   "topic" = "alert_persist"
//   "bootstrap.servers" = "localhost:6667"
//  }

  Alert.alerter {
   upStreamNames = [JmxStreamOne,JmxStreamTwo,JmxStreamThree]
   alertExecutorId = defaultAlertExecutor
  }

//  Aggregator.aggreator {
//   upStreamNames = []
//   analyzerId = ""
//   cepQl = ""
//   strategy = ""
//  }

  JmxStreamOne|JmxStreamTwo|JmxStreamThree -> alerter {
   grouping = shuffle
  }

  JmxStreamOne|JmxStreamTwo|JmxStreamThree -> metricStore {
   grouping = shuffle
  }

  JmxStreamOne|JmxStreamTwo|JmxStreamThree -> printer {
   grouping = shuffle
  }
 }
}

Finally, this compiles into the following DAG at runtime.
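
The dataflow above fans each of the three JMX streams out to the alerter, metric store, and printer; a quick sketch of the resulting edge list:

// Edge list implied by the example dataflow: every JMX source connects to every sink.
val sources = Seq("JmxStreamOne", "JmxStreamTwo", "JmxStreamThree")
val sinks   = Seq("alerter", "metricStore", "printer")
val edges   = for (src <- sources; dst <- sinks) yield (src, dst, "shuffle")   // 9 connections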

 

Dynamical Pipeline Runner

  • Pipeline Runner CLI (see the sketch below)
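
A rough sketch of the runner CLI flow (read the pipeline definition, then parse, compile, and submit); the class and method names are stand-ins, and using a HOCON parser is only an assumption based on the DSL syntax above:

// Hypothetical sketch; not the actual Eagle CLI entry point.
import com.typesafe.config.{Config, ConfigFactory}

object PipelineRunnerCli {
  def parse(dsl: Config): String     = s"pipeline-model(${dsl.root().keySet()})"   // DSL -> model (stub)
  def compile(model: String): String = s"execution-graph($model)"                  // model -> graph (stub)
  def submit(graph: String): Unit    = println(s"submitting $graph, e.g. to a Storm cluster")

  def main(args: Array[String]): Unit = {
    val file = args.headOption.getOrElse(sys.error("usage: pipeline-runner <pipeline.conf>"))
    val dsl  = ConfigFactory.parseFile(new java.io.File(file))   // read the HOCON-like definition
    submit(compile(parse(dsl)))
  }
}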

Dynamical Pipeline Manager

Dynamic Topology Manager

 

Dynamical Pipeline Designer

Requirements

  • Users can define the monitoring DAG pipeline in the UI via the DSL
  • Users can define the monitoring DAG pipeline in the UI via UI actions

Use Case

Hadoop JMX Metric Monitoring

[Figure: jmx monitoring]


 
