...

  1. External library registration. The requirement comes from the Hive integration, which drives the adoption of Flink batch. HiveQL supports syntax like:

    CREATE FUNCTION addfunc AS 'com.example.hiveserver2.udf.add' USING JAR 'hdfs:///path/to/jar'


  2. Language distinction. Due to bytecode-level specifics of Scala, there are limitations in extracting type information from Scala functions. At the same time, supporting Python UDFs in the table runtime is another ongoing effort. Thus, the SQL syntax needs to support multiple languages. MySQL's CREATE FUNCTION syntax supports a language clause in this way:

    CREATE FUNCTION hello (s CHAR(20)) RETURNS CHAR(50) DETERMINISTIC LANGUAGE SQL RETURN CONCAT('Hello, ', s, '!')
  3. Properties definition. Similar to table properties, we can add properties to functions. The motivation is to make the UDF definition more extensible for future work.

  4. Temporary function support. FLIP-57 proposes to distinguish temporary and non-temporary functions for both the catalog and the system. As a temporary function is registered only for the current session, the DDL requires a flag to distinguish the function resolution order.

     CREATE TEMPORARY FUNCTION addfunc AS 'com.example.hiveserver2.udf.add' USING JAR 'hdfs:///path/to/jar'


  5. Function qualifier. Function identifier resolution considers the object scope: whether the function is registered in a particular catalog and database, or just in the current catalog and database. Thus, all of the function DDL needs to support a 3-part path.

    CREATE FUNCTION catalog1.db1.addfunc AS 'com.example.hiveserver2.udf.add' LANGUAGE JVM


Function DDL SQL Syntax

Create Function Statement

CREATE [TEMPORARY|SYSTEM] FUNCTION [IF NOT EXISTS] [catalog_name.db_name.]function_name AS identifier [LANGUAGE JVM|PYTHON] [USING JAR|FILE|ARCHIVE 'resource_path' [, USING JAR|FILE|ARCHIVE 'path']*];


Drop Function Statement

DROP [TEMPORARY|SYSTEM] FUNCTION [IF EXISTS] [catalog_name.][db_name.]function_name;


Alter Function Statement

ALTER [TEMPORARY|SYSTEM] FUNCTION [IF EXISTS] [catalog_name.][db_name.]function_name RENAME TO new_name;
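As a usage illustration, here is a hypothetical Java driver for these statements. It assumes the proposed DDL is already wired into the SQL parser; sqlUpdate() is the DDL entry point of the Flink 1.9-era TableEnvironment, and all catalog, database, and function names are illustrative:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class FunctionDdlDemo {
  public static void main(String[] args) {
    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());

    // Proposed: register a temporary JVM UDF backed by a remote JAR.
    tEnv.sqlUpdate("CREATE TEMPORARY FUNCTION catalog1.db1.addfunc "
        + "AS 'com.example.hiveserver2.udf.add' LANGUAGE JVM "
        + "USING JAR 'hdfs:///path/to/jar'");

    // Proposed: rename it, then drop it again.
    tEnv.sqlUpdate("ALTER TEMPORARY FUNCTION catalog1.db1.addfunc RENAME TO addfunc2");
    tEnv.sqlUpdate("DROP TEMPORARY FUNCTION IF EXISTS catalog1.db1.addfunc2");
  }
}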

...

Load UDF from Classpath

CREATE TEMPORARY FUNCTION catalog1.db1.geofence AS 'com.xxx.geo.GeoFenceUDF' LANGUAGE JVM

DROP FUNCTION catalog1.db1.geofence

In this case, the assumption is that the UDF classes are already in the classpath. Thus, we just need to get the class object by reflection, determine whether it is a UDF, UDAF, or UDTF, and register it to the TableEnvironment, as sketched below.
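A minimal sketch of that resolution step, assuming the 1.9-era registerFunction API; the class and method names here are illustrative, and only the scalar case is spelled out:

import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;

public class ClasspathFunctionRegistrar {

  // Resolve a class name from the classpath and register it as a UDF.
  public static void register(TableEnvironment tEnv, String name, String className)
      throws Exception {
    Class<?> clazz = Class.forName(className);  // assumes the class is already on the classpath
    Object instance = clazz.getDeclaredConstructor().newInstance();
    if (instance instanceof ScalarFunction) {
      tEnv.registerFunction(name, (ScalarFunction) instance);
    } else {
      // UDTF/UDAF instances would be dispatched to the TableFunction /
      // AggregateFunction overloads on the bridging table environments.
      throw new UnsupportedOperationException("Unsupported function type: " + className);
    }
  }
}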

...

CREATE FUNCTION catalog1.db1.NestedOutput AS 'com.xxx.udf.NestedOutputUDF' LANGUAGE JVM USING 'http://artifactory.uber.internal:4587/artifactory/libs-snapshot-local/com/xxx/xxx/xxx-udf/1.0.1-SNAPSHOT/xxx-udf-1.0.1-20180502.011548-12.jar'

In this case, the user can use a class that is not in the local classpath. In the example above, the function NestedOutput is contained in a JAR released to Artifactory. Using this model, we can split user-level logic from the platform: each team can write and own its own UDF library, and the Flink platform is only responsible for loading it into the classpath and using it. We will discuss how to achieve this in a later section. Basically, the resource URL will be added as a user library in the execution environment; it will be added to the job graph and shipped to the storage layer, such as HDFS, before job submission.

Load Python UDF from a remote resource

CREATE FUNCTION catalog1.db1.NestedOutput AS 'com.xxx.udf.NestedOutputUDF' LANGUAGE PYTHON USING 'http://external.resources/flink-udf.py'

...

New or Changed Public Interfaces

The first change needed is to add more methods to the CatalogFunction interface.

public interface CatalogFunction {
  String getClassName();
  Enum getLanguage();  // TODO
  Map<String, String> getProperties();
  CatalogFunction copy();
  Optional<List<String>> getResourcePaths();  // TODO
  Optional<String> getDescription();
  Optional<String> getDetailedDescription();
}
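A minimal sketch of an implementation carrying the two new fields might look as follows; the shape and names (FunctionLanguage, the constructor) are illustrative, not the final design:

import java.util.List;
import java.util.Map;
import java.util.Optional;

public class CatalogFunctionImpl implements CatalogFunction {

  // Illustrative language enum; the final design may differ.
  public enum FunctionLanguage { JVM, PYTHON }

  private final String className;
  private final FunctionLanguage language;
  private final Map<String, String> properties;
  private final List<String> resourcePaths;  // e.g. remote JAR URLs; may be empty

  public CatalogFunctionImpl(String className, FunctionLanguage language,
      Map<String, String> properties, List<String> resourcePaths) {
    this.className = className;
    this.language = language;
    this.properties = properties;
    this.resourcePaths = resourcePaths;
  }

  @Override public String getClassName() { return className; }
  @Override public Enum getLanguage() { return language; }
  @Override public Map<String, String> getProperties() { return properties; }
  @Override public CatalogFunction copy() {
    return new CatalogFunctionImpl(className, language, properties, resourcePaths);
  }
  @Override public Optional<List<String>> getResourcePaths() {
    return resourcePaths.isEmpty() ? Optional.empty() : Optional.of(resourcePaths);
  }
  @Override public Optional<String> getDescription() { return Optional.empty(); }
  @Override public Optional<String> getDetailedDescription() { return Optional.empty(); }
}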

In order to support loading external libraries and creating UDFs from them, we need to add a method to the ExecutionEnvironment to register external libraries.

...

Before job submission, the registered user JARs will be added to the StreamGraph, and then to the JobGraph in the JobGraphGenerator, as sketched below.
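Conceptually, the hand-off looks like the following sketch against the public JobGraph API; the real wiring happens inside the StreamGraph-to-JobGraph translation, and this helper is purely illustrative:

import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.jobgraph.JobGraph;

public class UserJarShippingSketch {

  // Attach every registered user JAR to the job graph; JARs attached this way
  // are uploaded to the BlobServer when the job is submitted.
  static void attachUserJars(JobGraph jobGraph, Iterable<String> registeredJarUris) {
    for (String uri : registeredJarUris) {
      jobGraph.addJar(new Path(uri));
    }
  }
}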

To provide class-loading isolation between different sessions, we add a new interface in {Stream}ExecutionEnvironment:

           registerUserJarFiles(name, jarFiles...) 

This interface registers a set of JAR files under a key name. Internally, it uses a similar path as registerCachedFile(), which distributes the JAR files to the runtime using Flink's BlobServer.

Also, add a new interface in RuntimeContext to create and cache a custom userCodeClassLoader from the set of JAR files registered under name:

          getClassLoaderWithName(name)

During code generation of the UDF function call, the runtime will load the set of JAR files associated with the library into a custom classloader and invoke the function reflectively.

Also, inside the RuntimeContext implementation, we will keep a cache of all loaded custom classloaders so we won't load the same library multiple times. A minimal sketch of that cache is shown below.
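The sketch assumes the JARs registered under name have already been fetched to local files; URL resolution and the exact RuntimeContext plumbing are omitted, and all names are illustrative:

import java.net.URL;
import java.net.URLClassLoader;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class UserLibraryClassLoaderCache {

  private final Map<String, ClassLoader> cache = new ConcurrentHashMap<>();

  // Returns the classloader for the library registered under `name`,
  // creating and caching it on first access.
  public ClassLoader getClassLoaderWithName(String name, URL[] localJarUrls) {
    return cache.computeIfAbsent(
        name, key -> new URLClassLoader(localJarUrls, getClass().getClassLoader()));
  }
}

Generated code would then resolve the UDF class via Class.forName(className, true, loader) and invoke it reflectively.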

The implementation will differ for each supported language; here we only discuss the API needed for Java/Scala libraries.

...

From an implementation perspective, we want to provide function syntax that aligns with multi-language support, but limit the scope to Java and Scala. Python UDF support will be discussed and implemented in the scope of FLIP-78. The concrete action items include:

Flink 1.10 Release

  1. Add function-related syntax to the Flink SQL parser.
  2. Define SqlCreateFunction and SqlDropFunction in the flink-sql-parser module.
  3. Bridge the DDL to register functions in the TableEnvironment.

After Flink 1.10

  1. Support UDF loading from external resources for Java.
  2. Add Scala function support.

...