JIRA: EAGLE-81 (ASF JIRA)


The Notification Plugin should provide an interface that accepts an Eagle alert entity and returns the status of the notification.

Architecture:

[Gliffy diagram: Notification Plugin]

Today Eagle has no notification plugin framework in which users can implement the Notification Plugin interface and upload their own library. This plugin lets users provide their own implementation and also ships the default implementations below.

...

Policy Create:

Policy With Email Notification:
curl  -X POST -H 'Content-Type:application/json' "http://localhost:8080/eagle-service/rest/entities?serviceName=AlertDefinitionService" -d '[{"tags":{"notificationType":"Email Notification","site":"sandbox","dataSource":"hdfsAuditLog","alertExecutorId":"hdfsAuditLogAlertExecutor","policyId":"viewPrivatesensititvityAlert","policyType":"siddhiCEPEngine"},"desc":"view private file","policyDef":"{\"type\":\"siddhiCEPEngine\",\"expression\":\"from hdfsAuditLogEventStream[(cmd=='\'open\'') and (src=='\'/tmp/private\'')] select * insert into outputStream\"}","dedupeDef": "{\"alertDedupIntervalMin\":0,\"emailDedupIntervalMin\":1440}","notificationDef": "[{\"notificationType\":\"Email Notification\",\"subject\":\"just for test\",\"sender\":\"nobody@test.com\",\"recipients\":\"nobody@test.com\",\"flavor\":\"email\",\"id\":\"email_1\",\"tplFileName\":\"\"}]","remediationDef":"","enabled":true}]'
 
 
Policy With Kafka Topic:
curl  -X POST -H 'Content-Type:application/json' "http://localhost:8080/eagle-service/rest/entities?serviceName=AlertDefinitionService" -d '[{"tags":{"notificationType":"Kafka Store","site":"sandbox","dataSource":"hdfsAuditLog","alertExecutorId":"hdfsAuditLogAlertExecutor","policyId":"DoNotAccesssensititvityAlert","policyType":"siddhiCEPEngine"},"desc":"view private file","policyDef":"{\"type\":\"siddhiCEPEngine\",\"expression\":\"from hdfsAuditLogEventStream[(cmd=='\'open\'') and (src=='\'/datatmp/ssnprivate\'')] select * insert into outputStream\"}","dedupeDef": "{\"alertDedupIntervalMin\":0,\"emailDedupIntervalMin\":1440}","notificationDef": "[{\"notificationType\":\"Kafka Store\" , \"kafkaTopic\":\"notification_topic_kafkaalerts\"} ]","remediationDef":"","enabled":true}]'
 
 
Policy With Eagle Store:
curl  -X POST -H 'Content-Type:application/json' "http://localhost:8080/eagle-service/rest/entities?serviceName=AlertDefinitionService" -d '[{"tags":{"notificationType":"Eagle Store","site":"sandbox","dataSource":"hdfsAuditLog","alertExecutorId":"hdfsAuditLogAlertExecutor","policyId":"AccessProductssensititvityAlert","policyType":"siddhiCEPEngine"},"desc":"view private file","policyDef":"{\"type\":\"siddhiCEPEngine\",\"expression\":\"from hdfsAuditLogEventStream[(cmd=='\'open\'') and (src=='\'/data/products/customer_productstmp/private\'')] select * insert into outputStream\"}","dedupeDef": "{\"alertDedupIntervalMin\":0,\"emailDedupIntervalMin\":1440}","enabled":true}]'
 

 

The Eagle Store plugin can also be selected through notificationDef:

"notificationDef": "[{\"notificationType\":\"Eagle Store\"}]"
 
Multiple Plugins:
 
curl -X POST -H 'Content-Type:application/json' "http://localhost:8080/eagle-service/rest/entities?serviceName=AlertDefinitionService" -d '[{"tags":{"site":"sandbox","dataSource":"hdfsAuditLog","alertExecutorId":"hdfsAuditLogAlertExecutor","policyId":"sensititvityAlert","policyType":"siddhiCEPEngine"},"desc":"view private file","policyDef":"{\"type\":\"siddhiCEPEngine\",\"expression\":\"from hdfsAuditLogEventStream[(cmd=='\'open\'') and (src=='\'/tmp/private\'')] select * insert into outputStream\"}","dedupeDef": "{\"alertDedupIntervalMin\":0,\"emailDedupIntervalMin\":1440}","notificationDef": "[{\"notificationType\":\"Esclate To ExternalSys\"},{\"notificationType\":\"Eagle Store\"},{\"notificationType\":\"Kafka Store\", \"kafkaTopic\" : \"alerts\"}]","remediationDef":"","enabled":true}]'
 

 

 

Querying Available Notifications:

GetAllNotifications:

http://localhost:8080/eagle-service/rest/entities?query=AlertNotificationService%5B%5D%7B*%7D&pageSize=2147483647&startTime=1970-01-01%2000:00:00&endTime=1970-01-11%2000:00:00&treeAgg=false

 {"meta":{"elapsedms":77,"totalResults":4,"lastTimestamp":86400000,"firstTimestamp":86400000},"success":true,
"obj":[
{"prefix":"alertNotifications","tags":{"notificationType":"Eagle Store"},
"encodedRowkey":"WSdQ7H_____62aP_YA4exUY4H4U","enabled":true},
{"prefix":"alertNotifications","tags":{"notificationType":"Forward to External System"},
"encodedRowkey":"WSdQ7H_____62aP_YA4exbzI93E","enabled":true},
{"prefix":"alertNotifications","tags":{"notificationType":"Email Notification"},
"encodedRowkey":"WSdQ7H_____62aP_YA4exe4g2y8","enabled":true},
{"prefix":"alertNotifications","tags":{"notificationType":"Kafka Store"},
"encodedRowkey":"WSdQ7H_____62aP_YA4exfPVxyc","enabled":true}
],
"type":"org.apache.eagle.alert.entity.AlertNotificationEntity"}
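The query URL above can also be assembled in code. The following is only a sketch: the class and method names (NotificationQueryUrl, encode, build) are hypothetical and not part of Eagle, and the parameter values are copied from the request shown above. It percent-encodes the colons in the timestamps, which the URL above leaves unencoded; both forms denote the same values.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical helper, not part of Eagle: rebuilds the GetAllNotifications URL.
class NotificationQueryUrl {

    // Percent-encode one query parameter value; spaces become %20 instead of '+'.
    static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8").replace("+", "%20");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always available", e);
        }
    }

    // serviceBaseUrl is e.g. http://localhost:8080/eagle-service
    static String build(String serviceBaseUrl) {
        return serviceBaseUrl + "/rest/entities"
                + "?query=" + encode("AlertNotificationService[]{*}")
                + "&pageSize=" + Integer.MAX_VALUE
                + "&startTime=" + encode("1970-01-01 00:00:00")
                + "&endTime=" + encode("1970-01-11 00:00:00")
                + "&treeAgg=false";
    }
}
```

Calling NotificationQueryUrl.build("http://localhost:8080/eagle-service") yields a URL with the same query, page size, and time range as the request above.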
 

 

Config:

To create the Kafka producer, Eagle needs the address of a Kafka broker.

"eagleNotificationProps" : {
"kafka_broker":"localhost:9092"
}

 

For now, only one Kafka cluster is supported; Eagle reads the broker address from application.conf.
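As a rough illustration of how the kafka_broker setting might be turned into producer settings, consider the sketch below. It assumes the newer Kafka Java producer, which locates the cluster through bootstrap.servers; the Kafka client version Eagle actually uses is not stated on this page. The names KafkaProducerProps and fromBroker are hypothetical.

```java
import java.util.Properties;

// Hypothetical helper, not part of Eagle: builds Kafka producer settings
// from the single broker address configured under eagleNotificationProps.
class KafkaProducerProps {

    static Properties fromBroker(String kafkaBroker) {
        if (kafkaBroker == null || kafkaBroker.trim().isEmpty()) {
            throw new IllegalArgumentException("kafka_broker must be set");
        }
        Properties props = new Properties();
        // Assumption: the new Java producer API, configured via bootstrap.servers.
        props.setProperty("bootstrap.servers", kafkaBroker.trim());
        // Alerts are assumed to be sent as plain strings.
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }
}
```

The resulting Properties object could then be passed to a Kafka producer constructor. Because only one broker setting is read, every Kafka Store notification would go to the same cluster.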

Alert Notification Executor:

Once policy evaluation generates an AlertAPIEntity, the AlertNotification Executor picks it up and sends it to the configured Notification Plugin.

// Each input tuple carries the policy ID followed by the alert it produced.
@Override
public void flatMap(java.util.List<Object> input, Collector<Tuple1<String>> outputCollector) {
    String policyId = (String) input.get(0);
    AlertAPIEntity alertEntity = (AlertAPIEntity) input.get(1);
    processAlerts(policyId, Arrays.asList(alertEntity));
}

// Hand every alert to the notification manager.
private void processAlerts(String policyId, List<AlertAPIEntity> list) {
    for (AlertAPIEntity entity : list) {
        notificationManager.notifyAlert(entity);
    }
}

// When a policy is created, update the notification plugins from its definition.
@Override
public void onPolicyCreated(Map<String, AlertDefinitionAPIEntity> added) {
    if (LOG.isDebugEnabled()) {
        LOG.debug(" alert notification config changed : " + added);
    }
    for (AlertDefinitionAPIEntity alertDef : added.values()) {
        LOG.info("alert notification config really changed " + alertDef);
        notificationManager.updateNotificationPlugins(alertDef);
    }
}
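
The internals of the notification manager are not shown on this page. One way the fan-out from an alert to the plugins configured for its policy could work is sketched below. All types here (Alert, Plugin, SketchNotificationManager) are hypothetical stand-ins for the Eagle classes, and the per-policy map is an assumption about the design, not the actual implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for AlertAPIEntity.
class Alert {
    final String policyId;
    final String message;

    Alert(String policyId, String message) {
        this.policyId = policyId;
        this.message = message;
    }
}

// Hypothetical stand-in for the notification plugin interface.
interface Plugin {
    void onAlert(Alert alert);
}

// Sketch of a manager that keeps the plugins configured per policy
// and forwards each alert to all of them.
class SketchNotificationManager {
    private final Map<String, List<Plugin>> pluginsByPolicy = new HashMap<>();

    // Called when a policy is created or changed: replace its plugin list.
    void updateNotificationPlugins(String policyId, List<Plugin> plugins) {
        pluginsByPolicy.put(policyId, new ArrayList<>(plugins));
    }

    // Called for every alert; returns the number of plugins notified.
    int notifyAlert(Alert alert) {
        List<Plugin> plugins =
                pluginsByPolicy.getOrDefault(alert.policyId, new ArrayList<>());
        for (Plugin plugin : plugins) {
            plugin.onAlert(alert);
        }
        return plugins.size();
    }
}
```

Keying the plugins by policy ID means a policy that lists several notification types has each alert delivered once to every listed plugin, while alerts for unknown policies are dropped without error.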

 

In the future, we will decouple this Notification Executor once we finalize the design for Generic Notification (which accepts alerts from any system).

 

How to Use the Eagle Notification Plugin for a Custom Implementation?

Users should implement NotificationPlugin and write their own logic.

Read the plugin configuration from Eagle's Config object.

For example:

"notificationDef": "{\"kafkaTopic\":\"notification_topic_kafka\"}" 

In code, AlertDefinitionEntity.getNotificationDef() returns the JSON string above. Once you have the configuration, you can parse it for your own use.
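
A minimal sketch of reading one value out of such a notificationDef string follows. The helper NotificationDefReader is hypothetical, and it uses a regular expression only to stay free of dependencies; it handles flat objects with plain string values, so a real plugin would more likely use a proper JSON library.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Eagle: pulls one string-valued field out
// of a flat JSON object. It does not handle nested objects or escaped quotes
// inside values.
class NotificationDefReader {

    // Returns the value of the given field, or null if the field is absent.
    static String stringField(String json, String field) {
        Pattern pattern = Pattern.compile(
                "\"" + Pattern.quote(field) + "\"\\s*:\\s*\"([^\"]*)\"");
        Matcher matcher = pattern.matcher(json);
        return matcher.find() ? matcher.group(1) : null;
    }
}
```

With the example definition above, stringField(def, "kafkaTopic") returns the topic name, and a missing field yields null so the plugin can fall back to a default.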

 

Alternatively, you can simply forward the alert to an external system.

CustomImplementation:

public class CustomNotificationSink implements NotificationPlugin {

    // Status of the most recent notification, returned by getStatus().
    private NotificationStatus status;

    // Do your plugin initialization here, such as connecting to the external system.
    public void _init() throws Exception {
    }

    // The Eagle Notification Manager forwards each AlertAPIEntity to this method.
    public void onAlert(AlertAPIEntity alertEntity) {
        // The logic that decides where to forward the alert goes here.
    }

    // Return the status of the notification.
    public NotificationStatus getStatus() {
        return status;
    }
}
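
To make the skeleton above concrete, here is a self-contained sketch of a sink that records alerts in memory and reports whether the last one was handled. The types SketchAlert, SketchStatus, and SketchPlugin are hypothetical stand-ins for AlertAPIEntity, NotificationStatus, and NotificationPlugin, whose exact signatures are not given on this page; the in-memory list stands in for a call to an external system.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for AlertAPIEntity.
class SketchAlert {
    final String description;

    SketchAlert(String description) {
        this.description = description;
    }
}

// Hypothetical stand-in for NotificationStatus.
class SketchStatus {
    final boolean successful;
    final String message;

    SketchStatus(boolean successful, String message) {
        this.successful = successful;
        this.message = message;
    }
}

// Hypothetical stand-in for NotificationPlugin.
interface SketchPlugin {
    void init() throws Exception;
    void onAlert(SketchAlert alert);
    SketchStatus getStatus();
}

// Example sink: "forwards" alerts to an in-memory list.
class InMemorySink implements SketchPlugin {
    private final List<String> forwarded = new ArrayList<>();
    private SketchStatus status = new SketchStatus(false, "no alert received yet");

    @Override
    public void init() {
        forwarded.clear();
    }

    @Override
    public void onAlert(SketchAlert alert) {
        try {
            if (alert == null) {
                throw new IllegalArgumentException("alert is null");
            }
            forwarded.add(alert.description);
            status = new SketchStatus(true, "forwarded");
        } catch (RuntimeException e) {
            // Record the failure instead of propagating it to the caller.
            status = new SketchStatus(false, e.getMessage());
        }
    }

    @Override
    public SketchStatus getStatus() {
        return status;
    }

    List<String> forwarded() {
        return forwarded;
    }
}
```

Catching failures inside onAlert and exposing them through getStatus keeps one misbehaving plugin from interrupting delivery to the others, which is one plausible reason for the interface to return a status at all.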

 

Development Steps:

To use the Eagle Notification Plugin, follow the steps below.

1) Copy the Eagle jars into your library.

2) Add the Eagle jars as dependencies in pom.xml.

3) Implement the Notification Plugin interface.

4) Write your custom logic.