Authors

Peter Huang, Rong Rong, Bowen Li, Shuyi Chen

Status

Current state: "Under Discussion"

...

To support isolation of class loading across different sessions, we can add a new interface to {Stream}ExecutionEnvironment, such as:

public void registerUserJarFiles(String classloaderName, String... jarFiles) {

  // ...

}

This interface registers a set of JAR files under a specific classloader environment key, classloaderName. Internally, it uses a path similar to registerCachedFile(), distributing the JAR files to the runtime via Flink's Blob Server.

Also, add a new interface to RuntimeContext that creates and caches a custom userCodeClassLoader from the JAR file set registered under that name:

public ClassLoader getClassLoaderByName(String classloaderName) {

  // ...

}
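As a rough illustration, getClassLoaderByName could be backed by a per-name cache of URLClassLoaders. The sketch below is a simplified assumption, not Flink's actual implementation: ClassLoaderRegistry and its method names are hypothetical, and in Flink the JAR files would arrive through the Blob Server rather than be handed in directly.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: one isolated classloader per registered name,
// created lazily and reused on subsequent lookups.
public class ClassLoaderRegistry {
    private final Map<String, URL[]> registeredJars = new ConcurrentHashMap<>();
    private final Map<String, ClassLoader> loaderCache = new ConcurrentHashMap<>();

    // Analogous to registerUserJarFiles: remember the JAR set under a name.
    public void register(String classloaderName, URL... jarFiles) {
        registeredJars.put(classloaderName, jarFiles);
    }

    // Analogous to getClassLoaderByName: build the classloader on first use
    // and cache it, so classes from different names stay isolated.
    public ClassLoader getClassLoaderByName(String classloaderName) {
        return loaderCache.computeIfAbsent(classloaderName,
            name -> new URLClassLoader(registeredJars.get(name),
                                       getClass().getClassLoader()));
    }
}
```

Caching is what makes repeated UDF calls cheap: the JAR set is resolved into a classloader once, and every later lookup under the same name returns the same instance.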

During code generation of the UDF call, the set of JAR files associated with the library will be loaded into a custom classloader, and the function will be invoked reflectively.
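The reflective invocation can be sketched as follows. This is an illustration only: for simplicity it uses the system classloader and a JDK method, whereas in the generated code the loader would come from getClassLoaderByName and the class would be the registered UDF.

```java
import java.lang.reflect.Method;

// Hypothetical sketch of calling a function through a supplied classloader.
public class ReflectiveCall {
    public static Object invoke(ClassLoader loader, String className,
                                String methodName, Object arg) throws Exception {
        // Resolve the class from the supplied classloader, not the caller's,
        // so the UDF's dependencies stay isolated.
        Class<?> clazz = Class.forName(className, true, loader);
        Method m = clazz.getMethod(methodName, String.class);
        // Static method in this example, so no receiver instance is needed.
        return m.invoke(null, arg);
    }

    public static void main(String[] args) throws Exception {
        ClassLoader loader = ClassLoader.getSystemClassLoader();
        Object result = invoke(loader, "java.lang.Integer", "parseInt", "42");
        System.out.println(result); // prints 42
    }
}
```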

...

From an implementation perspective, we want to provide function syntax aligned with multiple-language support, but limit the scope to Java and Scala. Python UDF support will be discussed and implemented in the scope of FLIP-78. The concrete action items include:

Flink 1.10 Release

In the Flink 1.10 release, we will focus on the basic function DDL syntax as below:

Create Function Statement

CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION [IF NOT EXISTS] [catalog_name.db_name.]function_name AS identifier [LANGUAGE JVM|PYTHON];

Drop Function Statement

DROP [TEMPORARY|TEMPORARY SYSTEM] FUNCTION [IF EXISTS] [catalog_name.][db_name.]function_name;
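For illustration, concrete statements following the grammar above might look like the following (the catalog, database, function, and class names are made up):

```sql
-- Register a Java UDF under a fully qualified name, skipping if present.
CREATE FUNCTION IF NOT EXISTS my_catalog.my_db.my_upper
  AS 'com.example.udf.MyUpper' LANGUAGE JVM;

-- Remove it again; IF EXISTS avoids an error when it is absent.
DROP FUNCTION IF EXISTS my_catalog.my_db.my_upper;
```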

Basically, we will deliver CREATE FUNCTION and DROP FUNCTION for functions that are already included in the classpath. Loading functions from remote resources will be addressed after the Flink 1.10 release. The sub-tasks include:

  1. Add function-related syntax to the Flink SQL parser.
  2. Define SqlCreateFunction and SqlDropFunction in the flink-sql-parser module.
  3. Add changes to CatalogFunction.
  4. Bridge the DDL to register the function into the table environment and catalogs.

After Flink 1.10

  1. Support UDF loading from an external resource for Java.
  2. Add Scala function-related support.

...