
The Dynamical Monitoring Framework consists of four parts:

  • Pipeline Builder: Dynamical Pipeline Modeling, DSL, Parser and Compiler
  • Pipeline Runner: Provides a tool/CLI to read, parse, compile and submit pipeline definitions
  • Pipeline Manager: Automatic pipeline scheduler
  • Pipeline Designer: Graphic-based Pipeline designer

Dynamical Pipeline Builder

Pipeline ::= config + schema + dataflow

Configuration-based DSL

Tickets

Definition

{
	config {
		// pipeline configuration
	}
	schema{
		${schemaName} {
			${field} : ${type}
		}
	}
	dataflow{
		// 1. DAG Node definition
		${ModuleType}.${streamId} {
			// module configuration
		}		
 
		// 2. DAG Connector definition
		${streamId} -> ${streamId} {
			// Connection configuration like grouping = "shuffle"
		}
	}
}
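For reference, a minimal instantiation of this template might look as follows. The schema fields, stream names and topic name here are purely illustrative placeholders, not identifiers defined by the framework:

```
{
	config {
		// top-level pipeline settings
	}
	schema {
		metricStream {
			host : string
			value : double
		}
	}
	dataflow {
		// one source node and one sink node
		KafkaSource.metricStream {
			topic = "some_topic"   // illustrative topic name
		}
		Console.printer {}

		// connect source to sink
		metricStream -> printer {
			grouping = "shuffle"
		}
	}
}
```

A complete, concrete pipeline definition is given in the Example section below.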

Example

{
 config {
  alertExecutorConfigs {
   defaultAlertExecutor  {
    "parallelism" : 1
    "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
    "needValidation" : "true"
   }
  }
  eagleProps  {
   "site" : "sandbox"
   "dataSource": "HADOOP"
   "dataJoinPollIntervalSec" : 30
   "mailHost" : "atom.corp.ebay.com"
   "mailSmtpPort":"25"
   "mailDebug" : "true"
   "eagleService": {
    "host": "localhost"
    "port": 38080
    "username": "admin"
    "password": "secret"
   }
  }
  dynamicConfigSource  {
   "enabled" : true
   "initDelayMillis" : 0
   "delayMillis" : 30000
  }
 }

 dataflow {
  KafkaSource.JmxStreamOne {
   parallism = 1000
   topic = "metric_event_1"
   zkConnection = "localhost:2181"
   zkConnectionTimeoutMS = 15000
   consumerGroupId = "Consumer"
   fetchSize = 1048586
   transactionZKServers = "localhost"
   transactionZKPort = 2181
   transactionZKRoot = "/consumers"
   transactionStateUpdateMS = 2000
   deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
  }

  KafkaSource.JmxStreamTwo {
   parallism = 1000
   topic = "metric_event_2"
   zkConnection = "localhost:2181"
   zkConnectionTimeoutMS = 15000
   consumerGroupId = "Consumer"
   fetchSize = 1048586
   transactionZKServers = "localhost"
   transactionZKPort = 2181
   transactionZKRoot = "/consumers"
   transactionStateUpdateMS = 2000
   deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
  }

  KafkaSource.JmxStreamThree {
   parallism = 1000
   topic = "metric_event_3"
   zkConnection = "localhost:2181"
   zkConnectionTimeoutMS = 15000
   consumerGroupId = "Consumer"
   fetchSize = 1048586
   transactionZKServers = "localhost"
   transactionZKPort = 2181
   transactionZKRoot = "/consumers"
   transactionStateUpdateMS = 2000
   deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
  }

  Console.printer {}

  KafkaSink.metricStore {
   topic = "metric_event_persist"
  }

//  KafkaSink.alertStore {
//   "topic" = "alert_persist"
//   "bootstrap.servers" = "localhost:6667"
//  }

  Alert.alerter {
   upStreamNames = [JmxStreamOne,JmxStreamTwo,JmxStreamThree]
   alertExecutorId = defaultAlertExecutor
  }

//  Aggregator.aggreator {
//   upStreamNames = []
//   analyzerId = ""
//   cepQl = ""
//   strategy = ""
//  }

  JmxStreamOne|JmxStreamTwo|JmxStreamThree -> alerter {
   grouping = shuffle
  }

  JmxStreamOne|JmxStreamTwo|JmxStreamThree -> metricStore {
   grouping = shuffle
  }

  JmxStreamOne|JmxStreamTwo|JmxStreamThree -> printer {
   grouping = shuffle
  }
 }
}

Finally, the pipeline definition is compiled into the following DAG at runtime.
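Based on the connector definitions in the example above, the runtime DAG can be sketched as:

```
KafkaSource.JmxStreamOne   --+
KafkaSource.JmxStreamTwo   --+--(shuffle)--> Alert.alerter        (defaultAlertExecutor)
KafkaSource.JmxStreamThree --+--(shuffle)--> KafkaSink.metricStore ("metric_event_persist")
                             +--(shuffle)--> Console.printer
```

Each of the three Kafka source streams fans out to all three downstream nodes, with shuffle grouping on every edge.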

 

Dynamic Pipeline Runner

  • Pipeline Runner CLI
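The CLI interface is not specified yet; a hypothetical invocation could look like the sketch below. The command name and flags are placeholders for illustration only, not the actual interface:

```
# Hypothetical usage sketch -- command name and options are placeholders
pipeline-runner --config pipeline.conf            # read, parse and compile the DSL definition
pipeline-runner --config pipeline.conf --submit   # submit the compiled topology to the cluster
```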

Dynamic Pipeline Manager

Dynamic Topology Manager

 

Use Case

Hadoop JMX Metric Monitoring

