You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 4 Next »

Stream Topology Definition DSL

EAGLE-98

 

// Commented-out shell header so the file could be run as a self-executing
// Scala script (`exec scala "$0" "$@"`); kept for reference only.
// #!/bin/bash
// exec scala "$0" "$@"
// !#
// # start
// Declare input stream "metricStream_1" with an explicit schema
// (field name -> type symbol), sourced from the Kafka topic of the same name.
// `zk=conf"kafka.zk.hosts"` pulls the ZooKeeper hosts from configuration via
// a `conf` string interpolator; `deserializer=""` presumably means "use the
// default deserializer" — TODO confirm against the DSL implementation.
define ("metricStream_1") as ("name" -> 'string, "value"->'double, "timestamp"->'long) from
  kafka(topic="metricStream_1",zk=conf"kafka.zk.hosts",deserializer="")

// Same schema as above; kafka() called with only the topic, so zk/deserializer
// fall back to defaults.
define ("metricStream_2") as ("name" -> 'string, "value"->'double, "timestamp"->'long) from
  kafka(topic="metricStream_2")

// Schema-less stream declaration: no `as (...)` clause, so the schema is
// attached later by a filter step (see the filter examples below).
define ("logStream_3") from kafka(topic="logStream_3")

// Filter by function: a (line, collector) closure decides what to emit.
// This example is a pass-through (collects every line) and simultaneously
// declares the stream's schema via the trailing `as (...)` clause.
filter ("logStream_3") by {(line,collector) => collector.collect(line)} as ("name" -> 'string, "value"->'double, "timestamp"->'long)
// "logStream_3" as ("name" -> 'string, "value"->'double, "timestamp"->'long)

// Filter by regex pattern and rename the stream in one step:
// "logStream_3" -> "logStream_3_parsed" maps input to a new output stream.
// The named capture group (?<timestamp>...) feeds the "timestamp" field,
// which is parsed with the given datetime format instead of a plain type.
filter("logStream_3"->"logStream_3_parsed") by """(?<timestamp>\d{4}-\d{2}-\d{2})""".r as ("name" -> 'string, "value"->'double, "timestamp"-> datetime("YYYY-MM-DD"))

// Alert policy: partition the work by metricType so each partition evaluates
// independently, with parallelism 1. The `sql"""..."""` interpolator embeds a
// Siddhi-style streaming query: a 1-hour time window over filtered
// metricStream_1 events, summing `value` per host, emitting hourly into
// `alertStream` (consumed by the sinks below).
// NOTE(review): "parallism" looks like a typo for "parallelism" — confirm
// which spelling the DSL actually registers before correcting.
alert partitionBy "metricStream_1.metricType" parallism 1 by {sql"""
  from metricStream_1[component=='dn' and metricType=="RpcActivityForPort50020.RpcQueueTimeNumOps"].time[3600]
  select sum(value) group by host output every 1 hour insert into alertStream;
"""}

// Aggregation policy: same query shape as the alert above, but with
// parallelism 2 and results routed into `aggregatedMetricStream_1`
// (split to kafka/druid sinks below).
aggregate partitionBy "metricStream_1.metricType" parallism 2 by {sql"""
  from metricStream_1[component=='dn' and metricType=="RpcActivityForPort50020.RpcQueueTimeNumOps"].time[3600]
  select sum(value) group by host output every 1 hour insert into aggregatedMetricStream_1;
"""}

// Sink wiring. Two equivalent spellings are demonstrated:
//   'symbol ~> sink(...)   — symbol reference with the ~> arrow operator
//   "name"  to sink(...)   — string reference with the `to` verb
// Route alerts to a Kafka topic:
'alertStream ~> kafka("alert_topic",zk=conf"kafka.zk.hosts")
// Also route alerts to email. The template is a Scala XML literal; `$message`
// is presumably substituted with the alert payload at send time — TODO confirm
// the template-substitution syntax against the mail sink implementation.
"alertStream" to mail(
  from = "sender@eagle.incubator.apache.org",
  to = "receiver@eagle.incubator.apache.org",
  smtp = "localhost:25",
  template =
    <html>
      <head>
      <title>Alert Notification</title>
      </head>
      <body>
        <h1>Message</h1>
        <p>$message</p>
      </body>
    </html>
)

// Split one stream across sinks by predicate: datanode ('dn') records go to
// Kafka, namenode ('nn') records go to Druid; both repartition by metricType.
'aggregatedMetricStream_1 to kafka("aggregated_stream_dn") where "component == 'dn'" partitionBy "aggregatedMetricStream_1.metricType"
'aggregatedMetricStream_1 ~> druid("aggregated_stream_nn")  where "component == 'nn'" partitionBy "aggregatedMetricStream_1.metricType"
// # end

 

Dynamic Topology Management

Dynamic Topology Manager

  • No labels