
We propose to make the following API changes to support dynamic pattern changing in CEP.

Add PatternProcessor Interface

The term "PatternProcessor" defines a certain pattern, how to match the pattern, and how to process the found matches. A pattern processor extends beyond a pattern to also include all the other relevant functions. A brief illustration of their relationship is shown below.

The starting point of introducing "PatternProcessor" is to deal with situations like the following:

[Figure: example situations]

Simply having multiple patterns might not be able to handle situations like the above, while the concept of PatternProcessor can handle them properly.

...

We propose to add the PatternProcessor interface as below. The properties of the pattern processor are required by the CEP operator in order to function correctly.

Code Block
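import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.core.io.Versioned;
import org.apache.flink.streaming.api.datastream.KeyedStream;

import javax.annotation.Nullable;

import java.io.Serializable;

/**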
 * Base class for a pattern processor definition.
 *
 * <p>A pattern processor defines a {@link Pattern}, how to match the pattern, and how to process
 * the found matches.
 *
 * @param <IN> Base type of the elements appearing in the pattern.
 * @param <OUT> Type of produced elements based on found matches.
 */
@PublicEvolving
public interface PatternProcessor<IN, OUT> extends Serializable, Versioned {

    /**
     * Returns the ID of the pattern processor.
     *
     * @return The ID of the pattern processor.
     */
    String getId();

    /**
     * Returns the scheduled time at which the pattern processor should come into effect.
     *
     * <p>If the scheduled time is earlier than current event/processing time, the pattern processor
     * will immediately become effective.
     *
     * <p>If the pattern processor should always become effective immediately, the scheduled time
     * can be set to {@code Long.MIN_VALUE}: {@value Long#MIN_VALUE}.
     *
     * @return The scheduled time.
     */
    default Long getTimestamp() {
        return Long.MIN_VALUE;
    }

    /**
     * Returns the {@link Pattern} to be matched.
     *
     * @return The pattern of the pattern processor.
     */
    Pattern<IN, ?> getPattern();

    /**
     * Returns the {@link KeySelector} to extract the key of the elements appearing in the pattern.
     *
     * <p>Only data with the same key can be chained to match a pattern. If the extracted key is
     * null, the behavior is to reuse the current key if the input is a {@link KeyedStream}, or to
     * allocate the same key for all input data if the input is not a {@link KeyedStream}.
     *
     * @return The key selector of the pattern processor.
     */
    @Nullable
    KeySelector<IN, ?> getKeySelector();

    /**
     * Get the {@link PatternProcessFunction} to process the found matches for the pattern.
     *
     * @return The pattern process function of the pattern processor.
     */
    PatternProcessFunction<IN, OUT> getPatternProcessFunction();
}

Given the needs of data exchange and state storage, the PatternProcessor must be serializable, which requires that the classes involved in the PatternProcessor also be serializable; for example, Pattern must implement Serializable.
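A quick way to check the serializability requirement is a Java serialization round trip. The sketch below is illustrative only: `SimplePatternProcessor` and `SerializationRoundTrip` are hypothetical stand-ins (not Flink classes), with the pattern reduced to a list of event names. Any non-serializable field would make `writeObject` fail with a `NotSerializableException`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for a pattern processor: an ID, a version and a pattern of event names. */
class SimplePatternProcessor implements Serializable {
    private static final long serialVersionUID = 1L;

    final String id;
    final int version;
    // Every field must itself be serializable; ArrayList<String> is.
    final ArrayList<String> pattern;

    SimplePatternProcessor(String id, int version, List<String> pattern) {
        this.id = id;
        this.version = version;
        this.pattern = new ArrayList<>(pattern);
    }
}

class SerializationRoundTrip {
    /** Writes the object to bytes and reads it back, as state storage or data exchange would. */
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Serialization round trip failed", e);
        }
    }
}
```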

Add PatternProcessorManager Interface

We propose to introduce the PatternProcessorManager interface. The manager handles pattern processor updates and provides information about the currently active pattern processors.

Its possible implementations include components in the Flink job that need to receive pattern processor update notifications, such as CepOperator or its OperatorCoordinator. This means that this interface will only have internal implementations. Users only need to use this interface; they do not need to implement it or create instances of its implementing classes.

Code Block
import org.apache.flink.annotation.PublicEvolving;

import java.util.List;

/**
 * This manager handles updated pattern processors and manages current pattern processors.
 *
 * @param <IN> Base type of the elements appearing in the pattern.
 * @param <OUT> Type of produced elements based on found matches.
 */
@PublicEvolving
public interface PatternProcessorManager<IN, OUT> {
    /**
     * Deal with the notification that pattern processors are updated.
     *
     * @param patternProcessors A list of all latest pattern processors.
     */
    void onPatternProcessorsUpdated(List<PatternProcessor<IN, OUT>> patternProcessors);

    /**
     * Returns the current pattern processors managed.
     *
     * @return The current pattern processors.
     */
    List<PatternProcessor<IN, OUT>> getCurrentPatternProcessors();
}

Add PatternProcessorDiscoverer Interface

We introduce the PatternProcessorDiscoverer interface, which discovers pattern processor updates and notifies the PatternProcessorManager of the collected pattern processor updates. This interface also serves to provide the initial pattern processors.

Code Block
import org.apache.flink.annotation.PublicEvolving;

import java.io.Closeable;
import java.util.List;

/**
 * Interface that discovers pattern processor updates, notifies {@link PatternProcessorManager} of
 * pattern processor updates and provides the initial pattern processors.
 *
 * @param <IN> Base type of the elements appearing in the pattern.
 * @param <OUT> Type of produced elements based on found matches.
 */
@PublicEvolving
public interface PatternProcessorDiscoverer<IN, OUT> extends Closeable {
    /**
     * Discover the pattern processor updates.
     *
     * <p>In the dynamic pattern processor update case, this function should be a continuous process
     * to check pattern processor updates and notify the {@link PatternProcessorManager}.
     */
    void discoverPatternProcessorUpdates(PatternProcessorManager<IN, OUT> manager) throws Exception;

    /**
     * Returns the pattern processors an operator should be set up with.
     *
     * @return The initial pattern processors.
     */
    List<PatternProcessor<IN, OUT>> getInitialPatternProcessors();
}

Users are responsible for implementing this interface, as the possible ways to collect pattern processors are unlimited. Still, we can provide several abstract classes that handle most use cases, such as the following AbstractPeriodicPatternProcessorDiscoverer, which periodically checks for updates.

Code Block
import java.util.List;

/** Class that periodically checks for pattern processor updates. */
public abstract class AbstractPeriodicPatternProcessorDiscoverer<IN, OUT>
        implements PatternProcessorDiscoverer<IN, OUT> {
    private final List<PatternProcessor<IN, OUT>> initPatternProcessors;
    private final long intervalMillis;

    /**
     * Constructor.
     *
     * @param initPatternProcessors The pattern processors an operator should be set up with.
     * @param intervalMillis Time interval in milliseconds at which to check updates.
     */
    public AbstractPeriodicPatternProcessorDiscoverer(
            List<PatternProcessor<IN, OUT>> initPatternProcessors, long intervalMillis) {
        this.initPatternProcessors = initPatternProcessors;
        this.intervalMillis = intervalMillis;
    }

    /** Checks whether there is a pattern processor update. */
    public abstract boolean checkPatternProcessorUpdate();

    /** Returns the latest pattern processors. */
    public abstract List<PatternProcessor<IN, OUT>> getLatestPatternProcessors();

    @Override
    public void discoverPatternProcessorUpdates(
            PatternProcessorManager<IN, OUT> patternProcessorManager) {
        // Poll until the discovering thread is interrupted.
        while (!Thread.currentThread().isInterrupted()) {
            if (checkPatternProcessorUpdate()) {
                List<PatternProcessor<IN, OUT>> patternProcessors = getLatestPatternProcessors();
                patternProcessorManager.onPatternProcessorsUpdated(patternProcessors);
            }
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the owner of this thread can observe it.
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    @Override
    public List<PatternProcessor<IN, OUT>> getInitialPatternProcessors() {
        return initPatternProcessors;
    }
}
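To show how a concrete subclass might plug into the periodic loop, the sketch below re-declares simplified, hypothetical versions of the manager and the periodic discoverer over a plain element type (here, pattern processor IDs as strings), because the real interfaces depend on Flink types. `QueueBackedDiscoverer` stands in for a database by polling an in-memory queue of published snapshots; all class names are illustrative.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical, simplified stand-in for PatternProcessorManager over an element type P. */
interface ToyManager<P> {
    void onPatternProcessorsUpdated(List<P> patternProcessors);
}

/** Simplified copy of the polling loop of AbstractPeriodicPatternProcessorDiscoverer. */
abstract class ToyPeriodicDiscoverer<P> {
    private final long intervalMillis;

    ToyPeriodicDiscoverer(long intervalMillis) {
        this.intervalMillis = intervalMillis;
    }

    abstract boolean checkPatternProcessorUpdate();

    abstract List<P> getLatestPatternProcessors();

    void discoverPatternProcessorUpdates(ToyManager<P> manager) {
        while (!Thread.currentThread().isInterrupted()) {
            if (checkPatternProcessorUpdate()) {
                manager.onPatternProcessorsUpdated(getLatestPatternProcessors());
            }
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
                break;
            }
        }
    }
}

/** Hypothetical discoverer whose "database" is an in-memory queue of published snapshots. */
class QueueBackedDiscoverer extends ToyPeriodicDiscoverer<String> {
    private final ConcurrentLinkedQueue<List<String>> published = new ConcurrentLinkedQueue<>();
    private List<String> latest = new ArrayList<>();

    QueueBackedDiscoverer(long intervalMillis) {
        super(intervalMillis);
    }

    /** Simulates a user writing a new set of pattern processor IDs to the database. */
    void publish(List<String> snapshot) {
        published.add(new ArrayList<>(snapshot));
    }

    @Override
    boolean checkPatternProcessorUpdate() {
        List<String> next = published.poll();
        if (next == null) {
            return false;
        }
        latest = next;
        return true;
    }

    @Override
    List<String> getLatestPatternProcessors() {
        return latest;
    }
}

/** Records every notification so the caller can inspect it. */
class RecordingManager implements ToyManager<String> {
    final List<List<String>> updates = new CopyOnWriteArrayList<>();

    @Override
    public void onPatternProcessorsUpdated(List<String> patternProcessors) {
        updates.add(patternProcessors);
    }
}
```

Restoring the interrupt flag, rather than only printing the stack trace, lets whoever owns the discovery thread (for example, a component shutting down) observe that the loop stopped because of an interrupt.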

Add PatternProcessorDiscovererFactory Interface

The PatternProcessorDiscovererFactory is introduced to create the PatternProcessorDiscoverer. The created discoverer is required to notify the PatternProcessorManager so that the manager can deal with the pattern processor updates.

Code Block
import java.io.Serializable;

/**
 * A factory for {@link PatternProcessorDiscoverer}. The created {@link PatternProcessorDiscoverer}
 * should notify {@link PatternProcessorManager} to deal with the pattern processor updates.
 *
 * @param <IN> Base type of the elements appearing in the pattern.
 * @param <OUT> Type of produced elements based on found matches.
 */
public interface PatternProcessorDiscovererFactory<IN, OUT> extends Serializable {
    /**
     * Creates a {@link PatternProcessorDiscoverer}.
     *
     * @return A {@link PatternProcessorDiscoverer} instance.
     */
    PatternProcessorDiscoverer<IN, OUT> createPatternProcessorDiscoverer() throws Exception;
}

Add CEP.patternProcessor() method

CEP adds the patternProcessor() method to apply the dynamic pattern processors onto the input DataStream and create a new DataStream containing the processed results of matched CEP pattern processors.

Code Block
public class CEP {
    /**
     * Creates a {@link DataStream} containing the processed results of matched CEP pattern
     * processors.
     *
     * @param input DataStream containing the input events
     * @param factory Pattern processor discoverer factory to create the {@link
     *     PatternProcessorDiscoverer}
     * @param <IN> Type of the elements appearing in the pattern
     * @param <OUT> Type of produced elements based on found matches
     * @return Resulting data stream
     */
    public static <IN, OUT> DataStream<OUT> patternProcessor(
            DataStream<IN> input, PatternProcessorDiscovererFactory<IN, OUT> factory) {...}
}

Example Usage

This section provides code snippets that show how users can use the APIs proposed above to change CEP pattern processors dynamically.

Suppose the CEP library provided an implementation of PatternProcessorDiscoverer, called PeriodDBPatternProcessorDiscoverer, that periodically checks a database for pattern processor updates and initializes pattern processors from a fixed pattern processor set according to the JSON information retrieved from the database.

Code Block
import java.io.IOException;
import java.util.List;

/**
 * Implementation of {@link PatternProcessorDiscoverer} that periodically checks a database for
 * pattern processor updates and initializes the {@link PatternProcessor}s from a fixed pattern
 * processor set according to information retrieved from the database.
 */
public class PeriodDBPatternProcessorDiscoverer<IN, OUT>
        implements PatternProcessorDiscoverer<IN, OUT> {

    @Override
    public void discoverPatternProcessorUpdates(PatternProcessorManager<IN, OUT> manager) {...}

    @Override
    public List<PatternProcessor<IN, OUT>> getInitialPatternProcessors() {...}

    // Required because PatternProcessorDiscoverer extends Closeable, e.g. to release the DB connection.
    @Override
    public void close() throws IOException {...}
}

To start with, users need to provide classes that implement the PatternProcessor interface. Suppose a class called DummyPatternProcessor is created as below.

Code Block
public class DummyPatternProcessor implements PatternProcessor<Event, Object> {

    public DummyPatternProcessor(int a) {...}

    @Override
    public String getId() {...}

    @Override
    public int getVersion() {...}

    @Override
    public Pattern<Event, ?> getPattern() {...}

    @Nullable
    @Override
    public KeySelector<Event, ?> getKeySelector() {...}

    @Override
    public PatternProcessFunction<Event, Object> getPatternProcessFunction() {...}
}

In order to use PeriodDBPatternProcessorDiscoverer, users need to create an instance of PeriodDBPatternProcessorDiscovererFactory.

Code Block
PeriodDBPatternProcessorDiscovererFactory<Event, Object> factory =
        new PeriodDBPatternProcessorDiscovererFactory<>();

Then users create a DataStream with the input data stream and this factory.

Code Block
DataStream<Event> inputStream = env.fromElements(...);

DataStream<Object> outputStream = CEP.patternProcessor(inputStream, factory);

After the Flink job has been configured to support dynamic pattern processors and successfully submitted, users can update pattern processors by configuring them in their database, recording them in a table as follows.

| uid | className | constructorParams |
|---|---|---|
| 1 | "org.apache.flink.cep.patternprocessor.DummyPatternProcessor" | [{"type": "int", "value": 0}] |
| 2 | "org.apache.flink.cep.patternprocessor.DummyPatternProcessor" | [{"type": "int", "value": 1}] |
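The table suggests that a row can be turned into a pattern processor by loading `className` and invoking the constructor that matches `constructorParams`. A minimal reflection-based sketch of that idea follows. It assumes the JSON has already been parsed into (type, value) pairs (parsing is omitted), handles only the `int` type used in the table, and substitutes a local `DummyProcessorStandIn` for `org.apache.flink.cep.patternprocessor.DummyPatternProcessor`; `ConstructorParam` and `ReflectiveInstantiator` are hypothetical names.

```java
import java.lang.reflect.Constructor;
import java.util.Arrays;
import java.util.List;

/** One parsed entry of constructorParams, e.g. {"type": "int", "value": 0}. */
class ConstructorParam {
    final String type;
    final String value;

    ConstructorParam(String type, String value) {
        this.type = type;
        this.value = value;
    }
}

/** Local stand-in with the same (int) constructor shape as DummyPatternProcessor. */
class DummyProcessorStandIn {
    final int a;

    public DummyProcessorStandIn(int a) {
        this.a = a;
    }
}

class ReflectiveInstantiator {
    /** Loads className and calls the public constructor whose parameter types match params. */
    static Object instantiate(String className, List<ConstructorParam> params) {
        Class<?>[] types = new Class<?>[params.size()];
        Object[] args = new Object[params.size()];
        for (int i = 0; i < params.size(); i++) {
            ConstructorParam param = params.get(i);
            if (param.type.equals("int")) {
                types[i] = int.class;
                args[i] = Integer.parseInt(param.value);
            } else {
                throw new IllegalArgumentException("Unsupported parameter type: " + param.type);
            }
        }
        try {
            Constructor<?> constructor = Class.forName(className).getConstructor(types);
            return constructor.newInstance(args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + className, e);
        }
    }
}
```

Inside a Flink job, the class would probably have to be resolved through the user-code class loader rather than the caller's class loader that `Class.forName(String)` uses.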

Information such as pattern processor updates or exceptions will be recorded in Flink's log system. Users can check the log to see, for example, whether a pattern processor update succeeded.

The general process of the example described in this section, along with the general implementation idea, is shown in the wireframe below. The OperatorCoordinator on the JobManager reads pattern processor updates from the users' database and sends them to the subtasks, which in turn changes the matched & processed results the subtasks produce.

[Wireframe: general process of the example]

The general structure that supports dynamic pattern processors in a job graph is shown in the wireframe below. When an input event comes from the upstream operator, it goes through the following process to produce a matched & processed record.

  1. The Key Generating Operator computes the key of the input data according to the KeySelector provided by each PatternProcessor, and attaches the key to the input data. If there are N pattern processors, N records with the same input data and different keys will be sent downstream.
  2. Input data is sent to different downstream operator subtasks via a keyBy() operation, which uses the key generated in the previous step.
  3. The next operator matches the input data against the Patterns provided by the PatternProcessors, and when a matched sequence is found, the operator calls the corresponding PatternProcessFunction on that sequence and emits the result downstream.
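The first step can be sketched in plain Java as a fan-out: one tagged record per pattern processor, each carrying that processor's key. This is an illustration under assumptions, not the proposed operator: `KeyedProcessor`, `TaggedRecord` and `KeyGenerator` are hypothetical; tagging each record with the processor ID is an assumption so that equal keys from different processors stay distinguishable; and a null key is mapped to one shared key, mirroring the non-KeyedStream case described for getKeySelector().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/** Hypothetical stand-in: a pattern processor reduced to an ID and a key selector. */
class KeyedProcessor<IN> {
    final String id;
    // May return null, like a pattern processor without an explicit key.
    final Function<IN, Object> keySelector;

    KeyedProcessor(String id, Function<IN, Object> keySelector) {
        this.id = id;
        this.keySelector = keySelector;
    }
}

/** One copy of an input element, tagged for a single pattern processor. */
class TaggedRecord<IN> {
    final String processorId;
    final Object key;
    final IN value;

    TaggedRecord(String processorId, Object key, IN value) {
        this.processorId = processorId;
        this.key = key;
        this.value = value;
    }
}

class KeyGenerator {
    // Assumed shared key for a null selector result on a non-keyed input.
    static final String SHARED_KEY = "<shared>";

    /** Emits N records for N pattern processors: same value, per-processor key. */
    static <IN> List<TaggedRecord<IN>> fanOut(IN value, List<KeyedProcessor<IN>> processors) {
        List<TaggedRecord<IN>> records = new ArrayList<>(processors.size());
        for (KeyedProcessor<IN> processor : processors) {
            Object key = processor.keySelector.apply(value);
            records.add(new TaggedRecord<>(processor.id, key == null ? SHARED_KEY : key, value));
        }
        return records;
    }
}
```

Downstream, a keyBy() on the pair (processorId, key) would then route each copy to the subtask responsible for that processor's key.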

[Wireframe: job graph structure supporting dynamic pattern processors]

In the wireframe above, the Key Generating Operator and the Pattern Matching & Processing Operator use information from the pattern processors and thus need a PatternProcessorDiscoverer. Our design for collecting pattern processors and updating them inside operators is shown in the wireframe below. Each of the two operators has a pattern processor manager inside its OperatorCoordinator. The manager in the OperatorCoordinator discovers pattern processor updates and converts the information into pattern processors. The subtasks then apply the update after receiving the latest pattern processors from the OperatorCoordinator.


Supporting multiple & dynamic pattern processors inside CEP operators

Currently CepOperator uses NFAState and NFA to process input data. As it does not support multiple patterns yet, all input data is processed by the same NFAState.

In order to support multiple patterns in one operator, the single NFA needs to be replaced by a list or map of NFAs, each corresponding to a PatternProcessor. When input data triggers the processElement() method, the data will be matched against all pattern processors in the operator. Adding or deleting pattern processors means adding or deleting entries in the list or map.
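A simplified model of replacing the single NFA with a map of matchers keyed by pattern processor ID is sketched below. `SequenceMatcher` is a hypothetical stand-in for an NFA that only matches a fixed, strictly contiguous sequence of event names; it is not Flink's NFA and ignores time windows, keys and state backends. `MultiPatternOperator` is likewise illustrative.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical NFA stand-in: strict contiguous sequence with a naive restart. */
class SequenceMatcher {
    private final List<String> sequence;
    private int matched; // number of sequence elements matched so far

    SequenceMatcher(List<String> sequence) {
        this.sequence = new ArrayList<>(sequence);
    }

    /** Advances on one event; returns true when the whole sequence has just been matched. */
    boolean advance(String event) {
        if (event.equals(sequence.get(matched))) {
            matched++;
        } else {
            // Naive restart: fine for sequences without repeated prefixes, not a general NFA.
            matched = event.equals(sequence.get(0)) ? 1 : 0;
        }
        if (matched == sequence.size()) {
            matched = 0;
            return true;
        }
        return false;
    }
}

class MultiPatternOperator {
    // One matcher per pattern processor ID; insertion order keeps the output deterministic.
    private final Map<String, SequenceMatcher> matchers = new LinkedHashMap<>();

    void addPatternProcessor(String id, List<String> sequence) {
        matchers.put(id, new SequenceMatcher(sequence));
    }

    /** Removing the entry also discards that processor's partial match. */
    void removePatternProcessor(String id) {
        matchers.remove(id);
    }

    /** Feeds the event to every matcher; returns IDs of processors that completed a match. */
    List<String> processElement(String event) {
        List<String> completed = new ArrayList<>();
        for (Map.Entry<String, SequenceMatcher> entry : matchers.entrySet()) {
            if (entry.getValue().advance(event)) {
                completed.add(entry.getKey());
            }
        }
        return completed;
    }
}
```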

Scheduling pattern processors

In order to make the behavior of pattern processor updates more deterministic, users can schedule the time at which an updated pattern processor should come into effect, which is reflected by the getTimestamp() method in PatternProcessor.

When a new set of pattern processors is provided by the PatternProcessorDiscoverer, they will first be cached in operators. When a watermark arrives at the operator, it will check the timestamp of the watermark against the scheduled time of the cached pattern processors and update pattern processors accordingly.
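The cache-then-activate behavior can be sketched as a small buffer. Assumptions: `ScheduledProcessor` and `ScheduledUpdateBuffer` are hypothetical; a processor becomes active once the watermark reaches its scheduled time (`timestamp <= watermark`, so `Long.MIN_VALUE` activates on the first watermark); activation replaces any active processor with the same ID; and removal of processors is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in holding only what scheduling needs. */
class ScheduledProcessor {
    final String id;
    final long timestamp; // Long.MIN_VALUE means "effective immediately"

    ScheduledProcessor(String id, long timestamp) {
        this.id = id;
        this.timestamp = timestamp;
    }
}

class ScheduledUpdateBuffer {
    private final List<ScheduledProcessor> pending = new ArrayList<>();
    private final Map<String, ScheduledProcessor> active = new LinkedHashMap<>();

    /** Caches a newly discovered set instead of applying it right away. */
    void cache(List<ScheduledProcessor> update) {
        pending.addAll(update);
    }

    /** Activates every cached processor whose scheduled time the watermark has reached. */
    void onWatermark(long watermark) {
        Iterator<ScheduledProcessor> it = pending.iterator();
        while (it.hasNext()) {
            ScheduledProcessor processor = it.next();
            if (processor.timestamp <= watermark) {
                active.put(processor.id, processor); // replaces an active processor with the same ID
                it.remove();
            }
        }
    }

    List<String> activeIds() {
        return new ArrayList<>(active.keySet());
    }
}
```

Driving activation from watermarks rather than wall-clock time means every subtask switches at the same point in event time, which is what makes the update more deterministic.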

Failover

During checkpoints, the operators mentioned above will store serialized pattern processors and partially matched results in the state backend and restore them during recovery. In this way pattern processors and partial matches will not be lost during failover.

Common case behavior analysis

When a pattern processor is deleted or replaced, partially matched results related to the pattern processor are directly discarded and will no longer match incoming input data.

If there is no pattern processor in a CepOperator, all input data will be discarded, as it cannot match any existing pattern.

If IDs in the List<PatternProcessor> provided by the PatternProcessorDiscoverer duplicate those of existing pattern processors, the versions of the PatternProcessors will be checked. The existing pattern processor will be preserved if the two versions are the same; otherwise it will be replaced by the newer one.
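The version rule can be expressed as a merge of the current set with the incoming list. In this hypothetical sketch (`VersionedProcessor`, `VersionMerge`), an existing instance is kept when both ID and version match, so its partial matches could be preserved; otherwise the incoming instance wins. Dropping IDs that are absent from the update is an assumption based on the manager receiving the list of all latest pattern processors.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in: only the ID and version matter for the merge. */
class VersionedProcessor {
    final String id;
    final int version;

    VersionedProcessor(String id, int version) {
        this.id = id;
        this.version = version;
    }
}

class VersionMerge {
    /** Builds the new active set from the current set and the full incoming list. */
    static Map<String, VersionedProcessor> merge(
            Map<String, VersionedProcessor> existing, List<VersionedProcessor> update) {
        Map<String, VersionedProcessor> result = new LinkedHashMap<>();
        for (VersionedProcessor incoming : update) {
            VersionedProcessor current = existing.get(incoming.id);
            boolean unchanged = current != null && current.version == incoming.version;
            // Keep the existing instance when unchanged; otherwise take the incoming one.
            result.put(incoming.id, unchanged ? current : incoming);
        }
        // IDs missing from the update are not copied over, i.e. they are deleted (assumption).
        return result;
    }
}
```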

Given that the proposed design offers eventual consistency, there could be a period of time just after a pattern processor update during which inconsistency exists among the subtasks of CepOperators, meaning they do not work on the same set of pattern processors. This inconsistency is caused by the fact that the OperatorCoordinator cannot send OperatorEvents to all subtasks simultaneously, and this period usually lasts for milliseconds. As illustrated in the graph below, false alerts ("AC" instead of "ABC") might be generated during that period.

[Graph: false alert ("AC" instead of "ABC") during the inconsistency period]

In order to solve this problem, we plan to make CepOperators able to handle exceptions within a PatternProcessor's scope. When the matching or processing function of a certain pattern processor throws an exception, the operator should catch the exception, convert it into an error message and send it to a side output to call for an external solution. The pattern processor that threw the exception will be temporarily disabled on that subtask, while other pattern processors continue to work unaffected.

We plan to expose this feature as a configurable property. By default, such an exception still causes the whole Flink job to fail.
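A sketch of per-pattern-processor failure isolation follows. `IsolatingOperator`, the `isolateFailures` flag and the plain lists used as main and side output are hypothetical simplifications of an operator, a configuration property and Flink's side outputs; each processor is modeled as a function from an event to an optional result (null meaning no output).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

class IsolatingOperator {
    // Hypothetical config flag: false keeps the default "fail the whole job" behavior.
    private final boolean isolateFailures;
    private final Map<String, Function<String, String>> processors = new LinkedHashMap<>();
    // Processors disabled on this subtask after throwing.
    private final Set<String> disabled = new HashSet<>();
    final List<String> output = new ArrayList<>();
    // Stand-in for a side output carrying error messages.
    final List<String> sideOutput = new ArrayList<>();

    IsolatingOperator(boolean isolateFailures) {
        this.isolateFailures = isolateFailures;
    }

    void addPatternProcessor(String id, Function<String, String> matchAndProcess) {
        processors.put(id, matchAndProcess);
    }

    void processElement(String event) {
        for (Map.Entry<String, Function<String, String>> entry : processors.entrySet()) {
            String id = entry.getKey();
            if (disabled.contains(id)) {
                continue;
            }
            try {
                String result = entry.getValue().apply(event);
                if (result != null) {
                    output.add(result);
                }
            } catch (RuntimeException e) {
                if (!isolateFailures) {
                    throw e; // default: propagate, failing the job
                }
                disabled.add(id);
                sideOutput.add("pattern processor " + id + " disabled: " + e.getMessage());
            }
        }
    }
}
```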

...

Currently we only propose the design for multiple/dynamic pattern processors in the DataStream API, but our design can also be extended to the Table/SQL API.

In order to achieve this support, we need to add a keyword in SQL like the following, similar to the existing MATCH_RECOGNIZE keyword.

Code Block
SELECT T.aid, T.bid, T.cid
FROM MyTable
    MATCH_RECOGNIZE_DYNAMIC (
      pattern_processor_discoverer_factory = custom_pattern_processor_discoverer_factory
    ) AS T

While MATCH_RECOGNIZE is standard SQL released by the International Organization for Standardization (ISO), we have not found any standard expression format that supports multiple & dynamic matching in SQL. Thus the example above is just a simple example showing what the API might look like. Further refinement of the API is required when it comes to detailed design.

As for implementation, we looked into MATCH_RECOGNIZE's implementation and found that it eventually creates a CepOperator, which means it can share a large proportion of code with the implementation of the DataStream API. As illustrated above, the implementation in our proposal will just add common StreamOperators and utilize the existing OperatorCoordinator mechanism. Therefore we believe that when we need to support the Table/SQL API, most of the work will be common and already done in the DataStream implementation: Flink's infrastructure for Table/SQL can recognize the keyword above and create the corresponding StreamOperators. The operators can accept the PatternProcessorDiscovererFactory and pass it to the existing implementations proposed in this FLIP, which means the Table/SQL and DataStream implementations of this functionality can share a lot in common.