...
Note: The expression
can be a function call.
To let users see which procedures are available, we propose the following syntax to show the procedures:
Code Block |
---|
|
SHOW PROCEDURES [ ( FROM | IN ) [catalog_name.]database_name ] [ [NOT] (LIKE | ILIKE) <sql_like_pattern> ] |
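To illustrate how the optional `[NOT] (LIKE | ILIKE)` clause could narrow the list of procedures, the sketch below filters procedure names with a SQL LIKE pattern. It is only an illustration under assumptions: the class `ShowProceduresFilter`, its methods, and the sample procedure names are hypothetical, and the translation of `%` and `_` into a regular expression (without escape-character support) is a simplification, not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical helper, not part of Flink: filters names the way a LIKE clause might. */
class ShowProceduresFilter {

    /** Translates a SQL LIKE pattern: '%' matches any sequence, '_' exactly one character. */
    static Pattern toRegex(String sqlLikePattern, boolean caseInsensitive) {
        StringBuilder regex = new StringBuilder();
        for (char c : sqlLikePattern.toCharArray()) {
            if (c == '%') {
                regex.append(".*");
            } else if (c == '_') {
                regex.append('.');
            } else {
                // Quote every other character so regex metacharacters match literally.
                regex.append(Pattern.quote(String.valueOf(c)));
            }
        }
        int flags = caseInsensitive ? Pattern.CASE_INSENSITIVE : 0;
        return Pattern.compile(regex.toString(), flags | Pattern.DOTALL);
    }

    /**
     * Keeps the names that match the pattern, or the names that do not match it when
     * {@code not} is true. {@code ilike} selects case-insensitive matching.
     */
    static List<String> filter(List<String> names, String pattern, boolean not, boolean ilike) {
        Pattern regex = toRegex(pattern, ilike);
        List<String> result = new ArrayList<>();
        for (String name : names) {
            boolean matches = regex.matcher(name).matches();
            if (matches != not) {
                result.add(name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Sample names are made up for the illustration.
        List<String> procedures =
                Arrays.asList("rewrite_data_files", "rollback_to_snapshot", "Expire_Snapshots");
        // Corresponds to: SHOW PROCEDURES LIKE 'r%'
        System.out.println(filter(procedures, "r%", false, false));
        // Corresponds to: SHOW PROCEDURES ILIKE '%snapshot_'
        System.out.println(filter(procedures, "%snapshot_", false, true));
        // Corresponds to: SHOW PROCEDURES NOT LIKE 'r%'
        System.out.println(filter(procedures, "r%", true, false));
    }
}
```

The `not` and `ilike` flags map directly onto the `[NOT]` and `(LIKE | ILIKE)` parts of the proposed syntax, which keeps the matching logic in one place.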
Public interfaces changes
First, we propose a new interface named ProcedureContext
to provide a context for stored procedures. Currently, it provides StreamExecutionEnvironment
to enable a stored procedure to run a Flink job.
Code Block |
---|
|
/** A context to provide the necessary context used by a stored procedure. */
@PublicEvolving
public interface ProcedureContext {
/** Return the StreamExecutionEnvironment where the procedure is called. */
StreamExecutionEnvironment getExecutionEnvironment();
} |
Then, we propose a new interface Procedure
for stored procedures. Stored procedure providers are expected to implement Procedure
to define their own procedures.
Code Block |
---|
|
/**
* Base interface representing a stored procedure that can be executed by Flink. A stored procedure
* accepts zero, one, or multiple input parameters and then returns the execution result of the
* stored procedure.
*
* <p>The behavior of {@link Procedure} can be defined by implementing a custom call method. A call
* method must be declared publicly, not static, and named <code>call</code>. Call methods can also
* be overloaded by implementing multiple methods named <code>call</code>. Currently, users are not
* allowed to define their own procedures; a custom {@link Procedure} can only be provided
* by a {@link Catalog}. To provide a {@link Procedure}, the {@link Catalog} must implement {@link
* Catalog#getProcedure(ObjectPath)}.
*
* <p>When calling a stored procedure, Flink will always pass the <code>
* org.apache.flink.table.procedure.ProcedureContext</code>, which currently provides the
* StreamExecutionEnvironment, as the first parameter of the <code>call</code> method. So,
* the custom call method must accept the <code>org.apache.flink.table.procedure.ProcedureContext
* </code> as the first parameter, and the other parameters of the <code>call</code> method are the
* actual parameters of the stored procedure.
*
* <p>By default, input and output data types are automatically extracted using reflection. The
* input arguments are derived from one or more {@code call()} methods. If the reflective
* information is not sufficient, it can be supported and enriched with {@link DataTypeHint} and
* {@link ProcedureHint}. If a hint is used for input arguments, it should only hint the input
* arguments that start from the second argument, since the first argument is always <code>
* ProcedureContext</code>, which doesn't need to be annotated with a data type hint.
*
* <p>Note: The return type of the {@code call()} method should always be T[], where T can be an
* atomic type, Row, or Pojo: either an explicit composite type or an atomic type that is
* implicitly wrapped into a row consisting of one field.
*
* <p>The following examples with pseudocode show how to write a stored procedure:
*
* <pre>{@code
* // a stored procedure that tries to rewrite data files for Iceberg; it accepts a STRING
* // and returns an array of explicit ROW < STRING, STRING >.
* class IcebergRewriteDataFilesProcedure implements Procedure {
* public @DataTypeHint("ROW< rewritten_data_files_count STRING, added_data_files_count STRING >")
* Row[] call(ProcedureContext procedureContext, String tableName) {
* // planning for scanning the table to do rewriting
* Table table = loadTable(tableName);
* List<CombinedScanTask> combinedScanTasks = planScanTask(table);
*
* // now, rewrite the files according to the planning task
* StreamExecutionEnvironment env = procedureContext.getExecutionEnvironment();
* DataStream<CombinedScanTask> dataStream = env.fromCollection(combinedScanTasks);
* RowDataRewriter rowDataRewriter =
* new RowDataRewriter(table(), caseSensitive(), fileIO(), encryptionManager());
* List<DataFile> addedDataFiles;
* try {
* addedDataFiles = rowDataRewriter.rewriteDataForTasks(dataStream, parallelism);
* } catch (Exception e) {
* throw new RuntimeException("Rewrite data file error.", e);
* }
*
* // replace the current files
* List<DataFile> currentDataFiles = combinedScanTasks.stream()
* .flatMap(tasks -> tasks.files().stream().map(FileScanTask::file))
* .collect(Collectors.toList());
* replaceDataFiles(currentDataFiles, addedDataFiles, startingSnapshotId);
*
* // return the result for rewriting
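* // the counts are converted to strings to match the declared ROW < STRING, STRING >
* return new Row[] {
* Row.of(String.valueOf(currentDataFiles.size()), String.valueOf(addedDataFiles.size()))
* };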
* }
* }
*
* // a stored procedure that accepts a STRING and a LONG and
* // returns an array of STRING without a data type hint.
* class RollbackToSnapShotProcedure implements Procedure {
* public String[] call(ProcedureContext procedureContext, String tableName, Long snapshotId) {
* Table table = loadTable(tableName);
* Long previousSnapShotId = table.currentSnapshot().snapshotId();
* table.manageSnapshots().rollbackTo(snapshotId).commit();
* return new String[] {
* "previous_snapshot_id: " + previousSnapShotId,
* "current_snapshot_id: " + snapshotId
* };
* }
* }
* }</pre>
*
* <p>In the API, a stored procedure can be used as follows:
*
* <pre>{@code
* // for SQL users
* TableEnvironment tEnv = ...
* tEnv.executeSql("CALL rollback_to_snapshot('t', 1001)");
* }</pre>
*/
@PublicEvolving
public interface Procedure {} |
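The `call` convention described in the Javadoc above can be sketched with plain Java reflection. The example below is a minimal illustration under assumptions, not Flink's implementation: `ProcedureContext` and `Procedure` are empty stand-ins so the code compiles without Flink, and `EchoProcedure` and `ProcedureCaller` are hypothetical names. It only handles reference-type arguments and picks the first public, non-static `call` method whose first parameter is the context and whose remaining parameters fit the supplied arguments.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Arrays;

// Stand-ins for the proposed interfaces so the sketch compiles without Flink.
interface ProcedureContext {}

interface Procedure {}

/** Hypothetical procedure with two overloaded call methods. */
class EchoProcedure implements Procedure {
    public String[] call(ProcedureContext context, String tableName, Long snapshotId) {
        return new String[] {"table: " + tableName, "snapshot: " + snapshotId};
    }

    public String[] call(ProcedureContext context, String tableName) {
        return new String[] {"table: " + tableName};
    }
}

/** Hypothetical dispatcher that resolves and invokes a matching call method. */
class ProcedureCaller {

    static Object invoke(Procedure procedure, ProcedureContext context, Object... args) {
        // getMethods() only returns public methods, as the convention requires.
        for (Method method : procedure.getClass().getMethods()) {
            if (!method.getName().equals("call") || Modifier.isStatic(method.getModifiers())) {
                continue;
            }
            Class<?>[] types = method.getParameterTypes();
            // The first parameter must be the context; the rest are the actual arguments.
            if (types.length != args.length + 1 || types[0] != ProcedureContext.class) {
                continue;
            }
            boolean fits = true;
            for (int i = 0; i < args.length; i++) {
                if (args[i] != null && !types[i + 1].isInstance(args[i])) {
                    fits = false;
                    break;
                }
            }
            if (!fits) {
                continue;
            }
            // Prepend the context to the user-supplied arguments.
            Object[] fullArgs = new Object[args.length + 1];
            fullArgs[0] = context;
            System.arraycopy(args, 0, fullArgs, 1, args.length);
            try {
                return method.invoke(procedure, fullArgs);
            } catch (ReflectiveOperationException e) {
                throw new RuntimeException("Calling the procedure failed.", e);
            }
        }
        throw new IllegalArgumentException("No matching call method for the given arguments.");
    }

    public static void main(String[] args) {
        ProcedureContext context = new ProcedureContext() {};
        Object result = invoke(new EchoProcedure(), context, "t", 1001L);
        System.out.println(Arrays.toString((String[]) result));
    }
}
```

Because the context is prepended by the caller, a procedure author only declares it as the first parameter and never has to construct it.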
...
Finally, we propose to add a new method to Catalog
to provide a Procedure
according to its name; the connector developer can then return the actual procedure in this method.
Besides, we propose to add a new method to Catalog to list the procedures.
Code Block |
---|
|
@PublicEvolving
public interface Catalog {
/**
* Get the procedure. The procedure name should be handled in a case-insensitive way.
*
* @param procedurePath path of the procedure
* @return the requested procedure
* @throws ProcedureNotExistException if the procedure does not exist in the catalog
* @throws CatalogException in case of any runtime exception
*/
default Procedure getProcedure(ObjectPath procedurePath)
throws ProcedureNotExistException, CatalogException {
throw new UnsupportedOperationException(String.format("getProcedure is not implemented for %s.", this.getClass()));
}
/**
* List the names of all procedures in the given database. An empty list is returned if there are no procedures.
*
* @param dbName name of the database.
* @return a list of the names of the procedures in this database
* @throws DatabaseNotExistException if the database does not exist
* @throws CatalogException in case of any runtime exception
*/
default List<String> listProcedures(String dbName)
throws DatabaseNotExistException, CatalogException {
throw new UnsupportedOperationException(String.format("listProcedures is not implemented for %s.", this.getClass()));
}
} |
Code Block |
---|
|
/** Exception for trying to operate on a procedure that doesn't exist. */
@PublicEvolving
public class ProcedureNotExistException extends Exception {
private static final String MSG = "Procedure %s does not exist in Catalog %s.";
public ProcedureNotExistException(String catalogName, ObjectPath procedurePath) {
this(catalogName, procedurePath, null);
}
public ProcedureNotExistException(
String catalogName, ObjectPath procedurePath, Throwable cause) {
super(String.format(MSG, procedurePath.getFullName(), catalogName), cause);
}
} |
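As a rough illustration of how a connector might back the lookup and listing methods, the sketch below keeps procedures in an in-memory map and normalizes names to lower case so that lookups are case-insensitive. Everything here is an assumption made for the example: `Procedure` is an empty stand-in, `InMemoryProcedureCatalog` and its methods are hypothetical, and `IllegalArgumentException` stands in for the checked exceptions of the real Catalog interface.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Stand-in for the proposed interface so the sketch compiles without Flink.
interface Procedure {}

/** Hypothetical in-memory registry with case-insensitive procedure names. */
class InMemoryProcedureCatalog {
    // database name -> (lower-cased procedure name -> procedure)
    private final Map<String, Map<String, Procedure>> procedures = new HashMap<>();

    void createDatabase(String dbName) {
        procedures.putIfAbsent(dbName, new HashMap<>());
    }

    void register(String dbName, String procedureName, Procedure procedure) {
        createDatabase(dbName);
        procedures.get(dbName).put(normalize(procedureName), procedure);
    }

    /** Looks up a procedure; IllegalArgumentException stands in for a not-exist exception. */
    Procedure getProcedure(String dbName, String procedureName) {
        Map<String, Procedure> inDatabase = procedures.get(dbName);
        Procedure procedure =
                inDatabase == null ? null : inDatabase.get(normalize(procedureName));
        if (procedure == null) {
            throw new IllegalArgumentException(
                    String.format("Procedure %s.%s does not exist.", dbName, procedureName));
        }
        return procedure;
    }

    /** Returns the sorted procedure names; an empty list if the database has none. */
    List<String> listProcedures(String dbName) {
        Map<String, Procedure> inDatabase = procedures.get(dbName);
        if (inDatabase == null) {
            throw new IllegalArgumentException("Database " + dbName + " does not exist.");
        }
        List<String> names = new ArrayList<>(inDatabase.keySet());
        Collections.sort(names);
        return names;
    }

    private static String normalize(String name) {
        return name.toLowerCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        InMemoryProcedureCatalog catalog = new InMemoryProcedureCatalog();
        catalog.register("db", "Rollback_To_Snapshot", new Procedure() {});
        // The lookup succeeds regardless of the case used by the caller.
        System.out.println(catalog.getProcedure("db", "ROLLBACK_TO_SNAPSHOT") != null);
        System.out.println(catalog.listProcedures("db"));
    }
}
```

Normalizing the name once at registration and once at lookup is one simple way to satisfy the case-insensitivity requirement stated in the Javadoc.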
...