Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

In the previous section, Application of Threat Intel Fields, we walked through how to load threat intel data into Metron and then apply those threat intels in realtime as telemetry events are streamed through the platform. 

...

Now that we know what we should do, the next question is how to accomplish it; in other words, we must define what exactly we mean when we say "severity."  The capability, as implemented in Metron, is accomplished by providing the ability to associate possibly complex conditions to numeric scores.  ThenThen, for each message, the set of conditions are evaluated and the set of numbers for matching conditions are aggregated via a configurable aggregation function.  This aggregated score is added to the message in thethe threat.triage.level.  Let's dig a bit deeper into this and provide an example.

...

The heart of the problem is how one defines we define a "condition."  In Metron, we provide a custom domain specific language for defining conditions.  

...

  • Referencing fields in the enriched JSON
  • Simple boolean operations: 
    • and, &&
    • not
    • or, ||
  • Determining whether a field exists (via exists)
  • The ability to have parenthesis to make order of operations explicit
  • A fixed set of functions which take strings and return boolean. Currently:
    • IN_SUBNET(ip, cidr1, cidr2, ...)
    • IS_EMPTY(str)
    • STARTS_WITH(str, prefix)
    • ENDS_WITH(str, suffix)
    • REGEXP_MATCH(str, pattern)
  • A fixed set of string-to-string transformation functions.  Currently:
    • TO_LOWER
    • TO_UPPER
    • TRIM

...

  • The value of the src_ip_addr field is in the 192.168.0.0/24 subnet
  • The value of the src_ip_addr field is 10.0.0.1 or 10.0.0.2
  • The field is_local exists

 

More information can be found here: Metron Query Language

Threat Triage Configuration Explained

Now that we have the ability to define conditions, for each sensor we need to associate these conditions to scores.  Since this is a per-sensor configuration, this fits nicely within the sensor enrichment configurationheld  held in zookeeperZookeeper.  This configuration fits well within the threatIntel section of the configuration like so:

