
...

The Flink DDL was initially proposed and discussed in the design document [1] by Shuyi Chen, Timo, et al. As the initial discussion mainly focused on tables, types, and views, FLIP-69 [2] extends it with a more detailed discussion of DDL for catalogs, databases, and functions. Originally, the function DDL was within the scope of FLIP-69.

...

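import java.util.List;
import java.util.Map;
import java.util.Optional;

public interface CatalogFunction {

  /** Name of the class that implements the function. */
  String getClassName();

  /** Language the function is implemented in. */
  Enum<?> getLanguage();  // TODO

  /** Additional properties of the function. */
  Map<String, String> getProperties();

  /** Creates a copy of this catalog function. */
  CatalogFunction copy();

  /** Paths of the resources (e.g., jar files) the function depends on. */
  Optional<List<String>> getResourcePaths();  // TODO

  /** Short description of the function. */
  Optional<String> getDescription();

  /** Detailed description of the function. */
  Optional<String> getDetailedDescription();

}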
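To make the proposed interface more concrete, the following is a minimal sketch of an immutable implementation. It is a sketch under stated assumptions, not Flink code: it restates a trimmed copy of the interface so that it compiles on its own, it replaces the Enum TODO with a hypothetical FunctionLanguage enum, and the class SimpleCatalogFunction is likewise hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical enum standing in for the "Enum getLanguage() // TODO" placeholder.
enum FunctionLanguage { JAVA, SCALA, PYTHON }

// Trimmed restatement of the proposed CatalogFunction interface, so this sketch compiles alone.
interface CatalogFunction {
  String getClassName();
  FunctionLanguage getLanguage();
  Map<String, String> getProperties();
  CatalogFunction copy();
  Optional<List<String>> getResourcePaths();
  Optional<String> getDescription();
  Optional<String> getDetailedDescription();
}

// Hypothetical immutable implementation; not an existing Flink class.
final class SimpleCatalogFunction implements CatalogFunction {
  private final String className;
  private final FunctionLanguage language;
  private final Map<String, String> properties;
  private final List<String> resourcePaths; // null when the function needs no extra jars

  SimpleCatalogFunction(String className, FunctionLanguage language,
      Map<String, String> properties, List<String> resourcePaths) {
    this.className = className;
    this.language = language;
    // Defensive copies so later changes to the caller's collections do not leak in.
    this.properties = Collections.unmodifiableMap(new HashMap<>(properties));
    this.resourcePaths = resourcePaths == null
        ? null
        : Collections.unmodifiableList(new ArrayList<>(resourcePaths));
  }

  @Override public String getClassName() { return className; }

  @Override public FunctionLanguage getLanguage() { return language; }

  @Override public Map<String, String> getProperties() { return properties; }

  @Override public CatalogFunction copy() {
    // All fields are immutable, so a new instance with the same values is a complete copy.
    return new SimpleCatalogFunction(className, language, properties, resourcePaths);
  }

  @Override public Optional<List<String>> getResourcePaths() {
    return Optional.ofNullable(resourcePaths);
  }

  @Override public Optional<String> getDescription() {
    return Optional.of("UDF implemented by " + className);
  }

  @Override public Optional<String> getDetailedDescription() { return Optional.empty(); }
}
```

Because every field is immutable, copy() can simply construct a new instance with the same values; an implementation holding mutable state would need a deep copy instead.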


The second change is to register user jars. To support loading external libraries and creating UDFs from them, we need to add a method to ExecutionEnvironment that registers external libraries.

/**
 * Registers a jar file to be loaded dynamically in the Flink job. The jar file is added to the
 * job graph before job submission. At runtime, the jar file is automatically loaded into the
 * user code class loader.
 *
 * @param jarFile The path of the jar file (e.g., "file://a", "hdfs://b", "http://c")
 */
public void registerUserJarFile(String jarFile) {
    // Path is org.apache.flink.core.fs.Path; userJars holds the registered jar paths.
    Path path = new Path(jarFile);
    this.userJars.add(path);
}

Before job submission, the registered user jars are added to the StreamGraph and then to the JobGraph in the JobGraphGenerator.
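A minimal sketch of the bookkeeping described above, assuming the environment only needs to remember jar locations and hand them to the job graph at submission time. JobGraphStub and UserJarEnvironment are hypothetical stand-ins for the real StreamGraph and JobGraph classes, and java.net.URI replaces Flink's Path so the example runs without Flink on the classpath.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for the job graph: it only records the jars that must ship with the job.
final class JobGraphStub {
  private final List<URI> userJars = new ArrayList<>();

  void addJar(URI jar) { userJars.add(jar); }

  List<URI> getUserJars() { return Collections.unmodifiableList(userJars); }
}

// Hypothetical model of the environment-side state behind registerUserJarFile().
final class UserJarEnvironment {
  // LinkedHashSet keeps registration order and ignores a jar registered twice.
  private final Set<URI> userJars = new LinkedHashSet<>();

  void registerUserJarFile(String jarFile) {
    userJars.add(URI.create(jarFile));
  }

  // Mirrors the "before job submission" step: every registered jar is attached to the job graph.
  JobGraphStub generateJobGraph() {
    JobGraphStub jobGraph = new JobGraphStub();
    for (URI jar : userJars) {
      jobGraph.addJar(jar);
    }
    return jobGraph;
  }
}
```

Keeping the jars in a LinkedHashSet is one possible design choice: registering the same jar twice is harmless, and the jars are shipped in the order they were registered.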

Resource Isolation

To isolate class loading between different sessions, we can add a new method to {Stream}ExecutionEnvironment, such as:

public void registerUserJarFiles(String classloaderName, String... jarFiles) {

  // ...

}

This method registers a set of jar files under a specific class loader environment key, classloaderName. Internally, it follows a similar path to registerCachedFile(), which distributes the jar files to the runtime through Flink's Blob Server.
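The following sketch shows one way to keep jar sets apart per class loader key. NamedJarRegistry is hypothetical, and the rule that repeated calls with the same classloaderName extend the existing set (rather than replace it) is an assumption that the design above does not specify. Actual distribution through the Blob Server is out of scope here.

```java
import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical registry behind registerUserJarFiles(classloaderName, jarFiles...).
final class NamedJarRegistry {
  private final Map<String, Set<URI>> jarsByClassLoaderName = new HashMap<>();

  void registerUserJarFiles(String classloaderName, String... jarFiles) {
    // Assumption: repeated calls with the same name extend that name's jar set.
    Set<URI> jars =
        jarsByClassLoaderName.computeIfAbsent(classloaderName, k -> new LinkedHashSet<>());
    for (String jarFile : jarFiles) {
      jars.add(URI.create(jarFile));
    }
  }

  // Jars the runtime would fetch for the given key; empty if the key was never registered.
  Set<URI> getJarFiles(String classloaderName) {
    return Collections.unmodifiableSet(
        jarsByClassLoaderName.getOrDefault(classloaderName, Collections.emptySet()));
  }
}
```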

Also, add a new method to RuntimeContext that creates and caches a custom user code class loader from the set of jar files registered under a given name:

public ClassLoader getClassLoaderByName(String classloaderName) {

  // ...

}

During code generation for a UDF call, the generated code loads the set of jar files associated with the library into a custom class loader and invokes the function reflectively.
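As an illustration of the reflective invocation step, here is a minimal sketch, assuming the UDF class has a public no-argument constructor and exposes a public eval method (the naming convention Flink uses for scalar functions). ReflectiveUdfInvoker and UpperCaseUdf are hypothetical; in the proposal, the class loader passed in would be the custom loader built from the library's jar files rather than the application class loader used when testing the sketch.

```java
import java.lang.reflect.Method;
import java.util.Locale;

// Hypothetical UDF used only to exercise the sketch.
class UpperCaseUdf {
  public String eval(String s) { return s.toUpperCase(Locale.ROOT); }
}

// Hypothetical helper that the generated code could call; not an existing Flink class.
final class ReflectiveUdfInvoker {
  private final Object instance;
  private final Method evalMethod;

  // Loads the UDF class through the given (possibly custom) class loader and resolves eval once.
  ReflectiveUdfInvoker(ClassLoader loader, String className, int arity)
      throws ReflectiveOperationException {
    Class<?> clazz = Class.forName(className, true, loader);
    this.instance = clazz.getDeclaredConstructor().newInstance();
    Method found = null;
    for (Method m : clazz.getMethods()) {
      if (m.getName().equals("eval") && m.getParameterCount() == arity) {
        found = m;
        break;
      }
    }
    if (found == null) {
      throw new NoSuchMethodException(className + ".eval with " + arity + " argument(s)");
    }
    this.evalMethod = found;
  }

  // Called per record; lookup cost was paid once in the constructor.
  Object invoke(Object... args) throws ReflectiveOperationException {
    return evalMethod.invoke(instance, args);
  }
}
```

Resolving the Method once and reusing it keeps the per-record cost to a single Method.invoke call; overload resolution by argument types is omitted for brevity.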

Inside the RuntimeContext implementation, we will also keep a cache of all loaded custom class loaders so that the same library is not loaded multiple times.
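The cache can be sketched with ConcurrentHashMap.computeIfAbsent, which applies the loader factory at most once per name even under concurrent access. This is a hypothetical model, not the RuntimeContext implementation: NamedClassLoaderCache is an illustrative name, it assumes the jars for each name have already been fetched to local URLs, and it uses URLClassLoader's default parent-first delegation, whereas Flink's user code class loader is child-first by default.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the per-task cache behind getClassLoaderByName(classloaderName).
final class NamedClassLoaderCache implements AutoCloseable {
  private final Map<String, List<URL>> jarsByName; // locally available jars, keyed by name
  private final ClassLoader parent;                // e.g. the regular user code class loader
  private final ConcurrentHashMap<String, URLClassLoader> loaders = new ConcurrentHashMap<>();

  NamedClassLoaderCache(Map<String, List<URL>> jarsByName, ClassLoader parent) {
    this.jarsByName = jarsByName;
    this.parent = parent;
  }

  ClassLoader getClassLoaderByName(String classloaderName) {
    // computeIfAbsent creates each loader at most once, so a library is not loaded twice.
    return loaders.computeIfAbsent(classloaderName, name -> {
      List<URL> jars = jarsByName.get(name);
      if (jars == null) {
        throw new IllegalArgumentException("No jar files registered under name: " + name);
      }
      return new URLClassLoader(jars.toArray(new URL[0]), parent);
    });
  }

  // Releases the jar file handles held by the loaders once they are no longer needed.
  @Override
  public void close() throws IOException {
    for (URLClassLoader loader : loaders.values()) {
      loader.close();
    }
    loaders.clear();
  }
}
```

Closing the cache releases the jar file handles held by each URLClassLoader, which matters for long-running sessions that register many libraries.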

Implementation plan

Note: The implementation differs for each supported language; here we only discuss the API needed for Java/Scala libraries.


From an implementation perspective, we want to provide a function syntax that aligns with multi-language support, but limit the scope to Java and Scala. Support for Python UDFs will be discussed and implemented in the scope of FLIP-78. The concrete action items include:

...