Note: The expression can be a function call.

Public interfaces changes

First, to let users know which procedures are provided, we propose the following syntax to show the procedures:

Code Block
SHOW PROCEDURES [ ( FROM | IN ) [catalog_name.]database_name ] [ [NOT] (LIKE | ILIKE) <sql_like_pattern> ]
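To make the filter part of SHOW PROCEDURES concrete, here is a minimal Java sketch of how the pattern could be applied to the procedure names a catalog returns. It assumes standard SQL LIKE wildcards ('%' matches any sequence, '_' matches exactly one character), that ILIKE matches case-insensitively, and that NOT inverts the match; ESCAPE handling is omitted. The class and method names (ShowProceduresFilter, likeToRegex, filter) are hypothetical and not part of Flink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical sketch of the [NOT] (LIKE | ILIKE) filter of SHOW PROCEDURES. */
public class ShowProceduresFilter {

    /** Translates a SQL LIKE pattern (no ESCAPE clause) into an anchored regular expression. */
    static Pattern likeToRegex(String likePattern, boolean caseInsensitive) {
        StringBuilder regex = new StringBuilder();
        for (char c : likePattern.toCharArray()) {
            if (c == '%') {
                regex.append(".*"); // '%' matches any sequence of characters
            } else if (c == '_') {
                regex.append('.'); // '_' matches exactly one character
            } else {
                regex.append(Pattern.quote(String.valueOf(c))); // everything else is literal
            }
        }
        int flags = Pattern.DOTALL;
        if (caseInsensitive) {
            flags |= Pattern.CASE_INSENSITIVE | Pattern.UNICODE_CASE; // ILIKE ignores case
        }
        return Pattern.compile(regex.toString(), flags);
    }

    /** Keeps the names that match the pattern, or, when not is true, those that do not. */
    static List<String> filter(List<String> names, String likePattern, boolean ilike, boolean not) {
        Pattern pattern = likeToRegex(likePattern, ilike);
        List<String> result = new ArrayList<>();
        for (String name : names) {
            boolean matches = pattern.matcher(name).matches(); // whole-name match
            if (matches != not) {
                result.add(name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> names =
                List.of("rollback_to_snapshot", "rewrite_data_files", "Expire_Snapshots");
        // SHOW PROCEDURES LIKE 'r%'
        System.out.println(filter(names, "r%", false, false));
        // SHOW PROCEDURES ILIKE '%SNAPSHOT%'
        System.out.println(filter(names, "%SNAPSHOT%", true, false));
        // SHOW PROCEDURES NOT LIKE 'r%'
        System.out.println(filter(names, "r%", false, true));
    }
}
```

Translating to a regex keeps the sketch short; a production implementation would also need to honor an ESCAPE character.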


Then, we propose a new interface named ProcedureContext to provide a context for stored procedures. Currently, it provides a StreamExecutionEnvironment so that a stored procedure can run a Flink job.

Code Block
/** A context to provide necessary context used by stored procedure. */
public interface ProcedureContext {

    /** Return the StreamExecutionEnvironment where the procedure is called. */
    StreamExecutionEnvironment getExecutionEnvironment();
}

Then, we propose a new interface, Procedure, for stored procedures. Stored procedure providers are expected to implement Procedure to define their own procedures.

Code Block
/**
 * Base interface representing a stored procedure that can be executed by Flink. A stored procedure
 * accepts zero, one, or multiple input parameters and then returns the execution result of the
 * stored procedure.
 *
 * <p>The behavior of {@link Procedure} can be defined by implementing a custom call method. A call
 * method must be declared public, must not be static, and must be named <code>call</code>. Call
 * methods can also be overloaded by implementing multiple methods named <code>call</code>.
 * Currently, users cannot register their own procedures; a custom {@link Procedure} can only be
 * provided by a {@link Catalog}. To provide a {@link Procedure}, the {@link Catalog} must implement
 * {@link Catalog#getProcedure(ObjectPath)}.
 *
 * <p>When calling a stored procedure, Flink always passes an <code>
 * org.apache.flink.table.procedure.ProcedureContext</code>, which currently provides the
 * StreamExecutionEnvironment, as the first parameter of the <code>call</code> method. So the
 * custom call method must accept <code>org.apache.flink.table.procedure.ProcedureContext</code>
 * as its first parameter; the remaining parameters of the <code>call</code> method are the actual
 * parameters of the stored procedure.
 *
 * <p>By default, input and output data types are automatically extracted using reflection. The
 * input arguments are derived from one or more {@code call()} methods. If the reflective
 * information is not sufficient, it can be supported and enriched with {@link DataTypeHint} and
 * {@link ProcedureHint}. When hinting input arguments, only hint the arguments starting from the
 * second one, since the first argument is always <code>ProcedureContext</code>, which does not
 * need a data type hint.
 *
 * <p>Note: The return type of the {@code call()} method should always be T[], where T can be an
 * atomic type, Row, or POJO. T is either an explicit composite type or an atomic type that is
 * implicitly wrapped into a row consisting of one field.
 *
 * <p>The following examples with pseudocode show how to write a stored procedure:
 *
 * <pre>{@code
 * // a stored procedure that tries to rewrite data files for Iceberg; it accepts a STRING
 * // and returns an array of explicit ROW < INT, INT >.
 * class IcebergRewriteDataFilesProcedure implements Procedure {
 *   public @DataTypeHint("ROW< rewritten_data_files_count INT, added_data_files_count INT >")
 *          Row[] call(ProcedureContext procedureContext, String tableName) {
 *     // plan the table scan used for rewriting
 *     Table table = loadTable(tableName);
 *     List<CombinedScanTask> combinedScanTasks = planScanTask(table);
 *     // now, rewrite the files according to the planned tasks
 *     StreamExecutionEnvironment env = procedureContext.getExecutionEnvironment();
 *     DataStream<CombinedScanTask> dataStream = env.fromCollection(combinedScanTasks);
 *     RowDataRewriter rowDataRewriter =
 *         new RowDataRewriter(table, caseSensitive(), fileIO(), encryptionManager());
 *     List<DataFile> addedDataFiles;
 *     try {
 *       addedDataFiles = rowDataRewriter.rewriteDataForTasks(dataStream, parallelism);
 *     } catch (Exception e) {
 *       throw new RuntimeException("Rewrite data file error.", e);
 *     }
 *     // replace the current files
 *     List<DataFile> currentDataFiles =
 *         combinedScanTasks.stream()
 *             .flatMap(tasks -> tasks.files().stream().map(FileScanTask::file))
 *             .collect(Collectors.toList());
 *     replaceDataFiles(currentDataFiles, addedDataFiles, startingSnapshotId);
 *     // return the result of rewriting
 *     return new Row[] {Row.of(currentDataFiles.size(), addedDataFiles.size())};
 *   }
 * }
 *
 * // a stored procedure that accepts a STRING and a BIGINT and
 * // returns an array of STRING without a data type hint.
 * class RollbackToSnapshotProcedure implements Procedure {
 *   public String[] call(ProcedureContext procedureContext, String tableName, Long snapshotId) {
 *     Table table = loadTable(tableName);
 *     Long previousSnapshotId = table.currentSnapshot().snapshotId();
 *     table.manageSnapshots().rollbackTo(snapshotId).commit();
 *     return new String[] {
 *         "previous_snapshot_id: " + previousSnapshotId,
 *         "current_snapshot_id: " + snapshotId
 *     };
 *   }
 * }
 * }</pre>
 *
 * <p>In the API, a stored procedure can be used as follows:
 *
 * <pre>{@code
 * // for SQL users
 * TableEnvironment tEnv = ...
 * tEnv.executeSql("CALL rollback_to_snapshot('t', 1001)");
 * }</pre>
 */
public interface Procedure {}
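The call-method rules described in the Javadoc above can be illustrated with a simplified, Flink-free Java sketch. It uses stand-in ProcedureContext and Procedure interfaces (the real ProcedureContext also exposes getExecutionEnvironment()) and a hypothetical EchoProcedure. It shows selecting a public, non-static method named call whose first parameter is ProcedureContext, passing the context as the first argument, and wrapping atomic results into one-field rows (modelled here as Object[]). Matching arguments with isInstance at runtime is an assumption made for brevity; it is not how Flink's planner performs type inference.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ProcedureCallDispatch {

    // Stand-ins for the proposed interfaces. The real ProcedureContext also offers
    // getExecutionEnvironment(); it is left out so the sketch needs no Flink dependency.
    interface ProcedureContext {}

    interface Procedure {}

    /** Hypothetical procedure with two overloaded call methods. */
    public static class EchoProcedure implements Procedure {
        public String[] call(ProcedureContext ctx, String tableName) {
            return new String[] {"table: " + tableName};
        }

        public String[] call(ProcedureContext ctx, String tableName, Long snapshotId) {
            return new String[] {"table: " + tableName, "snapshot: " + snapshotId};
        }
    }

    /**
     * Picks a public, non-static method named "call" whose first parameter is ProcedureContext
     * and whose remaining parameters accept the given arguments.
     */
    static Method resolveCall(Procedure procedure, Object... args) {
        // getMethods() only returns public methods, so non-public call methods are ignored.
        for (Method m : procedure.getClass().getMethods()) {
            Class<?>[] params = m.getParameterTypes();
            if (!m.getName().equals("call")
                    || Modifier.isStatic(m.getModifiers())
                    || params.length != args.length + 1
                    || params[0] != ProcedureContext.class) {
                continue;
            }
            boolean compatible = true;
            for (int i = 0; i < args.length; i++) {
                if (args[i] != null && !params[i + 1].isInstance(args[i])) {
                    compatible = false;
                    break;
                }
            }
            if (compatible) {
                return m;
            }
        }
        throw new IllegalArgumentException("No matching call method for " + Arrays.toString(args));
    }

    /** Invokes the resolved call method with the context first and wraps atomic results. */
    static List<Object[]> invoke(Procedure procedure, ProcedureContext ctx, Object... args) {
        Method m = resolveCall(procedure, args);
        Object[] fullArgs = new Object[args.length + 1];
        fullArgs[0] = ctx; // the context is always the first argument
        System.arraycopy(args, 0, fullArgs, 1, args.length);
        Object[] result;
        try {
            result = (Object[]) m.invoke(procedure, fullArgs); // T[] is returned as an array
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException("Failed to call procedure", e);
        }
        List<Object[]> rows = new ArrayList<>();
        for (Object value : result) {
            // An atomic value becomes a row with a single field.
            rows.add(value instanceof Object[] ? (Object[]) value : new Object[] {value});
        }
        return rows;
    }

    public static void main(String[] args) {
        ProcedureContext ctx = new ProcedureContext() {};
        // Roughly what CALL echo('t', 1001) would dispatch to in this sketch.
        for (Object[] row : invoke(new EchoProcedure(), ctx, "t", 1001L)) {
            System.out.println(Arrays.toString(row));
        }
    }
}
```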


Finally, we propose to add a new method to Catalog that returns a Procedure by name; connector developers can then return the actual procedure from this method.

In addition, we propose to add a new method to Catalog to list the procedures.

Code Block
public interface Catalog {

    /**
     * Get the procedure. The procedure name should be handled in a case-insensitive way.
     *
     * @param procedurePath path of the procedure
     * @return the requested procedure
     * @throws ProcedureNotExistException if the procedure does not exist in the catalog
     * @throws CatalogException in case of any runtime exception
     */
    default Procedure getProcedure(ObjectPath procedurePath)
            throws ProcedureNotExistException, CatalogException {
        throw new UnsupportedOperationException(
                String.format("getProcedure is not implemented for %s.", this.getClass()));
    }

    /**
     * List the names of all procedures in the given database. An empty list is returned if the
     * database has no procedures.
     *
     * @param dbName name of the database.
     * @return a list of the names of the procedures in this database
     * @throws DatabaseNotExistException if the database does not exist in the catalog
     * @throws CatalogException in case of any runtime exception
     */
    default List<String> listProcedures(String dbName)
            throws DatabaseNotExistException, CatalogException {
        throw new UnsupportedOperationException(
                String.format("listProcedures is not implemented for %s.", this.getClass()));
    }
}

Code Block
/** Exception for trying to operate on a procedure that doesn't exist. */
public class ProcedureNotExistException extends Exception {
    private static final String MSG = "Procedure %s does not exist in Catalog %s.";

    public ProcedureNotExistException(String catalogName, ObjectPath procedurePath) {
        this(catalogName, procedurePath, null);
    }

    public ProcedureNotExistException(
            String catalogName, ObjectPath procedurePath, Throwable cause) {
        super(String.format(MSG, procedurePath.getFullName(), catalogName), cause);
    }
}