Status
Current state: [One of "Under Discussion", "Accepted", "Rejected"]
Discussion thread: here (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)
JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)
Released: <Flink Version>
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Stored procedure provides a convenient way to encapsulate complex logic to perform data manipulation or administrative tasks in external storage systems. It's widely used in traditional databases and popular compute engines like Trino for it's convenience.
However, Flink currently does not support calling stored procedure directly, which limits its ability to intergrate with external systems and makes it harder for users to access/manage their data in external systems with Flink.
Therefore, we propose adding support for call stored procedure in Flink to enable better integration with external storage systems. This will allow users access/manage their data smoothly with just executing one call statement in Flink and bring users more efficient data processing workflows.
With this FLIP, Flink will allow connector developers to develop their own built-in stored procedures, and then enables users to call these predefiend stored procedures .
Note: In this FLIP, we don't intend to allow users to customize their own stored procedure for we don't want to expose too much to users too early.
Public Interfaces
Syntax
We propose the following synatax to call a stored procedure:
CALL [catalog_name.][database_name.]procedure_name ([ expression [, expression]* ] )
Note: The expression
can be a function call, but should be reduced to literal value. Otherwise, it'll throw an exception.
Public interfaces changes
First, we propose a new interface Procedure
for stored procedure. The stored procedure providers are expected to implement Procedure
to define their own procedure.
@PublicEvolving public interface Procedure {}
To make , we propose annotation ProcedureHint
just like FunctionHint.
@PublicEvolving @Retention(RetentionPolicy.RUNTIME) @Target({ElementType.TYPE, ElementType.METHOD}) public @interface ProcedureHints { ProcedureHint[] value(); }
@PublicEvolving @Retention(RetentionPolicy.RUNTIME) @Target({ElementType.TYPE, ElementType.METHOD}) @Repeatable(ProcedureHints.class) public @interface ProcedureHint { DataTypeHint[] input() default @DataTypeHint(); boolean isVarArgs() default false; String[] argumentNames() default {""}; DataTypeHint output() default @DataTypeHint(); }
Then, we propose to add an new interface to Catalog
to provide Procedure
according to name; The connector devopler can then return the actual procedure in this method.
default Procedure getProcedure(ObjectPath tablePath) throws ProcedureNotExistException, CatalogException { throw new UnsupportedOperationException(this.getClass().getName() + "doesn't implement method getProcedure."); }
/** Exception for trying to operate on a procedure that doesn't exist. */ @PublicEvolving public class ProcedureNotExistException extends Exception { private static final String MSG = "Procedure %s does not exist in Catalog %s."; public ProcedureNotExistException(String catalogName, ObjectPath functionPath) { this(catalogName, functionPath, null); } public ProcedureNotExistException( String catalogName, ObjectPath functionPath, Throwable cause) { super(String.format(MSG, functionPath.getFullName(), catalogName), cause); } }
Proposed Changes
Describe the new thing you want to do in appropriate detail. This may be fairly extensive and have large subsections of its own. Or it may be a few sentences. Use judgement based on the scope of the change.
Compatibility, Deprecation, and Migration Plan
- What impact (if any) will there be on existing users?
- If we are changing behavior how will we phase out the older behavior?
- If we need special migration tools, describe them here.
- When will we remove the existing behavior?
Test Plan
Describe in few sentences how the FLIP will be tested. We are mostly interested in system tests (since unit-tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?
Rejected Alternatives
If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.