Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

JIRA : 

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyEAGLE-81

 

Notification Plugin should provide interface which accepts the Eagle Alert Entity and return Status of the same.

Architecture:

Gliffy Diagram
nameNotification Plugin

Today in Eagle there is no Notification Plugin Framework where users can implement the Notification Plugin Interface and upload their own library. This Plugin allows user to implement their own and also provides below default implementation 

...

Code Block
languagejava
themeConfluence
titleNotification Interface
linenumberstrue
collapsetrue
package org.apache.eagle.notification;

import org.apache.eagle.alert.entity.AlertAPIEntity;

/**
 * Notification Plugin Interface 
 Plug-in interface which provide abstraction layer to notify different system 
 */
public interface NotificationPlugin {

   /**
    * for initialization
    * @throws Exception
     */
   public void _init() throws  Exception;
   /**
     * Post a notification for the given alertalertEntity entity
     * @param alertEntity
     */
    public void onAlert( AlertEntityAlertAPIEntity alertEntity );
 throws  NotificationException;
    /**
    *  Returns Status of Notification Post 
    * @return
    */
	   public NotificationStatus  getStatus();
}

 

At the time of Eagle Topology starts ,  Code should scan and register the Notification Type ( Custom Notification / Email / Kafka Message  etc.. ).  Since Eagle allows users to implement their own Notification Implementation  , Our Topology Init code should automatically detects and register which type of Notification needs to be used .

...

Code Block
languagejava
themeConfluence
titlePersistAlertToKafka
collapsetrue
package org.apache.eagle.notification;
import org.apache.eagle.alert.entity.AlertAPIEntity;
import org.apache.eagle.common.config.EagleConfig;
import org.apache.eagle.common.config.EagleConfigFactory;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
@SuppressWarnings({ "rawtypes", "unchecked" })
public class PersistAlertToKafkaTopic  implements NotificationPlugin {
	private boolean isNotified;	
	private static EagleConfig config = EagleConfigFactory.load();
	
	@Override
	@Resource(name = "Kafka Store" , description = "Persist Alert Entity to Kafka")
@SuppressWarnings({ "rawtypes", "unchecked" })
public class PersistAlertToKafkaTopic  implements NotificationPlugin {

   private NotificationStatus status;

   private static final Logger LOG = LoggerFactory.getLogger(PersistAlertToKafkaTopic.class);
   private Map<String, AlertDefinitionAPIEntity> activeAlerts = new ConcurrentHashMap<String, AlertDefinitionAPIEntity>();
   private Map<String, KafkaTopicConfig> kafaConfigs = new ConcurrentHashMap<String, KafkaTopicConfig>();
   private Config config;
   private AlertDefinitionDAO alertDefinitionDao;

   @Override
   public void _init() throws  Exception  {
      config = EagleConfigFactory.load().getConfig();
      String site = config.getString("eagleProps.site");
      String dataSource = config.getString("eagleProps.dataSource");
      activeAlerts.clear();
      kafaConfigs.clear();
      alertDefinitionDao = new AlertDefinitionDAOImpl(new EagleServiceConnector(config.getString("eagleProps.eagleService.host"),  config.getInt("eagleProps.eagleService.port")));
      try{
         activeAlerts = alertDefinitionDao.findActiveAlertDefsByNotification( site , dataSource ,"Kafka Store");
      }catch (Exception ex ){
         LOG.error(ex.getMessage());
         throw  new Exception(" Kafka Store  Cannot be initialized . Reason : "+ex.getMessage());
      }
      Set<String> policies = activeAlerts.keySet();
      for( String policyId : policies )
      {
         Module module = new SimpleModule("notification").registerSubtypes(new NamedType(EmailNotificationConfig.class, "email"));
         KafkaTopicConfig kafkaConfig = new KafkaTopicConfig();
         try {
            kafkaConfig = JsonSerDeserUtils.deserialize(activeAlerts.get(policyId).getNotificationDef(), KafkaTopicConfig.class, Arrays.asList(module));
            this.kafaConfigs.put(policyId,kafkaConfig);
         }catch (Exception ex){
            LOG.error(" Exception when initializing PersistAlertToKafkaTopic. Reason : "+ex.getMessage());
         }
      }
   }

   @Override
   public void onAlert(AlertAPIEntity alertEntity) {
      try{
		
          status = new NotificationStatus();
         processAlertEntity(alertEntity);
		isNotified = true;
	}
		
	         status.setNotificationSuccess(true);
      }catch(Exception ex ){
         LOG.error(" Exception when Posting Alert Entity to Kafaka Topic. Reason : "+ex.getMessage());
         status.setMessage(ex.getMessage());
      }     
   }

