Motivation

Table API is a unified, relational API for streaming and batch processing, sharing the same underlying query optimization and query execution stack with SQL. As more and more features and functionalities are added to SQL, the Table API becomes more and more powerful as well, since most optimizations and functionalities are shared between them.

For the Table API itself, the community is also consistently improving it, including but not limited to the following:

  • We have supported several row-based operations, e.g. map/flatMap/aggregate/flatAggregate in FLIP-29, and column-based operations, e.g. addColumns/renameColumns/dropColumns/addOrReplaceColumns in Flink 1.9.
  • The Expression DSL was introduced in the Table API in FLIP-55, which improves the usability a lot.
  • FLIP-129 proposed refactoring the existing Descriptor API to fill the functionality gap between the Descriptor API and SQL DDL.
  • The Python Table API has been supported since Flink 1.9, allowing users to write Table API programs in Python.


In this FLIP, we’d like to continue improving the Table API by introducing a few new methods, as the Table API currently has a few problems: some tasks are not easy to express with the Table API, e.g. deduplication, topn, etc., and others become unwieldy when a table has hundreds of columns, e.g. null data handling.

The proposed changes could be summarized as the following categories:

  • Make it easy for Table API users to leverage the powerful features of SQL, e.g. deduplication, topn, etc.
  • Provide convenient, frequently used functionalities, e.g. a series of methods for null data handling (which becomes a problem when there are hundreds of columns) and for data sampling and splitting (it is very common in ML to split a data set into multiple splits for training and validation).

Public Interfaces

Align SQL functionalities

deduplicate

Specification:

Table deduplicate(Expression[] dedupFields, Expression orderField, boolean keepFirst)


Description:
Performs deduplication according to the specified deduplication fields.
- param dedupFields: the fields to perform the deduplication against
- param orderField: the time attribute field; it defines the order of the duplicate rows
- param keepFirst: determines which row to keep. If true, keep the first occurrence and drop the later ones; otherwise, keep the last occurrence.

Example:

Java:
table.deduplicate(new Expression[] {$("a"), $("b")}, $("proctime"), true)

Python:
table.deduplicate([table.a, table.b], table.proctime, True)


It’s equivalent to the following SQL:

SELECT a, b, c
FROM (
   SELECT a, b, c,
     ROW_NUMBER() OVER (PARTITION BY a, b ORDER BY proctime asc) AS rownum
   FROM table_name
) WHERE rownum = 1
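
If keepFirst is false, the last occurrence is kept instead; a minimal sketch (same fields as above, only the flag changes):

Java:
// Keep the last row for each (a, b) pair; the equivalent SQL would use
// ORDER BY proctime desc in the OVER clause and still filter rownum = 1.
table.deduplicate(new Expression[] {$("a"), $("b")}, $("proctime"), false)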

topn

Specification:

Table topn(int n, Expression[] partitionFields, Expression[] orderFields)


Description:
Returns the largest/smallest n elements ordered by the specified orderFields for each group specified by the partitionFields.
- param n: the number of elements to return for each partition
- param partitionFields: the partition fields, each partition will have a top-n result
- param orderFields: the order fields, the ordering direction could be different for different columns

Example:

Java:
Expression[] partitionFields = new Expression[] {$("a"), $("b")};
Expression[] orderFields = new Expression[] {$("c"), $("d").desc()};
table.topn(3, partitionFields, orderFields)

Python:
table.topn(3, [table.a, table.b], [table.c, table.d.desc])


It’s equivalent to the following SQL:

SELECT a, b, c, d, e
FROM (
   SELECT a, b, c, d, e,
     ROW_NUMBER() OVER (PARTITION BY a, b ORDER BY c asc, d desc) AS rownum
   FROM table_name
) WHERE rownum <= 3

asof

Specification:

Table asOf(Expression expr)

Description:
Returns a table representing the state of the table at the given point in time.

Example 1 (event time temporal join):

Java:
leftTable.join(rightTable.asOf($("order_time")), $("currency").isEqual($("currency")))

Python:
left_table.join(right_table.as_of(left_table.order_time), left_table.currency == right_table.currency)

It’s equivalent to the following SQL:

SELECT
     order_id,
     price,
     currency,
     conversion_rate,
     order_time
FROM orders
JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
ON orders.currency = currency_rates.currency
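
To obtain exactly the projection shown in the SQL, the join could be followed by a select; a sketch (the column names mirror the SQL above and are otherwise illustrative):

Java:
// Temporal join followed by an explicit projection of the joined columns.
leftTable.join(rightTable.asOf($("order_time")), $("currency").isEqual($("currency")))
         .select($("order_id"), $("price"), $("currency"), $("conversion_rate"), $("order_time"))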


Example 2 (processing time temporal join):

Java:
leftTable.join(rightTable.asOf($("proctime")), $("currency").isEqual($("currency")))

Python:
left_table.join(right_table.as_of(left_table.proctime), left_table.currency == right_table.currency)

It’s equivalent to the following SQL:

SELECT
     order_id,
     price,
     currency,
     conversion_rate,
     order_time
FROM orders
JOIN currency_rates FOR SYSTEM_TIME AS OF orders.proctime
ON orders.currency = currency_rates.currency


Note: The `$("order_time")` in Example 1 and `$("proctime")` in Example 2 refer to columns of the left table.

window

Specification:

Table window(TableValuedWindow tableValuedWindow)

Description:
Applies a windowing table-valued function (TVF) to the table.

We need to introduce the class org.apache.flink.table.api.TableValuedWindow to represent a windowing table-valued function.

Regarding how to define the windowing table-valued functions, we want to reuse the existing classes as much as possible (a sketch of the resulting hierarchy follows the list):

  • For the sliding and tumble windows, SlideWithSizeAndSlideOnTime and TumbleWithSizeOnTime will extend org.apache.flink.table.api.TableValuedWindow, as there is no alias in a table-valued window.
  • For the session window, as there is a keyCols parameter in the session windowing table-valued function, we will introduce SessionWithGapAndKey and SessionWithGapAndKeyOnTime under the package org.apache.flink.table.api. SessionWithGapAndKeyOnTime extends org.apache.flink.table.api.TableValuedWindow.
  • For the cumulate window, we will introduce Cumulate, CumulateWithSize, CumulateWithSizeAndStep and CumulateWithSizeAndStepOnTime under the package org.apache.flink.table.api. CumulateWithSizeAndStepOnTime extends org.apache.flink.table.api.TableValuedWindow.
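
A rough sketch of the resulting hierarchy (class skeletons only, each under org.apache.flink.table.api and in its own file; constructors and fields are omitted implementation details):

// Base class representing a windowing table-valued function.
public abstract class TableValuedWindow {}

// Tumble window TVF: size + time attribute.
public class TumbleWithSizeOnTime extends TableValuedWindow {}

// Sliding window TVF: size + slide + time attribute.
public class SlideWithSizeAndSlideOnTime extends TableValuedWindow {}

// Session window TVF: gap + key columns + time attribute.
public class SessionWithGapAndKeyOnTime extends TableValuedWindow {}

// Cumulate window TVF: size + step + time attribute.
public class CumulateWithSizeAndStepOnTime extends TableValuedWindow {}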

Example 1 (apply tumble windowing TVF to a table):

bidTable.window(
      Tumble.over(Expressions.lit(10).minutes())
                  .on($("bidtime")))

It’s equivalent to the following SQL:

SELECT * FROM TABLE(
   TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES));


Example 2 (apply sliding windowing TVF to a table):

bidTable.window(
      Slide.over(Expressions.lit(10).minutes())
              .every(Expressions.lit(5).minutes())
              .on($("bidtime")))

It’s equivalent to the following SQL:

SELECT * FROM TABLE(
    HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES));


Example 3 (apply session windowing TVF to a table):

bidTable.window(
      Session.withGap(Expressions.lit(10).minutes())
                  .partitionBy($("bidder"))
                  .on($("bidtime")))

It’s equivalent to the following SQL:

SELECT * FROM TABLE(
   SESSION(TABLE Bid PARTITION BY bidder, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES));


Example 4 (apply cumulate windowing TVF to a table):

bidTable.window(
      Cumulate.over(Expressions.lit(10).minutes())
                    .step(Expressions.lit(2).minutes())
                    .on($("bidtime")))

It’s equivalent to the following SQL:

SELECT * FROM TABLE(
    CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES));


Example 5 (window aggregation):

bidTable.window(Tumble.over(Expressions.lit(10).minutes()).on($("bidtime")))
            .groupBy($("window_start"), $("window_end"))
            .select($("window_start"), $("window_end"), $("price").sum())

It’s equivalent to the following SQL:

SELECT window_start, window_end, SUM(price)
  FROM TABLE(
    TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end;


Example 6 (window join):

Table left = leftTable.window(Tumble.over(Expressions.lit(10).minutes()).on($("rowtime")));
Table right = rightTable.window(Tumble.over(Expressions.lit(10).minutes()).on($("rowtime")));

left.join(right,
          left.col("window_start").isEqual(right.col("window_start")).and(
          left.col("window_end").isEqual(right.col("window_end"))))

It’s equivalent to the following SQL:

CREATE TEMPORARY VIEW left AS
SELECT * FROM TABLE(
   TUMBLE(TABLE left_table, DESCRIPTOR(rowtime), INTERVAL '10' MINUTES));

CREATE TEMPORARY VIEW right AS
SELECT * FROM TABLE(
   TUMBLE(TABLE right_table, DESCRIPTOR(rowtime), INTERVAL '10' MINUTES));

SELECT *
FROM left JOIN right
ON left.window_start = right.window_start AND left.window_end = right.window_end

NOTE: Both the left table and the right table have columns named "window_start" and "window_end". Joining two tables with the same column names is currently not yet supported. See https://issues.apache.org/jira/browse/FLINK-18679 for more details; a possible workaround is sketched below.
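
Until FLINK-18679 is resolved, one possible workaround is to rename the window columns on one side with the existing renameColumns before joining; a sketch (the r_ prefix is purely illustrative):

// Hypothetical workaround: give the right side's window columns unique names.
Table renamedRight = right.renameColumns(
    $("window_start").as("r_window_start"),
    $("window_end").as("r_window_end"));
left.join(renamedRight,
          left.col("window_start").isEqual(renamedRight.col("r_window_start")).and(
          left.col("window_end").isEqual(renamedRight.col("r_window_end"))))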


Example 7 (window topn):

Table windowTable = table.window(Tumble.over(Expressions.lit(10).minutes()).on($("rowtime")));

Expression[] partitionFields = new Expression[] {$("window_start"), $("window_end"), $("col1")};
Expression[] orderFields = new Expression[] {$("col2"), $("col3").desc()};
windowTable.topn(3, partitionFields, orderFields)

It’s equivalent to the following SQL:

CREATE TEMPORARY VIEW T AS
SELECT * FROM TABLE(
   TUMBLE(TABLE table_name, DESCRIPTOR(rowtime), INTERVAL '10' MINUTES));

SELECT *
FROM (
   SELECT *,
     ROW_NUMBER() OVER (PARTITION BY window_start, window_end, col1
       ORDER BY col2 asc, col3 desc) AS rownum
   FROM T)
WHERE rownum <= 3

hint

Specification:

Table hint(String name, Map<String, String> options)

// The hint options defined in RelHint support both List and Map. Currently only
// Map is used in Flink, so there is no need to add the following interface for now.
// We could introduce it once List is supported in Flink.
Table hint(String name, String... parameters)


Description:

Adds a hint to the table. The hint name and options correspond to the SQL hint syntax, e.g. the OPTIONS hint for specifying dynamic table options.

Example 1:

kafkaTable.hint("OPTIONS", ImmutableMap.of("scan.startup.mode", "earliest-offset"))
                .select($("id"), $("name"))

It’s equivalent to the following SQL:

SELECT id, name FROM kafka_table /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;

describe

Specification:

TableResult describe()


Description:
Describe the schema of a table or a view.

Example:

table.describe().print()


It’s equivalent to the following SQL:

DESCRIBE orders

Null Data Handling

fillna

API Specification:

Table fillna(Object value)

Table fillna(Object value, String[] fields)

Table fillna(Map<String, Object> values)


Description:
Replaces null values in the specified fields with the given value.

- param value: the value to fill with
- param fields: only the specified fields will be considered
- param values: the map key is the name of the field to fill and the map value is the value to fill with.

Example 1:

Java:
table.fillna(1, new String[] {"a", "b"})

Python:
table.fillna(1, ['a', 'b'])


It’s equivalent to the following SQL:

SELECT
    (case when a is null then 1 else a end) as a,
    (case when b is null then 1 else b end) as b,
    c
FROM T


Example 2:

Java:
table.fillna(ImmutableMap.of("a", 1, "b", 3))

Python:
table.fillna({"a":1, "b": 3})


It’s equivalent to the following SQL:

SELECT
    (case when a is null then 1 else a end) as a,
    (case when b is null then 3 else b end) as b,
    c
FROM T

dropna

API Specification:

Table dropna(int thresh)

Table dropna(int thresh, String[] fields)


Description:
Removes a row if the number of non-null fields in it is below the specified threshold.
- param thresh: remove a row if its number of non-null fields is less than thresh. If thresh is -1, a row is removed only when all of the considered fields are null.
- param fields: only the specified fields will be considered


Example 1:

Java:
table.dropna(2)

Python:
table.dropna(2)


It’s equivalent to the following SQL:

SELECT a, b, c
FROM (
    SELECT
      a,
      b,
      c,
      (case when a is not null then 1 else 0 end) + (case when b is not null then 1 else 0 end) + (case when c is not null then 1 else 0 end) as cnt
    FROM T
)
WHERE cnt >= 2


Example 2:

Java:
table.dropna(1, new String[] {"a", "b"})

Python:
table.dropna(1, ['a', 'b'])


It’s equivalent to the following SQL:

SELECT a, b, c
FROM (
    SELECT
      a,
      b,
      c,
      (case when a is not null then 1 else 0 end) + (case when b is not null then 1 else 0 end) as cnt
    FROM T
)
WHERE cnt >= 1
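
The -1 threshold is not covered by the examples above; a minimal sketch of its behavior (assuming the table has the columns a, b and c):

Java:
// Hypothetical: with thresh = -1, a row is dropped only when a, b and c are all null.
table.dropna(-1)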

replace

API Specification:

<T> Table replace(Map<T, T> replacement, String[] fields)


Description:
For the given fields, replaces the values that appear as keys in the replacement map with the corresponding map values.
- param replacement: the map key is the value to replace and the map value is the value to replace it with.
- param fields: the columns to consider

Example:

Java:
table.replace(ImmutableMap.of(1, 3, 2, 4), new String[] {"a", "b"})

Python:
table.replace({1: 3, 2: 4}, ['a', 'b'])


It’s equivalent to the following SQL:

SELECT
    (case when a = 1 then 3
          when a = 2 then 4
          else a end) as a,
    (case when b = 1 then 3
          when b = 2 then 4
          else b end) as b,
    c
FROM T

Sampling

sample

API Specification:

Table sample(double fraction)

Table sample(double fraction, long seed)


Description:

Takes a sample of the table according to the given fraction ([0.0, 1.0]).

Example:

table.sample(0.1)

It’s equivalent to the following SQL:

SELECT
  a, b, c
FROM T
WHERE RAND() < 0.1
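
The seeded overload makes the sample reproducible across runs; a sketch (the seed value 42 is illustrative):

Java:
// Reproducible 10% sample; logically equivalent to WHERE RAND(42) < 0.1 in SQL.
table.sample(0.1, 42L)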

split

API Specification:

Table[] split(double[] weights)

Table[] split(double[] weights, long seed)

Description:
Splits the table into multiple sub-tables according to the given weights.

Example:

table.split(new double[] { 0.1, 0.2, 0.3 })

It’s logically equivalent to the following SQL:

CREATE VIEW TT AS
SELECT
  a, b, c, RAND(100) as d
FROM T

CREATE VIEW TT1 AS
SELECT
  a, b, c
FROM TT
WHERE d < 0.1/(0.1 + 0.2 + 0.3)

CREATE VIEW TT2 AS
SELECT
  a, b, c
FROM TT
WHERE d >= 0.1/(0.1 + 0.2 + 0.3) and d < (0.1 + 0.2)/(0.1 + 0.2 + 0.3)

CREATE VIEW TT3 AS
SELECT
  a, b, c
FROM TT
WHERE d >= (0.1 + 0.2)/(0.1 + 0.2 + 0.3)

NOTE: The seed for all the RAND calls should be the same to make sure that the generated random value is the same for a given element. This ensures that each element belongs to exactly one sub-table (e.g. TT1, TT2, TT3).
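
As mentioned in the Motivation section, a typical use of split is preparing data sets for ML; a hypothetical usage sketch (variable names are illustrative):

Java:
// Split into 80% training and 20% validation data; the fixed seed makes the
// assignment of rows to sub-tables deterministic.
Table[] splits = dataTable.split(new double[] {0.8, 0.2}, 100L);
Table trainingSet = splits[0];
Table validationSet = splits[1];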