Status
Current state: Completed Discussion
Discussion thread:
Enhancing-the-functionality-and-productivity-of-Table-API
Table-API-Enhancement-Outline
JIRA:
Released: <Flink Version>
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The Table API is a declarative API for defining queries on static and streaming tables. It has made tremendous progress over the last two years in terms of both functionality and performance, which only makes our desire to cover even more scenarios stronger. This FLIP proposes to enhance the functionality and productivity of the Table API in a systematic manner. In particular, we seek to support:
TableAggregateFunction, i.e., a table aggregate that is computed for a group of elements and outputs a group of elements.
Map Operator, which can apply a scalar function and output multiple columns.
FlatMap Operator, which can apply a table function and output multiple rows.
Aggregate Operator, which can apply an aggregate function and output multiple columns.
FlatAggregate Operator, which can apply a table aggregate function and output multiple rows.
Analysis
First, user-defined functions can be classified by the (logical) number of rows of their inputs and outputs. The table below is a quick summary.
UDF | Single Row Input | Multiple Row Input |
Single Row Output | ScalarFunction | AggregateFunction |
Multiple Row Output | TableFunction | TableAggregateFunction |
We introduce a new TableAggregateFunction here to make this table complete. This function takes multi-row input and produces multi-row output. In some sense, it behaves like a simple user-defined operator.
Secondly, we can also classify the functions by the number of columns of their inputs and outputs. We notice that both ScalarFunction and AggregateFunction can only output a single column, while TableFunction can output multiple columns. We would like to extend the ability to output multiple columns to ScalarFunction and AggregateFunction. For this, we follow what we did for TableFunction: if a ScalarFunction or AggregateFunction returns an object of type T (where T is a Tuple or a POJO), we can expand T to multiple columns. However, this could be problematic for Table.select because T could also be interpreted as the type of a single column. To make the semantics unambiguous, we need to introduce new methods for the new behavior (TableFunction did not suffer from this problem because we used a lateral join instead of Table.select). The table below summarizes the new methods that we want to introduce for functions with multi-column outputs.
Function | Single Column Output | Multiple Column Output |
ScalarFunction | Table.select | Table.map |
AggregateFunction | Table.select | GroupedTable.aggregate |
TableFunction | N/A | Table.flatMap |
TableAggregateFunction | N/A | GroupedTable.flatAggregate |
We have introduced four new operators in Table here: Table.map, GroupedTable.aggregate, Table.flatMap, and GroupedTable.flatAggregate. These new operators will expand a complex output T to multiple columns.
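To illustrate the ambiguity, the sketch below contrasts select and map for a composite result type. It is a minimal sketch in the Scala Table API; the ParsePerson function, the 'line input column, and the 'name/'age output names are hypothetical and only used for illustration.

import org.apache.flink.table.api.scala._
import org.apache.flink.table.functions.ScalarFunction

// Hypothetical scalar function that returns a composite type T (here a tuple with two fields).
class ParsePerson extends ScalarFunction {
  def eval(line: String): (String, Int) = {
    val parts = line.split(",")
    (parts(0).trim, parts(1).trim.toInt)
  }
}

val parse = new ParsePerson

// select: the result is kept as a single column holding the composite value.
val asOneColumn = tab.select(parse('line) as 'person)

// map: the composite result is expanded into two columns.
val asTwoColumns = tab.map(parse('line)).as('name, 'age)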
Public Interfaces and new operators
TableAggregateFunction
The behavior of a table aggregate is most similar to that of a GroupReduceFunction: it is computed for a group of elements and outputs a group of elements. A TableAggregateFunction can be applied via GroupedTable.flatAggregate(), which we will revisit later. The interface of TableAggregateFunction is as follows:
/**
 * Base class for User-Defined Table Aggregates.
 *
 * <p>The behavior of a {@link TableAggregateFunction} can be defined by implementing
 * a series of custom methods. A {@link TableAggregateFunction} needs at least three methods:
 *  - createAccumulator,
 *  - accumulate, and
 *  - emitValue or emitValueWithRetract
 *
 * <p>There are a few other methods that are optional:
 *  - retract,
 *  - merge
 *
 * <p>All these methods must be declared publicly, not static, and named exactly as the names
 * mentioned above. The methods createAccumulator and emitValue are defined in the
 * {@link TableAggregateFunction}, while the other methods are explained below.
 *
 * {@code
 * Processes the input values and updates the provided accumulator instance. The method
 * accumulate can be overloaded with different custom types and arguments. A TableAggregateFunction
 * requires at least one accumulate() method.
 *
 * param accumulator           the accumulator which contains the current aggregated results
 * param [user defined inputs] the input value (usually obtained from a newly arrived data record).
 *
 * public void accumulate(ACC accumulator, [user defined inputs])
 * }
 *
 * {@code
 * Retracts the input values from the accumulator instance. The current design assumes the
 * inputs are the values that have been previously accumulated. The method retract can be
 * overloaded with different custom types and arguments. This method must be implemented for
 * datastream bounded over aggregates.
 *
 * param accumulator           the accumulator which contains the current aggregated results
 * param [user defined inputs] the input value (usually obtained from a newly arrived data record).
 *
 * public void retract(ACC accumulator, [user defined inputs])
 * }
 *
 * {@code
 * Merges a group of accumulator instances into one accumulator instance. This method must be
 * implemented for datastream session window grouping aggregates and dataset grouping aggregates.
 *
 * param accumulator the accumulator which will keep the merged aggregate results. It should
 *                   be noted that the accumulator may contain the previous aggregated
 *                   results. Therefore users should not replace or clean this instance in the
 *                   custom merge method.
 * param its         an {@link Iterable} pointed to a group of accumulators that will be
 *                   merged.
 *
 * public void merge(ACC accumulator, Iterable<ACC> its)
 * }
 *
 * {@code
 * Called every time a table-valued aggregation result should be materialized.
 * The returned value could be either an early and incomplete result
 * (periodically emitted as data arrives) or the final result of the table-valued aggregation.
 *
 * The implementation logic does not need to deal with retract messages.
 * For example, if we calculate top3 (ASC order), the behavior is as follows:
 * Assume there are 4 messages: {1, 2, 6, 4}
 * - The framework should generate the retract messages according to the DAG pattern.
 *
 * ------------------------------------------------------------------------------------
 * | Input | Output               | Framework behavior when retract messages are needed |
 * ------------------------------------------------------------------------------------
 * | 1     | collector.collect(1) |                                                      |
 * ------------------------------------------------------------------------------------
 * | 2     | collector.collect(1) | collector.retract(1)                                 |
 * |       | collector.collect(2) |                                                      |
 * ------------------------------------------------------------------------------------
 * | 6     | collector.collect(1) | collector.retract(1)                                 |
 * |       | collector.collect(2) | collector.retract(2)                                 |
 * |       | collector.collect(6) |                                                      |
 * ------------------------------------------------------------------------------------
 * | 4     | collector.collect(1) | collector.retract(1)                                 |
 * |       | collector.collect(2) | collector.retract(2)                                 |
 * |       | collector.collect(4) | collector.retract(6)                                 |
 * ------------------------------------------------------------------------------------
 *
 * public void emitValue(ACC accumulator, Collector<T> out)
 * }
 *
 * {@code
 * Called every time a table-valued aggregation result should be materialized.
 * The returned value could be either an early and incomplete result
 * (periodically emitted as data arrives) or the final result of the table-valued aggregation.
 *
 * The implementation logic should deal with retract messages.
 * For example, if we calculate top3 (ASC order), the behavior is as follows:
 * Assume there are 4 messages: {1, 2, 6, 4}
 * - The framework does not need to generate retract messages.
 *
 * ----------------------------------
 * | Input | Output                 |
 * ----------------------------------
 * | 1     | collector.collect(1)   |
 * ----------------------------------
 * | 2     | collector.collect(2)   |
 * ----------------------------------
 * | 6     | collector.collect(6)   |
 * ----------------------------------
 * | 4     | collector.retract(6)   |
 * |       | collector.collect(4)   |
 * ----------------------------------
 *
 * public void emitValueWithRetract(ACC accumulator, Collector<T> out)
 * }
 *
 * @param <T>   the type of the table aggregation result
 * @param <ACC> the type of the table aggregation accumulator. The accumulator is used to keep the
 *              table aggregated values which are needed to compute a table aggregation result.
 *              A TableAggregateFunction represents its state using the accumulator, thereby the
 *              state of the TableAggregateFunction must be put into the accumulator.
 */
public abstract class TableAggregateFunction<T, ACC> extends UserDefinedFunction {

    /**
     * Creates and initializes the accumulator for this {@link TableAggregateFunction}.
     *
     * @return the accumulator with the initial value
     */
    public abstract ACC createAccumulator();

    /**
     * Returns the DataType of the TableAggregateFunction's result.
     *
     * @return The DataType of the TableAggregateFunction's result or null if the result type
     *         should be automatically inferred.
     */
    public DataType getResultType() {
        return null;
    }

    /**
     * Returns the DataType of the TableAggregateFunction's accumulator.
     *
     * @return The DataType of the TableAggregateFunction's accumulator or null if the
     *         accumulator type should be automatically inferred.
     */
    public DataType getAccumulatorType() {
        return null;
    }
}
Collector Interface
public interface Collector<T> {

    /**
     * Emits a record.
     *
     * @param record The record to collect.
     */
    void collect(T record);

    /**
     * Emits a retract record.
     *
     * @param record The record to retract.
     */
    void retract(T record);
}
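As a minimal sketch of how these two interfaces fit together, the following hypothetical Top2 table aggregate (written in Scala; the Top2/Top2Accum names and the use of Int values are illustrative assumptions, not part of this proposal) emits the two largest values seen so far via emitValue and leaves retraction handling to the framework:

// Hypothetical accumulator that keeps the two largest values seen so far.
class Top2Accum {
  var first: Int = Int.MinValue
  var second: Int = Int.MinValue
}

// A minimal table aggregate: multi-row input per group, up to two rows of output.
class Top2 extends TableAggregateFunction[Int, Top2Accum] {

  override def createAccumulator(): Top2Accum = new Top2Accum

  // Updates the accumulator with a new input value.
  def accumulate(acc: Top2Accum, value: Int): Unit = {
    if (value > acc.first) {
      acc.second = acc.first
      acc.first = value
    } else if (value > acc.second) {
      acc.second = value
    }
  }

  // Emits the current top 2 values; the framework generates the retract messages.
  def emitValue(acc: Top2Accum, out: Collector[Int]): Unit = {
    if (acc.first != Int.MinValue) out.collect(acc.first)
    if (acc.second != Int.MinValue) out.collect(acc.second)
  }
}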
Table.map
The map operator is a new operator on Table. It takes a ScalarFunction that returns a single row with multiple columns. Its usage is as follows:
val res = tab
  .map(fun: ScalarFunction)  // output has columns 'a, 'b, 'c
  .select('a, 'c)
Table.flatMap
The flatMap operator is a new operator on Table. It takes a TableFunction that returns multiple rows. Its usage is as follows:
val res = tab
  .flatMap(fun: TableFunction)  // output has columns 'a, 'b, 'c
  .select('a, 'c)
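For instance, a sketch with a hypothetical Split table function (the Split class, the 'sentence input column, and the 'word/'length output names are assumptions for illustration) could look like this:

import org.apache.flink.table.api.scala._
import org.apache.flink.table.functions.TableFunction

// Hypothetical table function: emits one row per word together with its length.
class Split extends TableFunction[(String, Int)] {
  def eval(sentence: String): Unit = {
    sentence.split(" ").foreach(word => collect((word, word.length)))
  }
}

val split = new Split

// Each input row may produce zero or more output rows with two columns.
val res = tab
  .flatMap(split('sentence))
  .as('word, 'length)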
GroupedTable.aggregate
The aggregate operator is a new operator on Table/GroupedTable. It takes an AggregateFunction that returns a single row with multiple columns. Its usage is as follows:
val res = tab
  .groupBy('a)  // leave out the groupBy-clause to define global aggregates
  .aggregate(fun: AggregateFunction)  // output has columns 'a, 'b, 'c
  .select('a, 'c)
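A concrete sketch with a hypothetical CountAndSum aggregate whose composite result is expanded into two columns (the 'k/'v input columns and the 'col1/'col2 output names are assumptions for illustration):

import org.apache.flink.table.api.scala._
import org.apache.flink.table.functions.AggregateFunction

// Hypothetical accumulator for a combined count and sum.
class CntSumAcc {
  var cnt: Long = 0L
  var sum: Long = 0L
}

// Hypothetical aggregate whose result (cnt, sum) is expanded into two columns.
class CountAndSum extends AggregateFunction[(Long, Long), CntSumAcc] {

  override def createAccumulator(): CntSumAcc = new CntSumAcc

  def accumulate(acc: CntSumAcc, value: Long): Unit = {
    acc.cnt += 1
    acc.sum += value
  }

  override def getValue(acc: CntSumAcc): (Long, Long) = (acc.cnt, acc.sum)
}

val countAndSum = new CountAndSum

val res = tab
  .groupBy('k)
  .aggregate(countAndSum('v))  // expands the (count, sum) result into two columns
  .select('k, 'col1, 'col2)    // 'col1, 'col2 are assumed default column names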
GroupedTable.flatAggregate
The flatAggregate operator is a new operator on Table/GroupedTable. It takes a TableAggregateFunction that returns multiple rows. Its usage is as follows:
val res = tab
  .groupBy('a)  // leave out the groupBy-clause to define global table aggregates
  .flatAggregate(tableAggFun: TableAggregateFunction)  // output has columns 'a, 'b, 'c
  .select('a, 'c)
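Using the Top2 function sketched earlier (again, the 'k/'v input columns and the 'col1 output name are assumptions for illustration), this could look like:

val top2 = new Top2

// Every group may emit up to two rows; like aggregate, only select is allowed afterwards.
val res = tab
  .groupBy('k)
  .flatAggregate(top2('v))
  .select('k, 'col1)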
Time attribute and group keys
The time attribute is lost after a map/flatMap operation, e.g.:
tab('name, 'age, 'address, 'rowtime)

tab.map(udf('name)).as('col1, 'col2) // Does not include time attributes.
The aggregate/flatAggregate operation should force users to use select, e.g.:
val result = tab
  .window(Tumble ... as 'w)
  .groupBy('w, 'k1, 'k2)
  .aggregate(aggFun('a)) // Generates an AggregateTable that can only do select
  .select('w.rowtime, 'k1, 'k2, 'col1, 'col2) // Cannot contain aggregate expressions.
Proposed Changes
See the previous section on public interfaces and table operators for the Table API. The interfaces and operators will be available in both the Scala and Java Table API. We will also add the new TableAggregateFunction interface, which can be used in the flatAggregate operator. In addition, we will add validation checks for all of the changes.
Compatibility, Deprecation, and Migration Plan
This FLIP proposes new functionality and operators for the Table API. The behavior of existing operators is not modified.
Test Plan
The changes proposed in this FLIP can be verified by both integration (IT) test cases and validation test cases.
Rejected Alternatives
No rejected alternatives yet.
Implementation Plan
The implementation of this effort can be divided into several subtasks:
Implementation of TableAggregateFunction on static and streaming tables
Implementation of Map Operator on static and streaming tables
Implementation of FlatMap Operator on static and streaming tables
Implementation of Aggregate Operator on static and streaming tables
Implementation of FlatAggregate Operator on static and streaming tables
Future work
Discuss how to support column operations, such as udf(*), udf(_), udf(*.reshape()), tab.drop(columns), etc.
Discuss the function of GroupedTable.select('*).
Discuss User-Defined Window, User-Defined Join, and Iteration.
Introducing nested tables.