Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Status

...

Page properties


Discussion thread

...

...

JIRAhere (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)

...

Vote thread
JIRA
Release


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

We propose to make the following API changes to support dynamic pattern changing in CEP.

Add PatternProcessorInterfacePatternProcessor Interface

The term "PatternProcessor" defines a certain pattern, how to match the pattern, and how to process the found matches. A pattern processor extends beyond a pattern to also include all the other relevant functions. As shown in the wireframe below, Process 1 and Process 2 are equivalent process.

The starting point of introducing "PatternProcessor" is trying to deal with situations like follows:

...

Code Block
public class CEP {
    /**
     * Creates a {@link DataStream} containing the processed results of matched CEP pattern
     * processors.
     *
     * @param input DataStream containing the input events
     * @param factory Pattern processor discoverer factory to create the {@link
     *     PatternProcessorDiscoverer}
     * @param <IN> Type of the elements appearing in the pattern
     * @param <OUT> Type of produced elements based on found matches
     * @return Resulting data stream
     */
    public static <IN, OUT> DataStream<OUT> patternProcesspatternProcessors(
            DataStream<IN> input, PatternProcessorDiscovererFactory<IN, OUT> factory) {...}
}

...

Code Block
DataStream<Event> inputStream = env.fromElements(...);

DataStream<Object> outputStream = CEP.patternProcesspatternProcessors(inputStream, factory);

After Flink job has been configured to support dynamic pattern processor and successfully submitted, users can update pattern processors by configuring pattern processors in their database, recording them in a table as follows.

uid

className

constructorParams

1

"org.apache.flink.cep.processor.DummyPatternProcessor"

[{"type": "int", "value": 0}]

2

"org.apache.flink.cep.processor.DummyPatternProcessor"

[{"type": "int", "value": 1}]

Information like pattern processor update or exception will be recorded in Flink's log system. Users can check the log to see, for example, whether a pattern processor update succeeded or not.

...