...

This should ingest our Squid logs into Kafka. Now we are ready to tackle the Metron parsing topology setup. The first thing we need to do is decide whether we will be using a Java-based parser or a Grok-based parser for the new telemetry. In this example we will be using the Grok parser. The Grok parser is a good fit for structured or semi-structured logs that are well understood (check) and for telemetries with lower volumes of traffic (check). The next step is to define the Grok expression for our log; refer to the Grok documentation for additional details. In our case the pattern is:

WDOM [^(?:http:\/\/|www\.|https:\/\/)]([^\/]+)

SQUID_DELIMITED %{NUMBER:timestamp} %{SPACE:UNWANTED} %{INT:elapsed} %{IPV4:ip_src_addr} %{WORD:action}/%{NUMBER:code} %{NUMBER:bytes} %{WORD:method} http:\/\/\www.%{WDOM:url}\/ - %{WORD:UNWANTED}\/%{IPV4:ip_dst_addr} %{WORD:UNWANTED}\/%{WORD:UNWANTED}

Notice that I define a WDOM pattern (tailored to Squid rather than the generic Grok URL pattern) before defining the Squid log pattern. This is optional and is done for ease of use. Also notice that I apply the UNWANTED tag to any part of the message that I don't want included in the resulting JSON structure. Finally, notice that I applied the naming convention to the IPV4 fields by referencing the list of field conventions. The last thing I need to do is validate my Grok pattern. For our test we will be using a free Grok validator called Grok Constructor. A validated Grok expression should look like this:
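To make the field mapping concrete, here is a rough, hand-written regex equivalent of the SQUID_DELIMITED pattern applied to a sample Squid access.log line. The sample line and the simplified sub-patterns are illustrative assumptions only; the real Grok macros expand to more robust expressions than these.

```python
import json
import re

# Hand-translated approximation of the SQUID_DELIMITED Grok pattern.
# Named groups play the role of Grok's %{TYPE:field} captures; parts
# tagged UNWANTED in the Grok pattern are matched but not captured.
SQUID_RE = re.compile(
    r"(?P<timestamp>\d+\.\d+)\s+"                       # %{NUMBER:timestamp} %{SPACE:UNWANTED}
    r"(?P<elapsed>\d+) "                                # %{INT:elapsed}
    r"(?P<ip_src_addr>\d{1,3}(?:\.\d{1,3}){3}) "        # %{IPV4:ip_src_addr}
    r"(?P<action>\w+)/(?P<code>\d+) "                   # %{WORD:action}/%{NUMBER:code}
    r"(?P<bytes>\d+) "                                  # %{NUMBER:bytes}
    r"(?P<method>\w+) "                                 # %{WORD:method}
    r"http://www\.(?P<url>[^/]+)/ - "                   # http://www.%{WDOM:url}/ -
    r"\w+/(?P<ip_dst_addr>\d{1,3}(?:\.\d{1,3}){3}) "    # %{WORD:UNWANTED}/%{IPV4:ip_dst_addr}
    r"\w+/\w+"                                          # %{WORD:UNWANTED}/%{WORD:UNWANTED}
)

# A hypothetical Squid access.log line in the native log format.
line = ("1461576382.642    161 127.0.0.1 TCP_MISS/200 103701 "
        "GET http://www.cnn.com/ - DIRECT/199.27.79.73 text/html")

match = SQUID_RE.match(line)
print(json.dumps(match.groupdict(), indent=2))
```

Running this prints a JSON structure containing only the named fields (timestamp, elapsed, ip_src_addr, action, code, bytes, method, url, ip_dst_addr), which is essentially what the Grok parser emits for each log line.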

...

name: "squid"
config:
    topology.workers: 1

components:
    -   id: "parser"
        className: "org.apache.metron.parsing.parsers.GrokParser"
        constructorArgs:
            - "/apps/metron/patterns/squid"
            - "SQUID_DELIMITED"

        configMethods:
            -   name: "withTimestampField"
                args:
                    - "timestamp"

    -   id: "writer"
        className: "org.apache.metron.writer.KafkaWriter"
        constructorArgs:
            - "${kafka.broker}"
    -   id: "zkHosts"
        className: "storm.kafka.ZkHosts"
        constructorArgs:
            - "${kafka.zk}"
    -   id: "kafkaConfig"
        className: "storm.kafka.SpoutConfig"
        constructorArgs:
            # zookeeper hosts
            - ref: "zkHosts"
            # topic name
            - "squid"
            # zk root
            - ""
            # id
            - "squid"
        properties:
            -   name: "ignoreZkOffsets"
                value: false
            -   name: "startOffsetTime"
                value: -1
            -   name: "socketTimeoutMs"
                value: 1000000

spouts:
    -   id: "kafkaSpout"
        className: "storm.kafka.KafkaSpout"
        constructorArgs:
            - ref: "kafkaConfig"

bolts:
    -   id: "parserBolt"
        className: "org.apache.metron.bolt.ParserBolt"
        constructorArgs:
            - "${kafka.zk}"
            - "squid"
            - ref: "parser"
            - ref: "writer"

streams:
    -   name: "spout -> bolt"
        from: "kafkaSpout"
        to: "parserBolt"
        grouping:
            type: SHUFFLE
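The `${kafka.broker}` and `${kafka.zk}` placeholders above are not literal values; they are substituted at topology submission time from a properties file. A minimal sketch of that substitution step, using hypothetical host addresses (the `substitute` helper and the values are assumptions for illustration, not the Flux implementation):

```python
import re

# Hypothetical values that would normally live in a .properties file
# supplied when the topology is submitted; hosts are illustrative only.
properties = {
    "kafka.broker": "node1:6667",
    "kafka.zk": "node1:2181",
}

def substitute(text, props):
    """Replace ${key} placeholders with values from the property map."""
    return re.sub(r"\$\{([^}]+)\}", lambda m: props[m.group(1)], text)

snippet = 'constructorArgs:\n    - "${kafka.zk}"'
print(substitute(snippet, properties))
```

This is why the same topology YAML can be promoted between environments unchanged: only the properties file carries the environment-specific broker and ZooKeeper addresses.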

...