THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
First, we propose a new interface Procedure
for stored procedure. The stored procedure providers are expected to implement Procedure
to define their own procedure.
Code Block | ||
---|---|---|
| ||
/**
* Base interface representing a stored procedure that can be executed by Flink. An stored procedure
* accepts zero, one, or multiple input parameters and then return the execution result of the
* stored procedure.
*
* <p>The behavior of {@link Procedure} can be defined by implements a custom call method. An call
* method must be declared publicly, not static, and named <code>call</code>. Call methods can also
* be overloaded by implementing multiple methods named <code></code>. Currently, it doesn't allows
* users to custom their own procedure, the customer {@link Procedure} can only be provided by
* {@link Catalog}. To provide {@link Procedure}, {@link Catalog} must implement {@link
* Catalog#getProcedure(ObjectPath)}.
*
* <p>When calling a stored procedure, Flink will always pass the <code>
* org.apache.flink.table.api.bridge.java.StreamTableEnvironment</code> as the first parameter of
* the <code>call</code> method. So, the custom call method must accept the <code>
* org.apache.flink.table.api.bridge.java.StreamTableEnvironment</code> as the first parameter, and
* the other parameters of the <code>call</code> method are the actual parameter of the stored
* procedure.
*
* <p>Note: The stored procedure can only be aware of the objects of the catalog that it belongs
* from the passed <code>StreamTableEnvironment</code>.
*
* <p>By default, input data types are automatically extracted using reflection. The input arguments
* are derived from one or more {@code call()} methods. If the reflective information is not
* sufficient, it can be supported and enriched with {@link DataTypeHint} and {@link ProcedureHint}.
* It should only hint the input arguments that start from the second argument since the first
* argument is always <code>StreamTableEnvironment</code> which doesn't need to be annotated.
*
* <p>The return type of the {@code call()} method should always be an {@link CloseableIterable} of
* {@link Row}. And the result type must be hinted with {@link DataTypeHint} to hint the data type
* for each field in the <code>Row</code> since it can't be derived directly.
*
* <p>The following examples show how to specify a stored procedure:
*
* <pre>{@code
* // a stored procedure that accepts an arbitrary number of INT arguments and return an {@link CloseableIterable} of explicit ROW < INT >
* class OutputArgumentProcedure implements Procedure {
* public @DataTypeHint("ROW<i INT>") CloseableIterator<Row> call(StreamTableEnvironment tEnv, Integer... args) {
* return CloseableIterator.adapterForIterator(Arrays.stream(args).map(Row::of).iterator());
* }
* }
*
* // a stored procedure that accepts either INT or DOUBLE and return an {@link CloseableIterable} of explicit ROW < INT >
* class RandomDataProcedure implements Procedure {
* public @DataTypeHint("ROW<i INT>") CloseableIterator<Row> call(StreamTableEnvironment tEnv, Integer n) {
* Random random = new Random();
* return environment.fromValues(random.ints(n).iterator()).execute().collect();
* }
*
* public @DataTypeHint("ROW<i INT>") CloseableIterator<Row> call(StreamTableEnvironment tEnv, Integer n) {
* Random random = new Random();
* return environment.fromValues(random.ints(Integer.parseInt(n))).execute().collect();
* }
* }
*
* // a stored procedure that output a ROW < i INT, s STRING >, the procedure hint helps in
* // declaring the row's fields
* @ProcedureHint(output = @DataTypeHint("ROW< i INT, s STRING>"))
* class DuplicatorProcedure implements Procedure {
* public CloseableIterator<Row> call(Integer i, String s) {
* return CloseableIterator.adapterForIterator(Arrays.asList(Row.of(i, s), Row.of(i, s)).iterator());
* }
* }
* }</pre>
*
* <p>In the API, a stored procedure can be used as follows:
*
* <pre>{@code
* // for SQL users
* TableEnvironment tEnv = ...
* tEnv.executeSql("CALL randomDataProcedure(t)");
* }</pre>
*
* </pre>
*/
@PublicEvolving
public interface Procedure {} |
...