...
Page properties
...
Discussion thread:
Enhancing-the-functionality-and-productivity-of-Table-API
Table-API-Enhancement-Outline
...
...
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
First, user-defined functions can be classified by the (logical) number of rows in their input and output. The table below summarizes this classification.
UDF | Single-Row Input | Multiple-Row Input |
---|---|---|
Single-Row Output | ScalarFunction | AggregateFunction |
Multiple-Row Output | TableFunction | TableAggregateFunction |
We introduce a new TableAggregateFunction to complete this table. It takes multi-row input and produces multi-row output; in some sense, it behaves like a simple user-defined operator.
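To make the row-cardinality classification concrete, here is a minimal plain-Java sketch that expresses each of the four shapes as a `java.util.function.Function` signature. It is not the Flink API: rows are simplified to single values, and the class `UdfShapes`, its fields `SCALAR`, `AGGREGATE`, `TABLE`, `TABLE_AGGREGATE`, and the example logic inside them are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Plain-Java illustration of the four UDF shapes (hypothetical names, not Flink classes).
// A "row" is simplified to a single value.
final class UdfShapes {

    // ScalarFunction shape: one input row -> one output row.
    static final Function<String, String> SCALAR = s -> s.toUpperCase();

    // AggregateFunction shape: many input rows -> one output row.
    static final Function<List<Integer>, Integer> AGGREGATE =
            rows -> rows.stream().mapToInt(Integer::intValue).sum();

    // TableFunction shape: one input row -> many output rows.
    static final Function<String, List<String>> TABLE = s -> List.of(s.split(","));

    // TableAggregateFunction shape: many input rows -> many output rows
    // (here: the two largest values in descending order).
    static final Function<List<Integer>, List<Integer>> TABLE_AGGREGATE = rows -> {
        List<Integer> sorted = new ArrayList<>(rows);
        sorted.sort(Collections.reverseOrder());
        return sorted.subList(0, Math.min(2, sorted.size()));
    };

    public static void main(String[] args) {
        System.out.println(SCALAR.apply("flink"));
        System.out.println(AGGREGATE.apply(List.of(1, 2, 3)));
        System.out.println(TABLE.apply("a,b,c"));
        System.out.println(TABLE_AGGREGATE.apply(List.of(5, 1, 9, 3)));
    }
}
```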
...
Second, we can also classify the functions by the number of columns in their input and output. Both ScalarFunction and AggregateFunction can only output a single column, while TableFunction can output multiple columns. We would like to extend the ability to output multiple columns to ScalarFunction and AggregateFunction, following what we did for TableFunction: if a ScalarFunction or AggregateFunction returns an object of type T (a Tuple or POJO), T can be expanded into multiple columns. However, this is ambiguous in Table.select, because T could also be interpreted as the type of a single column. To keep the semantics unambiguous, we introduce new methods for the new behavior. (TableFunction does not suffer from this problem because it is applied through a lateral join rather than Table.select.) The table below summarizes the new methods for functions with multi-column outputs.
UDF / Table Method | Single-Column Output | Multiple-Column Output |
---|---|---|
ScalarFunction | Table.select | Table.map |
AggregateFunction | Table.select | GroupedTable.aggregate |
TableFunction | N/A | Table.flatMap |
TableAggregateFunction | N/A | GroupedTable.flatAggregate |
This introduces four new operators: Table.map, GroupedTable.aggregate, Table.flatMap, and GroupedTable.flatAggregate. These operators expand a composite output type T into multiple columns.
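The ambiguity between the two interpretations of T can be sketched in plain Java. Assuming a row is a `List<Object>` of column values, `selectRow` keeps the composite value as one column, while `mapRow` expands its fields into separate columns. `Tuple2` (a local class, not Flink's), `nameAndLength`, `selectRow`, and `mapRow` are hypothetical illustrations.

```java
import java.util.List;

// Hypothetical sketch: one composite return value, two possible column layouts.
final class ExpandDemo {

    // Minimal composite type standing in for a Tuple or POJO result type T.
    static final class Tuple2<A, B> {
        final A f0;
        final B f1;

        Tuple2(A f0, B f1) {
            this.f0 = f0;
            this.f1 = f1;
        }

        @Override
        public String toString() {
            return "(" + f0 + "," + f1 + ")";
        }
    }

    // A "scalar function" returning a composite value: name -> (name, length).
    static Tuple2<String, Integer> nameAndLength(String name) {
        return new Tuple2<>(name, name.length());
    }

    // select-style: the whole T is ONE column.
    static List<Object> selectRow(String name) {
        return List.of(nameAndLength(name));
    }

    // map-style: T is expanded into one column per field.
    static List<Object> mapRow(String name) {
        Tuple2<String, Integer> t = nameAndLength(name);
        return List.of(t.f0, t.f1);
    }

    public static void main(String[] args) {
        System.out.println(selectRow("alice")); // one column holding the tuple
        System.out.println(mapRow("alice"));    // two columns
    }
}
```

Using a separate method (map) for the expanded layout is what lets select keep its existing single-column meaning.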
...
The behavior of a table aggregate is most similar to that of a GroupReduceFunction, which computes over a group of elements and outputs a group of elements. A TableAggregateFunction can be applied in GroupedTable.flatAggregate(), which we revisit later. The TableAggregateFunction interface is as follows:
```java
/**
 * Base class for user-defined table aggregates.
 *
 * <p>The behavior of a {@link TableAggregateFunction} can be defined by implementing
 * a series of custom methods. A {@link TableAggregateFunction} needs at least three methods:
 *  - createAccumulator,
 *  - accumulate, and
 *  - emitValue or emitUpdateWithRetract
 *
 * <p>There are a few other methods that are optional:
 *  - retract,
 *  - merge
 *
 * <p>All these methods must be declared public, must not be static, and must be named exactly
 * as mentioned above. The method createAccumulator is declared in the
 * {@link TableAggregateFunction} class, while the other methods are explained below.
 *
 * {@code
 * Processes the input values and updates the provided accumulator instance. The method
 * accumulate can be overloaded with different custom types and arguments. A
 * TableAggregateFunction requires at least one accumulate() method.
 *
 * param accumulator           the accumulator which contains the current aggregated results
 * param [user defined inputs] the input value (usually obtained from newly arrived data).
 *
 * public void accumulate(ACC accumulator, [user defined inputs])
 * }
 *
 * {@code
 * Retracts the input values from the accumulator instance. The current design assumes the
 * inputs are the values that have been previously accumulated. The method retract can be
 * overloaded with different custom types and arguments. This method must be implemented for
 * bounded OVER aggregates on DataStream.
 *
 * param accumulator           the accumulator which contains the current aggregated results
 * param [user defined inputs] the input value (usually obtained from newly arrived data).
 *
 * public void retract(ACC accumulator, [user defined inputs])
 * }
 *
 * {@code
 * Merges a group of accumulator instances into one accumulator instance. This method must be
 * implemented for DataStream session-window grouping aggregates and DataSet grouping aggregates.
 *
 * param accumulator the accumulator which will keep the merged aggregate results. Note that
 *                   the accumulator may already contain previously aggregated results, so
 *                   the user should not replace or clean this instance in the custom merge
 *                   method.
 * param its         an {@link Iterable} pointing to a group of accumulators that will be
 *                   merged.
 *
 * public void merge(ACC accumulator, Iterable<ACC> its)
 * }
 *
 * {@code
 * Called every time a table-valued aggregation result should be materialized.
 * The emitted result could be either an early and incomplete result
 * (periodically emitted as data arrives) or the final result of the table-valued aggregation.
 *
 * The implementation logic does not need to deal with retract messages.
 * For example, if we calculate top3 (ASC order), the behavior is as follows.
 * Assume there are 4 messages: {1, 2, 6, 4}.
 *  - The framework should generate retract messages according to the DAG pattern.
 * ------------------------------------------------------------------
 * | Input | Output               | Framework behavior when retract |
 * |       |                      | messages are needed             |
 * ------------------------------------------------------------------
 * | 1     | collector.collect(1) |                                 |
 * ------------------------------------------------------------------
 * | 2     | collector.collect(1) | collector.retract(1)            |
 * |       | collector.collect(2) |                                 |
 * ------------------------------------------------------------------
 * | 6     | collector.collect(1) | collector.retract(1)            |
 * |       | collector.collect(2) | collector.retract(2)            |
 * |       | collector.collect(6) |                                 |
 * ------------------------------------------------------------------
 * | 4     | collector.collect(1) | collector.retract(1)            |
 * |       | collector.collect(2) | collector.retract(2)            |
 * |       | collector.collect(4) | collector.retract(6)            |
 * ------------------------------------------------------------------
 *
 * public void emitValue(ACC accumulator, Collector<T> out)
 * }
 *
 * {@code
 * Called every time a table-valued aggregation result should be materialized.
 * The emitted result could be either an early and incomplete result
 * (periodically emitted as data arrives) or the final result of the table-valued aggregation.
 *
 * The implementation logic must deal with retract messages itself.
 * For example, if we calculate top3 (ASC order), the behavior is as follows.
 * Assume there are 4 messages: {1, 2, 6, 4}.
 *  - The framework does not need to generate retract messages.
 * --------------------------------
 * | Input | Output               |
 * --------------------------------
 * | 1     | collector.collect(1) |
 * --------------------------------
 * | 2     | collector.collect(2) |
 * --------------------------------
 * | 6     | collector.collect(6) |
 * --------------------------------
 * | 4     | collector.retract(6) |
 * |       | collector.collect(4) |
 * --------------------------------
 *
 * public void emitUpdateWithRetract(ACC accumulator, Collector<T> out)
 * }
 *
 * @param <T>   the type of the table aggregation result
 * @param <ACC> the type of the table aggregation accumulator. The accumulator is used to keep
 *              the table aggregated values which are needed to compute a table aggregation
 *              result. TableAggregateFunction represents its state using the accumulator, so
 *              all state of the TableAggregateFunction must be put into the accumulator.
 */
public abstract class TableAggregateFunction<T, ACC> extends UserDefinedFunction {

    /**
     * Creates and initializes the accumulator for this {@link TableAggregateFunction}.
     *
     * @return the accumulator with the initial value
     */
    public abstract ACC createAccumulator();

    /**
     * Returns the DataType of the TableAggregateFunction's result.
     *
     * @return The DataType of the TableAggregateFunction's result or null if the result type
     *         should be automatically inferred.
     */
    public DataType getResultType() {
        return null;
    }

    /**
     * Returns the DataType of the TableAggregateFunction's accumulator.
     *
     * @return The DataType of the TableAggregateFunction's accumulator or null if the
     *         accumulator type should be automatically inferred.
     */
    public DataType getAccumulatorType() {
        return null;
    }
}
```
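The following sketch simulates the emitValue contract from the first top3 table, assuming "top3 (ASC order)" means the three smallest values. The function only ever calls collect; a stand-in for the framework retracts the previously forwarded rows before forwarding the new ones. Only the collect/retract shape of the collector follows the proposal; `Top3`, `Top3Acc`, and `run` are hypothetical, and forwarding each step's retractions before its new rows is an ordering this sketch assumes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical simulation of emitValue for top3 (ASC order = three smallest values).
final class EmitValueDemo {

    interface Collector<T> {
        void collect(T record);
        void retract(T record);
    }

    // Accumulator: all values seen so far, kept in ascending order.
    static final class Top3Acc {
        final List<Integer> sorted = new ArrayList<>();
    }

    static final class Top3 {
        Top3Acc createAccumulator() {
            return new Top3Acc();
        }

        void accumulate(Top3Acc acc, int value) {
            acc.sorted.add(value);
            Collections.sort(acc.sorted);
        }

        // Emits the complete current result and never retracts anything itself.
        void emitValue(Top3Acc acc, Collector<Integer> out) {
            for (int i = 0; i < Math.min(3, acc.sorted.size()); i++) {
                out.collect(acc.sorted.get(i));
            }
        }
    }

    // Stand-in for the framework: retracts the previously forwarded result, then
    // forwards the new one. "+v" means collect(v), "-v" means retract(v).
    static List<String> run(int... inputs) {
        Top3 fn = new Top3();
        Top3Acc acc = fn.createAccumulator();
        List<String> log = new ArrayList<>();
        List<Integer> previous = new ArrayList<>();
        for (int in : inputs) {
            fn.accumulate(acc, in);
            List<Integer> current = new ArrayList<>();
            fn.emitValue(acc, new Collector<Integer>() {
                @Override public void collect(Integer r) { current.add(r); }
                @Override public void retract(Integer r) {
                    throw new UnsupportedOperationException("emitValue must not retract");
                }
            });
            for (int r : previous) log.add("-" + r);
            for (int r : current) log.add("+" + r);
            previous = current;
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(1, 2, 6, 4));
    }
}
```

For the inputs 1, 2, 6, 4 the log contains the same messages as the table: `+1`, then `-1 +1 +2`, then `-1 -2 +1 +2 +6`, then `-1 -2 -6 +1 +2 +4`.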
NOTE: Since streaming operators execute in one of two modes, `ACC` and `ACCRetract`, users can achieve better performance by implementing special emit methods for streaming. The table below summarizes which emit methods are supported in each mode.
Mode | emitValue | emitUpdateWithRetract | emitUpdateWithoutRetract |
---|---|---|---|
ACC | Y | N | Y |
ACCRetract | Y | Y | N |
- emitValue - for batch and streaming.
- emitUpdateWithRetract - only for streaming in ACCRetract mode (needs a key definition on TableAggregateFunction; under discussion).
- emitUpdateWithoutRetract - only for streaming in ACC mode (needs a key definition on TableAggregateFunction; under discussion).
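For comparison, here is a sketch of the emitUpdateWithRetract contract for the same top3 example. The function keeps the previously emitted result in its accumulator (in line with the rule that all state must live in the accumulator) and emits only the difference, so the framework can forward its output unchanged. All names are hypothetical, and the diff logic assumes distinct input values.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical simulation of emitUpdateWithRetract for top3 (ASC order).
// Assumes distinct input values (the contains-based diff ignores duplicates).
final class EmitUpdateWithRetractDemo {

    interface Collector<T> {
        void collect(T record);
        void retract(T record);
    }

    static final class Top3Acc {
        final List<Integer> values = new ArrayList<>();   // all inputs, ascending
        List<Integer> lastEmitted = new ArrayList<>();     // result already sent downstream
    }

    static final class Top3 {
        Top3Acc createAccumulator() {
            return new Top3Acc();
        }

        void accumulate(Top3Acc acc, int value) {
            acc.values.add(value);
            Collections.sort(acc.values);
        }

        // Retracts rows that left the top3 and collects rows that entered it.
        void emitUpdateWithRetract(Top3Acc acc, Collector<Integer> out) {
            List<Integer> current =
                    new ArrayList<>(acc.values.subList(0, Math.min(3, acc.values.size())));
            for (Integer old : acc.lastEmitted) {
                if (!current.contains(old)) out.retract(old);
            }
            for (Integer now : current) {
                if (!acc.lastEmitted.contains(now)) out.collect(now);
            }
            acc.lastEmitted = current;
        }
    }

    // The framework forwards the function's messages as-is. "+v" = collect, "-v" = retract.
    static List<String> run(int... inputs) {
        Top3 fn = new Top3();
        Top3Acc acc = fn.createAccumulator();
        List<String> log = new ArrayList<>();
        Collector<Integer> out = new Collector<Integer>() {
            @Override public void collect(Integer r) { log.add("+" + r); }
            @Override public void retract(Integer r) { log.add("-" + r); }
        };
        for (int in : inputs) {
            fn.accumulate(acc, in);
            fn.emitUpdateWithRetract(acc, out);
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(1, 2, 6, 4));
    }
}
```

For inputs 1, 2, 6, 4 it emits `+1`, `+2`, `+6`, `-6 +4`, matching the second top3 table in the Javadoc. Far fewer messages reach downstream operators than with emitValue, which illustrates the performance benefit mentioned in the NOTE.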
Collector Interface
```java
public interface Collector<T> {

    /**
     * Emits a record.
     *
     * @param record The record to collect.
     */
    void collect(T record);

    /**
     * Emits a retract record.
     *
     * @param record The record to retract.
     */
    void retract(T record);
}
```
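To show what a consumer might do with these messages, here is a sketch of a Collector implementation that maintains a materialized result: collect adds a row, and retract removes one earlier occurrence of an equal row. The interface is copied locally so the block is self-contained; `MaterializingCollector` and `replayTop3Example` are hypothetical. The replayed messages are those of the emitUpdateWithRetract top3 example, so the final view is the top3 of {1, 2, 6, 4}.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical downstream Collector that applies collect/retract messages to a result list.
final class CollectorDemo {

    interface Collector<T> {
        void collect(T record);
        void retract(T record);
    }

    static final class MaterializingCollector<T> implements Collector<T> {
        private final List<T> rows = new ArrayList<>();

        @Override
        public void collect(T record) {
            rows.add(record);
        }

        // A retraction cancels exactly one earlier occurrence of an equal record.
        @Override
        public void retract(T record) {
            if (!rows.remove(record)) {
                throw new IllegalStateException("retract without matching collect: " + record);
            }
        }

        List<T> view() {
            return Collections.unmodifiableList(rows);
        }
    }

    // Replays the emitUpdateWithRetract messages for inputs {1, 2, 6, 4}.
    static List<Integer> replayTop3Example() {
        MaterializingCollector<Integer> out = new MaterializingCollector<>();
        out.collect(1);
        out.collect(2);
        out.collect(6);
        out.retract(6);
        out.collect(4);
        return out.view();
    }

    public static void main(String[] args) {
        System.out.println(replayTop3Example());
    }
}
```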
Table.Map
The map operator is a new operator on Table. It takes a ScalarFunction that returns a single row with multiple columns. The usage is as follows:

```scala
val res = tab
```
Table.FlatMap
The flatMap operator is a new operator on Table. It takes a TableFunction that returns multiple rows. The usage is as follows:

```scala
val res = tab
```
GroupedTable.aggregate
The aggregate operator is a new operator on Table/GroupedTable. It takes an AggregateFunction that returns a single row with multiple columns. The usage is as follows:

```scala
val res = tab
```
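Leaving the Table API itself aside, the semantics of a grouped aggregate with a multi-column result can be sketched in plain Java: every group yields exactly one row, and the aggregate's composite result is expanded into columns next to the group key. The min/max aggregate, the `(key, min, max)` row layout, and all class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java model of GroupedTable.aggregate: one output row per group,
// with the aggregate's composite result (min, max) expanded into separate columns.
final class GroupedAggregateDemo {

    // Accumulator of a hypothetical min/max AggregateFunction.
    static final class MinMaxAcc {
        int min = Integer.MAX_VALUE;
        int max = Integer.MIN_VALUE;

        void accumulate(int v) {
            min = Math.min(min, v);
            max = Math.max(max, v);
        }
    }

    // Input rows are (key, value) pairs; output rows are (key, min, max).
    static List<List<Object>> aggregate(List<Map.Entry<String, Integer>> rows) {
        Map<String, MinMaxAcc> groups = new TreeMap<>(); // TreeMap gives a deterministic key order
        for (Map.Entry<String, Integer> row : rows) {
            groups.computeIfAbsent(row.getKey(), k -> new MinMaxAcc()).accumulate(row.getValue());
        }
        List<List<Object>> out = new ArrayList<>();
        for (Map.Entry<String, MinMaxAcc> g : groups.entrySet()) {
            out.add(List.of(g.getKey(), g.getValue().min, g.getValue().max));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> rows = List.of(
                Map.entry("a", 3), Map.entry("b", 7), Map.entry("a", 1), Map.entry("a", 5));
        System.out.println(aggregate(rows));
    }
}
```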
GroupedTable.flatAggregate
The flatAggregate operator is a new operator on Table/GroupedTable. It takes a TableAggregateFunction that returns multiple rows. The usage is as follows:

```scala
val res = tab
```
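In contrast, a grouped flatAggregate may yield several rows per group. This plain-Java sketch assumes a hypothetical table aggregate that emits the two largest values per key as `(key, value, rank)` rows; it only illustrates the row cardinality, not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java model of GroupedTable.flatAggregate: each group can emit
// several rows. Here: the top-2 values per key as (key, value, rank).
final class GroupedFlatAggregateDemo {

    static List<List<Object>> top2PerKey(List<Map.Entry<String, Integer>> rows) {
        Map<String, List<Integer>> groups = new TreeMap<>(); // deterministic key order
        for (Map.Entry<String, Integer> row : rows) {
            groups.computeIfAbsent(row.getKey(), k -> new ArrayList<>()).add(row.getValue());
        }
        List<List<Object>> out = new ArrayList<>();
        for (Map.Entry<String, List<Integer>> g : groups.entrySet()) {
            List<Integer> values = new ArrayList<>(g.getValue());
            values.sort(Collections.reverseOrder());
            for (int rank = 1; rank <= Math.min(2, values.size()); rank++) {
                out.add(List.of(g.getKey(), values.get(rank - 1), rank));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> rows = List.of(
                Map.entry("a", 3), Map.entry("b", 7), Map.entry("a", 1), Map.entry("a", 5));
        System.out.println(top2PerKey(rows));
    }
}
```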
Time attribute and group keys
The time attribute is lost after a map/flatMap operation, e.g.:

```scala
// tab has the fields 'name, 'age, 'address, 'rowtime
tab.map(udf('name)).as('col1, 'col2) // Does not include time attributes.
```
The aggregate/flatAggregate operations should force users to follow them with a select, e.g.:

```scala
val result = tab.window(Tumble ... as 'w)
  .groupBy('w, 'k1, 'k2)
  .aggregate(aggFun('a)) // Generates an AggregateTable that can only do select
  .select('w.rowtime, 'k1, 'k2, 'col1, 'col2) // Cannot contain aggregate expressions.
```
Proposed Changes
See the previous section for the public interfaces and table operators of the Table API. The interfaces and operators will be available in both the Scala and the Java Table API. We will also add the new TableAggregateFunction interface, which can be used in the flatAggregate operator. In addition, we will add checks for all of these changes.
...