   /**
    * Access KafkaProducer and send entity to Bus 
    * @param alertEntity
    * @throws Exception
    */
   public void processAlertEntity( AlertAPIEntity alertEntity ) throws Exception {
		      KafkaProducer producer = KafkaProducerSingleton.INSTANCE.getProducer();
		      producer.send(createRecord(alertEntity));
		
	}
	
	     
   }
   
   /**
    * To Create  KafkaProducer Record 
    * @param entity
    * @return
    * @throws Exception
    */
   public ProducerRecord  createRecord(AlertAPIEntity entity ){
		ProducerRecord  throws Exception {
      String policyId = entity.getTags().get(AlertConstants.POLICY_ID);
      ProducerRecord  record  = new ProducerRecord( this.kafaConfigs.get(policyId).getKafkaTopic(), entity.toString());
      return record;
   }  
   
   @Override
   public NotificationStatus getStatus() {       
      return status;
   }

 

Email Alert Notification:

      Email Notification  allows us to send email alert/ sms to the configured email id. 

Code Block
languagejava
themeConfluence
titleEmailNotification
collapsetrue
@Resource(name = "Email Notification" , description = "  Email Notification API to trigger email/sms ")
public class EmailNotification  implements NotificationPlugin {

   private static final Logger LOG = LoggerFactory.getLogger(EmailNotification.class);
   private Map<String, AlertDefinitionAPIEntity> activeAlerts = new ConcurrentHashMap<String, AlertDefinitionAPIEntity>();
   static Map<String, List<EmailGenerator>> emailGenerators = new ConcurrentHashMap<String, List<EmailGenerator>>();
   private Config config;
   private AlertDefinitionDAO alertDefinitionDao;
   private NotificationStatus status ;

   /* initialize Email Notification related Objects Properly */

   public void _init() throws  Exception {
      // Get Config Object
      config = EagleConfigFactory.load().getConfig().getString(NotificationConstants.NOTIFICATION_ALERT_KAFKA_TOPIC), entity);
		return record;
	}
	
	@Override
	public NotificationStatus getStatus() {
		NotificationStatus;
      String site = config.getString("eagleProps.site");
      String dataSource = config.getString("eagleProps.dataSource");
      activeAlerts.clear();
      emailGenerators.clear();
      // find out all policies and its notification Config
      alertDefinitionDao = new AlertDefinitionDAOImpl(new EagleServiceConnector(config.getString("eagleProps.eagleService.host"), config.getInt("eagleProps.eagleService.port")));
      try{
         activeAlerts = alertDefinitionDao.findActiveAlertDefsByNotification( site , dataSource ,"Email Notification");
      }catch (Exception ex ){
         LOG.error(ex.getMessage());
         throw  new Exception(" Email Notification Cannot be initialized . Reason : "+ex.getMessage());
      }
      // Create Email
      Set<String> policies = activeAlerts.keySet();
      for( String policyId : policies ){
         AlertDefinitionAPIEntity alertDef = activeAlerts.get(policyId);
         List<EmailGenerator> tmpList = createEmailGenerator(alertDef);
         this.emailGenerators.put(policyId , tmpList);
      }
   }

   @Override
   public void onAlert(AlertAPIEntity alertEntity) {
      status = new NotificationStatus();
      String policyId = alertEntity.getTags().get(AlertConstants.POLICY_ID);
      System.out.println(" Email Notification ");
      List<EmailGenerator> generatorList = this.emailGenerators.get(policyId);
      boolean isSuccess = false;
      for( EmailGenerator gen : generatorList ) {
         isSuccess = gen.sendAlertEmail(alertEntity);
         if( !isSuccess ) {
            status.setMessage(" Failed to send email ");
            status.setNotificationSuccess(false);
         }else
            status.setNotificationSuccess(true);
      }
   }
}

PersistAlertToEagle Service:

 If user selects this Notification Plugin , all alerts will be persisted into Eagle Store.

 

Code Block
languagejava
themeConfluence
titleEalge Store
collapsetrue
/**
 * Responsible to persist Alerts to Eagle Storage
 */
@Resource(name = "Eagle Store" , description = "Persist Alert Entity to Eagle Store")
public class PersistToEagleStore implements  NotificationPlugin {

