THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||
---|---|---|
| ||
/** * Base interface representing a stored procedure that can be executed by Flink. An stored procedure * accepts zero, one, or multiple input parameters and then return the execution result of the * stored procedure. * * <p>The behavior of {@link Procedure} can be defined by implements a custom call method. An call * method must be declared publicly, not static, and named <code>call</code>. Call methods can also * be overloaded by implementing multiple methods named <code><<code>call</code>. Currently, it doesn't allows * allows users to custom their own procedure, the customer {@link Procedure} can only be provided by * by {@link Catalog}. To provide {@link Procedure}, {@link Catalog} must implement {@link * Catalog#getProcedure(ObjectPath)}. * * <p>When calling a stored procedure, Flink will always pass the <code> * org.apache.flink.table.api.bridge.java.StreamTableEnvironment</code> as the first parameter of * the <code>call</code> method. So, the custom call method must accept the <code> * org.apache.flink.table.api.bridge.java.StreamTableEnvironment</code> as the first parameter, and * the other parameters of the <code>call</code> method are the actual parameter of the stored * procedure. * * <p>Note: The stored procedure can only be aware of the objects of the catalog that it belongs * from the passed <code>StreamTableEnvironment</code>. * * <p>By default, input and output data types are automatically extracted using reflection. The * input arguments * are derived from one or more {@code call()} methods. If the reflective * information is not * sufficient, it can be supported and enriched with {@link DataTypeHint} and * {@link ProcedureHint}. * It If it's used to hint input arguments, it should only hint the input * arguments that start from the second argument since the first * argument is always <code> * <code>StreamTableEnvironment<StreamTableEnvironment</code> which doesn't need to be annotated with data type hint. * * <p>Note: <p>TheThe return type of the {@code call()} method should always be an {@link CloseableIterable} of * {@link Row}. And the result type must be hinted with {@link DataTypeHint} to hint the data type * for each field in the <code>Row</code> since it can't be derived directly. *CloseableIterable}. * * <p>The following examples show how to specify a stored procedure: * * <pre>{@code * // a stored procedure that accepts an arbitrary number of INT arguments and return an {@link CloseableIterable} of explicit ROW < INT >Integer * class OutputArgumentProcedure implements Procedure { * public @DataTypeHint("ROW<i INT>") CloseableIterator<Row> CloseableIterator<Integer> call(StreamTableEnvironment tEnv, Integer... args) { * return CloseableIterator.adapterForIterator(Arrays.stream(args).map(Row::of).iterator()); * } * } * * // a stored procedure that accepts either INT or DOUBLESTRING and return an {@link CloseableIterable} of explicit ROW < INT > * class RandomDataProcedure implements Procedure { * public @DataTypeHint("ROW<i INT>") CloseableIterator<Row> call(StreamTableEnvironment tEnv, Integer n) { * Random random = new Random(); * return environment.fromValues(random.ints(n).iterator()).execute().collect(); * } * * public @DataTypeHint("ROW<i INT>") CloseableIterator<Row> call(StreamTableEnvironment tEnv, IntegerString n) { * Random random = new Random(); * return environment.fromValues(random.ints(Integer.parseInt(n))).execute().collect(); * } * } * * // a stored procedure that output a ROW < i INT, s STRING >, the procedure hint helps in * // declaring the row's fields * @ProcedureHint(output = @DataTypeHint("ROW< i INT, s STRING>")) * class DuplicatorProcedure implements Procedure { * public CloseableIterator<Row> call(StreamTableEnvironment tEnv, Integer i, String s) { * return CloseableIterator.adapterForIterator(Arrays.asList(Row.of(i, s), Row.of(i, s)).iterator()); * } * } * }</pre> * * <p>In the API, a stored procedure can be used as follows: * * <pre>{@code * // for SQL users * TableEnvironment tEnv = ... * tEnv.executeSql("CALL randomDataProcedure(t, 5)"); * }</pre> * * </pre> */ @PublicEvolving public interface Procedure {} |
...