The Notification Plugin should provide an interface that accepts an Eagle alert entity and returns the status of the notification.

Eagle currently has no notification plugin framework in which users can implement the Notification Plugin interface and upload their own library. This plugin framework lets users supply their own implementations and also provides the following default implementations:

  •  Sending Email Alert
  •  Persisting Message to Kafka 
  •  Writing Entities to Eagle Service

Notification Interface:

Notification Interface
package org.apache.eagle.notification;

import org.apache.eagle.alert.entity.AlertAPIEntity;

/**
 * Notification plug-in interface that provides an abstraction layer for notifying different systems
 */
public interface NotificationPlugin {

   /**
    * Initializes the plug-in; called before any alert is posted
    * @throws Exception if the plug-in cannot be initialized
    */
   public void _init() throws Exception;

   /**
    * Posts a notification for the given alert entity
    * @param alertEntity the alert to notify about
    */
   public void onAlert( AlertAPIEntity alertEntity );

   /**
    * Returns the status of the most recent notification post
    * @return the status set by the last call to onAlert
    */
   public NotificationStatus getStatus();
}
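
The interface returns a NotificationStatus, which this page does not define. As a minimal sketch, assuming it only carries the two values that the implementations on this page set (setNotificationSuccess and setMessage), it could look like the following. The getters and the default values are assumptions, not the actual Eagle class.

```java
/**
 * Hypothetical sketch of the status object returned by getStatus().
 * Only the two setters are taken from the plug-in code on this page;
 * the getters and defaults are assumptions.
 */
class NotificationStatus {

    // Stays false until a plug-in explicitly reports success
    private boolean notificationSuccess = false;
    // Failure reason; left empty when the notification succeeded
    private String message = "";

    public boolean isNotificationSuccess() {
        return notificationSuccess;
    }

    public void setNotificationSuccess(boolean notificationSuccess) {
        this.notificationSuccess = notificationSuccess;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }
}
```

With a default of false, a plug-in that only sets a message in its catch block is still reported as failed.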

 

When the Eagle topology starts, the code should scan for and register the available notification types (custom notification, email, Kafka message, etc.). Because Eagle lets users supply their own notification implementations, the topology initialization code should automatically detect and register each notification type that can be used.

 public class MessageNotification implements  NotificationPlugin {

 }

 When users deploy this code with the Eagle Service, the topology initializer has to detect MessageNotification and register it automatically.
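
The detection step is not specified on this page. One possible shape, shown here only as a sketch, is a registry that receives candidate classes, keeps those that implement NotificationPlugin and carry a @Resource annotation, and registers an initialized instance under the annotation's name. NotificationRegistry is a hypothetical name, and the Resource annotation and the trimmed NotificationPlugin interface below are stand-ins so the example compiles on its own. How the candidate classes are found (classpath scanning, configuration, and so on) is left open.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Stand-in for the @Resource annotation used on the plug-ins (assumption)
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface Resource {
    String name();
    String description() default "";
}

// Trimmed stand-in for the plug-in interface; only _init() is needed here
interface NotificationPlugin {
    void _init() throws Exception;
}

@Resource(name = "Message Notification", description = "Custom notification")
class MessageNotification implements NotificationPlugin {
    @Override
    public void _init() {
        // nothing to set up in this sketch
    }
}

// Hypothetical registry: maps notification type name to an initialized plug-in
class NotificationRegistry {

    private final Map<String, NotificationPlugin> plugins = new ConcurrentHashMap<>();

    void register(List<Class<?>> candidates) throws Exception {
        for (Class<?> candidate : candidates) {
            Resource meta = candidate.getAnnotation(Resource.class);
            boolean isPlugin = NotificationPlugin.class.isAssignableFrom(candidate)
                    && !candidate.isInterface();
            if (meta == null || !isPlugin) {
                continue; // not a deployable notification plug-in
            }
            NotificationPlugin plugin =
                    (NotificationPlugin) candidate.getDeclaredConstructor().newInstance();
            plugin._init();
            plugins.put(meta.name(), plugin);
        }
    }

    Map<String, NotificationPlugin> registered() {
        return plugins;
    }
}
```

Keying the registry by the annotation name matches the way the default plug-ins on this page look up their policies by strings such as "Kafka Store" and "Email Notification".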

How to select a Notification when defining a Policy?

      Eagle should let users select the notification type when they create a policy. To support this, the detected notification types need to be persisted in HBase, with an API on top to query them.

      To keep the table consistent, always delete the notification table and recreate it from the detected notifications.
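
The delete-and-recreate rule can be illustrated with an in-memory stand-in for the HBase table. This is only a sketch: NotificationTypeStore, refresh and list are hypothetical names, and a real implementation would go through the Eagle Service storage API rather than a Java list.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.TreeSet;

// Hypothetical in-memory stand-in for the notification table
class NotificationTypeStore {

    private volatile List<String> types = Collections.emptyList();

    // Drops the previous contents and stores only the detected types,
    // so a plug-in that is no longer deployed does not linger in the table
    void refresh(Collection<String> detectedTypes) {
        List<String> fresh = new ArrayList<>(new TreeSet<>(detectedTypes));
        types = Collections.unmodifiableList(fresh);
    }

    // Backs the query API used when a policy is created
    List<String> list() {
        return types;
    }
}
```

Replacing the whole list instead of merging into it is what keeps stale entries out after a plug-in is removed from the deployment.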

 

Persisting Message to Kafka:

PersistAlertToKafka
@Resource(name = "Kafka Store" , description = "Persist Alert Entity to Kafka")
@SuppressWarnings({ "rawtypes", "unchecked" })
public class PersistAlertToKafkaTopic  implements NotificationPlugin {

   private NotificationStatus status;

   private static final Logger LOG = LoggerFactory.getLogger(PersistAlertToKafkaTopic.class);
   private Map<String, AlertDefinitionAPIEntity> activeAlerts = new ConcurrentHashMap<String, AlertDefinitionAPIEntity>();
   private Map<String, KafkaTopicConfig> kafaConfigs = new ConcurrentHashMap<String, KafkaTopicConfig>();
   private Config config;
   private AlertDefinitionDAO alertDefinitionDao;

   @Override
   public void _init() throws  Exception  {
      config = EagleConfigFactory.load().getConfig();
      String site = config.getString("eagleProps.site");
      String dataSource = config.getString("eagleProps.dataSource");
      activeAlerts.clear();
      kafaConfigs.clear();
      alertDefinitionDao = new AlertDefinitionDAOImpl(new EagleServiceConnector(config.getString("eagleProps.eagleService.host"),  config.getInt("eagleProps.eagleService.port")));
      try{
         activeAlerts = alertDefinitionDao.findActiveAlertDefsByNotification( site , dataSource ,"Kafka Store");
      }catch (Exception ex ){
         LOG.error(ex.getMessage());
         throw  new Exception(" Kafka Store  Cannot be initialized . Reason : "+ex.getMessage());
      }
      Set<String> policies = activeAlerts.keySet();
      for( String policyId : policies )
      {
         Module module = new SimpleModule("notification").registerSubtypes(new NamedType(EmailNotificationConfig.class, "email"));
         KafkaTopicConfig kafkaConfig = new KafkaTopicConfig();
         try {
            kafkaConfig = JsonSerDeserUtils.deserialize(activeAlerts.get(policyId).getNotificationDef(), KafkaTopicConfig.class, Arrays.asList(module));
            this.kafaConfigs.put(policyId,kafkaConfig);
         }catch (Exception ex){
            LOG.error(" Exception when initializing PersistAlertToKafkaTopic. Reason : "+ex.getMessage());
         }
      }
   }

   @Override
   public void onAlert(AlertAPIEntity alertEntity) {
      // Create the status before the try block so the catch block can always report on it
      status = new NotificationStatus();
      try{
         processAlertEntity(alertEntity);
         status.setNotificationSuccess(true);
      }catch(Exception ex ){
         LOG.error(" Exception when Posting Alert Entity to Kafka Topic. Reason : "+ex.getMessage());
         status.setMessage(ex.getMessage());
         status.setNotificationSuccess(false);
      }
   }

   /**
    * Access KafkaProducer and send entity to Bus 
    * @param alertEntity
    * @throws Exception
    */
   public void processAlertEntity( AlertAPIEntity alertEntity ) throws Exception {
      KafkaProducer producer = KafkaProducerSingleton.INSTANCE.getProducer();
      producer.send(createRecord(alertEntity));     
   }
   
   /**
    * To Create  KafkaProducer Record 
    * @param entity
    * @return
    * @throws Exception
    */
   public ProducerRecord  createRecord(AlertAPIEntity entity ) throws Exception {
      String policyId = entity.getTags().get(AlertConstants.POLICY_ID);
      ProducerRecord  record  = new ProducerRecord( this.kafaConfigs.get(policyId).getKafkaTopic(), entity.toString());
      return record;
   }  
   
   @Override
   public NotificationStatus getStatus() {
      return status;
   }
}

 

Email Alert Notification:

      Email Notification sends an email or SMS alert to the configured email address.

EmailNotification
@Resource(name = "Email Notification" , description = "  Email Notification API to trigger email/sms ")
public class EmailNotification  implements NotificationPlugin {

   private static final Logger LOG = LoggerFactory.getLogger(EmailNotification.class);
   private Map<String, AlertDefinitionAPIEntity> activeAlerts = new ConcurrentHashMap<String, AlertDefinitionAPIEntity>();
   static Map<String, List<EmailGenerator>> emailGenerators = new ConcurrentHashMap<String, List<EmailGenerator>>();
   private Config config;
   private AlertDefinitionDAO alertDefinitionDao;
   private NotificationStatus status ;

   /* Initialize the objects that Email Notification depends on */
   @Override
   public void _init() throws  Exception {
      // Get Config Object
      config = EagleConfigFactory.load().getConfig();
      String site = config.getString("eagleProps.site");
      String dataSource = config.getString("eagleProps.dataSource");
      activeAlerts.clear();
      emailGenerators.clear();
      // find out all policies and its notification Config
      alertDefinitionDao = new AlertDefinitionDAOImpl(new EagleServiceConnector(config.getString("eagleProps.eagleService.host"), config.getInt("eagleProps.eagleService.port")));
      try{
         activeAlerts = alertDefinitionDao.findActiveAlertDefsByNotification( site , dataSource ,"Email Notification");
      }catch (Exception ex ){
         LOG.error(ex.getMessage());
         throw  new Exception(" Email Notification Cannot be initialized . Reason : "+ex.getMessage());
      }
      // Create Email
      Set<String> policies = activeAlerts.keySet();
      for( String policyId : policies ){
         AlertDefinitionAPIEntity alertDef = activeAlerts.get(policyId);
         List<EmailGenerator> tmpList = createEmailGenerator(alertDef);
         this.emailGenerators.put(policyId , tmpList);
      }
   }

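   @Override
   public void onAlert(AlertAPIEntity alertEntity) {
      status = new NotificationStatus();
      String policyId = alertEntity.getTags().get(AlertConstants.POLICY_ID);
      LOG.debug("Sending email notification for policy {}", policyId);
      List<EmailGenerator> generatorList = emailGenerators.get(policyId);
      if( generatorList == null || generatorList.isEmpty() ) {
         // No generators were created for this policy during _init()
         status.setMessage(" No email configuration found for policy " + policyId);
         status.setNotificationSuccess(false);
         return;
      }
      // Report success only if every generator sends its email
      boolean allSent = true;
      for( EmailGenerator gen : generatorList ) {
         if( !gen.sendAlertEmail(alertEntity) ) {
            allSent = false;
            status.setMessage(" Failed to send email ");
         }
      }
      status.setNotificationSuccess(allSent);
   }

   @Override
   public NotificationStatus getStatus() {
      return status;
   }
}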

Persisting Alerts to Eagle Service:

 If the user selects this notification plugin, all alerts are persisted to the Eagle Store.

 

Eagle Store
/**
 * Responsible to persist Alerts to Eagle Storage
 */
@Resource(name = "Eagle Store" , description = "Persist Alert Entity to Eagle Store")
public class PersistToEagleStore implements  NotificationPlugin {

    private static final Logger LOG = LoggerFactory.getLogger(PersistToEagleStore.class);
    private NotificationStatus status;

    private Config config;
    private EagleAlertPersist persist;

    @Override
    public void _init() throws Exception {
        config = EagleConfigFactory.load().getConfig();
        this.persist = new EagleAlertPersist(config.getString("eagleProps.eagleService.host"),  config.getInt("eagleProps.eagleService.port"),
                                             config.getString("eagleProps.eagleService.username"), config.getString("eagleProps.eagleService.password"));
    }

    @Override
    public NotificationStatus getStatus() {
        return this.status;
    }

    @Override
    public void onAlert(AlertAPIEntity alertEntity) {
        try{
            status = new NotificationStatus();
            List<AlertAPIEntity> list = new ArrayList<AlertAPIEntity>();
            list.add(alertEntity);
            persist.doPersist( list );
            status.setNotificationSuccess(true);
        }catch (Exception ex ){
            status.setMessage(ex.getMessage());
            status.setNotificationSuccess(false);
            LOG.error(" Exception when Posting Alert Entity to Eagle Service. Reason : "+ex.getMessage());
        }
    }
}

 

 
