Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
themeConfluence
titleNotification Interface
linenumberstrue
collapsetrue
package org.apache.eagle.notification;

import org.apache.eagle.alert.entity.AlertAPIEntity;

/**
 * Notification Plugin Interface 
 Plug-in interface which provide abstraction layer to notify different system 
 */
public interface NotificationPlugin {

   /**
    * for initialization
    * @throws Exception
     */
   public void _init() throws  Exception;
   /**
     * Post a notification for the given alertalertEntity entity
     * @param alertEntity
     */
    public void onAlert( AlertEntityAlertAPIEntity alertEntity );
 throws  NotificationException;
    /**
    *  Returns Status of Notification Post 
    * @return
    */
	   public NotificationStatus  getStatus();
}

 

When the Eagle topology starts, the code should scan for and register the available notification types (custom notification, email, Kafka message, etc.). Since Eagle allows users to provide their own notification implementations, the topology initialization code should automatically detect and register whichever notification types need to be used.

...

Code Block
languagejava
themeConfluence
titlePersistAlertToKafka
collapsetrue
package org.apache.eagle.notification;
import org.apache.eagle.alert.entity.AlertAPIEntity;
import org.apache.eagle.common.config.EagleConfig;
import org.apache.eagle.common.config.EagleConfigFactory;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
@SuppressWarnings({ "rawtypes", "unchecked" })
public class PersistAlertToKafkaTopic  implements NotificationPlugin {
	private boolean isNotified;	
	private static EagleConfig config = EagleConfigFactory.load();
	
	@Override
	@Resource(name = "Kafka Store" , description = "Persist Alert Entity to Kafka")
@SuppressWarnings({ "rawtypes", "unchecked" })
public class PersistAlertToKafkaTopic  implements NotificationPlugin {

   private NotificationStatus status;

   private static final Logger LOG = LoggerFactory.getLogger(PersistAlertToKafkaTopic.class);
   private Map<String, AlertDefinitionAPIEntity> activeAlerts = new ConcurrentHashMap<String, AlertDefinitionAPIEntity>();
   private Map<String, KafkaTopicConfig> kafaConfigs = new ConcurrentHashMap<String, KafkaTopicConfig>();
   private Config config;
   private AlertDefinitionDAO alertDefinitionDao;

   @Override
   public void _init() throws  Exception  {
      config = EagleConfigFactory.load().getConfig();
      String site = config.getString("eagleProps.site");
      String dataSource = config.getString("eagleProps.dataSource");
      activeAlerts.clear();
      kafaConfigs.clear();
      alertDefinitionDao = new AlertDefinitionDAOImpl(new EagleServiceConnector(config.getString("eagleProps.eagleService.host"),  config.getInt("eagleProps.eagleService.port")));
      try{
         activeAlerts = alertDefinitionDao.findActiveAlertDefsByNotification( site , dataSource ,"Kafka Store");
      }catch (Exception ex ){
         LOG.error(ex.getMessage());
         throw  new Exception(" Kafka Store  Cannot be initialized . Reason : "+ex.getMessage());
      }
      Set<String> policies = activeAlerts.keySet();
      for( String policyId : policies )
      {
         Module module = new SimpleModule("notification").registerSubtypes(new NamedType(EmailNotificationConfig.class, "email"));
         KafkaTopicConfig kafkaConfig = new KafkaTopicConfig();
         try {
            kafkaConfig = JsonSerDeserUtils.deserialize(activeAlerts.get(policyId).getNotificationDef(), KafkaTopicConfig.class, Arrays.asList(module));
            this.kafaConfigs.put(policyId,kafkaConfig);
         }catch (Exception ex){
            LOG.error(" Exception when initializing PersistAlertToKafkaTopic. Reason : "+ex.getMessage());
         }
      }
   }

   @Override
   public void onAlert(AlertAPIEntity alertEntity) {
 {
		     try{
          status = new NotificationStatus();
         processAlertEntity(alertEntity);
		isNotified = true;
	}
		
	         status.setNotificationSuccess(true);
      }catch(Exception ex ){
         LOG.error(" Exception when Posting Alert Entity to Kafaka Topic. Reason : "+ex.getMessage());
         status.setMessage(ex.getMessage());
      }     
   }