    private static final Logger LOG = LoggerFactory.getLogger(PersistToEagleStore.class);
    private NotificationStatus status;

    private Config config;
    private EagleAlertPersist persist;

    @Override
    public void _init() throws Exception {
        config = EagleConfigFactory.load().getConfig();
        this.persist = new EagleAlertPersist(config.getString("eagleProps.eagleService.host"),  config.getInt("eagleProps.eagleService.port"),
                                             config.getString("eagleProps.eagleService.username"), config.getString("eagleProps.eagleService.password"));
    }

    @Override
    public NotificationStatus getStatus() {
        return this.status;
    }

    @Override
    public void onAlert(AlertAPIEntity alertEntity) {
        try{
            status = new NotificationStatus();
		status.setNotificationSuccess(isNotified);
		return status;
	}
}            List<AlertAPIEntity> list = new ArrayList<AlertAPIEntity>();
            list.add(alertEntity);
            persist.doPersist( list );
            status.setNotificationSuccess(true);
        }catch (Exception ex ){
            status.setMessage(ex.getMessage());
            LOG.error(" Exception when Posting Alert Entity to Eagle Service Topic. Reason : "+ex.getMessage());
        }
    }
}

Notification Manager :

    Notification Manager responsible for  

                1) Scanning Notification Plugins 

                2) Loading all Plugins/ Persisting into alertNotifications  Table 

                3) Broadcasting any updates in Policy to all Notification Plugins 

 

Code Block
languagejava
themeConfluence
titleNotificationManager
collapsetrue
/**
 * Notification manager that is responsible for
 * <p> Scanning Plugins </p>
 * <p> Loading Plugins and Policy Mapping </p>
 * <p> Initializing Plugins </p>
 * <p> Forwarding eagle alert to configured Notification Plugin </p>
 * <p> BroadCast Changes in Policy to all Notification Plugins  </p>
 */
public class NotificationManager  {

    public static Map<String, String > policyNotificationMapping = new ConcurrentHashMap<String,String>();
    private static final Logger LOG = LoggerFactory.getLogger(NotificationManager.class);

    /**
     * Static Initializer of Manager
     */
    static {
        policyNotificationMapping.clear();
        // initialize all Notification Plugins
        _init();
    }

    /**
     * Initialization of Notification Manager
     */
    private static void _init() {
        policyNotificationMapping.clear();
        Set<String> plugins = NotificationPluginLoader.notificationMapping.keySet();
        for( String plugin : plugins ){
            try {
                Object obj =  NotificationPluginLoader.notificationMapping.get(plugin);
                obj.getClass().getMethod("_init").invoke(obj); // invoke _init method of all notification plugins
            } catch (Exception e) {
                LOG.error(" Error in loading Plugins . Reason : "+e.getMessage());
            }
        }
        Config config = EagleConfigFactory.load().getConfig();
        String site = config.getString("eagleProps.site");
        String dataSource = config.getString("eagleProps.dataSource");
        // find notification Types
        AlertDefinitionDAO alertDefinitionDao = new AlertDefinitionDAOImpl(new EagleServiceConnector(config.getString("eagleProps.eagleService.host"), config.getInt("eagleProps.eagleService.port")));
        try{
            List<AlertDefinitionAPIEntity> activeAlerts = alertDefinitionDao.findActiveAlertDefs( site , dataSource );
            for( AlertDefinitionAPIEntity entity : activeAlerts ){
                policyNotificationMapping.put(entity.getTags().get(AlertConstants.POLICY_ID) , entity.getTags().get(AlertConstants.NOTIFICATION_TYPE));
            }
        }catch (Exception ex ){
            LOG.error(" Error in determining policy and its notification type. Reason : "+ex.getMessage());
        }
    }

    /**
     * To Pass Alert to respective Notification Plugins
     * @param entity
     */
    public void notifyAlert( AlertAPIEntity entity ) {
        try {
            Object obj  = getNotificationPluginAPI( this.policyNotificationMapping.get(entity.getTags().get(AlertConstants.POLICY_ID)) );
            obj.getClass().getMethod("onAlert" , new Class[]{AlertAPIEntity.class}).invoke( obj , entity);
        } catch ( Exception ex) {
            LOG.error(" Error in NotificationManager when invoking NotifyAlert method  . Reason : "+ex.getMessage());
        }
    }

    /**
     * Returns Notification Plugin for the given Type
     * @param type
     * @return
     */
    private Object getNotificationPluginAPI( String type ){
        return NotificationPluginLoader.notificationMapping.get(type);
    }

