Motivation
...
- Make it easy for Table API users to leverage the powerful features of SQL, e.g. deduplication, top-N, etc.
- Provide convenient methods for frequently used functionality, e.g. a series of methods for null data handling (which becomes a problem when there are hundreds of columns) and for data sampling and splitting (in ML it is very common to split a data set into multiple splits for training and validation)
Public Interfaces
...
Align SQL functionalities
deduplicate
Specification:
...
SELECT a, b, c, d, e FROM ( SELECT a, b, c, d, e, ROW_NUMBER() OVER (PARTITION BY a, b ORDER BY c asc, d desc) AS rownum FROM table_name
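The query above is cut off in this copy, so the following is only a minimal sketch of the ROW_NUMBER pattern in plain Java, not Flink code. It assumes the outer query keeps the first row of each partition (rownum = 1); the class name `DeduplicateSketch` and the `int[]{a, b, c, d, e}` row layout are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: illustrates the semantics only, not the proposed API.
class DeduplicateSketch {
    // Orders rows by c ASC, then d DESC. Rows are int[]{a, b, c, d, e}.
    static int compareRows(int[] x, int[] y) {
        int byC = Integer.compare(x[2], y[2]);
        return byC != 0 ? byC : Integer.compare(y[3], x[3]);
    }

    // Keeps, for every (a, b) key, the row that sorts first (assumed rownum = 1).
    static List<int[]> deduplicate(List<int[]> rows) {
        Map<String, int[]> firstPerKey = new LinkedHashMap<>();
        for (int[] row : rows) {
            String key = row[0] + "|" + row[1];
            firstPerKey.merge(key, row,
                    (kept, candidate) -> compareRows(candidate, kept) < 0 ? candidate : kept);
        }
        return new ArrayList<>(firstPerKey.values());
    }
}
```

Rows that tie on both ordering columns keep whichever arrived first in this sketch; SQL leaves that choice unspecified.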
asof
Specification:
Table asOf(Expression expr) |
Description:
Return a table representing the state of the given time point.
Example 1 (event time temporal join):
Java: |
...
|
It’s equivalent to the following SQL:
SELECT
order_id,
price,
currency,
conversion_rate,
order_time
FROM orders
JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time ON orders.currency = currency_rates.currency
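To make the "state at the given time point" semantics concrete, here is a minimal plain-Java sketch of a versioned lookup. It is not Flink code: the class `AsOfSketch` and its methods are hypothetical, and it assumes each rate version becomes valid at its own timestamp and stays valid until the next version for the same currency.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: a versioned table keyed by currency.
class AsOfSketch {
    private final Map<String, TreeMap<Long, Double>> versions = new HashMap<>();

    // Registers a rate that becomes valid at validFrom (assumed epoch millis).
    void addRate(String currency, long validFrom, double rate) {
        versions.computeIfAbsent(currency, c -> new TreeMap<>()).put(validFrom, rate);
    }

    // Returns the rate valid at the given time, or null if no version exists yet.
    Double rateAsOf(String currency, long time) {
        TreeMap<Long, Double> history = versions.get(currency);
        if (history == null) {
            return null;
        }
        Map.Entry<Long, Double> version = history.floorEntry(time);
        return version == null ? null : version.getValue();
    }

    public static void main(String[] args) {
        AsOfSketch rates = new AsOfSketch();
        rates.addRate("EUR", 100L, 1.1);
        rates.addRate("EUR", 200L, 1.2);
        // An order placed at time 150 sees the version written at time 100.
        System.out.println(rates.rateAsOf("EUR", 150L));
    }
}
```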
Example 2 (processing time temporal join):
Java: |
|
It’s equivalent to the following SQL:
SELECT order_id, price, currency, conversion_rate, order_time FROM orders
JOIN currency_rates FOR SYSTEM_TIME AS OF orders.proctime ON orders.currency = currency_rates.currency
Note: The `$("order_time")` of Example 1 and the `$("proctime")` of Example 2 refer to columns of the left table.
window
Specification:
Table window(TableValuedWindow tableValuedWindow) |
Description:
Apply a windowing TVF to the table.
We need to introduce the class org.apache.flink.table.api.TableValuedWindow to represent a windowing table-valued function.
Regarding how to define a windowing table-valued function, we want to reuse the existing classes as much as possible:
...
Example 1 (apply tumble windowing TVF to a table):
bidTable.window( |
It’s equivalent to the following SQL:
SELECT * FROM TABLE( TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)); |
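As an illustration of what the tumbling TVF computes for each row, the following plain-Java sketch derives the window bounds from a timestamp. The class `TumbleSketch` is hypothetical, and it assumes windows are aligned to multiples of the window size with no offset.

```java
// Hypothetical helper: computes the tumbling window that contains a timestamp.
class TumbleSketch {
    // Inclusive start of the window; floorMod keeps negative timestamps aligned too.
    static long windowStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    // Exclusive end of the window.
    static long windowEnd(long timestamp, long size) {
        return windowStart(timestamp, size) + size;
    }

    public static void main(String[] args) {
        long tenMinutes = 10 * 60 * 1000L;
        long bidtime = 7 * 60 * 1000L; // seven minutes after the epoch
        System.out.println(windowStart(bidtime, tenMinutes) + " .. " + windowEnd(bidtime, tenMinutes));
    }
}
```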
Example 2 (apply sliding windowing TVF to a table):
bidTable.window( |
It’s equivalent to the following SQL:
SELECT * FROM TABLE( HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES)); |
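Unlike a tumbling window, a sliding (hop) window assigns one row to several overlapping windows. A minimal plain-Java sketch of that assignment follows; the class `HopSketch` is hypothetical, time is expressed in an arbitrary unit (minutes in the test values), and windows are assumed to be aligned to multiples of the slide.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: lists every hop window {start, end} that contains a timestamp.
class HopSketch {
    static List<long[]> windows(long timestamp, long slide, long size) {
        List<long[]> result = new ArrayList<>();
        // Latest window start that is not after the timestamp.
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        // Walk backwards while the window [start, start + size) still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            result.add(new long[] {start, start + size});
        }
        return result;
    }
}
```

With a slide of 5 and a size of 10, as in the SQL above, every row lands in size / slide = 2 windows.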
Example 3 (apply session windowing TVF to a table):
bidTable.window( |
It’s equivalent to the following SQL:
SELECT * FROM TABLE( SESSION(TABLE Bid PARTITION BY bidder, DESCRIPTOR(bidtime), DESCRIPTOR(bidder), INTERVAL '10' MINUTES)); |
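Session windows are data driven rather than aligned to the clock. The sketch below shows one way to derive them for a single partition key (one bidder) in plain Java. The class `SessionSketch` is hypothetical, and the boundary rule (a new session starts when the next event arrives at least `gap` after the previous one) is an assumption of this sketch, not something the proposal states.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: groups the sorted event times of one key into sessions.
class SessionSketch {
    // Returns {window_start, window_end} pairs; window_end = last event time + gap.
    static List<long[]> sessions(long[] sortedTimes, long gap) {
        List<long[]> result = new ArrayList<>();
        if (sortedTimes.length == 0) {
            return result;
        }
        long start = sortedTimes[0];
        long last = sortedTimes[0];
        for (int i = 1; i < sortedTimes.length; i++) {
            if (sortedTimes[i] - last >= gap) {
                // Inactivity reached the gap: close the current session.
                result.add(new long[] {start, last + gap});
                start = sortedTimes[i];
            }
            last = sortedTimes[i];
        }
        result.add(new long[] {start, last + gap});
        return result;
    }
}
```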
Example 4 (apply cumulate windowing TVF to a table):
bidTable.window( |
It’s equivalent to the following SQL:
SELECT * FROM TABLE( CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES)); |
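Cumulate windows share a start and grow by a fixed step until they reach the maximum size. A minimal plain-Java sketch of which windows a row falls into is given below; the class `CumulateSketch` is hypothetical, and it assumes the shared start is aligned to multiples of the maximum size.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: lists every cumulate window {start, end} that contains a timestamp.
class CumulateSketch {
    static List<long[]> windows(long timestamp, long step, long maxSize) {
        List<long[]> result = new ArrayList<>();
        long start = timestamp - Math.floorMod(timestamp, maxSize);
        // Window ends grow by one step at a time; keep those that lie after the timestamp.
        for (long end = start + step; end <= start + maxSize; end += step) {
            if (end > timestamp) {
                result.add(new long[] {start, end});
            }
        }
        return result;
    }
}
```

With a step of 2 and a maximum size of 10, as in the SQL above, a row at minute 7 belongs to the windows ending at minutes 8 and 10.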
Example 5 (window aggregation):
bidTable.window(Tumble.over(Expressions.lit(10).minutes()).on($("bidtime"))) |
It’s equivalent to the following SQL:
SELECT window_start, window_end, SUM(price) FROM TABLE( TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)) GROUP BY window_start, window_end; |
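The aggregation above can be read as "assign each bid to its tumbling window, then sum per window". A minimal plain-Java sketch of that reading follows; the class `WindowAggregationSketch` and the `{bidtime, price}` row layout are hypothetical, and prices are integers only to keep the example short.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: SUM(price) grouped by tumbling window.
class WindowAggregationSketch {
    // bids[i] = {bidtime, price}; the result maps window_start to the summed price.
    static Map<Long, Long> sumPerTumblingWindow(long[][] bids, long size) {
        Map<Long, Long> sums = new TreeMap<>();
        for (long[] bid : bids) {
            long windowStart = bid[0] - Math.floorMod(bid[0], size);
            sums.merge(windowStart, bid[1], Long::sum);
        }
        return sums;
    }
}
```

Since window_end is always window_start + size for a tumbling window, grouping by window_start alone yields the same groups as grouping by both columns.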
Example 6 (window join):
left = leftTable.window(Tumble.over(Expressions.lit(10).minutes()).on($("rowtime")))
left.col("window_end").isEqual(right.col("window_end")))) |
It’s equivalent to the following SQL:
CREATE TEMPORARY VIEW left AS SELECT * FROM TABLE( TUMBLE(TABLE left_table, DESCRIPTOR(rowtime), INTERVAL '10' MINUTES)); CREATE TEMPORARY VIEW right AS SELECT * FROM TABLE( TUMBLE(TABLE right_table, DESCRIPTOR(rowtime), INTERVAL '10' MINUTES)); SELECT ... FROM left JOIN right ON left.window_start = right.window_start AND left.window_end = right.window_end
NOTE: Both the left table and the right table have columns named "window_start" and "window_end". Joining two tables that share column names is currently not supported. See https://issues.apache.org/jira/browse/FLINK-18679 for more details.
Example 7 (window topn):
windowTable = table.window(Tumble.over(Expressions.lit(10).minutes()).on($("rowtime"))) |
It’s equivalent to the following SQL:
CREATE TEMPORARY VIEW T AS SELECT * FROM TABLE( TUMBLE(TABLE table, DESCRIPTOR(rowtime), INTERVAL '10' MINUTES)); SELECT ... FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end, col1 ORDER BY col2 asc, col3 desc) AS rownum FROM T) WHERE rownum <= 3
...
hint
Specification:
Table hint(String name, Map<String, String> options) |
Description:
Attach a hint, such as dynamic table options, to the table.
Example 1:
kafkaTable. |
hint("OPTIONS", ImmutableMap.of("scan.startup.mode", "earliest-offset")) |
It’s equivalent to the following SQL:
select id, name from kafka_table /*+ OPTIONS('scan.startup.mode'='earliest-offset') */; |
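To show how the `name` and `options` arguments map onto the SQL hint comment, here is a minimal plain-Java sketch that renders the hint text. The class `HintSketch` and its `render` method are hypothetical and exist only for illustration; the proposal does not say how the planner builds or consumes the hint.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper: renders a hint name and its options as a SQL hint comment.
class HintSketch {
    static String render(String name, Map<String, String> options) {
        StringJoiner body = new StringJoiner(", ", "/*+ " + name + "(", ") */");
        for (Map.Entry<String, String> option : options.entrySet()) {
            body.add(quote(option.getKey()) + "=" + quote(option.getValue()));
        }
        return body.toString();
    }

    // Wraps a value in single quotes, doubling any embedded single quote.
    private static String quote(String value) {
        return "'" + value.replace("'", "''") + "'";
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("scan.startup.mode", "earliest-offset");
        System.out.println(render("OPTIONS", options));
    }
}
```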
describe
Specification:
TableResult describe() |
...
SELECT |
Sampling
sample
API Specification:
Table sample(double fraction) |
Description:
Take a sample of the table according to the given fraction, which must be in the range [0.0, 1.0].
Example:
table.sample(0.1) |
It’s equivalent to the following SQL:
SELECT |
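The equivalent SQL is cut off in this copy, so the exact sampling strategy is not visible here. As a rough illustration only, the plain-Java sketch below assumes independent per-row (Bernoulli) sampling, where each row is kept with probability `fraction`; the class `SampleSketch` is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical helper: keeps each row independently with probability `fraction`.
class SampleSketch {
    static <T> List<T> sample(List<T> rows, double fraction, Random random) {
        // The negated form also rejects NaN.
        if (!(fraction >= 0.0 && fraction <= 1.0)) {
            throw new IllegalArgumentException("fraction must be in [0.0, 1.0]: " + fraction);
        }
        List<T> kept = new ArrayList<>();
        for (T row : rows) {
            // nextDouble() is in [0.0, 1.0), so 0.0 keeps nothing and 1.0 keeps everything.
            if (random.nextDouble() < fraction) {
                kept.add(row);
            }
        }
        return kept;
    }
}
```

Under this assumption the result size is only approximately `fraction` times the input size and varies from run to run.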
split
API Specification:
Table[] split(double[] weights) |
Description:
Splits the table into multiple sub-tables according to the given weights.
Example:
table.split(new double[] { 0.1, 0.2, 0.3 }) |
It’s logically equivalent to the following SQL:
CREATE VIEW TT AS |
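The equivalent SQL is cut off in this copy. As a hedged illustration of weight-based splitting, the plain-Java sketch below assumes the weights are normalized to sum to 1 (the example weights 0.1, 0.2, 0.3 sum to 0.6) and that each row is routed by a single random draw; the class `SplitSketch` and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical helper: routes each row to exactly one sub-table by a random draw.
class SplitSketch {
    // Turns weights into normalized cumulative upper bounds, e.g. {1, 1} -> {0.5, 1.0}.
    static double[] cumulativeBounds(double[] weights) {
        double total = 0.0;
        for (double weight : weights) {
            if (weight < 0.0) {
                throw new IllegalArgumentException("weights must be non-negative");
            }
            total += weight;
        }
        if (total <= 0.0) {
            throw new IllegalArgumentException("at least one weight must be positive");
        }
        double[] bounds = new double[weights.length];
        double running = 0.0;
        for (int i = 0; i < weights.length; i++) {
            running += weights[i];
            bounds[i] = running / total;
        }
        bounds[bounds.length - 1] = 1.0; // guard against rounding error
        return bounds;
    }

    // Index of the first bucket whose upper bound lies above the draw.
    static int bucketFor(double draw, double[] bounds) {
        for (int i = 0; i < bounds.length; i++) {
            if (draw < bounds[i]) {
                return i;
            }
        }
        return bounds.length - 1;
    }

    static <T> List<List<T>> split(List<T> rows, double[] weights, Random random) {
        double[] bounds = cumulativeBounds(weights);
        List<List<T>> splits = new ArrayList<>();
        for (int i = 0; i < bounds.length; i++) {
            splits.add(new ArrayList<>());
        }
        for (T row : rows) {
            splits.get(bucketFor(random.nextDouble(), bounds)).add(row);
        }
        return splits;
    }
}
```

Because every row receives exactly one draw, the sub-tables are disjoint and together cover the input.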