Status

...

Page properties


Discussion thread

...

...

JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)

...

Vote thread
JIRA
Release


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

We propose the following API changes to support dynamically changing patterns in CEP.

Add PatternProcessor Interface

A "PatternProcessor" defines a pattern, how to match the pattern, and how to process the found matches. In other words, a pattern processor extends beyond a pattern to also include all the other relevant functions. As shown in the wireframe below, Process 1 and Process 2 are equivalent.

The motivation for introducing "PatternProcessor" is to handle situations such as the following:

  • While one pattern requires input data to be grouped by user_id, another pattern might require data to be grouped by item_id.

  • Users might want to distinguish results that match different patterns and deal with them differently.

Simply supporting multiple patterns may not be enough to handle situations like these, whereas the PatternProcessor concept handles them properly by pairing each pattern with its own key selector and process function.

...

Code Block
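import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.core.io.Versioned;
import org.apache.flink.streaming.api.datastream.KeyedStream;

import javax.annotation.Nullable;

import java.io.Serializable;

/**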
 * Base class for a pattern processor definition.
 *
 * <p>A pattern processor defines a {@link Pattern}, how to match the pattern, and how to process
 * the found matches.
 *
 * @param <IN> Base type of the elements appearing in the pattern.
 * @param <OUT> Type of produced elements based on found matches.
 */
@PublicEvolving
public interface PatternProcessor<IN, OUT> extends Serializable, Versioned {
    /**
     * Returns the ID of the pattern processor.
     *
     * @return The ID of the pattern processor.
     */
    String getId();

    /**
     * Returns the scheduled time at which the pattern processor should become effective.
     *
     * <p>If the scheduled time is earlier than the current event/processing time, the pattern
     * processor will immediately become effective.
     *
     * <p>If the pattern processor should always become effective immediately, the scheduled time
     * can be set to {@code Long.MIN_VALUE}: {@value Long#MIN_VALUE}.
     *
     * @return The scheduled time.
     */
    default Long getTimestamp() {
        return Long.MIN_VALUE;
    }

    /**
     * Returns the {@link Pattern} to be matched.
     *
     * @return The pattern of the pattern processor.
     */
    Pattern<IN, ?> getPattern();

    /**
     * Returns the {@link KeySelector} to extract the key of the elements appearing in the pattern.
     *
     * <p>Only data with the same key can be chained to match a pattern. If the extracted key is
     * null, the current key is reused if the input is a {@link KeyedStream}; otherwise, the same
     * key is allocated to all input data.
     *
     * @return The key selector of the pattern processor.
     */
    @Nullable
    KeySelector<IN, ?> getKeySelector();

    /**
     * Returns the {@link PatternProcessFunction} to process the found matches for the pattern.
     *
     * @return The pattern process function of the pattern processor.
     */
    PatternProcessFunction<IN, OUT> getPatternProcessFunction();
}

Because pattern processors need to be exchanged between components and stored in state, PatternProcessor must be serializable. This in turn requires every class involved in a PatternProcessor to be serializable as well; for example, Pattern must implement Serializable.
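To make the serializability requirement concrete, here is a JDK-only sketch (not Flink code) built from hypothetical stand-ins: `ProcessorDef` plays the role of a pattern processor, while `SerializablePattern` and `PlainPattern` play the role of a pattern class with and without `Serializable`. Java serialization walks the whole object graph, so a single non-serializable field is enough to make the processor itself unserializable.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class SerializationCheck {

    // Hypothetical stand-in for a pattern class that implements Serializable.
    static final class SerializablePattern implements Serializable {
        final String expression;
        SerializablePattern(String expression) { this.expression = expression; }
    }

    // Hypothetical stand-in for a pattern class that does NOT implement Serializable.
    static final class PlainPattern {
        final String expression;
        PlainPattern(String expression) { this.expression = expression; }
    }

    // Hypothetical processor definition: serializable only if its pattern field is too.
    static final class ProcessorDef implements Serializable {
        final String id;
        final Object pattern;
        ProcessorDef(String id, Object pattern) { this.id = id; this.pattern = pattern; }
    }

    // Serializes and deserializes a value, as happens when it is shipped or stored in state.
    static Object roundTrip(Object value) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return in.readObject();
        }
    }

    static boolean isSerializable(Object value) {
        try {
            roundTrip(value);
            return true;
        } catch (NotSerializableException e) {
            return false; // some object in the graph does not implement Serializable
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) throws Exception {
        ProcessorDef copy = (ProcessorDef) roundTrip(new ProcessorDef("1", new SerializablePattern("A B")));
        System.out.println(copy.id + " " + ((SerializablePattern) copy.pattern).expression); // prints "1 A B"
        System.out.println(isSerializable(new ProcessorDef("2", new PlainPattern("A B")))); // prints "false"
    }
}
```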

Add PatternProcessorManager Interface

...

Add PatternProcessorDiscovererFactory Interface

The PatternProcessorDiscovererFactory is introduced to create the PatternProcessorDiscoverer, which works together with the PatternProcessorManager: the discoverer is required to notify the PatternProcessorManager of pattern processor changes so that the manager can handle them.

Code Block
import java.io.Serializable;

/**
 * A factory for {@link PatternProcessorDiscoverer}. The created {@link PatternProcessorDiscoverer}
 * should notify {@link PatternProcessorManager} to deal with the pattern processor updates.
 *
 * @param <IN> Base type of the elements appearing in the pattern.
 * @param <OUT> Type of produced elements based on found matches.
 */
public interface PatternProcessorDiscovererFactory<IN, OUT> extends Serializable {
    /**
     * Creates a {@link PatternProcessorDiscoverer}.
     *
     * @return A {@link PatternProcessorDiscoverer} instance.
     * @throws Exception If the discoverer cannot be created.
     */
    PatternProcessorDiscoverer<IN, OUT> createPatternProcessorDiscoverer() throws Exception;
}
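The intended interplay between the factory, the discoverer, and the manager can be sketched with plain Java stand-ins. The PatternProcessorManager and PatternProcessorDiscoverer definitions are elided in this section, so the callback name `onPatternProcessorsUpdated`, the `discoverUpdates(Manager)` method, and representing processors by their IDs are all assumptions made for illustration; only the serializable factory with a no-argument create method mirrors the interface above.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

final class DiscovererWiring {

    // Hypothetical stand-in for PatternProcessorManager: receives the full, current list.
    interface Manager {
        void onPatternProcessorsUpdated(List<String> processorIds);
    }

    // Hypothetical stand-in for PatternProcessorDiscoverer.
    interface Discoverer extends AutoCloseable {
        void discoverUpdates(Manager manager) throws Exception;
    }

    // Mirrors the shape of PatternProcessorDiscovererFactory: serializable, no-arg create method.
    interface DiscovererFactory extends Serializable {
        Discoverer createDiscoverer() throws Exception;
    }

    // Illustrative factory whose discoverer reports a fixed list once.
    static final class StaticListFactory implements DiscovererFactory {
        private final List<String> ids;

        StaticListFactory(List<String> ids) { this.ids = new ArrayList<>(ids); }

        @Override
        public Discoverer createDiscoverer() {
            return new Discoverer() {
                @Override
                public void discoverUpdates(Manager manager) {
                    manager.onPatternProcessorsUpdated(List.copyOf(ids));
                }

                @Override
                public void close() {}
            };
        }
    }

    // Manager that records each update; a real one would forward the list to the operators.
    static final class RecordingManager implements Manager {
        final List<List<String>> updates = new ArrayList<>();

        @Override
        public void onPatternProcessorsUpdated(List<String> processorIds) {
            updates.add(processorIds);
        }
    }

    // The factory travels with the job; the discoverer is only created where it runs.
    static RecordingManager runOnce(DiscovererFactory factory) throws Exception {
        RecordingManager manager = new RecordingManager();
        try (Discoverer discoverer = factory.createDiscoverer()) {
            discoverer.discoverUpdates(manager);
        }
        return manager;
    }

    public static void main(String[] args) throws Exception {
        RecordingManager manager = runOnce(new StaticListFactory(List.of("1", "2")));
        System.out.println(manager.updates); // prints "[[1, 2]]"
    }
}
```

One plausible reason for this split is that a small serializable factory can be shipped with the job, while the discoverer, which may hold connections or threads, is created only where it actually runs.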

...

Code Block
import org.apache.flink.streaming.api.datastream.DataStream;

public class CEP {
    /**
     * Creates a {@link DataStream} containing the processed results of matched CEP pattern
     * processors.
     *
     * @param input DataStream containing the input events
     * @param factory Pattern processor discoverer factory to create the {@link
     *     PatternProcessorDiscoverer}
     * @param <IN> Type of the elements appearing in the pattern
     * @param <OUT> Type of produced elements based on found matches
     * @return Resulting data stream
     */
    public static <IN, OUT> DataStream<OUT> patternProcessors(
            DataStream<IN> input, PatternProcessorDiscovererFactory<IN, OUT> factory) {...}
}

...

Code Block
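StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Event> inputStream = env.fromElements(...);

// factory is a user-provided PatternProcessorDiscovererFactory<Event, Object>
DataStream<Object> outputStream = CEP.patternProcessors(inputStream, factory);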

Once a Flink job configured to support dynamic pattern processors has been submitted successfully, users can update the pattern processors by configuring them in their database, recorded in a table such as the following:

uid | className                                              | constructorParams
1   | "org.apache.flink.cep.processor.DummyPatternProcessor" | [{"type": "int", "value": 0}]
2   | "org.apache.flink.cep.processor.DummyPatternProcessor" | [{"type": "int", "value": 1}]
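One way a discoverer could turn a row of this table into an object is to load `className` reflectively and call the constructor whose parameter types match `constructorParams`. The sketch below assumes the JSON in `constructorParams` has already been parsed into (type, value) pairs and supports only the `int` and `string` types; `Param`, `instantiate`, and the nested `DummyProcessor` (standing in for `org.apache.flink.cep.processor.DummyPatternProcessor`) are hypothetical.

```java
import java.lang.reflect.Constructor;
import java.util.List;

final class ProcessorRowLoader {

    // One already-parsed entry of constructorParams, e.g. {"type": "int", "value": 0}.
    record Param(String type, String value) {}

    // Hypothetical stand-in for DummyPatternProcessor with a single int constructor argument.
    static final class DummyProcessor {
        final int id;

        public DummyProcessor(int id) { this.id = id; }
    }

    // Maps a declared parameter type to the Java class used to look up the constructor.
    private static Class<?> javaType(String type) {
        switch (type) {
            case "int":
                return int.class;
            case "string":
                return String.class;
            default:
                throw new IllegalArgumentException("Unsupported type: " + type);
        }
    }

    // Converts the textual value into an argument of the matching type.
    private static Object javaValue(Param p) {
        return p.type().equals("int") ? Integer.parseInt(p.value()) : p.value();
    }

    // Loads className and invokes the public constructor whose parameter types match params.
    static Object instantiate(String className, List<Param> params)
            throws ReflectiveOperationException {
        Class<?> clazz = Class.forName(className);
        Class<?>[] types = new Class<?>[params.size()];
        Object[] args = new Object[params.size()];
        for (int i = 0; i < params.size(); i++) {
            types[i] = javaType(params.get(i).type());
            args[i] = javaValue(params.get(i));
        }
        Constructor<?> constructor = clazz.getConstructor(types);
        return constructor.newInstance(args);
    }

    public static void main(String[] args) throws Exception {
        // Row 2 of the table, with the class name swapped for the local stand-in.
        Object p = instantiate(DummyProcessor.class.getName(), List.of(new Param("int", "1")));
        System.out.println(((DummyProcessor) p).id); // prints "1"
    }
}
```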

Information such as pattern processor updates and exceptions is recorded in Flink's logs. Users can check the logs to see, for example, whether a pattern processor update succeeded.

...

Add Key Generating and Pattern Matching & Processing Operator

To achieve the functionality described in the public API, we propose to add two stream operators to the stream graph.

The first is a key generating operator. For each input record and each active PatternProcessor, the operator generates a key according to that PatternProcessor's KeySelector and attaches the key to the record.

After that, the stream graph performs a keyBy() operation on the input data using the generated key. This way, data with the same key is routed to the same subtask of the downstream operator, and the key can be updated dynamically.
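The two steps above can be simulated in plain Java. For every record and every active processor, the hypothetical `generateKeys` emits a (processorId, key, record) triple using that processor's key selector, falling back to a shared constant key when the selector returns null (the non-keyed case in the `getKeySelector()` javadoc). `subtaskFor` then routes each triple by a hash of (processorId, key). Including the processor ID in the routing key is an assumption the proposal does not spell out, and the `hashCode`-modulo-parallelism routing is a simplification of Flink's key-group assignment.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

final class KeyGeneration {

    record Event(String userId, String itemId, String action) {}

    // A record tagged with the processor it is destined for and the key generated for it.
    record Keyed(String processorId, Object key, Event event) {}

    // Constant key used when a selector returns null on non-keyed input.
    static final Object SHARED_KEY = "";

    // Step 1: one output per (record, active processor) pair.
    static List<Keyed> generateKeys(Event e, Map<String, Function<Event, ?>> selectors) {
        List<Keyed> out = new ArrayList<>();
        for (Map.Entry<String, Function<Event, ?>> s : selectors.entrySet()) {
            Object key = s.getValue().apply(e);
            out.add(new Keyed(s.getKey(), key == null ? SHARED_KEY : key, e));
        }
        return out;
    }

    // Step 2: route on (processorId, key) so that equal keys of one processor meet on the
    // same subtask. floorMod keeps the index non-negative for negative hash codes.
    static int subtaskFor(Keyed k, int parallelism) {
        return Math.floorMod(Objects.hash(k.processorId(), k.key()), parallelism);
    }

    public static void main(String[] args) {
        Map<String, Function<Event, ?>> selectors = new LinkedHashMap<>();
        selectors.put("byUser", Event::userId); // one processor groups by user_id
        selectors.put("byItem", Event::itemId); // another groups by item_id
        Event e = new Event("u1", "i9", "click");
        for (Keyed k : generateKeys(e, selectors)) {
            System.out.println(k.processorId() + " -> key " + k.key() + ", subtask " + subtaskFor(k, 4));
        }
    }
}
```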

The second operator matches data against patterns and processes the matched results with the PatternProcessFunctions. The first duty is almost the same as that of CepOperator, except that the operator now has to handle multiple, dynamically changing patterns. The second duty was previously fulfilled by the PatternStream.process() method, but it now has to be handled inside the operator's implementation.
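A toy version of the second operator, assuming records arrive already tagged with a processor ID and key. Instead of CEP's NFA, each hypothetical `Processor` only recognizes "`first` followed by `second`"; the point of the sketch is the bookkeeping: processors can be swapped at runtime, partial matches are kept per (processor, key), and each processor's own process function shapes its output, so results of different patterns can be told apart and handled differently.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;

final class MatchAndProcess {

    // Hypothetical processor: matches "first" followed later by "second" for the same key and
    // turns the match into an output with its own process function.
    record Processor(String id, String first, String second,
                     BiFunction<String, List<String>, String> processFunction) {}

    // Partial-match state is scoped to (processor, key), so processors never share state.
    record StateKey(String processorId, String key) {}

    private final Map<String, Processor> processors = new HashMap<>();
    private final Set<StateKey> awaitingSecond = new HashSet<>();

    // Swaps in the latest processors and drops partial matches of removed processors.
    void updateProcessors(List<Processor> latest) {
        processors.clear();
        for (Processor p : latest) {
            processors.put(p.id(), p);
        }
        awaitingSecond.removeIf(s -> !processors.containsKey(s.processorId()));
    }

    // Handles one record already tagged with its processor ID and generated key.
    List<String> processElement(String processorId, String key, String action) {
        List<String> out = new ArrayList<>();
        Processor p = processors.get(processorId);
        if (p == null) {
            return out; // the processor is no longer active
        }
        StateKey state = new StateKey(processorId, key);
        if (awaitingSecond.contains(state) && action.equals(p.second())) {
            awaitingSecond.remove(state);
            out.add(p.processFunction().apply(key, List.of(p.first(), p.second())));
        } else if (action.equals(p.first())) {
            awaitingSecond.add(state); // start (or keep) a partial match
        }
        // Any other event is skipped, i.e. "followed by" with relaxed contiguity.
        return out;
    }

    public static void main(String[] args) {
        MatchAndProcess op = new MatchAndProcess();
        op.updateProcessors(List.of(
                new Processor("1", "login", "pay", (key, match) -> "p1 " + key + " " + match)));
        op.processElement("1", "u1", "login");
        op.processElement("1", "u1", "view");
        System.out.println(op.processElement("1", "u1", "pay")); // prints "[p1 u1 [login, pay]]"
    }
}
```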

The general structure that supports dynamic pattern processors in a job graph is shown in the wireframe below.


Add OperatorCoordinator for the two operators

We propose to add an OperatorCoordinator for each of the two operators, responsible for listening for updates and keeping the subtasks consistent. The OperatorCoordinator was proposed in FLIP-27, and most of the mechanisms we need are already available in its implementation, so we should only need to implement the OperatorCoordinator interface with the details specific to these two operators.

In the wireframe above, the Key Generating Operator and the Pattern Matching & Processing Operator use information from the pattern processors and thus need a PatternProcessorDiscoverer. Our design for collecting pattern processors and updating them inside the operators is shown in the wireframe below. Each of the two operators has a pattern processor discoverer inside its OperatorCoordinator. The discoverer in the OperatorCoordinator discovers pattern processor changes and converts the information into pattern processors. The subtasks then apply the update after receiving the latest pattern processors from the OperatorCoordinator.
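The consistency argument for a coordinator can be illustrated with a plain Java sketch: discovery happens in one place, and every subtask receives the same versioned snapshot of the processor list and ignores stale or duplicate deliveries. `Coordinator`, `Subtask`, and `ProcessorsUpdate` are hypothetical stand-ins, and the version counter is an assumption of this sketch. The method name `handleOperatorEvent` only echoes Flink's `OperatorEventHandler`; real delivery would go through the coordinator's subtask gateways rather than direct method calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class CoordinatorBroadcast {

    // Event sent from the coordinator to subtasks: a versioned, complete processor snapshot.
    record ProcessorsUpdate(long version, List<String> processorIds) {}

    // Hypothetical subtask: applies an update only if it is newer than what it already has.
    static final class Subtask {
        long version = -1;
        List<String> active = List.of();

        void handleOperatorEvent(ProcessorsUpdate update) {
            if (update.version() > version) { // ignore stale or duplicate deliveries
                version = update.version();
                active = update.processorIds();
            }
        }
    }

    // Hypothetical coordinator: the single place where discovered changes enter the job.
    static final class Coordinator {
        private final List<Consumer<ProcessorsUpdate>> gateways = new ArrayList<>();
        private long version = 0;

        void register(Consumer<ProcessorsUpdate> gateway) {
            gateways.add(gateway);
        }

        // Called with the latest processors; every subtask gets the identical snapshot.
        void onPatternProcessorsUpdated(List<String> processorIds) {
            ProcessorsUpdate update = new ProcessorsUpdate(++version, List.copyOf(processorIds));
            for (Consumer<ProcessorsUpdate> gateway : gateways) {
                gateway.accept(update);
            }
        }
    }

    public static void main(String[] args) {
        Coordinator coordinator = new Coordinator();
        List<Subtask> subtasks = List.of(new Subtask(), new Subtask(), new Subtask());
        for (Subtask s : subtasks) {
            coordinator.register(s::handleOperatorEvent);
        }
        coordinator.onPatternProcessorsUpdated(List.of("1", "2"));
        for (Subtask s : subtasks) {
            System.out.println(s.version + " " + s.active); // prints "1 [1, 2]" for each subtask
        }
    }
}
```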

Possible implementations of pattern processor discovery and serialization format

The public interfaces proposed above aim to give users the flexibility to decide how pattern processors are discovered and serialized. We may also provide built-in implementations for common use cases. Some possible ways for the discoverer to discover pattern processors are as follows:

  • The discoverer periodically checks a file in filesystem or a table in database for updates

  • The discoverer listens on external commands, like HTTP requests, for updates
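The first option, periodic checking, can be sketched as follows. The hypothetical `PollingDiscoverer` reads the current definitions through a `Supplier` (standing in for a file or table read), notifies its manager (a `Consumer` here) only when the definitions changed, and catches failures inside the scheduled task, because an exception escaping a `scheduleWithFixedDelay` task cancels all later runs. Printing to stderr stands in for Flink's logging.

```java
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

final class PollingDiscoverer implements AutoCloseable {

    private final Supplier<List<String>> source;  // hypothetical: reads the current definitions
    private final Consumer<List<String>> manager; // hypothetical: receives the changed list
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private List<String> lastSeen;                // null until the first successful poll

    PollingDiscoverer(Supplier<List<String>> source, Consumer<List<String>> manager) {
        this.source = source;
        this.manager = manager;
    }

    // Reads the definitions once and notifies the manager only if they changed.
    synchronized void pollOnce() {
        List<String> current = List.copyOf(source.get());
        if (!Objects.equals(current, lastSeen)) {
            lastSeen = current;
            manager.accept(current);
        }
    }

    // Polls periodically; a failed poll is reported and simply retried on the next tick.
    void start(long intervalMillis) {
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                pollOnce();
            } catch (RuntimeException e) {
                System.err.println("Pattern processor discovery failed: " + e);
            }
        }, 0, intervalMillis, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> table = List.of("1", "2"); // stands in for rows read from a table
        try (PollingDiscoverer discoverer =
                new PollingDiscoverer(() -> table, ids -> System.out.println("update: " + ids))) {
            discoverer.start(100);
            Thread.sleep(350); // several polls run, but only the first one reports a change
        }
    }
}
```

Comparing against the last seen list keeps the manager from redistributing an identical set of processors on every tick; a discoverer driven by external commands would instead notify the manager when a request arrives.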

Possible formats for transmitting serialized pattern processors to Flink jobs, before they are converted into Java objects, include the following:

  • Directly serializing Java objects using Java serialization or Kryo

  • Using readable formats like JSON or XML

  • Using the standard MATCH_RECOGNIZE grammar in SQL proposed by ISO

Supporting multiple & dynamic pattern processors inside CEP operators

...

Code Block
SELECT T.aid, T.bid, T.cid
FROM MyTable
    MATCH_RECOGNIZE_DYNAMIC (
      pattern_processor_discoverer_factory = custom_pattern_processor_discoverer_factory
    ) AS T

While MATCH_RECOGNIZE is standard SQL published by the International Organization for Standardization (ISO), we have not found any standard syntax that supports multiple, dynamic matching in SQL. The example above is therefore only a simple illustration of how the API might look. The API will need further refinement during detailed design.

As for the implementation, Flink's Table/SQL infrastructure can recognize the keyword above and create the corresponding StreamOperators. These operators can accept the PatternProcessorDiscovererFactory and pass it to the implementation proposed in this FLIP, which means the Table/SQL and DataStream implementations of this functionality can share much of their code.

Rejected Alternatives

  • Have each subtask of an operator make the update on its own

    • It is hard to achieve consistency.

      • Even if every subtask updates at the same interval, the absolute times at which they update may differ. For example, one subtask might update at 10:00, 10:05, and so on, while another updates at 10:01, 10:06. In this case the subtasks might never process data with the same set of pattern processors.

      • When users want to control updates with external commands, they may need extra work to make sure that each command has reached all the subtasks.

  • Using a DataStream<PatternProcessor<IN, OUT>>, instead of OperatorCoordinators, to pass updates

    • Unlike input data, which flows in as a stream, updates are control messages that should be sent by a control center such as the JobManager rather than passed in as a data stream. OperatorCoordinators match these semantics and are therefore the more appropriate choice.