Status
Current state:
...
Discarded (FLIP process not followed)
Discussion thread: here (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)
JIRA: FLINK-14055
Released: <Flink Version>
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
This proposal continues FLIP-79, which defines the Flink function DDL. FLIP-79 has only been partially released so far, because function DDL with remote resources has not been fully discussed or implemented. This feature matters to SQL platform engineers who need to scale UDF management, because it lets UDFs be released separately for different SQL users.
...
Create Function Statement
CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION [IF NOT EXISTS] [catalog_name.db_name.]function_name AS identifier [LANGUAGE JAVA|SCALA|PYTHON] [USING JAR|FILE|ARCHIVE 'resource_path' [, USING JAR|FILE|ARCHIVE 'resource_path']*];
This proposal discusses the USING clause; the rest of the statement is defined in FLIP-79.
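As a rough illustration of the USING grammar, the Java sketch below extracts each `USING JAR|FILE|ARCHIVE 'path'` item from a statement, in order. `ResourceType`, `ResourceUri`, and `UsingClauseParser` are hypothetical names, and a regular expression stands in for Flink's SQL parser, so the sketch neither validates the rest of the statement nor handles quotes escaped inside a path.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical: the resource kinds allowed after USING in the proposed DDL.
enum ResourceType { JAR, FILE, ARCHIVE }

// Hypothetical value type pairing a resource kind with its path.
final class ResourceUri {
    final ResourceType type;
    final String path;

    ResourceUri(ResourceType type, String path) {
        this.type = type;
        this.path = path;
    }

    @Override
    public String toString() {
        return type + " '" + path + "'";
    }
}

final class UsingClauseParser {
    // Matches one "USING JAR|FILE|ARCHIVE 'path'" item; keywords are case-insensitive.
    private static final Pattern ITEM =
        Pattern.compile("USING\\s+(JAR|FILE|ARCHIVE)\\s+'([^']*)'", Pattern.CASE_INSENSITIVE);

    // Returns every USING item found in the statement, in the order they appear.
    static List<ResourceUri> parse(String statement) {
        List<ResourceUri> resources = new ArrayList<>();
        Matcher m = ITEM.matcher(statement);
        while (m.find()) {
            // Locale.ROOT avoids locale-specific upper-casing (e.g. Turkish dotted I in "file").
            ResourceType type = ResourceType.valueOf(m.group(1).toUpperCase(Locale.ROOT));
            resources.add(new ResourceUri(type, m.group(2)));
        }
        return resources;
    }
}
```

A real implementation would extend the SQL grammar instead; the regex only shows the shape of the data the parser needs to produce.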
Use Cases
Use Remote Resource
CREATE TEMPORARY FUNCTION catalog1.db1.NestedOutput AS 'com.xxx.udf.NestedOutputUDF' LANGUAGE JAVA USING JAR 'http://artifactory.com/artifactory/libs-snapshot-local/com/xxx/xxx/xxx-udf/1.0.1-SNAPSHOT/xxx-udf-1.0.1-20180502.011548-12.jar';
Use Local Resource
CREATE TEMPORARY FUNCTION catalog1.db1.NestedOutput AS 'com.xxx.udf.NestedOutputUDF' LANGUAGE JAVA USING JAR 'file:///xxx-udf-1.0.1-20180502.011548-12.jar';
New or Changed Public Interfaces
...
Public API Changes
Catalog Function
public interface CatalogFunction {
    // To be added: resource paths declared in the USING clause
    Optional<List<String>> getResourcePaths();
}
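A minimal sketch of how a catalog function could carry the proposed resource paths, assuming a trimmed, hypothetical `CatalogFunctionSketch` interface (not Flink's full `CatalogFunction`) and a hypothetical `SimpleCatalogFunction` implementation. Returning `Optional.empty()` for functions declared without USING is a choice made for this sketch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical, trimmed stand-in for CatalogFunction: the class name plus the proposed accessor.
interface CatalogFunctionSketch {
    String getClassName();

    // Proposed addition: resource paths from the USING clause, empty if there is none.
    Optional<List<String>> getResourcePaths();
}

// Hypothetical in-memory implementation used only for illustration.
final class SimpleCatalogFunction implements CatalogFunctionSketch {
    private final String className;
    private final List<String> resourcePaths;

    SimpleCatalogFunction(String className, List<String> resourcePaths) {
        this.className = className;
        // Defensive, unmodifiable copy so callers cannot mutate the stored paths.
        this.resourcePaths = Collections.unmodifiableList(new ArrayList<>(resourcePaths));
    }

    @Override
    public String getClassName() {
        return className;
    }

    @Override
    public Optional<List<String>> getResourcePaths() {
        // A function declared without USING reports Optional.empty() instead of an empty list.
        return resourcePaths.isEmpty() ? Optional.empty() : Optional.of(resourcePaths);
    }
}
```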
User-Defined Function
The classloader is used both for UDF initialization and in the code generation context, where it enables code reuse. We may therefore use UserDefinedFunction to carry the user classloader.
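public abstract class UserDefinedFunction {
    // Transient: the user classloader is not serialized with the function
    private transient ClassLoader classLoader;
    public ClassLoader getClassLoader() { return classLoader; }
    public void setClassLoader(ClassLoader classLoader) { this.classLoader = classLoader; }
}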
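The sketch below shows how a user classloader could be built from resource URLs, stored on a function, and used to resolve the class named after AS. `UdfWithClassLoader`, `DemoFunction`, and `UserClassLoaders` are hypothetical; only `URLClassLoader` and `Class.forName` are standard Java APIs.

```java
import java.net.URL;
import java.net.URLClassLoader;

// Hypothetical stand-in for UserDefinedFunction, reduced to the proposed classloader field.
abstract class UdfWithClassLoader {
    // Transient: the classloader is not serialized and must be set again on the task side.
    private transient ClassLoader classLoader;

    public ClassLoader getClassLoader() {
        return classLoader;
    }

    public void setClassLoader(ClassLoader classLoader) {
        this.classLoader = classLoader;
    }
}

// Hypothetical concrete function, used only to exercise the accessors.
final class DemoFunction extends UdfWithClassLoader {}

final class UserClassLoaders {
    // Plain URLClassLoader over the UDF resource URLs; it delegates to the parent first.
    static URLClassLoader create(URL[] resourceUrls, ClassLoader parent) {
        return new URLClassLoader(resourceUrls, parent);
    }

    // Loads and initializes the class named after AS through the given classloader.
    static Class<?> loadFunctionClass(String className, ClassLoader loader) {
        try {
            return Class.forName(className, true, loader);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("UDF class not found: " + className, e);
        }
    }
}
```

A plain `URLClassLoader` resolves classes parent-first, whereas Flink's user-code classloading defaults to child-first resolution (`classloader.resolve-order`), so this is a simplification.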
Table Environment
To pass the resource path information from the TableEnvironment to the execution environment, we need an extra method on TableEnvironment. With it, the path is added to the execution environment's cached files when a catalog function is created.
public interface TableEnvironment {
    // Register a UDF library into the execution environment
    void registerExternalLibrary(String name, String path);
}
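To illustrate the hand-off from the table environment to the execution environment's cached files, here is a sketch with hypothetical `SketchTableEnvironment` and `CachedFileRegistry` classes. The registry mimics the argument order of Flink's `registerCachedFile(filePath, name)` on the execution environment, which is the reverse of the proposed `(name, path)` method; the duplicate-name check is an assumption of this sketch.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the execution environment's cached-file registry.
final class CachedFileRegistry {
    // name -> path, kept in registration order.
    private final Map<String, String> cachedFiles = new LinkedHashMap<>();

    // Same argument order as Flink's registerCachedFile(filePath, name).
    void registerCachedFile(String filePath, String name) {
        // Rejecting duplicate names is a choice made for this sketch.
        if (cachedFiles.containsKey(name)) {
            throw new IllegalArgumentException("Cached file already registered: " + name);
        }
        cachedFiles.put(name, filePath);
    }

    Map<String, String> getCachedFiles() {
        return Collections.unmodifiableMap(cachedFiles);
    }
}

// Hypothetical table environment exposing the proposed library-registration method.
final class SketchTableEnvironment {
    private final CachedFileRegistry executionEnvironment;

    SketchTableEnvironment(CachedFileRegistry executionEnvironment) {
        this.executionEnvironment = executionEnvironment;
    }

    // Forwards a UDF library to the cached files; note the swapped argument order.
    void registerExternalLibrary(String name, String path) {
        executionEnvironment.registerCachedFile(path, name);
    }
}
```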
Job Graph
For a UDF JAR with a remote path, only two things reach the distributed runtime and are loaded into the Task classloader: the blob files and the classpaths in the JobGraph. We therefore need to be able to add the path to one of these two fields. However, some deployment modes (for example, application mode) have no blob upload step, unlike per-job mode. It is thus simpler to open an interface for adding classpaths to the JobGraph directly:
public class JobGraph {
    private final List<URL> classpaths = new ArrayList<>();
    // Add a remote UDF resource path to the job graph's classpaths
    public void addClasspath(URL path) { classpaths.add(path); }
}
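A sketch of what adding classpaths to the job graph could look like, assuming a hypothetical `JobGraphSketch` class reduced to its classpath list. Entries are deduplicated by their string form because `java.net.URL.equals` and `hashCode` may perform host name resolution. Note also that `java.net.URL` only accepts schemes with an installed protocol handler, so `hdfs://` or `s3://` paths would need a handler registered before they could be added this way; `http` and `file` work out of the box.

```java
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for JobGraph, reduced to the user classpath list.
final class JobGraphSketch {
    // Keyed by toExternalForm(): URL.equals/hashCode may resolve host names.
    private final Map<String, URL> classpaths = new LinkedHashMap<>();

    // Add one UDF resource URL; duplicates are ignored and order is preserved.
    void addClasspath(URL path) {
        Objects.requireNonNull(path, "path");
        classpaths.putIfAbsent(path.toExternalForm(), path);
    }

    // Classpaths in registration order, e.g. for building the task-side user classloader.
    List<URL> getClasspaths() {
        return new ArrayList<>(classpaths.values());
    }

    // Converts a string to a URL; fails if the scheme has no installed protocol handler.
    static URL toUrl(String spec) {
        try {
            return URI.create(spec).toURL();
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException("Cannot convert to URL: " + spec, e);
        }
    }
}
```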
Implementation Plan
As described in the Use Cases section, the advanced function DDL will support both remote and local files. The two require different amounts of work, so we discuss them separately.
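Because the two cases are handled differently, the implementation has to tell them apart. A minimal sketch, assuming that a path with the `file` scheme or no scheme at all is local and any other scheme (such as `hdfs`, `s3`, or `http`) is remote. `ResourceLocation` and `ResourceLocations` are hypothetical names, and the sketch expects URI-style paths rather than Windows paths.

```java
import java.net.URI;
import java.util.Locale;

// Hypothetical: the two cases this implementation plan distinguishes.
enum ResourceLocation { LOCAL, REMOTE }

final class ResourceLocations {
    // Assumption: "file" or no scheme means local; any other scheme is remote.
    static ResourceLocation classify(String path) {
        String scheme = URI.create(path).getScheme();
        if (scheme == null || scheme.toLowerCase(Locale.ROOT).equals("file")) {
            return ResourceLocation.LOCAL;
        }
        return ResourceLocation.REMOTE;
    }
}
```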
Remote Resource
| Deployment mode | hdfs | s3 | http |
| Yarn | | | |
| Kubernetes | | | |
| Standalone | | | |
Local Resource
How local files are shipped to the cluster varies across deployment modes.
...