...

SELECT a, b, c, d, e
FROM (
   SELECT a, b, c, d, e,
     ROW_NUMBER() OVER (PARTITION BY a, b ORDER BY c asc, d desc) AS rownum
   FROM table_name
) WHERE rownum = 3

asof

Specification:

Table asOf(Expression expr)

Description:
Returns a table representing the state of this table as of the given point in time.

Example 1 (event time temporal join):

Java:
leftTable.join(rightTable.asOf($("order_time")), $("currency").isEqual("currency"))

Python:
left_table.join(right_table.as_of(left_table.order_time), left_table.currency == right_table.currency)

It’s equivalent to the following SQL:

SELECT
     order_id,
     price,
     orders.currency,
     conversion_rate,
     order_time
FROM orders
LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
ON orders.currency = currency_rates.currency


Example 2 (processing time temporal join):

Java:
leftTable.join(rightTable.asOf($("proctime")), $("currency").isEqual("currency"))

Python:
left_table.join(right_table.as_of(left_table.proctime), left_table.currency == right_table.currency)

It’s equivalent to the following SQL:

SELECT
     order_id,
     price,
     orders.currency,
     conversion_rate,
     order_time
FROM orders
LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.proctime
ON orders.currency = currency_rates.currency


Note: `$("order_time")` in Example 1 and `$("proctime")` in Example 2 refer to columns of the left table.
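To make the "state at a given point in time" semantics concrete, here is a minimal sketch in plain Java that does not use Flink at all. It models the version history of a single key (for example one currency) as a sorted map and looks up the version that was valid at a given time. The class name AsOfLookupSketch and its methods are hypothetical and exist only for illustration; the actual temporal join implementation may work quite differently.

```java
import java.util.Map;
import java.util.TreeMap;

/** Illustrative only: version history of ONE key, keyed by the time each version became valid. */
final class AsOfLookupSketch {
    // Hypothetical storage: validFrom timestamp -> conversion rate.
    private final TreeMap<Long, Double> versions = new TreeMap<>();

    /** Records that the given rate becomes valid at the given time. */
    void update(long validFrom, double rate) {
        versions.put(validFrom, rate);
    }

    /** Returns the rate that was valid at the given time, or null if no version existed yet. */
    Double asOf(long time) {
        // floorEntry finds the latest version whose validFrom is <= time.
        Map.Entry<Long, Double> entry = versions.floorEntry(time);
        return entry == null ? null : entry.getValue();
    }

    public static void main(String[] args) {
        AsOfLookupSketch euro = new AsOfLookupSketch();
        euro.update(100L, 1.10);
        euro.update(200L, 1.20);
        // An order placed at time 150 sees the version that became valid at time 100.
        System.out.println(euro.asOf(150L));
    }
}
```

In an event time temporal join the lookup time would come from the left row (order_time above); in a processing time join it would be the current wall-clock time.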

window

Specification:

Table window(TableValuedWindow tableValuedWindow)

Description:
Applies a windowing table-valued function (TVF) to the table.

We need to introduce the class org.apache.flink.table.api.TableValuedWindow to represent a windowing table-valued function.

As for how to define a windowing table-valued function, we want to reuse the existing classes as much as possible:

...

Example 1 (apply tumble windowing TVF to a table):

bidTable.window(
      Tumble.over(Expressions.lit(10).minutes())
                  .on($("bidtime")))

It’s equivalent to the following SQL:

SELECT * FROM TABLE(
   TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES));
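As a rough illustration of what a tumbling window assigns to each row, the following plain-Java sketch computes the window_start and window_end values for a timestamp. It assumes epoch-aligned windows with an inclusive start and an exclusive end; the class and method names are hypothetical and are not part of the proposed API.

```java
/** Illustrative only: assigns a timestamp to its tumbling window. */
final class TumbleWindowSketch {
    /** Inclusive start of the window of the given size that contains the timestamp. */
    static long windowStart(long timestampMillis, long sizeMillis) {
        // floorMod keeps the result correct for negative timestamps as well.
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    /** Exclusive end of the window of the given size that contains the timestamp. */
    static long windowEnd(long timestampMillis, long sizeMillis) {
        return windowStart(timestampMillis, sizeMillis) + sizeMillis;
    }

    public static void main(String[] args) {
        long tenMinutes = 10 * 60 * 1000L;
        long bidtime = 25 * 60 * 1000L; // 25 minutes after the epoch
        System.out.println(windowStart(bidtime, tenMinutes) + " .. " + windowEnd(bidtime, tenMinutes));
    }
}
```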


Example 2 (apply sliding windowing TVF to a table):

bidTable.window(
      Slide.over(Expressions.lit(10).minutes())
              .every(Expressions.lit(5).minutes())
              .on($("bidtime")))

It’s equivalent to the following SQL:

SELECT * FROM TABLE(
    HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES));
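Unlike a tumbling window, a sliding (hopping) window can place one row into several overlapping windows. The plain-Java sketch below lists every window that contains a given timestamp, assuming epoch-aligned windows with an inclusive start and an exclusive end. The names are hypothetical and the code is independent of Flink.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative only: lists the sliding windows that contain a timestamp. */
final class HopWindowSketch {
    /** Returns {start, end} pairs, latest window first; end is exclusive. */
    static List<long[]> windows(long timestamp, long size, long slide) {
        List<long[]> result = new ArrayList<>();
        // Latest window start that is not after the timestamp.
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        // Walk backwards while the window [start, start + size) still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            result.add(new long[] {start, start + size});
        }
        return result;
    }

    public static void main(String[] args) {
        // Size 10 and slide 5 (in minutes), as in the example above.
        for (long[] w : windows(7, 10, 5)) {
            System.out.println(Arrays.toString(w));
        }
    }
}
```

With a size of 10 and a slide of 5, every timestamp falls into exactly two windows, which is why the output of the HOP function contains each input row twice.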


Example 3 (apply session windowing TVF to a table):

bidTable.window(
      Session.withGap(Expressions.lit(10).minutes())
                  .partitionBy($("bidder"))
                  .on($("bidtime")))

It’s equivalent to the following SQL:

SELECT * FROM TABLE(
   SESSION(TABLE Bid PARTITION BY bidder, DESCRIPTOR(bidtime), DESCRIPTOR(bidder), INTERVAL '10' MINUTES));
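The following plain-Java sketch shows one way to think about session windows for a single partition (one bidder): consecutive rows belong to the same session as long as each arrives before the previous row's timestamp plus the gap. It assumes the timestamps are already sorted and that a row arriving exactly at the session end starts a new session; that boundary rule is an assumption of this sketch, not something the text above specifies. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative only: groups sorted timestamps of ONE partition into session windows. */
final class SessionWindowSketch {
    /** Returns {start, end} pairs; end is the last timestamp of the session plus the gap. */
    static List<long[]> sessions(long[] sortedTimestamps, long gap) {
        List<long[]> result = new ArrayList<>();
        boolean open = false;
        long start = 0;
        long end = 0;
        for (long ts : sortedTimestamps) {
            if (open && ts < end) {
                end = ts + gap; // still inside the gap: extend the current session
            } else {
                if (open) {
                    result.add(new long[] {start, end}); // close the previous session
                }
                start = ts;
                end = ts + gap;
                open = true;
            }
        }
        if (open) {
            result.add(new long[] {start, end});
        }
        return result;
    }

    public static void main(String[] args) {
        for (long[] s : sessions(new long[] {1, 4, 20}, 10)) {
            System.out.println(Arrays.toString(s));
        }
    }
}
```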


Example 4 (apply cumulate windowing TVF to a table):

bidTable.window(
      Cumulate.over(Expressions.lit(10).minutes())
                    .step(Expressions.lit(2).minutes())
                    .on($("bidtime")))

It’s equivalent to the following SQL:

SELECT * FROM TABLE(
    CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES));
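Cumulating windows share a start and grow by the step until they reach the maximum size. As a minimal sketch, assuming epoch-aligned windows with an exclusive end, the plain-Java code below lists the cumulating windows that contain a given timestamp. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative only: lists the cumulating windows that contain a timestamp. */
final class CumulateWindowSketch {
    /** Returns {start, end} pairs, smallest window first; end is exclusive. */
    static List<long[]> windows(long timestamp, long step, long maxSize) {
        List<long[]> result = new ArrayList<>();
        // All cumulating windows of one cycle share the same start.
        long start = timestamp - Math.floorMod(timestamp, maxSize);
        for (long end = start + step; end <= start + maxSize; end += step) {
            if (timestamp < end) {
                result.add(new long[] {start, end});
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Step 2 and maximum size 10 (in minutes), as in the example above.
        for (long[] w : windows(3, 2, 10)) {
            System.out.println(Arrays.toString(w));
        }
    }
}
```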


Example 5 (window aggregation):

bidTable.window(Tumble.over(Expressions.lit(10).minutes()).on($("bidtime")))
            .groupBy($("window_start"), $("window_end"))
            .select($("window_start"), $("window_end"), $("price").sum())

It’s equivalent to the following SQL:

SELECT window_start, window_end, SUM(price)
  FROM TABLE(
    TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end;
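For readers who want to see the grouping spelled out, here is a small plain-Java sketch of the same aggregation on an in-memory array. Each bid is a hypothetical {bidtime, price} pair, windows are assumed to be epoch-aligned, and the result is keyed by window_start only, since window_end is always window_start plus the size. None of these names come from the proposed API.

```java
import java.util.Map;
import java.util.TreeMap;

/** Illustrative only: sums prices per tumbling window over in-memory bids. */
final class WindowSumSketch {
    /** bids holds {bidtime, price} pairs; returns window_start -> SUM(price). */
    static Map<Long, Long> sumPerWindow(long[][] bids, long size) {
        Map<Long, Long> sums = new TreeMap<>();
        for (long[] bid : bids) {
            long windowStart = bid[0] - Math.floorMod(bid[0], size);
            // Add the price to the running total of its window.
            sums.merge(windowStart, bid[1], Long::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        long[][] bids = {{1, 10}, {9, 5}, {12, 7}};
        System.out.println(sumPerWindow(bids, 10));
    }
}
```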


Example 6 (window join):

Table left = leftTable.window(Tumble.over(Expressions.lit(10).minutes()).on($("rowtime")));
Table right = rightTable.window(Tumble.over(Expressions.lit(10).minutes()).on($("rowtime")));

left.join(right,
            left.col("window_start").isEqual(right.col("window_start")).and(
            left.col("window_end").isEqual(right.col("window_end"))));

It’s equivalent to the following SQL:

CREATE TEMPORARY VIEW left AS
SELECT * FROM TABLE(
   TUMBLE(TABLE left_table, DESCRIPTOR(rowtime), INTERVAL '10' MINUTES));

CREATE TEMPORARY VIEW right AS
SELECT * FROM TABLE(
   TUMBLE(TABLE right_table, DESCRIPTOR(rowtime), INTERVAL '10' MINUTES));

SELECT *
FROM left JOIN right
ON left.window_start = right.window_start AND left.window_end = right.window_end

NOTE: Both the left table and the right table have columns named "window_start" and "window_end". Joining two tables that share column names is not supported yet; see https://issues.apache.org/jira/browse/FLINK-18679 for details.


Example 7 (window topn):

Table windowTable = table.window(Tumble.over(Expressions.lit(10).minutes()).on($("rowtime")));

Expression[] partitionFields = new Expression[] { $("window_start"), $("window_end"), $("col1") };
Expression[] orderFields = new Expression[] { $("col2"), $("col3").desc() };
windowTable.topn(3, partitionFields, orderFields);

It’s equivalent to the following SQL:

CREATE TEMPORARY VIEW T AS
SELECT * FROM TABLE(
   TUMBLE(TABLE table, DESCRIPTOR(rowtime), INTERVAL '10' MINUTES));

SELECT *
FROM (
   SELECT *,
     ROW_NUMBER() OVER (PARTITION BY window_start, window_end, col1
       ORDER BY col2 asc, col3 desc) AS rownum
   FROM T)
WHERE rownum <= 3
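The ROW_NUMBER() pattern above keeps the first n rows of every partition after sorting. A minimal plain-Java sketch of that logic on an in-memory list could look as follows; the generic helper topN and its parameters are hypothetical and only mirror the partition fields, the order fields, and n from the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Illustrative only: keeps the first n rows of each partition (ROW_NUMBER() <= n). */
final class TopNSketch {
    static <T, K> Map<K, List<T>> topN(
            List<T> rows, Function<T, K> partitionKey, Comparator<T> order, int n) {
        // Group rows by partition key, preserving first-seen partition order.
        Map<K, List<T>> groups = new LinkedHashMap<>();
        for (T row : rows) {
            groups.computeIfAbsent(partitionKey.apply(row), k -> new ArrayList<>()).add(row);
        }
        // Sort each partition and keep at most n rows.
        Map<K, List<T>> result = new LinkedHashMap<>();
        for (Map.Entry<K, List<T>> entry : groups.entrySet()) {
            List<T> sorted = new ArrayList<>(entry.getValue());
            sorted.sort(order);
            result.put(entry.getKey(), new ArrayList<>(sorted.subList(0, Math.min(n, sorted.size()))));
        }
        return result;
    }

    public static void main(String[] args) {
        // Each row is a hypothetical {partition, value} pair.
        List<int[]> rows = Arrays.asList(
                new int[] {1, 5}, new int[] {1, 9}, new int[] {1, 7}, new int[] {2, 3});
        Map<Integer, List<int[]>> top =
                topN(rows, r -> r[0], Comparator.comparingInt((int[] r) -> r[1]).reversed(), 2);
        top.forEach((key, list) -> System.out.println(key + " -> " + list.size() + " row(s)"));
    }
}
```

For window Top-N the partition key would include window_start and window_end, so the ranking restarts in every window.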

hints

Specification:

Table hint(String name, Map<String, String> options)

// The hint options defined in RelHint support both List and Map, but Flink currently uses only Map. So there is no need to add the following interface for now; we could introduce it once Flink supports List.
Table hint(String name, String... parameters)


Description:

Attaches a hint with the given name and options to the table.

Example 1:

kafkaTable.hint("OPTIONS", ImmutableMap.of("scan.startup.mode", "earliest-offset"))
                .select($("id"), $("name"))

It’s equivalent to the following SQL:

select id, name from kafka_table /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;

describe

Specification:

TableResult describe()

...

Sampling

sample

API Specification:

Table sample(double fraction)

Table sample(double fraction, long seed)


Description:

Takes a random sample of the table. The given fraction must be in the range [0.0, 1.0].

Example:

table.sample(0.1)

It’s equivalent to the following SQL:

SELECT
  a, b, c
FROM T
WHERE RAND() < 0.1
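The RAND() filter above is a per-row coin flip, so the result size is only approximately fraction times the row count. A minimal plain-Java sketch of the seeded variant on an in-memory list is shown below; the helper name and the use of java.util.Random are assumptions made for illustration and say nothing about how the proposed sample method would be implemented.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

/** Illustrative only: keeps each row independently with probability `fraction`. */
final class SampleSketch {
    static <T> List<T> sample(List<T> rows, double fraction, long seed) {
        if (fraction < 0.0 || fraction > 1.0) {
            throw new IllegalArgumentException("fraction must be in [0.0, 1.0]");
        }
        Random random = new Random(seed); // fixed seed makes the sample reproducible
        List<T> sampled = new ArrayList<>();
        for (T row : rows) {
            // nextDouble() is in [0.0, 1.0), mirroring "RAND() < fraction".
            if (random.nextDouble() < fraction) {
                sampled.add(row);
            }
        }
        return sampled;
    }

    public static void main(String[] args) {
        List<Integer> rows = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        System.out.println(sample(rows, 0.5, 42L));
    }
}
```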

split

API Specification:

Table[] split(double[] weights)

Table[] split(double[] weights, long seed)

Description:
Splits the table into multiple sub-tables according to the given weights.

Example:

table.split(new double[] { 0.1, 0.2, 0.3 })

It’s logically equivalent to the following SQL:

CREATE VIEW TT AS
SELECT
  a, b, c, RAND(100) as d
FROM T;

CREATE VIEW TT1 AS
SELECT
  a, b, c
FROM TT
WHERE d < 0.1/(0.1 + 0.2 + 0.3);

CREATE VIEW TT2 AS
SELECT
  a, b, c
FROM TT
WHERE d >= 0.1/(0.1 + 0.2 + 0.3) and d < (0.1 + 0.2)/(0.1 + 0.2 + 0.3);

CREATE VIEW TT3 AS
SELECT
  a, b, c
FROM TT
WHERE d >= (0.1 + 0.2)/(0.1 + 0.2 + 0.3)

NOTE: All RAND calls must use the same seed so that each element gets the same random value in every view. This ensures that each element belongs to exactly one sub-table (e.g. TT1, TT2, TT3).
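One way to guarantee that every element lands in exactly one sub-table is to draw a single random value per element and compare it against the cumulative, normalized weights. The plain-Java sketch below does this on an in-memory list, assuming each sub-table should receive a share proportional to its weight. The names bucket and split are hypothetical helpers, not the proposed implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

/** Illustrative only: splits rows into sub-lists with shares proportional to the weights. */
final class SplitSketch {
    /** Maps a random value r in [0.0, 1.0) to the index of its sub-table. */
    static int bucket(double r, double[] weights) {
        double total = 0.0;
        for (double w : weights) {
            total += w;
        }
        double cumulative = 0.0;
        for (int i = 0; i < weights.length; i++) {
            cumulative += weights[i] / total; // upper bound of sub-table i
            if (r < cumulative) {
                return i;
            }
        }
        return weights.length - 1; // guards against floating-point rounding near 1.0
    }

    static <T> List<List<T>> split(List<T> rows, double[] weights, long seed) {
        List<List<T>> parts = new ArrayList<>();
        for (int i = 0; i < weights.length; i++) {
            parts.add(new ArrayList<>());
        }
        Random random = new Random(seed);
        for (T row : rows) {
            // One draw per row, so each row belongs to exactly one sub-list.
            parts.get(bucket(random.nextDouble(), weights)).add(row);
        }
        return parts;
    }

    public static void main(String[] args) {
        List<Integer> rows = Arrays.asList(1, 2, 3, 4, 5, 6);
        System.out.println(split(rows, new double[] {0.1, 0.2, 0.3}, 100L));
    }
}
```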