...
Code Block | ||
---|---|---|
| ||
/** * Base interface representing a stored procedure that can be executed by Flink. A stored procedure * accepts zero, one, or multiple input parameters and then returns the execution result of the * stored procedure. * * <p>The behavior of a {@link Procedure} can be defined by implementing a custom call method. A call * method must be declared public, not static, and named <code>call</code>. Call methods can also * be overloaded by implementing multiple methods named <code>call</code>. Currently, users cannot * define their own procedures directly; a custom {@link Procedure} can only be provided by a * {@link Catalog}. To provide a {@link Procedure}, the {@link Catalog} must implement {@link * Catalog#getProcedure(ObjectPath)}. * * <p>When calling a stored procedure, Flink will always pass the <code> * org.apache.flink.table.api.bridge.java.StreamTableEnvironment</code> as the first parameter of * the <code>call</code> method. So, the custom call method must accept the <code> * org.apache.flink.table.api.bridge.java.StreamTableEnvironment</code> as the first parameter, and * the other parameters of the <code>call</code> method are the actual parameters of the stored * procedure. * * <p>Note: Through the passed <code>StreamTableEnvironment</code>, the stored procedure can only * be aware of the objects of the catalog that it belongs to. * * <p>By default, input data types are automatically extracted using reflection. The input arguments * are derived from one or more {@code call()} methods. If the reflective information is not * sufficient, it can be supported and enriched with {@link DataTypeHint} and {@link ProcedureHint}. * Hints should only be given for the input arguments starting from the second argument, since the first * argument is always <code>StreamTableEnvironment</code>, which doesn't need to be annotated. * * <p>The return type of the {@code call()} method should always be a {@link CloseableIterator} of * {@link Row}. 
The result type must be hinted with {@link DataTypeHint} to specify the data type * for each field in the <code>Row</code> since it can't be derived directly. * * <p>The following examples show how to specify a stored procedure: * * <pre>{@code * // a stored procedure that accepts an arbitrary number of INT arguments and returns a CloseableIterator of explicit ROW < INT > * class OutputArgumentProcedure implements Procedure { * public @DataTypeHint("ROW<i INT>") CloseableIterator<Row> call(StreamTableEnvironment tEnv, Integer... args) { * return CloseableIterator.adapterForIterator(Arrays.stream(args).map(Row::of).iterator()); * } * } * * // a stored procedure that accepts either INT or STRING and returns a CloseableIterator of explicit ROW < INT > * class RandomDataProcedure implements Procedure { * public @DataTypeHint("ROW<i INT>") CloseableIterator<Row> call(StreamTableEnvironment tEnv, Integer n) { * Random random = new Random(); * return tEnv.fromValues(random.ints(n).boxed().collect(Collectors.toList())).execute().collect(); * } * * public @DataTypeHint("ROW<i INT>") CloseableIterator<Row> call(StreamTableEnvironment tEnv, String n) { * Random random = new Random(); * return tEnv.fromValues(random.ints(Integer.parseInt(n)).boxed().collect(Collectors.toList())).execute().collect(); * } * } * * // a stored procedure that outputs a ROW < i INT, s STRING >; the procedure hint helps in * // declaring the row's fields * @ProcedureHint(output = @DataTypeHint("ROW< i INT, s STRING>")) * class DuplicatorProcedure implements Procedure { * public CloseableIterator<Row> call(StreamTableEnvironment tEnv, Integer i, String s) { * return CloseableIterator.adapterForIterator(Arrays.asList(Row.of(i, s), Row.of(i, s)).iterator()); * } * } * }</pre> * * <p>In the API, a stored procedure can be used as follows: * * <pre>{@code * // for SQL users * TableEnvironment tEnv = ... * tEnv.executeSql("CALL randomDataProcedure(t)"); * }</pre> */ @PublicEvolving public interface Procedure {} |
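The call-method contract described in the Javadoc above (public, non-static, named call, environment as the first parameter, overloads allowed) can be illustrated with plain reflection. This is only a sketch under stated assumptions: FakeEnv, Repeat, resolve, and invoke are hypothetical stand-ins made up for illustration, and the matching logic is far simpler than whatever type inference Flink would actually apply.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class CallMethodSketch {

    /** Hypothetical stand-in for StreamTableEnvironment; not the Flink class. */
    public static final class FakeEnv {}

    /** Stand-in for the proposed marker interface. */
    public interface Procedure {}

    /** Hypothetical procedure with two overloaded call methods. */
    public static final class Repeat implements Procedure {
        public List<String> call(FakeEnv env, Integer n) {
            return call(env, "x", n);
        }

        public List<String> call(FakeEnv env, String s, Integer n) {
            List<String> out = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                out.add(s);
            }
            return out;
        }
    }

    /** Finds a public, non-static call method whose first parameter is the environment. */
    static Method resolve(Procedure p, Object... args) {
        for (Method m : p.getClass().getMethods()) {
            Class<?>[] types = m.getParameterTypes();
            if (!m.getName().equals("call")
                    || Modifier.isStatic(m.getModifiers())
                    || types.length != args.length + 1
                    || types[0] != FakeEnv.class) {
                continue;
            }
            boolean matches = true;
            for (int i = 0; i < args.length; i++) {
                matches &= types[i + 1].isInstance(args[i]);
            }
            if (matches) {
                return m;
            }
        }
        throw new IllegalArgumentException("No matching call method for " + Arrays.toString(args));
    }

    /** Prepends the environment to the user arguments and invokes the resolved method. */
    static Object invoke(Procedure p, FakeEnv env, Object... args) throws Exception {
        Object[] all = new Object[args.length + 1];
        all[0] = env;
        System.arraycopy(args, 0, all, 1, args.length);
        return resolve(p, args).invoke(p, all);
    }

    public static void main(String[] args) throws Exception {
        Procedure p = new Repeat();
        System.out.println(invoke(p, new FakeEnv(), 3));       // prints [x, x, x]
        System.out.println(invoke(p, new FakeEnv(), "ab", 2)); // prints [ab, ab]
    }
}
```

The caller only supplies the procedure's own arguments; the environment is injected by the framework side, which is why hints are not needed for the first parameter.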
To make data type hints available for Procedure, we propose the annotation ProcedureHint, analogous to FunctionHint.
...
Code Block | ||
---|---|---|
| ||
@PublicEvolving public interface Catalog { /** * Get the procedure. The procedure name should be handled in a case-insensitive way. * * @param procedurePath path of the procedure * @return the requested procedure * @throws ProcedureNotExistException if the procedure does not exist in the catalog * @throws CatalogException in case of any runtime exception */ default Procedure getProcedure(ObjectPath procedurePath) throws ProcedureNotExistException, CatalogException { throw new UnsupportedOperationException(String.format("getProcedure is not implemented for %s.", this.getClass())); } } |
Code Block | ||
---|---|---|
| ||
/** Exception for trying to operate on a procedure that doesn't exist. */ @PublicEvolving public class ProcedureNotExistException extends Exception { private static final String MSG = "Procedure %s does not exist in Catalog %s."; public ProcedureNotExistException(String catalogName, ObjectPath functionPath) { this(catalogName, functionPath, null); } public ProcedureNotExistException( String catalogName, ObjectPath functionPath, Throwable cause) { super(String.format(MSG, functionPath.getFullName(), catalogName), cause); } } |
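As a rough illustration of how a catalog might honor the getProcedure contract (case-insensitive names, a dedicated exception when the procedure is missing), here is a minimal sketch using only the JDK. InMemoryCatalog, register, and normalize are hypothetical names, and the exception here takes a plain string instead of an ObjectPath to keep the example self-contained; none of this is the actual Flink implementation.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

class CatalogLookupSketch {

    /** Stand-in for the proposed marker interface. */
    interface Procedure {}

    /** Simplified stand-in for ProcedureNotExistException (plain name instead of ObjectPath). */
    static final class ProcedureNotExistException extends Exception {
        ProcedureNotExistException(String catalogName, String procedureName) {
            super(String.format(
                    "Procedure %s does not exist in Catalog %s.", procedureName, catalogName));
        }
    }

    /** Hypothetical in-memory catalog that stores procedures under lower-cased names. */
    static final class InMemoryCatalog {
        private final String name;
        private final Map<String, Procedure> procedures = new HashMap<>();

        InMemoryCatalog(String name) {
            this.name = name;
        }

        void register(String procedureName, Procedure procedure) {
            procedures.put(normalize(procedureName), procedure);
        }

        /** Looks up a procedure, ignoring the case of the requested name. */
        Procedure getProcedure(String procedureName) throws ProcedureNotExistException {
            Procedure p = procedures.get(normalize(procedureName));
            if (p == null) {
                throw new ProcedureNotExistException(name, procedureName);
            }
            return p;
        }

        private static String normalize(String procedureName) {
            return procedureName.toLowerCase(Locale.ROOT);
        }
    }

    public static void main(String[] args) throws Exception {
        InMemoryCatalog catalog = new InMemoryCatalog("my_catalog");
        Procedure proc = new Procedure() {};
        catalog.register("db.MyProc", proc);

        // Lookup succeeds regardless of case.
        System.out.println(catalog.getProcedure("DB.myproc") == proc);

        try {
            catalog.getProcedure("db.missing");
        } catch (ProcedureNotExistException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Normalizing on both registration and lookup keeps the case handling in one place, so callers never need to know how names are stored.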
...
5: Get the result of procedure#call(xxx), which should be a CloseableIterator<Row>. Build a TableResult from it so that the SqlClient can show the result of the call procedure to users.
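The step above hands a closeable iterator to whatever displays the result, so the consumer is responsible for closing it once all rows are read. The following sketch shows that pattern with JDK types only. The CloseableIterator interface here is a local stand-in rather than Flink's class, Tracking and drain are hypothetical helpers, and how Flink actually wraps the iterator in a TableResult is not shown.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class ResultCollectionSketch {

    /** Local stand-in for a closeable iterator: an iterator that may hold resources. */
    interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {}

    /** Hypothetical wrapper that records whether close() was called. */
    static final class Tracking<T> implements CloseableIterator<T> {
        private final Iterator<T> delegate;
        boolean closed;

        Tracking(Iterator<T> delegate) {
            this.delegate = delegate;
        }

        @Override
        public boolean hasNext() {
            return delegate.hasNext();
        }

        @Override
        public T next() {
            return delegate.next();
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Reads all rows and closes the iterator, even if reading fails midway. */
    static <T> List<T> drain(CloseableIterator<T> iterator) throws Exception {
        List<T> rows = new ArrayList<>();
        try (CloseableIterator<T> it = iterator) {
            while (it.hasNext()) {
                rows.add(it.next());
            }
        }
        return rows;
    }

    public static void main(String[] args) throws Exception {
        Tracking<String> result = new Tracking<>(Arrays.asList("row-1", "row-2").iterator());
        System.out.println(drain(result));  // prints [row-1, row-2]
        System.out.println(result.closed);  // prints true
    }
}
```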
...
Compatibility, Deprecation, and Migration Plan
...