Reason: FLIP process not followed.

Status

Current state: Discarded

Discussion thread: <link to the discussion on https://mail-archives.apache.org/mod_mbox/flink-dev/>

JIRA: <JIRA issue>

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Motivation

This proposal is a continuation of FLIP-79, in which the Flink function DDL was defined. That FLIP has only been partially released: function DDL with remote resources was never clearly discussed or implemented. It is an important feature for SQL platform engineers who need to scale their UDF management, since it lets UDFs be released separately for different SQL users.

Proposed Change

Syntax

Create Function Statement

CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION [IF NOT EXISTS] [catalog_name.db_name.]function_name AS identifier [LANGUAGE JAVA|SCALA|PYTHON] [USING JAR|FILE|ARCHIVE 'resource_path' [, JAR|FILE|ARCHIVE 'resource_path']*];

The USING clause at the end of the statement is the part this proposal discusses.

Use Cases

Use Remote Resource

CREATE TEMPORARY FUNCTION catalog1.db1.NestedOutput AS 'com.xxx.udf.NestedOutputUDF' LANGUAGE JAVA USING JAR 'http://artifactory.com/artifactory/libs-snapshot-local/com/xxx/xxx/xxx-udf/1.0.1-SNAPSHOT/xxx-udf-1.0.1-20180502.011548-12.jar'


Use Local Resource

CREATE TEMPORARY FUNCTION catalog1.db1.NestedOutput AS 'com.xxx.udf.NestedOutputUDF' LANGUAGE JAVA USING JAR 'file:///xxx-udf-1.0.1-20180502.011548-12.jar'

New or Changed Public Interfaces

Execution Flow

The advanced DDL syntax allows Flink SQL and Table API users to use UDFs defined in local files or in remote resources reachable over HTTP/HDFS. This requires the class to be loaded correctly in three stages: Calcite validation, code generation, and distributed execution.


[Diagram: execution flow, from SQL compile (left) to distributed execution (right)]

As shown in the diagram above, the class in the remote resource first needs to be loaded as a catalog function in the table environment for Calcite SQL validation and code generation; the resource path then needs to be registered with the execution environment so that the class can be used in the distributed runtime. The deployment model determines where the resource lives: it may sit in the blob store or simply be a path entry in the classpaths of the job graph, depending on the deployment model. We therefore first introduce the required API changes on the compile side in this section, and then discuss resource shipping under the different deployment models case by case in the implementation section.
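To make the loading step concrete, here is a minimal sketch (not the proposed implementation; the helper name loadFunctionClass is hypothetical) of resolving a UDF class from a jar URL with a URLClassLoader, which is essentially what the table environment must do before validation and code generation:

import java.net.URL;
import java.net.URLClassLoader;

public final class FunctionClassLoadingSketch {

    // Hypothetical helper: resolve a UDF class from a jar at a local or
    // remote URL. The jar's loader delegates to the current classloader,
    // so Flink's own classes are shared while UDF classes come from the jar.
    static Class<?> loadFunctionClass(String className, String resourcePath) throws Exception {
        URL[] jars = {new URL(resourcePath)};
        ClassLoader udfClassLoader =
                new URLClassLoader(jars, Thread.currentThread().getContextClassLoader());
        return Class.forName(className, true, udfClassLoader);
    }

    public static void main(String[] args) throws Exception {
        Class<?> udf = loadFunctionClass(
                "com.xxx.udf.NestedOutputUDF",
                "file:///xxx-udf-1.0.1-20180502.011548-12.jar");
        System.out.println("Loaded " + udf.getName());
    }
}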

Public API Changes

Catalog Function

public interface CatalogFunction {

    // To be added: paths of the resources (jars/files/archives) declared
    // in the USING clause of the CREATE FUNCTION statement.
    Optional<List<String>> getResourcePaths();
}
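For illustration only, a catalog implementation might carry the USING clause paths as follows (a sketch against the trimmed interface above; the class name and shape are hypothetical):

import java.util.List;
import java.util.Optional;

// Hypothetical implementation: stores the resource paths declared in the
// USING clause alongside the function's class name.
public class CatalogFunctionWithResources implements CatalogFunction {

    private final String className;
    private final List<String> resourcePaths;

    public CatalogFunctionWithResources(String className, List<String> resourcePaths) {
        this.className = className;
        this.resourcePaths = resourcePaths;
    }

    public String getClassName() {
        return className;
    }

    @Override
    public Optional<List<String>> getResourcePaths() {
        return Optional.ofNullable(resourcePaths);
    }
}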


User-Defined Function

The classloader is used both during UDF initialization and in the code generation context (for code reuse), so we may use UserDefinedFunction to pass the user classloader.

public abstract class UserDefinedFunction {

    // Transient: the classloader must not be serialized with the function.
    private transient ClassLoader classLoader;

    public ClassLoader getClassLoader() {
        return classLoader;
    }

    public void setClassLoader(ClassLoader classLoader) {
        this.classLoader = classLoader;
    }
}
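A short sketch of how this could be wired (the harness below is hypothetical, not part of the proposal): the planner sets the user classloader on each function instance, and generated code resolves user types against that loader rather than the system classloader.

public final class ClassLoaderWiringSketch {

    // Hypothetical: called once per function instance before code generation
    // and before open(), so both see the same loader.
    static void prepareFunction(UserDefinedFunction function, ClassLoader udfClassLoader) {
        function.setClassLoader(udfClassLoader);
    }

    // Hypothetical: generated code resolves user types via the function's loader.
    static Class<?> resolveUserType(UserDefinedFunction function, String typeName)
            throws ClassNotFoundException {
        return Class.forName(typeName, false, function.getClassLoader());
    }
}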

Table Environment

To pass the resource path information from the table environment to the execution environment, we need an extra method on the table environment. With it, the path can be added to the cache files of the execution environment when a catalog function is created.

public interface TableEnvironment {

    // Register a UDF library (identified by name) located at the given
    // resource path with the underlying execution environment.
    void registerExternalLibrary(String name, String path);
}
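For example, the DDL handler for CREATE FUNCTION could forward every path in the USING clause to this method (a sketch assuming the interface above; the handler class is hypothetical):

import java.util.List;

public final class CreateFunctionDdlSketch {

    // Hypothetical handler invoked for CREATE FUNCTION ... USING JAR/FILE/ARCHIVE.
    static void onCreateFunction(
            TableEnvironment tableEnv, String functionName, List<String> resourcePaths) {
        for (String path : resourcePaths) {
            // Each resource ends up in the cache files / classpaths of the
            // underlying execution environment.
            tableEnv.registerExternalLibrary(functionName, path);
        }
    }
}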


Job Graph

For a UDF jar with a remote path, only two mechanisms are available: the blob files and the classpaths of the job graph, both of which are loaded into the Task classloader in the distributed runtime. We need to be able to add the path to one of these two fields. However, some deployment modes (for example, application mode) have no blob upload process the way per-job mode does. It is therefore easier to simply expose an interface for adding classpaths:

public class JobGraph {

    // Add a remote UDF resource path to the classpaths of the job graph,
    // so the Task classloader can resolve it at runtime.
    public void addClasspath(URL path) {
        // classpath bookkeeping
    }
}
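For example, during job-graph generation every registered UDF resource could be attached as a classpath entry (a sketch assuming the method above; the hook name is hypothetical):

import java.net.URL;
import java.util.List;

public final class JobGraphClasspathSketch {

    // Hypothetical hook: attach every remote UDF resource to the job graph,
    // so the Task classloader can resolve the UDF classes at runtime.
    static void attachUdfResources(JobGraph jobGraph, List<String> resourcePaths) throws Exception {
        for (String path : resourcePaths) {
            jobGraph.addClasspath(new URL(path));
        }
    }
}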




Implementation Plan

As described in the use-case section, the advanced function DDL will support both remote and local files. The effort required differs between these two types, so we discuss them separately.

Remote Resource



Resource shipping must be worked out for each combination of resource scheme (columns) and deployment mode (rows):

              | hdfs | s3 | http
Yarn          |      |    |
K8s           |      |    |
Standalone    |      |    |




Local Resource

Since the solution for shipping local files varies across deployment modes, we discuss each mode separately.

Per Job Mode


Session Mode


Application Mode