{
  ...
  ,"threatIntel" : {
            ...
           , "triageConfig" : {
                     "riskLevelRules" : [
{
                                 “name” : “  "condition1" : level1
                               , "condition2" : level2
                                  ...
                                      "
“comment” : “ "
“rule”: " ”
“score” :
  }
                     ,"aggregator" : "MAX"
,"aggregationConfig" : { } 
                             }
                  }
}

...

  • riskLevelRules correspond to the set of condition-to-numeric-level mappings that define the threat triage for this particular sensor. 
    • name: The name of the threat triage rule.

    • comment: A comment describing the rule.
    • rule: The rule, represented as a Stellar statement.
    • score: Associated threat triage score for the rule.
  • aggregator is an aggregation function that takes all non-zero scores representing the matching queries from riskLevelRules and aggregates them into a single score.  The current supported aggregation functions are the following:
    • MAX : The max of all of the associated values for matching queries.
    • MIN : The min of all of the associated values for matching queries.
    • MEAN : The mean of all of the associated values for matching queries.
    • POSITIVE_MEAN : The mean of the positive associated values for the matching queries.

Step 1: Setup and

...

Prerequisites

  1. Complete You should have completed the instructions in Adding a new Telemetry Data Source.
  2. Make sure the following variables are configured based on your environment: 

     

    • KAFKA_HOST = The host where a Kafka broker is installed.
    • ZOOKEEPER_HOST = The host where a Zookeeper server is installed.
    • PROBE_HOST =
    Host
    • The host where your sensor, probes are installed. If don't have any sensors installed, pick the host where a
    storm
    • Storm supervisor is running.
    • SQUID_HOST =
    Host
    • The host where you want to install SQUID. If you don't care, just install SQUID on the PROBE_HOST.
    • NIFI_HOST =
    The host
    • Host where you will install NIFI. You want this this to be same host
    that
    • on which you installed Squid.
    • HOST_WITH_ENRICHMENT_TAG =
    This is the
    • The host in your inventory hosts file that you put under the group "enrichment.
    • SEARCH_
    HOST
    • HOST =
    This is the
    • The host where you have
    elastic
    • Elastic or
    solr
    • Solr running.
    This
    •  This is the host in your inventory hosts file that you put under the group "search". Pick one of the search hosts.
    • SEARCH_HOST_PORT  = The port of the search host where indexing is configured. (e.g
    :
    • ., 9300)
    • METRON_UI_HOST =
    This is the
    • The host where your
    metron ui
    • Metron UI web application is running.
    This
    •  This is the host in your inventory hosts file that you put under the group "web."
    .
    • METRON_VERSION = The release of the
    metron
    • Metron binaries you are working with. (e.g
    :
    • ., 0.2.0BETA-RC2)

Step 2: Create the Threat Triage Rule Configuration

So, where In the previous article, Application of Threat Intel Feed, we left off in part 4 was with a working threat intelligence enrichment.  Now, let's see if we can triage those threats for the squid Squid data flowing through.  In In particular, let's triage the threat alerts for the squidsensor data higher data that are higher under the following conditions:

  • Rule 1: If the threat intel enrichment type zeusList as defined in part 4 the previous article is alerted, then we want to consider that an alert of score of 5.
  • Rule 2: If the url is neither a .com nor a .net, then we want to consider that an alert a score of 10.
  • Rule 3: For each message, the triage score is the maximum score across all conditions.

For each message we will assign the maximum score across all conditions as the triage score.  This translates into the following configuration:

{
  ...
  ,"threatIntel" : {
            ...

 

Image Added

Step 3: Upload the Threat Triage Configuration to Zookeeper

In order to apply this triage configuration, we must modify the configuration for the squid sensor in the enrichment topology.  

  1. Log into $HOST_WITH_ENRICHMENT_TAG as root.
  2. We need to modify /usr/metron/$METRON_RELEASE/config/zookeeper/sensors/squid.json. However, since the configuration in Zookeeper may be out of sync with the configuration on disk, we must make sure they are in sync by downloading the Zookeeper configuration first:

    /usr/metron/$METRON_RELEASE/bin/zk_load_configs.sh -m PULL -z $ZOOKEEPER_HOST:2181 -f -o /usr/metron/$METRON_RELEASE/config/zookeeper
  3. Validate that the enrichment config for Squid exists.

    cat /usr/metron/$METRON_RELEASE/config/zookeeper/enrichments/squid.json
  4. Edit the configuration.  In /usr/metron/$METRON_RELEASE/config/zookeeper/enrichments/squid.json add the following to the triageConfig section to the threat intel section.
    "threatIntel" : {
        "fieldMap" : 

...

  1. {
          

...

  1. "hbaseThreatIntel" : [ "domain_without_subdomains" ]
        },
        

...

  1. "

...

  1. fieldToTypeMap" :

...

  1.  {
          "domain_without_subdomains" : [ "zeusList" ]
        },
        "config" : { },
        "triageConfig" : {
          "riskLevelRules" 

...

  1. : {
             "exists(threatintels.hbaseThreatIntel.

...

  1. domain_without_subdomains.zeusList)" : 5

...

  1.               

...

  1. , "not(ENDS_WITH(

...

  1. domain_without_subdomains, '.com') or ENDS_WITH(

...

  1. domain_without_subdomains, '.net'))" : 10

...

  1.                            }

...

  1.         

...

  1. ,"aggregator" : "MAX"
            ,"aggregationConfig" : { }
                          }
                      

...

  1. }

...

}

...

...

  1.  

...

  1.  }
  2. Ensure

/usr/metron/0.1BETA/bin/zk_load_configs.sh -m PULL -z node1:2181 -f -o /usr/metron/0.1BETA/config/zookeeper

 We should ensure that the configuration for squid exists by checking out

 

TODO: the directory sensors is wrong. It shoudl be changed to enrichments. Also change field url to domain_without_subdomains

cat /usr/metron/0.1BETA/config/zookeeper/sensors/squid.json

Now we can edit the configuration.  In /usr/metron/0.1BETA/config/zookeeper/sensors/squid.json edit the section titled riskLevelRules and add the two rules above to the map:

  • "exists(threatintels.hbaseThreatIntel.url.zeusList)" : 5
  • "not(ENDS_WITH(url, '.com') or ENDS_WITH(url, '.net'))" : 10

...

  1. that the aggregator field indicates MAX.

  2. After modifying the configuration, we can push the configuration back to

...

  1. Zookeeper and have the enrichment topology pick it up with live data

...

  1. by running the following:

    /usr/metron/

...

  1. $METRON_RELEASE/bin/zk_load_configs.sh -m PUSH -z 

...

  1. $ZOOKEEPER_HOST:2181 -i /usr/metron/

...

Now, if we reload the data from the part 4 via

tail /var/log/squid/access.log | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list node1:6667 --topic squid

...

  1. $METRON_RELEASE/config/zookeeper 

Step 4: View Triaged/Scored Alerts

...

Because alamman.com is a malicious host from the zeusList threat intel feed but is a .com address, it's assigned assigned a threat.triage.level of 5.

...

Because atmape.ru is both a malicious host from the zeusList threat intel feed as well as a non .com and non .net address, it's assigned assigned a threat.triage.level of 10.


Metron UI Triaged Alerts Panel

Image Added