Authors

Peter Huang, Rong Rong, Bowen Li, Shuyi Chen

Status

Current state: the Flink 1.10 part is accepted; the remaining parts are still under discussion.

Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Discussion-FLIP-79-Flink-Function-DDL-Support-td33965.html

JIRA: https://issues.apache.org/jira/browse/FLINK-7151

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This proposal aims to support function DDL with the consideration of SQL syntax, language compliance, and advanced external UDF lib registration.

...

After some discussion with the community, we found several ongoing efforts, such as FLIP-64 [3], FLIP-65 [4], and FLIP-78 [5]. As they directly impact the SQL syntax of function DDL, this proposal describes the problem with those existing works in mind, and makes sure the design aligns with the API changes for temporary objects and the type inference for UDFs defined in different languages.

Proposed Changes

Requirements

Before diving into the DDL syntax, we want to discuss the major requirements for defining a function within the Flink runtime, as raised by the related FLIPs:

...

CREATE FUNCTION catalog1.addfunc AS 'com.example.hiveserver2.udf.add' LANGUAGE JAVA


Function DDL Syntax

We propose the following as the function DDL syntax:

Create Function Statement

CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION [IF NOT EXISTS] [catalog_name.db_name.]function_name AS identifier [LANGUAGE JAVA|SCALA|PYTHON] [USING JAR|FILE|ARCHIVE 'resource_path' [, USING JAR|FILE|ARCHIVE 'path']*];

Drop Function Statement

DROP [TEMPORARY|TEMPORARY SYSTEM] FUNCTION [IF EXISTS] [catalog_name.][db_name.]function_name;

Alter Function Statement

ALTER [TEMPORARY|TEMPORARY SYSTEM] FUNCTION [IF EXISTS] [catalog_name.][db_name.]function_name RENAME TO new_name;

Show Function Statement

SHOW FUNCTIONS [catalog_name.][db_name]

...

We currently support only Java, Scala, and Python. Both Java and Scala run in the JVM, so technically JVM and PYTHON would be enough to distinguish the two runtimes in Flink. But JVM and Python are conceptually in different domains: JVM names a runtime, while Python names a language. Thus, we distinguish JAVA and SCALA in the DDL syntax.

Use Cases

We want to use the function syntax to support all potential use cases. Below we list some obvious use cases that can be achieved.

Load UDF from Classpath

CREATE TEMPORARY FUNCTION catalog1.db1.func1 AS 'com.xxx.udf.func1UDF' LANGUAGE JAVA

DROP FUNCTION catalog1.db1.geofence

In this case, the assumption is that the UDF classes are already on the classpath. Thus, we just need to get the class object by reflection, determine whether it is a UDF, UDAF, or UDTF, and register it with the TableEnvironment.
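The classify-by-reflection step above can be sketched as follows. The function base classes here are local stand-ins for Flink's real ones (org.apache.flink.table.functions.ScalarFunction, AggregateFunction, TableFunction) so the sketch is self-contained; the class name and AddFunc are illustrative, not part of the proposal.

```java
// Stand-ins for Flink's function base classes, used only to keep the sketch
// self-contained; the real flow would test against Flink's own classes.
class ScalarFunction {}
class AggregateFunction {}
class TableFunction {}

public class FunctionClassifier {

    /** Classify a loaded class by checking which function base class it extends. */
    static String classify(Class<?> clazz) {
        if (ScalarFunction.class.isAssignableFrom(clazz)) return "UDF";
        if (AggregateFunction.class.isAssignableFrom(clazz)) return "UDAF";
        if (TableFunction.class.isAssignableFrom(clazz)) return "UDTF";
        throw new IllegalArgumentException(clazz.getName() + " is not a supported function type");
    }

    // Example user function, standing in for 'com.xxx.udf.func1UDF'.
    static class AddFunc extends ScalarFunction {}

    public static void main(String[] args) throws Exception {
        // In the real flow the class name comes from the DDL's AS clause and is
        // resolved against the classpath by reflection.
        Class<?> clazz = Class.forName(FunctionClassifier.class.getName() + "$AddFunc");
        System.out.println(classify(clazz));  // UDF
    }
}
```

The registration step would then dispatch on the returned kind when calling the corresponding TableEnvironment registration method.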

Load UDF from a remote resource

CREATE FUNCTION catalog1.db1.func2 AS 'com.xxx.udf.func2UDF' LANGUAGE JAVA USING 'http://artifactory.uber.internal:4587/artifactory/libs-snapshot-local/com/xxx/xxx/xxx-udf/1.0.1-SNAPSHOT/xxx-udf-1.0.1-20180502.011548-12.jar'

...

With this model, we can split user-level logic from the platform. Each team can write and own its own UDF library, and the Flink platform is only responsible for loading it onto the classpath and using it. We will discuss how to achieve this in a later section. Basically, the resource URL will be added as a user library in the execution environment, added to the job graph, and shipped to a storage layer such as HDFS before job submission.
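One plausible way for the platform to pull such a remote jar into the session is an isolated URLClassLoader whose parent is the current loader, so JDK and framework classes stay shared while UDF classes remain per-session. This is a sketch of that idea, not Flink's actual loading code; the helper name is made up here.

```java
import java.net.URL;
import java.net.URLClassLoader;

public class RemoteUdfLoader {

    /**
     * Create an isolated class loader for the given resource URLs (e.g. a UDF
     * jar on HTTP or HDFS). The current loader is the parent, so framework and
     * JDK classes are still resolved normally.
     */
    static URLClassLoader loaderFor(URL... resources) {
        return new URLClassLoader(resources, RemoteUdfLoader.class.getClassLoader());
    }

    public static void main(String[] args) throws Exception {
        // With no extra URLs the loader simply delegates to its parent; a real
        // call would pass the jar URL taken from the USING clause.
        try (URLClassLoader loader = loaderFor()) {
            Class<?> clazz = loader.loadClass("java.lang.String");
            System.out.println(clazz.getName());  // java.lang.String
        }
    }
}
```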

Load python UDF from a remote resource

CREATE FUNCTION  catalog1.db1.func3 AS 'com.xxx.udf.func3UDF'  LANGUAGE 'PYTHON' USING 'http://external.resources/flink-udf.py'


New or Changed Public Interfaces

The first change needed is to add more methods to the CatalogFunction interface.
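A rough sketch of what the extended contract could look like is below. The method names, the FunctionLanguage enum, and the defaults are illustrative assumptions based on the DDL clauses above, not Flink's final API.

```java
import java.util.Collections;
import java.util.List;

// Sketch of an extended catalog-function contract; names are illustrative.
interface CatalogFunction {
    enum FunctionLanguage { JAVA, SCALA, PYTHON }

    /** Fully qualified identifier from the DDL's AS clause. */
    String getClassName();

    /** Language given by the optional LANGUAGE clause; assume JAVA by default. */
    default FunctionLanguage getFunctionLanguage() {
        return FunctionLanguage.JAVA;
    }

    /** Resource paths from the optional USING clause; empty when absent. */
    default List<String> getResourcePaths() {
        return Collections.emptyList();
    }
}

public class CatalogFunctionDemo {
    public static void main(String[] args) {
        // Only getClassName() is abstract, so a lambda suffices for a demo.
        CatalogFunction f = () -> "com.example.hiveserver2.udf.add";
        System.out.println(f.getClassName() + " " + f.getFunctionLanguage());
    }
}
```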

...

Before job submission, the registered user JARs will be added to the StreamGraph and then to the JobGraph in the JobGraphGenerator.

Resource Isolation

To achieve class-loading isolation between different sessions, we can add a new interface in {Stream}ExecutionEnvironment, such as:

...

Also, inside the RuntimeContext implementation, we will keep a cache of all loaded custom class loaders so that we won't load the same library multiple times.
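The caching idea can be sketched with a map keyed by resource path, so repeated lookups of the same library return the same loader. This is an illustrative sketch, not the RuntimeContext implementation itself.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ClassLoaderCache {

    // One loader per resource path, so a library is loaded at most once.
    private final Map<String, URLClassLoader> cache = new ConcurrentHashMap<>();

    URLClassLoader getOrCreate(String resourcePath) {
        return cache.computeIfAbsent(resourcePath, path -> {
            try {
                // Constructing the loader does not touch the resource yet;
                // classes are fetched lazily on first loadClass call.
                return new URLClassLoader(new URL[]{new URL(path)},
                        ClassLoaderCache.class.getClassLoader());
            } catch (java.net.MalformedURLException e) {
                throw new IllegalArgumentException("Bad resource path: " + path, e);
            }
        });
    }

    public static void main(String[] args) {
        ClassLoaderCache loaders = new ClassLoaderCache();
        URLClassLoader a = loaders.getOrCreate("file:///tmp/udf-lib.jar");
        URLClassLoader b = loaders.getOrCreate("file:///tmp/udf-lib.jar");
        System.out.println(a == b);  // true: the second lookup hits the cache
    }
}
```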

Implementation plan

Note: the implementation will differ for each supported language; here we discuss only the API needed for Java/Scala libraries.

From an implementation perspective, we want the function syntax to align with multi-language support, but we limit the scope to Java and Scala. Python UDF support will be discussed and implemented within the scope of FLIP-78. The concrete action items include:

Flink 1.10 Release

In the Flink 1.10 release, we will focus on the basic function DDL syntax as below:

Create Function Statement

CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION [IF NOT EXISTS] [catalog_name.db_name.]function_name AS identifier [LANGUAGE JAVA|SCALA|PYTHON]

Drop Function Statement

DROP [TEMPORARY|TEMPORARY SYSTEM] FUNCTION [IF EXISTS] [catalog_name.][db_name.]function_name;

Alter Function Statement

ALTER [TEMPORARY|TEMPORARY SYSTEM] FUNCTION [IF EXISTS] [catalog_name.][db_name.]function_name AS identifier [LANGUAGE JAVA|SCALA|PYTHON];

Show Function Statement

SHOW FUNCTIONS [catalog_name.][db_name]

...

  1. Add function related syntax in Flink SQL parser.
  2. Define SqlCreateFunction and SqlDropFunction in flink-sql-parser module
  3. Add changes to CatalogFunction
  4. Bridge DDL to register the function into Table environment and catalogs

After Flink 1.10

  1. Support UDF loading from an external resource for java
  2. Add scala function related support

...

into the table environment with remote resources. Once FLIP-65 is done, we can continue the work of supporting the Scala language and the corresponding function registration in TableEnvImpl.

Migration Plan and Compatibility

As this is a new feature for Flink DDL, no migration is needed.

Rejected Alternatives

The function DDL syntax was designed around the existing requirements. No alternatives have been rejected yet.

References 

[1] Flink SQL DDL Design  

...