   /**
    * Access KafkaProducer and send entity to Bus 
    * @param alertEntity
    * @throws Exception
    */
   public void processAlertEntity( AlertAPIEntity alertEntity ) throws Exception {
		      KafkaProducer producer = KafkaProducerSingleton.INSTANCE.getProducer();
		      producer.send(createRecord(alertEntity));
		
	}
	
	     
   }
   
   /**
    * To Create  KafkaProducer Record 
    * @param entity
    * @return
    * @throws Exception
    */
   public ProducerRecord  createRecord(AlertAPIEntity entity ) throws Exception {
		ProducerRecord
      String policyId = entity.getTags().get(AlertConstants.POLICY_ID);
      ProducerRecord  record  = new ProducerRecord(config.getConfig( this.kafaConfigs.get(policyId).getString(NotificationConstants.NOTIFICATION_ALERT_KAFKA_TOPICgetKafkaTopic(), entity.toString());
		      return record;
	   }  
	
	   
   @Override
	   public NotificationStatus getStatus() {
		NotificationStatus status = new NotificationStatus();
		status.setNotificationSuccess(isNotified);
		       
      return status;
	   }
}

 

Email Alert Notification:

            In Email Alert Notification API implementation ,                    

public class EmailNotification implements NotificationPlugin {

...

Email Notification allows us to send email alerts/SMS to the configured email ID.

Code Block
languagejava
themeConfluence
titleEmailNotification
/**
 * Notification plug-in that sends an alert by email.
 *
 * <p>During {@link #_init()} the plug-in looks up the active alert definitions
 * registered for the "Email Notification" type and creates the email generators
 * for each policy. {@link #onAlert(AlertAPIEntity)} then sends the alert through
 * every generator registered for the alert's policy.
 *
 * <p>NOTE(review): {@code createEmailGenerator} is called here but is not part
 * of this excerpt.
 */
@Resource(name = "Email Notification" , description = "  Email Notification API to trigger email/sms ")
public class EmailNotification  implements NotificationPlugin {

    private static final Logger LOG = LoggerFactory.getLogger(EmailNotification.class);

    /** Active alert definitions, keyed by policy id. */
    private Map<String, AlertDefinitionAPIEntity> activeAlerts = new ConcurrentHashMap<String, AlertDefinitionAPIEntity>();

    /** Email generators per policy id; shared by all instances of this class. */
    static Map<String, List<EmailGenerator>> emailGenerators = new ConcurrentHashMap<String, List<EmailGenerator>>();

    private Config config;
    private AlertDefinitionDAO alertDefinitionDao;

    /** Outcome of the most recent {@link #onAlert(AlertAPIEntity)} call. */
    private NotificationStatus status ;

    /**
     * Loads the Eagle configuration, fetches the active "Email Notification"
     * alert definitions and creates the email generators for each policy.
     *
     * @throws Exception if the active alert definitions cannot be loaded
     */
    @Override
    public void _init() throws  Exception {
        // Get Config Object
        config = EagleConfigFactory.load().getConfig();
        String site = config.getString("eagleProps.site");
        String dataSource = config.getString("eagleProps.dataSource");
        activeAlerts.clear();
        emailGenerators.clear();
        // find out all policies and its notification Config
        alertDefinitionDao = new AlertDefinitionDAOImpl(new EagleServiceConnector(
                config.getString("eagleProps.eagleService.host"),
                config.getInt("eagleProps.eagleService.port")));
        try {
            activeAlerts = alertDefinitionDao.findActiveAlertDefsByNotification( site , dataSource ,"Email Notification");
        } catch (Exception ex) {
            LOG.error(ex.getMessage(), ex);
            // Keep the original exception as the cause so the stack trace is not lost.
            throw new Exception(" Email Notification Cannot be initialized . Reason : "+ex.getMessage(), ex);
        }
        // Create the email generators for every active policy
        for (Map.Entry<String, AlertDefinitionAPIEntity> entry : activeAlerts.entrySet()) {
            emailGenerators.put(entry.getKey(), createEmailGenerator(entry.getValue()));
        }
    }

    /**
     * Sends the alert through every email generator registered for the alert's
     * policy and records the outcome in the status returned by
     * {@link #getStatus()}. The notification counts as successful only when
     * every generator reports success.
     *
     * @param alertEntity the alert to send
     */
    @Override
    public void onAlert(AlertAPIEntity alertEntity) {
        status = new NotificationStatus();
        String policyId = alertEntity.getTags().get(AlertConstants.POLICY_ID);
        LOG.debug(" Email Notification ");
        // ConcurrentHashMap rejects null keys, so check before the lookup.
        List<EmailGenerator> generatorList = (policyId == null) ? null : emailGenerators.get(policyId);
        if (generatorList == null || generatorList.isEmpty()) {
            status.setMessage(" No email generator configured for policy " + policyId);
            status.setNotificationSuccess(false);
            return;
        }
        boolean allSent = true;
        for (EmailGenerator gen : generatorList) {
            // Try every generator even after a failure, but remember the failure
            // so a later success cannot mask it.
            if (!gen.sendAlertEmail(alertEntity)) {
                allSent = false;
            }
        }
        if (!allSent) {
            status.setMessage(" Failed to send email ");
        }
        status.setNotificationSuccess(allSent);
    }

    /**
     * Returns the status recorded by the most recent {@link #onAlert(AlertAPIEntity)} call.
     *
     * @return the last notification status, or {@code null} if no alert has been processed yet
     */
    @Override
    public NotificationStatus getStatus() {
        return status;
    }
}

PersistAlertToEagle Service:

 If the user selects this notification plugin, all alerts will be persisted into the Eagle store.

 

Code Block
languagejava
themeConfluence
titleEagle Store
/**
 * Responsible to persist Alerts to Eagle Storage
 */
@Resource(name = "Eagle Store" , description = "Persist Alert Entity to Eagle Store")
public class PersistToEagleStore implements  NotificationPlugin {

    private static final Logger LOG = LoggerFactory.getLogger(PersistToEagleStore.class);
    private NotificationStatus status;

    private Config config;
    private EagleAlertPersist persist;

    @Override
    public void _init() throws Exception {
        config = EagleConfigFactory.load().getConfig();
        this.persist = new EagleAlertPersist(config.getString("eagleProps.eagleService.host"),  config.getInt("eagleProps.eagleService.port"),
                                             config.getString("eagleProps.eagleService.username"), config.getString("eagleProps.eagleService.password"));
    }

    @Override
    public NotificationStatus getStatus() {
        return this.status;
    }

    @Override
    public void onAlert(AlertAPIEntity alertEntity) {
        try{
            status = new NotificationStatus();
            List<AlertAPIEntity> list = new ArrayList<AlertAPIEntity>();
            list.add(alertEntity);
            persist.doPersist( list );
            status.setNotificationSuccess(true);
        }catch (Exception ex ){
            status.setMessage(ex.getMessage());
            LOG.error(" Exception when Posting Alert Entity to Eagle Service Topic. Reason : "+ex.getMessage());
        }
    }
}

 

Generic Notification Service :

...

 

  How to accept alerts from Monitoring System ?:

  Framework will expose Rest Service to external world where clients can post their messages.

Code Block
languagejs
titleServiceInput
{
 "type" : "alert/metric",
 "action" : "ESCALATE/STORE",
 "system" : "SEC/HadoopServices",  { If action=STORE this will be empty }
 "alert_definition": {
  "description" : "what alert it is ? ",
  "severity" : "CRITICAL",
  "policy" : "contains ERROR()",
  "remidiation_def" : "",
  "createdDateTime" : "2016-01-12 22:59 15:00",
  "notification_def": "",
 },
 "metric_definition": {