...

In situations like risk control or pattern matching, users commonly want to change the pattern that events need to match while the pattern is still in service. In current Flink CEP, a CEP operator has a fixed CEP pattern and does not support changing it. Thus, in order to achieve this goal, users have to restart their Flink jobs and wait for a relatively long update time.

...

In this FLIP, we propose to add a couple of APIs and classes to Flink CEP in order to support having multiple patterns in one operator and updating patterns dynamically without stopping Flink jobs.

Public Interfaces

...

The term "rule" defines a certain pattern, how to match the pattern, and how to process the found matches. The properties of the rule are required by the CEP operator in order to function correctly. A rule extends beyond a pattern to also include all the other relevant functions. A brief illustration of their relationship is shown below.

The starting point of introducing "Rule" is to deal with situations like the following:

  • While one pattern requires input data to be grouped by user_id, another pattern might require data to be grouped by item_id.
  • Users might want to distinguish results that match different patterns and deal with them differently.

Simply having multiple patterns might not be enough to handle situations like these, while the concept of a Rule can handle them properly.

[Figure: relationship between Rule and Pattern]


We propose to add the Rule interface as below.

Code Block
import java.io.Serializable;

import javax.annotation.Nullable;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.core.io.Versioned;

/**
 * Base class for a rule definition.
 *
 * <p>A rule defines a {@link Pattern}, how to match the pattern, and how to process the found
 * matches.
 *
 * @param <IN> Base type of the elements appearing in the pattern.
 * @param <OUT> Type of produced elements based on found matches.
 */
@PublicEvolving
public interface Rule<IN, OUT> extends Serializable, Versioned {
    /**
     * Returns the ID of the rule.
     *
     * @return The ID of the rule.
     */
    String getId();

    /**
     * Returns the scheduled time at which the rule should come into effect.
     *
     * <p>If the scheduled time is earlier than the current event/processing time, the rule will
     * immediately become effective.
     *
     * <p>If the rule should always become effective immediately, the scheduled time can be set to
     * {@code Long.MIN_VALUE}: {@value Long#MIN_VALUE}.
     *
     * @return The scheduled time.
     */
    default Long getScheduledTime() {
        return Long.MIN_VALUE;
    }

    /**
     * Returns the {@link Pattern} to be matched.
     *
     * @return The pattern of the rule.
     */
    Pattern<IN, ?> getPattern();

    /**
     * Returns the {@link KeySelector} to extract the key of the elements appearing in the pattern.
     *
     * <p>Only data with the same key can be chained to match a pattern. If the extracted key is
     * null, the behavior is to reuse the current key if the input is a {@link KeyedStream}, or to
     * allocate the same key for all input data if it is not a {@link KeyedStream}.
     *
     * @return The key selector of the rule.
     */
    @Nullable
    KeySelector<IN, ?> getKeySelector();

    /**
     * Returns the {@link PatternProcessFunction} to process the found matches for the pattern.
     *
     * @return The pattern process function of the rule.
     */
    PatternProcessFunction<IN, OUT> getPatternProcessFunction();
}

Given the needs of data exchange and state storage, a Rule must be serializable, which in turn requires all classes involved in the Rule to be serializable as well; for example, Pattern must implement Serializable.

Add RuleManager Interface

We propose to introduce the RuleManager interface. The manager handles the rule updates and provides information about current active rules.

Possible implementations include components in the Flink job that need to receive rule update notifications, such as CepOperator or its OperatorCoordinator. This means that this interface will only have internal implementations: users only need to use this interface, and do not need to implement it or create instances of its implementations.

Code Block
import java.util.List;

import org.apache.flink.annotation.PublicEvolving;

/**
 * This manager handles updated rules and manages current rules.
 *
 * @param <IN> Base type of the elements appearing in the pattern.
 * @param <OUT> Type of produced elements based on found matches.
 */
@PublicEvolving
public interface RuleManager<IN, OUT> {
    /**
     * Deals with the notification that rules are updated.
     *
     * @param rules A list of all latest rules.
     */
    void onRulesUpdated(List<Rule<IN, OUT>> rules);

    /**
     * Returns the current rules managed.
     *
     * @return The current rules.
     */
    List<Rule<IN, OUT>> getCurrentRules();
}
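To make the RuleManager contract more concrete, here is a minimal in-memory sketch in plain Java. It is not the proposed CepOperator or OperatorCoordinator implementation: `InMemoryRuleManager` and `SimpleRule` are hypothetical stand-ins so that the example compiles without Flink. It assumes one possible semantics for onRulesUpdated(), namely that the notified list replaces the current rules as a whole and getCurrentRules() returns an immutable snapshot.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical, Flink-free sketch of a RuleManager that keeps the latest notified rule set. */
final class InMemoryRuleManager {

    /** Hypothetical stand-in for Rule: only an ID and a version are modelled here. */
    static final class SimpleRule {
        final String id;
        final int version;

        SimpleRule(String id, int version) {
            this.id = id;
            this.version = version;
        }
    }

    // volatile so that an update made by a discoverer thread is visible to reader threads.
    private volatile List<SimpleRule> currentRules = Collections.emptyList();

    /** Replaces the managed rules with the full list of latest rules. */
    void onRulesUpdated(List<SimpleRule> rules) {
        // Defensive copy: later changes to the caller's list must not leak into the manager.
        currentRules = Collections.unmodifiableList(new ArrayList<>(rules));
    }

    /** Returns an immutable snapshot of the rules currently in effect. */
    List<SimpleRule> getCurrentRules() {
        return currentRules;
    }
}
```

Replacing the whole list on every notification keeps the manager simple; deciding which rules actually changed is left to the operator.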

Add RuleDiscoverer Interface

We introduce the RuleDiscoverer interface, which discovers the rule changes and notifies the RuleManager of the collected rule updates. This interface also serves to provide initial rules.

Code Block
import java.util.List;

import org.apache.flink.annotation.PublicEvolving;

/**
 * Interface that discovers rule changes, notifies {@link RuleManager} of rule updates and provides
 * the initial rules.
 *
 * @param <IN> Base type of the elements appearing in the pattern.
 * @param <OUT> Type of produced elements based on found matches.
 */
@PublicEvolving
public interface RuleDiscoverer<IN, OUT> {
    /**
     * Discovers the rule changes.
     *
     * <p>In the dynamic rule changing case, this function should be a continuous process that
     * checks for rule updates and notifies the {@link RuleManager}.
     *
     * @param manager The rule manager to notify of rule updates.
     */
    void discoverRuleChanges(RuleManager<IN, OUT> manager);

    /**
     * Returns the rules an operator should be set up with.
     *
     * @return The initial rules.
     */
    List<Rule<IN, OUT>> getInitialRules();
}

Users are responsible for implementing this interface, as the possible ways to collect rules are unlimited. Still, we can provide several abstract classes that handle most use cases, such as the following AbstractPeriodicRuleDiscoverer, which periodically checks for updates.


Code Block
import java.util.List;

/** Class that periodically checks for rule updates. */
public abstract class AbstractPeriodicRuleDiscoverer<IN, OUT> implements RuleDiscoverer<IN, OUT> {
    private final List<Rule<IN, OUT>> initRules;
    private final long intervalMillis;

    /**
     * Constructor.
     *
     * @param initRules The rules an operator should be set up with.
     * @param intervalMillis Time interval in milliseconds at which to check updates.
     */
    public AbstractPeriodicRuleDiscoverer(List<Rule<IN, OUT>> initRules, long intervalMillis) {
        this.initRules = initRules;
        this.intervalMillis = intervalMillis;
    }

    /** Checks whether there is a rule update. */
    public abstract boolean checkRuleUpdate();

    /** Returns the latest rules. */
    public abstract List<Rule<IN, OUT>> getLatestRules();

    @Override
    public void discoverRuleChanges(RuleManager<IN, OUT> ruleManager) {
        // Poll until the discovering thread is interrupted, e.g. when the job is cancelled.
        while (!Thread.currentThread().isInterrupted()) {
            if (checkRuleUpdate()) {
                List<Rule<IN, OUT>> rules = getLatestRules();
                ruleManager.onRulesUpdated(rules);
            }
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop, instead of swallowing the interruption.
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    @Override
    public List<Rule<IN, OUT>> getInitialRules() {
        return initRules;
    }
}
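As a usage illustration, a subclass of AbstractPeriodicRuleDiscoverer only has to implement checkRuleUpdate() and getLatestRules(). The Flink-free sketch below mirrors that structure against a hypothetical in-memory `RuleStore` with a version counter, standing in for something like a database table; rules are reduced to their IDs and the RuleManager to a `Consumer`, so every name here is an assumption made for illustration.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/** Hypothetical sketch of a periodic discoverer polling a versioned, in-memory rule store. */
final class VersionedStoreDiscoverer {

    /** Hypothetical rule store standing in for e.g. a database table with a version column. */
    static final class RuleStore {
        private final AtomicInteger version = new AtomicInteger();
        private final AtomicReference<List<String>> ruleIds = new AtomicReference<>(List.of());

        /** Publishes a new rule set: rules first, then the version bump that announces them. */
        void publish(List<String> ids) {
            ruleIds.set(List.copyOf(ids));
            version.incrementAndGet();
        }
    }

    private final RuleStore store;
    private final long intervalMillis;
    private int lastSeenVersion = 0;

    VersionedStoreDiscoverer(RuleStore store, long intervalMillis) {
        this.store = store;
        this.intervalMillis = intervalMillis;
    }

    /** Counterpart of checkRuleUpdate(): has the store changed since the last read? */
    boolean checkRuleUpdate() {
        return store.version.get() != lastSeenVersion;
    }

    /** Counterpart of getLatestRules(): reads the version before the rules, never after. */
    List<String> getLatestRules() {
        int version = store.version.get();
        List<String> rules = store.ruleIds.get();
        lastSeenVersion = version;
        return rules;
    }

    /** Same polling loop as AbstractPeriodicRuleDiscoverer#discoverRuleChanges. */
    void discoverRuleChanges(Consumer<List<String>> ruleManager) {
        while (!Thread.currentThread().isInterrupted()) {
            if (checkRuleUpdate()) {
                ruleManager.accept(getLatestRules());
            }
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
}
```

Reading the version before the rules means a concurrent publish can at worst trigger one redundant reload on the next poll, but never a missed update.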

Add RuleDiscovererFactory Interface

The RuleDiscovererFactory is introduced to create the RuleDiscoverer. The created RuleDiscoverer is expected to notify the RuleManager so that the manager can deal with the rule changes.

Code Block
import java.io.Serializable;

/**
 * A factory for {@link RuleDiscoverer}. The created {@link RuleDiscoverer} should notify {@link
 * RuleManager} to deal with the rule changes.
 *
 * @param <IN> Base type of the elements appearing in the pattern.
 * @param <OUT> Type of produced elements based on found matches.
 */
public interface RuleDiscovererFactory<IN, OUT> extends Serializable {
    /**
     * Creates a {@link RuleDiscoverer}.
     *
     * @return A {@link RuleDiscoverer} instance.
     */
    RuleDiscoverer<IN, OUT> createRuleDiscoverer();
}


Add CEP.rule() method

CEP adds the rule() method, which applies the dynamic rules to the input DataStream and creates a new DataStream containing the processed results of matched CEP rules.

Code Block
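public class CEP {
    /**
     * Creates a {@link DataStream} containing the processed results of matched CEP rules.
     *
     * @param input DataStream containing the input events
     * @param factory Rule discoverer factory to create the {@link RuleDiscoverer}
     * @param <IN> type of the elements appearing in the pattern
     * @param <OUT> type of produced elements based on found matches
     * @return Resulting data stream
     */
    public static <IN, OUT> DataStream<OUT> rule(
            DataStream<IN> input, RuleDiscovererFactory<IN, OUT> factory) {...}
}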

Example Usage

This section provides code snippets that show how users can use the APIs proposed above to change CEP rules dynamically.

Suppose the CEP library provided an implementation of RuleDiscoverer, called PeriodDBRuleDiscoverer, that periodically checks a database for rule updates and initializes rules from a fixed rule set according to the JSON information retrieved from the database.

Code Block
/**
 * Implementation of {@link RuleDiscoverer} that periodically checks a database for rule updates and
 * initializes the {@link Rule} from a fixed rule set according to information retrieved from the database.
 */
public class PeriodDBRuleDiscoverer<IN, OUT> implements RuleDiscoverer<IN, OUT> {

    @Override
    public void discoverRuleChanges(RuleManager<IN, OUT> manager) {...}

    @Override
    public List<Rule<IN, OUT>> getInitialRules() {...}
}

To start with, users need to provide classes that implement the Rule interface. Suppose a class called DummyRule is created as below.

Code Block
public class DummyRule implements Rule<Event, Object> {
    
    public DummyRule(int a) {...}

    @Override
    public String getId() {...}
    
    @Override
    public int getVersion() {...}

    @Override
    public Pattern<Event, ?> getPattern() {...}

    @Nullable
    @Override
    public KeySelector<Event, ?> getKeySelector() {...}

    @Override
    public PatternProcessFunction<Event, Object> getPatternProcessFunction() {...}
}

It is recommended that users override the implementations' toString(), equals(), and hashCode() methods. Overriding toString() provides better readability when monitoring the rules through logs, and overriding equals() and hashCode() can help to avoid reloading existing rules during a rule update.
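The benefit of value-based equals() and hashCode() can be sketched as follows: when rules compare by value, an update can be split into rules that need to be loaded and rules that can be dropped, while unchanged rules are left alone. `RuleDescriptor`, `toLoad` and `toDrop` are hypothetical names; the proposal does not prescribe how the operator performs this comparison.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

/** Hypothetical sketch: value-based rule equality lets an update touch only the rules that changed. */
final class RuleDiff {

    /** Hypothetical rule description whose equality covers every field that defines its behavior. */
    static final class RuleDescriptor {
        final String id;
        final int version;

        RuleDescriptor(String id, int version) {
            this.id = id;
            this.version = version;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof RuleDescriptor)) {
                return false;
            }
            RuleDescriptor other = (RuleDescriptor) o;
            return version == other.version && id.equals(other.id);
        }

        @Override
        public int hashCode() {
            return Objects.hash(id, version);
        }

        @Override
        public String toString() {
            return "Rule{id=" + id + ", version=" + version + "}";
        }
    }

    /** Rules present in {@code next} but not in {@code current}: only these need to be loaded. */
    static Set<RuleDescriptor> toLoad(List<RuleDescriptor> current, List<RuleDescriptor> next) {
        Set<RuleDescriptor> result = new LinkedHashSet<>(next);
        result.removeAll(current);
        return result;
    }

    /** Rules present in {@code current} but not in {@code next}: these can be dropped. */
    static Set<RuleDescriptor> toDrop(List<RuleDescriptor> current, List<RuleDescriptor> next) {
        return toLoad(next, current);
    }
}
```

Without these overrides, object identity would make every rule in a freshly deserialized update look new, forcing a reload of all rules.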

In order to use PeriodDBRuleDiscoverer, users need to create an instance of PeriodDBRuleDiscovererFactory.

Code Block
PeriodDBRuleDiscovererFactory<Event, Object> factory = new PeriodDBRuleDiscovererFactory<>();

Then users create the output DataStream from the input data stream and this factory.

Code Block
DataStream<Event> inputStream = env.fromElements(...);

DataStream<Object> outputStream = CEP.rule(inputStream, factory);

After the Flink job has been configured to support dynamic rules and successfully submitted, users can update rules by recording them in a table in their database, as follows.

| uid | className | constructorParams |
|-----|-----------|-------------------|
| 1 | "org.apache.flink.cep.rule.DummyRule" | [{"type": "int", "value": 0}] |
| 2 | "org.apache.flink.cep.rule.DummyRule" | [{"type": "int", "value": 1}] |
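For illustration only, the sketch below shows one way a discoverer such as PeriodDBRuleDiscoverer might turn a row of this table into a rule instance via reflection. The proposal does not specify this step; JSON parsing is omitted (constructorParams are assumed to be already decoded into type/value pairs), only the `int` type is handled, and `DummyRule`, `Param` and `instantiate` here are hypothetical stand-ins.

```java
import java.lang.reflect.Constructor;
import java.util.List;

/** Hypothetical sketch: turning one row of the rule table into a rule instance via reflection. */
final class RuleRowInstantiation {

    /** Hypothetical stand-in for the DummyRule class named in the className column. */
    static final class DummyRule {
        final int a;

        public DummyRule(int a) {
            this.a = a;
        }
    }

    /** One already-decoded entry of constructorParams, e.g. {"type": "int", "value": 0}. */
    static final class Param {
        final String type;
        final Object value;

        Param(String type, Object value) {
            this.type = type;
            this.value = value;
        }
    }

    /** Maps the "type" field to a constructor parameter type; only "int" is handled here. */
    static Class<?> toParameterType(String type) {
        if ("int".equals(type)) {
            return int.class;
        }
        throw new IllegalArgumentException("Unsupported parameter type: " + type);
    }

    /** Loads className and invokes the public constructor matching the decoded parameters. */
    static Object instantiate(String className, List<Param> params)
            throws ReflectiveOperationException {
        Class<?>[] types = new Class<?>[params.size()];
        Object[] args = new Object[params.size()];
        for (int i = 0; i < params.size(); i++) {
            types[i] = toParameterType(params.get(i).type);
            args[i] = params.get(i).value; // boxed Integer is unboxed by newInstance
        }
        Constructor<?> constructor = Class.forName(className).getConstructor(types);
        return constructor.newInstance(args);
    }
}
```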




Information such as rule updates or exceptions will be recorded in Flink's log system. Users can check the log to see, for example, whether a rule update succeeded or not.

The general process of the example described in this section, along with the general implementation idea, is shown in the wireframe below. The OperatorCoordinator on the JobManager reads rule updates from the user's database and sends them to the subtasks; as a result, the matched and processed results produced by the subtasks change accordingly.

[Figure: rule updates flowing from the user's database through the OperatorCoordinator to the subtasks]


Proposed Changes

The general structure that supports dynamic rules in a job graph is shown in the wireframe below. When an input event arrives from the upstream operator, it goes through the following process to produce a matched and processed record.

  1. The Key Generating Operator computes the key of the input data according to the KeySelector provided by each Rule, and attaches the key to the input data. If there are N rules, N records with the same input data and different keys are sent downstream.
  2. The input data is sent to different downstream operator subtasks via a keyBy() operation, which uses the key generated in the previous step.
  3. The next operator matches the input data against the Patterns provided by the Rules. When a matched sequence is generated, the operator calls the corresponding PatternProcessFunction on that sequence and emits the result downstream.
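Step 1 can be sketched in plain Java as a per-event fan-out: every rule's key selector is applied to the event and one record per rule is emitted. This is a hypothetical illustration, not the operator's code; `Order`, `KeyedRecord` and the use of `java.util.function.Function` in place of Flink's KeySelector are assumptions, and the null-key fallback described in the Rule interface is not modelled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical sketch of the Key Generating Operator's per-event fan-out. */
final class KeyGeneratingSketch {

    /** Hypothetical input event with two candidate grouping fields. */
    static final class Order {
        final String userId;
        final String itemId;

        Order(String userId, String itemId) {
            this.userId = userId;
            this.itemId = itemId;
        }
    }

    /** Output record: the original event, the rule it is routed for, and that rule's key. */
    static final class KeyedRecord<T> {
        final String ruleId;
        final Object key;
        final T event;

        KeyedRecord(String ruleId, Object key, T event) {
            this.ruleId = ruleId;
            this.key = key;
            this.event = event;
        }
    }

    /**
     * Applies every rule's key selector to the event. N rules yield N records that carry the same
     * event but possibly different keys; a downstream keyBy() would partition on the key.
     */
    static <T> List<KeyedRecord<T>> fanOut(T event, Map<String, Function<T, ?>> keySelectorsByRuleId) {
        List<KeyedRecord<T>> records = new ArrayList<>(keySelectorsByRuleId.size());
        for (Map.Entry<String, Function<T, ?>> entry : keySelectorsByRuleId.entrySet()) {
            records.add(new KeyedRecord<>(entry.getKey(), entry.getValue().apply(event), event));
        }
        return records;
    }
}
```

With one rule keyed by user_id and another by item_id, a single order thus produces two records, which is exactly the situation the Rule concept was introduced for.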

[Figure: job graph structure supporting dynamic rules]



In the wireframe above, the Key Generating Operator and the Pattern Matching & Processing Operator use information from the rules and thus need a RuleDiscoverer. Our design for collecting rules and updating them inside operators is shown in the wireframe below. Each of the two operators will have a rule manager inside its OperatorCoordinator. The manager in the OperatorCoordinator discovers rule changes and converts the information into rules. The subtasks then apply the update after receiving the latest rules from the OperatorCoordinator.

[Figure: collecting rules in OperatorCoordinators and updating them in subtasks]


Supporting multiple & dynamic rules inside CEP operators

Currently, CepOperator uses NFAState and NFA to process input data. As it does not support multiple patterns yet, all input data is handled by the same NFAState.

In order to support multiple patterns in one operator, the single NFA needs to be replaced by a list or map of NFAs, each corresponding to a Rule. When input data triggers the processElement() method, the data will be matched against all rules in the operator. Adding or deleting rules means adding or deleting entries in the list or map.
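The per-rule map described above can be sketched as follows. A real NFA is far more involved; here a hypothetical `Matcher` that counts consecutive events satisfying a predicate stands in for an NFA plus its NFAState, and `MultiRuleOperatorSketch` is illustrative only. The point is the bookkeeping: processElement() feeds every active rule, and an update adds entries for new rule IDs, drops entries for removed ones, and, as an assumption of this sketch, keeps the entries of rules whose ID and version are unchanged, together with their partial matches.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical sketch: one matcher per rule instead of a single NFA in the operator. */
final class MultiRuleOperatorSketch {

    /** Hypothetical rule: matches once `times` consecutive events satisfy `condition`. */
    static final class SimpleRule {
        final String id;
        final int version;
        final Predicate<String> condition;
        final int times;

        SimpleRule(String id, int version, Predicate<String> condition, int times) {
            this.id = id;
            this.version = version;
            this.condition = condition;
            this.times = times;
        }
    }

    /** Stand-in for an NFA plus its NFAState: holds the partial match of one rule. */
    static final class Matcher {
        final SimpleRule rule;
        private int consecutive;

        Matcher(SimpleRule rule) {
            this.rule = rule;
        }

        /** Returns true when this event completes a match; the partial match then restarts. */
        boolean process(String event) {
            consecutive = rule.condition.test(event) ? consecutive + 1 : 0;
            if (consecutive == rule.times) {
                consecutive = 0;
                return true;
            }
            return false;
        }
    }

    // Rule ID -> matcher; this map replaces the operator's single NFA.
    private final Map<String, Matcher> matchers = new LinkedHashMap<>();

    /** Applies a full rule update: new IDs are added, missing IDs removed, new versions rebuilt. */
    void updateRules(List<SimpleRule> latestRules) {
        Map<String, Matcher> next = new LinkedHashMap<>();
        for (SimpleRule rule : latestRules) {
            Matcher existing = matchers.get(rule.id);
            boolean unchanged = existing != null && existing.rule.version == rule.version;
            // Unchanged rules keep their matcher, and with it their partial matches.
            next.put(rule.id, unchanged ? existing : new Matcher(rule));
        }
        matchers.clear();
        matchers.putAll(next);
    }

    /** Matches the event against all active rules; returns the IDs of rules that fired. */
    List<String> processElement(String event) {
        List<String> fired = new ArrayList<>();
        for (Matcher matcher : matchers.values()) {
            if (matcher.process(event)) {
                fired.add(matcher.rule.id);
            }
        }
        return fired;
    }
}
```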

Scheduling rules

In order to make the behavior of rule updates more deterministic, users can schedule the time at which an updated rule should come into effect, which is reflected by the getScheduledTime() method in Rule.

When a new set of rules is provided by the RuleDiscoverer, the rules will first be cached in the operators. When a watermark arrives at an operator, the operator checks the timestamp of the watermark against the scheduled time of the cached rules and updates its rules accordingly.
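The caching-and-activation step can be sketched as below. `ScheduledRuleBuffer` and `ScheduledRule` are hypothetical. The sketch assumes that "earlier than the current event time" is judged against the last watermark seen, so a received rule whose scheduled time has already passed becomes active immediately (which covers the default Long.MIN_VALUE), while later rules wait until a watermark reaches their scheduled time.

```java
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.PriorityQueue;

/** Hypothetical sketch: caching rules until a watermark reaches their scheduled time. */
final class ScheduledRuleBuffer {

    /** Hypothetical rule reduced to what scheduling needs. */
    static final class ScheduledRule {
        final String id;
        final long scheduledTime;

        ScheduledRule(String id, long scheduledTime) {
            this.id = id;
            this.scheduledTime = scheduledTime;
        }
    }

    // Cached rules, earliest scheduled time first.
    private final PriorityQueue<ScheduledRule> pending =
            new PriorityQueue<>(Comparator.comparingLong((ScheduledRule r) -> r.scheduledTime));
    private final Map<String, ScheduledRule> active = new LinkedHashMap<>();
    private long currentWatermark = Long.MIN_VALUE;

    /** Rules whose scheduled time has already passed take effect at once; others are cached. */
    void onRuleReceived(ScheduledRule rule) {
        if (rule.scheduledTime <= currentWatermark) {
            active.put(rule.id, rule);
        } else {
            pending.add(rule);
        }
    }

    /** Activates, in scheduled-time order, every cached rule that the watermark has reached. */
    void onWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
        while (!pending.isEmpty() && pending.peek().scheduledTime <= currentWatermark) {
            ScheduledRule rule = pending.poll();
            active.put(rule.id, rule);
        }
    }

    Map<String, ScheduledRule> activeRules() {
        return Collections.unmodifiableMap(active);
    }
}
```

Because all subtasks activate a cached rule at the same watermark rather than at the moment the update happens to arrive, results become reproducible across subtasks and reruns, which is the determinism the scheduling feature aims for.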


Failover

During checkpoints, the operators mentioned above will store the serialized rules and partially matched results in the state backend and restore them during recovery. In this way, neither rules nor matches will be lost during failover.
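Because rules are Serializable, checkpointing them can be as simple as writing their serialized bytes and reading them back on recovery. The sketch below uses plain Java serialization with a hypothetical `CheckpointableRule`; the actual operators would go through Flink's state backend and type serializers, which are not shown here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: rules survive failover because they can be written to and read from bytes. */
final class RuleSnapshotSketch {

    /** Hypothetical serializable rule; every field it holds must be serializable too. */
    static final class CheckpointableRule implements Serializable {
        private static final long serialVersionUID = 1L;

        final String id;
        final int version;

        CheckpointableRule(String id, int version) {
            this.id = id;
            this.version = version;
        }
    }

    /** In this sketch, called on checkpoint: serializes the current rules into bytes. */
    static byte[] snapshot(List<CheckpointableRule> rules) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            // ArrayList is serializable; copying avoids depending on the caller's List type.
            out.writeObject(new ArrayList<>(rules));
        }
        return bytes.toByteArray();
    }

    /** In this sketch, called on recovery: rebuilds the rules from the checkpointed bytes. */
    @SuppressWarnings("unchecked")
    static List<CheckpointableRule> restore(byte[] state) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(state))) {
            return (List<CheckpointableRule>) in.readObject();
        }
    }
}
```

A rule holding a non-serializable field (for example a captured database connection) would make snapshot() fail with a NotSerializableException, which is why the Serializable requirement extends to every class a Rule references.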

...

With the proposed features, all CEP patterns can be placed in the same operator of the same Flink job, so an error in a single pattern could cause the whole Flink job to fail, affecting the availability of the other CEP patterns.

In order to solve this problem, we plan to make CepOperators able to handle exceptions within a rule's scope. When the matching or processing function of a certain rule throws an exception, the operator should catch it, convert it into an error message and send it to a side output so that it can be handled externally. The rule that threw the exception will be temporarily disabled on that subtask while other rules continue to work unaffected.

We plan to expose this feature as a configurable property. By default, an exception in any rule still causes the whole Flink job to fail.
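A possible shape of this per-rule isolation is sketched below. `IsolatingProcessor`, the `failFast` flag and the error-message format are hypothetical, a rule is reduced to a `Function<String, String>` covering both matching and processing, and the side output is modelled as a plain list. With failFast set to true, matching the default behavior described above, the exception propagates and would fail the job; otherwise it becomes an error message and only the offending rule is disabled on this subtask.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

/** Hypothetical sketch: exceptions are contained within the scope of the rule that threw them. */
final class IsolatingProcessor {
    private final Map<String, Function<String, String>> rules = new LinkedHashMap<>();
    private final Set<String> disabled = new HashSet<>();
    private final boolean failFast;

    final List<String> output = new ArrayList<>();
    final List<String> sideOutput = new ArrayList<>(); // stands in for a side output of error messages

    IsolatingProcessor(boolean failFast) {
        this.failFast = failFast;
    }

    void addRule(String id, Function<String, String> matchAndProcess) {
        rules.put(id, matchAndProcess);
    }

    void processElement(String event) {
        for (Map.Entry<String, Function<String, String>> rule : rules.entrySet()) {
            if (disabled.contains(rule.getKey())) {
                continue; // disabled on this subtask after an earlier failure
            }
            try {
                output.add(rule.getValue().apply(event));
            } catch (RuntimeException e) {
                if (failFast) {
                    throw e; // default: one failing rule fails the whole job
                }
                sideOutput.add("rule " + rule.getKey() + " failed on " + event + ": " + e);
                disabled.add(rule.getKey());
            }
        }
    }
}
```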

Table/SQL API Support

Currently we only propose the design for multiple/dynamic rules in the DataStream API, but our design can also be extended to the Table/SQL API. A general idea is that UDFs can be defined for CEP.rule() and RuleDiscovererFactory implementations, and users will be able to use them in SQL like this:

...


In order to achieve this support, we need to add a keyword to SQL as follows, similar to the existing MATCH_RECOGNIZE keyword.

Code Block
SELECT * FROM input_table RULE(
	`rule_discoverer_factory` = custom_rule_discoverer_factory
)

While MATCH_RECOGNIZE is standard SQL released by the International Organization for Standardization (ISO), we have not found any standard expression format that supports multiple & dynamic match in SQL. Thus the example above is still a naive idea from us and might need further refinement.

As for the implementation, we looked into MATCH_RECOGNIZE's implementation and found that it eventually creates a CepOperator, which means it can share a large proportion of code with the DataStream API implementation. As illustrated above, our proposal only adds common StreamOperators and utilizes the existing OperatorCoordinator mechanism. Therefore, we believe that when we need to support the Table/SQL API, most of the work would be common with, and already done in, the DataStream implementation.