THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
Stream Topology Definition DSL
// Example stream topology script written in the stream-definition DSL.
// It declares three Kafka-backed streams, filters/parses the log stream,
// runs one alert query and one aggregation query over metricStream_1, and
// routes the resulting streams to Kafka, mail and Druid sinks.
//
// Formatting note: one statement per line. The text inside the sql"""..."""
// literals and the XML template is kept exactly as in the source, because
// whitespace inside those literals is part of the runtime value.
//
// The commented-out shebang lines below are kept from the source as-is.
// #!/bin/bash
// exec scala "$0" "$@"
// !#

// # start

// Stream declarations: name, optional field schema, and Kafka source topic.
// NOTE(review): conf"..." looks like a custom string interpolator that resolves
// a configuration key -- confirm against the DSL implementation.
define ("metricStream_1") as ("name" -> 'string, "value"->'double, "timestamp"->'long) from kafka(topic="metricStream_1",zk=conf"kafka.zk.hosts",deserializer="")
define ("metricStream_2") as ("name" -> 'string, "value"->'double, "timestamp"->'long) from kafka(topic="metricStream_2")
// logStream_3 is declared without a schema here; a schema is attached by the filters below.
define ("logStream_3") from kafka(topic="logStream_3")

// filter by function
filter ("logStream_3") by {(line,collector) => collector.collect(line)} as ("name" -> 'string, "value"->'double, "timestamp"->'long)
// "logStream_3" as ("name" -> 'string, "value"->'double, "timestamp"->'long)

// filter by pattern and rename stream
// NOTE(review): the regex only captures a `timestamp` group while the schema
// also lists `name` and `value` -- confirm how those fields get populated.
// NOTE(review): if datetime(...) uses Java date-format patterns, "YYYY-MM-DD"
// means week-year / day-of-year and "yyyy-MM-dd" was probably intended -- confirm.
filter("logStream_3"->"logStream_3_parsed") by """(?<timestamp>\d{4}-\d{2}-\d{2})""".r as ("name" -> 'string, "value"->'double, "timestamp"-> datetime("YYYY-MM-DD"))

// Alert query over metricStream_1, partitioned by metricType; results are
// inserted into alertStream by the query itself.
// NOTE(review): `parallism` is spelled as in the source (likely "parallelism");
// it is a DSL keyword defined elsewhere, so it is not renamed here.
alert partitionBy "metricStream_1.metricType" parallism 1 by {sql""" from metricStream_1[component=='dn' and metricType=="RpcActivityForPort50020.RpcQueueTimeNumOps"].time[3600] select sum(value) group by host output every 1 hour insert into alertStream; """}

// Aggregation query with the same filter/window, inserting into aggregatedMetricStream_1.
aggregate partitionBy "metricStream_1.metricType" parallism 2 by {sql""" from metricStream_1[component=='dn' and metricType=="RpcActivityForPort50020.RpcQueueTimeNumOps"].time[3600] select sum(value) group by host output every 1 hour insert into aggregatedMetricStream_1; """}

// Sinks for alertStream: a Kafka topic and a mail notification.
// NOTE(review): both `~>` and `to` are used to attach sinks in this script;
// presumably they are aliases -- confirm.
'alertStream ~> kafka("alert_topic",zk=conf"kafka.zk.hosts")
"alertStream" to mail(
  from = "sender@eagle.incubator.apache.org",
  to = "receiver@eagle.incubator.apache.org",
  smtp = "localhost:25",
  template = <html> <head> <title>Alert Notification</title> </head> <body> <h1>Message</h1> <p>$message</p> </body> </html>
)

// split stream by logic
'aggregatedMetricStream_1 to kafka("aggregated_stream_dn") where "component == 'dn'" partitionBy "aggregatedMetricStream_1.metricType"
'aggregatedMetricStream_1 ~> druid("aggregated_stream_nn") where "component == 'nn'" partitionBy "aggregatedMetricStream_1.metricType"

// # end
Dynamic Topology Management