...

The term "PatternProcessor" covers a certain pattern, how to match that pattern, and how to process the matches found. A pattern processor therefore extends beyond a pattern to include all the other relevant functions. As shown in the wireframe below, Process 1 and Process 2 are equivalent processes.
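To make the three parts of this definition concrete (a pattern, a way to match it, and a way to process matches), here is a minimal, Flink-free sketch. All names here (`ToyPatternProcessor`, `ToyPatternProcessorDemo`, `sumsOfConsecutiveHighReadings`) are hypothetical and are not the proposed `PatternProcessor` interface. Matching is a simple contiguous-sequence check over an in-memory list, not Flink CEP's NFA-based matching.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

/**
 * Hypothetical model of a pattern processor: it bundles a pattern (one condition per
 * event), a matching strategy (contiguous windows), and a function applied to each match.
 */
final class ToyPatternProcessor<IN, OUT> {
    private final String id;
    private final List<Predicate<IN>> pattern;          // the pattern itself
    private final Function<List<IN>, OUT> processMatch; // how to process a found match

    ToyPatternProcessor(String id, List<Predicate<IN>> pattern, Function<List<IN>, OUT> processMatch) {
        this.id = id;
        this.pattern = pattern;
        this.processMatch = processMatch;
    }

    String getId() {
        return id;
    }

    /** How to match: every contiguous window whose events satisfy the conditions in order. */
    List<OUT> process(List<IN> events) {
        List<OUT> results = new ArrayList<>();
        for (int start = 0; start + pattern.size() <= events.size(); start++) {
            boolean matched = true;
            for (int i = 0; i < pattern.size(); i++) {
                if (!pattern.get(i).test(events.get(start + i))) {
                    matched = false;
                    break;
                }
            }
            if (matched) {
                // Copy the window so the processing function never holds a view of the input.
                results.add(processMatch.apply(new ArrayList<>(events.subList(start, start + pattern.size()))));
            }
        }
        return results;
    }
}

final class ToyPatternProcessorDemo {
    /** Pattern: two consecutive readings above 10; processing: emit their sum. */
    static List<Integer> sumsOfConsecutiveHighReadings(List<Integer> readings) {
        List<Predicate<Integer>> pattern = List.of(v -> v > 10, v -> v > 10);
        Function<List<Integer>, Integer> sum = match -> match.get(0) + match.get(1);
        return new ToyPatternProcessor<>("high-high-sum", pattern, sum).process(readings);
    }
}
```

For the readings 5, 12, 15, 20, 3, the windows (12, 15) and (15, 20) match, so the sketch emits 27 and 35. Two processors could share the same pattern yet differ in how they process matches, which is the distinction a plain pattern cannot express.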

The motivation for introducing "PatternProcessor" is to handle situations such as the following:

...

Simply having multiple patterns may not be enough to handle situations like those above, whereas the PatternProcessor concept can handle them properly.



We propose adding the PatternProcessor interface as follows. The CEP operator requires the properties of the pattern processor in order to function correctly.

...

Code Block
import java.io.Serializable;

/**
 * A factory for {@link PatternProcessorDiscoverer}. The created {@link PatternProcessorDiscoverer}
 * should notify {@link PatternProcessorManager} to deal with the pattern processor updates.
 *
 * @param <IN> Base type of the elements appearing in the pattern.
 * @param <OUT> Type of produced elements based on found matches.
 */
public interface PatternProcessorDiscovererFactory<IN, OUT> extends Serializable {
    /**
     * Creates a {@link PatternProcessorDiscoverer}.
     *
     * @return A {@link PatternProcessorDiscoverer} instance.
     */
    PatternProcessorDiscoverer<IN, OUT> createPatternProcessorDiscoverer() throws Exception;
}
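The factory extends `Serializable`, which suggests it is meant to be shipped with the job and used to create the discoverer at runtime, so the discoverer itself (which may hold resources such as a database connection) never needs to be serialized. The sketch below illustrates that split with hypothetical stand-ins (`ToyDiscoverer`, `ToyDiscovererFactory`, `FactoryShipping`); they are not the proposed interfaces, and the JDBC URL is only stored, never connected to.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.List;

/** Hypothetical stand-in for PatternProcessorDiscoverer; deliberately NOT Serializable. */
interface ToyDiscoverer {
    List<String> getInitialPatternProcessorIds();
}

/** Hypothetical stand-in for the factory: it holds only configuration, so it serializes cleanly. */
final class ToyDiscovererFactory implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String jdbcUrl; // hypothetical configuration value

    ToyDiscovererFactory(String jdbcUrl) {
        this.jdbcUrl = jdbcUrl;
    }

    String getJdbcUrl() {
        return jdbcUrl;
    }

    ToyDiscoverer createDiscoverer() {
        // A real discoverer might open a connection to jdbcUrl here; this one returns fixed ids.
        return () -> List.of("1", "2");
    }
}

final class FactoryShipping {
    /** Serializes and deserializes the factory, mimicking shipping it with the job. */
    static ToyDiscovererFactory roundTrip(ToyDiscovererFactory factory) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(factory);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (ToyDiscovererFactory) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Factory could not be round-tripped", e);
        }
    }
}
```

Only the factory crosses the serialization boundary; the discoverer is created from the deserialized copy.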

Add CEP.

...

patternProcess() method

CEP adds the patternProcess() method, which applies the dynamic pattern processors to the input DataStream and creates a new DataStream containing the processed results of the matched CEP pattern processors.

Code Block
public class CEP {
    /**
     * Creates a {@link DataStream} containing the processed results of matched CEP pattern
     * processors.
     *
     * @param input DataStream containing the input events
     * @param factory Pattern processor discoverer factory to create the {@link
     *     PatternProcessorDiscoverer}
     * @param <IN> Type of the elements appearing in the pattern
     * @param <OUT> Type of produced elements based on found matches
     * @return Resulting data stream
     */
    public static <IN, OUT> DataStream<OUT> patternProcess(
            DataStream<IN> input, PatternProcessorDiscovererFactory<IN, OUT> factory) {...}
}

...

Code Block
/**
 * Implementation of {@link PatternProcessorDiscoverer} that periodically checks a database for pattern processor updates and
 * initializes the {@link PatternProcessor} from a fixed pattern processor set according to information retrieved from the database.
 */
public class PeriodDBPatternProcessorDiscoverer<IN, OUT> implements PatternProcessorDiscoverer<IN, OUT> {

    @Override
    public void discoverPatternProcessorChanges() {...}

    @Override
    public List<PatternProcessor<IN, OUT>> getInitialPatternProcessors() {...}
}
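The periodic check in `PeriodDBPatternProcessorDiscoverer` could, for example, be driven by a `ScheduledExecutorService` that polls the table and reports a change only when its contents differ from the previous poll. In the sketch below, `PollingDiscoverer` is hypothetical: a `Supplier` stands in for the database query (returning uid to row contents), and a `Consumer` stands in for notifying the `PatternProcessorManager`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical polling discoverer; not the proposed PeriodDBPatternProcessorDiscoverer. */
final class PollingDiscoverer implements AutoCloseable {
    private final Supplier<Map<String, String>> queryTable; // stand-in for the database query
    private final Consumer<Map<String, String>> onChange;   // stand-in for notifying the manager
    // Worker threads are only created once start() schedules the first task.
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private Map<String, String> lastSeen = new HashMap<>();

    PollingDiscoverer(Supplier<Map<String, String>> queryTable, Consumer<Map<String, String>> onChange) {
        this.queryTable = queryTable;
        this.onChange = onChange;
    }

    /** Polls immediately, then again intervalMillis after each poll finishes. */
    void start(long intervalMillis) {
        scheduler.scheduleWithFixedDelay(this::checkOnce, 0, intervalMillis, TimeUnit.MILLISECONDS);
    }

    /** Polls once; notifies and returns true only if the rows differ from the previous poll. */
    synchronized boolean checkOnce() {
        Map<String, String> current = new HashMap<>(queryTable.get());
        if (current.equals(lastSeen)) {
            return false;
        }
        lastSeen = current;
        onChange.accept(Map.copyOf(current));
        return true;
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

Comparing full snapshots keeps the sketch simple; a real implementation might instead compare a version column. Note that an exception thrown by a task passed to `scheduleWithFixedDelay` suppresses all later runs, so a production poller would need to catch and log query failures.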

...

Code Block
DataStream<Event> inputStream = env.fromElements(...);

DataStream<Object> outputStream = CEP.patternProcess(inputStream, factory);

After the Flink job has been configured to support dynamic pattern processors and successfully submitted, users can update the pattern processors by recording them in a database table such as the following:

uid | className | constructorParams
1 | "org.apache.flink.cep.processor.DummyPatternProcessor" | [{"type": "int", "value": 0}]
2 | "org.apache.flink.cep.processor.DummyPatternProcessor" | [{"type": "int", "value": 1}]
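One way a discoverer could turn a row of this table into an instance is reflection: map each entry of constructorParams to a parameter type and value, then look up the matching public constructor of className. The sketch below assumes the JSON has already been parsed into `ConstructorParam` objects. `DummyPatternProcessor` here is a local hypothetical stand-in, and `ConstructorParam`, `ReflectiveInstantiator`, and the `"string"` type tag are likewise illustrative assumptions.

```java
import java.lang.reflect.Constructor;
import java.util.List;

/** Hypothetical stand-in for the DummyPatternProcessor named in the table. */
final class DummyPatternProcessor {
    private final int id;

    public DummyPatternProcessor(int id) {
        this.id = id;
    }

    int getId() {
        return id;
    }
}

/** One constructorParams entry, assumed to be already parsed from JSON. */
final class ConstructorParam {
    final String type;
    final String value;

    ConstructorParam(String type, String value) {
        this.type = type;
        this.value = value;
    }
}

final class ReflectiveInstantiator {
    /** Builds className using the public constructor whose parameter types match params. */
    static Object instantiate(String className, List<ConstructorParam> params) {
        Class<?>[] types = new Class<?>[params.size()];
        Object[] args = new Object[params.size()];
        for (int i = 0; i < params.size(); i++) {
            ConstructorParam p = params.get(i);
            switch (p.type) {
                case "int":
                    types[i] = int.class;
                    args[i] = Integer.parseInt(p.value);
                    break;
                case "string":
                    types[i] = String.class;
                    args[i] = p.value;
                    break;
                default:
                    throw new IllegalArgumentException("Unsupported parameter type: " + p.type);
            }
        }
        try {
            Constructor<?> ctor = Class.forName(className).getConstructor(types);
            return ctor.newInstance(args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + className, e);
        }
    }
}
```

Under these assumptions, the row with uid 2 corresponds to calling `new DummyPatternProcessor(1)`. Restricting the supported type tags keeps the mapping from table values to constructor signatures explicit.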

...