...
/**
 * Base class for user-defined table aggregates.
 *
 * <p>The behavior of a {@link TableAggregateFunction} can be defined by implementing
 * a series of custom methods. A {@link TableAggregateFunction} needs at least three methods:
 *  - createAccumulator,
 *  - accumulate, and
 *  - emitValue or emitUpdateWithRetract
 *
 * <p>There are a few other methods that are optional:
 *  - retract,
 *  - merge
 *
 * <p>All these methods must be declared public, non-static, and named exactly as
 * mentioned above. The methods createAccumulator and emitValue are defined in the
 * {@link TableAggregateFunction} functions, while other methods are explained below.
 *
 *
 * {@code
 * Processes the input values and updates the provided accumulator instance. The method
 * accumulate can be overloaded with different custom types and arguments. A
 * TableAggregateFunction requires at least one accumulate() method.
 *
 * param accumulator           the accumulator which contains the current aggregated results
 * param [user defined inputs] the input value (usually obtained from newly arrived data).
 *
 * public void accumulate(ACC accumulator, [user defined inputs])
 * }
 *
 *
 * {@code
 * Retracts the input values from the accumulator instance. The current design assumes the
 * inputs are values that have been previously accumulated. The method retract can be
 * overloaded with different custom types and arguments. This function must be implemented for
 * datastream bounded over aggregate.
 *
 * param accumulator           the accumulator which contains the current aggregated results
 * param [user defined inputs] the input value (usually obtained from newly arrived data).
 *
 * public void retract(ACC accumulator, [user defined inputs])
 * }
 *
 *
 * {@code
 * Merges a group of accumulator instances into one accumulator instance. This function must be
 * implemented for datastream session window grouping aggregate and dataset grouping aggregate.
 *
 * param accumulator the accumulator which will keep the merged aggregate results. Note
 *                   that the accumulator may contain previously aggregated results.
 *                   Therefore the user should not replace or clean this instance in the
 *                   custom merge method.
 * param its         an {@link Iterable} pointing to a group of accumulators that will be
 *                   merged.
 *
 * public void merge(ACC accumulator, Iterable<ACC> its)
 * }
 *
 *
 * {@code
 * Called every time a table-valued aggregation result should be materialized.
 * The returned value can be either an early and incomplete result
 * (periodically emitted as data arrives) or the final result of the table-valued aggregation.
 *
 * The implementation logic does not need to deal with retract messages.
 * For example, if we calculate top3 (ASC order), the behavior is as follows:
 * Assume there are 4 messages: {1, 2, 6, 4}
 *  - The framework generates retract messages according to the DAG pattern.
 * -------------------------------------------------------------------------------
 * | Input | Output               | Framework behavior when retraction is needed |
 * -------------------------------------------------------------------------------
 * | 1     | collector.collect(1) |                                              |
 * -------------------------------------------------------------------------------
 * | 2     | collector.collect(1) | collector.retract(1)                         |
 * |       | collector.collect(2) |                                              |
 * -------------------------------------------------------------------------------
 * | 6     | collector.collect(1) | collector.retract(1)                         |
 * |       | collector.collect(2) | collector.retract(2)                         |
 * |       | collector.collect(6) |                                              |
 * -------------------------------------------------------------------------------
 * | 4     | collector.collect(1) | collector.retract(1)                         |
 * |       | collector.collect(2) | collector.retract(2)                         |
 * |       | collector.collect(4) | collector.retract(6)                         |
 * -------------------------------------------------------------------------------
 *
 * public void emitValue(ACC accumulator, Collector<T> out)
 * }
 *
 * {@code
 * Called every time a table-valued aggregation result should be materialized.
 * The returned value can be either an early and incomplete result
 * (periodically emitted as data arrives) or the final result of the table-valued aggregation.
 *
 * The implementation logic must deal with retract messages itself.
 * For example, if we calculate top3 (ASC order), the behavior is as follows:
 * Assume there are 4 messages: {1, 2, 6, 4}
 *  - The framework does not need to generate retract messages.
 * --------------------------------
 * | Input | Output               |
 * --------------------------------
 * | 1     | collector.collect(1) |
 * --------------------------------
 * | 2     | collector.collect(2) |
 * --------------------------------
 * | 6     | collector.collect(6) |
 * --------------------------------
 * | 4     | collector.retract(6) |
 * |       | collector.collect(4) |
 * --------------------------------
 *
 * public void emitUpdateWithRetract(ACC accumulator, Collector<T> out)
 * }
 *
 * @param <T>   the type of the table aggregation result
 * @param <ACC> the type of the table aggregation accumulator. The accumulator is used to keep the
 *              table aggregated values which are needed to compute a table aggregation result.
 *              TableAggregateFunction represents its state using the accumulator, so the state
 *              of the TableAggregateFunction must be put into the accumulator.
 */
public abstract class TableAggregateFunction<T, ACC> extends UserDefinedFunction {

    /**
     * Creates and initializes the accumulator for this {@link TableAggregateFunction}.
     *
     * @return the accumulator with the initial value
     */
    public abstract ACC createAccumulator();

    /**
     * Returns the DataType of the TableAggregateFunction's result.
     *
     * @return The DataType of the TableAggregateFunction's result or null if the result type
     *         should be automatically inferred.
     */
    public DataType getResultType() {
        return null;
    }

    /**
     * Returns the DataType of the TableAggregateFunction's accumulator.
     *
     * @return The DataType of the TableAggregateFunction's accumulator or null if the
     *         accumulator type should be automatically inferred.
     */
    public DataType getAccumulatorType() {
        return null;
    }
}
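The contract described in the Javadoc (createAccumulator, accumulate, emitValue) can be illustrated with a minimal top3 sketch. This is not the proposal's implementation: `Top3Sketch`, `Top3Acc`, `SketchCollector`, and `Top3EmitValueDemo` are hypothetical names, and the sketch deliberately does not extend the real `TableAggregateFunction` so that it compiles on its own. It only shows the function side of the first table: on every input, emitValue re-emits the full current result and leaves retraction to the framework.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Stand-in for the Collector described in this document (assumption, not a Flink type). */
interface SketchCollector<T> {
    void collect(T record);
    void retract(T record);
}

/** Accumulator: the three smallest values seen so far, kept in ascending order. */
class Top3Acc {
    final List<Integer> smallest = new ArrayList<>();
}

/** Hypothetical top3 (ASC order) table aggregate following the method names above. */
class Top3Sketch {
    public Top3Acc createAccumulator() {
        return new Top3Acc();
    }

    public void accumulate(Top3Acc acc, Integer value) {
        acc.smallest.add(value);
        Collections.sort(acc.smallest);
        if (acc.smallest.size() > 3) {
            // Drop the largest element so that only the top 3 remain.
            acc.smallest.remove(acc.smallest.size() - 1);
        }
    }

    /** Emits the complete current result; no retract messages are produced here. */
    public void emitValue(Top3Acc acc, SketchCollector<Integer> out) {
        for (Integer v : acc.smallest) {
            out.collect(v);
        }
    }
}

class Top3EmitValueDemo {
    /** Feeds the inputs one by one and records every collector call as text. */
    static List<String> run(int... inputs) {
        Top3Sketch fn = new Top3Sketch();
        Top3Acc acc = fn.createAccumulator();
        List<String> log = new ArrayList<>();
        SketchCollector<Integer> out = new SketchCollector<Integer>() {
            @Override public void collect(Integer r) { log.add("collect(" + r + ")"); }
            @Override public void retract(Integer r) { log.add("retract(" + r + ")"); }
        };
        for (int in : inputs) {
            fn.accumulate(acc, in);
            fn.emitValue(acc, out);
        }
        return log;
    }

    public static void main(String[] args) {
        // Same input sequence as the Javadoc example: {1, 2, 6, 4}.
        System.out.println(run(1, 2, 6, 4));
    }
}
```

For the inputs {1, 2, 6, 4} the recorded calls follow the "Output" column of the emitValue table; the retract calls in the third column would be issued by the framework, which this sketch does not model.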
NOTE: A streaming operator runs in one of two execution modes, `ACC` or `ACCRetract`, so users can achieve better performance by implementing the emit method that is specific to each streaming mode. The table below is a quick summary.
           | emitValue | emitUpdateWithRetract | emitUpdateWithoutRetract |
ACC        | Y         | N                     | Y                        |
ACCRetract | Y         | Y                     | N                        |
- emitValue - for batch and streaming.
- emitUpdateWithRetract - only for streaming in ACCRetract mode.
- emitUpdateWithoutRetract - only for streaming in ACC mode (needs a key definition on TableAggregateFunction, under discussion).
Collector Interface
public interface Collector<T> {

    /**
     * Emits a record.
     *
     * @param record The record to collect.
     */
    void collect(T record);

    /**
     * Emits a retract record.
     *
     * @param record The record to retract.
     */
    void retract(T record);
}
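To show how the retract method of this collector might be used from emitUpdateWithRetract, here is a minimal top3 sketch that emits only the difference between the previous and the current result. All names (`RetractCollector`, `Top3RetractAcc`, `Top3RetractSketch`, `Top3RetractDemo`) are hypothetical stand-ins so the example compiles without Flink. Keeping the previously emitted values inside the accumulator is an assumption of this sketch, chosen because the function's state has to live in the accumulator.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Stand-in with the same two methods as the Collector above (assumption, not a Flink type). */
interface RetractCollector<T> {
    void collect(T record);
    void retract(T record);
}

/** Accumulator holding the current top 3 and what was emitted last time. */
class Top3RetractAcc {
    final List<Integer> smallest = new ArrayList<>(); // current result, ascending
    final List<Integer> emitted = new ArrayList<>();  // result as of the previous emit
}

/** Hypothetical top3 (ASC order) aggregate that produces its own retract messages. */
class Top3RetractSketch {
    public Top3RetractAcc createAccumulator() {
        return new Top3RetractAcc();
    }

    public void accumulate(Top3RetractAcc acc, Integer value) {
        acc.smallest.add(value);
        Collections.sort(acc.smallest);
        if (acc.smallest.size() > 3) {
            acc.smallest.remove(acc.smallest.size() - 1); // drop the largest
        }
    }

    /** Retracts values that left the top 3, then collects values that entered it. */
    public void emitUpdateWithRetract(Top3RetractAcc acc, RetractCollector<Integer> out) {
        List<Integer> stale = new ArrayList<>(acc.emitted);
        for (Integer v : acc.smallest) {
            stale.remove(v); // remove(Object): drops one matching occurrence
        }
        List<Integer> fresh = new ArrayList<>(acc.smallest);
        for (Integer v : acc.emitted) {
            fresh.remove(v);
        }
        for (Integer v : stale) {
            out.retract(v);
        }
        for (Integer v : fresh) {
            out.collect(v);
        }
        acc.emitted.clear();
        acc.emitted.addAll(acc.smallest);
    }
}

class Top3RetractDemo {
    /** Feeds the inputs one by one and records every collector call as text. */
    static List<String> run(int... inputs) {
        Top3RetractSketch fn = new Top3RetractSketch();
        Top3RetractAcc acc = fn.createAccumulator();
        List<String> log = new ArrayList<>();
        RetractCollector<Integer> out = new RetractCollector<Integer>() {
            @Override public void collect(Integer r) { log.add("collect(" + r + ")"); }
            @Override public void retract(Integer r) { log.add("retract(" + r + ")"); }
        };
        for (int in : inputs) {
            fn.accumulate(acc, in);
            fn.emitUpdateWithRetract(acc, out);
        }
        return log;
    }

    public static void main(String[] args) {
        // prints [collect(1), collect(2), collect(6), retract(6), collect(4)]
        System.out.println(run(1, 2, 6, 4));
    }
}
```

Compared with re-emitting the full result on every input, this variant sends five messages instead of nine for the inputs {1, 2, 6, 4}, which is the kind of saving the mode-specific methods are meant to enable.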
...