Status
Current state: Under Discussion
Discussion thread: https://lists.apache.org/thread/7m5md150qgodgz1wllp5plx15j1nowx8
JIRA:
Released:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
This proposal is a continuation of FLIP-79, which defined the Flink function DDL. FLIP-79 has so far only been partially released, because function DDL with remote resources was never fully discussed or implemented. This feature is important for SQL platform engineers who need to scale their UDF management, since it allows UDFs to be released separately for different SQL users.
Proposed Change
Syntax
CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION [IF NOT EXISTS] [catalog_name.db_name.]function_name AS class_name [LANGUAGE JAVA|SCALA|PYTHON] [USING JAR 'resource_path' [, JAR 'resource_path']*];
The part of this syntax discussed in this proposal is the USING JAR clause. The statement allows users to create a function implemented by class_name. Jars that need to be added to the table environment can be specified with the USING clause; when the function is referenced for the first time in a Flink session, these resources are added to the table environment as if ADD JAR had been issued.
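To make the USING clause concrete, here is a minimal sketch that pulls the resource paths out of a statement with a regular expression. It is not Flink's parser (which is generated from the SQL grammar); the class name UsingJarClause is hypothetical, and the sketch assumes straight single quotes with no escaped quotes inside paths.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: extracts the JAR paths listed in a USING clause. */
final class UsingJarClause {
    // Start of the clause: "USING JAR", case-insensitive.
    private static final Pattern USING =
            Pattern.compile("\\bUSING\\s+JAR\\b", Pattern.CASE_INSENSITIVE);
    // One item of the clause: JAR 'path'; group 1 is the path.
    private static final Pattern JAR_ITEM =
            Pattern.compile("\\bJAR\\s*'([^']*)'", Pattern.CASE_INSENSITIVE);

    private UsingJarClause() {}

    static List<String> resourcePaths(String ddl) {
        List<String> paths = new ArrayList<>();
        Matcher using = USING.matcher(ddl);
        if (!using.find()) {
            return paths; // no USING clause: the class must already be on the classpath
        }
        Matcher jar = JAR_ITEM.matcher(ddl);
        // Only look after USING so the quoted class name is never mistaken for a path.
        jar.region(using.start(), ddl.length());
        while (jar.find()) {
            paths.add(jar.group(1));
        }
        return paths;
    }
}
```

For the remote use case below, resourcePaths would return the single hdfs:/// path.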
Use Cases
Use Local Resource
CREATE TEMPORARY FUNCTION catalog1.db1.NestedOutput AS 'com.xxx.udf.NestedOutputUDF' LANGUAGE JAVA USING JAR 'file:///xxx-udf/xxx-udf-1.0.1-20180502.011548-12.jar'
Use Remote Resource
CREATE TEMPORARY FUNCTION catalog1.db1.NestedOutput AS 'com.xxx.udf.NestedOutputUDF' LANGUAGE JAVA USING JAR 'hdfs:///xxx-udf/1.0.1-SNAPSHOT/xxx-udf-1.0.1-20180502.011548-12.jar'
New or Changed Public Interfaces
Execution Flow
The advanced DDL syntax allows Flink SQL and Table API users to use UDFs defined in local files or remote resources. This requires the class to be loaded correctly in every stage: Calcite validation, code generation, and distributed execution.
As shown in the diagram above, a class in a remote resource first needs to be loaded as a catalog function in the table environment for Calcite SQL validation and code generation; its path then needs to be registered in the execution environment so that the class is available at distributed runtime. The deployment mode determines where the resource ends up: depending on the mode, it may be uploaded to the blob store or simply added as a classpath entry of the job graph. This section therefore first introduces the required API changes (the left side of the diagram) and then discusses resource shipping in the different deployment modes.
Public API Changes
CatalogFunction
This interface gains a getFunctionResources method that returns the resource information of the UDF, and the isGeneric method is deprecated.
import java.util.List;
import java.util.Optional;

public interface CatalogFunction {

    // ... other existing methods omitted

    /**
     * Get a detailed resource description of the function.
     *
     * @return a list of {@link ResourceUri} describing the resources of the function
     */
    Optional<List<ResourceUri>> getFunctionResources();

    /**
     * Distinguish if the function is a generic function.
     *
     * @deprecated This method is currently only used in Hive to determine
     * if a function is a generic function. The behavior should be implemented
     * by Hive itself instead of being exposed as a public API, so it is deprecated.
     *
     * @return whether the function is a generic function
     */
    @Deprecated
    boolean isGeneric();
}
FunctionDefinitionFactory
For UDFs created with custom resources, a default method is added that loads the class and creates the function definition using the user class loader.
import org.apache.flink.util.CollectionUtil;

public interface FunctionDefinitionFactory {

    /** Existing method: creates a {@link FunctionDefinition} from the given {@link CatalogFunction}. */
    FunctionDefinition createFunctionDefinition(String name, CatalogFunction catalogFunction);

    /**
     * Creates a {@link FunctionDefinition} from the given {@link CatalogFunction}. If the
     * {@link CatalogFunction} is created with user-defined resources, implementations of
     * {@link FunctionDefinitionFactory} need to override this method explicitly. The
     * implementation must use the user class loader, not the thread context class loader,
     * to load custom classes.
     *
     * @param name name of the {@link CatalogFunction}
     * @param catalogFunction the catalog function
     * @param userClassLoader the class loader used to load the user-defined function's class
     * @return the function definition
     */
    default FunctionDefinition createFunctionDefinition(
            String name, CatalogFunction catalogFunction, ClassLoader userClassLoader) {
        if (catalogFunction.getFunctionResources().isPresent()
                && !CollectionUtil.isNullOrEmpty(catalogFunction.getFunctionResources().get())) {
            throw new UnsupportedOperationException(
                    String.format(
                            "%s needs to override the default createFunctionDefinition "
                                    + "to load user-defined function classes",
                            this.getClass().getSimpleName()));
        }
        // No custom resources: fall back to the existing two-argument method.
        return createFunctionDefinition(name, catalogFunction);
    }
}
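The essential point of an override is to resolve the UDF class through the supplied userClassLoader instead of the thread context class loader. A JDK-only sketch of that lookup follows; UserClassInstantiator is a hypothetical name, and it assumes the UDF class has a public no-argument constructor.

```java
import java.lang.reflect.InvocationTargetException;

/** Hypothetical helper: instantiates a UDF class through an explicit class loader. */
final class UserClassInstantiator {
    private UserClassInstantiator() {}

    static Object instantiate(String className, ClassLoader userClassLoader) {
        try {
            // Resolve via the user class loader, NOT
            // Thread.currentThread().getContextClassLoader().
            Class<?> clazz = Class.forName(className, true, userClassLoader);
            // Assumption: UDF classes expose a public no-argument constructor.
            return clazz.getDeclaredConstructor().newInstance();
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(
                    "Class " + className + " is not visible to the user class loader", e);
        } catch (NoSuchMethodException | InstantiationException
                | IllegalAccessException | InvocationTargetException e) {
            throw new IllegalStateException("Cannot instantiate " + className, e);
        }
    }
}
```

A factory override would call something like this and then wrap the instance in a FunctionDefinition.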
StreamExecutionEnvironment
To pass the resource path information from the table environment to the execution environment, we will use the pipeline.jars option. Currently, the StreamExecutionEnvironment#configure method does not override the pipeline.jars option from the dynamic configuration when generating the JobGraph. As a result, the user jars referenced in the query are not uploaded to the blob store, and a ClassNotFoundException is thrown at distributed runtime. The pipeline.jars option therefore needs to be overridden in this method.
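Conceptually, the override merges the jars registered in the table environment into the pipeline.jars entry of the configuration used to build the JobGraph. The sketch below models the configuration as a plain Map keyed by option name; PipelineJarsMerger and its merge semantics (existing entries first, duplicates dropped) are hypothetical and not the actual StreamExecutionEnvironment code. In Flink itself the option is PipelineOptions.JARS.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical model of overriding pipeline.jars with the table environment's jars. */
final class PipelineJarsMerger {
    static final String PIPELINE_JARS = "pipeline.jars";

    private PipelineJarsMerger() {}

    /** Returns the merged list and writes it back into the configuration map. */
    static List<String> merge(Map<String, List<String>> config, List<String> tableEnvJars) {
        // LinkedHashSet keeps the original order and removes duplicates.
        Set<String> merged = new LinkedHashSet<>(
                config.getOrDefault(PIPELINE_JARS, Collections.<String>emptyList()));
        merged.addAll(tableEnvJars);
        List<String> result = new ArrayList<>(merged);
        config.put(PIPELINE_JARS, result);
        return result;
    }
}
```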
Implementation Plan
Supported Resource Type
As described in the use cases section, the advanced function DDL supports both local and remote resources. Local resources are convenient for developing and debugging jobs. In production, however, users often need to reuse a resource across jobs or sessions, and they usually submit jobs through a SQL development platform that does not run on the machine holding their local resources, so remote resources are the better choice. The effort required for the two resource types differs, so they are discussed separately.
Local Resource
For a local resource, the local path is added to the user class loader during job compilation so that the corresponding class can be loaded during Calcite validation and code generation. The path is also added to the user jars of the JobGraph so that the resource is distributed to all machines in the cluster at execution time.
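For a local jar, making its classes visible at compile time comes down to a class loader that includes the jar's URL. The JDK-only sketch below uses URLClassLoader with the given parent; LocalJarLoader is a hypothetical name, and Flink's real user class loader additionally applies its own parent-first/child-first policy.

```java
import java.io.File;
import java.io.UncheckedIOException;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.net.URLClassLoader;

/** Hypothetical helper: a class loader that sees one local jar plus its parent. */
final class LocalJarLoader {
    private LocalJarLoader() {}

    static URLClassLoader withLocalJar(String localPath, ClassLoader parent) {
        // Accept both "file:///x.jar" and plain "/x.jar" forms.
        File file = localPath.startsWith("file:")
                ? new File(URI.create(localPath))
                : new File(localPath);
        try {
            URL url = file.toURI().toURL();
            // The caller owns the loader and must close() it when it is no longer needed.
            return new URLClassLoader(new URL[] {url}, parent);
        } catch (MalformedURLException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```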
Remote Resource
Remote resources can use different storage schemes, such as HTTP, HDFS, and S3. During query compilation, the remote resource is first downloaded to a local temporary directory based on its path, and the local path is then added to the user class loader. From there on, the handling is the same as for local resources.
If the remote resource is in HDFS, users first need to configure the Hadoop environment on the machine that runs the query and add the Hadoop common jars to the JVM classpath. Hadoop dependencies are deliberately not packaged into the uber jar of the table-api-java module, to avoid class conflicts.
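Whether a resource takes the download path depends only on its URI scheme. A minimal sketch of that decision, assuming (in line with the use cases above) that a missing scheme or file: means local and any other scheme (hdfs:, s3:, http:, ...) means remote; ResourceLocation is a hypothetical name.

```java
import java.net.URI;

/** Hypothetical helper: decides whether a resource path must be downloaded first. */
final class ResourceLocation {
    private ResourceLocation() {}

    static boolean isRemote(String resourcePath) {
        String scheme = URI.create(resourcePath).getScheme();
        // No scheme (e.g. "/opt/udf.jar") or "file" means the jar is already local.
        return scheme != null && !scheme.equalsIgnoreCase("file");
    }
}
```

Note that Windows paths such as C:\udf.jar would need separate handling, since URI parsing treats the drive letter as a scheme or rejects the backslash.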
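Because Hadoop is not bundled, failing early with a clear message when an hdfs: path is used without Hadoop on the classpath can save users a confusing error later. The sketch below probes for org.apache.hadoop.fs.FileSystem without initializing it; the HadoopProbe class, and the idea of checking at registration time, are hypothetical.

```java
/** Hypothetical probe: is Hadoop visible before an hdfs: resource is downloaded? */
final class HadoopProbe {
    /** Hadoop's FileSystem entry point; present only if Hadoop jars are on the classpath. */
    static final String HADOOP_FS_CLASS = "org.apache.hadoop.fs.FileSystem";

    private HadoopProbe() {}

    static boolean isClassAvailable(String className, ClassLoader loader) {
        try {
            // initialize=false: only check visibility, do not run static initializers.
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    static void requireHadoopFor(String resourcePath, ClassLoader loader) {
        if (resourcePath.startsWith("hdfs:") && !isClassAvailable(HADOOP_FS_CLASS, loader)) {
            throw new IllegalStateException(
                    "Resource " + resourcePath + " requires Hadoop; add the Hadoop common"
                            + " jars to the classpath (e.g. via HADOOP_CLASSPATH).");
        }
    }
}
```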
Note: Currently, the ADD JAR syntax only supports adding local resources. With the release of this advanced function DDL feature, ADD JAR will also support adding remote resources.
Supported Deployment Mode/Resource Type Mapping
Mode/Type | Local | Remote |
---|---|---|
Per Job Mode | ✔️ | ✔️ |
Session Mode | ✔️ | ✔️ |
Application Mode | ✔️ | ✔️ |
Limitation: In application mode, local resources are a special case: users need to put the resources into the Flink usrlib folder before submitting the job, and only then are the resources available when the query runs. Please refer to Application Mode for details.
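One way to surface this limitation early would be to check that a local jar path lies under the usrlib directory of the Flink distribution. The sketch below is a purely lexical check; UsrlibCheck and the idea of validating this when the function is created are assumptions, not part of the proposal.

```java
import java.nio.file.Path;

/** Hypothetical check: does a local jar live under $FLINK_HOME/usrlib? */
final class UsrlibCheck {
    private UsrlibCheck() {}

    static boolean isUnderUsrlib(Path flinkHome, Path jar) {
        Path usrlib = flinkHome.resolve("usrlib").toAbsolutePath().normalize();
        // normalize() removes "." and ".." so "usrlib/../lib/x.jar" is not accepted.
        return jar.toAbsolutePath().normalize().startsWith(usrlib);
    }
}
```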
Proposed Design
UDF Registration Process
If the user registers a UDF with a CREATE FUNCTION ... USING JAR statement, the general process is as follows:
- First, determine the type of resource. For a local jar, the corresponding local path is registered directly in the user class loader.
- For a remote resource, the resource is first downloaded to a local temporary directory whose name is generated from a UUID, and the local path is then registered in the user class loader.
- The user class loader loads the corresponding class into the JVM and verifies it.
- The UDF is created in the catalog, together with the path information of its resources.
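The registration steps above can be modeled end to end with JDK classes only. This is a simplified sketch under several assumptions: remote resources are fetched with java.net.URL, which covers http(s) and similar JDK-supported schemes but not hdfs: or s3: (those need a Hadoop or plugin file system); an in-memory map stands in for the catalog; and FunctionRegistrar, MutableLoader, localize and register are hypothetical names.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.URI;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

/** Hypothetical model of CREATE FUNCTION ... USING JAR registration. */
final class FunctionRegistrar {
    /** URLClassLoader whose protected addURL is exposed, so jars can be appended. */
    static final class MutableLoader extends URLClassLoader {
        MutableLoader(ClassLoader parent) {
            super(new URL[0], parent);
        }

        void addJar(URL url) {
            addURL(url);
        }
    }

    private final Path tmpRoot;
    final MutableLoader userClassLoader;
    /** Stand-in for the catalog: function name -> resource paths as written in the DDL. */
    final Map<String, List<String>> catalog = new HashMap<>();

    FunctionRegistrar(Path tmpRoot, ClassLoader parent) {
        this.tmpRoot = tmpRoot;
        this.userClassLoader = new MutableLoader(parent);
    }

    /** Steps 1-2: keep a local path as is; copy a remote one into a UUID-named directory. */
    Path localize(String resourcePath) {
        URI uri = URI.create(resourcePath);
        String scheme = uri.getScheme();
        if (scheme == null) {
            return Paths.get(resourcePath);
        }
        if (scheme.equalsIgnoreCase("file")) {
            return Paths.get(uri);
        }
        try {
            // Fails with MalformedURLException for schemes the JDK cannot open (hdfs:, s3:).
            URL url = uri.toURL();
            Path dir = Files.createDirectories(tmpRoot.resolve(UUID.randomUUID().toString()));
            Path target = dir.resolve(Paths.get(uri.getPath()).getFileName().toString());
            try (InputStream in = url.openStream()) {
                Files.copy(in, target);
            }
            return target;
        } catch (IOException e) {
            throw new UncheckedIOException("Cannot download " + resourcePath, e);
        }
    }

    /** Steps 1-4: register the jars, verify the class, then store the function. */
    void register(String name, String className, List<String> resourcePaths) {
        try {
            for (String path : resourcePaths) {
                userClassLoader.addJar(localize(path).toUri().toURL());
            }
            // initialize=false: verify the class is loadable without running static init.
            Class.forName(className, false, userClassLoader);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException(
                    "Class " + className + " not found in " + resourcePaths, e);
        }
        catalog.put(name, resourcePaths);
    }
}
```

The function is only recorded in the catalog after the class has been found, so a typo in the class name fails at CREATE FUNCTION time rather than at query time.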
Referencing UDF Process
If a query uses a UDF registered with a CREATE FUNCTION ... USING JAR statement, the general process is as follows:
- Parse the SQL to find the UDFs used in the query. If a UDF was registered with a jar resource, first check whether that jar has already been registered.
- If the jar has not been registered yet, check whether its path is remote. If it is, download the jar to a local temporary directory first.
- Register the local path in the user class loader and load the class into the JVM for Calcite validation and code generation.
- Finally, when the job is submitted, upload the jar to the blob store together with the job so that it is available at distributed runtime.
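The "has this jar already been registered?" check is what keeps repeated references to the same UDF from downloading and registering its jar again. A minimal sketch, assuming a hypothetical JarRegistry that remembers resource paths in a set and delegates the actual download and registration to a callback:

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

/** Hypothetical registry: each jar is downloaded and registered at most once per session. */
final class JarRegistry {
    private final Set<String> registered = new LinkedHashSet<>();
    private final Consumer<String> downloadAndRegister;

    JarRegistry(Consumer<String> downloadAndRegister) {
        this.downloadAndRegister = downloadAndRegister;
    }

    /** Called for each UDF found while parsing a query. */
    void ensureRegistered(List<String> resourcePaths) {
        for (String path : resourcePaths) {
            if (!registered.contains(path)) {
                downloadAndRegister.accept(path); // may throw; path is recorded only on success
                registered.add(path);
            }
        }
    }

    /** Jars to ship with the job, e.g. to be uploaded to the blob store on submission. */
    Set<String> jarsForSubmission() {
        return Collections.unmodifiableSet(registered);
    }
}
```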
Core Code Design
As already mentioned in FLINK-15635, flink-table has had several class loading issues in the past because the wrong class loader was used. The SQL Client executor hacks a class loader into the planner process via wrapClassLoader, which sets the thread's context class loader. Instead, it should be possible to pass a class loader to the environment settings. This class loader can then be passed to the planner and stored in the table environment, table config, etc., to achieve consistent class loading behavior.
In addition, because of this class loader problem, the ADD JAR/REMOVE JAR syntax is currently implemented inside the SQL Client. We will move it into the table environment so that the syntax becomes more general; for example, Table API users will be able to use it as well.
In the current implementation, the table environment holds a ClassLoader object, but this is not sufficient. If REMOVE JAR is executed before a UDF is used, the corresponding jar must be removed from the class loader, which means the old ClassLoader is closed and a new one is constructed. As a result, the ClassLoader references held by other objects throughout the flink-table module, such as CatalogManager, become unusable because they do not point to the latest ClassLoader.
Taking all of the above into account, the overall code design of this FLIP is as follows:
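The difference between the two approaches can be illustrated with a small JDK-only sketch. The first method mirrors the wrapClassLoader-style hack of temporarily swapping the thread context class loader; the nested PlannerSettings passes the loader explicitly. ClassLoaderPassing and PlannerSettings are hypothetical names, not Flink's API; the proposal itself only states that a class loader should be passable to the environment settings.

```java
import java.util.function.Supplier;

/** Hypothetical contrast: implicit context class loader vs. explicitly passed loader. */
final class ClassLoaderPassing {
    private ClassLoaderPassing() {}

    /** Hack: every call site must remember to wrap its code and restore the old loader. */
    static <T> T withContextClassLoader(ClassLoader loader, Supplier<T> action) {
        Thread thread = Thread.currentThread();
        ClassLoader previous = thread.getContextClassLoader();
        thread.setContextClassLoader(loader);
        try {
            return action.get();
        } finally {
            thread.setContextClassLoader(previous);
        }
    }

    /** Explicit alternative: the loader travels with the settings object. */
    static final class PlannerSettings {
        private final ClassLoader classLoader;

        private PlannerSettings(ClassLoader classLoader) {
            this.classLoader = classLoader;
        }

        static PlannerSettings withClassLoader(ClassLoader classLoader) {
            return new PlannerSettings(classLoader);
        }

        ClassLoader classLoader() {
            return classLoader;
        }
    }
}
```

With the explicit variant, code that runs on another thread or forgets the wrapper still sees the right loader, which is the inconsistency FLINK-15635 describes.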
- Introduce an internal interface, UserClassLoaderContext, which encapsulates the user class loader and the jar resources registered by the user. The user class loader contains all registered jar resources.
- The table environment references UserClassLoaderContext directly instead of a ClassLoader object, so that it always obtains the latest available ClassLoader, even after the ClassLoader object changes.
- Objects throughout the table module that need the custom ClassLoader, such as DataTypeFactoryImpl, hold a UserClassLoaderContext instead of a ClassLoader.
Having this in place should replace the need for Thread.currentThread().getContextClassLoader() in the entire flink-table module.
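The stale-reference problem and the proposed fix can be shown together. In the sketch below, a loader captured before REMOVE JAR is closed and replaced, while a component that holds the context always asks for the current loader. The interface name UserClassLoaderContext comes from this proposal, but its methods here (getUserClassLoader, addJar, removeJar), the implementation SimpleUserClassLoaderContext, TypeFactorySketch, and the rebuild strategy are assumptions.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical shape of the internal context described above. */
interface UserClassLoaderContext {
    ClassLoader getUserClassLoader();

    void addJar(String localPath);

    void removeJar(String localPath);
}

final class SimpleUserClassLoaderContext implements UserClassLoaderContext {
    private final ClassLoader parent;
    private final List<String> jars = new ArrayList<>();
    private URLClassLoader current;

    SimpleUserClassLoaderContext(ClassLoader parent) {
        this.parent = parent;
        this.current = new URLClassLoader(new URL[0], parent);
    }

    @Override
    public ClassLoader getUserClassLoader() {
        return current;
    }

    @Override
    public void addJar(String localPath) {
        // A real implementation could append to the existing loader; rebuilding keeps this short.
        jars.add(localPath);
        rebuild();
    }

    @Override
    public void removeJar(String localPath) {
        // A URLClassLoader cannot drop a URL, so close it and build a new one.
        jars.remove(localPath);
        rebuild();
    }

    private void rebuild() {
        try {
            current.close();
            URL[] urls = new URL[jars.size()];
            for (int i = 0; i < urls.length; i++) {
                urls[i] = Paths.get(jars.get(i)).toUri().toURL();
            }
            current = new URLClassLoader(urls, parent);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

/** Stand-in for a component such as DataTypeFactoryImpl: holds the context, not the loader. */
final class TypeFactorySketch {
    private final UserClassLoaderContext context;

    TypeFactorySketch(UserClassLoaderContext context) {
        this.context = context;
    }

    ClassLoader loaderForNextLookup() {
        return context.getUserClassLoader(); // always the latest loader
    }
}
```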
Migration Plan and Compatibility
This is a new feature of the function DDL, so no migration is needed.
Test Plan
- For local resources, changes will be verified by unit tests.
- For HDFS resources, changes will be verified manually.
Rejected Alternatives
N/A