Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Status

Current state:   [One of "Under Discussion", "Accepted", "Rejected"]

Discussion thread: https://lists.apache.org/thread/k6s50gcgznon9v1oylyh396gb5kgrwmd

Vote thread: https://lists.apache.org/thread/659wfgm94oq7484q2bjsqr02xv7r0r4y

JIRA: FLINK-32350

Released: 1.18.0

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Code Block
languagesql
CALL [catalog_name.][database_name.]procedure_name ([ expression [, expression]* ] )

Note: An expression can be a function call, but it must be reducible to a literal value; otherwise, an exception is thrown.
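The literal-reduction rule can be illustrated with a small constant-folding sketch. It is a hypothetical, JDK-only model: the Expr, Literal, FunctionCall and ColumnRef types are invented for illustration and are not Flink's planner classes. Function calls whose operands fold to literals are evaluated; anything else, such as a column reference, is rejected with an exception.

```java
import java.util.List;
import java.util.function.Function;

/** Hypothetical model of reducing CALL arguments to literals; not Flink's planner API. */
public class CallArgumentReduction {

    /** A tiny expression tree: literals, function calls, and column references. */
    public interface Expr {}

    /** A constant such as 5 or 'abc'. */
    public record Literal(Object value) implements Expr {}

    /** A function call whose implementation is applied to already-folded operands. */
    public record FunctionCall(String name, Function<List<Object>, Object> impl, List<Expr> operands)
            implements Expr {}

    /** A reference to a column, which has no value when the CALL statement is planned. */
    public record ColumnRef(String column) implements Expr {}

    /** Folds an expression to a literal value, or throws if that is impossible. */
    public static Object reduceToLiteral(Expr expr) {
        if (expr instanceof Literal literal) {
            return literal.value();
        }
        if (expr instanceof FunctionCall call) {
            // Fold the operands first; one non-reducible operand makes the whole call non-reducible.
            List<Object> args =
                    call.operands().stream().map(CallArgumentReduction::reduceToLiteral).toList();
            return call.impl().apply(args);
        }
        throw new IllegalArgumentException(
                "Argument " + expr + " of the CALL statement cannot be reduced to a literal value.");
    }
}
```

In this model, an argument like `1 + 2` folds to the literal 3 before the procedure is invoked, while an argument that refers to a column cannot be folded and fails fast.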

Public interface changes

To let users know which procedures are provided, we propose a SHOW PROCEDURES statement, whose syntax is given below.

First, we propose a new interface Procedure for stored procedures. Stored procedure providers are expected to implement Procedure to define their own procedures.

Code Block
languagejava
/**
 * Base interface representing a stored procedure that can be executed by Flink. A stored procedure
 * accepts zero, one, or multiple input parameters and then returns the execution result of the
 * stored procedure.
 *
 * <p>The behavior of {@link Procedure} can be defined by implementing a custom call method. A call
 * method must be declared public, not static, and named <code>call</code>. Call methods can also
 * be overloaded by implementing multiple methods named <code>call</code>. Currently, users are not
 * allowed to register their own procedures; a custom {@link Procedure} can only be provided by a
 * {@link Catalog}. To provide a {@link Procedure}, the {@link Catalog} must implement {@link
 * Catalog#getProcedure(ObjectPath)}.
 *
 * <p>When calling a stored procedure, Flink always passes the <code>
 * org.apache.flink.table.api.bridge.java.StreamTableEnvironment</code> as the first parameter of
 * the <code>call</code> method. So, the custom call method must accept the <code>
 * org.apache.flink.table.api.bridge.java.StreamTableEnvironment</code> as its first parameter, and
 * the other parameters of the <code>call</code> method are the actual parameters of the stored
 * procedure.
 *
 * <p>Note: Through the passed <code>StreamTableEnvironment</code>, the stored procedure can only
 * see the objects of the catalog it belongs to.
 *
 * <p>By default, input and output data types are automatically extracted using reflection. The
 * input arguments are derived from one or more {@code call()} methods. If the reflective
 * information is not sufficient, it can be supported and enriched with {@link DataTypeHint} and
 * {@link ProcedureHint}. When hints are used for input arguments, they should only cover the
 * arguments starting from the second one, since the first argument is always <code>
 * StreamTableEnvironment</code>, which doesn't need a data type hint.
 *
 * <p>Note: The return type of the {@code call()} method should always be a {@link
 * CloseableIterator}.
 *
 * <p>The following examples show how to specify a stored procedure:
 *
 * <pre>{@code
 * // a stored procedure that accepts an arbitrary number of INT arguments and returns a
 * // CloseableIterator of Integer
 * class OutputArgumentProcedure implements Procedure {
 *   public CloseableIterator<Integer> call(StreamTableEnvironment tEnv, Integer... args) {
 *     return CloseableIterator.adapterForIterator(Arrays.stream(args).iterator());
 *   }
 * }
 *
 * // a stored procedure that accepts either INT or STRING and returns a CloseableIterator of
 * // explicit ROW < INT >
 * class RandomDataProcedure implements Procedure {
 *   public @DataTypeHint("ROW<i INT>") CloseableIterator<Row> call(StreamTableEnvironment tEnv, Integer n) {
 *     Random random = new Random();
 *     return tEnv.fromValues(random.ints(n).boxed().toArray()).execute().collect();
 *   }
 *
 *   public @DataTypeHint("ROW<i INT>") CloseableIterator<Row> call(StreamTableEnvironment tEnv, String n) {
 *     Random random = new Random();
 *     return tEnv.fromValues(random.ints(Integer.parseInt(n)).boxed().toArray()).execute().collect();
 *   }
 * }
 *
 * // a stored procedure that outputs a ROW < i INT, s STRING >; the procedure hint helps in
 * // declaring the row's fields
 * @ProcedureHint(output = @DataTypeHint("ROW< i INT, s STRING>"))
 * class DuplicatorProcedure implements Procedure {
 *   public CloseableIterator<Row> call(StreamTableEnvironment tEnv, Integer i, String s) {
 *     return CloseableIterator.adapterForIterator(Arrays.asList(Row.of(i, s), Row.of(i, s)).iterator());
 *   }
 * }
 * }</pre>
 *
 * <p>In the API, a stored procedure can be used as follows:
 *
 * <pre>{@code
 * // for SQL users
 * TableEnvironment tEnv = ...
 * tEnv.executeSql("CALL randomDataProcedure(5)");
 * }</pre>
 */
@PublicEvolving
public interface Procedure {}


The syntax to show procedures is:

Code Block
languagesql
SHOW PROCEDURES [ ( FROM | IN ) [catalog_name.]database_name ] [ [NOT] (LIKE | ILIKE) <sql_like_pattern> ]
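Semantically, the optional LIKE/ILIKE clause filters the procedure names returned by the catalog. The following JDK-only sketch assumes standard SQL LIKE semantics (`%` matches any sequence, `_` matches exactly one character, ILIKE ignores case, NOT inverts the match) and no escape character; it is an illustration of the filter, not Flink's implementation.

```java
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical sketch of the [NOT] (LIKE | ILIKE) filter in SHOW PROCEDURES. */
public class ShowProceduresFilter {

    /** Translates a SQL LIKE pattern (no escape character) into an anchored regex. */
    public static Pattern toRegex(String likePattern, boolean caseInsensitive) {
        StringBuilder regex = new StringBuilder();
        for (char c : likePattern.toCharArray()) {
            if (c == '%') {
                regex.append(".*");
            } else if (c == '_') {
                regex.append('.');
            } else {
                // Quote every other character so regex metacharacters match literally.
                regex.append(Pattern.quote(String.valueOf(c)));
            }
        }
        int flags = Pattern.DOTALL;
        if (caseInsensitive) {
            flags |= Pattern.CASE_INSENSITIVE | Pattern.UNICODE_CASE;
        }
        return Pattern.compile(regex.toString(), flags);
    }

    /** Keeps the names that match (or, with NOT, do not match) the pattern, in input order. */
    public static List<String> filter(
            List<String> procedureNames, String likePattern, boolean ilike, boolean not) {
        Pattern regex = toRegex(likePattern, ilike);
        return procedureNames.stream()
                .filter(name -> regex.matcher(name).matches() != not)
                .toList();
    }
}
```

With this model, `SHOW PROCEDURES LIKE 'rollback%'` keeps only lower-case names starting with `rollback`, while `ILIKE` would also keep `Rollback_To_Timestamp`.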


Public interface changes

First, we propose a new interface named ProcedureContext to provide a context for stored procedures. Currently, it provides a StreamExecutionEnvironment so that a stored procedure can run a Flink job.

Code Block
languagejava
/** A context that provides the information needed by a stored procedure. */
@PublicEvolving
public interface ProcedureContext {

    /** Return the StreamExecutionEnvironment where the procedure is called. */
    StreamExecutionEnvironment getExecutionEnvironment();
}

Note: To avoid exposing too much to developers too early, we currently only put StreamExecutionEnvironment into ProcedureContext, since it is what stored procedures need most. If we find that stored procedures need other things, we can add them to ProcedureContext in the future.


Then, we propose a new interface Procedure for stored procedures. Stored procedure providers are expected to implement Procedure to define their own procedures.

Code Block
languagejava
/**
 * Base interface representing a stored procedure that can be executed by Flink. A stored procedure
 * accepts zero, one, or multiple input parameters and then returns the execution result of the
 * stored procedure.
 *
 * <p>The behavior of {@link Procedure} can be defined by implementing a custom call method. A call
 * method must be declared public, not static, and named <code>call</code>. Call methods can also
 * be overloaded by implementing multiple methods named <code>call</code>. Currently, users are not
 * allowed to register their own procedures; a custom {@link Procedure} can only be provided by a
 * {@link Catalog}. To provide a {@link Procedure}, the {@link Catalog} must implement {@link
 * Catalog#getProcedure(ObjectPath)}.
 *
 * <p>When calling a stored procedure, Flink always passes a <code>
 * org.apache.flink.table.procedure.ProcedureContext</code>, which currently provides the
 * StreamExecutionEnvironment, as the first parameter of the <code>call</code> method. So, the
 * custom call method must accept <code>org.apache.flink.table.procedure.ProcedureContext</code>
 * as its first parameter, and the other parameters of the <code>call</code> method are the actual
 * parameters of the stored procedure.
 *
 * <p>By default, input and output data types are automatically extracted using reflection. The
 * input arguments are derived from one or more {@code call()} methods. If the reflective
 * information is not sufficient, it can be supported and enriched with {@link DataTypeHint} and
 * {@link ProcedureHint}. When hints are used for input arguments, they should only cover the
 * arguments starting from the second one, since the first argument is always <code>
 * ProcedureContext</code>, which doesn't need a data type hint.
 *
 * <p>Note: The return type of the {@code call()} method should always be T[], where T can be an
 * atomic type, Row, or a POJO: either an explicit composite type or an atomic type that is
 * implicitly wrapped into a row consisting of one field.
 *
 * <p>The following examples with pseudocode show how to write a stored procedure:
 *
 * <pre>{@code
 * // a stored procedure that tries to rewrite data files for Iceberg; it accepts a STRING
 * // and returns an array of explicit ROW < STRING, STRING >.
 * class IcebergRewriteDataFilesProcedure implements Procedure {
 *   public @DataTypeHint("ROW< rewritten_data_files_count STRING, added_data_files_count STRING >")
 *          Row[] call(ProcedureContext procedureContext, String tableName) {
 *     // planning for scanning the table to do rewriting
 *     Table table = loadTable(tableName);
 *     List<CombinedScanTask> combinedScanTasks = planScanTask(table);
 *
 *     // now, rewrite the files according to the planning task
 *     StreamExecutionEnvironment env = procedureContext.getExecutionEnvironment();
 *     DataStream<CombinedScanTask> dataStream = env.fromCollection(combinedScanTasks);
 *     RowDataRewriter rowDataRewriter =
 *         new RowDataRewriter(table, caseSensitive(), fileIO(), encryptionManager());
 *     List<DataFile> addedDataFiles;
 *     try {
 *       addedDataFiles = rowDataRewriter.rewriteDataForTasks(dataStream, parallelism);
 *     } catch (Exception e) {
 *       throw new RuntimeException("Rewrite data file error.", e);
 *     }
 *
 *     // replace the current files
 *     List<DataFile> currentDataFiles = combinedScanTasks.stream()
 *             .flatMap(tasks -> tasks.files().stream().map(FileScanTask::file))
 *             .collect(Collectors.toList());
 *     replaceDataFiles(currentDataFiles, addedDataFiles, startingSnapshotId);
 *
 *     // return the result for rewriting
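 *     // the counts are converted to STRING to match the declared ROW < STRING, STRING >
 *     return new Row[] {
 *         Row.of(String.valueOf(currentDataFiles.size()), String.valueOf(addedDataFiles.size()))
 *     };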
 *   }
 * }
 *
 * // a stored procedure that accepts a STRING table name and a BIGINT snapshot id and
 * // returns an array of STRING without a data type hint.
 * class RollbackToSnapShotProcedure implements Procedure {
 *   public String[] call(ProcedureContext procedureContext, String tableName, Long snapshotId) {
 *     Table table = loadTable(tableName);
 *     long previousSnapshotId = table.currentSnapshot().snapshotId();
 *     table.manageSnapshots().rollbackTo(snapshotId).commit();
 *     return new String[] {
 *             "previous_snapshot_id: " + previousSnapshotId,
 *             "current_snapshot_id: " + snapshotId
 *     };
 *   }
 * }
 * }</pre>
 *
 * <p>In the API, a stored procedure can be used as follows:
 *
 * <pre>{@code
 * // for SQL users
 * TableEnvironment tEnv = ...
 * tEnv.executeSql("CALL rollback_to_snapshot('t', 1001)");
 * }</pre>
 */
@PublicEvolving
public interface Procedure {}
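The rules in the Javadoc above (public, non-static methods named `call`, overloading allowed, ProcedureContext as the first parameter, the remaining parameters being the SQL arguments) can be checked reflectively. This JDK-only sketch uses stand-in ProcedureContext and Procedure interfaces and a hypothetical EchoProcedure; the names are assumptions for illustration, not Flink's type inference code.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of how valid call methods of a procedure could be discovered. */
public class CallMethodDiscovery {

    /** Stand-in for org.apache.flink.table.procedure.ProcedureContext. */
    public interface ProcedureContext {}

    /** Stand-in for the Procedure marker interface. */
    public interface Procedure {}

    /** Example procedure with two valid overloads and one method that must be ignored. */
    public static class EchoProcedure implements Procedure {
        public String[] call(ProcedureContext ctx, String s) {
            return new String[] {s};
        }

        public String[] call(ProcedureContext ctx, Long n) {
            return new String[] {String.valueOf(n)};
        }

        // Ignored: static methods are not valid call methods.
        public static String[] call(ProcedureContext ctx, Integer i) {
            return new String[] {String.valueOf(i)};
        }
    }

    /** Returns public, non-static methods named "call" whose first parameter is ProcedureContext. */
    public static List<Method> findCallMethods(Class<? extends Procedure> procedureClass) {
        List<Method> result = new ArrayList<>();
        for (Method m : procedureClass.getMethods()) { // getMethods() returns public methods only
            boolean isCandidate =
                    m.getName().equals("call")
                            && !Modifier.isStatic(m.getModifiers())
                            && m.getParameterCount() >= 1
                            && m.getParameterTypes()[0] == ProcedureContext.class;
            if (isCandidate) {
                result.add(m);
            }
        }
        return result;
    }

    /** The procedure's own SQL arguments: every parameter after the context. */
    public static List<Class<?>> sqlArgumentTypes(Method callMethod) {
        Class<?>[] params = callMethod.getParameterTypes();
        return Arrays.asList(params).subList(1, params.length);
    }
}
```

Dropping the first parameter before deriving argument types mirrors the rule that data type hints only describe the arguments after ProcedureContext.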


To make data type hints available for Procedure, we propose the annotation ProcedureHint, analogous to FunctionHint.

Code Block
languagejava
@PublicEvolving
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface ProcedureHints {
    ProcedureHint[] value();
}


Code Block
languagejava
@PublicEvolving
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
@Repeatable(ProcedureHints.class)
public @interface ProcedureHint {

    DataTypeHint[] input() default @DataTypeHint();

    boolean isVarArgs() default false;

    String[] argumentNames() default {""};

    DataTypeHint output() default @DataTypeHint();
}
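Because ProcedureHint is marked `@Repeatable(ProcedureHints.class)`, several hints on one method are stored in the ProcedureHints container, and reflection can unwrap them with `getAnnotationsByType`. The sketch below reproduces that pattern with simplified, hypothetical SimpleHint/SimpleHints annotations that use strings instead of DataTypeHint, so it runs with the JDK alone.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Repeatable;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of reading repeatable, ProcedureHint-like annotations. */
public class RepeatableHintSketch {

    /** Container annotation, playing the role of ProcedureHints. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.TYPE, ElementType.METHOD})
    public @interface SimpleHints {
        SimpleHint[] value();
    }

    /** Simplified stand-in for ProcedureHint: data types are plain strings here. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.TYPE, ElementType.METHOD})
    @Repeatable(SimpleHints.class)
    public @interface SimpleHint {
        String[] input() default {};

        String output() default "";
    }

    public static class OverloadedProcedure {
        // Two hints on one method: the compiler wraps them into a @SimpleHints container.
        @SimpleHint(input = {"INT"}, output = "ROW<i INT>")
        @SimpleHint(input = {"STRING"}, output = "ROW<i INT>")
        public Object[] call(Object context, Object arg) {
            return new Object[] {arg};
        }
    }

    /** Returns one signature string per hint, e.g. "[INT] -> ROW<i INT>". */
    public static List<String> signatures(Method method) {
        return Arrays.stream(method.getAnnotationsByType(SimpleHint.class))
                .map(h -> Arrays.toString(h.input()) + " -> " + h.output())
                .toList();
    }

    /** Convenience lookup of OverloadedProcedure#call without checked exceptions. */
    public static List<String> overloadedProcedureSignatures() {
        for (Method m : OverloadedProcedure.class.getMethods()) {
            if (m.getName().equals("call")) {
                return signatures(m);
            }
        }
        throw new IllegalStateException("call method not found");
    }
}
```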


Finally, we propose to add a new method to Catalog that provides a Procedure by name; connector developers can then return the actual procedure from this method.

Besides, we propose to add a new method to Catalog to list the procedures.

Code Block
languagejava
@PublicEvolving
public interface Catalog {
    /**
     * Get the procedure. The procedure name should be handled in a case-insensitive way.
     *
     * @param procedurePath path of the procedure
     * @return the requested procedure
     * @throws ProcedureNotExistException if the procedure does not exist in the catalog
     * @throws CatalogException in case of any runtime exception
     */
    default Procedure getProcedure(ObjectPath procedurePath)
            throws ProcedureNotExistException, CatalogException {
        throw new UnsupportedOperationException(
                String.format("getProcedure is not implemented for %s.", this.getClass()));
    }

    /**
     * List the names of all procedures in the given database. An empty list is returned if the
     * database has no procedure.
     *
     * @param dbName name of the database.
     * @return a list of the names of the procedures in this database
     * @throws DatabaseNotExistException if the database does not exist in the catalog
     * @throws CatalogException in case of any runtime exception
     */
    default List<String> listProcedures(String dbName)
            throws DatabaseNotExistException, CatalogException {
        throw new UnsupportedOperationException(
                String.format("listProcedures is not implemented for %s.", this.getClass()));
    }
}
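A catalog that supports procedures would override both default methods. The following self-contained sketch models that with hypothetical stand-ins (a two-part ObjectPath record and simplified checked exceptions instead of Flink's classes) and keeps procedures in memory, keyed by lower-cased name so that the lookup is case-insensitive as the Javadoc requires. Returning the normalized names from listProcedures is a design choice of this sketch, not part of the proposal.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory sketch of getProcedure / listProcedures; not Flink's Catalog API. */
public class InMemoryProcedureCatalog {

    /** Stand-in for the Procedure marker interface. */
    public interface Procedure {}

    /** Stand-in for ObjectPath: database name plus object name. */
    public record ObjectPath(String databaseName, String objectName) {}

    public static class ProcedureNotExistException extends Exception {
        public ProcedureNotExistException(String catalogName, ObjectPath path) {
            super(String.format("Procedure %s.%s does not exist in Catalog %s.",
                    path.databaseName(), path.objectName(), catalogName));
        }
    }

    public static class DatabaseNotExistException extends Exception {
        public DatabaseNotExistException(String catalogName, String dbName) {
            super(String.format("Database %s does not exist in Catalog %s.", dbName, catalogName));
        }
    }

    private final String catalogName;
    // database name -> (lower-cased procedure name -> procedure); TreeMap keeps listings sorted.
    private final Map<String, Map<String, Procedure>> procedures = new TreeMap<>();

    public InMemoryProcedureCatalog(String catalogName) {
        this.catalogName = catalogName;
    }

    public void createDatabase(String dbName) {
        procedures.putIfAbsent(dbName, new TreeMap<>());
    }

    /** Registers a procedure; the name is normalized to lower case for case-insensitive lookup. */
    public void registerProcedure(String dbName, String name, Procedure procedure) {
        procedures.computeIfAbsent(dbName, db -> new TreeMap<>())
                .put(name.toLowerCase(Locale.ROOT), procedure);
    }

    public Procedure getProcedure(ObjectPath path) throws ProcedureNotExistException {
        Map<String, Procedure> db = procedures.get(path.databaseName());
        Procedure procedure =
                db == null ? null : db.get(path.objectName().toLowerCase(Locale.ROOT));
        if (procedure == null) {
            throw new ProcedureNotExistException(catalogName, path);
        }
        return procedure;
    }

    public List<String> listProcedures(String dbName) throws DatabaseNotExistException {
        Map<String, Procedure> db = procedures.get(dbName);
        if (db == null) {
            throw new DatabaseNotExistException(catalogName, dbName);
        }
        // An existing database without procedures yields an empty list.
        return new ArrayList<>(db.keySet());
    }
}
```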


Code Block
languagejava
/** Exception for trying to operate on a procedure that doesn't exist. */
@PublicEvolving
public class ProcedureNotExistException extends Exception {
    private static final String MSG = "Procedure %s does not exist in Catalog %s.";

    public ProcedureNotExistException(String catalogName, ObjectPath procedurePath) {
        this(catalogName, procedurePath, null);
    }

    public ProcedureNotExistException(
            String catalogName, ObjectPath procedurePath, Throwable cause) {
        super(String.format(MSG, procedurePath.getFullName(), catalogName), cause);
    }
}

...

1: In method FunctionCatalogOperatorTable#lookupOperatorOverloads, if the passed SqlFunctionCategory is USER_DEFINED_PROCEDURE, get the procedure from the Catalog with the given SqlIdentifier, and then wrap it into a BridgingSqlProcedure, which is an instance of SqlFunction.

...

3: Add a class named SqlCallProcedureConverter, which converts the SqlNode of a CALL statement into a CallProcedureOperation. In SqlCallProcedureConverter, if the SqlNode is an instance of SqlBasicCall and its operator is an instance of SqlProcedureCallOperator, the node is a call procedure statement; it is then converted into a CallProcedureOperation containing the procedure and the expressions for its arguments.

4: In method TableEnvironmentImpl#executeInternal, if the Operation is an instance of CallProcedureOperation, construct a ProcedureContext and pass it, together with the arguments specified by the user, to the corresponding procedure#call(xxx) method.

5: Get the result of procedure#call(xxx), which should be T[]. Build a TableResult from the result so that the SQL Client can show the result of the CALL statement to users.
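Steps 4 and 5 can be modeled in plain Java: pick a `call` overload whose parameter types accept the user's arguments, invoke it with a context prepended, and wrap each element of the returned array into a row, where atomic values become single-field rows as the Procedure Javadoc describes. In this sketch ProcedureContext, RollbackProcedure and the use of `Object[]` as a row are hypothetical stand-ins; the actual planner resolves overloads through type inference at planning time rather than with this runtime `isInstance` check.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of steps 4 and 5: invoke procedure#call, then wrap T[] into rows. */
public class CallProcedureExecution {

    /** Stand-in for ProcedureContext. */
    public interface ProcedureContext {}

    /** Stand-in for the Procedure marker interface. */
    public interface Procedure {}

    public static class RollbackProcedure implements Procedure {
        public String[] call(ProcedureContext ctx, String table, Long snapshotId) {
            return new String[] {"table: " + table, "current_snapshot_id: " + snapshotId};
        }
    }

    /** Finds a call overload accepting the arguments, invokes it, and returns one row per element. */
    public static List<Object[]> execute(Procedure procedure, ProcedureContext ctx, Object... args) {
        for (Method m : procedure.getClass().getMethods()) {
            if (!m.getName().equals("call")
                    || Modifier.isStatic(m.getModifiers())
                    || m.getParameterCount() != args.length + 1) {
                continue;
            }
            Class<?>[] params = m.getParameterTypes();
            boolean applicable = params[0].isInstance(ctx);
            for (int i = 0; applicable && i < args.length; i++) {
                applicable = params[i + 1].isInstance(args[i]);
            }
            if (applicable) {
                // The context always comes first, followed by the user's arguments.
                Object[] prefixed = new Object[args.length + 1];
                prefixed[0] = ctx;
                System.arraycopy(args, 0, prefixed, 1, args.length);
                try {
                    return toRows((Object[]) m.invoke(procedure, prefixed));
                } catch (IllegalAccessException | InvocationTargetException e) {
                    throw new RuntimeException("Failed to call procedure.", e);
                }
            }
        }
        throw new IllegalArgumentException("No call method matches the given arguments.");
    }

    /** Rows (modeled as Object[]) are kept; atomic values are wrapped into one-field rows. */
    static List<Object[]> toRows(Object[] result) {
        List<Object[]> rows = new ArrayList<>();
        for (Object element : result) {
            rows.add(element instanceof Object[] row ? row : new Object[] {element});
        }
        return rows;
    }
}
```

Under these assumptions, calling the sketch with `"t"` and `1001L` yields two single-column rows, which is the shape a TableResult would then expose to the SQL Client.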

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users? 
  • If we are changing behavior how will we phase out the older behavior? 
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Test Plan

N/A

Test Plan

UT & IT

Describe in a few sentences how the FLIP will be tested. We are mostly interested in system tests (since unit tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?


Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.