    /**
     * Update all Notification Plugin if changes in Policy
     * @param entity
     */
    public void updateNotificationPlugins( AlertDefinitionAPIEntity entity ){
        try {
            // Re Load the plugins
            // Re visit this , this should be in better way
           NotificationPluginLoader.loadPlugins();
            // Re initialize Notification Manager
            _init();
        } catch (Exception e) {
            LOG.error(" Error in updateNotificationPlugins  . Reason : "+e.getMessage());
        }
    }

 

How to Create Policy with Notification Type ?:


When we create Policy , we need to select what is the Notification Type for the Policy.

Example : 

Code Block
languagejava
themeConfluence
titlePolicy Create
collapsetrue
Policy With Email Notification:
curl  -X POST -H ''Content-Type:application/json' "http://localhost:8080/eagle-service/rest/entities?serviceName=AlertDefinitionService" -d '[{"tags":{"site":"sandbox","dataSource":"hdfsAuditLog","alertExecutorId":"hdfsAuditLogAlertExecutor","policyId":"sensititvityAlert","policyType":"siddhiCEPEngine"},"desc":"view private file","policyDef":"{\"type\":\"siddhiCEPEngine\",\"expression\":\"from hdfsAuditLogEventStream[(cmd=='\'open\'') and (src=='\'/tmp/private\'')] select * insert into outputStream\"}","dedupeDef": "{\"alertDedupIntervalMin\":0,\"emailDedupIntervalMin\":1440}","notificationDef": "[{\"notificationType\":\"Email Notification\",\"subject\":\"just for test\",\"sender\":\"nobody@test.com\",\"recipients\":\"nobody@test.com\",\"flavor\":\"email\",\"id\":\"email_1\",\"tplFileName\":\"\"}],"remediationDef":"","enabled":true}]'
 
 
Policy With Kafka Topic:
curl  -X POST -H 'Content-Type:application/json' "http://localhost:8080/eagle-service/rest/entities?serviceName=AlertDefinitionService" -d '[{"tags":{"site":"sandbox","dataSource":"hdfsAuditLog","alertExecutorId":"hdfsAuditLogAlertExecutor","policyId":"sensititvityAlert","policyType":"siddhiCEPEngine"},"desc":"view private file","policyDef":"{\"type\":\"siddhiCEPEngine\",\"expression\":\"from hdfsAuditLogEventStream[(cmd=='\'open\'') and (src=='\'/tmp/private\'')] select * insert into outputStream\"}","dedupeDef": "{\"alertDedupIntervalMin\":0,\"emailDedupIntervalMin\":1440}","notificationDef": "[{\"notificationType\":\"Kafka Store\" , \"kafkaTopic\":\"alerts\"} ]","remediationDef":"","enabled":true}]'
 
 
Policy With Eagle Store:
curl  -X POST -H ''Content-Type:application/json' "http://localhost:8080/eagle-service/rest/entities?serviceName=AlertDefinitionService" -d '[{"tags":{"site":"sandbox","dataSource":"hdfsAuditLog","alertExecutorId":"hdfsAuditLogAlertExecutor","policyId":"sensititvityAlert","policyType":"siddhiCEPEngine"},"desc":"view private file","policyDef":"{\"type\":\"siddhiCEPEngine\",\"expression\":\"from hdfsAuditLogEventStream[(cmd=='\'open\'') and (src=='\'/tmp/private\'')] select * insert into outputStream\"}","dedupeDef": "{\"alertDedupIntervalMin\":0,\"emailDedupIntervalMin\":1440}","notificationDef": "[{\"notificationType\":\"Eagle Store\"}]","remediationDef":"","enabled":true}]'
 
Mulitiple Plugins:
 
curl   -u -X POST -H 'Content-Type:application/json' "http://localhost:8080/eagle-service/rest/entities?serviceName=AlertDefinitionService" -d '[{"tags":{"site":"sandbox","dataSource":"hdfsAuditLog","alertExecutorId":"hdfsAuditLogAlertExecutor","policyId":"sensititvityAlert","policyType":"siddhiCEPEngine"},"desc":"view private file","policyDef":"{\"type\":\"siddhiCEPEngine\",\"expression\":\"from hdfsAuditLogEventStream[(cmd=='\'open\'') and (src=='\'/tmp/private\'')] select * insert into outputStream\"}","dedupeDef": "{\"alertDedupIntervalMin\":0,\"emailDedupIntervalMin\":1440}","notificationDef": "[{\"notificationType\":\"Esclate To ExternalSys\"},{\"notificationType\":\"Eagle Store\"},{\"notificationType\":\"Kafka Store\"
 , \"kafkaTopic\" : \"alerts\"}]","remediationDef":"","enabled":true}]'
 

 

 

