Status
Current state: "Under Discussion"
JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
In scenarios such as risk control or pattern matching, users commonly want to change the pattern that events must match while that pattern is still serving. In current Flink CEP, a CEP operator has a fixed pattern that cannot be changed. To achieve the goal described above, users therefore have to restart their Flink jobs and accept a relatively long update time.
In this FLIP, we propose to add a couple of APIs and classes to Flink CEP in order to support having multiple patterns in one operator and updating patterns dynamically without stopping Flink jobs.
Public Interfaces
We propose to make the following API changes to support dynamic pattern changing in CEP.
Add PatternProcessor Interface
The term "PatternProcessor" defines a certain pattern, how to match the pattern, and how to process the found matches. A pattern processor thus extends beyond a pattern to also include all the other relevant functions. As shown in the wireframe below, Process 1 and Process 2 are equivalent processes.
The motivation for introducing "PatternProcessor" is to deal with situations like the following:
While one pattern requires input data to be grouped by user_id, another pattern might require data to be grouped by item_id.
Users might want to distinguish results that match different patterns and deal with them differently.
Simply having multiple patterns might not be enough to handle situations like the above, whereas the PatternProcessor concept handles them properly.
We propose to add the PatternProcessor interface as below. The properties of the pattern processor are required by the CEP operator in order to function correctly.
```java
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.core.io.Versioned;
import org.apache.flink.streaming.api.datastream.KeyedStream;

import javax.annotation.Nullable;

import java.io.Serializable;

/**
 * Base interface for a pattern processor definition.
 *
 * <p>A pattern processor defines a {@link Pattern}, how to match the pattern, and how to process
 * the found matches.
 *
 * @param <IN> Base type of the elements appearing in the pattern.
 * @param <OUT> Type of produced elements based on found matches.
 */
@PublicEvolving
public interface PatternProcessor<IN, OUT> extends Serializable, Versioned {

    /**
     * Returns the ID of the pattern processor.
     *
     * @return The ID of the pattern processor.
     */
    String getId();

    /**
     * Returns the scheduled time at which the pattern processor should come into effect.
     *
     * <p>If the scheduled time is earlier than the current event/processing time, the pattern
     * processor will immediately become effective.
     *
     * <p>If the pattern processor should always become effective immediately, the scheduled time
     * can be set to {@code Long.MIN_VALUE}: {@value Long#MIN_VALUE}.
     *
     * @return The scheduled time.
     */
    default Long getTimestamp() {
        return Long.MIN_VALUE;
    }

    /**
     * Returns the {@link Pattern} to be matched.
     *
     * @return The pattern of the pattern processor.
     */
    Pattern<IN, ?> getPattern();

    /**
     * Returns the {@link KeySelector} to extract the key of the elements appearing in the pattern.
     *
     * <p>Only data with the same key can be chained to match a pattern. If the extracted key is
     * null, the behavior is to reuse the current key if the input is a {@link KeyedStream}, or to
     * allocate the same key for all input data otherwise.
     *
     * @return The key selector of the pattern processor.
     */
    @Nullable
    KeySelector<IN, ?> getKeySelector();

    /**
     * Gets the {@link PatternProcessFunction} to process the found matches for the pattern.
     *
     * @return The pattern process function of the pattern processor.
     */
    PatternProcessFunction<IN, OUT> getPatternProcessFunction();
}
```
Given the needs of data exchange and state storage, a PatternProcessor must be serializable, which requires that all classes it references also be serializable; for example, Pattern will need to implement Serializable.
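To make the serializability requirement concrete, here is a minimal sketch that round-trips a simplified object through standard Java serialization, the way a pattern processor might be written out and read back. `SimpleProcessor` and `SerializationRoundTrip` are hypothetical names used only for illustration; a real `PatternProcessor` would additionally hold a `Pattern`, a `KeySelector` and a `PatternProcessFunction`, and each of them would need to survive the same round trip. The FLIP does not say which serializer would be used, so plain Java serialization is an assumption here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical, simplified stand-in for a pattern processor (not the proposed API). */
class SimpleProcessor implements Serializable {
    private static final long serialVersionUID = 1L;

    final String id;
    final int version;
    // Every non-transient field must be Serializable as well; a non-null field of a
    // non-serializable type would make writeObject fail with NotSerializableException.
    final String keyField;

    SimpleProcessor(String id, int version, String keyField) {
        this.id = id;
        this.version = version;
        this.keyField = keyField;
    }
}

class SerializationRoundTrip {
    /** Serializes an object to bytes, e.g. before sending it elsewhere or storing it in state. */
    static byte[] toBytes(Serializable obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.toByteArray();
    }

    /** Restores an object from bytes, e.g. on the receiving side or during recovery. */
    static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }
}
```

The restored object is a distinct instance with the same field values, which is what a subtask or a recovering operator would work with.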
Add PatternProcessorManager Interface
We propose to introduce the PatternProcessorManager interface. The manager handles pattern processor updates and provides information about the currently active pattern processors.
Its possible implementations include components in a Flink job that need to receive pattern processor update notifications, such as CepOperator or its OperatorCoordinator. This interface will therefore only have internal implementations: users only need to use it, and should neither extend it nor create their own implementations of it.
```java
import org.apache.flink.annotation.PublicEvolving;

import java.util.List;

/**
 * This manager handles updated pattern processors and manages current pattern processors.
 *
 * @param <IN> Base type of the elements appearing in the pattern.
 * @param <OUT> Type of produced elements based on found matches.
 */
@PublicEvolving
public interface PatternProcessorManager<IN, OUT> {

    /**
     * Deals with the notification that pattern processors are updated.
     *
     * @param patternProcessors A list of all latest pattern processors.
     */
    void onPatternProcessorsUpdated(List<PatternProcessor<IN, OUT>> patternProcessors);

    /**
     * Returns the current pattern processors managed.
     *
     * @return The current pattern processors.
     */
    List<PatternProcessor<IN, OUT>> getCurrentPatternProcessors();
}
```
Add PatternProcessorDiscoverer Interface
We introduce the PatternProcessorDiscoverer interface, which discovers pattern processor changes and notifies the PatternProcessorManager of the collected pattern processor updates. This interface also serves to provide the initial pattern processors.
```java
import org.apache.flink.annotation.PublicEvolving;

import java.util.List;

/**
 * Interface that discovers pattern processor updates, notifies {@link PatternProcessorManager} of
 * pattern processor updates and provides the initial pattern processors.
 *
 * @param <IN> Base type of the elements appearing in the pattern.
 * @param <OUT> Type of produced elements based on found matches.
 */
@PublicEvolving
public interface PatternProcessorDiscoverer<IN, OUT> {

    /**
     * Discovers the pattern processor updates.
     *
     * <p>In the dynamic pattern processor update case, this function should be a continuous
     * process that checks for pattern processor updates and notifies the {@link
     * PatternProcessorManager}.
     */
    void discoverPatternProcessorUpdates(PatternProcessorManager<IN, OUT> manager) throws Exception;

    /**
     * Returns the pattern processors an operator should be set up with.
     *
     * @return The initial pattern processors.
     */
    List<PatternProcessor<IN, OUT>> getInitialPatternProcessors();
}
```
Users are responsible for implementing this interface, as the possible ways to collect pattern processors are unlimited. Still, we can provide several abstract classes that handle most use cases, such as the following AbstractPeriodicPatternProcessorDiscoverer that periodically checks for updates.
```java
import java.util.List;

/** Class that periodically checks for pattern processor updates. */
public abstract class AbstractPeriodicPatternProcessorDiscoverer<IN, OUT>
        implements PatternProcessorDiscoverer<IN, OUT> {

    private final List<PatternProcessor<IN, OUT>> initPatternProcessors;
    private final long intervalMillis;

    /**
     * Constructor.
     *
     * @param initPatternProcessors The pattern processors to start with.
     * @param intervalMillis Time interval in milliseconds at which to check updates.
     */
    public AbstractPeriodicPatternProcessorDiscoverer(
            List<PatternProcessor<IN, OUT>> initPatternProcessors, long intervalMillis) {
        this.initPatternProcessors = initPatternProcessors;
        this.intervalMillis = intervalMillis;
    }

    /** Checks whether there is a pattern processor update. */
    public abstract boolean checkPatternProcessorUpdate();

    /** Returns the latest pattern processors. */
    public abstract List<PatternProcessor<IN, OUT>> getLatestPatternProcessors();

    @Override
    public void discoverPatternProcessorUpdates(
            PatternProcessorManager<IN, OUT> patternProcessorManager) {
        while (true) {
            if (checkPatternProcessorUpdate()) {
                List<PatternProcessor<IN, OUT>> patternProcessors = getLatestPatternProcessors();
                patternProcessorManager.onPatternProcessorsUpdated(patternProcessors);
            }
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so that the caller can observe the cancellation.
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    @Override
    public List<PatternProcessor<IN, OUT>> getInitialPatternProcessors() {
        return initPatternProcessors;
    }
}
```
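As a usage illustration of the periodic approach, the sketch below shows the shape a concrete subclass's checks might take: it remembers the last version it saw from some source and reports an update only when that version changes. To stay self-contained it uses hypothetical simplified types (`VersionedSource`, `UpdateListener`, plain `String` processors) instead of the proposed Flink interfaces, and it exposes a single `pollOnce` step so that the loop body can be exercised without sleeping.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical source of processor definitions, e.g. backed by a database table. */
interface VersionedSource {
    /** A value that changes whenever the stored definitions change. */
    long currentVersion();

    /** Loads the full, latest list of definitions. */
    List<String> load();
}

/** Hypothetical stand-in for PatternProcessorManager#onPatternProcessorsUpdated. */
interface UpdateListener {
    void onUpdated(List<String> processors);
}

/** Mirrors checkPatternProcessorUpdate()/getLatestPatternProcessors() with simplified types. */
class VersionPollingDiscoverer {
    private final VersionedSource source;
    private long lastSeenVersion;

    VersionPollingDiscoverer(VersionedSource source) {
        this.source = source;
        // Initial processors are handed over separately, so start from the current version.
        this.lastSeenVersion = source.currentVersion();
    }

    /** Returns true if the source version differs from the one last reported. */
    boolean checkUpdate() {
        return source.currentVersion() != lastSeenVersion;
    }

    /** One iteration of the periodic loop: notify the listener only when something changed. */
    void pollOnce(UpdateListener listener) {
        if (checkUpdate()) {
            // Read the version before loading, so a change made during load() is seen next time.
            long version = source.currentVersion();
            List<String> latest = new ArrayList<>(source.load());
            lastSeenVersion = version;
            listener.onUpdated(latest);
        }
    }
}
```

In AbstractPeriodicPatternProcessorDiscoverer, `pollOnce` would correspond to one pass of the `while` loop, with `Thread.sleep(intervalMillis)` between passes.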
Add PatternProcessorDiscovererFactory Interface
The PatternProcessorDiscovererFactory is introduced to create the PatternProcessorDiscoverer, which in turn notifies the PatternProcessorManager to deal with pattern processor changes.
```java
import java.io.Serializable;

/**
 * A factory for {@link PatternProcessorDiscoverer}. The created {@link PatternProcessorDiscoverer}
 * should notify {@link PatternProcessorManager} to deal with the pattern processor updates.
 *
 * @param <IN> Base type of the elements appearing in the pattern.
 * @param <OUT> Type of produced elements based on found matches.
 */
public interface PatternProcessorDiscovererFactory<IN, OUT> extends Serializable {

    /**
     * Creates a {@link PatternProcessorDiscoverer}.
     *
     * @return A {@link PatternProcessorDiscoverer} instance.
     */
    PatternProcessorDiscoverer<IN, OUT> createPatternProcessorDiscoverer() throws Exception;
}
```
Add CEP.patternProcessors() method
CEP adds the patternProcessors() method to apply the dynamic pattern processors to the input DataStream and create a new DataStream containing the processed results of matched CEP pattern processors.
```java
import org.apache.flink.streaming.api.datastream.DataStream;

public class CEP {
    /**
     * Creates a {@link DataStream} containing the processed results of matched CEP pattern
     * processors.
     *
     * @param input DataStream containing the input events
     * @param factory Pattern processor discoverer factory to create the {@link
     *     PatternProcessorDiscoverer}
     * @param <IN> Type of the elements appearing in the pattern
     * @param <OUT> Type of produced elements based on found matches
     * @return Resulting data stream
     */
    public static <IN, OUT> DataStream<OUT> patternProcessors(
            DataStream<IN> input, PatternProcessorDiscovererFactory<IN, OUT> factory) {...}
}
```
It is recommended that users override the toString(), equals(), and hashCode() methods of their PatternProcessor implementations. Overriding toString() provides better readability when monitoring pattern processors through logs, and overriding equals() and hashCode() can help to avoid reloading existing pattern processors during an update.
Example Usage
This section provides code snippets that show how users can use the APIs proposed above to change CEP pattern processors dynamically.
Suppose the CEP library provided an implementation of PatternProcessorDiscoverer, called PeriodDBPatternProcessorDiscoverer, that periodically checks a database for pattern processor updates and initializes pattern processors from a fixed set according to the JSON-formatted information stored in the database.
```java
/**
 * Implementation of {@link PatternProcessorDiscoverer} that periodically checks a database for
 * pattern processor updates.
 */
public class PeriodDBPatternProcessorDiscoverer<IN, OUT>
        implements PatternProcessorDiscoverer<IN, OUT> {

    @Override
    public void discoverPatternProcessorUpdates(PatternProcessorManager<IN, OUT> manager)
            throws Exception {...}

    @Override
    public List<PatternProcessor<IN, OUT>> getInitialPatternProcessors() {...}
}
```
To start with, users need to provide classes that implement the PatternProcessor interface. Suppose a class called DummyPatternProcessor is created as below.
```java
public class DummyPatternProcessor implements PatternProcessor<Event, Object> {

    public DummyPatternProcessor(int a) {...}

    @Override
    public String getId() {...}

    @Override
    public int getVersion() {...}

    @Override
    public Pattern<Event, ?> getPattern() {...}

    @Nullable
    @Override
    public KeySelector<Event, ?> getKeySelector() {...}

    @Override
    public PatternProcessFunction<Event, Object> getPatternProcessFunction() {...}
}
```
In order to use PeriodDBPatternProcessorDiscoverer, users need to create an instance of PeriodDBPatternProcessorDiscovererFactory.
```java
PeriodDBPatternProcessorDiscovererFactory<Event, Object> factory =
        new PeriodDBPatternProcessorDiscovererFactory<>();
```
Then users create a DataStream from the input data stream and this factory.
```java
DataStream<Event> inputStream = env.fromElements(...);
DataStream<Object> outputStream = CEP.patternProcessors(inputStream, factory);
```
After the Flink job has been configured to support dynamic pattern processors and successfully submitted, users can update pattern processors by recording them in a database table as follows.
uid | className | constructorParams |
---|---|---|
1 | "org.apache.flink.cep.processor.DummyPatternProcessor" | [{"type": "int", "value": 0}] |
2 | "org.apache.flink.cep.processor.DummyPatternProcessor" | [{"type": "int", "value": 1}] |
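Each row above names a class and its constructor arguments, so the discoverer has to turn a row into an object. A minimal sketch of that step with plain Java reflection could look like the following. It assumes the `constructorParams` JSON has already been parsed into (type, value) string pairs and that only `int` and `String` parameters occur; `Param`, `DemoProcessor` and `ProcessorRowLoader` are hypothetical names, and the FLIP does not specify how PeriodDBPatternProcessorDiscoverer actually performs this conversion.

```java
import java.lang.reflect.Constructor;
import java.util.List;

/** One parsed entry of the constructorParams column, e.g. {"type": "int", "value": 0}. */
class Param {
    final String type;
    final String value;

    Param(String type, String value) {
        this.type = type;
        this.value = value;
    }
}

/** Hypothetical class standing in for DummyPatternProcessor. */
class DemoProcessor {
    final int a;

    public DemoProcessor(int a) {
        this.a = a;
    }
}

class ProcessorRowLoader {
    /** Instantiates className via the public constructor matching the listed parameter types. */
    static Object instantiate(String className, List<Param> params) throws Exception {
        Class<?>[] types = new Class<?>[params.size()];
        Object[] args = new Object[params.size()];
        for (int i = 0; i < params.size(); i++) {
            Param p = params.get(i);
            switch (p.type) {
                case "int":
                    types[i] = int.class;
                    args[i] = Integer.parseInt(p.value);
                    break;
                case "String":
                    types[i] = String.class;
                    args[i] = p.value;
                    break;
                default:
                    throw new IllegalArgumentException("Unsupported parameter type: " + p.type);
            }
        }
        Constructor<?> ctor = Class.forName(className).getConstructor(types);
        return ctor.newInstance(args);
    }
}
```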
Information such as pattern processor updates or exceptions will be recorded in Flink's logs. Users can check the logs to see, for example, whether a pattern processor update succeeded.
The general process of the example described in this section, along with the general implementation idea, is shown in the wireframe below. The OperatorCoordinator on the JobManager reads pattern processor updates from the users' database and sends them to the subtasks, so that the matched & processed results produced by the subtasks reflect the updated pattern processors.
Proposed Changes
In order to achieve the functions described in the public API, we propose to add two stream operators to the stream graph.
The first is a key generating operator. For each input data and each active PatternProcessor, the operator would generate a key according to the KeySelector in the PatternProcessor and attach the key to that data.
After that, the stream graph would perform a keyBy() operation on the input data using the generated key. In this way, data with the same key is gathered on the same downstream subtask, and the key can be dynamically updated.
The second operator serves to match data against patterns and to process the matched results with the PatternProcessFunctions. The first duty is almost the same as that of CepOperator, except that it now needs to handle multiple and dynamic patterns. The second duty is currently achieved by the PatternStream.process() method, but now it needs to be handled by the operator's internal implementation.
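The key generating step can be sketched as follows: every input element is emitted once per active processor, tagged with that processor's ID and the key its selector extracts, so that a downstream `keyBy()` can route each copy independently. `KeyedCopy` and `KeyGenerator` are hypothetical, and `java.util.function.Function` stands in for Flink's `KeySelector` to keep the sketch self-contained. Carrying the processor ID alongside the key is an illustrative choice, made so that equal keys produced by different processors can be kept apart downstream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** One copy of an input element, tagged for a single processor (hypothetical type). */
class KeyedCopy<T> {
    final String processorId;
    final Object key;
    final T element;

    KeyedCopy(String processorId, Object key, T element) {
        this.processorId = processorId;
        this.key = key;
        this.element = element;
    }
}

class KeyGenerator {
    /**
     * Emits one copy of the element per active processor, so N processors yield N records that
     * share the same element but carry different keys.
     */
    static <T> List<KeyedCopy<T>> fanOut(T element, Map<String, Function<T, ?>> keySelectors) {
        List<KeyedCopy<T>> out = new ArrayList<>();
        for (Map.Entry<String, Function<T, ?>> e : keySelectors.entrySet()) {
            out.add(new KeyedCopy<>(e.getKey(), e.getValue().apply(element), element));
        }
        return out;
    }
}
```

With one processor keyed by `user_id` and another by `item_id`, a single event becomes two records, which is how the two grouping requirements from the motivation can coexist in one job.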
The general structure that supports dynamic pattern processors in a job graph is shown in the wireframe below.
We propose to add OperatorCoordinators for the two operators, which are responsible for listening for updates and keeping the subtasks consistent. The OperatorCoordinator was proposed in FLIP-27 and its implementation already provides most of the mechanisms we need, so we should only need to implement OperatorCoordinator and supply the implementation details specific to these two operators.
In the wireframe above, the Key Generating Operator and the Pattern Matching & Processing Operator use information from the pattern processors and thus need a PatternProcessorDiscoverer. Our design for collecting pattern processors and updating them inside operators is shown in the wireframe below. Each of the two operators will have a pattern processor discoverer inside its OperatorCoordinator. The discoverer in the OperatorCoordinator discovers pattern processor changes and converts the information into pattern processors. The subtasks then apply the update after receiving the latest pattern processors from the OperatorCoordinator.
The public interfaces proposed above aim to leave the flexibility of discovering and serializing pattern processors to users. We may also provide some built-in implementations for common uses. Some possible ways for the discoverer to discover pattern processors are as follows:
The discoverer periodically checks a file in a filesystem or a table in a database for updates
The discoverer listens for external commands, such as HTTP requests, for updates
The format used to transmit serialized pattern processors to Flink jobs, before they are converted to Java objects, can be one of the following:
Directly serializing Java objects using Java serialization or Kryo
Using readable formats like JSON or XML
Using the standard MATCH_RECOGNIZE grammar in SQL proposed by ISO
Supporting multiple & dynamic pattern processors inside CEP operators
Currently CepOperator uses NFAState and NFA to deal with input data. As it does not yet support multiple patterns, all input data are handled by the same NFAState.
In order to support multiple patterns in one operator, the single NFA needs to be replaced by a list or map of NFAs, each corresponding to a PatternProcessor. When input data triggers the processElement() method, the data will be matched against all pattern processors in the operator. Adding or deleting pattern processors means adding or deleting entries in the list or map.
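Replacing the single NFA with a map of per-processor matchers can be illustrated with a deliberately tiny stand-in. Here each processor's "pattern" is a fixed sequence of event names that must arrive consecutively, and its partial-match progress lives in its own map entry. This is a toy matcher for a single key, not Flink's NFA, and `MultiPatternMatcher` and its methods are hypothetical. The point is only that every element is offered to every processor, and that adding or removing a processor is a map update that also discards that processor's partial matches.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class MultiPatternMatcher {
    /** Pattern per processor ID: the event names that must arrive consecutively. */
    private final Map<String, List<String>> patterns = new HashMap<>();
    /** Partial-match progress per processor ID (index of the next expected event). */
    private final Map<String, Integer> progress = new HashMap<>();

    void addOrReplace(String processorId, List<String> pattern) {
        if (pattern.isEmpty()) {
            throw new IllegalArgumentException("pattern must not be empty");
        }
        patterns.put(processorId, pattern);
        progress.put(processorId, 0); // a new or replaced processor starts from scratch
    }

    void remove(String processorId) {
        patterns.remove(processorId);
        progress.remove(processorId); // partial matches of a removed processor are dropped
    }

    /** Offers one event to every processor; returns the IDs whose pattern just completed. */
    List<String> processElement(String event) {
        List<String> completed = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : patterns.entrySet()) {
            String id = e.getKey();
            List<String> pattern = e.getValue();
            int next = progress.get(id);
            if (pattern.get(next).equals(event)) {
                next++;
            } else {
                // Simplified restart (not a full KMP-style fallback): the event may begin a new attempt.
                next = pattern.get(0).equals(event) ? 1 : 0;
            }
            if (next == pattern.size()) {
                completed.add(id);
                next = 0;
            }
            progress.put(id, next);
        }
        return completed;
    }
}
```

With no processors registered, `processElement` returns an empty list for every event, which corresponds to input data being discarded when a CepOperator holds no pattern processor.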
Scheduling pattern processors
In order to make the behavior of pattern processor updates more deterministic, users can schedule the time at which an updated pattern processor should come into effect, which is reflected by the getTimestamp() method in PatternProcessor.
When a new set of pattern processors is provided by the PatternProcessorDiscoverer, they will first be cached in the operators. When a watermark arrives at an operator, the operator checks the timestamp of the watermark against the scheduled time of the cached pattern processors and updates the pattern processors accordingly.
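The watermark-driven activation could be sketched with cached processors grouped by scheduled time in a sorted map: when a watermark arrives, every cached entry whose time is at or before the watermark becomes active. `ScheduledUpdates` and the use of plain `String` IDs for processors are hypothetical simplifications; in the proposal the scheduled time would come from `PatternProcessor#getTimestamp()`. Note that in this sketch a processor scheduled at `Long.MIN_VALUE` becomes active at the next watermark rather than instantly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

class ScheduledUpdates {
    /** Cached processors that are not yet effective, grouped by scheduled time. */
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();
    /** Processors that have come into effect, in activation order. */
    private final List<String> active = new ArrayList<>();

    /** Caches a processor until a watermark reaches its scheduled time. */
    void schedule(String processorId, long scheduledTime) {
        pending.computeIfAbsent(scheduledTime, t -> new ArrayList<>()).add(processorId);
    }

    /** Activates every cached processor whose scheduled time is <= the watermark. */
    void onWatermark(long watermark) {
        NavigableMap<Long, List<String>> due = pending.headMap(watermark, true);
        for (List<String> ids : due.values()) {
            active.addAll(ids);
        }
        due.clear(); // headMap is a view, so this also removes the entries from pending
    }

    List<String> activeProcessors() {
        return active;
    }
}
```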
Failover
During checkpoints, the operators mentioned above will store serialized pattern processors and partially matched results inside state backend and restore them during recovery. In this way pattern processors or matches will not be lost during failover.
Common case behavior analysis
When a pattern processor is deleted or replaced, the partially matched results related to it are discarded directly and will no longer be matched against incoming input data.
If there is no pattern processor in a CepOperator, all input data will be discarded, as they cannot match any existing pattern.
If the IDs in the List<PatternProcessor> provided by the PatternProcessorDiscoverer duplicate those of existing pattern processors, the versions of the PatternProcessors will be checked. An existing pattern processor is preserved if the two versions are the same; otherwise it is replaced by the newer one.
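The ID/version rule above amounts to a small merge step. The sketch below uses a hypothetical `ProcessorSpec` (ID plus version) in place of `PatternProcessor`. It keeps the existing instance when an incoming processor has the same ID and version, takes the incoming one otherwise, and drops IDs that no longer appear, since the discoverer hands over the full latest list. Returning the IDs whose partial matches must be discarded is an illustrative addition.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical stand-in for a PatternProcessor: only the ID and version matter here. */
class ProcessorSpec {
    final String id;
    final int version;

    ProcessorSpec(String id, int version) {
        this.id = id;
        this.version = version;
    }
}

class ProcessorMerger {
    /**
     * Merges the latest full list into the current processors (keyed by ID) in place and
     * returns the IDs whose partial matches must be discarded.
     */
    static Set<String> merge(Map<String, ProcessorSpec> current, List<ProcessorSpec> latest) {
        Map<String, ProcessorSpec> next = new LinkedHashMap<>();
        Set<String> reset = new TreeSet<>();
        for (ProcessorSpec incoming : latest) {
            ProcessorSpec existing = current.get(incoming.id);
            if (existing != null && existing.version == incoming.version) {
                next.put(incoming.id, existing); // same ID and version: keep instance and state
            } else {
                next.put(incoming.id, incoming);
                if (existing != null) {
                    reset.add(incoming.id); // replaced: old partial matches are dropped
                }
            }
        }
        for (String id : current.keySet()) {
            if (!next.containsKey(id)) {
                reset.add(id); // deleted: not present in the latest list
            }
        }
        current.clear();
        current.putAll(next);
        return reset;
    }
}
```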
Given that the proposed design offers eventual consistency, there could be a period of time just after a pattern processor update during which subtasks of CepOperators are inconsistent, meaning they do not work on the same set of pattern processors. This inconsistency is caused by the fact that the OperatorCoordinator cannot send OperatorEvents to all subtasks simultaneously, and this period usually lasts for milliseconds. As illustrated in the graph below, false alerts ("AC" instead of "ABC") might be generated during that period.
Compatibility, Deprecation, and Migration Plan
The changes proposed in this FLIP are backward compatible with the existing APIs of Flink CEP. We propose to change the APIs directly without a deprecation period.
The current CEP.pattern() methods still exist after the changes of this FLIP are introduced, and static CEP patterns created by these methods preserve their function, performance, and job structure.
Test Plan
We will use unit tests to cover the correctness of introduced features.
Future Work
Service Degradation
As all CEP patterns can be placed in the same operator of the same Flink job with the proposed features, an error in a single pattern could cause the whole Flink job to fail, thus affecting the availability of the other CEP patterns.
To solve this problem, we plan to make CepOperators able to handle exceptions within a PatternProcessor's scope. When the matching or processing function of a certain pattern processor throws an exception, the operator should catch it, convert it into an error message, and send that message to a side output to call for an external solution. The pattern processor that threw the exception will be temporarily disabled on that subtask while the other pattern processors continue to work unaffected.
We plan to expose this feature as a configurable property. The default behavior will still be to fail the whole Flink job.
Table/SQL API Support
Currently we only propose the design for multiple/dynamic pattern processors in the DataStream API, but our design can also be extended to the Table/SQL API.
To achieve this support, we need to add a keyword in SQL like the following, similar to the existing MATCH_RECOGNIZE keyword.
```sql
SELECT T.aid, T.bid, T.cid
FROM MyTable
MATCH_RECOGNIZE_DYNAMIC (
    pattern_processor_discoverer_factory = custom_pattern_processor_discoverer_factory
) AS T
```
While MATCH_RECOGNIZE is standard SQL published by the International Organization for Standardization (ISO), we have not found any standard expression format that supports multiple & dynamic matching in SQL. Thus the example above is just a simple illustration of how the API might look. Further refinement of the API is required in the detailed design.
As for implementation, Flink's Table/SQL infrastructure can recognize the keyword above and create the corresponding StreamOperators. The operators can accept the PatternProcessorDiscovererFactory and pass it to the implementations proposed in this FLIP, which means the Table/SQL and DataStream implementations of this functionality can share much of their code.
Rejected Alternatives
Have each subtask of an operator make the update on its own
It is hard to achieve consistency.
Though the interval at which each subtask makes the update can be the same, the absolute times at which they update might differ. For example, one subtask updates at 10:00, 10:05, etc., while another does so at 10:01, 10:06. In this case the subtasks might never process data with the same set of pattern processors.
When users want to control the updates with external commands, extra work might be required to make sure that a command has reached all the subtasks.
Using a DataStream<PatternProcessor<IN, OUT>>, instead of OperatorCoordinators, to pass updates
Unlike input data flowing in as a stream, updates are actually control messages that, instead of being passed in as a DataStream, should be sent out by a control center such as the JobManager. OperatorCoordinators match these semantics and are thus a more appropriate choice.