...

First, we propose a new interface, Procedure, for stored procedures. Stored procedure providers are expected to implement Procedure to define their own procedures.

Code Block
languagejava
/**
 * Base interface representing a stored procedure that can be executed by Flink. A stored procedure
 * accepts zero, one, or multiple input parameters and then returns the execution result of the
 * stored procedure.
 *
 * <p>The behavior of a {@link Procedure} is defined by implementing a custom call method. A call
 * method must be declared public, must not be static, and must be named <code>call</code>. Call
 * methods can also be overloaded by implementing multiple methods named <code>call</code>.
 * Currently, users cannot register their own procedures; a custom {@link Procedure} can only be
 * provided by a {@link Catalog}. To provide a {@link Procedure}, the {@link Catalog} must implement
 * {@link Catalog#getProcedure(ObjectPath)}.
 *
 * <p>When calling a stored procedure, Flink always passes the <code>
 * org.apache.flink.table.api.bridge.java.StreamTableEnvironment</code> as the first parameter of
 * the <code>call</code> method. The custom call method must therefore accept the <code>
 * org.apache.flink.table.api.bridge.java.StreamTableEnvironment</code> as its first parameter; the
 * remaining parameters of the <code>call</code> method are the actual parameters of the stored
 * procedure.
 *
 * <p>Note: Through the passed <code>StreamTableEnvironment</code>, the stored procedure can only
 * be aware of the objects of the catalog that it belongs to.
 *
 * <p>By default, input data types are automatically extracted using reflection. The input arguments
 * are derived from one or more {@code call()} methods. If the reflective information is not
 * sufficient, it can be supported and enriched with {@link DataTypeHint} and {@link ProcedureHint}.
 * Hints should only cover the input arguments starting from the second one, since the first
 * argument is always <code>StreamTableEnvironment</code> and does not need to be annotated.
 *
 * <p>The return type of the {@code call()} method should always be a {@link CloseableIterator} of
 * {@link Row}. The result type must be hinted with {@link DataTypeHint} to declare the data type
 * of each field in the <code>Row</code>, since it cannot be derived directly.
 *
 * <p>The following examples show how to specify a stored procedure:
 *
 * <pre>{@code
 * // a stored procedure that accepts an arbitrary number of INT arguments and returns a CloseableIterator of explicit ROW < INT >
 * class OutputArgumentProcedure implements Procedure {
 *   public @DataTypeHint("ROW<i INT>") CloseableIterator<Row> call(StreamTableEnvironment tEnv, Integer... args) {
 *     return CloseableIterator.adapterForIterator(Arrays.stream(args).map(Row::of).iterator());
 *   }
 * }
 *
 * // a stored procedure that accepts either INT or STRING and returns a CloseableIterator of explicit ROW < INT >
 * class RandomDataProcedure implements Procedure {
 *  public @DataTypeHint("ROW<i INT>") CloseableIterator<Row> call(StreamTableEnvironment tEnv, Integer n) {
 *    Random random = new Random();
 *    return tEnv.fromValues(random.ints(n).boxed().collect(Collectors.toList())).execute().collect();
 *  }
 *
 *  // the count arrives as a string and is parsed before use
 *  public @DataTypeHint("ROW<i INT>") CloseableIterator<Row> call(StreamTableEnvironment tEnv, String n) {
 *    Random random = new Random();
 *    return tEnv.fromValues(random.ints(Integer.parseInt(n)).boxed().collect(Collectors.toList())).execute().collect();
 *  }
 * }
 *
 * // a stored procedure that outputs a ROW < i INT, s STRING >; the procedure hint helps in
 * // declaring the row's fields
 * @ProcedureHint(output = @DataTypeHint("ROW< i INT, s STRING>"))
 * class DuplicatorProcedure implements Procedure {
 *   public CloseableIterator<Row> call(StreamTableEnvironment tEnv, Integer i, String s) {
 *     return CloseableIterator.adapterForIterator(Arrays.asList(Row.of(i, s), Row.of(i, s)).iterator());
 *   }
 * }
 * }</pre>
 *
 * <p>In the API, a stored procedure can be used as follows:
 *
 * <pre>{@code
 * // for SQL users
 * TableEnvironment tEnv = ...
 * tEnv.executeSql("CALL randomDataProcedure(10)");
 * }</pre>
 */
@PublicEvolving
public interface Procedure {}
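
The rules for call methods described in the Javadoc above (public, non-static, named call, environment as the first parameter, overloads allowed) can be illustrated with a small reflection-based lookup. This is only a sketch that uses the JDK alone and is not how Flink resolves procedures: FakeTableEnv, EchoProcedure, resolve and invoke are hypothetical names introduced for illustration, the nested Procedure interface is a stand-in for the proposed one, and the procedure returns a plain List instead of the iterator of Row used in the proposal.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class CallMethodSketch {

    /** Hypothetical stand-in for StreamTableEnvironment (assumption, not a Flink class). */
    static final class FakeTableEnv {}

    /** Stand-in for the proposed marker interface. */
    interface Procedure {}

    /** Example procedure with two valid overloads and one method that is ignored. */
    public static final class EchoProcedure implements Procedure {
        public List<String> call(FakeTableEnv env, Integer n) {
            return Collections.singletonList("int:" + n);
        }

        public List<String> call(FakeTableEnv env, String s) {
            return Collections.singletonList("string:" + s);
        }

        // Static, so it does not qualify as a call method under the rules above.
        public static List<String> call(FakeTableEnv env, Double d) {
            return Collections.singletonList("double:" + d);
        }
    }

    /**
     * Finds a public, non-static method named "call" whose first parameter is the
     * environment and whose remaining parameters accept the given arguments.
     * Simplification: returns the first match and does not rank ambiguous overloads;
     * null arguments and primitive parameter types never match.
     */
    static Method resolve(Class<? extends Procedure> type, Object... args) {
        for (Method m : type.getMethods()) { // getMethods() yields public methods only
            if (!m.getName().equals("call") || Modifier.isStatic(m.getModifiers())) {
                continue;
            }
            Class<?>[] params = m.getParameterTypes();
            if (params.length != args.length + 1 || params[0] != FakeTableEnv.class) {
                continue;
            }
            boolean matches = true;
            for (int i = 0; i < args.length; i++) {
                if (!params[i + 1].isInstance(args[i])) {
                    matches = false;
                    break;
                }
            }
            if (matches) {
                return m;
            }
        }
        throw new IllegalArgumentException(
                "No matching call method for arguments " + Arrays.toString(args));
    }

    /** Prepends the environment to the user arguments and invokes the resolved method. */
    @SuppressWarnings("unchecked")
    static List<String> invoke(Procedure procedure, FakeTableEnv env, Object... args) {
        Method m = resolve(procedure.getClass(), args);
        Object[] full = new Object[args.length + 1];
        full[0] = env;
        System.arraycopy(args, 0, full, 1, args.length);
        try {
            return (List<String>) m.invoke(procedure, full);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Procedure call failed", e);
        }
    }

    public static void main(String[] args) {
        FakeTableEnv env = new FakeTableEnv();
        EchoProcedure procedure = new EchoProcedure();
        System.out.println(invoke(procedure, env, 3));   // prints [int:3]
        System.out.println(invoke(procedure, env, "x")); // prints [string:x]
    }
}
```

In this sketch a Double argument is rejected with an IllegalArgumentException, because the only overload that accepts it is static. A real implementation would also need to apply the type hints and rank ambiguous overloads, which the sketch leaves out.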

...