Querying Available Notifications :

Code Block
languagejava
titleGetAllNotifications
collapsetrue
http://localhost:8080/eagle-service/rest/entities?query=AlertNotificationService%5B%5D%7B*%7D&pageSize=2147483647&startTime=1970-01-01%2000:00:00&endTime=1970-01-11%2000:00:00&treeAgg=false

 {"meta":{"elapsedms":77,"totalResults":4,"lastTimestamp":86400000,"firstTimestamp":86400000},"success":true,
"obj":[
{"prefix":"alertNotifications","tags":{"notificationType":"Eagle Store"},
"encodedRowkey":"WSdQ7H_____62aP_YA4exUY4H4U","enabled":true},
{"prefix":"alertNotifications","tags":{"notificationType":"Forward to External System"},
"encodedRowkey":"WSdQ7H_____62aP_YA4exbzI93E","enabled":true},
{"prefix":"alertNotifications","tags":{"notificationType":"Email Notification"},
"encodedRowkey":"WSdQ7H_____62aP_YA4exe4g2y8","enabled":true},
{"prefix":"alertNotifications","tags":{"notificationType":"Kafka Store"},
"encodedRowkey":"WSdQ7H_____62aP_YA4exfPVxyc","enabled":true}
],
"type":"org.apache.eagle.alert.entity.AlertNotificationEntity"}
 

 

Config: 

 For creating Kafka Producer we need Kafka  broker.

"eagleNotificationProps" : {
"kafka_broker":"localhost:9092"
}

 

For now , We Support only one Kafka Cluster where eagle reads from application.conf.

Alert Notification Executor:

     Once the AlertAPIEntity generated from Policy Evaluation ,  AlertNotifcation Executor picks it up and send it to Configured Notification Plugin .

Code Block
languagejava
themeConfluence
titleAlertNotification Executor
collapsetrue
 @Override
   public void flatMap(java.util.List<Object> input, Collector<Tuple1<String>> outputCollector){
       String policyId = (String) input.get(0);
       AlertAPIEntity alertEntity = (AlertAPIEntity) input.get(1);
       processAlerts(policyId, Arrays.asList(alertEntity));
   }

private void processAlerts(String policyId, List<AlertAPIEntity> list) {
   for (AlertAPIEntity entity : list) {
      notificationManager.notifyAlert(entity);
   }
}

@Override
public void onPolicyCreated(Map<String, AlertDefinitionAPIEntity> added) {
   if(LOG.isDebugEnabled()) LOG.debug(" alert notification config changed : " + added);
   for(AlertDefinitionAPIEntity alertDef : added.values()){
      LOG.info("alert notification config really changed " + alertDef);
      notificationManager.updateNotificationPlugins( alertDef );
   }
}

 

In future , we will decouple this Notification Executor later once we finalize the design for Generic Notification ( which accepts alerts from any System ).

 

How to Use Eagle Notification Plugin for Custom Implementation ?


Users should implement NotificationPlugin and write own logic .

 

Pls read Configuration from Config Object of Eagle .

For Example :

"notificationDef": "{\"kafkaTopic\":\"notification_topic_kafka\"}" 

 In code you need to get  AlertDefinitionEntity.getNotificationDef()  will return you  the above JSON Str.. Once you get the config you can Parse it for your use.

 

Or you can simply forward it to external system .

Code Block
languagejava
themeConfluence
titleCustomImplementation
collapsetrue
 
public CustomNotificationSink implements NotificationPlugin {
 
 // Do your Plugin Initialization like Connecting to External System etc...
 public void _init() throws Exception {
 }
 
 // Eagle Notification Manager forwards AlertAPIEntity to this onAlert method 
 public void onAlert() {
  // Where to forward this alerts logic should be here ?
 }
 
 public NotificationStatus getStatus(){
  // return the status of notification 
 }
}

 

Development Steps:

To make use of Eagle Notification Plugin , below are the different steps to be followed.

 1) Copy eagle jars into your library 

2) Add  eagle jars as dependency in pom.xml

3) Implement Notification Plugin Interface

4) Write your custom logic