Status
Current state: [One of "Under Discussion", "Accepted", "Rejected"]
Discussion thread: here (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)
JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)
Released: <Flink Version>
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Stored procedures provide a convenient way to encapsulate complex logic for data manipulation or administrative tasks in external storage systems. They are widely used in traditional databases and in popular compute engines such as Trino.
However, Flink currently does not support calling stored procedures directly, which limits its ability to integrate with external systems and makes it harder for users to access and manage their data in those systems through Flink.
Therefore, we propose adding support for calling stored procedures in Flink to enable better integration with external storage systems. This will allow users to access and manage their data by executing a single CALL statement in Flink, which makes their data processing workflows more efficient.
With this FLIP, Flink will allow connector developers to provide their own built-in stored procedures and will enable users to call these predefined stored procedures.
Note: This FLIP does not intend to let users define their own stored procedures, because we don't want to expose too much to users too early.
Public Interfaces
Syntax
We propose the following syntax to call a stored procedure:
CALL [catalog_name.][database_name.]procedure_name ([ expression [, expression]* ] )
Note: The expression can be a function call, but it must be reducible to a literal value. Otherwise, an exception is thrown.
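To illustrate how the optional qualifiers in the syntax might be resolved, here is a minimal sketch; it is not Flink's parser. The class ProcedureIdentifier and its resolve method are hypothetical, and the fallback to a current catalog and database (with a two-part name read as database.procedure) is an assumption of this example rather than something this FLIP specifies.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical holder for a fully qualified procedure name; not a Flink class. */
final class ProcedureIdentifier {
    final String catalog;
    final String database;
    final String procedure;

    ProcedureIdentifier(String catalog, String database, String procedure) {
        this.catalog = catalog;
        this.database = database;
        this.procedure = procedure;
    }

    /**
     * Expands a one-, two-, or three-part name. Assumption: missing qualifiers
     * fall back to the session's current catalog and database, and a two-part
     * name is read as database.procedure.
     */
    static ProcedureIdentifier resolve(String name, String currentCatalog, String currentDatabase) {
        List<String> parts = Arrays.asList(name.split("\\."));
        switch (parts.size()) {
            case 1:
                return new ProcedureIdentifier(currentCatalog, currentDatabase, parts.get(0));
            case 2:
                return new ProcedureIdentifier(currentCatalog, parts.get(0), parts.get(1));
            case 3:
                return new ProcedureIdentifier(parts.get(0), parts.get(1), parts.get(2));
            default:
                throw new IllegalArgumentException("Too many name parts: " + name);
        }
    }

    @Override
    public String toString() {
        return catalog + "." + database + "." + procedure;
    }
}
```

Under these assumptions, resolving the name compact with current catalog cat and current database db yields cat.db.compact, while a three-part name is used as written.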
Public interfaces changes
First, we propose a new interface Procedure for stored procedures. Stored procedure providers are expected to implement Procedure to define their own procedures.
/**
 * Base interface representing a stored procedure that can be executed by Flink. A stored procedure
 * accepts zero, one, or multiple input parameters and then returns the execution result of the
 * stored procedure.
 *
 * <p>The behavior of a {@link Procedure} is defined by implementing a custom call method. A call
 * method must be declared public, not static, and named <code>call</code>. Call methods can also
 * be overloaded by implementing multiple methods named <code>call</code>. Currently, users cannot
 * define their own procedures; a custom {@link Procedure} can only be provided by a
 * {@link Catalog}. To provide a {@link Procedure}, the {@link Catalog} must implement {@link
 * Catalog#getProcedure(ObjectPath)}.
 *
 * <p>When calling a stored procedure, Flink always passes the <code>
 * org.apache.flink.table.api.bridge.java.StreamTableEnvironment</code> as the first parameter of
 * the <code>call</code> method. So, the custom call method must accept the <code>
 * org.apache.flink.table.api.bridge.java.StreamTableEnvironment</code> as its first parameter; the
 * remaining parameters of the <code>call</code> method are the actual parameters of the stored
 * procedure.
 *
 * <p>Note: Through the passed <code>StreamTableEnvironment</code>, the stored procedure can only
 * be aware of the objects of the catalog that it belongs to.
 *
 * <p>By default, input data types are automatically extracted using reflection. The input arguments
 * are derived from one or more {@code call()} methods. If the reflective information is not
 * sufficient, it can be supported and enriched with {@link DataTypeHint} and {@link ProcedureHint}.
 * Hints should only cover the input arguments starting from the second argument, since the first
 * argument is always <code>StreamTableEnvironment</code>, which doesn't need to be annotated.
 *
 * <p>The return type of the {@code call()} method should always be a {@link CloseableIterator} of
 * {@link Row}. The result type must be hinted with {@link DataTypeHint} to declare the data type
 * of each field in the <code>Row</code>, since it can't be derived directly.
 *
 * <p>The following examples show how to specify a stored procedure:
 *
 * <pre>{@code
 * // a stored procedure that accepts an arbitrary number of INT arguments and returns a
 * // CloseableIterator of explicit ROW<i INT>
 * class OutputArgumentProcedure implements Procedure {
 *   public @DataTypeHint("ROW<i INT>") CloseableIterator<Row> call(StreamTableEnvironment tEnv, Integer... args) {
 *     return CloseableIterator.adapterForIterator(Arrays.stream(args).map(Row::of).iterator());
 *   }
 * }
 *
 * // a stored procedure that accepts either INT or STRING and returns a CloseableIterator of
 * // explicit ROW<i INT>
 * class RandomDataProcedure implements Procedure {
 *   public @DataTypeHint("ROW<i INT>") CloseableIterator<Row> call(StreamTableEnvironment tEnv, Integer n) {
 *     Random random = new Random();
 *     return tEnv.fromValues(random.ints(n).boxed().collect(Collectors.toList())).execute().collect();
 *   }
 *
 *   public @DataTypeHint("ROW<i INT>") CloseableIterator<Row> call(StreamTableEnvironment tEnv, String n) {
 *     return call(tEnv, Integer.parseInt(n));
 *   }
 * }
 *
 * // a stored procedure that outputs a ROW<i INT, s STRING>; the procedure hint helps in
 * // declaring the row's fields
 * @ProcedureHint(output = @DataTypeHint("ROW<i INT, s STRING>"))
 * class DuplicatorProcedure implements Procedure {
 *   public CloseableIterator<Row> call(StreamTableEnvironment tEnv, Integer i, String s) {
 *     return CloseableIterator.adapterForIterator(Arrays.asList(Row.of(i, s), Row.of(i, s)).iterator());
 *   }
 * }
 * }</pre>
 *
 * <p>In the API, a stored procedure can be used as follows:
 *
 * <pre>{@code
 * // for SQL users
 * TableEnvironment tEnv = ...
 * tEnv.executeSql("CALL randomDataProcedure(10)");
 * }</pre>
 */
@PublicEvolving
public interface Procedure {}
To make data type hints available for Procedure, we propose the annotation ProcedureHint, analogous to FunctionHint.
@PublicEvolving
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface ProcedureHints {
    ProcedureHint[] value();
}

@PublicEvolving
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
@Repeatable(ProcedureHints.class)
public @interface ProcedureHint {
    DataTypeHint[] input() default @DataTypeHint();

    boolean isVarArgs() default false;

    String[] argumentNames() default {""};

    DataTypeHint output() default @DataTypeHint();
}
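As a rough illustration of how a repeatable hint and its container annotation can be read back through reflection, the following self-contained sketch may help. The annotations Hint and Hints are simplified stand-ins that use plain strings instead of DataTypeHint, and the describe helper is hypothetical; none of this is Flink's actual extraction code.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Repeatable;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.List;

/** Sketch only: Hint/Hints are simplified stand-ins for ProcedureHint/ProcedureHints. */
final class HintExtractionSketch {

    /** Container annotation required by @Repeatable. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.TYPE, ElementType.METHOD})
    @interface Hints {
        Hint[] value();
    }

    /** Stand-in hint that uses plain type strings instead of DataTypeHint. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.TYPE, ElementType.METHOD})
    @Repeatable(Hints.class)
    @interface Hint {
        String[] input() default {};

        String output() default "";
    }

    /** A class carrying two hints, one per supported signature. */
    @Hint(input = {"INT"}, output = "ROW<i INT>")
    @Hint(input = {"STRING"}, output = "ROW<i INT>")
    static final class AnnotatedProcedure {}

    /** Collects every hint on the class, whether declared once or repeated. */
    static List<String> describe(Class<?> procedureClass) {
        List<String> signatures = new ArrayList<>();
        // getAnnotationsByType looks through the container annotation automatically.
        for (Hint hint : procedureClass.getAnnotationsByType(Hint.class)) {
            signatures.add("(" + String.join(", ", hint.input()) + ") -> " + hint.output());
        }
        return signatures;
    }
}
```

Calling describe on AnnotatedProcedure returns one entry per declared hint, which is the kind of information a type inference step could use when reflection alone is not sufficient.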
Then, we propose adding a new method to Catalog that provides a Procedure by name. Connector developers can then return the actual procedure in this method.
@PublicEvolving
public interface Catalog {
    /**
     * Get the procedure. The procedure name should be handled in a case-insensitive way.
     *
     * @param procedurePath path of the procedure
     * @return the requested procedure
     * @throws ProcedureNotExistException if the procedure does not exist in the catalog
     * @throws CatalogException in case of any runtime exception
     */
    default Procedure getProcedure(ObjectPath procedurePath)
            throws ProcedureNotExistException, CatalogException {
        throw new UnsupportedOperationException(
                String.format("getProcedure is not implemented for %s.", this.getClass()));
    }
}
/** Exception for trying to operate on a procedure that doesn't exist. */
@PublicEvolving
public class ProcedureNotExistException extends Exception {
    private static final String MSG = "Procedure %s does not exist in Catalog %s.";

    public ProcedureNotExistException(String catalogName, ObjectPath procedurePath) {
        this(catalogName, procedurePath, null);
    }

    public ProcedureNotExistException(
            String catalogName, ObjectPath procedurePath, Throwable cause) {
        super(String.format(MSG, procedurePath.getFullName(), catalogName), cause);
    }
}
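To make the catalog-side contract more concrete, here is a minimal sketch of how a connector might back getProcedure with an in-memory registry. All types are stand-ins defined inside the example (they are not the Flink classes), the database and name parameters replace ObjectPath for brevity, and treating "case insensitive" as lower-casing the procedure name with Locale.ROOT is an assumption.

```java
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Sketch of a catalog-side procedure registry; nested types are stand-ins, not Flink classes. */
final class InMemoryProcedureCatalog {

    /** Stand-in for the proposed Procedure marker interface. */
    interface Procedure {}

    /** Stand-in for ProcedureNotExistException, using the same message format. */
    static final class ProcedureNotExistException extends Exception {
        ProcedureNotExistException(String catalogName, String fullName) {
            super(String.format("Procedure %s does not exist in Catalog %s.", fullName, catalogName));
        }
    }

    private final String catalogName;
    private final Map<String, Procedure> procedures = new ConcurrentHashMap<>();

    InMemoryProcedureCatalog(String catalogName) {
        this.catalogName = catalogName;
    }

    /** Registers a built-in procedure under a normalized database.procedure key. */
    void register(String database, String name, Procedure procedure) {
        procedures.put(key(database, name), procedure);
    }

    /** Mirrors getProcedure(ObjectPath): case-insensitive lookup, checked exception on a miss. */
    Procedure getProcedure(String database, String name) throws ProcedureNotExistException {
        Procedure procedure = procedures.get(key(database, name));
        if (procedure == null) {
            throw new ProcedureNotExistException(catalogName, database + "." + name);
        }
        return procedure;
    }

    // Assumption: only the procedure name is normalized, by lower-casing with Locale.ROOT.
    private static String key(String database, String name) {
        return database + "." + name.toLowerCase(Locale.ROOT);
    }
}
```

With this sketch, a procedure registered as Compact in database sys is found when looked up as COMPACT, and an unknown name raises the checked exception instead of returning null.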
Proposed Changes
1: In method FunctionCatalogOperatorTable#lookupOperatorOverloads, if the passed SqlFunctionCategory is USER_DEFINED_PROCEDURE, get the procedure from the Catalog with the given SqlIdentifier, and then wrap it in a BridgingSqlProcedure, which is an instance of SqlFunction.
2: In BridgingSqlProcedure, the type inference logic for procedures follows the style used for functions: it tries to extract the data types of the arguments and the return value from the call method of the procedure, or from DataTypeHint if annotated. The extraction logic for procedures will largely reuse the code for functions, with minor adjustments for Procedure.
3: Add a class named SqlCallProcedureConverter, which converts the SqlNode of a call procedure statement to a CallProcedureOperation. In SqlCallProcedureConverter, if the SqlNode is an instance of SqlBasicCall and its operator is an instance of SqlProcedureCallOperator, the node is a call procedure statement and is converted to a CallProcedureOperation that contains the procedure and the expressions for its arguments.
4: In method TableEnvironmentImpl#executeInternal, if the Operation is an instance of CallProcedureOperation, construct a StreamTableEnvironment and pass it, together with the arguments specified by the user, to the corresponding method procedure#call(xxx).
5: Get the result of procedure#call(xxx), which should be a CloseableIterator. Build a TableResult from this iterator so that the SqlClient can show the result of the call procedure statement to users.
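Steps 4 and 5 can be pictured with the following self-contained sketch of reflective dispatch to a call method. It is an illustration under stated assumptions, not the planned implementation: Env stands in for StreamTableEnvironment, a plain Iterator stands in for CloseableIterator of Row, the execute helper is hypothetical, and varargs call methods and hint-based type inference are deliberately left out.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Sketch of steps 4 and 5; every nested type is a stand-in, not a Flink class. */
final class CallProcedureSketch {

    /** Stand-in for StreamTableEnvironment. */
    static final class Env {}

    /** Stand-in for the proposed Procedure marker interface. */
    interface Procedure {}

    /** Example procedure with two overloads of call. */
    public static final class EchoProcedure implements Procedure {
        public Iterator<String> call(Env env, Integer n) {
            return Arrays.asList("int:" + n).iterator();
        }

        public Iterator<String> call(Env env, String s) {
            return Arrays.asList("string:" + s).iterator();
        }
    }

    /** Finds a public, non-static call method taking (Env, args...) and invokes it. */
    static List<String> execute(Procedure procedure, Env env, Object... args) throws Exception {
        for (Method method : procedure.getClass().getMethods()) {
            if (!method.getName().equals("call") || Modifier.isStatic(method.getModifiers())) {
                continue;
            }
            Class<?>[] types = method.getParameterTypes();
            if (types.length != args.length + 1 || types[0] != Env.class) {
                continue;
            }
            boolean matches = true;
            for (int i = 0; i < args.length; i++) {
                matches &= types[i + 1].isInstance(args[i]);
            }
            if (!matches) {
                continue;
            }
            // The environment is always prepended as the first argument.
            Object[] fullArgs = new Object[args.length + 1];
            fullArgs[0] = env;
            System.arraycopy(args, 0, fullArgs, 1, args.length);
            // Drain the returned iterator, much as a TableResult would expose the rows.
            Iterator<?> rows = (Iterator<?>) method.invoke(procedure, fullArgs);
            List<String> result = new ArrayList<>();
            rows.forEachRemaining(row -> result.add(String.valueOf(row)));
            return result;
        }
        throw new NoSuchMethodException("No matching call method for " + Arrays.toString(args));
    }
}
```

In this sketch the overload is chosen purely from the runtime classes of the arguments; the FLIP instead relies on the type inference in BridgingSqlProcedure, so the matching logic here should be read as a simplification.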
Compatibility, Deprecation, and Migration Plan
- What impact (if any) will there be on existing users?
- If we are changing behavior how will we phase out the older behavior?
- If we need special migration tools, describe them here.
- When will we remove the existing behavior?
Test Plan
Describe in few sentences how the FLIP will be tested. We are mostly interested in system tests (since unit-tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?
Rejected Alternatives
If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.