
Create Function Statement

CREATE FUNCTION [IF NOT EXISTS] [catalog_name.db_name.]function_name AS class_name LANGUAGE lang_str [USING resource_path] [WITH PROPERTIES '(' name=value [, name=value]* ')']
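
For illustration, a statement exercising every optional clause could look like the following; the resource path and the property names here are made up, following the name=value form of the grammar:

```sql
CREATE FUNCTION IF NOT EXISTS catalog1.db1.geofence
  AS 'com.xxx.geo.GeoFenceUDF'
  LANGUAGE java
  USING 'hdfs://namenode/libs/geo-udf.jar'
  WITH PROPERTIES (owner=geo_team, version=1)
```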


Drop Function Statement

DROP FUNCTION [IF EXISTS] [catalog_name.][db_name.]function_name;


Alter Function Statement

ALTER FUNCTION [IF EXISTS] [catalog_name.][db_name.]function_name RENAME TO new_name;
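
For example, renaming a previously created function:

```sql
ALTER FUNCTION IF EXISTS catalog1.db1.geofence RENAME TO geo_fence;
```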


Show Function Statement

SHOW FUNCTION [catalog_name.][db_name]
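
For example, listing the functions registered under a database:

```sql
SHOW FUNCTION catalog1.db1
```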


Use Cases

We want the function syntax to support all potential use cases. Below we list some obvious use cases that it enables.

Load UDF from Classpath

CREATE FUNCTION catalog1.db1.geofence AS 'com.xxx.geo.GeoFenceUDF' LANGUAGE java

DROP FUNCTION catalog1.db1.geofence

In this case, the assumption is that the UDF class is already on the classpath. Thus, we just need to obtain the class object via reflection, determine whether it is a UDF, UDAF, or UDTF, and register it with the TableEnvironment.
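
That reflection step can be sketched without any Flink dependency. The base classes below are local stubs named after Flink's, and the Kind mapping is an assumption about how the DDL bridge would dispatch:

```java
// Local stubs standing in for Flink's ScalarFunction, TableFunction and
// AggregateFunction base classes, so this sketch compiles without Flink.
class ScalarFunction {}
class TableFunction {}
class AggregateFunction {}

class FunctionClassifier {
    enum Kind { UDF, UDTF, UDAF, UNKNOWN }

    /**
     * Resolves the class by name, as the DDL bridge would, and walks the
     * superclass chain to decide whether it is a scalar UDF, a UDTF or a UDAF.
     */
    static Kind classify(String className) {
        Class<?> clazz;
        try {
            clazz = Class.forName(className);
        } catch (ClassNotFoundException e) {
            return Kind.UNKNOWN; // not on the classpath
        }
        for (Class<?> c = clazz; c != null; c = c.getSuperclass()) {
            if (c == ScalarFunction.class) return Kind.UDF;
            if (c == TableFunction.class) return Kind.UDTF;
            if (c == AggregateFunction.class) return Kind.UDAF;
        }
        return Kind.UNKNOWN;
    }
}

// Example user function that a CREATE FUNCTION statement would reference.
class GeoFenceUDF extends ScalarFunction {}
```

After classification, the real bridge would call the matching TableEnvironment registration method for the function kind.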

Load UDF from a remote resource

CREATE FUNCTION catalog1.db1.NestedOutput AS 'com.xxx.udf.NestedOutputUDF' LANGUAGE java USING 'http://artifactory.uber.internal:4587/artifactory/libs-snapshot-local/com/xxx/xxx/xxx-udf/1.0.1-SNAPSHOT/xxx-udf-1.0.1-20180502.011548-12.jar'

In this case, the user can use a class that is not on the local classpath. In the example above, the function NestedOutput is contained in a jar released to Artifactory. With this model, we can separate user-level logic from the platform: each team can write and own its own UDF library, and the Flink platform is only responsible for loading it into the classpath and using it.

Load Python UDF from a remote resource

CREATE FUNCTION catalog1.db1.NestedOutput AS 'com.xxx.udf.NestedOutputUDF' LANGUAGE python USING 'http://external.resources/flink-udf.py'


New or Changed Public Interfaces

In order to support loading external libraries and creating UDFs from them, we need to add a method to ExecutionEnvironment for registering external libraries.

/**
 * Register a jar file to load in the Flink job dynamically. The jar file will be
 * added into the job graph before job submission. During runtime, the jar file is
 * loaded into the user code class loader automatically.
 *
 * @param jarFile The path of the jar file (e.g., "file://a", "hdfs://b", "http://c")
 */
public void registerUserJarFile(String jarFile) {
    Path path = new Path(jarFile);
    this.userJars.add(path);
}

Before job submission, the registered user jars are added to the StreamGraph and then to the JobGraph in the JobGraphGenerator. The implementation differs per language; here we only discuss the API needed for Java/Scala libraries.
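
As a self-contained illustration of this registration step (with Flink's Path type replaced by java.net.URI, so the snippet compiles without Flink on the classpath):

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

// Stand-in for the environment object that collects registered user jars
// before job submission; in Flink this state would live on ExecutionEnvironment.
class JarRegistry {
    private final List<URI> userJars = new ArrayList<>();

    /** Mirrors the proposed registerUserJarFile(String): parse and remember the path. */
    public void registerUserJarFile(String jarFile) {
        this.userJars.add(URI.create(jarFile));
    }

    /** The jars collected so far; these would be attached to the JobGraph at submission. */
    public List<URI> getUserJars() {
        return new ArrayList<>(userJars);
    }
}
```

A caller would register each remote resource from the USING clause before submitting the job, e.g. `registry.registerUserJarFile("hdfs://namenode/libs/geo-udf.jar")`.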

Implementation plan


From an implementation perspective, we want to provide function syntax aligned with multi-language support, but limit the scope to Java and Scala. Python UDF support will be discussed and implemented in the scope of FLIP-78. The concrete action items include:

  1. Add function-related syntax in the Flink SQL parser.
  2. Define SqlCreateFunction and SqlDropFunction in the flink-sql-parser module.
  3. Bridge the DDL to register the function in the table environment.
  4. Support UDF loading from an external resource for Java.
  5. Add Scala function support.


As FLIP-65 (new type inference for Table API UDFs) is a blocker for adding Scala functions to TableEnvImpl, items 1), 2), and 3) will initially support only the Java language. Item 4) covers adding functions to the table environment from remote resources. Once FLIP-65 is done, we can continue the work of supporting the Scala language and the corresponding function registration in TableEnvImpl.

Migration Plan and Compatibility

This is a new feature of the Flink DDL, so no migration is needed.

Rejected Alternatives

The function DDL syntax was designed around the existing requirements. There are no rejected alternatives yet.

References 

[1] Flink SQL DDL Design  

[2] FLIP-69 Flink SQL DDL Enhancement

[3] FLIP-64 Support for Temporary Objects in Table module

[4] FLIP-65 New type inference for Table API UDFs

[5] FLIP-78 Flink Python UDF Environment and Dependency Management