Status
Current state: [One of "Under Discussion", "Accepted", "Rejected"]
Discussion thread: here (<- link to https://mail-archiveslists.apache.org/mod_mbox/flink-dev/)
JIRA: here (<- link to thread/k6s50gcgznon9v1oylyh396gb5kgrwmd
Vote thread: https://issueslists.apache.org/jira/browse/FLINK-XXXX)
thread/659wfgm94oq7484q2bjsqr02xv7r0r4y
JIRA:
Jira |
---|
server | ASF JIRA |
---|
serverId | 5aa69414-a9e9-3523-82ec-879b028fb15b |
---|
key | FLINK-32350 |
---|
|
Released: 1.18.0Released: <Flink Version>
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
Note: The expression
can be a function call.
Public interfaces changes
FirstTo make user can know the procedures provided , we propose a new interface name ProcedureContext
to provide a context for stored procedure. Currently, it provides StreamExecutionEnvironment
to enable stored procedure to run a Flink job.the follwing synatax to show the procedures:
Code Block |
---|
|
/**SHOW APROCEDURES context[ to( provideFROM necessary| contextIN ) [catalog_name.]database_name ] [ [NOT] (LIKE | ILIKE) <sql_like_pattern> ] |
Public interfaces changes
First, we propose a new interface name ProcedureContext
to provide a context for stored procedure. Currently, it provides StreamExecutionEnvironment
to enable stored procedure to run a Flink job.
Code Block |
---|
|
/** A context to provide necessary context used by stored procedure. */
@PublicEvolving
public interface ProcedureContext {
/** Return the StreamExecutionEnvironment where the procedure is called. */
StreamExecutionEnvironment used by stored procedure. */
@PublicEvolving
public interface ProcedureContext {
/** Return the StreamExecutionEnvironment where the procedure is called. */
StreamExecutionEnvironment getExecutionEnvironment();
} |
then, we propose a new interface Procedure
Note: To avoid exposing to much to early to devs, we only put StreamExecutionEnvironment to the ProcedureContext currently since it's the most needed for stored procedure. If we find we stored procedure will need other things, we put add it to the ProcedureContext in the future.
then, we propose a new interface Procedure
for stored procedure. The stored procedure providers The stored procedure providers are expected to implement Procedure
to define their own procedure.
...
Code Block |
---|
|
@PublicEvolving
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface ProcedureHints {
ProcedureHint[] value();
} |
Code Block |
---|
|
@PublicEvolving
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
@Repeatable(ProcedureHints.class)
public @interface ProcedureHint {
DataTypeHint[] input() default @DataTypeHint();
boolean isVarArgs() default false;
String[] argumentNames() default {""};
DataTypeHint output() default @DataTypeHint();
} |
Finally, we propose to add an new interface to Catalog
to provide Procedure
according to name; The connector devopler can then return the actual procedure in this method.
public @interface ProcedureHints {
ProcedureHint[] value();
} |
Code Block |
---|
|
@PublicEvolving
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
@Repeatable(ProcedureHints.class)
public @interface ProcedureHint {
DataTypeHint[] input() default @DataTypeHint();
boolean isVarArgs() default false;
String[] argumentNames() default {""};
DataTypeHint output() default @DataTypeHint();
} |
Finally, we propose to add an new interface to Catalog
to provide Procedure
according to name; The connector devopler can then return the actual procedure in this method.
Besides, we propose to add an new interface for catalog to list the procedures
Code Block |
---|
|
@PublicEvolving
public interface Catalog {
/**
* Get the procedure. procedure name should be handled in a case insensitive way.
*
* @param procedurePath path of the procedure
* @return the requested function
* @throws ProcedureNotExistException if the function does not exist in the catalog
* @throws CatalogException in case of any runtime exception
*/
default Procedure getProcedure(ObjectPath procedurePath)
throws ProcedureNotExistException, CatalogException {
throw new UnsupportedOperationException(String.format("getProcedure is not implemented for %s.", this.getClass()));
}
/**
* List the names of all procedures in the given database. An empty list is returned if no procedure.
*
* @param dbName name of the database. |
Code Block |
---|
|
@PublicEvolving
public interface Catalog {
/**
* Get the procedure. procedure name should be handled in a case insensitive way.
*
* @param@return procedurePatha pathlist of the procedure
names of the procedures in * @return the requested functionthis database
* @throws ProcedureNotExistExceptionDatabaseNotExistException if the functiondatabase does not exist in the catalog
* @throws CatalogException in case of any runtime exception
*/
default ProcedureList<String> getProcedurelistProcedures(ObjectPathString procedurePathdbName)
throws ProcedureNotExistExceptionDatabaseNotExistException, CatalogException {
throw new UnsupportedOperationException(String.format("getProcedurelistProcedures is not implemented for %s.", this.getClass()));
}
} |
Code Block |
---|
|
/** Exception for trying to operate on a procedure that doesn't exist. */
@PublicEvolving
public class ProcedureNotExistException extends Exception {
private static final String MSG = "Procedure %s does not exist in Catalog %s.";
public ProcedureNotExistException(String catalogName, ObjectPath functionPath) {
this(catalogName, functionPath, null);
}
public ProcedureNotExistException(
String catalogName, ObjectPath functionPath, Throwable cause) {
super(String.format(MSG, functionPath.getFullName(), catalogName), cause);
}
} |
...