The Notification Plugin should provide an interface that accepts an Eagle alert entity and returns the status of the notification.
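As a rough illustration of that contract, here is a minimal sketch inferred from the plugin implementations shown later on this page. The types `AlertAPIEntity` and `NotificationStatus` are simplified local stand-ins, not Eagle's real classes, `LoggingNotification` is a hypothetical plugin, and the `"policyId"` tag key is an assumption based on the policy examples.

```java
import java.util.HashMap;
import java.util.Map;

/** Stand-in for Eagle's AlertAPIEntity: only the tags map is modelled here. */
class AlertAPIEntity {
    private final Map<String, String> tags = new HashMap<>();
    Map<String, String> getTags() { return tags; }
}

/** Stand-in for the status object a plugin reports after handling an alert. */
class NotificationStatus {
    private boolean notificationSuccess;
    private String message = "";
    boolean isNotificationSuccess() { return notificationSuccess; }
    void setNotificationSuccess(boolean success) { this.notificationSuccess = success; }
    String getMessage() { return message; }
    void setMessage(String message) { this.message = message; }
}

/** Assumed shape of the plugin contract: initialize, receive an alert, report status. */
interface NotificationPlugin {
    void _init() throws Exception;
    void onAlert(AlertAPIEntity alertEntity);
    NotificationStatus getStatus();
}

/** Hypothetical plugin that only records which policy the last alert belonged to. */
class LoggingNotification implements NotificationPlugin {
    private NotificationStatus status = new NotificationStatus();

    @Override
    public void _init() {
        // Nothing to connect to in this sketch.
    }

    @Override
    public void onAlert(AlertAPIEntity alertEntity) {
        status = new NotificationStatus();
        String policyId = alertEntity.getTags().get("policyId"); // assumed tag key
        status.setMessage("handled alert for policy " + policyId);
        status.setNotificationSuccess(true);
    }

    @Override
    public NotificationStatus getStatus() {
        return status;
    }
}
```

The status is replaced on every alert, so `getStatus()` always describes the most recent notification attempt.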
Architecture:
[Architecture diagram (Gliffy)]
Today Eagle has no Notification Plugin Framework in which users can implement the Notification Plugin interface and upload their own library. This framework lets users implement their own plugins and also provides the default implementations below.
...
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
@Resource(name = "Email Notification" , description = " Email Notification API to trigger email/sms ")
public class EmailNotification implements NotificationPlugin {
    private static final Logger LOG = LoggerFactory.getLogger(EmailNotification.class);
    private Map<String, AlertDefinitionAPIEntity> activeAlerts = new ConcurrentHashMap<String, AlertDefinitionAPIEntity>();
    static Map<String, List<EmailGenerator>> emailGenerators = new ConcurrentHashMap<String, List<EmailGenerator>>();
    private Config config;
    private AlertDefinitionDAO alertDefinitionDao;
    private NotificationStatus status;

    /* Initialize Email Notification related objects properly */
    public void _init() throws Exception {
        // Get Config object
        config = EagleConfigFactory.load().getConfig();
        String site = config.getString("eagleProps.site");
        String dataSource = config.getString("eagleProps.dataSource");
        activeAlerts.clear();
        emailGenerators.clear();
        // Find all policies and their notification config
        alertDefinitionDao = new AlertDefinitionDAOImpl(new EagleServiceConnector(
                config.getString("eagleProps.eagleService.host"),
                config.getInt("eagleProps.eagleService.port")));
        try {
            activeAlerts = alertDefinitionDao.findActiveAlertDefsByNotification(site, dataSource, "Email Notification");
        } catch (Exception ex) {
            LOG.error(ex.getMessage());
            throw new Exception(" Email Notification Cannot be initialized . Reason : " + ex.getMessage());
        }
        // Create Email
        Set<String> policies = activeAlerts.keySet();
        for (String policyId : policies) {
            AlertDefinitionAPIEntity alertDef = activeAlerts.get(policyId);
            List<EmailGenerator> tmpList = createEmailGenerator(alertDef);
            emailGenerators.put(policyId, tmpList);
        }
    }

    @Override
    public void onAlert(AlertAPIEntity alertEntity) {
        status = new NotificationStatus();
        String policyId = alertEntity.getTags().get(AlertConstants.POLICY_ID);
        System.out.println(" Email Notification ");
        List<EmailGenerator> generatorList = emailGenerators.get(policyId);
        boolean isSuccess = false;
        for (EmailGenerator gen : generatorList) {
            isSuccess = gen.sendAlertEmail(alertEntity);
            if (!isSuccess) {
                status.setMessage(" Failed to send email ");
                status.setNotificationSuccess(false);
            } else {
                status.setNotificationSuccess(true);
            }
        }
    }
}
|
...
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
/**
 * Responsible to persist Alerts to Eagle Storage
 */
@Resource(name = "Eagle Store" , description = "Persist Alert Entity to Eagle Store")
public class PersistToEagleStore implements NotificationPlugin {
    private static final Logger LOG = LoggerFactory.getLogger(PersistToEagleStore.class);
    private NotificationStatus status;
    private Config config;
    private EagleAlertPersist persist;

    @Override
    public void _init() throws Exception {
        config = EagleConfigFactory.load().getConfig();
        this.persist = new EagleAlertPersist(
                config.getString("eagleProps.eagleService.host"),
                config.getInt("eagleProps.eagleService.port"),
                config.getString("eagleProps.eagleService.username"),
                config.getString("eagleProps.eagleService.password"));
    }

    @Override
    public NotificationStatus getStatus() {
        return this.status;
    }

    @Override
    public void onAlert(AlertAPIEntity alertEntity) {
        try {
            status = new NotificationStatus();
            List<AlertAPIEntity> list = new ArrayList<AlertAPIEntity>();
            list.add(alertEntity);
            persist.doPersist(list);
            status.setNotificationSuccess(true);
        } catch (Exception ex) {
            status.setMessage(ex.getMessage());
            LOG.error(" Exception when Posting Alert Entity to Eagle Service Topic. Reason : " + ex.getMessage());
        }
    }
}
|
Notification Manager :
The Notification Manager is responsible for:
1) Scanning Notification Plugins
2) Loading all plugins and persisting them into the alertNotifications table
3) Broadcasting any policy updates to all Notification Plugins
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
/**
 * Notification manager that is responsible for
 * <p> Scanning Plugins </p>
 * <p> Loading Plugins and Policy Mapping </p>
 * <p> Initializing Plugins </p>
 * <p> Forwarding eagle alert to configured Notification Plugin </p>
 * <p> Broadcasting changes in Policy to all Notification Plugins </p>
 */
public class NotificationManager {
    public static Map<String, String> policyNotificationMapping = new ConcurrentHashMap<String, String>();
    private static final Logger LOG = LoggerFactory.getLogger(NotificationManager.class);

    /**
     * Static initializer of the manager
     */
    static {
        policyNotificationMapping.clear();
        // Initialize all Notification Plugins
        _init();
    }

    /**
     * Initialization of Notification Manager
     */
    private static void _init() {
        policyNotificationMapping.clear();
        Set<String> plugins = NotificationPluginLoader.notificationMapping.keySet();
        for (String plugin : plugins) {
            try {
                Object obj = NotificationPluginLoader.notificationMapping.get(plugin);
                obj.getClass().getMethod("_init").invoke(obj); // invoke _init method of all notification plugins
            } catch (Exception e) {
                LOG.error(" Error in loading Plugins . Reason : " + e.getMessage());
            }
        }
        Config config = EagleConfigFactory.load().getConfig();
        String site = config.getString("eagleProps.site");
        String dataSource = config.getString("eagleProps.dataSource");
        // Find notification types
        AlertDefinitionDAO alertDefinitionDao = new AlertDefinitionDAOImpl(new EagleServiceConnector(
                config.getString("eagleProps.eagleService.host"),
                config.getInt("eagleProps.eagleService.port")));
        try {
            List<AlertDefinitionAPIEntity> activeAlerts = alertDefinitionDao.findActiveAlertDefs(site, dataSource);
            for (AlertDefinitionAPIEntity entity : activeAlerts) {
                policyNotificationMapping.put(
                        entity.getTags().get(AlertConstants.POLICY_ID),
                        entity.getTags().get(AlertConstants.NOTIFICATION_TYPE));
            }
        } catch (Exception ex) {
            LOG.error(" Error in determining policy and its notification type. Reason : " + ex.getMessage());
        }
    }

    /**
     * Passes the alert to the respective Notification Plugin
     * @param entity
     */
    public void notifyAlert(AlertAPIEntity entity) {
        try {
            Object obj = getNotificationPluginAPI(policyNotificationMapping.get(entity.getTags().get(AlertConstants.POLICY_ID)));
            obj.getClass().getMethod("onAlert", new Class[]{AlertAPIEntity.class}).invoke(obj, entity);
        } catch (Exception ex) {
            LOG.error(" Error in NotificationManager when invoking NotifyAlert method . Reason : " + ex.getMessage());
        }
    }

    /**
     * Returns the Notification Plugin for the given type
     * @param type
     * @return
     */
    private Object getNotificationPluginAPI(String type) {
        return NotificationPluginLoader.notificationMapping.get(type);
    }

    /**
     * Updates all Notification Plugins when a policy changes
     * @param entity
     */
    public void updateNotificationPlugins(AlertDefinitionAPIEntity entity) {
        try {
            // Reload the plugins
            // Revisit this; it should be done in a better way
            NotificationPluginLoader.loadPlugins();
            // Reinitialize Notification Manager
            _init();
        } catch (Exception e) {
            LOG.error(" Error in updateNotificationPlugins . Reason : " + e.getMessage());
        }
    }
}
|
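The manager above invokes plugins reflectively and maps each policy to a single notification type. As one possible alternative, the sketch below dispatches through a typed handler and allows several types per policy. Everything in it is hypothetical: `TypedDispatcher`, `register` and `bindPolicy` are illustrative names, alerts are reduced to their tag map, and `"policyId"` is assumed to be the tag key behind `AlertConstants.POLICY_ID`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical typed dispatcher: an alert is represented only by its tag map. */
class TypedDispatcher {
    // notification type -> handler (stand-in for a plugin's onAlert method)
    private final Map<String, Consumer<Map<String, String>>> plugins = new ConcurrentHashMap<>();
    // policy id -> notification types configured for that policy
    private final Map<String, List<String>> policyTypes = new ConcurrentHashMap<>();

    void register(String notificationType, Consumer<Map<String, String>> handler) {
        plugins.put(notificationType, handler);
    }

    void bindPolicy(String policyId, List<String> notificationTypes) {
        policyTypes.put(policyId, new ArrayList<>(notificationTypes));
    }

    /** Forwards the alert to every registered handler of its policy; returns how many were invoked. */
    int notifyAlert(Map<String, String> alertTags) {
        String policyId = alertTags.get("policyId"); // assumed tag key
        if (policyId == null) {
            return 0; // ConcurrentHashMap rejects null keys, so stop early
        }
        int delivered = 0;
        for (String type : policyTypes.getOrDefault(policyId, Collections.<String>emptyList())) {
            Consumer<Map<String, String>> handler = plugins.get(type);
            if (handler != null) { // unknown types are skipped rather than failing the whole alert
                handler.accept(alertTags);
                delivered++;
            }
        }
        return delivered;
    }
}
```

Skipping unknown types keeps one misconfigured notification from blocking the others; whether that is the desired behaviour is a design choice this page does not settle.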
How to Create a Policy with a Notification Type:
When creating a policy, select the notification type for that policy.
Example:
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
Policy With Email Notification:
curl -X POST -H 'Content-Type:application/json' "http://localhost:8080/eagle-service/rest/entities?serviceName=AlertDefinitionService" -d '[{"tags":{"site":"sandbox","dataSource":"hdfsAuditLog","alertExecutorId":"hdfsAuditLogAlertExecutor","policyId":"sensititvityAlert","policyType":"siddhiCEPEngine"},"desc":"view private file","policyDef":"{\"type\":\"siddhiCEPEngine\",\"expression\":\"from hdfsAuditLogEventStream[(cmd=='\'open\'') and (src=='\'/tmp/private\'')] select * insert into outputStream\"}","dedupeDef": "{\"alertDedupIntervalMin\":0,\"emailDedupIntervalMin\":1440}","notificationDef": "[{\"notificationType\":\"Email Notification\",\"subject\":\"just for test\",\"sender\":\"nobody@test.com\",\"recipients\":\"nobody@test.com\",\"flavor\":\"email\",\"id\":\"email_1\",\"tplFileName\":\"\"}]","remediationDef":"","enabled":true}]'
Policy With Kafka Topic:
curl -X POST -H 'Content-Type:application/json' "http://localhost:8080/eagle-service/rest/entities?serviceName=AlertDefinitionService" -d '[{"tags":{"site":"sandbox","dataSource":"hdfsAuditLog","alertExecutorId":"hdfsAuditLogAlertExecutor","policyId":"sensititvityAlert","policyType":"siddhiCEPEngine"},"desc":"view private file","policyDef":"{\"type\":\"siddhiCEPEngine\",\"expression\":\"from hdfsAuditLogEventStream[(cmd=='\'open\'') and (src=='\'/tmp/private\'')] select * insert into outputStream\"}","dedupeDef": "{\"alertDedupIntervalMin\":0,\"emailDedupIntervalMin\":1440}","notificationDef": "[{\"notificationType\":\"Kafka Store\" , \"kafkaTopic\":\"alerts\"} ]","remediationDef":"","enabled":true}]'
Policy With Eagle Store:
curl -X POST -H 'Content-Type:application/json' "http://localhost:8080/eagle-service/rest/entities?serviceName=AlertDefinitionService" -d '[{"tags":{"site":"sandbox","dataSource":"hdfsAuditLog","alertExecutorId":"hdfsAuditLogAlertExecutor","policyId":"sensititvityAlert","policyType":"siddhiCEPEngine"},"desc":"view private file","policyDef":"{\"type\":\"siddhiCEPEngine\",\"expression\":\"from hdfsAuditLogEventStream[(cmd=='\'open\'') and (src=='\'/tmp/private\'')] select * insert into outputStream\"}","dedupeDef": "{\"alertDedupIntervalMin\":0,\"emailDedupIntervalMin\":1440}","notificationDef": "[{\"notificationType\":\"Eagle Store\"}]","remediationDef":"","enabled":true}]'
Multiple Plugins:
curl -X POST -H 'Content-Type:application/json' "http://localhost:8080/eagle-service/rest/entities?serviceName=AlertDefinitionService" -d '[{"tags":{"site":"sandbox","dataSource":"hdfsAuditLog","alertExecutorId":"hdfsAuditLogAlertExecutor","policyId":"sensititvityAlert","policyType":"siddhiCEPEngine"},"desc":"view private file","policyDef":"{\"type\":\"siddhiCEPEngine\",\"expression\":\"from hdfsAuditLogEventStream[(cmd=='\'open\'') and (src=='\'/tmp/private\'')] select * insert into outputStream\"}","dedupeDef": "{\"alertDedupIntervalMin\":0,\"emailDedupIntervalMin\":1440}","notificationDef": "[{\"notificationType\":\"Esclate To ExternalSys\"},{\"notificationType\":\"Eagle Store\"},{\"notificationType\":\"Kafka Store\" , \"kafkaTopic\" : \"alerts\"}]","remediationDef":"","enabled":true}]'
|
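In the requests above, `notificationDef` is itself a JSON document embedded as a JSON string, so every inner double quote must be escaped, and a single missing quote invalidates the payload. The helper below is a minimal sketch of that escaping step using only the Java standard library; `PolicyPayload`, `quote` and `notificationDefField` are hypothetical names, not part of Eagle.

```java
/** Hypothetical helper for embedding one JSON document as a string value inside another. */
class PolicyPayload {

    /**
     * Wraps raw text in double quotes, escaping backslashes and double quotes.
     * Control characters such as newlines are not handled in this sketch.
     */
    static String quote(String raw) {
        return "\"" + raw.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    /** Builds the "notificationDef" member from an unescaped JSON array of notification settings. */
    static String notificationDefField(String notificationDefJson) {
        return "\"notificationDef\":" + quote(notificationDefJson);
    }
}
```

For example, passing `[{"notificationType":"Eagle Store"}]` produces the same escaped member that appears in the Eagle Store request above. A JSON library would normally do this escaping; the sketch only makes the nesting explicit.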
Querying Available Notifications :
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
http://localhost:8080/eagle-service/rest/entities?query=AlertNotificationService%5B%5D%7B*%7D&pageSize=2147483647&startTime=1970-01-01%2000:00:00&endTime=1970-01-11%2000:00:00&treeAgg=false
{"meta":{"elapsedms":77,"totalResults":4,"lastTimestamp":86400000,"firstTimestamp":86400000},"success":true,
"obj":[
{"prefix":"alertNotifications","tags":{"notificationType":"Eagle Store"},
"encodedRowkey":"WSdQ7H_____62aP_YA4exUY4H4U","enabled":true},
{"prefix":"alertNotifications","tags":{"notificationType":"Forward to External System"},
"encodedRowkey":"WSdQ7H_____62aP_YA4exbzI93E","enabled":true},
{"prefix":"alertNotifications","tags":{"notificationType":"Email Notification"},
"encodedRowkey":"WSdQ7H_____62aP_YA4exe4g2y8","enabled":true},
{"prefix":"alertNotifications","tags":{"notificationType":"Kafka Store"},
"encodedRowkey":"WSdQ7H_____62aP_YA4exfPVxyc","enabled":true}
],
"type":"org.apache.eagle.alert.entity.AlertNotificationEntity"}
|
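The query URL above is percent-encoded: `AlertNotificationService%5B%5D%7B*%7D` is the encoded form of `AlertNotificationService[]{*}`. A minimal sketch of building such a URL with the standard library follows; `NotificationQuery` and `listUrl` are hypothetical names, and the parameter values are simply copied from the example request.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

/** Hypothetical builder for the "list available notifications" query shown above. */
class NotificationQuery {

    /** Percent-encodes a query value; URLEncoder emits '+' for spaces, so switch those to %20. */
    static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8").replace("+", "%20");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always available", e);
        }
    }

    /** baseUrl is the service root, for example http://localhost:8080/eagle-service */
    static String listUrl(String baseUrl) {
        return baseUrl + "/rest/entities"
                + "?query=" + encode("AlertNotificationService[]{*}")
                + "&pageSize=" + Integer.MAX_VALUE
                + "&startTime=" + encode("1970-01-01 00:00:00")
                + "&endTime=" + encode("1970-01-11 00:00:00")
                + "&treeAgg=false";
    }
}
```

Unlike the hand-written URL, `URLEncoder` also encodes the colons in the timestamps as `%3A`, which decodes to the same value on the server side.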
Config:
To create a Kafka producer, Eagle needs the address of a Kafka broker.
"eagleNotificationProps" : {
"kafka_broker":"localhost:9092"
}
For now, only one Kafka cluster is supported; Eagle reads its broker address from application.conf.
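To show how the `kafka_broker` value might be consumed, the sketch below turns it into producer properties. It assumes the newer Kafka Java producer client, whose configuration keys are `bootstrap.servers`, `key.serializer` and `value.serializer`; the page does not say which producer API Eagle uses, and `KafkaProducerProps` is a hypothetical helper.

```java
import java.util.Properties;

/** Hypothetical helper that maps Eagle's kafka_broker setting to Kafka producer properties. */
class KafkaProducerProps {

    static Properties fromBroker(String kafkaBroker) {
        if (kafkaBroker == null || kafkaBroker.trim().isEmpty()) {
            throw new IllegalArgumentException("kafka_broker must not be empty");
        }
        Properties props = new Properties();
        // host:port of the single supported cluster, e.g. localhost:9092
        props.put("bootstrap.servers", kafkaBroker.trim());
        // Assumption: alerts are published as plain strings
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }
}
```

The resulting `Properties` object could be passed to a `KafkaProducer` constructor once the Kafka client library is on the classpath.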
Alert Notification Executor:
Once policy evaluation generates an AlertAPIEntity, the Alert Notification Executor picks it up and sends it to the configured Notification Plugin.
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
@Override
public void flatMap(java.util.List<Object> input, Collector<Tuple1<String>> outputCollector) {
    String policyId = (String) input.get(0);
    AlertAPIEntity alertEntity = (AlertAPIEntity) input.get(1);
    processAlerts(policyId, Arrays.asList(alertEntity));
}

private void processAlerts(String policyId, List<AlertAPIEntity> list) {
    for (AlertAPIEntity entity : list) {
        notificationManager.notifyAlert(entity);
    }
}

@Override
public void onPolicyCreated(Map<String, AlertDefinitionAPIEntity> added) {
    if (LOG.isDebugEnabled()) LOG.debug(" alert notification config changed : " + added);
    for (AlertDefinitionAPIEntity alertDef : added.values()) {
        LOG.info("alert notification config really changed " + alertDef);
        notificationManager.updateNotificationPlugins(alertDef);
    }
}
|
In the future, we will decouple this Notification Executor once we finalize the design for Generic Notification (which accepts alerts from any system).
How to Use the Eagle Notification Plugin for a Custom Implementation:
Users should implement NotificationPlugin and write their own logic.
Read configuration from Eagle's Config object.
For example:
"notificationDef": "{\"kafkaTopic\":\"notification_topic_kafka\"}"
In code, AlertDefinitionEntity.getNotificationDef() returns the JSON string above. Once you have it, you can parse it for your own use.
Alternatively, you can simply forward it to an external system.
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
public class CustomNotificationSink implements NotificationPlugin {
    private NotificationStatus status = new NotificationStatus();

    // Do your plugin initialization here, such as connecting to an external system
    @Override
    public void _init() throws Exception {
    }

    // The Eagle Notification Manager forwards each AlertAPIEntity to this onAlert method
    @Override
    public void onAlert(AlertAPIEntity alertEntity) {
        // Put the logic that decides where to forward the alert here
    }

    // Return the status of the last notification
    @Override
    public NotificationStatus getStatus() {
        return status;
    }
}
|
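A custom plugin usually needs a value from the `notificationDef` string described above, such as `kafkaTopic`. The sketch below pulls out a simple string field with a regular expression so that it stays within the standard library; a real plugin would more likely use a JSON parser. `NotificationDefReader` and `stringField` are hypothetical names, and the pattern only handles values without escaped quotes or backslashes.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical reader for flat string fields in a notificationDef JSON string. */
class NotificationDefReader {

    /** Returns the value of "key":"value" if present; the value must not contain quotes or backslashes. */
    static Optional<String> stringField(String json, String key) {
        Pattern pattern = Pattern.compile(
                "\"" + Pattern.quote(key) + "\"\\s*:\\s*\"([^\"\\\\]*)\"");
        Matcher matcher = pattern.matcher(json);
        return matcher.find() ? Optional.of(matcher.group(1)) : Optional.empty();
    }
}
```

Returning an `Optional` lets the plugin decide what to do when a policy omits the field, for example falling back to a default topic or reporting a failed status.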
Development Steps:
To use the Eagle Notification Plugin, follow the steps below.
1) Copy eagle jars into your library
2) Add eagle jars as dependency in pom.xml
3) Implement Notification Plugin Interface
4) Write your custom logic