...

This proposal is a continuation of FLIP-79, in which the Flink function DDL is defined. Until now it has only been partially released, as function DDL with remote resources has not been clearly discussed and implemented. It is an important feature for SQL platform engineers to scale their UDF management, as UDFs can be released separately for different SQL users.

...

Code Block
languagejava
public interface FunctionDefinitionFactory {
    /**
     * Creates a {@link FunctionDefinition} from the given {@link CatalogFunction}. If the
     * {@link CatalogFunction} is created with user-defined resources, the implementer of
     * {@link FunctionDefinitionFactory} needs to override this method explicitly.
     * The implementation logic needs to use the given user class loader to load custom classes
     * instead of the thread context class loader.
     *
     * @param name name of the {@link CatalogFunction}
     * @param catalogFunction the catalog function
     * @param userClassLoader the class loader used to load the user-defined function's class
     * @return a {@link FunctionDefinition} created from the given {@link CatalogFunction}
     */
    default FunctionDefinition createFunctionDefinition(
            String name,
            CatalogFunction catalogFunction,
            ClassLoader userClassLoader) {
        if (catalogFunction.getFunctionResources().isPresent() 
                && !CollectionUtil.isNullOrEmpty(catalogFunction.getFunctionResources().get())) {
            throw new UnsupportedOperationException(
                    String.format("%s needs to override the default createFunctionDefinition for "
                            + "loading the user-defined function class", this.getClass().getSimpleName()));
        } else {
            return createFunctionDefinition(name, catalogFunction);
        }
    }
}

StreamExecutionEnvironment

To pass the resource path information from the table environment to the execution environment, we will use the pipeline.jars option. Currently, the StreamExecutionEnvironment#configure method doesn't override the pipeline.jars option from dynamic configuration when generating the JobGraph. As a result, the user jar used in the query is not uploaded to the blob store, and a ClassNotFoundException is thrown at distributed runtime. Therefore, the pipeline.jars option needs to be overridden in this method.
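The merge behavior described above can be sketched independently of Flink. The helper below is hypothetical (not Flink API); it only illustrates combining the jars already tracked by the environment with those supplied through dynamic configuration, de-duplicating while preserving order:

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;

// Hypothetical sketch: merge the environment's existing pipeline.jars list
// with jar paths coming from dynamic configuration.
class PipelineJarsMerger {
    static List<String> mergeJars(List<String> existing, List<String> fromConfiguration) {
        // LinkedHashSet keeps insertion order while dropping duplicate paths,
        // so a jar registered twice is only uploaded once.
        LinkedHashSet<String> merged = new LinkedHashSet<>(existing);
        merged.addAll(fromConfiguration);
        return new ArrayList<>(merged);
    }
}
```

In the real implementation the merged list would be written back into the configuration before the JobGraph is generated, so the blob store upload sees every user jar.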

...

As described in the use case section, the advanced function DDL is going to support both local and remote resources. Using local resources is very convenient for users when developing and debugging jobs. In a production environment, users often need to reuse a resource across jobs or sessions. In addition, users usually submit jobs through a SQL development platform, and their local resources are not co-located with the platform, so using remote resources is a better fit. The effort for these two types of resources differs, so we will discuss them separately.

Local Resource

For local resources, the local path is added to the user class loader during the job compilation phase to ensure that the corresponding class can be loaded during Calcite validation and code generation. It is also added to the user jar paths of the JobGraph to distribute the resources to all machines in the cluster at distributed execution time.

Remote Resource

For remote resources there are different storage schemes such as HTTP, HDFS, S3, etc. During the compilation phase of the query, the remote resource is first downloaded to a local temporary directory based on its path, and then the local path is added to the user class loader. The subsequent behavior is the same as for local resources.
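Whether a path is local or remote can be decided from its URI scheme. A minimal sketch, assuming the schemes named above (http, https, hdfs, s3) and treating scheme-less or file paths as local:

```java
import java.net.URI;

// Sketch: classify a resource path as local or remote by its URI scheme.
// Which remote schemes actually work depends on the file systems available
// at runtime; this only shows the dispatch, not the download itself.
class ResourceScheme {
    static boolean isRemote(String path) {
        String scheme = URI.create(path).getScheme();
        // Plain paths like /tmp/udf.jar have no scheme; file:// is also local.
        return scheme != null && !scheme.equals("file");
    }
}
```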

If the remote resource is in HDFS, users first need to configure the Hadoop environment on the machine that runs the query, and then add the Hadoop common related jars to the JVM classpath. We do not package Hadoop related dependencies into the uber jar of the table-api-java module, so as to avoid class conflict issues.

...

  1. First determine the type of resource used. If a local jar resource is used, the corresponding local path is registered directly with the user class loader.
  2. If the resource is remote, it is first downloaded to a local temporary directory, whose name is generated using a UUID, and the local path is then registered with the user class loader.
  3. The user class loader loads the corresponding class into the JVM and verifies it.
  4. The UDF is created in the Catalog, and the path information of the UDF's resources is stored.
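Steps 2 and 3 above can be sketched with plain JDK classes. The helper names are hypothetical; the real implementation would live in the table environment, but the mechanics (UUID-named temporary directory, URLClassLoader over the jar paths) are the same:

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;

// Hypothetical sketch of the resource-registration mechanics.
class UdfResourceLoader {
    // Step 2: create the per-session temporary directory, named with a
    // random UUID so concurrent sessions cannot collide.
    static Path createTempResourceDir(Path baseDir) throws Exception {
        Path dir = baseDir.resolve(UUID.randomUUID().toString());
        return Files.createDirectories(dir);
    }

    // Step 3: build a class loader that can see the given jar paths, with
    // the current context class loader as parent so framework classes resolve.
    static URLClassLoader createUserClassLoader(Path... jars) throws Exception {
        URL[] urls = new URL[jars.length];
        for (int i = 0; i < jars.length; i++) {
            urls[i] = jars[i].toUri().toURL();
        }
        return new URLClassLoader(urls, Thread.currentThread().getContextClassLoader());
    }
}
```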

Referencing UDF Process

...

  1. Parse the SQL to find the UDFs used in the query. If a UDF is registered with a Jar resource, first determine whether that Jar resource has already been registered.
  2. If the Jar has not been registered before, determine whether the Jar resource path is remote. If so, first download the Jar resource to the local temporary directory.
  3. The local path is then registered with the user class loader, and the class is loaded into the JVM for Calcite validation and code generation.
  4. Finally, when the job is submitted, the Jar resource is uploaded to the blob store at the same time, so that it is available at distributed runtime.
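The "already registered" check in steps 1 and 2 above amounts to a session-scoped registry keyed by resource path, so each Jar is downloaded and registered at most once. A minimal sketch (the class name is an assumption, not part of the proposal):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical session-scoped registry of Jar resources already added to the
// user class loader; registerIfAbsent returns true only on first registration.
class JarRegistry {
    private final Map<String, Boolean> registeredJars = new ConcurrentHashMap<>();

    boolean registerIfAbsent(String jarPath) {
        // putIfAbsent returns null exactly when the path was not yet present,
        // which is the case where download/registration must actually happen.
        return registeredJars.putIfAbsent(jarPath, Boolean.TRUE) == null;
    }
}
```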

Core Code Design

As already mentioned in FLINK-15635, we had a couple of class loading issues in the past because people forgot to use the right class loader in flink-table. The SQL Client executor code hacks a class loader into the planner process by using wrapClassLoader, which sets the thread's context class loader. Instead, we should allow passing a class loader to the environment settings. This class loader can be passed to the planner and stored in the table environment, table config, etc., to ensure consistent class loading behavior.

In addition, due to the class loader problem, the current ADD JAR/REMOVE JAR syntax is implemented inside the SQL Client. We will move it to the table environment so that the syntax becomes more general; for example, Table API users can also use it.

...

  1. Propose an internal interface UserClassLoaderContext that encapsulates the user class loader and the user's registered Jar resources. The user class loader contains all the registered Jar resources.
  2. The table environment refers to the UserClassLoaderContext directly instead of a ClassLoader object, to ensure that when the ClassLoader object changes, the latest available ClassLoader can be obtained.
  3. Throughout the table module, the objects that need to use the custom ClassLoader, such as DataTypeFactoryImpl, hold the UserClassLoaderContext directly instead of holding a ClassLoader as before.
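The three points above can be sketched as follows. The interface name follows the text; the implementation class and its methods are assumptions made for illustration, showing why callers that hold the context (rather than a ClassLoader) always observe the loader built after the latest Jar registration:

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.List;

// Sketch of the proposed internal interface: callers fetch the class loader
// through the context every time, so they see the latest loader after new
// jars are registered.
interface UserClassLoaderContext {
    ClassLoader getUserClassLoader();
    List<URL> getRegisteredJars();
}

// Hypothetical minimal implementation; the real one may differ.
class SimpleUserClassLoaderContext implements UserClassLoaderContext {
    private final List<URL> jars = new ArrayList<>();
    private ClassLoader current = SimpleUserClassLoaderContext.class.getClassLoader();

    synchronized void addJar(URL jar) {
        jars.add(jar);
        // Rebuild the loader so subsequent getUserClassLoader() calls see the new jar.
        current = new URLClassLoader(jars.toArray(new URL[0]),
                SimpleUserClassLoaderContext.class.getClassLoader());
    }

    @Override public synchronized ClassLoader getUserClassLoader() { return current; }
    @Override public synchronized List<URL> getRegisteredJars() { return List.copyOf(jars); }
}
```

A holder such as DataTypeFactoryImpl would keep a reference to the context and call getUserClassLoader() at use time, rather than caching the ClassLoader at construction time.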

...