Status

Current state: "Under Discussion"

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In scenarios such as Risk Control or Pattern Matching, users commonly want to change the pattern that events must match while that pattern is still serving traffic. In current Flink CEP, a CEP operator has a fixed CEP pattern that cannot be changed. To achieve this goal, users therefore have to restart their Flink jobs and go through a relatively long update time.

...

In this FLIP, we propose to add a couple of APIs and classes to Flink CEP in order to support having multiple patterns in one operator and updating patterns dynamically without stopping Flink jobs.

Public Interfaces

We propose to make the following API changes to support dynamic pattern changing in CEP.

Add Rule Interface

A "rule" defines a certain pattern, how to match the pattern, and how to process the found matches. The CEP operator requires the properties of the rule in order to function correctly.

...

Given the needs of data exchange and state storage, a Rule must be serializable. This in turn requires every class that a Rule references to be serializable as well; for example, Pattern must implement Serializable.
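The transitive requirement can be illustrated with plain Java serialization. In this minimal sketch, `DemoRule`, `SerializablePattern` and `PlainPattern` are hypothetical stand-ins for Rule and Pattern, not the classes proposed in this FLIP: a rule that holds a pattern which does not implement Serializable fails with `NotSerializableException`, while the serializable variant survives a round trip.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a Pattern that does NOT implement Serializable.
class PlainPattern {
    final String expression;
    PlainPattern(String expression) { this.expression = expression; }
}

// Hypothetical stand-in for a Pattern that implements Serializable.
class SerializablePattern implements Serializable {
    private static final long serialVersionUID = 1L;
    final String expression;
    SerializablePattern(String expression) { this.expression = expression; }
}

// Hypothetical rule: it is Serializable itself, but every object it references must be too.
class DemoRule implements Serializable {
    private static final long serialVersionUID = 1L;
    final String id;
    final Object pattern; // either a PlainPattern or a SerializablePattern
    DemoRule(String id, Object pattern) {
        this.id = id;
        this.pattern = pattern;
    }
}

final class RuleSerialization {
    private RuleSerialization() {}

    /** Serializes and deserializes the rule; returns null if something it references is not serializable. */
    static DemoRule roundTrip(DemoRule rule) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(rule);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (DemoRule) in.readObject();
            }
        } catch (NotSerializableException e) {
            return null; // e.g. the rule holds a PlainPattern
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```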

Add RuleManager Interface

We propose to introduce the RuleManager interface. The manager handles the rule updates and provides information about current active rules.

...

Code Block
import org.apache.flink.annotation.PublicEvolving;

import java.util.List;

/**
 * This manager handles updated rules and manages current rules.
 *
 * @param <IN> Base type of the elements appearing in the pattern.
 * @param <OUT> Type of produced elements based on found matches.
 */
@PublicEvolving
public interface RuleManager<IN, OUT> {
    /**
     * Deal with the notification that rules are updated.
     *
     * @param rules A list of all latest rules.
     */
    void onRulesUpdated(List<Rule<IN, OUT>> rules);

    /**
     * Returns the current rules managed.
     *
     * @return The current rules.
     */
    List<Rule<IN, OUT>> getCurrentRules();
}
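Since onRulesUpdated() may be invoked from the discoverer's thread while getCurrentRules() is read elsewhere, an implementation needs some care with concurrency. The following is a minimal sketch under that assumption; `SimpleRuleManager` is a simplified, hypothetical stand-in for the RuleManager interface above (parameterized directly by the rule type), and `InMemoryRuleManager` is an illustrative name, not part of the proposal.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Simplified stand-in for the proposed RuleManager, parameterized directly by the rule type.
interface SimpleRuleManager<R> {
    void onRulesUpdated(List<R> rules);

    List<R> getCurrentRules();
}

/**
 * Hypothetical in-memory manager that publishes an immutable snapshot of the latest rules,
 * so readers never observe a half-updated list.
 */
class InMemoryRuleManager<R> implements SimpleRuleManager<R> {
    // volatile: a newly assigned snapshot becomes visible to all reading threads
    private volatile List<R> currentRules = Collections.emptyList();

    @Override
    public void onRulesUpdated(List<R> rules) {
        // Defensive copy: later changes to the caller's list must not leak into the manager.
        currentRules = Collections.unmodifiableList(new ArrayList<>(rules));
    }

    @Override
    public List<R> getCurrentRules() {
        return currentRules;
    }
}
```

Replacing the whole list on every update matches the signature above, which always passes "a list of all latest rules" rather than a delta.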

Add RuleDiscoverer Interface

We introduce the RuleDiscoverer interface, which discovers the rule changes and notifies the RuleManager of the collected rule updates. This interface also serves to provide initial rules.

...

Code Block
import java.util.List;

/**
 * Class that periodically checks for rule updates.
 *
 * @param <IN> Base type of the elements appearing in the pattern.
 * @param <OUT> Type of produced elements based on found matches.
 */
public abstract class AbstractPeriodicRuleDiscoverer<IN, OUT> implements RuleDiscoverer<IN, OUT> {
    private final List<Rule<IN, OUT>> initRules;
    private final long intervalMillis;
    private final RuleManager<IN, OUT> ruleManager;

    /**
     * Constructor.
     *
     * @param initRules The initial rules returned by {@link #getInitialRules()}.
     * @param intervalMillis Time interval in milliseconds at which to check updates.
     * @param ruleManager The rule manager notified of rule updates.
     */
    public AbstractPeriodicRuleDiscoverer(
            List<Rule<IN, OUT>> initRules, long intervalMillis, RuleManager<IN, OUT> ruleManager) {
        this.initRules = initRules;
        this.intervalMillis = intervalMillis;
        this.ruleManager = ruleManager;
    }

    /** Checks whether there is a rule update. */
    public abstract boolean checkRuleUpdate();

    /** Returns the latest rules. */
    public abstract List<Rule<IN, OUT>> getLatestRules();

    @Override
    public void discoverRuleChanges() {
        // Poll until the discovering thread is interrupted.
        while (!Thread.currentThread().isInterrupted()) {
            if (checkRuleUpdate()) {
                List<Rule<IN, OUT>> rules = getLatestRules();
                ruleManager.onRulesUpdated(rules);
            }
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so that the owner of the thread can observe it.
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    @Override
    public List<Rule<IN, OUT>> getInitialRules() {
        return initRules;
    }
}
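checkRuleUpdate() and getLatestRules() are left abstract above. One possible way for a subclass to implement them is to compare a version number kept next to the rules in the external store, so that the periodic check stays cheap and rules are only loaded when something changed. The sketch below assumes such a versioned store; `VersionedRuleStore` and `VersionPollingDiscoverer` are hypothetical, a `Consumer` stands in for RuleManager.onRulesUpdated(), and the class does not extend AbstractPeriodicRuleDiscoverer so that it compiles on its own. pollOnce() corresponds to one iteration of the discoverRuleChanges() loop, minus the sleep.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical rule store, e.g. a database table with a version column. */
class VersionedRuleStore {
    private long version;
    private List<String> rules = new ArrayList<>();

    /** Replaces the stored rules and bumps the version. */
    synchronized void publish(List<String> newRules) {
        rules = new ArrayList<>(newRules);
        version++;
    }

    synchronized long version() {
        return version;
    }

    synchronized List<String> rules() {
        return new ArrayList<>(rules);
    }
}

/** Hypothetical discoverer that detects updates by comparing store versions. */
class VersionPollingDiscoverer {
    private final VersionedRuleStore store;
    private final Consumer<List<String>> onRulesUpdated; // stands in for the RuleManager
    private long seenVersion; // version last handed to the manager; 0 means nothing yet

    VersionPollingDiscoverer(VersionedRuleStore store, Consumer<List<String>> onRulesUpdated) {
        this.store = store;
        this.onRulesUpdated = onRulesUpdated;
    }

    /** Counterpart of checkRuleUpdate(): a cheap version comparison, no rule loading. */
    boolean checkRuleUpdate() {
        return store.version() != seenVersion;
    }

    /** Counterpart of getLatestRules(): reads version and rules under one lock. */
    List<String> getLatestRules() {
        synchronized (store) {
            seenVersion = store.version();
            return store.rules();
        }
    }

    /** One iteration of the polling loop; returns true if the manager was notified. */
    boolean pollOnce() {
        if (!checkRuleUpdate()) {
            return false;
        }
        onRulesUpdated.accept(getLatestRules());
        return true;
    }
}
```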

Add RuleDiscovererFactory Interface

The RuleDiscovererFactory is introduced to create a RuleDiscoverer bound to a given RuleManager; the created RuleDiscoverer must notify that RuleManager so that it can handle the rule changes.

Code Block
import java.io.Serializable;

/**
 * A factory for {@link RuleDiscoverer}. The created {@link RuleDiscoverer} should notify {@link
 * RuleManager} to deal with the rule changes.
 *
 * @param <IN> Base type of the elements appearing in the pattern.
 * @param <OUT> Type of produced elements based on found matches.
 */
public interface RuleDiscovererFactory<IN, OUT> extends Serializable {
    /**
     * Creates a {@link RuleDiscoverer}.
     *
     * @param ruleManager The rule manager notified.
     * @return A {@link RuleDiscoverer} instance.
     */
    RuleDiscoverer<IN, OUT> createRuleDiscoverer(RuleManager<IN, OUT> ruleManager);
}
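Only the factory extends Serializable, presumably so that it can be shipped as part of the job definition, while the RuleDiscoverer it creates can hold runtime-only references such as the RuleManager. The sketch below illustrates that split with hypothetical, simplified stand-ins (`DemoRuleManager`, `DemoRuleDiscoverer`, `DemoRuleDiscovererFactory`, `FixedRulesDiscovererFactory`); Java serialization is used only to simulate shipping the factory and is an assumption, not necessarily how Flink transfers it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/** Simplified stand-in for RuleManager; it is never serialized. */
interface DemoRuleManager {
    void onRulesUpdated(List<String> rules);
}

/** Simplified stand-in for RuleDiscoverer. */
interface DemoRuleDiscoverer {
    void discoverRuleChanges();
}

/** Simplified stand-in for RuleDiscovererFactory: only the factory must be Serializable. */
interface DemoRuleDiscovererFactory extends Serializable {
    DemoRuleDiscoverer createRuleDiscoverer(DemoRuleManager ruleManager);
}

/** Hypothetical factory that carries only plain, serializable configuration. */
class FixedRulesDiscovererFactory implements DemoRuleDiscovererFactory {
    private static final long serialVersionUID = 1L;
    private final ArrayList<String> rules;

    FixedRulesDiscovererFactory(List<String> rules) {
        this.rules = new ArrayList<>(rules);
    }

    @Override
    public DemoRuleDiscoverer createRuleDiscoverer(DemoRuleManager ruleManager) {
        // The discoverer captures the manager it must notify; it is created after shipping.
        return () -> ruleManager.onRulesUpdated(new ArrayList<>(rules));
    }
}

final class FactoryShipping {
    private FactoryShipping() {}

    /** Simulates shipping the factory to the cluster via a serialization round trip. */
    static DemoRuleDiscovererFactory ship(DemoRuleDiscovererFactory factory) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(factory);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (DemoRuleDiscovererFactory) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("factory could not be shipped", e);
        }
    }
}
```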

Add CEP.rule() method

CEP adds the rule() method, which applies the dynamic rules to the input DataStream and creates a new DataStream containing the processed results of matched CEP rules.

Code Block
public class CEP {
    /**
     * Creates a {@link DataStream} containing the processed results of matched CEP rules.
     *
     * @param input DataStream containing the input events
     * @param factory Rule discoverer factory to create the {@link RuleDiscoverer}
     * @param <IN> type of the elements appearing in the pattern
     * @param <OUT> type of produced elements based on found matches
     * @return Resulting data stream
     */
    public static <IN, OUT> DataStream<OUT> rule(
            DataStream<IN> input, RuleDiscovererFactory<IN, OUT> factory) {...}
}

Example Usage

This section provides code snippets that show how users can use the APIs proposed above to change CEP rules dynamically.

...

The wireframe below shows the general process of the example described in this section, together with the general implementation idea. The OperatorCoordinator on the JobManager reads rule updates from the user's database and sends them to the subtasks; as a result, the matched & processed results produced by the subtasks reflect the updated rules.


Proposed Changes

The general structure that supports dynamic rules in a job graph is shown in the wireframe below. When an input event arrives from the upstream operator, it goes through the following process to produce a matched & processed record.

...

In the wireframe above, the Key Generating Operator and the Pattern Matching & Processing Operator use information from the rules and thus need a RuleDiscoverer. Our design for collecting rules and updating them inside operators is shown in the wireframe below. Each of these two operators has a rule manager inside its OperatorCoordinator. The manager in the OperatorCoordinator discovers rule changes and converts the information into rules; the subtasks then apply the update after receiving the latest rules from the OperatorCoordinator.
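The fan-out from the coordinator-side manager to the subtasks can be sketched as follows. This is a simplified model, not Flink's OperatorCoordinator API: `DemoCoordinator` and `DemoSubtask` are hypothetical, and a direct method call stands in for sending an OperatorEvent, which in Flink happens asynchronously.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical subtask of a CEP operator: keeps the rules last received from its coordinator. */
class DemoSubtask {
    private volatile List<String> rules = Collections.emptyList();

    /** Plays the role of handling an OperatorEvent that carries the latest rules. */
    void handleRuleUpdate(List<String> latestRules) {
        rules = latestRules;
    }

    List<String> rules() {
        return rules;
    }
}

/** Hypothetical coordinator-side rule manager that fans rule updates out to all subtasks. */
class DemoCoordinator {
    private final List<DemoSubtask> subtasks = new ArrayList<>();

    DemoCoordinator(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            subtasks.add(new DemoSubtask());
        }
    }

    /** Called when new rules are discovered; every subtask receives the same immutable snapshot. */
    void onRulesUpdated(List<String> rules) {
        List<String> snapshot = Collections.unmodifiableList(new ArrayList<>(rules));
        // Updates reach the subtasks one at a time, so until the loop finishes some subtasks
        // still run the previous rules: a short window of inconsistency between subtasks.
        for (DemoSubtask subtask : subtasks) {
            subtask.handleRuleUpdate(snapshot);
        }
    }

    DemoSubtask subtask(int index) {
        return subtasks.get(index);
    }
}
```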


Supporting multiple & dynamic rules inside CEP operators

Currently, CepOperator uses an NFA and NFAState to process input data. Because it does not support multiple patterns yet, all input data are processed by the same NFAState.

In order to support multiple patterns in one operator, the single NFA needs to be replaced by a list or map of NFAs, each corresponding to a Rule. When input data triggers the processElement() method, the data is matched against all rules in the operator. Adding or deleting a rule then means adding or deleting an entry in the list or map.
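The map-of-matchers idea can be sketched with a toy matcher. `ToySequenceMatcher` is a hypothetical stand-in for an NFA plus its NFAState, far simpler than Flink's NFA: it matches a fixed sequence of event types with relaxed contiguity, tracks a single partial match, and has no time window. `MultiRuleOperator` is likewise hypothetical; it only shows that every event is fed to all rules and that adding or removing a rule is a map operation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy stand-in for one NFA and its state: matches a sequence such as "ABC", skipping other events. */
class ToySequenceMatcher {
    private final String sequence;
    private int matched; // length of the current partial match

    ToySequenceMatcher(String sequence) {
        if (sequence.isEmpty()) {
            throw new IllegalArgumentException("sequence must not be empty");
        }
        this.sequence = sequence;
    }

    /** Returns true when this event completes the sequence. */
    boolean onEvent(char event) {
        if (event == sequence.charAt(matched)) {
            matched++;
            if (matched == sequence.length()) {
                matched = 0; // start looking for the next match
                return true;
            }
        }
        return false;
    }
}

/** Hypothetical multi-rule operator: one matcher per rule id instead of a single NFA. */
class MultiRuleOperator {
    private final Map<String, ToySequenceMatcher> matchers = new LinkedHashMap<>();

    void addRule(String ruleId, String sequence) {
        matchers.put(ruleId, new ToySequenceMatcher(sequence));
    }

    /** Removing the entry also discards the rule's partial match. */
    void removeRule(String ruleId) {
        matchers.remove(ruleId);
    }

    /** Matches the event against all rules; returns the ids of rules that produced a match. */
    List<String> processElement(char event) {
        List<String> fired = new ArrayList<>();
        for (Map.Entry<String, ToySequenceMatcher> entry : matchers.entrySet()) {
            if (entry.getValue().onEvent(event)) {
                fired.add(entry.getKey());
            }
        }
        return fired;
    }
}
```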

Failover

During checkpoints, the operators mentioned above store the serialized rules and partially matched results in the state backend and restore them during recovery, so neither rules nor partial matches are lost during failover.

Common case behavior analysis

When a rule is deleted or replaced, the partially matched results related to that rule are discarded immediately and are no longer matched against incoming input data.
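Applying a full list of latest rules therefore comes down to a per-rule comparison. The minimal sketch below assumes that rules are identified by an id and that "replaced" means the same id now carries a different pattern definition; `VersionedRule`, `RuleState` and `RuleStateRegistry` are hypothetical names, and a `StringBuilder` stands in for a rule's partial matches.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Hypothetical rule: an id plus its pattern definition. */
final class VersionedRule {
    final String id;
    final String pattern;

    VersionedRule(String id, String pattern) {
        this.id = id;
        this.pattern = pattern;
    }
}

/** Per-rule state: the rule definition and its partial matches (here just buffered events). */
final class RuleState {
    final VersionedRule rule;
    final StringBuilder partialMatch = new StringBuilder();

    RuleState(VersionedRule rule) {
        this.rule = rule;
    }
}

class RuleStateRegistry {
    private final Map<String, RuleState> states = new HashMap<>();

    /**
     * Unchanged rules keep their partial matches; replaced rules start from empty state;
     * rules missing from the latest list are dropped together with their partial matches.
     */
    void onRulesUpdated(List<VersionedRule> latest) {
        Map<String, RuleState> next = new HashMap<>();
        for (VersionedRule rule : latest) {
            RuleState old = states.get(rule.id);
            boolean unchanged = old != null && Objects.equals(old.rule.pattern, rule.pattern);
            next.put(rule.id, unchanged ? old : new RuleState(rule));
        }
        states.clear();
        states.putAll(next);
    }

    RuleState state(String ruleId) {
        return states.get(ruleId);
    }
}
```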

...

Because the proposed design offers only eventual consistency, there can be a short period right after a rule update during which the subtasks of a CepOperator are inconsistent, i.e. they do not all work on the same set of rules. This happens because the OperatorCoordinator cannot send OperatorEvents to all subtasks simultaneously; the period usually lasts only milliseconds. As illustrated in the graph below, fake alerts ("AC" instead of "ABC") might be generated during that period.

Compatibility, Deprecation, and Migration Plan

The changes proposed in this FLIP are backward compatible with the existing APIs of Flink CEP. We therefore propose to make the API changes directly, without a deprecation period.

The existing CEP.pattern() methods remain after this FLIP is introduced, and static CEP patterns created by these methods keep their functionality, performance and job structure.

Test Plan

We will use unit tests to cover the correctness of introduced features.

Future Work

Service Degradation

Because the proposed features allow all CEP patterns to be placed in the same operator of the same Flink job, an error in a single pattern could cause the whole Flink job to fail, affecting the availability of all other CEP patterns.

...

We plan to expose this feature as a configurable property. The default behavior remains that the whole Flink job fails.

Table/SQL API Support

Currently we only propose the design for multiple/dynamic rules in the DataStream API, but the design can also be extended to the Table/SQL API. The general idea is that UDFs can be defined for CEP.rule() and for RuleDiscovererFactory implementations, so that users can use them in SQL like this:

...