The Setup

When adding a new data source to Metron, the first step is to decide how to push events from the new telemetry data source into Metron. You can use any of a number of data collection tools; that decision is decoupled from Metron. An excellent tool for pushing data into Metron is Apache NiFi, which this section describes how to use. The second step is to configure Metron to parse the telemetry data source so that downstream processing can be done on it. This article walks you through both steps.

In a previous section, we described the following set of requirements for Customer Foo, who wanted to add the Squid telemetry data source into Metron.

  1. The proxy events from Squid logs need to be ingested in real-time.
  2. The proxy logs must be parsed into a standardized JSON structure that Metron can understand.
  3. In real-time, each Squid proxy event must be enriched so that its domain names carry the associated IP information.
  4. In real-time, the IP within the proxy event must be checked against threat intel feeds.
  5. If there is a threat intel hit, an alert needs to be raised.
  6. The end user must be able to see the new telemetry events and the alerts from the new data source.
  7. All of these requirements will need to be implemented easily without writing any new Java code.

In this article, we will walk you through how to meet requirements 1, 2, and 6.

How to Parse the Squid Telemetry Data Source to Metron

The following steps guide you through how to add this new telemetry.

Step 1: Spin Up Single Node Vagrant VM

  1. Spin up the Metron Vagrant VM by following the instructions in QuickStart.

Step 2: Create a Kafka Topic for the New Data Source

Every data source whose events you are streaming into Metron must have its own Kafka topic. The ingestion tool of choice (for example, Apache Nifi) will push events into this Kafka topic.
  1. From the directory /usr/hdp/current/kafka-broker/bin/, create a Kafka topic called "squid":
    cd /usr/hdp/current/kafka-broker/bin/
    ./kafka-topics.sh --zookeeper localhost:2181 --create --topic squid --partitions 1 --replication-factor 1 
  2. List all of the Kafka topics to ensure that the new topic exists:
    ./kafka-topics.sh --zookeeper localhost:2181 --list

You should see the following list of Kafka topics:

  • bro
  • enrichment
  • pcap
  • snort
  • squid
  • yaf

Step 3: Install Squid

  1. Install and start Squid:
    sudo yum install squid
    sudo service squid start
  2. With Squid started, look at the different log files that get created:
    sudo su -
    cd /var/log/squid
    ls

    There are three types of logs available: access.log, cache.log, and squid.out. We are interested in access.log because that is the log that records proxy usage.

  3. Initially the access.log is empty. Let's generate a few entries for the log, then list the new contents of the access.log:
    squidclient http://www.cnn.com
    squidclient http://www.nba.com
    cat /var/log/squid/access.log

    In production environments you would configure your users' web browsers to point to the proxy server, but to keep this tutorial simple we will use the client that is packaged with the Squid installation. After we use the client to simulate proxy requests, the Squid log entries should look as follows:

    1461576382.642    161 127.0.0.1 TCP_MISS/200 103701 GET http://www.cnn.com/ - DIRECT/199.27.79.73 text/html
    1461576442.228    159 127.0.0.1 TCP_MISS/200 137183 GET http://www.nba.com/ - DIRECT/66.210.41.9 text/html
  4. Using the Squid log entries, we can determine the format of the log entries, which is:

    timestamp | time elapsed | remotehost | code/status | bytes | method | URL | rfc931 | peerstatus/peerhost | type
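
The column layout from item 4 can be checked against a real log line before writing any parser. The following is a minimal Python sketch, not part of Metron: the field names in SQUID_FIELDS and the helper split_squid_line are illustrative names chosen here, and the sample line is the first entry shown above.

```python
# Column names follow the format line from item 4.
# "peer" stands for the peerstatus/peerhost column (name chosen for this sketch).
SQUID_FIELDS = [
    "timestamp", "elapsed", "remotehost", "code_status", "bytes",
    "method", "url", "rfc931", "peer", "type",
]


def split_squid_line(line):
    """Split one access.log line on whitespace and label each column."""
    parts = line.split()  # split() also collapses the run of spaces after the timestamp
    if len(parts) != len(SQUID_FIELDS):
        raise ValueError(f"expected {len(SQUID_FIELDS)} columns, got {len(parts)}")
    return dict(zip(SQUID_FIELDS, parts))


sample = ("1461576382.642    161 127.0.0.1 TCP_MISS/200 103701 GET "
          "http://www.cnn.com/ - DIRECT/199.27.79.73 text/html")
record = split_squid_line(sample)
for name in SQUID_FIELDS:
    print(f"{name:12} {record[name]}")
```

A line that does not have exactly ten whitespace-separated columns raises ValueError, which is a quick way to spot entries that the format above does not describe.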

Step 4: Create a Grok Statement to Parse the Squid Telemetry Event

Now we are ready to tackle the Metron parsing topology setup.

  1. The first thing we need to do is decide whether to use the Java-based parser or the Grok-based parser for the new telemetry. In this example we will use the Grok parser. The Grok parser is well suited to structured or semi-structured logs that are well understood (check) and to telemetries with lower volumes of traffic (check).
  2. Next we need to define the Grok expression for our log. Refer to Grok documentation for additional details. In our case the pattern is:

    WDOM [^(?:http:\/\/|www\.|https:\/\/)]([^\/]+)
    SQUID_DELIMITED %{NUMBER:timestamp} %{SPACE:UNWANTED} %{INT:elapsed} %{IPV4:ip_src_addr} %{WORD:action}/%{NUMBER:code} %{NUMBER:bytes} %{WORD:method} http:\/\/\www.%{WDOM:url}\/ - %{WORD:UNWANTED}\/%{IPV4:ip_dst_addr} %{WORD:UNWANTED}\/%{WORD:UNWANTED}

    Notice that we define a WDOM pattern (more tailored to Squid than the generic Grok URL pattern) before defining the Squid log pattern. This is optional and is done for ease of use. Also notice that we apply the UNWANTED tag to any part of the message that we don't want included in the resulting JSON structure. Finally, notice that we named the IPV4 fields according to Metron's list of field conventions.

  3. The last thing we need to do is validate the Grok pattern. For our test we will use a free Grok validator called Grok Constructor. Paste the pattern and the sample log lines from Step 3 into the validator and confirm that each line matches and yields the named fields.

  4. Now that the Grok pattern has been defined, we need to save it and move it to HDFS. Create a file called "squid" in the /tmp directory and copy the Grok pattern into the file.
    touch /tmp/squid
    vi /tmp/squid
    # copy the Grok pattern above into the squid file
  5. Now put the squid file into the directory where Metron stores its Grok parsers. Existing Grok parsers that ship with Metron are staged under /apps/metron/patterns/.
    su - hdfs
    hdfs dfs -put /tmp/squid /apps/metron/patterns/
    exit
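
To see what the Grok statement from item 2 extracts, it can help to try an equivalent expression outside Metron. The sketch below is a hand-written Python regular expression that approximates SQUID_DELIMITED; it is an assumption-laden stand-in, not the Grok engine and not Metron's parser. The NUMBER, INT, IPV4, and WORD sub-patterns are simplified, WDOM is reduced to "everything up to the next slash", and all values are left as strings. Segments tagged UNWANTED in the Grok statement are matched but not captured.

```python
import json
import re

IPV4 = r"\d{1,3}(?:\.\d{1,3}){3}"  # simplified stand-in for Grok's IPV4

# Rough Python equivalent of SQUID_DELIMITED. Named groups mirror the Grok
# field names; the UNWANTED pieces are matched without being captured.
SQUID_RE = re.compile(
    r"(?P<timestamp>\d+(?:\.\d+)?)\s+"
    r"(?P<elapsed>\d+) "
    rf"(?P<ip_src_addr>{IPV4}) "
    r"(?P<action>\w+)/(?P<code>\d+) "
    r"(?P<bytes>\d+) "
    r"(?P<method>\w+) "
    r"http://www\.(?P<url>[^/]+)/ - "
    rf"\w+/(?P<ip_dst_addr>{IPV4}) "
    r"\w+/\w+"
)


def parse_squid_line(line):
    """Return the named fields as a dict, or None if the line does not match."""
    match = SQUID_RE.match(line)
    return match.groupdict() if match else None


sample = ("1461576382.642    161 127.0.0.1 TCP_MISS/200 103701 GET "
          "http://www.cnn.com/ - DIRECT/199.27.79.73 text/html")
parsed = parse_squid_line(sample)
print(json.dumps(parsed, indent=2))
```

Like the Grok statement, this expression only matches URLs of the form http://www.&lt;domain&gt;/, so a log line for any other URL shape returns None here.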
    

Step 5: Create a Flux configuration for the new Squid Storm Parser Topology

  1. Now that the Grok pattern is staged in HDFS, we need to define the Storm Flux configuration for the Metron parsing topology. The configs used in this walkthrough are staged under /usr/metron/0.1BETA/flux/, and each parsing topology has its own directory of configs. Each topology directory has a remote.yaml, which is designed to be run on AWS, and a local/test.yaml, which is designed to run locally on a single-node VM. In this walkthrough we define a remote.yaml for Squid, because that is the file the deploy command in Step 6 submits. The easiest way to do this is to copy one of the existing Grok-based configs (YAF) and tailor it for Squid.
    mkdir /usr/metron/0.1BETA/flux/squid
    cp /usr/metron/0.1BETA/flux/yaf/remote.yaml /usr/metron/0.1BETA/flux/squid/remote.yaml
    vi /usr/metron/0.1BETA/flux/squid/remote.yaml
  2. Edit the config to look like this (replace yaf with squid and replace the constructorArgs section):
    name: "squid"
    config:
        topology.workers: 1
    components:
        -   id: "parser"
            className: "org.apache.metron.parsers.GrokParser"
            constructorArgs:
                - "/apps/metron/patterns/squid"
                - "SQUID_DELIMITED"
            configMethods:
                -   name: "withTimestampField"
                    args:
                        - "timestamp"
        -   id: "writer"
            className: "org.apache.metron.parsers.writer.KafkaWriter"
            constructorArgs:
                - "${kafka.broker}"
        -   id: "zkHosts"
            className: "storm.kafka.ZkHosts"
            constructorArgs:
                - "${kafka.zk}"
        -   id: "kafkaConfig"
            className: "storm.kafka.SpoutConfig"
            constructorArgs:
                # zookeeper hosts
                - ref: "zkHosts"
                # topic name
                - "squid"
                # zk root
                - ""
                # id
                - "squid"
            properties:
                -   name: "ignoreZkOffsets"
                    value: true
                -   name: "startOffsetTime"
                    value: -1
                -   name: "socketTimeoutMs"
                    value: 1000000
    spouts:
        -   id: "kafkaSpout"
            className: "storm.kafka.KafkaSpout"
            constructorArgs:
                - ref: "kafkaConfig"
    bolts:
        -   id: "parserBolt"
            className: "org.apache.metron.parsers.bolt.ParserBolt"
            constructorArgs:
                - "${kafka.zk}"
                - "squid"
                - ref: "parser"
                - ref: "writer"
    streams:
        -   name: "spout -> bolt"
            from: "kafkaSpout"
            to: "parserBolt"
            grouping:
                type: SHUFFLE
    

Step 6: Deploy the new Parser Topology

Now that we have the Squid parser topology defined, let's deploy it to our cluster.
  1. Deploy the new Squid parser topology:
    sudo storm jar /usr/metron/0.1BETA/lib/metron-parsers-0.1BETA.jar org.apache.storm.flux.Flux --filter /usr/metron/0.1BETA/config/elasticsearch.properties --remote /usr/metron/0.1BETA/flux/squid/remote.yaml
  2. Go to the Storm UI. You should see the new "squid" topology; make sure that it has no errors.
This Squid parser topology will ingest from the squid Kafka topic we created earlier and then parse each event with Metron's Grok framework using the Grok pattern we defined earlier. The result of the parsing is a standard Metron JSON structure that is then put on the "enrichment" Kafka topic for further processing.
But how do the Squid events in access.log get put into the "squid" Kafka topic so that the parser topology can parse them? We will do that using Apache NiFi.

Using Apache Nifi to Stream data into Metron

Put simply, NiFi was built to automate the flow of data between systems, which makes it a fantastic tool to collect, ingest, and push data to Metron. The instructions below show how to install and configure NiFi and how to create the NiFi flow that pushes Squid events into Metron.

Install, Configure and Start Apache Nifi

The following shows how to install Nifi on the VM. Do the following as root:

  1. Download Nifi
    cd /usr/lib
    wget  http://public-repo-1.hortonworks.com/HDF/centos6/1.x/updates/1.2.0.0/HDF-1.2.0.0-91.tar.gz
    tar -zxvf HDF-1.2.0.0-91.tar.gz 
  2. Edit Nifi Configuration to update the port of the nifi web app: nifi.web.http.port=8089
    cd HDF-1.2.0.0/nifi
    vi  conf/nifi.properties
    # update nifi.web.http.port to 8089
  3. Install Nifi as service
    bin/nifi.sh install nifi
  4. Start the Nifi Service
    service nifi start
  5. Go to the Nifi Web: http://node1:8089/nifi/

Create a Nifi Flow to stream events to Metron

Now we will create a flow to capture events from Squid and push them into Metron.

  1. Drag a processor to the canvas by dragging the processor icon (the first icon).
  2. Search for the TailFile processor and select Add. Right-click the processor and select Configure. In the Settings tab, change the name to "Ingest Squid Events".
    1. In Properties, configure the processor to tail the Squid access log, /var/log/squid/access.log.
  3. Drag another processor to the canvas.
  4. Search for PutKafka and select Add.
  5. Right-click the processor and select Configure. In Settings, change the name to "Stream to Metron" and select the checkboxes for the failure and success relationships.
  6. Under Properties, set the following three properties:
    1. Known Brokers: node1:6667
    2. Topic Name: squid
    3. Client Name: nifi-squid
  7. Create a connection by dragging the arrow from Ingest Squid Events to Stream to Metron.
  8. Select the entire flow and click the play button. All processors should turn green.
  9. Generate some data using squidclient (do this for about 20+ sites)
    squidclient http://www.cnn.com
  10. You should see metrics on the processor of data being pushed into Metron.
  11. Look at the Storm UI for the parser topology and you should see tuples coming in
  12. After about 5 minutes, you should see a new Elasticsearch index called squid_index* in the Elastic Admin UI.
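
Item 9 asks for requests to 20 or more sites, which is tedious to type by hand. The following Python sketch builds one squidclient command per site and runs them only when squidclient is found on the PATH. The SITES list is a short placeholder chosen for this example (extend it to reach 20 or more), and the helper names are illustrative. URLs use the http://www.&lt;domain&gt; form so that they match the Grok pattern defined earlier.

```python
import shutil
import subprocess

# Placeholder domains (assumption): the first two come from this tutorial,
# the rest are arbitrary examples. Extend the list to 20+ entries as needed.
SITES = ["cnn.com", "nba.com", "espn.com", "bbc.com", "nytimes.com", "apache.org"]


def build_commands(sites):
    """Return one squidclient argv list per site, in the same order."""
    return [["squidclient", f"http://www.{site}"] for site in sites]


def run_commands(commands):
    """Run each command, discarding the page body that squidclient prints."""
    for cmd in commands:
        subprocess.run(cmd, stdout=subprocess.DEVNULL, check=False)


commands = build_commands(SITES)
if shutil.which("squidclient"):
    run_commands(commands)
else:
    # Not on the Squid host: just show what would be run.
    for cmd in commands:
        print(" ".join(cmd))
```

Each request appends one line to /var/log/squid/access.log, which the TailFile processor then picks up.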

Verify Events are Indexed

By convention the index where the new messages will be indexed is called squid_index_[timestamp] and the document type is squid_doc.

To verify that the messages were indexed correctly, we can use the Elasticsearch Head plugin.

  1. Install the head plugin
    /usr/share/elasticsearch/bin/plugin -install mobz/elasticsearch-head/1.x
  2. Navigate to the Elasticsearch Head UI: http://node1:9200/_plugin/head/
  3. Click the Browser tab, select squid_doc in the left panel, and then select one of the sample docs. You should see the parsed Squid event as a JSON document.
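
As an alternative to browsing in Head, the naming convention described at the start of this section (index squid_index_[timestamp], document type squid_doc) can be turned into a direct search request. The sketch below is a minimal Python example under a few assumptions: the node1:9200 address is taken from the Head plugin URL above, the index/type/_search URL form applies to older Elasticsearch releases that still support document types, and the helper names are made up for this example. The network call is defined but not executed, so the block runs anywhere.

```python
import json
import urllib.request

ES_BASE = "http://node1:9200"  # host and port taken from the Head plugin URL


def search_url(sensor, size=1):
    """Build a search URL from the <sensor>_index* / <sensor>_doc convention."""
    return f"{ES_BASE}/{sensor}_index*/{sensor}_doc/_search?size={size}"


def fetch_sample(sensor, size=1):
    """Return up to `size` hits for the sensor (requires access to the VM)."""
    with urllib.request.urlopen(search_url(sensor, size), timeout=10) as resp:
        return json.load(resp)["hits"]["hits"]


url = search_url("squid")
print(url)

# On a machine that can reach node1, uncomment to print one indexed event:
# for hit in fetch_sample("squid"):
#     print(json.dumps(hit["_source"], indent=2))
```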

Configure Metron UI to view the Squid Telemetry Events

Now that we have Metron configured to parse, index, and persist telemetry events, and NiFi pushing data to Metron, let's visualize this streaming telemetry data in the Metron UI.

  1. Go to the Metron UI.
  2. Add a New Pinned query
    1. Click the + to add new pinned query
    2. Create a query: _type: squid_doc
    3. Click the colored circle icon, name the saved query, and click Pin.
  3. Add a new histogram panel for the Squid events
    1. Click the add panel + icon
    2. Select histogram panel type
    3. Set title as “Squid Events”
    4. Change Time Field to: timestamp
    5. Configure span to 12
    6. In the queries dropdown select “Selected” and only select the “Squid Events” pinned query
    7. Click Save. You should see data in the histogram
  • You should now see the new Squid events