Status
Current state: Under Discussion
Discussion thread: here (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)
JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)
Released: <Flink Version>
...
Motivation
The Table API is a unified, relational API for stream and batch processing that shares the same query optimization and query execution stack as SQL. As more and more features are added to SQL, the Table API also becomes more powerful, since most optimizations and functionality are shared between the two.
...
- Make it easy for Table API users to leverage powerful SQL features such as deduplication and Top-N.
- Provide convenient methods for frequently used functionality, e.g. a series of methods for null data handling (doing this by hand becomes a burden when a table has hundreds of columns), and methods for data sampling and splitting (in ML it is very common to split a data set into separate subsets for training and validation).
Public Interfaces
Table Operations
...
deduplicate
Specification:
Table deduplicate(Expression[] dedupFields, Expression orderField, boolean keepFirst)
Description:
Removes duplicate rows. Rows with the same values in all of the deduplication fields are considered duplicates, and only one of them is kept.
- param dedupFields: the fields to perform deduplication against
- param orderField: the time attribute field that defines the order of duplicate rows
- param keepFirst: determines which row to keep. If true, the first occurrence is kept and later ones are dropped; otherwise, the last occurrence is kept.
Example:
Java:
table.deduplicate(new Expression[] {$("a"), $("b")}, $("proctime"), true)
It’s equivalent to the following SQL:
SELECT a, b, c
FROM (
  SELECT a, b, c,
    ROW_NUMBER() OVER (PARTITION BY a, b ORDER BY proctime asc) AS rownum
  FROM table_name)
WHERE rownum = 1
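The keepFirst semantics can be illustrated with a minimal plain-Java sketch over an in-memory list. This is not the proposed Flink implementation: the class `DedupSketch`, the record `Row`, and the hard-coded dedup fields (a, b) and order field `proctime` are hypothetical, chosen only to mirror the example above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory illustration of the proposed deduplicate semantics (not Flink code).
class DedupSketch {

    // Hypothetical row type: (a, b) are the dedup fields, proctime is the order field.
    record Row(String a, String b, String c, long proctime) {}

    // Keeps one row per (a, b): the smallest proctime if keepFirst, otherwise the largest.
    // On equal proctime, the row seen first in the input is kept.
    static List<Row> deduplicate(List<Row> rows, boolean keepFirst) {
        Map<List<String>, Row> kept = new LinkedHashMap<>();
        for (Row row : rows) {
            List<String> key = List.of(row.a(), row.b());
            Row current = kept.get(key);
            boolean replace = current == null
                    || (keepFirst ? row.proctime() < current.proctime()
                                  : row.proctime() > current.proctime());
            if (replace) {
                kept.put(key, row); // re-putting an existing key keeps its original position
            }
        }
        return new ArrayList<>(kept.values());
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(
                new Row("x", "y", "first", 1L),
                new Row("x", "y", "second", 2L),
                new Row("x", "z", "other", 3L));
        System.out.println(deduplicate(rows, true));
        System.out.println(deduplicate(rows, false));
    }
}
```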
topn
Specification:
Table topn(int n, Expression[] partitionFields, Expression[] orderFields)
Description:
For each partition defined by partitionFields, returns the top n rows according to orderFields, i.e. the n largest or smallest rows depending on the ordering direction.
- param n: the number of rows to return for each partition
- param partitionFields: the partition fields; a separate top-n result is computed for each partition
- param orderFields: the order fields; each column may have its own ordering direction
Example:
Java:
)
It’s equivalent to the following SQL:
SELECT a, b, c, d, e
FROM (
  SELECT a, b, c, d, e,
    ROW_NUMBER() OVER (PARTITION BY a, b ORDER BY c asc, d desc) AS rownum
  FROM table_name)
WHERE rownum <= n
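As a rough, non-authoritative illustration of the per-partition semantics, the sketch below hard-codes the example's partitionFields (a, b) and orderFields (c ascending, d descending). `TopNSketch` and `Row` are hypothetical names, and the code runs on an in-memory list rather than on a Flink table.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory illustration of the proposed topn semantics (not Flink code).
class TopNSketch {

    // Hypothetical row type: (a, b) partition the data, c and d define the order.
    record Row(String a, String b, int c, int d, String e) {}

    // ORDER BY c ASC, d DESC: each column has its own direction.
    static final Comparator<Row> ORDER =
            Comparator.comparingInt(Row::c)
                      .thenComparing(Comparator.comparingInt(Row::d).reversed());

    // Returns at most n rows per (a, b) partition, partitions in first-seen order.
    // List.sort is stable, so fully tied rows keep input order (SQL leaves that order unspecified).
    static List<Row> topn(List<Row> rows, int n) {
        Map<List<String>, List<Row>> partitions = new LinkedHashMap<>();
        for (Row row : rows) {
            partitions.computeIfAbsent(List.of(row.a(), row.b()), k -> new ArrayList<>()).add(row);
        }
        List<Row> result = new ArrayList<>();
        for (List<Row> partition : partitions.values()) {
            partition.sort(ORDER);
            result.addAll(partition.subList(0, Math.min(n, partition.size())));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(
                new Row("k", "k", 2, 0, "r1"),
                new Row("k", "k", 1, 5, "r2"),
                new Row("k", "k", 1, 9, "r3"),
                new Row("j", "j", 7, 7, "r4"));
        System.out.println(topn(rows, 2));
    }
}
```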
describe
Specification:
TableResult describe()
Description:
Describes the schema of a table or view.
Example:
table.describe().print()
It’s equivalent to the following SQL:
DESCRIBE orders
Null Data Handling
fillna
API Specification:
Table fillna(Object value)
Description:
Replaces null values with the specified value(s).
- param value: the value to fill nulls with
- param fields: if specified, only these fields are filled
- param values: a map whose key is the name of a field to fill and whose value is the value to fill that field's nulls with
Example 1:
Java:
It’s equivalent to the following SQL:
SELECT (case when a is null then 1 else a end) as a,
Example 2:
Java:
It’s equivalent to the following SQL:
SELECT (case when a is null then 1 else a end) as a,
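The three parameter styles listed above (one value for all fields, one value for selected fields, and a per-field map) can be sketched in plain Java over rows modelled as maps. The method names `fillAll`, `fillFields`, and `fillWithMap` are hypothetical stand-ins for the corresponding fillna variants, and this is not Flink code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory illustration of the fillna parameter styles (not Flink code).
class FillnaSketch {

    // Like fillna(value): every null field is replaced by value. Input rows are not modified.
    static List<Map<String, Object>> fillAll(List<Map<String, Object>> rows, Object value) {
        List<Map<String, Object>> out = new ArrayList<>();
        for (Map<String, Object> row : rows) {
            Map<String, Object> copy = new LinkedHashMap<>(row);
            copy.replaceAll((field, v) -> v == null ? value : v);
            out.add(copy);
        }
        return out;
    }

    // Like fillna(value, fields): only the listed fields are filled.
    static List<Map<String, Object>> fillFields(List<Map<String, Object>> rows, Object value, String... fields) {
        Map<String, Object> fillValues = new LinkedHashMap<>();
        for (String field : fields) {
            fillValues.put(field, value);
        }
        return fillWithMap(rows, fillValues);
    }

    // Like fillna(values): each map key names a field, each map value is that field's fill value.
    static List<Map<String, Object>> fillWithMap(List<Map<String, Object>> rows, Map<String, Object> values) {
        List<Map<String, Object>> out = new ArrayList<>();
        for (Map<String, Object> row : rows) {
            Map<String, Object> copy = new LinkedHashMap<>(row);
            for (Map.Entry<String, Object> e : values.entrySet()) {
                if (copy.containsKey(e.getKey()) && copy.get(e.getKey()) == null) {
                    copy.put(e.getKey(), e.getValue());
                }
            }
            out.add(copy);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>(); // Map.of would reject null values
        row.put("a", null);
        row.put("b", null);
        row.put("c", 3);
        List<Map<String, Object>> rows = List.of(row);
        System.out.println(fillAll(rows, 1));
        System.out.println(fillFields(rows, 1, "a"));
    }
}
```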
dropna
API Specification:
Table dropna(int thresh)
Description:
Removes a row if its number of non-null fields is less than the specified threshold.
- param thresh: a row is removed if its number of non-null columns is less than the threshold. If it is -1, a row is removed only when all of its columns are null.
- param fields: if specified, only these fields are considered
Example 1:
Java:
It’s equivalent to the following SQL:
SELECT a, b, c a, b,
Example 2:
Java:
It’s equivalent to the following SQL:
SELECT a, b, c a, b,
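A minimal sketch of the thresh rule, including the -1 case, with rows modelled as maps from field name to value. `DropnaSketch` is a hypothetical name; considering all fields when none are given, and counting a missing field as null, are assumptions of this sketch rather than statements of the proposal.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory illustration of the dropna thresh rule (not Flink code).
class DropnaSketch {

    // Keeps a row only if it has at least `thresh` non-null values among the considered fields.
    // thresh == -1 means: drop the row only when all considered fields are null (require >= 1 non-null).
    // Assumption: with no fields given, all fields of the row are considered; a missing field counts as null.
    static List<Map<String, Object>> dropna(List<Map<String, Object>> rows, int thresh, String... fields) {
        int required = thresh == -1 ? 1 : thresh;
        List<Map<String, Object>> out = new ArrayList<>();
        for (Map<String, Object> row : rows) {
            Collection<String> considered = fields.length == 0 ? row.keySet() : Arrays.asList(fields);
            int nonNull = 0;
            for (String field : considered) {
                if (row.get(field) != null) {
                    nonNull++;
                }
            }
            if (nonNull >= required) {
                out.add(row);
            }
        }
        return out;
    }

    // Helper that builds a row which may contain nulls (Map.of rejects null values).
    static Map<String, Object> row(Object a, Object b, Object c) {
        Map<String, Object> m = new LinkedHashMap<>();
        m.put("a", a);
        m.put("b", b);
        m.put("c", c);
        return m;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> rows = List.of(row(1, null, null), row(null, null, null), row(1, 2, null));
        System.out.println(dropna(rows, 2));
        System.out.println(dropna(rows, -1));
        System.out.println(dropna(rows, 1, "b", "c"));
    }
}
```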
replace
API Specification:
Table replace(Map<T, T> replacement, String[] fields)
Description:
For the given fields, replaces every value that appears as a key in the replacement map with the corresponding map value.
- param replacement: the map key is the value to replace and the map value is the value to replace it with.
- param fields: the columns to consider
Example:
Java:
It’s equivalent to the following SQL:
SELECT
c
FROM T
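The lookup-and-swap behaviour of replace can be sketched in plain Java, again over rows modelled as maps. `ReplaceSketch` is a hypothetical name, and leaving null values untouched is an assumption of this sketch (null handling is what fillna is for).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory illustration of the replace semantics (not Flink code).
class ReplaceSketch {

    // For each listed field, a value that is a key of `replacement` is swapped for the mapped value.
    // Assumption: nulls are left untouched (this also avoids containsKey(null) on immutable maps).
    static <T> List<Map<String, Object>> replace(List<Map<String, Object>> rows, Map<T, T> replacement, String... fields) {
        List<Map<String, Object>> out = new ArrayList<>();
        for (Map<String, Object> row : rows) {
            Map<String, Object> copy = new LinkedHashMap<>(row); // input rows are not modified
            for (String field : fields) {
                Object v = copy.get(field);
                if (v != null && replacement.containsKey(v)) {
                    copy.put(field, replacement.get(v));
                }
            }
            out.add(copy);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("a", "N/A");
        row.put("b", "N/A");
        row.put("c", "ok");
        // Only field "a" is considered, so "b" keeps its "N/A".
        System.out.println(replace(List.of(row), Map.of("N/A", "unknown"), "a"));
    }
}
```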
Sampling
sample
API Specification:
...
Example:
table.sample(0.1)
It’s equivalent to the following SQL:
SELECT
...
FROM T
WHERE RAND() < 0.1
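The `WHERE RAND() < 0.1` formulation is Bernoulli sampling: each row is kept independently with the given probability, so the output contains only approximately 10% of the input rows. A minimal plain-Java sketch follows; the sample signature is elided above, so the `seed` parameter (added for reproducibility) and the name `SampleSketch` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical in-memory illustration of the sample semantics (not Flink code).
class SampleSketch {

    // Bernoulli sampling: each row is kept independently with probability `fraction`,
    // mirroring WHERE RAND() < fraction. The seed parameter is an assumption of this sketch.
    static <R> List<R> sample(List<R> rows, double fraction, long seed) {
        if (fraction < 0.0 || fraction > 1.0) {
            throw new IllegalArgumentException("fraction must be in [0, 1]: " + fraction);
        }
        Random random = new Random(seed);
        List<R> out = new ArrayList<>();
        for (R row : rows) {
            if (random.nextDouble() < fraction) { // nextDouble() is uniform in [0, 1)
                out.add(row);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> rows = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            rows.add(i);
        }
        // Roughly 10% of the rows; the exact count depends on the seed.
        System.out.println(sample(rows, 0.1, 42L).size());
    }
}
```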
split
API Specification:
...
Description:
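Splits the table into multiple sub-tables according to the given weights. Each row is assigned to exactly one sub-table, and the expected fraction of rows in the i-th sub-table is the i-th weight divided by the sum of all weights.
Example:
table.split(new double[] { 0.1, 0.2, 0.3 })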
It’s logically equivalent to the following SQL:
SELECT
c
CREATE VIEW TT1 AS
SELECT
a, b, c
FROM TT
WHERE d < 0.1/(0.1 + 0.2 + 0.3)
CREATE VIEW TT2 AS
SELECT
a, b, c
FROM TT
WHERE d >= 0.1/(0.1 + 0.2 + 0.3) and d < (0.1 + 0.2)/(0.1 + 0.2 + 0.3)
CREATE VIEW TT3 AS
SELECT
a, b, c
FROM TT
WHERE d >= (0.1 + 0.2)/(0.1 + 0.2 + 0.3)
NOTE: All RAND calls must use the same seed so that each element gets the same random value wherever it is evaluated. This ensures that every element belongs to exactly one sub-table (TT1, TT2, or TT3).
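The view boundaries follow a cumulative-weight rule: with W the sum of the weights and one random value d per row, a row goes to sub-table i when d lies in [(w_0 + ... + w_{i-1})/W, (w_0 + ... + w_i)/W). The plain-Java sketch below implements that rule on an in-memory list; `SplitSketch` and the `seed` parameter are hypothetical, and drawing d once per row plays the role of using the same seed for every RAND call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical in-memory illustration of the split semantics (not Flink code).
class SplitSketch {

    // Draws one d in [0, 1) per row (like column d of view TT) and assigns the row to sub-table i
    // when (w_0 + ... + w_{i-1}) / W <= d < (w_0 + ... + w_i) / W, where W is the sum of the weights.
    static <R> List<List<R>> split(List<R> rows, double[] weights, long seed) {
        double total = 0.0;
        for (double w : weights) {
            if (w < 0.0) {
                throw new IllegalArgumentException("weights must be non-negative");
            }
            total += w;
        }
        if (total <= 0.0) {
            throw new IllegalArgumentException("weights must have a positive sum");
        }
        // upper[i] is the exclusive upper bound of sub-table i.
        double[] upper = new double[weights.length];
        double cumulative = 0.0;
        for (int i = 0; i < weights.length; i++) {
            cumulative += weights[i];
            upper[i] = cumulative / total;
        }
        List<List<R>> parts = new ArrayList<>();
        for (int i = 0; i < weights.length; i++) {
            parts.add(new ArrayList<>());
        }
        Random random = new Random(seed);
        for (R row : rows) {
            double d = random.nextDouble();
            int i = 0;
            // The last sub-table takes everything at or above the previous bound,
            // which also absorbs floating-point rounding in upper[last].
            while (i < upper.length - 1 && d >= upper[i]) {
                i++;
            }
            parts.get(i).add(row);
        }
        return parts;
    }

    public static void main(String[] args) {
        List<Integer> rows = new ArrayList<>();
        for (int i = 0; i < 6000; i++) {
            rows.add(i);
        }
        for (List<Integer> part : split(rows, new double[] {0.1, 0.2, 0.3}, 11L)) {
            System.out.println(part.size()); // roughly 1000, 2000, 3000
        }
    }
}
```

Because every row draws exactly one d and the intervals are disjoint and cover [0, 1), each row lands in exactly one sub-table, which is the property the NOTE asks for.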
Compatibility, Deprecation, and Migration Plan
N/A: As all the methods are newly added, there are no compatibility issues.
Test Plan
N/A
Rejected Alternatives
...