Motivation
Table API is a unified, relational API for streaming and batch processing that shares the same underlying query optimization and query execution stack with SQL. As more features are added to SQL, the Table API also becomes more powerful, since most optimizations and functionality are shared between the two.
...
- Make it easy for Table API users to leverage the powerful features in SQL, e.g. deduplication, Top-N, etc.
- Provide convenient methods for frequently used functionality, e.g. a series of methods for null data handling (which becomes a problem when there are hundreds of columns), and data sampling and splitting (in ML it is very common to split a data set into multiple subsets, e.g. separately for training and validation).
Public Interfaces
...
Align SQL functionalities
deduplicate
Specification:
...
SELECT a, b, c, d, e
FROM (
  SELECT a, b, c, d, e,
    ROW_NUMBER() OVER (PARTITION BY a, b ORDER BY c ASC, d DESC) AS rownum
  FROM table_name)
WHERE rownum = 1
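To make the intended semantics concrete, the following plain-Java sketch (not Flink code) mimics the ROW_NUMBER() deduplication query on an in-memory list: rows are partitioned by (a, b), ordered by c ascending and d descending, and only the row numbered 1 survives. `DedupRow` and `DedupSketch` are hypothetical names, and keeping the earlier row on exact ties is an assumption, since SQL leaves that tie-break unspecified.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical row type holding the five columns a..e from the SQL query.
final class DedupRow {
    final String a;
    final String b;
    final int c;
    final int d;
    final String e;

    DedupRow(String a, String b, int c, int d, String e) {
        this.a = a;
        this.b = b;
        this.c = c;
        this.d = d;
        this.e = e;
    }
}

final class DedupSketch {
    // Ordering used by ROW_NUMBER(): c ascending, then d descending.
    private static final Comparator<DedupRow> ORDER =
            Comparator.<DedupRow>comparingInt(r -> r.c)
                    .thenComparing(Comparator.<DedupRow>comparingInt(r -> r.d).reversed());

    // Keeps, for each (a, b) partition, the row that would be assigned rownum = 1.
    // On exact ties the row seen first is kept (an assumption of this sketch).
    static List<DedupRow> deduplicate(List<DedupRow> rows) {
        Map<List<String>, DedupRow> firstPerKey = new LinkedHashMap<>();
        for (DedupRow row : rows) {
            firstPerKey.merge(List.of(row.a, row.b), row,
                    (kept, candidate) -> ORDER.compare(candidate, kept) < 0 ? candidate : kept);
        }
        return new ArrayList<>(firstPerKey.values());
    }
}
```

The output keeps partitions in first-seen order, which is convenient for testing but not something the SQL query guarantees.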
asof
Specification:
Table asOf(Expression expr)
Description:
Returns a table representing the state of the table as of the given point in time.
Example 1 (event time temporal join):
Java:
It’s equivalent to the following SQL:
SELECT
  order_id,
  price,
  currency,
  conversion_rate,
  order_time
FROM orders
JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
ON orders.currency = currency_rates.currency
Example 2 (processing time temporal join):
Java:
It’s equivalent to the following SQL:
SELECT
  order_id,
  price,
  currency,
  conversion_rate,
  order_time
FROM orders
JOIN currency_rates FOR SYSTEM_TIME AS OF orders.proctime
ON orders.currency = currency_rates.currency
Note: The `$("order_time")` in Example 1 and the `$("proctime")` in Example 2 refer to columns of the left table (orders).
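The event-time case of asOf can be pictured as a lookup into a versioned table: for each order, take the currency rate whose version time is the latest one not after the order's time. The plain-Java sketch below illustrates only that lookup; `TemporalJoinSketch`, `upsertRate` and `rateAsOf` are hypothetical names, and an in-memory `TreeMap` per currency stands in for the versioned `currency_rates` table. In the processing-time variant, the lookup time would be the current processing time, i.e. the latest known version.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

final class TemporalJoinSketch {
    // Hypothetical stand-in for the versioned currency_rates table:
    // currency -> (version time -> conversion rate).
    private final Map<String, TreeMap<Long, Double>> versions = new HashMap<>();

    // Records a new version of the rate for a currency, valid from versionTime on.
    void upsertRate(String currency, long versionTime, double rate) {
        versions.computeIfAbsent(currency, k -> new TreeMap<>()).put(versionTime, rate);
    }

    // Returns the rate valid "AS OF" the given time: the latest version whose time is
    // <= asOfTime. Empty means no version existed yet, so an inner join drops the order.
    Optional<Double> rateAsOf(String currency, long asOfTime) {
        TreeMap<Long, Double> history = versions.get(currency);
        if (history == null) {
            return Optional.empty();
        }
        Map.Entry<Long, Double> version = history.floorEntry(asOfTime);
        return version == null ? Optional.empty() : Optional.of(version.getValue());
    }
}
```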
window
Specification:
Table window(TableValuedWindow tableValuedWindow)
Description:
Applies a windowing table-valued function (TVF) to the table.
We need to introduce the class org.apache.flink.table.api.TableValuedWindow to represent a windowing table-valued function.
To define windowing table-valued functions, we want to reuse the existing classes as much as possible:
...
Example 1 (apply tumble windowing TVF to a table):
bidTable.window(
It’s equivalent to the following SQL:
SELECT * FROM TABLE(TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES));
Example 2 (apply sliding windowing TVF to a table):
bidTable.window(
It’s equivalent to the following SQL:
SELECT * FROM TABLE(HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES));
Example 3 (apply session windowing TVF to a table):
bidTable.window(
It’s equivalent to the following SQL:
SELECT * FROM TABLE(SESSION(TABLE Bid PARTITION BY bidder, DESCRIPTOR(bidtime), DESCRIPTOR(bidder), INTERVAL '10' MINUTES));
Example 4 (apply cumulate windowing TVF to a table):
bidTable.window(
It’s equivalent to the following SQL:
SELECT * FROM TABLE(CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES));
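As a rough illustration of which windows a row lands in, the sketch below assigns a timestamp to TUMBLE, HOP and CUMULATE windows using the same kind of parameters as Examples 1, 2 and 4 (size, slide, step). It assumes epoch-aligned windows with no offset, timestamps as plain `long` values, and a CUMULATE size that is a multiple of its step; `WindowAssignerSketch` is a hypothetical name, not part of the proposed API. SESSION windows are left out because they depend on the gaps between rows rather than on a single timestamp.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of window assignment; each window is {start, end}, end exclusive.
final class WindowAssignerSketch {
    // TUMBLE(size): exactly one window [start, start + size) contains t.
    static List<long[]> tumble(long t, long size) {
        long start = t - Math.floorMod(t, size);
        List<long[]> windows = new ArrayList<>();
        windows.add(new long[] {start, start + size});
        return windows;
    }

    // HOP(slide, size): every window [s, s + size) where s is a multiple of slide
    // and s <= t < s + size, listed from the latest start to the earliest.
    static List<long[]> hop(long t, long slide, long size) {
        List<long[]> windows = new ArrayList<>();
        long lastStart = t - Math.floorMod(t, slide);
        for (long s = lastStart; s > t - size; s -= slide) {
            windows.add(new long[] {s, s + size});
        }
        return windows;
    }

    // CUMULATE(step, size): all windows share the start of the enclosing size-aligned
    // window and grow by step; t belongs to every such window whose end is after t.
    static List<long[]> cumulate(long t, long step, long size) {
        List<long[]> windows = new ArrayList<>();
        long start = t - Math.floorMod(t, size);
        for (long end = start + step; end <= start + size; end += step) {
            if (end > t) {
                windows.add(new long[] {start, end});
            }
        }
        return windows;
    }
}
```

With size 10, slide 5 and step 2, a row at time 7 falls into one tumbling window, two hopping windows and two cumulating windows.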
Example 5 (window aggregation):
bidTable.window(Tumble.over(Expressions.lit(10).minutes()).on($("bidtime")))
It’s equivalent to the following SQL:
SELECT window_start, window_end, SUM(price) FROM TABLE(TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)) GROUP BY window_start, window_end;
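For the window aggregation in Example 5, a batch-style sketch of the result is a sum of `price` grouped by the tumbling window that each `bidtime` falls into, keyed by `window_start` (with `window_end = window_start + size`). `WindowAggSketch` and its parameters are hypothetical; in a streaming job Flink emits each window's result once the watermark passes `window_end`, which this in-memory sketch does not model.

```java
import java.util.SortedMap;
import java.util.TreeMap;

final class WindowAggSketch {
    // Sums prices per tumbling window of the given size, keyed by window_start.
    // Assumes epoch-aligned windows with no offset.
    static SortedMap<Long, Double> sumPerTumble(long[] bidtimes, double[] prices, long size) {
        if (bidtimes.length != prices.length) {
            throw new IllegalArgumentException("bidtimes and prices must have the same length");
        }
        SortedMap<Long, Double> sums = new TreeMap<>();
        for (int i = 0; i < bidtimes.length; i++) {
            long windowStart = bidtimes[i] - Math.floorMod(bidtimes[i], size);
            sums.merge(windowStart, prices[i], Double::sum);
        }
        return sums;
    }
}
```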
Example 6 (window join):
left = leftTable.window(Tumble.over(Expressions.lit(10).minutes()).on($("rowtime")))
left.col("window_end").isEqual(right.col("window_end"))))
It’s equivalent to the following SQL:
CREATE TEMPORARY VIEW left AS
SELECT * FROM TABLE(TUMBLE(TABLE left_table, DESCRIPTOR(rowtime), INTERVAL '10' MINUTES));
CREATE TEMPORARY VIEW right AS
SELECT * FROM TABLE(TUMBLE(TABLE right_table, DESCRIPTOR(rowtime), INTERVAL '10' MINUTES));
SELECT ... FROM left JOIN right ON left.window_start = right.window_start AND left.window_end = right.window_end;
NOTE: Both the left table and the right table have columns named "window_start" and "window_end". Joining two tables that have columns with the same names is not supported yet. See https://issues.apache.org/jira/browse/FLINK-18679 for more details.
Example 7 (window Top-N):
windowTable = table.window(Tumble.over(Expressions.lit(10).minutes()).on($("rowtime")))
It’s equivalent to the following SQL:
CREATE TEMPORARY VIEW T AS
SELECT * FROM TABLE(TUMBLE(TABLE table, DESCRIPTOR(rowtime), INTERVAL '10' MINUTES));
SELECT ... FROM (
  SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end, col1 ORDER BY col2 ASC, col3 DESC) AS rownum
  FROM T)
WHERE rownum <= 3
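The window Top-N in Example 7 follows the same ROW_NUMBER() pattern as deduplication, with the window bounds added to the partition key and `rownum <= 3` instead of `rownum = 1`. The hypothetical plain-Java sketch below mirrors that query on an in-memory list; `WRow` is an assumed row type carrying `window_start`, `window_end`, `col1`, `col2` and `col3`, and `WindowTopNSketch` is not part of the proposed API.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical row of the windowed view T.
final class WRow {
    final long windowStart;
    final long windowEnd;
    final String col1;
    final int col2;
    final int col3;

    WRow(long windowStart, long windowEnd, String col1, int col2, int col3) {
        this.windowStart = windowStart;
        this.windowEnd = windowEnd;
        this.col1 = col1;
        this.col2 = col2;
        this.col3 = col3;
    }
}

final class WindowTopNSketch {
    // ORDER BY col2 ASC, col3 DESC
    private static final Comparator<WRow> ORDER =
            Comparator.<WRow>comparingInt(r -> r.col2)
                    .thenComparing(Comparator.<WRow>comparingInt(r -> r.col3).reversed());

    // Keeps the first n rows of every (window_start, window_end, col1) partition.
    static List<WRow> topN(List<WRow> rows, int n) {
        if (n < 0) {
            throw new IllegalArgumentException("n must be non-negative");
        }
        Map<List<Object>, List<WRow>> partitions = new LinkedHashMap<>();
        for (WRow r : rows) {
            List<Object> key = List.<Object>of(r.windowStart, r.windowEnd, r.col1);
            partitions.computeIfAbsent(key, k -> new ArrayList<>()).add(r);
        }
        List<WRow> result = new ArrayList<>();
        for (List<WRow> partition : partitions.values()) {
            partition.sort(ORDER); // stable sort, so ties keep input order
            result.addAll(partition.subList(0, Math.min(n, partition.size())));
        }
        return result;
    }
}
```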
...
hint
Specification:
Table hint(String name, Map<String, String> options)
Description:
Attaches a hint to the table, e.g. an OPTIONS hint that dynamically overrides the table's options.
Example 1:
kafkaTable.hint("OPTIONS", ImmutableMap.of("scan.startup.mode", "earliest-offset"))
It’s equivalent to the following SQL:
SELECT id, name FROM kafka_table /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;
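The equivalence between `hint("OPTIONS", ...)` and the SQL comment hint can be illustrated by rendering the options map into the `/*+ OPTIONS(...) */` text. `HintSketch.toSqlHint` is a hypothetical helper, not a proposed or existing Flink method; doubling single quotes follows the usual SQL string-literal convention.

```java
import java.util.Map;
import java.util.stream.Collectors;

final class HintSketch {
    // Hypothetical helper: renders hint(name, options) as the equivalent SQL hint comment.
    static String toSqlHint(String name, Map<String, String> options) {
        String keyValues = options.entrySet().stream()
                .map(e -> quote(e.getKey()) + "=" + quote(e.getValue()))
                .collect(Collectors.joining(", "));
        return "/*+ " + name + "(" + keyValues + ") */";
    }

    // Wraps a value in single quotes, doubling any embedded single quote.
    private static String quote(String s) {
        return "'" + s.replace("'", "''") + "'";
    }
}
```

Because the rendered order follows the map's iteration order, an insertion-ordered map (such as Guava's `ImmutableMap` in the example, or `LinkedHashMap`) gives a stable hint string.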
describe
Specification:
TableResult describe()
...
SELECT
Sampling
sample
API Specification:
Table sample(double fraction)
Description:
Takes a sample of the table according to the given fraction, which must be in the range [0.0, 1.0].
Example:
table.sample(0.1)
It’s equivalent to the following SQL:
SELECT
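The proposal does not state how rows are chosen. One common interpretation, assumed here, is Bernoulli sampling: every row is kept independently with probability `fraction`, so the sample size is only approximately `fraction` times the row count. `SampleSketch` and its `seed` parameter are hypothetical and only illustrate that interpretation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

final class SampleSketch {
    // Bernoulli sampling: each row is kept independently with probability `fraction`.
    // nextDouble() is in [0, 1), so 0.0 keeps nothing and 1.0 keeps everything.
    static <T> List<T> sample(List<T> rows, double fraction, long seed) {
        if (fraction < 0.0 || fraction > 1.0) {
            throw new IllegalArgumentException("fraction must be in [0.0, 1.0]: " + fraction);
        }
        Random random = new Random(seed);
        List<T> kept = new ArrayList<>();
        for (T row : rows) {
            if (random.nextDouble() < fraction) {
                kept.add(row);
            }
        }
        return kept;
    }
}
```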
split
API Specification:
Table[] split(double[] weights)
Description:
Splits the table into multiple sub-tables according to the given weights.
Example:
table.split(new double[] { 0.1, 0.2, 0.3 })
It’s logically equivalent to the following SQL:
CREATE VIEW TT AS
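In the example the weights sum to 0.6, so they presumably have to be normalized; the sketch below assumes they are (as Spark's `Dataset.randomSplit` does), which gives the three splits roughly 1/6, 1/3 and 1/2 of the rows. It draws one uniform random number per row and routes the row to the split whose cumulative range contains it, so every row lands in exactly one split. `SplitSketch` and the `seed` parameter are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

final class SplitSketch {
    // Splits rows by weight: weights are normalized to sum to 1 (an assumption), and
    // one uniform draw per row selects the split whose cumulative range contains it.
    static <T> List<List<T>> split(List<T> rows, double[] weights, long seed) {
        double total = 0.0;
        for (double w : weights) {
            if (w < 0.0) {
                throw new IllegalArgumentException("weights must be non-negative");
            }
            total += w;
        }
        if (total <= 0.0) {
            throw new IllegalArgumentException("at least one weight must be positive");
        }
        // upper[i] is the exclusive upper bound of split i's range in [0, 1).
        double[] upper = new double[weights.length];
        double cumulative = 0.0;
        for (int i = 0; i < weights.length; i++) {
            cumulative += weights[i] / total;
            upper[i] = cumulative;
        }
        upper[weights.length - 1] = 1.0; // guard against floating-point round-off
        List<List<T>> splits = new ArrayList<>();
        for (int i = 0; i < weights.length; i++) {
            splits.add(new ArrayList<>());
        }
        Random random = new Random(seed);
        for (T row : rows) {
            double r = random.nextDouble(); // in [0, 1), so the loop stops by the last split
            int i = 0;
            while (r >= upper[i]) {
                i++;
            }
            splits.get(i).add(row);
        }
        return splits;
    }
}
```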