...
Code Block | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|
| ||||||||||
package org.apache.eagle.notification; import org.apache.eagle.alert.entity.AlertAPIEntity; /** * Notification Plugin Interface Plug-in interface which provide abstraction layer to notify different system */ public interface NotificationPlugin { /** * for initialization * @throws Exception */ public void _init() throws Exception; /** * Post a notification for the given alertalertEntity entity * @param alertEntity */ public void onAlert( AlertEntityAlertAPIEntity alertEntity ); throws NotificationException; /** * Returns Status of Notification Post * @return */ public NotificationStatus getStatus(); } |
At the time of Eagle Topology starts , Code should scan and register the Notification Type ( Custom Notification / Email / Kafka Message etc.. ). Since Eagle allows users to implement their own Notification Implementation , Our Topology Init code should automatically detects and register which type of Notification needs to be used .
...
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
package org.apache.eagle.notification; import org.apache.eagle.alert.entity.AlertAPIEntity; import org.apache.eagle.common.config.EagleConfig; import org.apache.eagle.common.config.EagleConfigFactory; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; @SuppressWarnings({ "rawtypes", "unchecked" }) public class PersistAlertToKafkaTopic implements NotificationPlugin { private boolean isNotified; private static EagleConfig config = EagleConfigFactory.load(); @Override @Resource(name = "Kafka Store" , description = "Persist Alert Entity to Kafka") @SuppressWarnings({ "rawtypes", "unchecked" }) public class PersistAlertToKafkaTopic implements NotificationPlugin { private NotificationStatus status; private static final Logger LOG = LoggerFactory.getLogger(PersistAlertToKafkaTopic.class); private Map<String, AlertDefinitionAPIEntity> activeAlerts = new ConcurrentHashMap<String, AlertDefinitionAPIEntity>(); private Map<String, KafkaTopicConfig> kafaConfigs = new ConcurrentHashMap<String, KafkaTopicConfig>(); private Config config; private AlertDefinitionDAO alertDefinitionDao; @Override public void _init() throws Exception { config = EagleConfigFactory.load().getConfig(); String site = config.getString("eagleProps.site"); String dataSource = config.getString("eagleProps.dataSource"); activeAlerts.clear(); kafaConfigs.clear(); alertDefinitionDao = new AlertDefinitionDAOImpl(new EagleServiceConnector(config.getString("eagleProps.eagleService.host"), config.getInt("eagleProps.eagleService.port"))); try{ activeAlerts = alertDefinitionDao.findActiveAlertDefsByNotification( site , dataSource ,"Kafka Store"); }catch (Exception ex ){ LOG.error(ex.getMessage()); throw new Exception(" Kafka Store Cannot be initialized . Reason : "+ex.getMessage()); } Set<String> policies = activeAlerts.keySet(); for( String policyId : policies ) { Module module = new SimpleModule("notification").registerSubtypes(new NamedType(EmailNotificationConfig.class, "email")); KafkaTopicConfig kafkaConfig = new KafkaTopicConfig(); try { kafkaConfig = JsonSerDeserUtils.deserialize(activeAlerts.get(policyId).getNotificationDef(), KafkaTopicConfig.class, Arrays.asList(module)); this.kafaConfigs.put(policyId,kafkaConfig); }catch (Exception ex){ LOG.error(" Exception when initializing PersistAlertToKafkaTopic. Reason : "+ex.getMessage()); } } } @Override public void onAlert(AlertAPIEntity alertEntity) { { try{ status = new NotificationStatus(); processAlertEntity(alertEntity); isNotified = true; } status.setNotificationSuccess(true); }catch(Exception ex ){ LOG.error(" Exception when Posting Alert Entity to Kafaka Topic. Reason : "+ex.getMessage()); status.setMessage(ex.getMessage()); } } /** * Access KafkaProducer and send entity to Bus * @param alertEntity * @throws Exception */ public void processAlertEntity( AlertAPIEntity alertEntity ) throws Exception { KafkaProducer producer = KafkaProducerSingleton.INSTANCE.getProducer(); producer.send(createRecord(alertEntity)); } } /** * To Create KafkaProducer Record * @param entity * @return * @throws Exception */ public ProducerRecord createRecord(AlertAPIEntity entity ) throws Exception { ProducerRecord String policyId = entity.getTags().get(AlertConstants.POLICY_ID); ProducerRecord record = new ProducerRecord(config.getConfig( this.kafaConfigs.get(policyId).getString(NotificationConstants.NOTIFICATION_ALERT_KAFKA_TOPICgetKafkaTopic(), entity.toString()); return record; } @Override public NotificationStatus getStatus() { NotificationStatus status = new NotificationStatus(); status.setNotificationSuccess(isNotified); return status; } } |
Email Alert Notification:
In Email Alert Notification API implementation ,
public class EmailNotification implements NotificationPlugin {
...
Email Notification allows us to send email alert/ sms to the configured email id.
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
@Resource(name = "Email Notification" , description = " Email Notification API to trigger email/sms ")
public class EmailNotification implements NotificationPlugin {
private static final Logger LOG = LoggerFactory.getLogger(EmailNotification.class);
private Map<String, AlertDefinitionAPIEntity> activeAlerts = new ConcurrentHashMap<String, AlertDefinitionAPIEntity>();
static Map<String, List<EmailGenerator>> emailGenerators = new ConcurrentHashMap<String, List<EmailGenerator>>();
private Config config;
private AlertDefinitionDAO alertDefinitionDao;
private NotificationStatus status ;
/* initialize Email Notification related Objects Properly */
public void _init() throws Exception {
// Get Config Object
config = EagleConfigFactory.load().getConfig();
String site = config.getString("eagleProps.site");
String dataSource = config.getString("eagleProps.dataSource");
activeAlerts.clear();
emailGenerators.clear();
// find out all policies and its notification Config
alertDefinitionDao = new AlertDefinitionDAOImpl(new EagleServiceConnector(config.getString("eagleProps.eagleService.host"), config.getInt("eagleProps.eagleService.port")));
try{
activeAlerts = alertDefinitionDao.findActiveAlertDefsByNotification( site , dataSource ,"Email Notification");
}catch (Exception ex ){
LOG.error(ex.getMessage());
throw new Exception(" Email Notification Cannot be initialized . Reason : "+ex.getMessage());
}
// Create Email
Set<String> policies = activeAlerts.keySet();
for( String policyId : policies ){
AlertDefinitionAPIEntity alertDef = activeAlerts.get(policyId);
List<EmailGenerator> tmpList = createEmailGenerator(alertDef);
this.emailGenerators.put(policyId , tmpList);
}
}
@Override
public void onAlert(AlertAPIEntity alertEntity) {
status = new NotificationStatus();
String policyId = alertEntity.getTags().get(AlertConstants.POLICY_ID);
System.out.println(" Email Notification ");
List<EmailGenerator> generatorList = this.emailGenerators.get(policyId);
boolean isSuccess = false;
for( EmailGenerator gen : generatorList ) {
isSuccess = gen.sendAlertEmail(alertEntity);
if( !isSuccess ) {
status.setMessage(" Failed to send email ");
status.setNotificationSuccess(false);
}else
status.setNotificationSuccess(true);
}
}
} |
PersistAlertToEagle Service:
If user selects this Notification Plugin , all alerts will be persisted into Eagle Store.
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/**
* Responsible to persist Alerts to Eagle Storage
*/
@Resource(name = "Eagle Store" , description = "Persist Alert Entity to Eagle Store")
public class PersistToEagleStore implements NotificationPlugin {
private static final Logger LOG = LoggerFactory.getLogger(PersistToEagleStore.class);
private NotificationStatus status;
private Config config;
private EagleAlertPersist persist;
@Override
public void _init() throws Exception |
Generic Notification Service :
...
How to accept alerts from Monitoring System ?:
Framework will expose Rest Service to external world where clients can post their messages.
Code Block | ||||
---|---|---|---|---|
| ||||
{ "type" : "alert/metric", "action" : "ESCALATE/STORE", "system" : "SEC/HadoopServices", { If action=STORE this will be empty } "alert_definition": { config = EagleConfigFactory.load().getConfig(); "description" : "what alert it is ? ", this.persist = new EagleAlertPersist(config.getString("eagleProps.eagleService.host"), config.getInt("eagleProps.eagleService.port"), config.getString("eagleProps.eagleService.username"), config.getString("eagleProps.eagleService.password")); } @Override "severity" : "CRITICAL", public NotificationStatus getStatus() { return this.status; } @Override "policy" : "public containsvoid ERRORonAlert()",AlertAPIEntity alertEntity) { try{ "remidiation_def" : "", status = new NotificationStatus(); List<AlertAPIEntity> list = new ArrayList<AlertAPIEntity>(); "createdDateTime" : "2016-01-12 22:59 15:00", list.add(alertEntity); persist.doPersist( list ); "notification_def": "", }, "metric_definition": { status.setNotificationSuccess(true); }catch (Exception ex ){ status.setMessage(ex.getMessage()); LOG.error(" Exception when Posting Alert Entity to Eagle Service Topic. Reason : "+ex.getMessage()); } } } |