...

The Flink DDL was initially proposed and discussed in the design doc [1] by Shuyi Chen and Timo. The initial discussion mainly focused on tables, types, and views. FLIP-69 [2] extends it with a more detailed discussion of DDL for catalogs, databases, and functions. Originally, the function DDL was under the scope of FLIP-69.

After some discussion with the community, we found several ongoing efforts, such as FLIP-64 [3], FLIP-65 [4], and FLIP-78 [5], that will directly impact the SQL syntax of the function DDL. This proposal therefore describes the problem with the existing work in mind and makes sure the design aligns with the API changes for temporary objects and the type inference for UDFs defined in different languages.

...

Before diving into the DDL SQL, we want to discuss the major requirements for defining a function within the Flink runtime, as raised by the related FLIPs:

  • External library registration: The requirement comes from the Hive integration that enhances the adoption of Flink batch. HQL supports syntax like:
CREATE FUNCTION addfunc AS 'com.example.hiveserver2.udf.add' USING JAR 'hdfs:///path/to/jar'
  • Language distinction: Due to bytecode language specifics in Scala, there are some limitations to extracting type information from a Scala function.

...

  • At the same time, supporting Python UDFs in the table runtime is another ongoing effort. Thus, the SQL syntax needs to consider supporting multiple languages.

...

  • For example, MySQL's CREATE FUNCTION syntax supports a language clause in this way:

CREATE FUNCTION hello (s CHAR(20)) RETURNS CHAR(50) DETERMINISTIC RETURN CONCAT('Hello, ', s, '!') LANGUAGE SQL

  • Temporary function support: FLIP-57 proposes to distinguish temporary and non-temporary functions for both catalog and system functions.

...

  • As a temporary function is registered only for the current session, the DDL needs a flag to distinguish it, which affects the function resolution order.
CREATE TEMPORARY FUNCTION addfunc AS 'com.example.hiveserver2.udf.add' USING JAR 'hdfs:///path/to/jar'
  • Function qualifier: Function identifier resolution considers object scopes, whether in a particular catalog and database or just the current catalog and database.

...

  • Thus, all of the function DDL statements need to support a 3-part path.

CREATE FUNCTION catalog1.addfunc AS 'com.example.hiveserver2.udf.add' LANGUAGE JVM


Function DDL

...

Syntax

We propose the following as the function DDL syntax:

Create Function Statement

CREATE [TEMPORARY|SYSTEM] FUNCTION [IF NOT EXISTS] [catalog_name.db_name.]function_name AS identifier [LANGUAGE JVM|PYTHON] [USING JAR|FILE|ARCHIVE 'resource_path' [, USING JAR|FILE|ARCHIVE 'path']*];

Drop Function Statement

DROP [TEMPORARY|SYSTEM] FUNCTION [IF EXISTS] [catalog_name.][db_name.] function_name;

Alter Function Statement

ALTER [TEMPORARY|SYSTEM] FUNCTION [IF EXISTS] [catalog_name.][db_name.] function_name RENAME TO new_name;

Show Function Statement

SHOW FUNCTION [catalog_name.][db_name]


Use Cases

We want the function syntax to support all potential use cases. Below, we list some obvious use cases that it can cover.

Load UDF from Classpath

CREATE TEMPORARY FUNCTION catalog1.db1.func1 AS 'com.xxx.udf.func1UDF' LANGUAGE JVM

DROP FUNCTION catalog1.db1.func1

In this case, the assumption is that the UDF classes are already in the classpath. Thus, we just need to get the class object by reflection, determine whether it is a UDF, UDAF, or UDTF, and register it to the TableEnvironment.
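
A minimal sketch of this registration step is shown below. Only Flink's ScalarFunction, UserDefinedFunction, and TableEnvironment#registerFunction are existing APIs here; the helper class name and the dispatch structure are illustrative assumptions, not the final implementation.

import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.UserDefinedFunction;

// Illustrative sketch: resolve the identifier from the DDL, inspect the class by
// reflection, and register it with the TableEnvironment. Error handling is omitted.
public class ClasspathFunctionRegistrar {

    public static void register(TableEnvironment tableEnv, String name, String className)
            throws Exception {
        // Load the class from the current classpath and instantiate it via reflection.
        Class<?> clazz = Class.forName(className);
        Object instance = clazz.getDeclaredConstructor().newInstance();

        if (instance instanceof ScalarFunction) {
            // Scalar UDF: can be registered directly on the TableEnvironment.
            tableEnv.registerFunction(name, (ScalarFunction) instance);
        } else if (instance instanceof UserDefinedFunction) {
            // UDTF/UDAF would be dispatched analogously to their specific registration
            // methods; the details depend on the type inference work in FLIP-65.
            throw new UnsupportedOperationException("Table/aggregate functions not shown here");
        } else {
            throw new IllegalArgumentException(className + " is not a Flink UDF");
        }
    }
}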

Load UDF from a remote resource

CREATE FUNCTION catalog1.db1.func2 AS 'com.xxx.udf.func2UDF' LANGUAGE JVM USING 'http://artifactory.uber.internal:4587/artifactory/libs-snapshot-local/com/xxx/xxx/xxx-udf/1.0.1-SNAPSHOT/xxx-udf-1.0.1-20180502.011548-12.jar'

In this case, the user can use a class that is not in the local classpath. In the example above, the function func2 is contained in a jar that is released to Artifactory.

Using this type of model, we can split the user-level logic from the platform. Each team can write and own its own UDF library, and the Flink platform is just responsible for loading it into the classpath and using it. We will discuss how to achieve this in a later section. Basically, the resource URL will be added as a user library in the execution environment. It will be added into the job graph and shipped to the storage layer, such as HDFS, before job submission.

Load Python UDF from a remote resource

CREATE FUNCTION catalog1.db1.func3 AS 'com.xxx.udf.func3UDF' LANGUAGE PYTHON USING 'http://external.resources/flink-udf.py'


New or Changed Public Interfaces

The first change needed is to add more methods to the CatalogFunction interface.

public interface CatalogFunction {

  String getClassName();

  Enum getLanguage();  // TODO

  Map<String, String> getProperties();

  CatalogFunction copy();

  Optional<List<String>> getResourcePaths();  // TODO

  Optional<String> getDescription();

  Optional<String> getDetailedDescription();
}
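
As an illustration only, the extended interface could be backed by a plain value class along these lines. The class name SimpleCatalogFunction and the FunctionLanguage enum are assumptions made for this sketch, not the final design.

import java.util.List;
import java.util.Map;
import java.util.Optional;

// Illustrative sketch: a plain value class backing the extended CatalogFunction interface above.
public class SimpleCatalogFunction implements CatalogFunction {

  /** Hypothetical language enum; the interface above still marks the exact type as TODO. */
  public enum FunctionLanguage { JVM, PYTHON }

  private final String className;
  private final FunctionLanguage language;
  private final List<String> resourcePaths;  // may be null when no external resources are needed
  private final Map<String, String> properties;

  public SimpleCatalogFunction(
      String className,
      FunctionLanguage language,
      List<String> resourcePaths,
      Map<String, String> properties) {
    this.className = className;
    this.language = language;
    this.resourcePaths = resourcePaths;
    this.properties = properties;
  }

  @Override
  public String getClassName() { return className; }

  @Override
  public Enum getLanguage() { return language; }

  @Override
  public Map<String, String> getProperties() { return properties; }

  @Override
  public CatalogFunction copy() {
    return new SimpleCatalogFunction(className, language, resourcePaths, properties);
  }

  @Override
  public Optional<List<String>> getResourcePaths() { return Optional.ofNullable(resourcePaths); }

  @Override
  public Optional<String> getDescription() { return Optional.empty(); }

  @Override
  public Optional<String> getDetailedDescription() { return Optional.empty(); }
}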

In order to support loading external libraries and creating UDFs from them, we need to add a method in ExecutionEnvironment to register external libraries.

/**
 * Register a jar file to load in the Flink job dynamically. The jar file will be added into the job graph
 * before job submission. During runtime, the jar file is loaded into the user code class loader automatically.
 *
 * @param jarFile The path of the jar file (e.g., "file://a", "hdfs://b", "http://c")
 */
public void registerUserJarFile(String jarFile) {
    Path path = new Path(jarFile);
    this.userJars.add(path);
}

Before job submission, the registered user jars will be added into the StreamGraph, and then into the JobGraph in the JobGraphGenerator.
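
A usage sketch of how these pieces would fit together once implemented is given below. registerUserJarFile is the method proposed above (it does not exist yet), the DDL statement relies on the syntax proposed in this FLIP, and the jar URL and function names are placeholders.

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;

public class FunctionDdlUsageSketch {
  public static void main(String[] args) {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Hypothetical call to the proposed registration method; the jar URL is a placeholder.
    env.registerUserJarFile("hdfs:///path/to/udf-lib.jar");

    // Once the DDL from this proposal is supported, functions can reference classes shipped in that jar.
    BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
    tEnv.sqlUpdate(
        "CREATE FUNCTION catalog1.db1.func2 AS 'com.xxx.udf.func2UDF' LANGUAGE JVM "
            + "USING JAR 'hdfs:///path/to/udf-lib.jar'");
  }
}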

...