In this blog post we will walk through what it takes to set up a new telemetry source in Metron. For this example we will set up a new sensor, capture the sensor logs, pipe the logs to Kafka, pick up the logs with a Metron parsing topology, parse them, and run them through the Metron stream processing pipeline.
Our example sensor will be a Squid Proxy. Squid is a caching proxy for the Web supporting HTTP, HTTPS, FTP, and more. Squid logs are simple to explain and easy to parse, and the velocity of traffic coming from Squid is representative of a typical network-based sensor. Hence, we feel it's a good telemetry source to use for this tutorial.
Prior to going through this tutorial make sure you have Metron properly installed. Please see here for Metron installation and validation instructions. We will be using a single-VM setup for this exercise. To set up the VM, run the following steps:
cd metron-deployment/vagrant/singlenode-vagrant
vagrant plugin install vagrant-hostmanager
vagrant up
vagrant ssh
After executing the above commands a Metron VM will be built (called node1) and you will be logged in as user vagrant. Now let's install the Squid sensor.
sudo yum install squid
sudo service squid start
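Optionally, you can confirm the service came up cleanly before moving on (a standard service status check, nothing Metron-specific):
sudo service squid status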
At this point the Squid sensor is installed and started. Now let's look at the Squid logs.
sudo su -
cd /var/log/squid
ls
You see that there are three types of logs available: access.log, cache.log, and squid.out. We are interested in access.log, as that is the log that records the proxy usage. We see that initially the log is empty. Let's generate a few entries for the log.
squidclient http://www.cnn.com
squidclient http://www.nba.com
vi /var/log/squid/access.log
In production environments you would configure your users' web browsers to point to the proxy server, but for the sake of simplicity in this tutorial we will use the client that is packaged with the Squid installation. After we use the client to simulate proxy requests, the Squid log entries look as follows:
1461576382.642 161 127.0.0.1 TCP_MISS/200 103701 GET http://www.cnn.com/ - DIRECT/199.27.79.73 text/html
1461576442.228 159 127.0.0.1 TCP_MISS/200 137183 GET http://www.nba.com/ - DIRECT/66.210.41.9 text/html
The format of the log is: timestamp | time elapsed | remotehost | code/status | bytes | method | URL | rfc931 | peerstatus/peerhost | type
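For example, the first entry above breaks down into those fields as follows:
timestamp: 1461576382.642
time elapsed: 161
remotehost: 127.0.0.1
code/status: TCP_MISS/200
bytes: 103701
method: GET
URL: http://www.cnn.com/
rfc931: -
peerstatus/peerhost: DIRECT/199.27.79.73
type: text/html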
Now that we have the sensor set up and generating logs, we need to figure out how to pipe these logs to a Kafka topic. The first thing we need to do is set up a new Kafka topic for Squid.
cd /usr/hdp/current/kafka-broker/bin/
./kafka-topics.sh --zookeeper localhost:2181 --create --topic squid --partitions 1 --replication-factor 1
./kafka-topics.sh --zookeeper localhost:2181 --list
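If you want more detail than the topic list gives you, you can also describe the new topic (optional):
./kafka-topics.sh --zookeeper localhost:2181 --describe --topic squid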
The above commands create a new Kafka topic for Squid and list the existing topics so we can verify it was created. Now let's test how we can pipe the Squid logs to Kafka:
tail /var/log/squid/access.log | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list node1:6667 --topic squid
./kafka-console-consumer.sh --zookeeper node1:2181 --topic squid --from-beginning
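As an aside, the one-shot tail above only ships whatever is already in the access log. If you want a continuous feed into Kafka while you work through the rest of the tutorial, you can leave a follow-mode tail running instead; this is a convenience for the tutorial, not a production ingest path:
tail -F /var/log/squid/access.log | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list node1:6667 --topic squid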
At this point our Squid logs are flowing into Kafka, and we are ready to tackle the Metron parsing topology setup. The first thing we need to do is decide whether we will be using a Java-based parser or a Grok-based parser for the new telemetry. In this example we will be using the Grok parser. The Grok parser is a good fit for structured or semi-structured logs that are well understood (check) and telemetries with lower volumes of traffic (check). So the first step is to define the Grok expression for our log. Refer to the Grok documentation for additional details. In our case the pattern is:
WDOM [^(?:http:\/\/|www\.|https:\/\/)]([^\/]+)
SQUID_DELIMITED %{NUMBER:timestamp} %{SPACE:UNWANTED} %{INT:elapsed} %{IPV4:ip_src_addr} %{WORD:action}/%{NUMBER:code} %{NUMBER:bytes} %{WORD:method} http:\/\/\www.%{WDOM:url}\/ - %{WORD:UNWANTED}\/%{IPV4:ip_dst_addr} %{WORD:UNWANTED}\/%{WORD:UNWANTED}
Notice that I define a WDOM pattern (one tailored to Squid, rather than the generic Grok URL pattern) before defining the Squid log pattern. This is optional and is done for ease of use. Also, notice that I apply the UNWANTED tag to any part of the message that I don't want included in my resulting JSON structure. Finally, notice that I applied the naming convention to the IPV4 fields by referencing the following list of field conventions. The last thing I need to do is validate my Grok pattern. For our test we will be using a free online Grok validator called Grok Constructor: paste the pattern and one of the sample log lines above, and confirm that every field is extracted as expected.
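To make the goal concrete, here is roughly the JSON we expect the parser to produce for the first log entry, using the field names from the pattern above (this is an illustration only; the output of the running topology may add Metron metadata fields, and value types may differ):
{
  "timestamp": "1461576382.642",
  "elapsed": "161",
  "ip_src_addr": "127.0.0.1",
  "action": "TCP_MISS",
  "code": "200",
  "bytes": "103701",
  "method": "GET",
  "url": "cnn.com",
  "ip_dst_addr": "199.27.79.73"
}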
Now that the Grok pattern has been defined we need to save it and move it to HDFS. Existing Grok parsers that ship with Metron are staged under /apps/metron/patterns/
[root@node1 bin]# hdfs dfs -ls /apps/metron/patterns/
Found 5 items
-rw-r--r-- 3 hdfs hadoop 13427 2016-04-25 07:07 /apps/metron/patterns/asa
-rw-r--r-- 3 hdfs hadoop 5203 2016-04-25 07:07 /apps/metron/patterns/common
-rw-r--r-- 3 hdfs hadoop 524 2016-04-25 07:07 /apps/metron/patterns/fireeye
-rw-r--r-- 3 hdfs hadoop 2552 2016-04-25 07:07 /apps/metron/patterns/sourcefire
-rw-r--r-- 3 hdfs hadoop 879 2016-04-25 07:07 /apps/metron/patterns/yaf
We need to move our new Squid pattern into the same directory. Create a file from the Grok pattern above:
touch /tmp/squid
vi /tmp/squid
Then move it to HDFS:
su - hdfs
hdfs dfs -put /tmp/squid /apps/metron/patterns/
exit
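As a quick sanity check, confirm the pattern landed where the parser will look for it:
hdfs dfs -cat /apps/metron/patterns/squid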
Now that the Grok pattern is staged in HDFS we need to define a Storm Flux configuration for the Metron parsing topology. The configs are staged under
/usr/metron/0.1BETA/flux/ and each parsing topology has its own set of configs. Each topology directory has a remote.yaml, designed to be run on AWS, and a test.yaml, designed to run locally on a single-node VM. At the time of publishing this blog entry the following configs are available:
/usr/metron/0.1BETA/flux/test.yaml
/usr/metron/0.1BETA/flux/remote.yaml
/usr/metron/0.1BETA/flux/sourcefire/test.yaml
/usr/metron/0.1BETA/flux/sourcefire/remote.yaml
/usr/metron/0.1BETA/flux/asa/test.yaml
/usr/metron/0.1BETA/flux/asa/remote.yaml
/usr/metron/0.1BETA/flux/fireeye/test.yaml
/usr/metron/0.1BETA/flux/fireeye/remote.yaml
/usr/metron/0.1BETA/flux/bro/test.yaml
/usr/metron/0.1BETA/flux/bro/remote.yaml
/usr/metron/0.1BETA/flux/ise/test.yaml
/usr/metron/0.1BETA/flux/ise/remote.yaml
/usr/metron/0.1BETA/flux/paloalto/test.yaml
/usr/metron/0.1BETA/flux/paloalto/remote.yaml
/usr/metron/0.1BETA/flux/lancope/test.yaml
/usr/metron/0.1BETA/flux/lancope/remote.yaml
/usr/metron/0.1BETA/flux/pcap/test.yaml
/usr/metron/0.1BETA/flux/pcap/remote.yaml
/usr/metron/0.1BETA/flux/enrichment/test.yaml
/usr/metron/0.1BETA/flux/enrichment/remote.yaml
/usr/metron/0.1BETA/flux/snort/test.yaml
/usr/metron/0.1BETA/flux/snort/remote.yaml
Even though we are running on a single-node VM, we will submit the topology to the Storm cluster running on it, so we will define a remote.yaml for Squid. The easiest way to do this is to copy one of the existing Grok-based configs (YAF) and tailor it for Squid.
mkdir /usr/metron/0.1BETA/flux/squid
cp /usr/metron/0.1BETA/flux/yaf/remote.yaml /usr/metron/0.1BETA/flux/squid/remote.yaml
vi /usr/metron/0.1BETA/flux/squid/remote.yaml
Edit your config to look like the following. The Squid-specific changes are the topology name, the Grok pattern path and pattern label passed to the parser, and the Kafka topic and consumer id used by the spout and the parser bolt:
name: "squid"
config:
topology.workers: 1
components:
- id: "parser"
className: "org.apache.metron.parsing.parsers.GrokParser"
constructorArgs:
- "/apps/metron/patterns/squid"
- "SQUID_DELIMITED"
configMethods:
- name: "withTimestampField"
args:
- "start_time"
- name: "withMetronHDFSHome"
args:
- ""
- id: "writer"
className: "org.apache.metron.writer.KafkaWriter"
constructorArgs:
- "${kafka.broker}"
- id: "zkHosts"
className: "storm.kafka.ZkHosts"
constructorArgs:
- "${kafka.zk}"
- id: "kafkaConfig"
className: "storm.kafka.SpoutConfig"
constructorArgs:
# zookeeper hosts
- ref: "zkHosts"
# topic name
- "squid"
# zk root
- ""
# id
- "squid"
properties:
- name: "ignoreZkOffsets"
value: false
- name: "startOffsetTime"
value: -1
- name: "socketTimeoutMs"
value: 1000000
spouts:
- id: "kafkaSpout"
className: "storm.kafka.KafkaSpout"
constructorArgs:
- ref: "kafkaConfig"
bolts:
- id: "parserBolt"
className: "org.apache.metron.bolt.ParserBolt"
constructorArgs:
- "${kafka.zk}"
- "squid"
- ref: "parser"
- ref: "writer"
streams:
- name: "spout -> bolt"
from: "kafkaSpout"
to: "parserBolt"
grouping:
type: SHUFFLE
Start the new squid parser topology:
storm jar /usr/metron/0.1BETA/lib/metron-parsers-0.1BETA.jar org.apache.storm.flux.Flux --filter /usr/metron/0.1BETA/config/elasticsearch.properties --remote /usr/metron/0.1BETA/flux/squid/remote.yaml
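You can also check from the command line that the topology was submitted; storm list shows all running topologies and their status:
storm list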
Navigate to the squid parser topology in the Storm UI at http://node1:8744/index.html and verify the topology is up with no errors.
Now that we have a new running squid parser topology, generate some data to parse by running this command several times:
tail /var/log/squid/access.log | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list node1:6667 --topic squid
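If you would rather inspect the parsed output directly in Kafka, you can attach a console consumer to the parser's output topic. In this release the KafkaWriter we configured above should write to the enrichments topic by default; if nothing shows up, verify the output topic name against your build:
/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper node1:2181 --topic enrichments --from-beginning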
Refresh the Storm UI and it should report data being parsed.
Then navigate to Elasticsearch at http://node1:9200/_cat/indices?v and verify that a squid index has been created:
health status index pri rep docs.count docs.deleted store.size pri.store.size
yellow open yaf_index_2016.04.25.15 5 1 5485 0 4mb 4mb
yellow open snort_index_2016.04.26.12 5 1 24452 0 14.4mb 14.4mb
yellow open bro_index_2016.04.25.16 5 1 1295 0 1.9mb 1.9mb
yellow open squid_index_2016.04.26.13 5 1 1 0 7.3kb 7.3kb
yellow open yaf_index_2016.04.25.17 5 1 30750 0 17.4mb 17.4mb
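To see the actual parsed documents rather than just the index listing, you can query the squid index directly (a standard Elasticsearch search; adjust the index name and date to match your listing):
curl 'http://node1:9200/squid*/_search?pretty&size=2'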