
Status

Current state: [One of "Under Discussion", "Accepted", "Rejected"]

Discussion thread: here (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)

JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Stored procedures provide a convenient way to encapsulate complex logic for performing data manipulation or administrative tasks in external storage systems. They are widely used in traditional databases and in popular compute engines such as Trino because of this convenience.

However, Flink currently does not support calling stored procedures directly. This limits its ability to integrate with external systems and makes it harder for users to access and manage their data in external systems with Flink.

Therefore, we propose adding support for calling stored procedures in Flink to enable better integration with external storage systems. This will allow users to access and manage their data in external systems smoothly by executing a single CALL statement in Flink, leading to more efficient data processing workflows.

With this FLIP, Flink will allow connector developers to implement their own built-in stored procedures and will enable users to call these predefined stored procedures.

Note: In this FLIP, we don't intend to allow users to define their own stored procedures, because we don't want to expose too much to users too early.


Public Interfaces

Syntax

We propose the following syntax to call a stored procedure:

CALL [catalog_name.][database_name.]procedure_name ([ expression [, expression]* ] )
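To illustrate how the optional qualifiers might be resolved, the following is a minimal sketch, assuming that an unqualified name falls back to the session's current catalog and database (the same convention Flink applies to table identifiers). The class ProcedureNameResolver is hypothetical and not part of Flink, and quoted identifiers containing dots are not handled.

```java
/**
 * Hypothetical helper (not part of Flink) that expands the identifier of
 * CALL [catalog_name.][database_name.]procedure_name(...) into its three parts.
 */
final class ProcedureNameResolver {
    private final String currentCatalog;
    private final String currentDatabase;

    ProcedureNameResolver(String currentCatalog, String currentDatabase) {
        this.currentCatalog = currentCatalog;
        this.currentDatabase = currentDatabase;
    }

    /** Returns {catalog, database, procedure}; missing qualifiers come from the session. */
    String[] resolve(String identifier) {
        // Limit -1 keeps trailing empty parts so that an input like "db." is rejected below.
        // Quoted identifiers that contain dots are not handled in this sketch.
        String[] parts = identifier.split("\\.", -1);
        for (String part : parts) {
            if (part.isEmpty()) {
                throw new IllegalArgumentException("Invalid procedure identifier: " + identifier);
            }
        }
        switch (parts.length) {
            case 1:
                return new String[] {currentCatalog, currentDatabase, parts[0]};
            case 2:
                return new String[] {currentCatalog, parts[0], parts[1]};
            case 3:
                return parts;
            default:
                throw new IllegalArgumentException("Too many qualifiers: " + identifier);
        }
    }
}
```

For example, with current catalog default_catalog and current database default_database, resolve("mydb.compact") yields {default_catalog, mydb, compact}.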

Note: An expression can be a function call, but it must be reducible to a literal value; otherwise, an exception will be thrown.
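The reduction requirement can be sketched as constant folding over the argument expressions: literals are returned as-is, function calls are evaluated once all of their arguments reduce, and anything else (such as a column reference) is rejected. The expression classes below are hypothetical stand-ins for the planner's expression tree, and the sketch assumes every function is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Minimal expression tree, a hypothetical stand-in for the planner's real expressions. */
interface Expr {}

/** A constant value, e.g. 'db.tbl' or 42. */
final class Literal implements Expr {
    final Object value;
    Literal(Object value) { this.value = value; }
}

/** A reference to a column; it cannot be reduced to a constant. */
final class ColumnRef implements Expr {
    final String name;
    ColumnRef(String name) { this.name = name; }
    @Override
    public String toString() { return "column " + name; }
}

/** A deterministic function applied to argument expressions, e.g. UPPER('x'). */
final class Call implements Expr {
    final Function<List<Object>, Object> fn;
    final List<Expr> args;
    Call(Function<List<Object>, Object> fn, List<Expr> args) {
        this.fn = fn;
        this.args = args;
    }
}

final class ArgumentReducer {
    /** Folds an expression to a literal value, or throws if that is impossible. */
    static Object reduce(Expr e) {
        if (e instanceof Literal) {
            return ((Literal) e).value;
        }
        if (e instanceof Call) {
            Call call = (Call) e;
            List<Object> values = new ArrayList<>();
            for (Expr arg : call.args) {
                values.add(reduce(arg)); // every argument must itself be reducible
            }
            return call.fn.apply(values);
        }
        throw new IllegalArgumentException(
                "Procedure argument cannot be reduced to a literal value: " + e);
    }
}
```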

Public interface changes

First, we propose a new interface, Procedure, for stored procedures. Stored procedure providers are expected to implement Procedure to define their own procedures.

@PublicEvolving
public interface Procedure {}


To let procedure providers declare the input and output types of a procedure, we propose the annotation ProcedureHint, analogous to FunctionHint.

@PublicEvolving
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface ProcedureHints {
    ProcedureHint[] value();
}


@PublicEvolving
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
@Repeatable(ProcedureHints.class)
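public @interface ProcedureHint {

    /** Explicitly defines the data types of the procedure's input arguments. */
    DataTypeHint[] input() default @DataTypeHint();

    /** Whether the last input argument accepts a variable number of values. */
    boolean isVarArgs() default false;

    /** Names of the input arguments, in declaration order. */
    String[] argumentNames() default {""};

    /** Explicitly defines the data type of the procedure's output. */
    DataTypeHint output() default @DataTypeHint();
}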
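The following minimal sketch shows how a framework could read such a hint through reflection. It uses a simplified, hypothetical stand-in annotation (SketchProcedureHint) with plain type strings instead of DataTypeHint, so it runs without Flink on the classpath. The method name call and the example CompactTableProcedure are likewise assumptions, since this FLIP does not yet specify how a procedure's method is invoked.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Arrays;

/** Marker interface mirroring the proposed Procedure (hypothetical stand-in). */
interface SketchProcedure {}

/** Simplified stand-in for ProcedureHint: types are plain SQL type strings here. */
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
@interface SketchProcedureHint {
    String[] input() default {};
    String[] argumentNames() default {};
    boolean isVarArgs() default false;
    String output() default "";
}

/** Hypothetical procedure; the method name "call" is an assumption of this sketch. */
final class CompactTableProcedure implements SketchProcedure {
    @SketchProcedureHint(
            input = {"STRING", "INT"},
            argumentNames = {"table_name", "max_files"},
            output = "STRING")
    public String call(String tableName, Integer maxFiles) {
        return "compacted " + tableName + " into at most " + maxFiles + " files";
    }
}

final class HintReader {
    /** Describes the signature declared by the hint on each public method named "call". */
    static String describe(Class<? extends SketchProcedure> clazz) {
        StringBuilder sb = new StringBuilder();
        for (Method m : clazz.getMethods()) {
            SketchProcedureHint hint = m.getAnnotation(SketchProcedureHint.class);
            if (!m.getName().equals("call") || hint == null) {
                continue; // skip unrelated methods and methods without a hint
            }
            sb.append(Arrays.toString(hint.argumentNames()))
                    .append(' ')
                    .append(Arrays.toString(hint.input()))
                    .append(" -> ")
                    .append(hint.output());
        }
        return sb.toString();
    }
}
```

Here HintReader.describe(CompactTableProcedure.class) returns "[table_name, max_files] [STRING, INT] -> STRING".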


Then, we propose adding a new method to Catalog that returns a Procedure by name. Connector developers can then return the actual procedure from this method.

@PublicEvolving
public interface Catalog {
    // ... existing methods ...

    /**
     * Gets the procedure. The procedure name should be handled in a case-insensitive way.
     *
     * @param procedurePath path of the procedure
     * @return the requested procedure
     * @throws ProcedureNotExistException if the procedure does not exist in the catalog
     * @throws CatalogException in case of any runtime exception
     */
    default Procedure getProcedure(ObjectPath procedurePath)
            throws ProcedureNotExistException, CatalogException {
        // Catalogs that do not provide procedures keep this default.
        throw new TableException("Can't get procedure.");
    }
}
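A connector catalog could satisfy the case-insensitivity requirement by normalizing procedure names before lookup. The sketch below is a hypothetical, Flink-free illustration: StoredProcedure, ProcedureMissingException, and SketchCatalog stand in for Procedure, ProcedureNotExistException, and a real Catalog implementation, and a plain database/name pair replaces ObjectPath.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

/** Marker interface mirroring the proposed Procedure (hypothetical stand-in). */
interface StoredProcedure {}

/** Simplified stand-in for ProcedureNotExistException, which takes an ObjectPath. */
class ProcedureMissingException extends Exception {
    ProcedureMissingException(String catalogName, String fullName) {
        super(String.format("Procedure %s does not exist in Catalog %s.", fullName, catalogName));
    }
}

/** Hypothetical connector catalog that registers its built-in procedures by name. */
final class SketchCatalog {
    private final String name;
    private final Map<String, StoredProcedure> procedures = new HashMap<>();

    SketchCatalog(String name) {
        this.name = name;
    }

    void register(String database, String procedureName, StoredProcedure procedure) {
        procedures.put(key(database, procedureName), procedure);
    }

    /** Looks up a procedure, treating the procedure name case-insensitively. */
    StoredProcedure getProcedure(String database, String procedureName)
            throws ProcedureMissingException {
        StoredProcedure procedure = procedures.get(key(database, procedureName));
        if (procedure == null) {
            throw new ProcedureMissingException(name, database + "." + procedureName);
        }
        return procedure;
    }

    private static String key(String database, String procedureName) {
        // Only the procedure name is normalized; Locale.ROOT avoids locale-specific casing.
        return database + "." + procedureName.toLowerCase(Locale.ROOT);
    }
}
```

Normalizing with Locale.ROOT keeps the lookup stable regardless of the JVM's default locale (for example, the Turkish dotless i).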


/** Exception for trying to operate on a procedure that doesn't exist. */
@PublicEvolving
public class ProcedureNotExistException extends Exception {
    private static final String MSG = "Procedure %s does not exist in Catalog %s.";

    public ProcedureNotExistException(String catalogName, ObjectPath procedurePath) {
        this(catalogName, procedurePath, null);
    }

    public ProcedureNotExistException(
            String catalogName, ObjectPath procedurePath, Throwable cause) {
        super(String.format(MSG, procedurePath.getFullName(), catalogName), cause);
    }
}


Proposed Changes

1:  In method FunctionCatalogOperatorTable#lookupOperatorOverloads, if 

2:

3:

4:

5:

Examples for stored procedure


Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users? 
  • If we are changing behavior how will we phase out the older behavior? 
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Test Plan

Describe in few sentences how the FLIP will be tested. We are mostly interested in system tests (since unit-tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.
