...
Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Integrating-Flink-Table-API-amp-SQL-with-CEP-td17964.html
JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)
Released: -
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
SELECT *
FROM Ticker MATCH_RECOGNIZE (
PARTITION BY symbol
ORDER BY tstamp
MEASURES STRT.tstamp AS start_tstamp,
LAST(DOWN.tstamp) AS bottom_tstamp,
LAST(UP.tstamp) AS end_tstamp
ONE ROW PER MATCH
AFTER MATCH SKIP TO LAST UP
PATTERN (STRT DOWN+ UP+)
DEFINE
DOWN AS DOWN.price < PREV(DOWN.price),
UP AS UP.price > PREV(UP.price)
) MR
ORDER BY MR.symbol, MR.start_tstamp;
// input
SYMBOL TSTAMP PRICE
------- -------- ------
'ACME' '01-Apr-11' 12
'ACME' '02-Apr-11' 17
'ACME' '03-Apr-11' 19
'ACME' '04-Apr-11' 21
'ACME' '05-Apr-11' 25
'ACME' '06-Apr-11' 12
'ACME' '07-Apr-11' 15
'ACME' '08-Apr-11' 20
'ACME' '09-Apr-11' 24
'ACME' '10-Apr-11' 25
'ACME' '11-Apr-11' 19
// output
SYMBOL START_TST BOTTOM_TS END_TSTAM
---------- --------- --------- ---------
ACME 05-APR-11 06-APR-11 10-APR-11
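The following plain-Java sketch does not use Flink. It makes the expected output concrete by evaluating PATTERN (STRT DOWN+ UP+) with the DEFINE conditions above. It works over a single, already ordered partition, uses greedy quantifiers, and applies AFTER MATCH SKIP TO LAST UP. The class and method names are hypothetical, and timestamps are kept as strings. Backtracking is omitted. That shortcut is only safe here because the DOWN and UP conditions can never both hold for the same row.

```java
import java.util.ArrayList;
import java.util.List;

// One input row of the (single) ACME partition, already ordered by tstamp.
final class TickerRow {
    final String tstamp;
    final int price;

    TickerRow(String tstamp, int price) {
        this.tstamp = tstamp;
        this.price = price;
    }
}

final class TickerMatchSketch {
    // Returns one {start_tstamp, bottom_tstamp, end_tstamp} triple per match.
    static List<String[]> match(List<TickerRow> rows) {
        List<String[]> out = new ArrayList<>();
        int start = 0; // candidate STRT row
        while (start < rows.size()) {
            int next = start + 1;
            // DOWN+: greedily take rows whose price is below the previous row's price.
            while (next < rows.size() && rows.get(next).price < rows.get(next - 1).price) {
                next++;
            }
            int lastDown = next - 1;
            if (lastDown == start) { // no DOWN row: try the next row as STRT
                start++;
                continue;
            }
            // UP+: greedily take rows whose price is above the previous row's price.
            while (next < rows.size() && rows.get(next).price > rows.get(next - 1).price) {
                next++;
            }
            int lastUp = next - 1;
            if (lastUp == lastDown) { // no UP row: try the next row as STRT
                start++;
                continue;
            }
            out.add(new String[] {
                rows.get(start).tstamp, rows.get(lastDown).tstamp, rows.get(lastUp).tstamp});
            start = lastUp; // AFTER MATCH SKIP TO LAST UP
        }
        return out;
    }

    static List<TickerRow> sampleInput() {
        return List.of(
            new TickerRow("01-Apr-11", 12), new TickerRow("02-Apr-11", 17),
            new TickerRow("03-Apr-11", 19), new TickerRow("04-Apr-11", 21),
            new TickerRow("05-Apr-11", 25), new TickerRow("06-Apr-11", 12),
            new TickerRow("07-Apr-11", 15), new TickerRow("08-Apr-11", 20),
            new TickerRow("09-Apr-11", 24), new TickerRow("10-Apr-11", 25),
            new TickerRow("11-Apr-11", 19));
    }

    public static void main(String[] args) {
        for (String[] m : match(sampleInput())) {
            System.out.println("ACME " + String.join(" ", m));
        }
    }
}
```

Running main prints `ACME 05-Apr-11 06-Apr-11 10-Apr-11`. This is the same row as the output above, which shows the dates in uppercase.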
...
Row Pattern Recognition in SQL is performed using the MATCH_RECOGNIZE clause. MATCH_RECOGNIZE enables you to do the following tasks:
- Logically partition and order the data that is used in the MATCH_RECOGNIZE clause with its PARTITION BY and ORDER BY clauses.
- Define patterns of rows to seek using the PATTERN clause of the MATCH_RECOGNIZE clause. These patterns use regular expression syntax, a powerful and expressive feature, applied to the pattern variables you define.
- Specify the logical conditions required to map a row to a row pattern variable in the DEFINE clause.
...
- Define measures, which are expressions usable in other parts of the SQL query, in the MEASURES clause.
...
- Syntax: Defines the exported columns.
- Flink-CEP: PatternStream.select(PatternSelectFunction)
- Status-in-Calcite: Not supported in 1.12.0, will be supported in 1.13
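In Flink-CEP, PatternSelectFunction.select receives a complete match as a Map<String, List<IN>>. The map goes from each pattern name to the events mapped to that name, so MEASURES largely reduces to picking rows per variable. The plain-Java sketch below assumes that map shape. It simplifies each row to its tstamp and assumes every variable matched at least one row. The first and last helpers are hypothetical stand-ins for FIRST and LAST.

```java
import java.util.List;
import java.util.Map;

final class MeasuresSketch {
    // FIRST(var.col): the first row mapped to the variable.
    static <T> T first(Map<String, List<T>> match, String var) {
        return match.get(var).get(0);
    }

    // LAST(var.col): the last row mapped to the variable.
    static <T> T last(Map<String, List<T>> match, String var) {
        List<T> rows = match.get(var);
        return rows.get(rows.size() - 1);
    }

    // MEASURES STRT.tstamp AS start_tstamp,
    //          LAST(DOWN.tstamp) AS bottom_tstamp,
    //          LAST(UP.tstamp) AS end_tstamp
    static List<String> tickerMeasures(Map<String, List<String>> match) {
        // STRT maps exactly one row, so FIRST and LAST coincide for it.
        return List.of(first(match, "STRT"), last(match, "DOWN"), last(match, "UP"));
    }

    public static void main(String[] args) {
        Map<String, List<String>> match = Map.of(
            "STRT", List.of("05-Apr-11"),
            "DOWN", List.of("06-Apr-11"),
            "UP", List.of("07-Apr-11", "08-Apr-11", "09-Apr-11", "10-Apr-11"));
        System.out.println(tickerMeasures(match));
    }
}
```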
{ONE ROW | ALL ROWS} PER MATCH
...
- Syntax: Specifies a regular expression built from the pattern variable names defined in the DEFINE clause.
- Flink-CEP: Partially supported by Pattern; further functionality depends on features #1, #2, and #3 in the “Required features” section.
- Status-in-Calcite: Supported in 1.12.0
...
SUBSET -- optional
- Syntax: Defines a named union of pattern variables declared in the PATTERN clause. The union variable can be used in the MEASURES and DEFINE clauses.
- Flink-CEP: Easy to implement
- Status-in-Calcite: Not supported in 1.12.0, will be supported in 1.13
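A SUBSET variable such as SUBSET STDN = (STRT, DOWN) stands for the union of the rows mapped to its member variables. The sketch assumes the match arrives as a name-to-rows map, as with PatternSelectFunction. It also assumes each row carries its position in the partition. Under those assumptions, resolving the subset is a merge back into row order. Everything here is illustrative, including SubsetRow, its ordinal field, and the STDN example.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

final class SubsetRow {
    final long ordinal; // position of the row within its partition (assumed to be available)
    final int price;

    SubsetRow(long ordinal, int price) {
        this.ordinal = ordinal;
        this.price = price;
    }
}

final class SubsetSketch {
    // Rows of the union variable, in partition order.
    static List<SubsetRow> resolve(Map<String, List<SubsetRow>> match, List<String> members) {
        List<SubsetRow> rows = new ArrayList<>();
        for (String var : members) {
            rows.addAll(match.getOrDefault(var, List.of())); // an unmatched member contributes nothing
        }
        rows.sort(Comparator.comparingLong(r -> r.ordinal));
        return rows;
    }

    // AVG(STDN.price) over the resolved rows; NaN stands in for SQL NULL on an empty subset.
    static double avgPrice(List<SubsetRow> rows) {
        return rows.stream().mapToInt(r -> r.price).average().orElse(Double.NaN);
    }

    public static void main(String[] args) {
        Map<String, List<SubsetRow>> match = Map.of(
            "STRT", List.of(new SubsetRow(4, 25)),
            "DOWN", List.of(new SubsetRow(5, 12)),
            "UP", List.of(new SubsetRow(6, 15), new SubsetRow(7, 20)));
        List<SubsetRow> stdn = resolve(match, List.of("STRT", "DOWN")); // SUBSET STDN = (STRT, DOWN)
        System.out.println(avgPrice(stdn)); // 18.5
    }
}
```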
...
DEFINE -- mandatory
- Syntax: Defines the Boolean condition that a row must satisfy to be mapped to a pattern variable declared in the PATTERN clause.
- Flink-CEP: Can be supported with IterativeCondition
- Status-in-Calcite: Supported in 1.12.0
...
CLASSIFIER -- optional
- Syntax: Only available with ALL ROWS PER MATCH. Returns the name of the pattern variable to which the current row was mapped.
- Flink-CEP: Easy to implement
- Status-in-Calcite: Supported in 1.12.0
MATCH_NUMBER -- optional
- Syntax: This syntax allows the user to receive the sequential number of the current match.
- Flink-CEP: Easy to implement
- Status-in-Calcite: Supported in 1.12.0
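Both functions should be easy to implement because the information is already present in a match. The variable a row was mapped to is its key in the name-to-rows map, and the match number is a per-partition counter. The hypothetical sketch below expands each match into ALL ROWS PER MATCH output rows. It assumes all matches belong to one partition and arrive in the order found. It also assumes each row carries its position in the partition, so rows can be emitted in row order.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

// One ALL ROWS PER MATCH output row, reduced to the fields of interest here.
final class MatchedRow {
    final long matchNumber;  // MATCH_NUMBER()
    final String classifier; // CLASSIFIER()
    final long ordinal;      // position of the row within its partition (assumed available)

    MatchedRow(long matchNumber, String classifier, long ordinal) {
        this.matchNumber = matchNumber;
        this.classifier = classifier;
        this.ordinal = ordinal;
    }

    @Override
    public String toString() {
        return matchNumber + " " + classifier + " " + ordinal;
    }
}

final class AllRowsSketch {
    // Each match maps a pattern variable to the ordinals of the rows mapped to it.
    static List<MatchedRow> expand(List<Map<String, List<Long>>> matches) {
        List<MatchedRow> out = new ArrayList<>();
        long matchNumber = 0;
        for (Map<String, List<Long>> match : matches) {
            matchNumber++; // numbering starts at 1
            List<MatchedRow> rows = new ArrayList<>();
            for (Map.Entry<String, List<Long>> entry : match.entrySet()) {
                for (long ordinal : entry.getValue()) {
                    rows.add(new MatchedRow(matchNumber, entry.getKey(), ordinal));
                }
            }
            rows.sort(Comparator.comparingLong(r -> r.ordinal)); // emit rows in partition order
            out.addAll(rows);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map<String, List<Long>>> matches = List.of(
            Map.of("STRT", List.of(4L), "DOWN", List.of(5L), "UP", List.of(6L, 7L, 8L, 9L)));
        expand(matches).forEach(System.out::println);
    }
}
```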
...
WITHIN -- optional
- Syntax: Outputs a pattern match only if the match occurs within the specified time interval.
- Flink-CEP: Pattern.within(Time)
- Status-in-Calcite: Not supported yet
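Pattern.within(Time) bounds how far apart in time the events of one match may be, and a translated WITHIN clause would rely on it. The sketch below only checks a completed match. Flink-CEP additionally discards partial matches once the window has passed. The helper name, the millisecond timestamps, and the exclusive boundary are assumptions.

```java
import java.util.Collections;
import java.util.List;

final class WithinSketch {
    // True if the span between the earliest and latest event of a completed match
    // is shorter than the window. Treating the boundary as exclusive is an assumption.
    static boolean withinWindow(List<Long> matchTimestamps, long windowMillis) {
        if (matchTimestamps.isEmpty()) {
            return true; // nothing to check
        }
        long first = Collections.min(matchTimestamps);
        long last = Collections.max(matchTimestamps);
        return last - first < windowMillis;
    }

    public static void main(String[] args) {
        long day = 24L * 60 * 60 * 1000;
        // 05-Apr-11 .. 10-Apr-11 spans five days: a 4-day window rejects it, a 7-day window keeps it.
        List<Long> match = List.of(0L, day, 5 * day);
        System.out.println(withinWindow(match, 4 * day) + " " + withinWindow(match, 7 * day)); // false true
    }
}
```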
...
LogicalProject[select="id, name"]
  LogicalMatch[pattern="A B+ C", definitions="..."]
    LogicalTableScan[table="T"]
...
What we need to do is translate the LogicalMatch node into calls to the Flink-CEP API. The most important work is translating the PATTERN, DEFINE, and MEASURES clauses.
There are six kinds of PATTERNs:
- PATTERN_CONCAT: for example, the pattern A B means that event B has to directly succeed the previously matched event A. This can be expressed with the Flink-CEP API pattern.next(String).
- PATTERN_QUANTIFIER: for example, B{3, 5} matches between 3 and 5 occurrences of B, and B+ matches one or more occurrences of B. In the CEP API, B+ can be expressed with pattern.oneOrMore(), B{n, n} with pattern.times(n), B? with pattern.optional(), and B* with pattern.oneOrMore().optional(). However, B{n, m} is not supported by the CEP API.
- PATTERN_ALTER: for example, the pattern A | B matches either A or B. This cannot currently be expressed with the CEP API, but there is a related issue: see FLINK-4641.
- PATTERN_GROUPING: for example, ((A B){3} C) attempts to match the group (A B) three times and then seeks one occurrence of C. This is not currently supported by the CEP API.
- PATTERN_PERMUTE: for example, PERMUTE (A, B) matches all permutations of A and B. This is not currently supported by the CEP API.
- PATTERN_EXCLUDE: for example, {- A -} matches A but excludes A from the output.
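For PATTERN_QUANTIFIER, the mapping listed above can be written as a small lookup. It takes a quantifier {min, max} and returns the Flink-CEP call chain to append to a single pattern variable. This hypothetical helper emits method names as strings instead of building a real Pattern object. It returns null for quantifiers it has no mapping for, such as B{n, m}.

```java
final class QuantifierTranslator {
    static final int UNBOUNDED = -1; // stands for a missing upper bound, as in B+ or B*

    // Returns the call chain for the quantifier {min, max}, "" for no quantifier,
    // or null if this sketch has no mapping (for example B{3, 5}).
    static String translate(int min, int max) {
        if (min == 1 && max == 1) return "";                                // B
        if (min == 0 && max == 1) return ".optional()";                     // B?
        if (min == 1 && max == UNBOUNDED) return ".oneOrMore()";            // B+
        if (min == 0 && max == UNBOUNDED) return ".oneOrMore().optional()"; // B*
        if (min > 1 && min == max) return ".times(" + min + ")";            // B{n, n}
        return null;                                                         // e.g. B{n, m}
    }

    public static void main(String[] args) {
        System.out.println("B+     -> pattern" + translate(1, UNBOUNDED));
        System.out.println("B{3,3} -> pattern" + translate(3, 3));
        System.out.println("B{3,5} -> " + translate(3, 5));
    }
}
```

Returning null instead of throwing leaves the caller to decide how an unsupported quantifier is reported, for example by failing at planning time.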
DEFINE can be translated into CEP’s IterativeCondition through code generation and applied with pattern.where(...). Most definitions can be translated by the current CodeGenerator, except for specific calls such as PREV, LAST, and FIRST.
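DEFINE maps onto IterativeCondition rather than a plain SimpleCondition because a definition such as DOWN.price < PREV(DOWN.price) must look at rows other than the candidate. IterativeCondition.filter receives a context that gives access to events already accepted into the partial match. The plain-Java sketch below models only that idea. The RowCondition interface and the matchedSoFar list are hypothetical stand-ins for the Flink types, not generated code. PREV is approximated by the last accepted row, which holds only under strict contiguity (next()).

```java
import java.util.List;

// Hypothetical stand-in for a generated iterative condition: it sees the candidate
// row's price plus the prices of the rows already accepted into the partial match.
interface RowCondition {
    boolean filter(int price, List<Integer> matchedSoFar);
}

final class DefineSketch {
    // PREV(x.price), approximated by the last accepted row (valid under strict contiguity).
    static int prevPrice(List<Integer> matchedSoFar) {
        return matchedSoFar.get(matchedSoFar.size() - 1);
    }

    // DOWN AS DOWN.price < PREV(DOWN.price); with no previous row, PREV is NULL and the row is rejected.
    static final RowCondition DOWN =
        (price, matched) -> !matched.isEmpty() && price < prevPrice(matched);

    // UP AS UP.price > PREV(UP.price)
    static final RowCondition UP =
        (price, matched) -> !matched.isEmpty() && price > prevPrice(matched);

    public static void main(String[] args) {
        List<Integer> afterStrt = List.of(25); // STRT = 05-Apr-11 (price 25)
        System.out.println(DOWN.filter(12, afterStrt) + " " + UP.filter(12, afterStrt)); // true false
    }
}
```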
...
- Support .times(from, to) on Pattern
- Support branching CEP patterns. FLINK-4641
- Support permutation on Pattern
- Support .prune(long pruningTimestamp) on NFA
- Support an open() method on IterativeCondition, so that generated code is compiled there rather than being de/serialized with the NFA state.
- Support pattern grouping in Flink CEP
...
Compatibility, Deprecation, and Migration Plan
...