...

Note: The expression can be a function call.

Public interfaces changes

To let users know which procedures are available, we propose the following syntax to show procedures:

Code Block
languagesql
SHOW PROCEDURES [ ( FROM | IN ) [catalog_name.]database_name ] [ [NOT] (LIKE | ILIKE) <sql_like_pattern> ]
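The optional `[NOT] (LIKE | ILIKE)` clause filters procedure names with a SQL LIKE pattern, where `%` matches any sequence of characters and `_` matches exactly one character. The following is a minimal sketch of that filtering, assuming ILIKE simply means a case-insensitive LIKE. The class `ShowProceduresFilter` and its methods are hypothetical and not part of Flink, escape characters are not handled, and the result is sorted only to make the output deterministic.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/** Hypothetical helper sketching how SHOW PROCEDURES [NOT] LIKE/ILIKE could filter names. */
class ShowProceduresFilter {

    /** Translates a SQL LIKE pattern ('%' = any sequence, '_' = any single char) to a regex. */
    static Pattern likeToRegex(String likePattern, boolean caseInsensitive) {
        StringBuilder regex = new StringBuilder();
        for (char c : likePattern.toCharArray()) {
            if (c == '%') {
                regex.append(".*");
            } else if (c == '_') {
                regex.append('.');
            } else {
                // Quote every other character so regex metacharacters such as '.' match literally.
                regex.append(Pattern.quote(String.valueOf(c)));
            }
        }
        int flags = Pattern.DOTALL;
        if (caseInsensitive) {
            flags |= Pattern.CASE_INSENSITIVE | Pattern.UNICODE_CASE;
        }
        return Pattern.compile(regex.toString(), flags);
    }

    /** Keeps names that match the pattern, or that do not match it when notLike is true. */
    static List<String> filter(List<String> names, String likePattern, boolean ilike, boolean notLike) {
        Pattern pattern = likeToRegex(likePattern, ilike);
        return names.stream()
                .filter(name -> pattern.matcher(name).matches() != notLike)
                .sorted() // deterministic output only; not a claim about Flink's ordering
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> procedures =
                List.of("rollback_to_snapshot", "rewrite_data_files", "Expire_Snapshots");
        System.out.println(filter(procedures, "%snapshot%", false, false)); // LIKE
        System.out.println(filter(procedures, "%snapshot%", true, false)); // ILIKE
        System.out.println(filter(procedures, "%snapshot%", false, true)); // NOT LIKE
    }
}
```

Running `main` prints `[rollback_to_snapshot]`, then `[Expire_Snapshots, rollback_to_snapshot]`, then `[Expire_Snapshots, rewrite_data_files]`.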


Public interfaces changes

First, we propose a new interface named ProcedureContext to provide a context for stored procedures. Currently, it provides a StreamExecutionEnvironment so that a stored procedure can run a Flink job.

Code Block
languagejava
/** A context to provide necessary context used by stored procedures. */
@PublicEvolving
public interface ProcedureContext {

    /** Return the StreamExecutionEnvironment where the procedure is called. */
    StreamExecutionEnvironment getExecutionEnvironment();
}

Then, we propose a new interface Procedure for stored procedures. Stored procedure providers are expected to implement Procedure to define their own procedures.

Code Block
languagejava
/**
 * Base interface representing a stored procedure that can be executed by Flink. A stored procedure
 * accepts zero, one, or multiple input parameters and returns the execution result of the stored
 * procedure.
 *
 * <p>The behavior of a {@link Procedure} is defined by implementing a custom call method. A call
 * method must be declared public, not static, and named <code>call</code>. Call methods can also
 * be overloaded by implementing multiple methods named <code>call</code>. Currently, users cannot
 * define their own procedures; a custom {@link Procedure} can only be provided by a {@link
 * Catalog}. To provide a {@link Procedure}, the {@link Catalog} must implement {@link
 * Catalog#getProcedure(ObjectPath)}.
 *
 * <p>When calling a stored procedure, Flink always passes an <code>
 * org.apache.flink.table.procedure.ProcedureContext</code>, which currently provides the
 * StreamExecutionEnvironment, as the first parameter of the <code>call</code> method. Therefore,
 * a custom call method must accept <code>org.apache.flink.table.procedure.ProcedureContext</code>
 * as its first parameter; the remaining parameters of the <code>call</code> method are the actual
 * parameters of the stored procedure.
 *
 * <p>By default, input and output data types are automatically extracted using reflection. The
 * input arguments are derived from one or more {@code call()} methods. If the reflective
 * information is not sufficient, it can be supplemented and enriched with {@link DataTypeHint} and
 * {@link ProcedureHint}. When hinting input arguments, only hint the arguments starting from the
 * second one, since the first argument is always <code>ProcedureContext</code>, which does not
 * need a data type hint.
 *
 * <p>Note: The return type of the {@code call()} method should always be T[], where T can be an
 * atomic type, Row, or POJO. T is either an explicit composite type or an atomic type that is
 * implicitly wrapped into a row consisting of one field.
 *
 * <p>The following examples with pseudocode show how to write a stored procedure:
 *
 * <pre>{@code
 * // a stored procedure that tries to rewrite data files for Iceberg; it accepts a STRING
 * // and returns an array of explicit ROW < INT, INT >.
 * class IcebergRewriteDataFilesProcedure implements Procedure {
 *   public @DataTypeHint("ROW< rewritten_data_files_count INT, added_data_files_count INT >")
 *          Row[] call(ProcedureContext procedureContext, String tableName) {
 *     // planning for scanning the table to do rewriting
 *     Table table = loadTable(tableName);
 *     List<CombinedScanTask> combinedScanTasks = planScanTask(table);
 *
 *     // now, rewrite the files according to the planning task
 *     StreamExecutionEnvironment env = procedureContext.getExecutionEnvironment();
 *     DataStream<CombinedScanTask> dataStream = env.fromCollection(combinedScanTasks);
 *     RowDataRewriter rowDataRewriter =
 *         new RowDataRewriter(table, caseSensitive(), fileIO(), encryptionManager());
 *     List<DataFile> addedDataFiles;
 *     try {
 *       addedDataFiles = rowDataRewriter.rewriteDataForTasks(dataStream, parallelism);
 *     } catch (Exception e) {
 *       throw new RuntimeException("Rewrite data file error.", e);
 *     }
 *
 *     // replace the current files
 *     List<DataFile> currentDataFiles = combinedScanTasks.stream()
 *             .flatMap(tasks -> tasks.files().stream().map(FileScanTask::file))
 *             .collect(Collectors.toList());
 *     replaceDataFiles(currentDataFiles, addedDataFiles, startingSnapshotId);
 *
 *     // return the result for rewriting
 *     return new Row[] {Row.of(currentDataFiles.size(), addedDataFiles.size())};
 *   }
 * }
 *
 * // a stored procedure that accepts a STRING and a LONG and
 * // returns an array of STRING without a data type hint.
 * class RollbackToSnapShotProcedure implements Procedure {
 *   public String[] call(ProcedureContext procedureContext, String tableName, Long snapshotId) {
 *     Table table = loadTable(tableName);
 *     Long previousSnapShotId = table.currentSnapshot().snapshotId();
 *     table.manageSnapshots().rollbackTo(snapshotId).commit();
 *     return new String[] {
 *             "previous_snapshot_id: " + previousSnapShotId,
 *             "current_snapshot_id: " + snapshotId
 *     };
 *   }
 * }
 * }</pre>
 *
 * <p>In the API, a stored procedure can be used as follows:
 *
 * <pre>{@code
 * // for SQL users
 * TableEnvironment tEnv = ...
 * tEnv.executeSql("CALL rollback_to_snapshot('t', 1001)");
 * }</pre>
 */
@PublicEvolving
public interface Procedure {}
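The call-method contract described in the Javadoc above (public, non-static, named `call`, with `ProcedureContext` as the first parameter, optionally overloaded) can be illustrated with plain Java reflection. This is a simplified sketch, not Flink's actual resolution logic: Flink derives signatures through type extraction and hints, whereas this sketch picks an overload by the runtime types of the arguments and only supports boxed parameter types. `ProcedureContext` and `Procedure` are local stand-ins, and `EchoProcedure` and `ProcedureDispatchSketch` are hypothetical names.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ProcedureDispatchSketch {
    /** Stand-in for ProcedureContext; the proposed interface exposes getExecutionEnvironment(). */
    interface ProcedureContext {}

    /** Stand-in for the proposed Procedure marker interface. */
    interface Procedure {}

    /** Hypothetical procedure with two overloaded call methods that follow the contract. */
    static class EchoProcedure implements Procedure {
        public String[] call(ProcedureContext ctx, String name) {
            return new String[] {"hello " + name};
        }

        public String[] call(ProcedureContext ctx, String name, Long times) {
            String[] out = new String[times.intValue()];
            Arrays.fill(out, "hello " + name);
            return out;
        }
    }

    /** Public, non-static methods named "call" whose first parameter is ProcedureContext. */
    static List<Method> callMethods(Class<?> clazz) {
        List<Method> result = new ArrayList<>();
        for (Method m : clazz.getMethods()) { // getMethods() returns public methods only
            Class<?>[] params = m.getParameterTypes();
            if (m.getName().equals("call")
                    && !Modifier.isStatic(m.getModifiers())
                    && params.length >= 1
                    && params[0] == ProcedureContext.class) {
                result.add(m);
            }
        }
        return result;
    }

    /** Picks the overload whose remaining (boxed) parameters accept args; ctx goes first. */
    static Object[] invoke(Procedure procedure, ProcedureContext ctx, Object... args) {
        for (Method m : callMethods(procedure.getClass())) {
            Class<?>[] params = m.getParameterTypes();
            if (params.length != args.length + 1) {
                continue;
            }
            boolean matches = true;
            for (int i = 0; i < args.length && matches; i++) {
                matches = args[i] == null || params[i + 1].isInstance(args[i]);
            }
            if (!matches) {
                continue;
            }
            Object[] fullArgs = new Object[args.length + 1];
            fullArgs[0] = ctx;
            System.arraycopy(args, 0, fullArgs, 1, args.length);
            try {
                // Assumes T is a reference type, so the T[] result can be viewed as Object[].
                return (Object[]) m.invoke(procedure, fullArgs);
            } catch (IllegalAccessException | InvocationTargetException e) {
                throw new IllegalStateException("Failed to call procedure", e);
            }
        }
        throw new IllegalArgumentException("No matching call method for " + Arrays.toString(args));
    }

    public static void main(String[] args) {
        ProcedureContext ctx = new ProcedureContext() {};
        System.out.println(Arrays.toString(invoke(new EchoProcedure(), ctx, "flink")));
        System.out.println(Arrays.toString(invoke(new EchoProcedure(), ctx, "flink", 2L)));
    }
}
```

Running `main` prints `[hello flink]` and then `[hello flink, hello flink]`, showing that the overload is chosen by the arguments that follow the context.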

...

Finally, we propose to add a new method to Catalog that provides a Procedure by name; connector developers can then return the actual procedure from this method.

In addition, we propose to add a new method to Catalog to list the procedures in a database.

Code Block
languagejava
@PublicEvolving
public interface Catalog {
    /**
     * Get the procedure. The procedure name should be handled in a case-insensitive way.
     *
     * @param procedurePath path of the procedure
     * @return the requested procedure
     * @throws ProcedureNotExistException if the procedure does not exist in the catalog
     * @throws CatalogException in case of any runtime exception
     */
    default Procedure getProcedure(ObjectPath procedurePath)
            throws ProcedureNotExistException, CatalogException {
        throw new UnsupportedOperationException(
                String.format("getProcedure is not implemented for %s.", this.getClass()));
    }

    /**
     * List the names of all procedures in the given database. An empty list is returned if the
     * database has no procedures.
     *
     * @param dbName name of the database
     * @return a list of the names of the procedures in this database
     * @throws DatabaseNotExistException if the database does not exist in the catalog
     * @throws CatalogException in case of any runtime exception
     */
    default List<String> listProcedures(String dbName)
            throws DatabaseNotExistException, CatalogException {
        throw new UnsupportedOperationException(
                String.format("listProcedures is not implemented for %s.", this.getClass()));
    }
}
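A catalog that supports procedures must resolve procedure names case-insensitively, return an empty list for a database without procedures, and throw `DatabaseNotExistException` for an unknown database. The sketch below shows one way to meet that contract in memory, using a `TreeMap` ordered by `String.CASE_INSENSITIVE_ORDER`. It does not implement Flink's `Catalog` interface: `ObjectPath`, `Procedure`, and both exception classes are simplified local stand-ins, and `register` and `createDatabase` are hypothetical helpers.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory catalog sketching the proposed getProcedure/listProcedures contract. */
class InMemoryProcedureCatalog {
    /** Stand-in for Flink's ObjectPath: a database name plus an object name. */
    static final class ObjectPath {
        final String databaseName;
        final String objectName;

        ObjectPath(String databaseName, String objectName) {
            this.databaseName = databaseName;
            this.objectName = objectName;
        }

        String getFullName() {
            return databaseName + "." + objectName;
        }
    }

    /** Stand-in for the proposed Procedure marker interface. */
    interface Procedure {}

    static class ProcedureNotExistException extends Exception {
        ProcedureNotExistException(String catalogName, ObjectPath path) {
            super(String.format(
                    "Procedure %s does not exist in Catalog %s.", path.getFullName(), catalogName));
        }
    }

    static class DatabaseNotExistException extends Exception {
        DatabaseNotExistException(String catalogName, String dbName) {
            super(String.format("Database %s does not exist in Catalog %s.", dbName, catalogName));
        }
    }

    private final String catalogName;
    // Database name -> procedures keyed case-insensitively, as getProcedure's Javadoc requires.
    private final Map<String, Map<String, Procedure>> databases = new HashMap<>();

    InMemoryProcedureCatalog(String catalogName) {
        this.catalogName = catalogName;
    }

    /** Hypothetical helper: creates an empty database if it does not exist yet. */
    void createDatabase(String dbName) {
        databases.putIfAbsent(dbName, new TreeMap<>(String.CASE_INSENSITIVE_ORDER));
    }

    /** Hypothetical helper: registers a procedure, creating the database if needed. */
    void register(String dbName, String procedureName, Procedure procedure) {
        createDatabase(dbName);
        databases.get(dbName).put(procedureName, procedure);
    }

    Procedure getProcedure(ObjectPath path) throws ProcedureNotExistException {
        Map<String, Procedure> procedures = databases.get(path.databaseName);
        Procedure procedure = procedures == null ? null : procedures.get(path.objectName);
        if (procedure == null) {
            throw new ProcedureNotExistException(catalogName, path);
        }
        return procedure;
    }

    List<String> listProcedures(String dbName) throws DatabaseNotExistException {
        Map<String, Procedure> procedures = databases.get(dbName);
        if (procedures == null) {
            throw new DatabaseNotExistException(catalogName, dbName);
        }
        // An existing database without procedures yields an empty list, not an exception.
        return new ArrayList<>(procedures.keySet());
    }

    public static void main(String[] args) throws Exception {
        InMemoryProcedureCatalog catalog = new InMemoryProcedureCatalog("my_catalog");
        Procedure rollback = new Procedure() {};
        catalog.register("db1", "rollback_to_snapshot", rollback);
        catalog.createDatabase("db2");

        System.out.println(
                catalog.getProcedure(new ObjectPath("db1", "ROLLBACK_TO_SNAPSHOT")) == rollback);
        System.out.println(catalog.listProcedures("db1"));
        System.out.println(catalog.listProcedures("db2"));
        try {
            catalog.getProcedure(new ObjectPath("db1", "expire_snapshots"));
        } catch (ProcedureNotExistException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Running `main` prints `true`, `[rollback_to_snapshot]`, `[]`, and then `Procedure db1.expire_snapshots does not exist in Catalog my_catalog.`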


Code Block
languagejava
/** Exception for trying to operate on a procedure that doesn't exist. */
@PublicEvolving
public class ProcedureNotExistException extends Exception {
    private static final String MSG = "Procedure %s does not exist in Catalog %s.";

    public ProcedureNotExistException(String catalogName, ObjectPath procedurePath) {
        this(catalogName, procedurePath, null);
    }

    public ProcedureNotExistException(
            String catalogName, ObjectPath procedurePath, Throwable cause) {
        super(String.format(MSG, procedurePath.getFullName(), catalogName), cause);
    }
}

...