...
deduplicate
Specification:
Table deduplicate(Expression[] dedupFields, Expression orderField, boolean keepFirst) |
Description:
Deduplicates rows according to the specified deduplication fields.
- param dedupFields: the fields to perform deduplication against
- param orderField: the time attribute field, which defines the order of duplicate rows
- param keepFirst: determines which row to keep. If true, the first occurrence is kept and the later ones are dropped; otherwise, the last occurrence is kept.
Example:
Java: |
It’s equivalent to the following SQL:
SELECT a, b, c FROM ( SELECT a, b, c, ROW_NUMBER() OVER (PARTITION BY a, b ORDER BY proctime asc) AS rownum FROM table_name |
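The keepFirst semantics described above can be illustrated outside Flink. The following is a minimal plain-Java sketch, not the proposed Table API: the class `DedupSketch`, its method, and the row representation (a list of column values) are hypothetical, and the input is assumed to be already sorted ascending by the order field.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of deduplicate(dedupFields, orderField, keepFirst).
final class DedupSketch {
    /**
     * @param rows      rows sorted ascending by the order field (assumption)
     * @param keyIdx    column indices of the deduplication fields
     * @param keepFirst true keeps the first row per key, false keeps the last
     */
    static List<List<Object>> deduplicate(
            List<List<Object>> rows, int[] keyIdx, boolean keepFirst) {
        Map<List<Object>, List<Object>> kept = new LinkedHashMap<>();
        for (List<Object> row : rows) {
            List<Object> key = new ArrayList<>();
            for (int i : keyIdx) {
                key.add(row.get(i));
            }
            if (keepFirst) {
                kept.putIfAbsent(key, row); // later duplicates are ignored
            } else {
                kept.put(key, row);         // later duplicates overwrite earlier ones
            }
        }
        return new ArrayList<>(kept.values());
    }
}
```

With keepFirst set to true this corresponds to filtering on rownum = 1 over an ascending order; with false it corresponds to the same filter over a descending order.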
topn
Specification:
Table topn(int n, Expression[] partitionFields, Expression[] orderFields) |
Description:
Returns the largest/smallest n elements ordered by the specified orderFields for each group specified by the partitionFields.
- param n: the number of elements to return for each partition
- param partitionFields: the partition fields, each partition will have a top-n result
- param orderFields: the order fields; the ordering direction may differ from column to column
Example:
Java: |
It’s equivalent to the following SQL:
SELECT a, b, c, d, e FROM ( SELECT a, b, c, d, e, ROW_NUMBER() OVER (PARTITION BY a, b ORDER BY c asc, d desc) AS rownum FROM table_name |
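The per-partition top-n behaviour can be sketched in plain Java as follows. This is an illustration of the semantics only, not the proposed API: `TopNSketch`, the list-based rows, and the use of a `Comparator` in place of order expressions are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of topn(n, partitionFields, orderFields).
final class TopNSketch {
    static <T> List<List<T>> topn(
            int n, List<List<T>> rows, int[] partitionIdx, Comparator<List<T>> order) {
        // Group rows by the values of the partition columns.
        Map<List<T>, List<List<T>>> groups = new LinkedHashMap<>();
        for (List<T> row : rows) {
            List<T> key = new ArrayList<>();
            for (int i : partitionIdx) {
                key.add(row.get(i));
            }
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
        }
        // Sort each group and keep at most n rows, like "rownum <= n".
        List<List<T>> out = new ArrayList<>();
        for (List<List<T>> group : groups.values()) {
            group.sort(order);
            out.addAll(group.subList(0, Math.min(n, group.size())));
        }
        return out;
    }
}
```

Mixed directions such as "c asc, d desc" would be expressed in this sketch by chaining comparators, with the second one reversed.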
asof
Specification:
Table asOf(Expression expr) |
Description:
Returns a table representing the state of this table at the given point in time.
Example 1 (event time temporal join):
leftTable.join(rightTable.asOf($("order_time")), $("currency").isEqual("currency")) |
It’s equivalent to the following SQL:
SELECT order_id, price, currency, conversion_rate, order_time FROM orders LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time ON orders.currency = currency_rates.currency |
Example 2 (processing time temporal join):
leftTable.join(rightTable.asOf($("proctime")), $("currency").isEqual("currency")) |
It’s equivalent to the following SQL:
SELECT order_id, price, currency, conversion_rate, order_time FROM orders LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.proctime ON orders.currency = currency_rates.currency |
Note: The `$("order_time")` in Example 1 and the `$("proctime")` in Example 2 refer to columns of the left table.
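To make the "state at a given time point" idea concrete, here is a small plain-Java sketch of a versioned lookup. It is not how Flink implements temporal joins; `AsOfSketch`, its methods, and the use of epoch-millisecond timestamps are assumptions for illustration. Each key keeps its version history in a `TreeMap`, and a lookup returns the latest version that is not newer than the probe time.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical versioned table: currency -> (validFrom -> conversion rate).
final class AsOfSketch {
    private final Map<String, TreeMap<Long, Double>> versions = new HashMap<>();

    /** Records that the rate for a currency changes at validFrom. */
    void update(String currency, long validFrom, double rate) {
        versions.computeIfAbsent(currency, k -> new TreeMap<>()).put(validFrom, rate);
    }

    /** Returns the rate valid at time ts, or null if no version exists yet. */
    Double asOf(String currency, long ts) {
        TreeMap<Long, Double> history = versions.get(currency);
        if (history == null) {
            return null;
        }
        Map.Entry<Long, Double> entry = history.floorEntry(ts);
        return entry == null ? null : entry.getValue();
    }
}
```

In terms of the examples above, `ts` plays the role of the left table's `order_time` (or `proctime`) value for the row being joined.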
window
Specification:
Table window(TableValuedWindow tableValuedWindow) |
Description:
Applies a windowing TVF (table-valued function) to the table.
We need to introduce the class org.apache.flink.table.api.TableValuedWindow to represent a windowing table-valued function.
Regarding how to define a windowing table-valued function, we want to reuse the existing classes as much as possible:
- For sliding window and tumble window, SlideWithSizeAndSlideOnTime, TumbleWithSizeOnTime will extend from org.apache.flink.table.api.TableValuedWindow as there is no alias in table-valued window.
- For the session window, as the session windowing table-valued function takes keyCols, we will introduce SessionWithGapAndKey and SessionWithGapAndKeyOnTime under package org.apache.flink.table.api. SessionWithGapAndKeyOnTime extends org.apache.flink.table.api.TableValuedWindow.
- For the cumulate window, we will introduce Cumulate, CumulateWithSize, CumulateWithSizeAndStep, CumulateWithSizeAndStepOnTime under package org.apache.flink.table.api. CumulateWithSizeAndStepOnTime extends org.apache.flink.table.api.TableValuedWindow.
Example 1 (apply tumble windowing TVF to a table):
bidTable.window( |
It’s equivalent to the following SQL:
SELECT * FROM TABLE( TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)); |
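The window_start and window_end columns that a tumbling window adds to each row can be computed with simple arithmetic. The sketch below is a plain-Java illustration under the assumption that timestamps and the window size share one unit (for example milliseconds), that windows are aligned to 0, and that window_end is exclusive; `TumbleSketch` is a hypothetical name.

```java
// Hypothetical illustration of tumbling window assignment.
final class TumbleSketch {
    /** Returns {window_start, window_end} for a row with timestamp ts. */
    static long[] assign(long ts, long size) {
        // floorMod keeps the result non-negative for negative timestamps too.
        long start = ts - Math.floorMod(ts, size);
        return new long[] {start, start + size};
    }
}
```

For a 10 minute window, a row with bidtime 00:08 would get the window [00:00, 00:10) and a row at exactly 00:10 would get [00:10, 00:20).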
Example 2 (apply sliding windowing TVF to a table):
bidTable.window( |
It’s equivalent to the following SQL:
SELECT * FROM TABLE( HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES)); |
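Unlike a tumbling window, a sliding (hopping) window can assign one row to several overlapping windows. A minimal plain-Java sketch of that assignment follows; `HopSketch` is a hypothetical name, timestamps, slide and size are assumed to share one unit, and windows are assumed to start at multiples of the slide with an exclusive end.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of sliding (hopping) window assignment.
final class HopSketch {
    /** Returns every {window_start, window_end} that contains ts. */
    static List<long[]> assign(long ts, long slide, long size) {
        List<long[]> windows = new ArrayList<>();
        // Latest window start that is not after ts.
        long lastStart = ts - Math.floorMod(ts, slide);
        // Walk backwards while the window [start, start + size) still covers ts.
        for (long start = lastStart; start > ts - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }
}
```

With a slide of 5 and a size of 10, as in the SQL above, every row lands in two windows, so the output table has twice as many rows as the input.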
Example 3 (apply session windowing TVF to a table):
bidTable.window( |
It’s equivalent to the following SQL:
SELECT * FROM TABLE( SESSION(TABLE Bid PARTITION BY bidder, DESCRIPTOR(bidtime), DESCRIPTOR(bidder), INTERVAL '10' MINUTES)); |
Example 4 (apply cumulate windowing TVF to a table):
bidTable.window( |
It’s equivalent to the following SQL:
SELECT * FROM TABLE( CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES)); |
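Cumulating windows share a start and grow by one step until they reach the maximum size. The following plain-Java sketch shows which windows a single row would fall into; `CumulateSketch` is a hypothetical name, and the example assumes a common time unit, alignment to 0, exclusive window ends, and a size that is a multiple of the step.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of cumulating window assignment.
final class CumulateSketch {
    /** Returns every {window_start, window_end} that contains ts. */
    static List<long[]> assign(long ts, long step, long size) {
        List<long[]> windows = new ArrayList<>();
        // All cumulating windows of one cycle share this start.
        long start = ts - Math.floorMod(ts, size);
        for (long end = start + step; end <= start + size; end += step) {
            if (end > ts) { // the window must still cover ts
                windows.add(new long[] {start, end});
            }
        }
        return windows;
    }
}
```

With a step of 2 and a size of 10, a row at time 3 falls into the windows [0, 4), [0, 6), [0, 8) and [0, 10).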
Example 5 (window aggregation):
bidTable.window(Tumble.over(Expressions.lit(10).minutes()).on($("bidtime"))) |
It’s equivalent to the following SQL:
SELECT window_start, window_end, SUM(price) FROM TABLE( TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)) GROUP BY window_start, window_end; |
Example 6 (window join):
left = leftTable.window(Tumble.over(Expressions.lit(10).minutes()).on($("rowtime")))
left.col("window_end").isEqual(right.col("window_end")))) |
It’s equivalent to the following SQL:
CREATE TEMPORARY VIEW left AS TUMBLE(TABLE left_table, DESCRIPTOR(rowtime), INTERVAL '10' MINUTES)); TUMBLE(TABLE right_table, DESCRIPTOR(rowtime), INTERVAL '10' MINUTES)); FROM left JOIN right ON left.window_start = right.window_start AND left.window_end = right.window_end |
NOTE: Both the left table and the right table have columns named "window_start" and "window_end". Joining two tables that have columns with the same names is not supported yet. See https://issues.apache.org/jira/browse/FLINK-18679 for more details.
Example 7 (window topn):
windowTable = table.window(Tumble.over(Expressions.lit(10).minutes()).on($("rowtime"))) |
It’s equivalent to the following SQL:
CREATE TEMPORARY VIEW T AS TUMBLE(TABLE table, DESCRIPTOR(rowtime), INTERVAL '10' MINUTES)); FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end, col1 ORDER BY col2 asc, col3 desc) AS rownum FROM T) WHERE rownum <= 3 |
hints
Specification:
Table hint(String name, Map<String, String> options) |
Description:
Attaches a hint with the given name and options to the table.
Example 1:
kafkaTable.hints("OPTIONS", ImmutableMap.of("scan.startup.mode", "earliest-offset")) |
It’s equivalent to the following SQL:
select id, name from kafka_table /*+ OPTIONS('scan.startup.mode'='earliest-offset') */; |
describe
Specification:
TableResult describe() |
Description:
Describe the schema of a table or a view.
Example:
table.describe().print() |
It’s equivalent to the following SQL:
DESCRIBE orders |
Null Data Handling
fillna
API Specification:
Table fillna(Object value) |
Description:
Replace the specified fields with the specified value if they are null.
- param value: the value to fill with
- param fields: it will only replace the specified fields.
- param values: the map key is the name of the field to replace and the map value is the value to replace with.
Example 1:
Java: |
It’s equivalent to the following SQL:
SELECT (case when a is null then 1 else a end) as a, |
Example 2:
Java: |
It’s equivalent to the following SQL:
SELECT (case when a is null then 1 else a end) as a, |
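The single-value variant of fillna behaves like a per-column "case when x is null then value else x end". A minimal plain-Java sketch of that behaviour on one row is shown below; `FillNaSketch` and the list-based row are hypothetical and only illustrate the semantics.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of fillna(value) applied to a single row.
final class FillNaSketch {
    static List<Object> fillna(List<Object> row, Object value) {
        List<Object> out = new ArrayList<>(row.size());
        for (Object field : row) {
            // Same effect as: case when field is null then value else field end
            out.add(field == null ? value : field);
        }
        return out;
    }
}
```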
dropna
API Specification:
Table dropna(int thresh) |
Description:
Removes a row if the number of non-null fields is less than the specified threshold.
- param thresh: remove a row if the number of non-null columns is less than the threshold. If it is -1, a row is removed only when all of the columns are null.
- param fields: it will only consider the specified fields
Example 1:
Java: |
It’s equivalent to the following SQL:
SELECT a, b, c a, b, |
Example 2:
Java: |
It’s equivalent to the following SQL:
SELECT a, b, c a, b, |
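The threshold rule, including the special value -1, can be sketched in plain Java as follows. `DropNaSketch` and the list-based rows are hypothetical; the sketch considers all columns and follows the thresh parameter description above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical illustration of dropna(thresh) over all columns.
final class DropNaSketch {
    static List<List<Object>> dropna(List<List<Object>> rows, int thresh) {
        List<List<Object>> out = new ArrayList<>();
        for (List<Object> row : rows) {
            long nonNull = row.stream().filter(Objects::nonNull).count();
            // thresh == -1: drop only rows in which every column is null.
            boolean drop = thresh == -1 ? nonNull == 0 : nonNull < thresh;
            if (!drop) {
                out.add(row);
            }
        }
        return out;
    }
}
```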
replace
API Specification:
Table replace(Map<T, T> replacement, String[] fields) |
Description:
For the given fields, replaces each value that appears as a key of the replacement map with the corresponding map value.
- param replacement: the map key is the value to replace and the map value is the value to replace it with.
- param fields: the columns to consider
Example:
Java: |
It’s equivalent to the following SQL:
SELECT |
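The replacement rule amounts to a map lookup that falls back to the original value. The plain-Java sketch below applies it to the values of one column; `ReplaceSketch` is a hypothetical name and the example assumes non-null column values.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of replace(replacement, fields) for one column.
final class ReplaceSketch {
    static <T> List<T> replace(List<T> column, Map<T, T> replacement) {
        List<T> out = new ArrayList<>(column.size());
        for (T value : column) {
            // Values that are not keys of the map are left unchanged.
            out.add(replacement.getOrDefault(value, value));
        }
        return out;
    }
}
```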