...
Preload historical data
Deep storage
The policy engine should support state persist and restore. Siddhi is one major policy engine that Eagle currently supports, and Siddhi supports state persist and restore.
```java
public interface Snapshotable {
    public Object[] currentState();
    public void restoreState(Object[] state);
    public String getElementId();
}
```
In Eagle, AlertExecutor is the processing element that handles policy evaluation and policy lifecycle management, so AlertExecutor is the place where policy state can be persisted and restored.
Current policy state = snapshot of policy state + series of events after snapshot
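As an illustration of the `Snapshotable` contract, here is a minimal sketch (a hypothetical counting policy, not actual Eagle or Siddhi code; the interface is repeated so the example is self-contained):

```java
// Snapshotable contract as defined above.
interface Snapshotable {
    Object[] currentState();
    void restoreState(Object[] state);
    String getElementId();
}

// Hypothetical policy: counts events and exposes that count as its state.
public class CountingPolicy implements Snapshotable {
    private long eventCount = 0;
    private final String elementId;

    public CountingPolicy(String elementId) {
        this.elementId = elementId;
    }

    public void onEvent(Object event) {
        eventCount++; // policy logic: just count events
    }

    @Override
    public Object[] currentState() {
        // Snapshot: everything needed to rebuild this policy's state
        return new Object[] { eventCount };
    }

    @Override
    public void restoreState(Object[] state) {
        this.eventCount = (Long) state[0];
    }

    @Override
    public String getElementId() {
        return elementId;
    }

    public long getEventCount() {
        return eventCount;
    }
}
```

A restored policy then continues from the snapshot, with delta events replayed on top.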
When to do snapshot
Option 1: each AlertExecutor creates a daemon thread dedicated to taking snapshots of all policies processed by that AlertExecutor
Option 2: use a Storm tick tuple to notify the AlertExecutor to take a snapshot
While a snapshot is being taken, the whole AlertExecutor is halted to keep state consistent.
Note: make sure the snapshot includes all the events for all policies sent before snapshot is started. (think time)
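A minimal sketch of Option 1 in plain Java (hypothetical class and field names; real Storm/Eagle code would differ): event processing and snapshotting share one lock, so processing is halted during the snapshot and the snapshot contains exactly the events seen before it started.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch: executor whose snapshot halts event processing for consistency.
public class SnapshottingExecutor {
    // policyId -> simple counter state, standing in for real policy state
    private final Map<String, Long> policyState = new HashMap<>();
    private final Object lock = new Object();

    public void process(String policyId, Object event) {
        synchronized (lock) {
            policyState.merge(policyId, 1L, Long::sum);
        }
    }

    // Called by the daemon thread (or on a Storm tick tuple in Option 2).
    public Map<String, Long> takeSnapshot() {
        synchronized (lock) {
            // Copy under the lock: no event can interleave, so the snapshot
            // reflects all events for all policies sent before it started.
            return new HashMap<>(policyState);
        }
    }

    // Option 1: a dedicated daemon thread snapshots periodically.
    public static Thread startSnapshotDaemon(SnapshottingExecutor exec,
                                             long intervalMillis,
                                             List<Map<String, Long>> sink) {
        Thread t = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                sink.add(exec.takeSnapshot());
                try { Thread.sleep(intervalMillis); }
                catch (InterruptedException e) { return; }
            }
        });
        t.setDaemon(true);
        t.start();
        return t;
    }
}
```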
Where to store snapshot
Option 1: local disk
Option 2: centralized storage (Storm has blob storage)
When to do delta events persistence
After each snapshot, persist every subsequent event
Where to store delta events
Option 1: a write-ahead log, moved to centralized storage in the background or replicated to other nodes
Option 2: persist each event into Kafka
Option 3: persist each event into Eagle
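A sketch of Option 1 (hypothetical class; a real write-ahead log would also handle fsync, rotation, and shipping to centralized storage): each delta event is appended to a local log file, one line per event, and can be read back for replay.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;

// Sketch: append-only local write-ahead log for delta events.
public class DeltaEventLog {
    private final Path logFile;

    public DeltaEventLog(Path logFile) {
        this.logFile = logFile;
    }

    // Persist one event after the snapshot (one line per event).
    public void append(String event) throws IOException {
        Files.write(logFile, (event + "\n").getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }

    // Read back all delta events, e.g. to replay them during recovery.
    public List<String> readAll() throws IOException {
        return Files.readAllLines(logFile, StandardCharsets.UTF_8);
    }
}
```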
Policy state recovery
Recovery only happens when the AlertExecutor is first started or restarted by the framework.
Recovery consists of restoring the snapshot and then applying the delta events.
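The recovery step can be sketched as follows, using the per-policy counter state from the earlier examples (hypothetical names; real state would be the opaque binary blobs from the policy-state store):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch: current policy state = restored snapshot + replayed delta events.
public class PolicyStateRecovery {

    // snapshot: policyId -> state at snapshot time
    // deltaEvents: policyIds of events recorded after the snapshot
    public static Map<String, Long> recover(Map<String, Long> snapshot,
                                            List<String> deltaEvents) {
        Map<String, Long> state = new HashMap<>(snapshot); // restore snapshot
        for (String policyId : deltaEvents) {
            state.merge(policyId, 1L, Long::sum);           // replay deltas
        }
        return state;
    }
}
```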
Policy state size
Normally the policy state is quite small, especially for aggregation-style policies, but it can be very large for some statistical algorithms.
Performance loss
Duration for taking snapshot
Duration for persisting each event
Policy state for policy
Each policy is supposed to have different state, so policy state must be kept per policy
Delta events for policy group
All policies in a group consume the same stream of events, so delta events can be persisted per policy group.
A policy group should be affiliated with one AlertExecutor.
Policy state for policy add/delete/update
If a policy is deleted, its policy state can be deleted
If a policy is updated, its policy state can be deleted as well (the user takes responsibility for state loss caused by a policy update)
If a policy is added, policy state is created for it
Delta events for policy add/delete/update
If a policy is deleted, it can be removed from the policy group
If a policy is updated, there is no change
If a policy is added, it can be added into the policy group
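The lifecycle rules above reduce to simple map and set operations; a sketch (hypothetical class, with `Object` standing in for real policy state):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Sketch: per-policy state plus group membership, updated on
// policy add/delete/update as described above.
public class PolicyLifecycle {
    private final Map<String, Object> stateByPolicy = new HashMap<>();
    private final Set<String> policyGroup = new HashSet<>();

    public void addPolicy(String policyId) {
        policyGroup.add(policyId);                 // join the group
        stateByPolicy.put(policyId, new Object()); // fresh state
    }

    public void deletePolicy(String policyId) {
        policyGroup.remove(policyId);              // leave the group
        stateByPolicy.remove(policyId);            // drop its state
    }

    public void updatePolicy(String policyId) {
        // Group membership unchanged; old state is discarded (accepted loss).
        stateByPolicy.put(policyId, new Object());
    }

    public boolean hasState(String policyId) { return stateByPolicy.containsKey(policyId); }
    public boolean inGroup(String policyId)  { return policyGroup.contains(policyId); }
}
```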
Retention for delta events
The retention time should be the maximum window size across all policies in the policy group
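The retention rule is a simple maximum: events older than the largest policy window can no longer affect any policy in the group. A sketch (hypothetical method; window sizes assumed to be in milliseconds):

```java
import java.util.Collection;

// Sketch: delta-event retention for a group is the max window size
// over all policies in that group.
public class DeltaRetention {
    public static long retentionMillis(Collection<Long> policyWindowSizes) {
        return policyWindowSizes.stream()
                .mapToLong(Long::longValue)
                .max()
                .orElse(0L); // empty group: nothing to retain
    }
}
```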
Topology restart/recreate/rebalance
If the topology is restarted, policy state and delta events do not change at all.
If the topology is recreated because of a code change, policy state and delta events should be marked as stale and are to be deleted.
If the topology is rebalanced, there is no change to policy state or delta events.
Data Structure
Policy state

Key | Value
---|---
policyId | binary state for this policy
topologyId |
Delta events for policy groups

Key | Value
---|---
policyGroupId (alertExecutorId) | offset range if stored in Kafka
Policy group

Key | Value
---|---
policyGroupId (alertExecutorId) | policyId
Purge delta events taken before the latest snapshot