The Notification Plugin should provide an interface that accepts an Eagle alert entity and returns the status of the notification.
Architecture:
[Gliffy diagram: Notification Plugin architecture (image not available)]
Eagle currently has no Notification Plugin framework in which users can implement the Notification Plugin interface and upload their own library. This plugin framework lets users supply their own implementation and also provides the default implementations below.
...
Code Block | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|
| ||||||||||
package org.apache.eagle.notification;

import org.apache.eagle.alert.entity.AlertAPIEntity;

/**
 * Notification Plugin Interface.
 * Plug-in interface that provides an abstraction layer for notifying different systems.
 */
public interface NotificationPlugin {
    /**
     * For initialization.
     * @throws Exception
     */
    public void _init() throws Exception;

    /**
     * Post a notification for the given alert entity.
     * @param alertEntity
     */
    public void onAlert(AlertAPIEntity alertEntity);

    /**
     * Returns the status of the notification post.
     * @return
     */
    public NotificationStatus getStatus();
} |
When the Eagle topology starts, the code should scan for and register each notification type (custom notification, email, Kafka message, etc.). Since Eagle allows users to supply their own notification implementation, the topology init code should automatically detect and register the notification types to be used.
public class MessageNotification implements NotificationPlugin {
...
When users deploy their code with the Eagle service, the topology initializer has to detect MessageNotification and register it automatically.
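The detect-and-register step described above could be sketched roughly as follows. This is only an illustration under stated assumptions: `PluginName`, `SketchPlugin`, and `SketchPluginRegistry` are hypothetical stand-ins (for the `@Resource` annotation, `NotificationPlugin`, and the plugin loader respectively), and the candidate classes are passed in explicitly rather than discovered by a classpath scan.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the @Resource(name = ...) annotation used on this page.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface PluginName {
    String value();
}

// Simplified stand-in for NotificationPlugin (alert and status types omitted).
interface SketchPlugin {
    void _init() throws Exception;
}

@PluginName("Message Notification")
class MessageNotification implements SketchPlugin {
    boolean initialized = false;

    @Override
    public void _init() {
        initialized = true;
    }
}

// Not a plugin: carries no annotation and does not implement the interface.
class UnrelatedClass {
}

// Hypothetical registry: maps notification type name to an initialized plugin instance.
class SketchPluginRegistry {
    final Map<String, SketchPlugin> notificationMapping = new ConcurrentHashMap<>();

    // Registers every candidate that implements SketchPlugin and carries @PluginName.
    void register(Class<?>... candidates) {
        for (Class<?> candidate : candidates) {
            PluginName name = candidate.getAnnotation(PluginName.class);
            if (name == null || !SketchPlugin.class.isAssignableFrom(candidate)) {
                continue; // skip anything that is not an annotated plugin
            }
            try {
                SketchPlugin plugin = (SketchPlugin) candidate.getDeclaredConstructor().newInstance();
                plugin._init();
                notificationMapping.put(name.value(), plugin);
            } catch (Exception ex) {
                throw new IllegalStateException("Cannot register " + candidate.getName(), ex);
            }
        }
    }
}

class SketchPluginRegistryDemo {
    public static void main(String[] args) {
        SketchPluginRegistry registry = new SketchPluginRegistry();
        registry.register(MessageNotification.class, UnrelatedClass.class);
        System.out.println(registry.notificationMapping.keySet());
    }
}
```

Keying the registry by the annotated name matches how a policy's notificationType string is later used to look up the plugin.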
...
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
@Resource(name = "Kafka Store" , description = "Persist Alert Entity to Kafka")
@SuppressWarnings({ "rawtypes", "unchecked" })
public class PersistAlertToKafkaTopic implements NotificationPlugin {

    private NotificationStatus status;
    private static final Logger LOG = LoggerFactory.getLogger(PersistAlertToKafkaTopic.class);
    private Map<String, AlertDefinitionAPIEntity> activeAlerts = new ConcurrentHashMap<String, AlertDefinitionAPIEntity>();
    private Map<String, KafkaTopicConfig> kafaConfigs = new ConcurrentHashMap<String, KafkaTopicConfig>();
    private Config config;
    private AlertDefinitionDAO alertDefinitionDao;

    @Override
    public void _init() throws Exception {
        config = EagleConfigFactory.load().getConfig();
        String site = config.getString("eagleProps.site");
        String dataSource = config.getString("eagleProps.dataSource");
        activeAlerts.clear();
        kafaConfigs.clear();
        alertDefinitionDao = new AlertDefinitionDAOImpl(new EagleServiceConnector(config.getString("eagleProps.eagleService.host"), config.getInt("eagleProps.eagleService.port")));
        try {
            // Load every active policy that uses the "Kafka Store" notification type
            activeAlerts = alertDefinitionDao.findActiveAlertDefsByNotification(site, dataSource, "Kafka Store");
        } catch (Exception ex) {
            LOG.error(ex.getMessage());
            throw new Exception(" Kafka Store Cannot be initialized . Reason : " + ex.getMessage());
        }
        Set<String> policies = activeAlerts.keySet();
        for (String policyId : policies) {
            Module module = new SimpleModule("notification").registerSubtypes(new NamedType(EmailNotificationConfig.class, "email"));
            KafkaTopicConfig kafkaConfig = new KafkaTopicConfig();
            try {
                // Parse the policy's notificationDef into its Kafka topic configuration
                kafkaConfig = JsonSerDeserUtils.deserialize(activeAlerts.get(policyId).getNotificationDef(), KafkaTopicConfig.class, Arrays.asList(module));
                this.kafaConfigs.put(policyId, kafkaConfig);
            } catch (Exception ex) {
                LOG.error(" Exception when initializing PersistAlertToKafkaTopic. Reason : " + ex.getMessage());
            }
        }
    }

    @Override
    public void onAlert(AlertAPIEntity alertEntity) {
        status = new NotificationStatus();
        try {
            processAlertEntity(alertEntity);
            status.setNotificationSuccess(true);
        } catch (Exception ex) {
            LOG.error(" Exception when Posting Alert Entity to Kafka Topic. Reason : " + ex.getMessage());
            status.setMessage(ex.getMessage());
        }
    }

    /**
     * Access KafkaProducer and send entity to Bus
     * @param alertEntity
     * @throws Exception
     */
    public void processAlertEntity(AlertAPIEntity alertEntity) throws Exception {
        KafkaProducer producer = KafkaProducerSingleton.INSTANCE.getProducer();
        producer.send(createRecord(alertEntity));
    }

    /**
     * To Create KafkaProducer Record
     * @param entity
     * @return
     * @throws Exception
     */
    public ProducerRecord createRecord(AlertAPIEntity entity) throws Exception {
        String policyId = entity.getTags().get(AlertConstants.POLICY_ID);
        ProducerRecord record = new ProducerRecord(this.kafaConfigs.get(policyId).getKafkaTopic(), entity.toString());
        return record;
    }

    @Override
    public NotificationStatus getStatus() {
        return status;
    }
} |
Email Alert Notification:
Email Notification sends an email or SMS alert to the configured email address.
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
@Resource(name = "Email Notification" , description = " Email Notification API to trigger email/sms ")
public class EmailNotification implements NotificationPlugin {

    private static final Logger LOG = LoggerFactory.getLogger(EmailNotification.class);
    private Map<String, AlertDefinitionAPIEntity> activeAlerts = new ConcurrentHashMap<String, AlertDefinitionAPIEntity>();
    static Map<String, List<EmailGenerator>> emailGenerators = new ConcurrentHashMap<String, List<EmailGenerator>>();
    private Config config;
    private AlertDefinitionDAO alertDefinitionDao;
    private NotificationStatus status;

    /* Initialize Email Notification related objects properly */
    @Override
    public void _init() throws Exception {
        // Get Config Object
        config = EagleConfigFactory.load().getConfig();
        String site = config.getString("eagleProps.site");
        String dataSource = config.getString("eagleProps.dataSource");
        activeAlerts.clear();
        emailGenerators.clear();
        // Find all policies and their notification config
        alertDefinitionDao = new AlertDefinitionDAOImpl(new EagleServiceConnector(config.getString("eagleProps.eagleService.host"), config.getInt("eagleProps.eagleService.port")));
        try {
            activeAlerts = alertDefinitionDao.findActiveAlertDefsByNotification(site, dataSource, "Email Notification");
        } catch (Exception ex) {
            LOG.error(ex.getMessage());
            throw new Exception(" Email Notification Cannot be initialized . Reason : " + ex.getMessage());
        }
        // Create the email generators for each policy
        Set<String> policies = activeAlerts.keySet();
        for (String policyId : policies) {
            AlertDefinitionAPIEntity alertDef = activeAlerts.get(policyId);
            List<EmailGenerator> tmpList = createEmailGenerator(alertDef);
            emailGenerators.put(policyId, tmpList);
        }
    }

    @Override
    public void onAlert(AlertAPIEntity alertEntity) {
        status = new NotificationStatus();
        String policyId = alertEntity.getTags().get(AlertConstants.POLICY_ID);
        System.out.println(" Email Notification ");
        List<EmailGenerator> generatorList = emailGenerators.get(policyId);
        boolean isSuccess = false;
        for (EmailGenerator gen : generatorList) {
            isSuccess = gen.sendAlertEmail(alertEntity);
            if (!isSuccess) {
                status.setMessage(" Failed to send email ");
                status.setNotificationSuccess(false);
            } else {
                status.setNotificationSuccess(true);
            }
        }
    }

    @Override
    public NotificationStatus getStatus() {
        return status;
    }
} |
PersistAlertToEagle Service:
If the user selects this notification plugin, all alerts are persisted into the Eagle store.
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
/**
 * Responsible for persisting alerts to Eagle storage
 */
@Resource(name = "Eagle Store" , description = "Persist Alert Entity to Eagle Store")
public class PersistToEagleStore implements NotificationPlugin {

    private static final Logger LOG = LoggerFactory.getLogger(PersistToEagleStore.class);
    private NotificationStatus status;
    private Config config;
    private EagleAlertPersist persist;

    @Override
    public void _init() throws Exception {
        config = EagleConfigFactory.load().getConfig();
        this.persist = new EagleAlertPersist(config.getString("eagleProps.eagleService.host"),
                config.getInt("eagleProps.eagleService.port"),
                config.getString("eagleProps.eagleService.username"),
                config.getString("eagleProps.eagleService.password"));
    }

    @Override
    public NotificationStatus getStatus() {
        return this.status;
    }

    @Override
    public void onAlert(AlertAPIEntity alertEntity) {
        status = new NotificationStatus();
        try {
            List<AlertAPIEntity> list = new ArrayList<AlertAPIEntity>();
            list.add(alertEntity);
            persist.doPersist(list);
            status.setNotificationSuccess(true);
        } catch (Exception ex) {
            status.setMessage(ex.getMessage());
            LOG.error(" Exception when Posting Alert Entity to Eagle Service Topic. Reason : " + ex.getMessage());
        }
    }
} |
Notification Manager :
The Notification Manager is responsible for:
1) Scanning notification plugins
2) Loading all plugins and persisting them into the alertNotifications table
3) Broadcasting any policy updates to all notification plugins
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
/**
 * Notification manager that is responsible for
 * <p> Scanning Plugins </p>
 * <p> Loading Plugins and Policy Mapping </p>
 * <p> Initializing Plugins </p>
 * <p> Forwarding eagle alert to configured Notification Plugin </p>
 * <p> Broadcasting changes in Policy to all Notification Plugins </p>
 */
public class NotificationManager {

    public static Map<String, String> policyNotificationMapping = new ConcurrentHashMap<String, String>();
    private static final Logger LOG = LoggerFactory.getLogger(NotificationManager.class);

    /**
     * Static Initializer of Manager
     */
    static {
        policyNotificationMapping.clear();
        // initialize all Notification Plugins
        _init();
    }

    /**
     * Initialization of Notification Manager
     */
    private static void _init() {
        policyNotificationMapping.clear();
        Set<String> plugins = NotificationPluginLoader.notificationMapping.keySet();
        for (String plugin : plugins) {
            try {
                Object obj = NotificationPluginLoader.notificationMapping.get(plugin);
                obj.getClass().getMethod("_init").invoke(obj); // invoke _init method of all notification plugins
            } catch (Exception e) {
                LOG.error(" Error in loading Plugins . Reason : " + e.getMessage());
            }
        }
        Config config = EagleConfigFactory.load().getConfig();
        String site = config.getString("eagleProps.site");
        String dataSource = config.getString("eagleProps.dataSource");
        // find notification Types
        AlertDefinitionDAO alertDefinitionDao = new AlertDefinitionDAOImpl(new EagleServiceConnector(config.getString("eagleProps.eagleService.host"), config.getInt("eagleProps.eagleService.port")));
        try {
            List<AlertDefinitionAPIEntity> activeAlerts = alertDefinitionDao.findActiveAlertDefs(site, dataSource);
            for (AlertDefinitionAPIEntity entity : activeAlerts) {
                policyNotificationMapping.put(entity.getTags().get(AlertConstants.POLICY_ID), entity.getTags().get(AlertConstants.NOTIFICATION_TYPE));
            }
        } catch (Exception ex) {
            LOG.error(" Error in determining policy and its notification type. Reason : " + ex.getMessage());
        }
    }

    /**
     * To Pass Alert to respective Notification Plugins
     * @param entity
     */
    public void notifyAlert(AlertAPIEntity entity) {
        try {
            Object obj = getNotificationPluginAPI(policyNotificationMapping.get(entity.getTags().get(AlertConstants.POLICY_ID)));
            obj.getClass().getMethod("onAlert", new Class[]{AlertAPIEntity.class}).invoke(obj, entity);
        } catch (Exception ex) {
            LOG.error(" Error in NotificationManager when invoking NotifyAlert method . Reason : " + ex.getMessage());
        }
    }

    /**
     * Returns Notification Plugin for the given Type
     * @param type
     * @return
     */
    private Object getNotificationPluginAPI(String type) {
        return NotificationPluginLoader.notificationMapping.get(type);
    }

    /**
     * Update all Notification Plugins if there are changes in Policy
     * @param entity
     */
    public void updateNotificationPlugins(AlertDefinitionAPIEntity entity) {
        try {
            // Reload the plugins
            // Revisit this; it should be done in a better way
            NotificationPluginLoader.loadPlugins();
            // Reinitialize Notification Manager
            _init();
        } catch (Exception e) {
            LOG.error(" Error in updateNotificationPlugins . Reason : " + e.getMessage());
        }
    }
}
|
How to Create a Policy with a Notification Type:
When we create a policy, we need to select the notification type for that policy.
Example :
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
Policy With Email Notification:
curl -X POST -H 'Content-Type:application/json' "http://localhost:8080/eagle-service/rest/entities?serviceName=AlertDefinitionService" -d '[{"tags":{"site":"sandbox","dataSource":"hdfsAuditLog","alertExecutorId":"hdfsAuditLogAlertExecutor","policyId":"sensititvityAlert","policyType":"siddhiCEPEngine"},"desc":"view private file","policyDef":"{\"type\":\"siddhiCEPEngine\",\"expression\":\"from hdfsAuditLogEventStream[(cmd=='\'open\'') and (src=='\'/tmp/private\'')] select * insert into outputStream\"}","dedupeDef": "{\"alertDedupIntervalMin\":0,\"emailDedupIntervalMin\":1440}","notificationDef": "[{\"notificationType\":\"Email Notification\",\"subject\":\"just for test\",\"sender\":\"nobody@test.com\",\"recipients\":\"nobody@test.com\",\"flavor\":\"email\",\"id\":\"email_1\",\"tplFileName\":\"\"}]","remediationDef":"","enabled":true}]'
Policy With Kafka Topic:
curl -X POST -H 'Content-Type:application/json' "http://localhost:8080/eagle-service/rest/entities?serviceName=AlertDefinitionService" -d '[{"tags":{"site":"sandbox","dataSource":"hdfsAuditLog","alertExecutorId":"hdfsAuditLogAlertExecutor","policyId":"sensititvityAlert","policyType":"siddhiCEPEngine"},"desc":"view private file","policyDef":"{\"type\":\"siddhiCEPEngine\",\"expression\":\"from hdfsAuditLogEventStream[(cmd=='\'open\'') and (src=='\'/tmp/private\'')] select * insert into outputStream\"}","dedupeDef": "{\"alertDedupIntervalMin\":0,\"emailDedupIntervalMin\":1440}","notificationDef": "[{\"notificationType\":\"Kafka Store\" , \"kafkaTopic\":\"alerts\"} ]","remediationDef":"","enabled":true}]'
Policy With Eagle Store:
curl -X POST -H 'Content-Type:application/json' "http://localhost:8080/eagle-service/rest/entities?serviceName=AlertDefinitionService" -d '[{"tags":{"site":"sandbox","dataSource":"hdfsAuditLog","alertExecutorId":"hdfsAuditLogAlertExecutor","policyId":"sensititvityAlert","policyType":"siddhiCEPEngine"},"desc":"view private file","policyDef":"{\"type\":\"siddhiCEPEngine\",\"expression\":\"from hdfsAuditLogEventStream[(cmd=='\'open\'') and (src=='\'/tmp/private\'')] select * insert into outputStream\"}","dedupeDef": "{\"alertDedupIntervalMin\":0,\"emailDedupIntervalMin\":1440}","notificationDef": "[{\"notificationType\":\"Eagle Store\"}]","remediationDef":"","enabled":true}]'
Multiple Plugins:
curl -X POST -H 'Content-Type:application/json' "http://localhost:8080/eagle-service/rest/entities?serviceName=AlertDefinitionService" -d '[{"tags":{"site":"sandbox","dataSource":"hdfsAuditLog","alertExecutorId":"hdfsAuditLogAlertExecutor","policyId":"sensititvityAlert","policyType":"siddhiCEPEngine"},"desc":"view private file","policyDef":"{\"type\":\"siddhiCEPEngine\",\"expression\":\"from hdfsAuditLogEventStream[(cmd=='\'open\'') and (src=='\'/tmp/private\'')] select * insert into outputStream\"}","dedupeDef": "{\"alertDedupIntervalMin\":0,\"emailDedupIntervalMin\":1440}","notificationDef": "[{\"notificationType\":\"Esclate To ExternalSys\"},{\"notificationType\":\"Eagle Store\"},{\"notificationType\":\"Kafka Store\" , \"kafkaTopic\" : \"alerts\"}]","remediationDef":"","enabled":true}]'
|
Querying Available Notifications :
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
http://localhost:8080/eagle-service/rest/entities?query=AlertNotificationService%5B%5D%7B*%7D&pageSize=2147483647&startTime=1970-01-01%2000:00:00&endTime=1970-01-11%2000:00:00&treeAgg=false
{"meta":{"elapsedms":77,"totalResults":4,"lastTimestamp":86400000,"firstTimestamp":86400000},"success":true,
"obj":[
{"prefix":"alertNotifications","tags":{"notificationType":"Eagle Store"},
"encodedRowkey":"WSdQ7H_____62aP_YA4exUY4H4U","enabled":true},
{"prefix":"alertNotifications","tags":{"notificationType":"Forward to External System"},
"encodedRowkey":"WSdQ7H_____62aP_YA4exbzI93E","enabled":true},
{"prefix":"alertNotifications","tags":{"notificationType":"Email Notification"},
"encodedRowkey":"WSdQ7H_____62aP_YA4exe4g2y8","enabled":true},
{"prefix":"alertNotifications","tags":{"notificationType":"Kafka Store"},
"encodedRowkey":"WSdQ7H_____62aP_YA4exfPVxyc","enabled":true}
],
"type":"org.apache.eagle.alert.entity.AlertNotificationEntity"}
|
Config:
Creating the Kafka producer requires the address of the Kafka broker.
"eagleNotificationProps" : {
"kafka_broker":"localhost:9092"
}
For now, only one Kafka cluster is supported; Eagle reads its broker address from application.conf.
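As a rough illustration of how the broker setting might feed a producer, the sketch below builds the producer properties from a broker string. `KafkaProducerSettings` and `forBroker` are hypothetical names that only mirror the `KafkaProducerSingleton.INSTANCE` style used earlier; the property keys are standard Kafka producer settings, and the broker value is assumed to come from `eagleNotificationProps.kafka_broker`.

```java
import java.util.Properties;

// Hypothetical helper in the enum-singleton style of KafkaProducerSingleton.
// It only builds the producer configuration, so the sketch needs no Kafka dependency.
enum KafkaProducerSettings {
    INSTANCE;

    // Builds producer properties for the single configured broker list.
    Properties forBroker(String kafkaBroker) {
        if (kafkaBroker == null || kafkaBroker.trim().isEmpty()) {
            throw new IllegalArgumentException("kafka_broker must not be empty");
        }
        Properties props = new Properties();
        props.put("bootstrap.servers", kafkaBroker.trim());
        // Alerts are sent as strings (entity.toString()), so string serializers suffice.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }
}

class KafkaProducerSettingsDemo {
    public static void main(String[] args) {
        // Assumed value of eagleNotificationProps.kafka_broker from application.conf
        Properties props = KafkaProducerSettings.INSTANCE.forBroker("localhost:9092");
        System.out.println(props.getProperty("bootstrap.servers"));
    }
}
```

The resulting properties could then be passed to a `KafkaProducer` constructor inside the singleton, so that all plugins share one producer for the single supported cluster.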
Alert Notification Executor:
Once an AlertAPIEntity is generated from policy evaluation, the Alert Notification Executor picks it up and sends it to the configured notification plugin.
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
@Override
public void flatMap(java.util.List<Object> input, Collector<Tuple1<String>> outputCollector) {
    String policyId = (String) input.get(0);
    AlertAPIEntity alertEntity = (AlertAPIEntity) input.get(1);
    processAlerts(policyId, Arrays.asList(alertEntity));
}

private void processAlerts(String policyId, List<AlertAPIEntity> list) {
    for (AlertAPIEntity entity : list) {
        notificationManager.notifyAlert(entity);
    }
}

@Override
public void onPolicyCreated(Map<String, AlertDefinitionAPIEntity> added) {
    if (LOG.isDebugEnabled()) LOG.debug(" alert notification config changed : " + added);
    for (AlertDefinitionAPIEntity alertDef : added.values()) {
        LOG.info("alert notification config really changed " + alertDef);
        notificationManager.updateNotificationPlugins(alertDef);
    }
} |
In the future, we will decouple this Notification Executor once we finalize the design for Generic Notification (which accepts alerts from any system).
How to Use the Eagle Notification Plugin for a Custom Implementation?
Users should implement NotificationPlugin and write their own logic.
Read the configuration from Eagle's Config object.
For example:
"notificationDef": "{\"kafkaTopic\":\"notification_topic_kafka\"}"
In code, AlertDefinitionAPIEntity.getNotificationDef() returns the JSON string above. Once you have the config, you can parse it for your own use.
Or you can simply forward it to an external system.
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
public class CustomNotificationSink implements NotificationPlugin {
    private NotificationStatus status;

    // Do your plugin initialization here, such as connecting to an external system.
    @Override
    public void _init() throws Exception {
    }

    // The Eagle Notification Manager forwards each AlertAPIEntity to this onAlert method.
    @Override
    public void onAlert(AlertAPIEntity alertEntity) {
        // The logic that decides where to forward the alert goes here.
    }

    @Override
    public NotificationStatus getStatus() {
        // Return the status of the notification.
        return status;
    }
} |
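To make the skeleton above more concrete, here is a minimal, self-contained sketch of a custom sink. It is an illustration only: `SketchAlert`, `SketchStatus`, and `SketchNotificationPlugin` are hypothetical stand-ins for `AlertAPIEntity`, `NotificationStatus`, and `NotificationPlugin`, and an in-memory list stands in for the external system.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for AlertAPIEntity, reduced to what this sketch needs.
class SketchAlert {
    final String policyId;
    final String message;

    SketchAlert(String policyId, String message) {
        this.policyId = policyId;
        this.message = message;
    }
}

// Hypothetical stand-in for NotificationStatus.
class SketchStatus {
    boolean notificationSuccess;
    String message;
}

// Hypothetical stand-in for the NotificationPlugin interface.
interface SketchNotificationPlugin {
    void _init() throws Exception;
    void onAlert(SketchAlert alert);
    SketchStatus getStatus();
}

// Custom sink that "forwards" alerts to an in-memory list standing in for an external system.
class InMemoryNotificationSink implements SketchNotificationPlugin {
    private List<String> externalSystem; // placeholder for a real connection
    private SketchStatus status = new SketchStatus();

    @Override
    public void _init() {
        externalSystem = new ArrayList<>();
    }

    @Override
    public void onAlert(SketchAlert alert) {
        status = new SketchStatus();
        try {
            if (externalSystem == null) {
                throw new IllegalStateException("_init() was not called");
            }
            externalSystem.add(alert.policyId + ": " + alert.message);
            status.notificationSuccess = true;
        } catch (Exception ex) {
            // Record the failure so the caller can inspect it through getStatus()
            status.notificationSuccess = false;
            status.message = ex.getMessage();
        }
    }

    @Override
    public SketchStatus getStatus() {
        return status;
    }

    List<String> forwarded() {
        return externalSystem;
    }
}

class InMemoryNotificationSinkDemo {
    public static void main(String[] args) {
        InMemoryNotificationSink sink = new InMemoryNotificationSink();
        sink._init();
        sink.onAlert(new SketchAlert("sensititvityAlert", "view private file"));
        System.out.println(sink.getStatus().notificationSuccess + " " + sink.forwarded());
    }
}
```

Catching failures inside onAlert and reporting them through getStatus follows the pattern of the default plugins on this page, which never let a notification error propagate to the caller.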
Development Steps:
To make use of the Eagle Notification Plugin, follow the steps below.
1) Copy eagle jars into your library
2) Add eagle jars as dependency in pom.xml
3) Implement Notification Plugin Interface
4) Write your custom logic