...
@udf(input=[DataTypes.BIGINT(), DataTypes.BIGINT()], output=DataTypes.BIGINT())
class Multiply(ScalarFunction):
    """Class-based scalar UDF returning the product of two BIGINT arguments.

    ``is_deterministic`` is overridden so the function reports itself as
    non-deterministic.
    """

    def eval(self, x, y):
        """Return ``x * y``."""
        product = x * y
        return product

    def is_deterministic(self):
        """Report this function as non-deterministic."""
        return False
class Split(TableFunction):
    """Table UDF emitting one row per ``#``-separated piece of its input."""

    def eval(self, x):
        """Yield each substring of ``x`` delimited by ``#``.

        Uses ``str.split("#")``, so empty pieces (e.g. from ``"a##b"`` or a
        leading/trailing ``#``) are yielded as empty strings.
        """
        yield from x.split("#")


class SumAcc(object):
    """Mutable accumulator object exposing a running total as ``sum``."""

    def __init__(self):
        # Running total; starts at zero.
        self.sum = 0
# NOTE(review): only the ``output``/``accumulator`` arguments of this
# decorator were legible; the decorator name ``udaf`` and the
# ``input=DataTypes.BIGINT()`` argument are reconstructed — confirm them
# against the UDAF registration API.
@udaf(
    input=DataTypes.BIGINT(),
    output=DataTypes.BIGINT(),
    accumulator=DataTypes.ROW([DataTypes.FIELD("sum", DataTypes.BIGINT())]),
)
class Sum(AggregateFunction):
    """Aggregate UDF returning the sum of its input values.

    Intermediate state is a ``SumAcc`` whose ``sum`` attribute holds the
    running total (declared as a single BIGINT ``sum`` field in the
    accumulator ROW type above).
    """

    def accumulate(self, accumulator, input):
        """Add the current ``input`` value to the running total."""
        # Add the value itself; adding a constant 1 would count rows instead.
        accumulator.sum += input

    def reset(self, accumulator):
        """Reset the running total to zero."""
        accumulator.sum = 0

    def create_accumulator(self):
        """Return a fresh accumulator whose total starts at zero."""
        return SumAcc()

    def get_value(self, accumulator):
        """Return the accumulated total."""
        return accumulator.sum
...
@udf(input=[DataTypes.BIGINT(), DataTypes.BIGINT()], output=DataTypes.BIGINT(),
     deterministic=False)
def multiply(x, y):
    """Function-based scalar UDF returning ``x * y``.

    Declared non-deterministic through the decorator's ``deterministic=False``.
    """
    return x * y


@udtf(input=[DataTypes.STRING()], output=DataTypes.STRING())
def split(x):
    """Function-based table UDF yielding each ``#``-separated piece of ``x``.

    This is a plain module-level function, not a method, so it takes no
    ``self``: its single parameter corresponds to the one declared STRING
    input.
    """
    yield from x.split("#")
...
public interface FunctionDefinition { … default FunctionLanguage getLanguage() { return FunctionLanguage.JAVA; } } public enum FunctionLanguage { JVM, PYTHON } |
Examples
class SubtractOne(ScalarFunction):
    """Class-based scalar UDF returning its argument minus one."""

    def eval(self, i):
        decremented = i - 1
        return decremented


@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()],
     result_type=DataTypes.BIGINT())
def add(i, j):
    """Decorator-based scalar UDF returning ``i + j``."""
    return i + j


# Three registration styles: a wrapped lambda, a wrapped ScalarFunction
# instance, and a function already decorated with @udf.
t_env.register_function(
    "add_one",
    udf(lambda i: i + 1, DataTypes.BIGINT(), DataTypes.BIGINT()),
)
t_env.register_function(
    "subtract_one",
    udf(SubtractOne(), DataTypes.BIGINT(), DataTypes.BIGINT()),
)
t_env.register_function("add", add)
High-Level Execution Mode
...