...
Simply having multiple patterns cannot handle situations like the one above, while the PatternProcessor concept
can handle them properly.
We propose to add the PatternProcessor interface as below. The properties of a pattern processor are required by the CEP operator in order to function correctly.
...
The general process of the example described in this section, along with a general implementation idea, is shown in the wireframe below. The Operator Coordinator on the JobManager will read pattern processor updates from the users' database and send them to subtasks, which in turn changes the matched & processed results produced by the subtasks.
Proposed Changes
The general structure that supports dynamic pattern processors in a job graph is shown in the wireframe below. When an input event arrives from the upstream operator, it goes through the following process to produce a matched & processed record.
- The Key Generating Operator computes the key of the input data according to the KeySelector provided by the PatternProcessors, and attaches the key to the input data. If there are N pattern processors, then N records with the same input data but different keys are sent downstream.
- The input data is sent to different downstream operator subtasks by calling the keyBy() operation on them. keyBy() uses the key generated in the previous step.
- The next operator matches the input data against the Patterns provided by the PatternProcessors, and when a matched sequence is generated, the operator calls the corresponding PatternProcessFunction on that sequence and produces the result to downstream.
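The key-generating step above can be sketched with a simplified, self-contained model. Note that the PatternProcessor, KeySelector, and KeyedRecord types below are minimal hypothetical stand-ins for the proposed API, not the actual Flink classes:

```java
import java.util.ArrayList;
import java.util.List;

public class KeyGeneratingSketch {
    // Stand-in for the proposed KeySelector: maps a record to a key.
    interface KeySelector<IN> { Object getKey(IN value); }

    // Stand-in for the proposed PatternProcessor: an id plus its key selector.
    record PatternProcessor<IN>(String id, KeySelector<IN> keySelector) {}

    // A (processor id, key, record) triple as emitted by the Key Generating Operator.
    record KeyedRecord<IN>(String processorId, Object key, IN value) {}

    // For one input record and N active pattern processors,
    // emit N keyed copies of the record, one per processor.
    static <IN> List<KeyedRecord<IN>> generateKeys(IN value, List<PatternProcessor<IN>> processors) {
        List<KeyedRecord<IN>> out = new ArrayList<>();
        for (PatternProcessor<IN> p : processors) {
            out.add(new KeyedRecord<>(p.id(), p.keySelector().getKey(value), value));
        }
        return out;
    }
}
```

Each emitted record then flows through keyBy() using the attached key, so that records sharing a key land on the same downstream subtask.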
In order to achieve the functions described in the public API, we propose to add two stream operators to the stream graph.
The first is a key generating operator. For each input record and each active PatternProcessor, the operator generates a key according to the KeySelector in the PatternProcessor and attaches the key to that record.
After that, the stream graph performs a keyBy() operation on the input data using the generated key. In this way the downstream operator has data with the same key aggregated on the same subtask, and that key can be dynamically updated.
Then the second operator serves to match data to patterns and to process the matched results with the PatternProcessFunctions. The first duty is almost the same as that of CepOperator, except that it now needs to handle multiple and dynamic patterns. The second duty has previously been achieved by the PatternStream.process()
method, but now it needs to be achieved by inner implementations.
In the wireframe above, the Key Generating Operator and the Pattern Matching & Processing Operator use information from the pattern processors and thus need a PatternProcessorDiscoverer. Our design for collecting pattern processors and updating them inside operators is shown in the wireframe below. Each of the two operators mentioned has a pattern processor manager inside its OperatorCoordinator. The manager in the OperatorCoordinator serves to discover pattern processor changes and convert the information into pattern processors. The subtasks, after receiving the latest pattern processors from the OperatorCoordinator, then make the update.
Supporting multiple & dynamic pattern processors inside CEP operators
Currently CepOperator uses NFAState and NFA to deal with input data. As it does not yet support multiple patterns, all input data is dealt with by the same NFAState.
In order to support multiple patterns in one operator, the single NFA needs to be replaced by a list or map of NFAs, each corresponding to a PatternProcessor. When input data triggers the processElement() method, the data is matched against all pattern processors in the operator. Adding or deleting pattern processors means adding or deleting entries in the list or map.
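A simplified sketch of this change is shown below. The Nfa interface is a hypothetical stand-in that borrows Flink's NFA name only for illustration; the real implementation would manage NFAState per processor and per key:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MultiPatternSketch {
    // Stand-in for an NFA: matches one pattern against an input record
    // and returns any results produced for that record.
    interface Nfa<IN, OUT> { List<OUT> process(IN value); }

    // One NFA per active pattern processor, keyed by processor id,
    // replacing the single NFA that CepOperator holds today.
    private final Map<String, Nfa<String, String>> nfaPerProcessor = new LinkedHashMap<>();

    void addPatternProcessor(String id, Nfa<String, String> nfa) { nfaPerProcessor.put(id, nfa); }
    void removePatternProcessor(String id) { nfaPerProcessor.remove(id); }

    // processElement(): match the input against every active pattern processor.
    List<String> processElement(String value) {
        List<String> results = new ArrayList<>();
        for (Nfa<String, String> nfa : nfaPerProcessor.values()) {
            results.addAll(nfa.process(value));
        }
        return results;
    }
}
```

Adding or deleting a pattern processor is then just a map insertion or removal, without touching the other processors' state.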
Scheduling pattern processors
In order to make the behavior of pattern processor updates more deterministic, users can schedule the time at which an updated pattern processor should come into effect, which is reflected by the getTimestamp() method in PatternProcessor.
When a new set of pattern processors is provided by the PatternProcessorDiscoverer, they are first cached in the operators. When a watermark arrives at the operator, it checks the timestamp of the watermark against the scheduled time of the cached pattern processors and updates the active pattern processors accordingly.
Failover
During checkpoints, the operators mentioned above store serialized pattern processors and partially matched results in the state backend and restore them during recovery. In this way neither pattern processors nor partial matches are lost during failover.
Common case behavior analysis
When a pattern processor is deleted or replaced, partially matched results related to that pattern processor are directly discarded and will no longer match incoming input data.
If there is no pattern processor in a CepOperator, all input data will be discarded as it cannot match any of the existing patterns.
If the IDs of the List&lt;PatternProcessor&gt; provided by the PatternProcessorDiscoverer duplicate those of existing pattern processors, the versions of the PatternProcessors will be checked. An existing pattern processor is preserved if the two versions are the same; otherwise it is replaced by the newer one.
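The version check described above can be sketched like this, again with a minimal hypothetical PatternProcessor record carrying only an id and a version:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class VersionCheckSketch {
    // Stand-in for the proposed PatternProcessor with an id and a version.
    record PatternProcessor(String id, int version) {}

    // Merge a newly discovered list into the existing set: a duplicate id
    // with the same version keeps the existing processor; a different
    // version replaces it with the newer one.
    static Map<String, PatternProcessor> update(
            Map<String, PatternProcessor> existing, List<PatternProcessor> discovered) {
        Map<String, PatternProcessor> result = new LinkedHashMap<>(existing);
        for (PatternProcessor p : discovered) {
            PatternProcessor old = result.get(p.id());
            if (old == null || old.version() != p.version()) {
                result.put(p.id(), p); // new id, or replaced by the newer version
            }
            // same id and same version: existing processor preserved
        }
        return result;
    }
}
```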
Given that the proposed design offers eventual consistency, there could be a period just after a pattern processor update when the subtasks of CepOperators are inconsistent, meaning they do not work on the same set of pattern processors. This inconsistency is caused by the fact that the OperatorCoordinator cannot send OperatorEvents to all subtasks simultaneously, and this period usually lasts for milliseconds. As illustrated in the graph below, false alerts ("AC" instead of "ABC") might be generated during that period.
Compatibility, Deprecation, and Migration Plan
The changes proposed in this FLIP are backward compatible with existing APIs of Flink CEP. We propose to change the APIs directly without a deprecation period.
The current CEP.pattern() methods will still exist after the changes of this FLIP are introduced, and static CEP patterns created by these methods will preserve their function, performance and job structure.
Test Plan
We will use unit tests to cover the correctness of introduced features.
Future Work
Service Degradation
As all CEP patterns can be placed in the same operator of the same Flink job with the features we propose, an error in a single pattern could cause the failure of the whole Flink job, thus affecting the availability of other CEP patterns.
The public interfaces we propose above aim to give users the flexibility of discovering pattern processors and serializing them. It is also possible for us to provide some built-in implementations for common uses. Some possible ways for the discoverer to discover pattern processors are as follows:
The discoverer periodically checks a file in filesystem or a table in database for updates
The discoverer listens on external commands, like HTTP requests, for updates
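As an illustration of the first approach, a file-based discoverer might look like the sketch below. This is a hypothetical simplified design, not the proposed PatternProcessorDiscoverer interface: it polls a file's modification time and hands the file's lines (one serialized processor per line, an assumed format) to a listener that would stand in for the OperatorCoordinator:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.function.Consumer;

public class FileDiscovererSketch {
    private final Path file;
    private final Consumer<List<String>> listener; // receives serialized processors
    private long lastModified = -1;

    FileDiscovererSketch(Path file, Consumer<List<String>> listener) {
        this.file = file;
        this.listener = listener;
    }

    // Called periodically (e.g. by a scheduled executor); notifies the
    // listener only when the file has changed since the last check.
    void checkForUpdates() throws IOException {
        long modified = Files.getLastModifiedTime(file).toMillis();
        if (modified != lastModified) {
            lastModified = modified;
            listener.accept(Files.readAllLines(file));
        }
    }
}
```

A database-backed variant would follow the same shape, polling a table's last-updated column instead of a file's modification time.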
And the format used to transmit serialized pattern processors to Flink jobs, before they are converted to Java objects, can be as follows:
Directly serializing Java objects using Java serialization or Kryo
Using readable formats like JSON or XML
Using the standard MATCH_RECOGNIZE grammar in SQL proposed by ISO
In order to solve this problem, we plan to make CepOperators able to handle exceptions within a PatternProcessor's scope. When the matching or processing function of a certain pattern processor throws an exception, the operator should catch the exception, convert it into an error message and send it to a side output to call for an external solution. The pattern processor that threw the exception will be temporarily disabled on that subtask, while other pattern processors keep working unaffected.
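This isolation behavior can be sketched as below. The processor functions and the in-memory side output are simplified stand-ins; a real implementation would use Flink's side output mechanism:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

public class ExceptionIsolationSketch {
    // Stand-in for each pattern processor's matching/processing logic.
    private final Map<String, Function<String, String>> processors = new LinkedHashMap<>();
    private final Set<String> disabled = new HashSet<>();
    private final List<String> sideOutput = new ArrayList<>(); // error messages

    void addProcessor(String id, Function<String, String> fn) { processors.put(id, fn); }

    // Run every enabled processor; a throwing processor is disabled on this
    // subtask and its error goes to the side output, others are unaffected.
    List<String> processElement(String value) {
        List<String> results = new ArrayList<>();
        for (Map.Entry<String, Function<String, String>> e : processors.entrySet()) {
            if (disabled.contains(e.getKey())) continue;
            try {
                results.add(e.getValue().apply(value));
            } catch (Exception ex) {
                disabled.add(e.getKey());
                sideOutput.add("processor " + e.getKey() + " failed: " + ex.getMessage());
            }
        }
        return results;
    }

    List<String> errors() { return sideOutput; }
}
```

The key design point is that the try/catch boundary sits around each individual processor invocation, so one misbehaving pattern cannot fail the whole operator.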
...
While MATCH_RECOGNIZE is standard SQL released by the International Organization for Standardization (ISO), we have not found any standard expression format that supports multiple & dynamic matches in SQL. Thus the example above is just a simple example showing how the API might look. Further refinement of the API is required when it comes to detailed design.
As for implementation, Flink's infrastructure for Table/SQL can recognize the keyword above and create corresponding StreamOperators. The operators can accept the PatternProcessorDiscovererFactory
and pass it to the existing implementations proposed in this FLIP, which means Table/SQL and DataStream implementation of this functionality can share a lot in common.
It is hard to achieve consistency.
Though the time interval at which each subtask makes the update can be the same, the absolute times at which they make the update might differ. For example, one subtask makes updates at 10:00, 10:05, etc., while another does so at 10:01, 10:06. In this case the subtasks might never process data with the same set of pattern processors.
In cases where users want to control the updates with external commands, it might require extra work for them to make sure that the command has reached all the subtasks.
Using a DataStream&lt;PatternProcessor&lt;IN, OUT&gt;&gt;, instead of OperatorCoordinators, to pass updates