Status
Current state: "Accepted"
Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Integrating-Flink-Table-API-amp-SQL-with-CEP-td17964.html
JIRA:
Released: 1.7
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Flink's CEP library is a great library for complex event processing, and more and more customers are expressing interest in it. However, it has a limitation: because it currently only supports the Java API, users usually have to write a lot of code even for a very simple pattern-matching use case.
CEP DSLs and SQL strongly resemble each other. CEP's additional features compared to SQL boil down to pattern detection, so it would be valuable to consolidate CEP and SQL. Doing so makes SQL more powerful by supporting more usage scenarios, and it lets users build CEP applications easily and quickly.
Public Interfaces
The feature will support the Row Pattern Recognition syntax in SQL. In December 2016, ISO released a new version of the international SQL standard (ISO/IEC 9075:2016) that includes Row Pattern Recognition for complex event processing. Oracle's SQL has supported this feature since 12cR1. With this extension it is possible to detect complex patterns between rows in a table. In the context of CEP, each row can be considered an event, and the events are ordered according to the insertion order. The following shows an example query.
SELECT *
FROM Ticker MATCH_RECOGNIZE (
PARTITION BY symbol
ORDER BY tstamp
MEASURES STRT.tstamp AS start_tstamp,
LAST(DOWN.tstamp) AS bottom_tstamp,
LAST(UP.tstamp) AS end_tstamp
ONE ROW PER MATCH
AFTER MATCH SKIP TO LAST UP
PATTERN (STRT DOWN+ UP+)
DEFINE
DOWN AS DOWN.price < PREV(DOWN.price),
UP AS UP.price > PREV(UP.price)
) MR
ORDER BY MR.symbol, MR.start_tstamp;
// input
SYMBOL TSTAMP PRICE
------- -------- ------
'ACME' '01-Apr-11' 12
'ACME' '02-Apr-11' 17
'ACME' '03-Apr-11' 19
'ACME' '04-Apr-11' 21
'ACME' '05-Apr-11' 25
'ACME' '06-Apr-11' 12
'ACME' '07-Apr-11' 15
'ACME' '08-Apr-11' 20
'ACME' '09-Apr-11' 24
'ACME' '10-Apr-11' 25
'ACME' '11-Apr-11' 19
// output
SYMBOL START_TST BOTTOM_TS END_TSTAM
---------- --------- --------- ---------
ACME 05-APR-11 06-APR-11 10-APR-11
Row Pattern Recognition in SQL is performed using the MATCH_RECOGNIZE clause. MATCH_RECOGNIZE enables you to do the following tasks:
- Logically partition and order the data that is used in the MATCH_RECOGNIZE clause with its PARTITION BY and ORDER BY clauses.
- Define patterns of rows to seek using the PATTERN clause of the MATCH_RECOGNIZE clause. These patterns use regular expression syntax, a powerful and expressive feature, applied to the pattern variables you define.
- Specify the logical conditions required to map a row to a row pattern variable in the DEFINE clause.
- Define measures, which are expressions usable in other parts of the SQL query, in the MEASURES clause.
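To make the semantics of the example query above concrete, the following is a minimal sketch in plain Scala that evaluates PATTERN (STRT DOWN+ UP+) over the ACME rows and produces the single output row shown earlier. It is not Flink or Calcite code: the names Tick, MatchRow, matchAt and matches are made up for this illustration, and the greedy matching without backtracking is a simplifying assumption that is safe here only because no row can satisfy both DOWN and UP.

```scala
// Illustrative sketch only; none of these names exist in Flink or Calcite.
object TickerMatchSketch {
  final case class Tick(symbol: String, tstamp: String, price: Int)
  // One output row holding the three MEASURES of the example query.
  final case class MatchRow(symbol: String, startTstamp: String,
                            bottomTstamp: String, endTstamp: String)

  // The ACME partition from the example input, already ordered by tstamp.
  val acme: Vector[Tick] =
    Vector(12, 17, 19, 21, 25, 12, 15, 20, 24, 25, 19).zipWithIndex.map {
      case (price, i) => Tick("ACME", f"${i + 1}%02d-Apr-11", price)
    }

  // Length of the longest run starting at `from` in which every row satisfies
  // cond(previous row, current row). Models a greedy `+` over a PREV condition.
  private def run(rows: Vector[Tick], from: Int)(cond: (Tick, Tick) => Boolean): Int = {
    var i = from
    while (i > 0 && i < rows.length && cond(rows(i - 1), rows(i))) i += 1
    i - from
  }

  // Tries to match STRT DOWN+ UP+ with STRT mapped to the row at `start`.
  // Returns the indices of the last DOWN row and of the last UP row.
  def matchAt(rows: Vector[Tick], start: Int): Option[(Int, Int)] = {
    val downs = run(rows, start + 1)((prev, cur) => cur.price < prev.price)
    val ups =
      if (downs > 0) run(rows, start + 1 + downs)((prev, cur) => cur.price > prev.price)
      else 0
    if (downs > 0 && ups > 0) Some((start + downs, start + downs + ups)) else None
  }

  // Scans one partition and applies AFTER MATCH SKIP TO LAST UP.
  def matches(rows: Vector[Tick]): Vector[MatchRow] = {
    val out = Vector.newBuilder[MatchRow]
    var start = 0
    while (start < rows.length) {
      matchAt(rows, start) match {
        case Some((lastDown, lastUp)) =>
          out += MatchRow(rows(start).symbol, rows(start).tstamp,
                          rows(lastDown).tstamp, rows(lastUp).tstamp)
          start = lastUp // the last UP row may act as STRT of the next match
        case None =>
          start += 1
      }
    }
    out.result()
  }
}
```

Calling TickerMatchSketch.matches(TickerMatchSketch.acme) walks the rows in order: no match can start before 05-Apr-11 because the following price never drops, the match starting there ends at 10-Apr-11, and the attempt resumed at 10-Apr-11 finds a DOWN row but no UP row.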
Proposed Changes
MATCH_RECOGNIZE needs to be supported in Calcite first. Fortunately, there is an ongoing Calcite issue to add the MATCH_RECOGNIZE syntax, see CALCITE-1570. The newest Calcite version, 1.12.0 (which is also the version Flink master depends on), already includes a basic implementation of the MATCH_RECOGNIZE syntax, see the 1.12.0 release notes. The remaining features will be released in the next version. We might therefore want to wait with integrating CEP into flink-table until Calcite 1.13 is out and we have updated the dependency.
Gap between MATCH_RECOGNIZE and Flink-CEP
MATCH_RECOGNIZE is still work in progress in Calcite. Its implementation status and its capabilities compared with Flink-CEP are summarized as follows:
PARTITION BY -- optional
- Syntax: Specify the partition columns.
- Flink-CEP: DataStream.keyBy()
- Status-in-Calcite: Not supported in 1.12.0, will be supported in 1.13
ORDER BY -- mandatory
- Syntax: Defines the order of the rows within a partition. Users need to explicitly order by processing or event time.
- Flink-CEP: can be automatically handled by Flink CEP
- Status-in-Calcite: Not supported in 1.12.0, will be supported in 1.13
MEASURES -- optional
- Syntax: Defines the exported columns.
- Flink-CEP: PatternStream.select(PatternSelectFunction)
- Status-in-Calcite: Not supported in 1.12.0, will be supported in 1.13
{ONE ROW | ALL ROWS} PER MATCH
- Syntax:
  ONE ROW PER MATCH: (Default) create a single summary row for each match
  ALL ROWS PER MATCH: create one row for each row of each match
- Flink-CEP:
  ONE ROW PER MATCH: PatternStream.select(PatternSelectFunction)
  ALL ROWS PER MATCH: PatternStream.flatSelect(PatternFlatSelectFunction)
- Status-in-Calcite: Not supported in 1.12.0, will be supported in 1.13
AFTER MATCH SKIP {TO NEXT ROW | PAST LAST ROW | TO FIRST <variable> | TO LAST <variable> | TO <variable>}
- Syntax: Determines where the search for the next match starts after a match has been found. Default is AFTER MATCH SKIP PAST LAST ROW.
  TO NEXT ROW: start from the row after the first row of the current match
  PAST LAST ROW: start from the row after the last row of the current match
  TO FIRST <variable>: start from the first row of the designated group
  TO LAST <variable>: start from the last row of the designated group
  TO <variable>: similar to TO LAST <variable>
- Flink-CEP:
  TO NEXT ROW: Supported by default in Flink-CEP
  PAST LAST ROW: Depends on feature#4 in the “Required features” section
  TO FIRST <variable>: Depends on feature#4 in the “Required features” section
  TO LAST <variable>: Depends on feature#4 in the “Required features” section
- Status-in-Calcite: Not supported in 1.12.0, will be supported in 1.13
PATTERN -- mandatory
- Syntax: Specifies a regular expression that is built from the variable names defined in the DEFINE clause.
- Flink-CEP: Partially supported in Pattern; more functionality depends on feature#(1,2,3) in the “Required features” section
- Status-in-Calcite: Supported in 1.12.0
SUBSET -- optional
- Syntax: Defines named variables that group correlation variables defined in the PATTERN clause. Such a named variable can be used in the MEASURES clause and the DEFINE clause.
- Flink-CEP: Easy to implement
- Status-in-Calcite: Not supported in 1.12.0, will be supported in 1.13
DEFINE -- mandatory
- Syntax: Defines the boolean condition that a row must satisfy to be mapped to a variable name declared in the pattern.
- Flink-CEP: Can be supported with IterativeCondition
- Status-in-Calcite: Supported in 1.12.0
CLASSIFIER -- optional
- Syntax: This syntax is only available with ALL ROWS PER MATCH. It’s used to specify the variable name in the pattern that the row matched.
- Flink-CEP: Easy to implement
- Status-in-Calcite: Supported in 1.12.0
MATCH_NUMBER -- optional
- Syntax: This syntax allows the user to receive the sequential number of the current match.
- Flink-CEP: Easy to implement
- Status-in-Calcite: Supported in 1.12.0
WITHIN -- optional
- Syntax: Output a pattern match iff the match occurs within the specified time duration.
- Flink-CEP: Pattern.within(Time)
- Status-in-Calcite: Not supported yet
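The AFTER MATCH SKIP options described above differ only in the row at which the search for the next match starts. The following plain Scala sketch spells that out for a single match. It is an illustration, not Flink-CEP or Calcite code: the Skip type, its cases and nextStart are hypothetical names, and a match is assumed to be given as an ordered list of (row index, pattern variable) pairs.

```scala
// Illustrative sketch only; these types are not part of Flink-CEP or Calcite.
object SkipSketch {
  sealed trait Skip
  case object ToNextRow extends Skip
  case object PastLastRow extends Skip
  final case class ToFirst(variable: String) extends Skip
  final case class ToLast(variable: String) extends Skip // TO <variable> behaves similarly

  // `m` lists the rows of one match in order, each with the variable it was mapped to.
  // Returns the row index where the next match attempt starts, or None if the
  // match is empty or the designated variable did not occur in it.
  def nextStart(m: Seq[(Int, String)], skip: Skip): Option[Int] = skip match {
    case ToNextRow   => m.headOption.map(_._1 + 1)
    case PastLastRow => m.lastOption.map(_._1 + 1)
    case ToFirst(v)  => m.find(_._2 == v).map(_._1)
    case ToLast(v)   => m.reverse.find(_._2 == v).map(_._1)
  }
}
```

For a match covering rows 4 to 9 mapped to STRT, DOWN, UP, UP, UP, UP, the next attempt starts at row 5 for TO NEXT ROW, row 10 for PAST LAST ROW, row 6 for TO FIRST UP, and row 9 for TO LAST UP.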
How to translate MATCH_RECOGNIZE to Flink-CEP
Currently, MATCH_RECOGNIZE in Calcite only supports PATTERN and DEFINE. We explain the translation with a simple example.
SELECT *
FROM T MATCH_RECOGNIZE (
PATTERN (A B+ C)
DEFINE
A AS A.name = 'a',
B AS B.name = 'b',
C AS C.name = 'c'
) MR
Given the simple SQL above, Calcite translates it into a relational tree:
LogicalProject[select="id, name"]
  LogicalMatch[pattern="A B+ C", definitions="..."]
    LogicalTableScan[table="T"]
What we need to do is translate the “LogicalMatch” node into calls to the Flink-CEP API. The most important work is translating PATTERN, DEFINE, and MEASURES.
There are six kinds of PATTERNs:
- PATTERN_CONCAT: for example, the pattern A B means that event B has to directly succeed the previous matching event A. This can be expressed by the Flink-CEP API pattern.next(String).
- PATTERN_QUANTIFIER: for example, B{3, 5} matches between 3 and 5 occurrences of B, and B+ matches one or more occurrences of B. B+ can be expressed by the CEP API pattern.oneOrMore(), B{n, n} by pattern.times(n), B? by pattern.optional(), and B* by pattern.oneOrMore().optional(). B{n, m} with n different from m is not supported in the CEP API.
- PATTERN_ALTER: for example, A | B matches A or B. This can't be expressed by the CEP API currently, but there is an issue related to this feature, see FLINK-4641.
- PATTERN_GROUPING: such as ((A B){3} C) attempts to match the group (A B) three times and then seeks one occurrence of C. NOT supported in CEP API currently.
- PATTERN_PERMUTE: such as PERMUTE (A, B) is a pattern that matches all permutations of A and B. NOT supported in CEP API currently.
- PATTERN_EXCLUDE: such as {- A -} is a pattern that matches A but excludes A from the output.
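The concatenation and quantifier cases in the list above can be sketched as a small translator that turns a flat pattern such as A B+ C into the corresponding chain of CEP calls, rendered as text. This is plain Scala for illustration, not the proposed code generator: PatternTranslationSketch and translate are hypothetical names, elements are assumed to be separated by whitespace (so a range is written B{3,5} without inner spaces), and anything outside concatenation and the quantifiers named above is reported as unsupported.

```scala
// Illustrative sketch only; it renders CEP calls as text and does not use Flink.
object PatternTranslationSketch {
  // A variable name, optionally followed by +, ?, *, {n} or {n,m}.
  private val Element = """(\w+)(\+|\?|\*|\{(\d+)(?:,(\d+))?\})?""".r

  // Maps one element to its variable name and the quantifier calls listed above.
  // Groups that did not take part in the regex match are null.
  private def quantifier(elem: String): Either[String, (String, String)] = elem match {
    case Element(name, null, _, _)        => Right(name -> "")
    case Element(name, "+", _, _)         => Right(name -> ".oneOrMore()")
    case Element(name, "?", _, _)         => Right(name -> ".optional()")
    case Element(name, "*", _, _)         => Right(name -> ".oneOrMore().optional()")
    case Element(name, _, n, null)        => Right(name -> s".times($n)")
    case Element(name, _, n, m) if n == m => Right(name -> s".times($n)")
    case Element(_, q, _, _)              => Left(s"quantifier $q is not supported yet")
    case other                            => Left(s"unsupported pattern element: $other")
  }

  // Translates concatenation plus quantifiers; the first element becomes begin(...),
  // every later element becomes next(...), i.e. strict contiguity.
  def translate(pattern: String): Either[String, String] = {
    val elems = pattern.trim.split("\\s+").toList
    elems.zipWithIndex.foldLeft[Either[String, String]](Right("Pattern")) {
      case (Right(chain), (elem, idx)) =>
        quantifier(elem).map { case (name, calls) =>
          val step = if (idx == 0) "begin" else "next"
          s"""$chain.$step("$name")$calls"""
        }
      case (failed, _) => failed
    }
  }
}
```

For example, translate("A B+ C") yields Right with the text Pattern.begin("A").next("B").oneOrMore().next("C"), while translate("A B{3,5}") and translate("A | B") yield a Left describing the unsupported construct.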
DEFINE can be translated into CEP's IterativeCondition by code generation and applied via pattern.where(...). Most definitions can be translated by the current CodeGenerator, except for some specific calls such as PREV, LAST, and FIRST.
MEASURES can be translated into CEP's PatternSelectFunction by code generation, which can heavily reuse the current CodeGenerator. NOTE: Calcite 1.12.0 doesn't support MEASURES, so we don't know what output users want. As a workaround, the prototype hardcodes the PatternSelectFunction to output only the last event of the matched events.
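As a rough illustration of what a generated select function would compute, the sketch below evaluates the MEASURES of the earlier ticker query over one match, and also shows the "last event only" workaround. It is plain Scala, not generated Flink code: Tick, Measures, Match, measures and lastEvent are hypothetical names, and a match is assumed to be handed over as a map from pattern variable name to the rows mapped to that variable, in order.

```scala
// Illustrative sketch only; not the code produced by Flink's CodeGenerator.
object MeasuresSketch {
  final case class Tick(symbol: String, tstamp: String, price: Int)
  final case class Measures(startTstamp: String, bottomTstamp: String, endTstamp: String)

  // Assumed shape of one match: pattern variable name -> rows mapped to it, in order.
  type Match = Map[String, Seq[Tick]]

  // MEASURES STRT.tstamp AS start_tstamp,
  //          LAST(DOWN.tstamp) AS bottom_tstamp,
  //          LAST(UP.tstamp) AS end_tstamp
  def measures(m: Match): Measures =
    Measures(m("STRT").head.tstamp, m("DOWN").last.tstamp, m("UP").last.tstamp)

  // The workaround described above: output only the last event of the match.
  // The variable order must be supplied because a Map carries no pattern order.
  def lastEvent(m: Match, variablesInPatternOrder: Seq[String]): Option[Tick] =
    variablesInPatternOrder.reverse
      .flatMap(v => m.getOrElse(v, Nil).lastOption)
      .headOption
}
```

With STRT mapped to the 05-Apr-11 row, DOWN to the 06-Apr-11 row and UP to the rows from 07-Apr-11 to 10-Apr-11, measures returns the three timestamps of the example output, and lastEvent returns the 10-Apr-11 row.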
As a result, the above example will be translated into CEP API like this:
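import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern

// Event is a placeholder for the row type of table T
val pattern = Pattern
  .begin[Event]("A").where(_.name == "a")
  .next("B").oneOrMore().where(_.name == "b")
  .next("C").where(_.name == "c")
val patternStream = CEP.pattern(partitionedInput, pattern)
// patternSelectFunction stands for the generated PatternSelectFunction
val output = patternStream.select(patternSelectFunction)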
Required features in Flink-CEP
- Support .times(from, to) on Pattern
- Support branching CEP patterns. FLINK-4641
- Support permutation on Pattern
- Support .prune(long pruningTimestamp) on NFA
- Support an open() method on IterativeCondition to compile generated code, so that the compiled code is not de/serialized with the NFA state.
- Support pattern grouping in Flink CEP
Compatibility, Deprecation, and Migration Plan
This FLIP proposes new functionality and syntax for SQL and the CEP library. The behavior of existing operators is not modified. Integrating CEP and SQL requires adding a flink-cep dependency to flink-table. IMO, flink-cep is a very slim library with almost no external dependencies, so this won't be a problem.
Test Plan
The features added to CEP library will be tested in flink-cep. The integration test will take place in flink-table.
Rejected Alternatives
No rejected alternatives yet.
Implementation Plan
The implementation of this effort can be divided into several subtasks. The following subtasks should wait until Calcite 1.13 is released:
- Implementation of the basic framework to support CEP on SQL. This includes supporting PATTERN and DEFINE clauses for MATCH_RECOGNIZE. see prototype
- Support MEASURES clause in MATCH_RECOGNIZE
- Support AFTER MATCH sub-clause of MATCH_RECOGNIZE
- Support SUBSET clause in MATCH_RECOGNIZE
- Support ALL ROWS PER MATCH for MATCH_RECOGNIZE
- Support PARTITION BY for MATCH_RECOGNIZE
- Support ORDER BY for MATCH_RECOGNIZE
- Support CLASSIFIER for MATCH_RECOGNIZE
- Support MATCH_NUMBER for MATCH_RECOGNIZE
- Support WITHIN clause for MATCH_RECOGNIZE
- Support LAST, FIRST, PREV, NEXT, FINAL, RUNNING aggregates for MATCH_RECOGNIZE
The following subtasks address the missing features of the CEP library and can be started right away:
- Support .times(from, to) in Flink CEP Pattern
- Support branching CEP patterns in Flink CEP. FLINK-4641
- Support permutation in Flink CEP Pattern.
- Support .prune(long pruningTimestamp) on Flink CEP NFA.
- Support an open() method on IterativeCondition to compile generated code, so that the compiled code is not de/serialized with the NFA state.
- Support pattern grouping in Flink CEP