Discussion thread
Vote thread
JIRA

Release: 1.11

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Python UDFs (FLIP-58) were introduced in release 1.10.0. However, they can only be used by Python Table API users.

The SQL Function DDL (FLIP-79) is another feature introduced in release 1.10.0; however, it currently only supports creating Java/Scala UDFs. Although FLIP-79 has already proposed a statement for creating Python UDFs in the SQL Function DDL, that statement is not supported yet.

We want to introduce support for Python UDFs in the SQL Function DDL to fill this gap. It provides another way of using Python UDFs and extends them to users of the Java/Scala Table API, the SQL client, etc.

Non-Goals

  • As registering Python UDFs to a catalog is not supported yet, this FLIP will not support registering Python UDFs as catalog functions.

Proposed Changes

Syntax for Python UDF in SQL function DDL

The syntax for creating a Python function in the SQL Function DDL has already been proposed in FLIP-79, which is as follows:

CREATE [TEMPORARY | TEMPORARY SYSTEM] FUNCTION [IF NOT EXISTS] [catalog_name.db_name.]function_name AS identifier [LANGUAGE JAVA|SCALA|PYTHON]


The "LANGUAGE" clause already supports "JAVA" and "SCALA" in release 1.10.0, and we just need to add support for "PYTHON". No new interfaces are introduced regarding the syntax, e.g.:

CREATE TEMPORARY SYSTEM FUNCTION func1 AS 'pymodule.udf.func1' LANGUAGE PYTHON;

Python UDF Dependency Management Configurations

Since Python UDFs can be used in Java jobs through the SQL DDL, it is also necessary to support Python dependency management in Java jobs. The following configurations will be introduced:

Configuration: python.files
Example: /home/my/test.py,hdfs:///test2.zip
Description: Attach custom Python files for the job. These files will be added to the PYTHONPATH of both the local client and the remote Python UDF worker. The standard Python resource file suffixes such as .py/.egg/.zip, as well as directories, are all supported. A comma (',') can be used as the separator to specify multiple files. This option is equivalent to the command line option "-pyfs".

Configuration: python.requirements
Example: requirements.txt#/home/my/downloaded_cache
Description: Specify a requirements.txt file which defines the third-party dependencies. These dependencies will be installed and added to the PYTHONPATH of the Python UDF worker. A directory containing the installation packages of these dependencies can optionally be specified. Use '#' as the separator if the optional parameter exists. This option is equivalent to the command line option "-pyreq".

Configuration: python.archives
Example: /home/my/py37.zip,data.zip#data
Description: Add Python archive files for the job. The archive files will be extracted to the working directory of the Python UDF worker. Currently only the zip format is supported. For each archive file, a target directory can optionally be specified. If a target directory name is specified, the archive file will be extracted to a directory with that name. Otherwise, the archive file will be extracted to a directory with the same name as the archive file. The files uploaded via this option are accessible via relative paths. '#' can be used as the separator between the archive file path and the target directory name. A comma (',') can be used as the separator to specify multiple archive files. This option can be used to upload the virtual environment and the data files used in the Python UDF. The data files can be accessed in the Python UDF, e.g. f = open('data/data.txt', 'r'). This option is equivalent to the command line option "-pyarch".

Configuration: python.executable
Example 1: /usr/bin/python
Example 2: py37.zip/py37/bin/python (if py37.zip is uploaded via 'python.archives')
Description: Specify the path of the Python interpreter used to execute the Python UDF worker. The Python UDF worker depends on Python 3.5+, Apache Beam (version == 2.19.0), Pip (version >= 7.1.0) and SetupTools (version >= 37.0.0). Please ensure that the specified environment meets the above requirements. This option is equivalent to the command line option "-pyexec".

Configuration: python.client.executable
Example: /usr/bin/python
Description: The Python interpreter used to launch the local Python process when compiling jobs containing Python UDFs.
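
For illustration, the snippet below shows how these options could be set from a Java Table API program, following Example 2 of "python.executable" above to run the Python UDF workers inside an uploaded virtual environment. This is a minimal sketch; the file paths are assumptions, not part of the proposal:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
// Ship the UDF sources and a zipped virtual environment (hypothetical paths).
tEnv.getConfig().getConfiguration().setString("python.files", "/home/my/test1.py");
tEnv.getConfig().getConfiguration().setString("python.archives", "/home/my/py37.zip");
// Use the interpreter from the uploaded archive for the Python UDF workers.
tEnv.getConfig().getConfiguration().setString("python.executable", "py37.zip/py37/bin/python");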

Environment Variable

We will introduce an environment variable PYFLINK_CLIENT_EXECUTABLE which has the same functionality as the configuration "python.client.executable". 

Detailed Design

High Level Sequence Diagram

How to process the Python UDF in the DDL

For a Python UDF created via the SQL Function DDL, the FunctionCatalog will store the fully qualified name of the Python UDF. When compiling, a Java UserDefinedFunction wrapper will be created for the corresponding Python UDF. A Python process will be launched to provide the information needed to create the wrapper, such as the input types, result type, determinism, etc.
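
To make the idea concrete, the following is a purely illustrative sketch of what such a wrapper could look like; the class and field names are hypothetical and do not reflect the actual implementation. The evaluation itself is not performed by the wrapper but delegated to the Python operator at runtime.

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.functions.ScalarFunction;

// Hypothetical sketch: a Java wrapper created at compile time for a Python UDF
// registered via DDL. The metadata is provided by the launched Python process.
public class PythonScalarFunctionWrapper extends ScalarFunction {

    private final String fullyQualifiedName;      // e.g. "pymodule.udf.func1"
    private final TypeInformation<?>[] inputTypes;
    private final TypeInformation<?> resultType;
    private final boolean deterministic;

    public PythonScalarFunctionWrapper(
            String fullyQualifiedName,
            TypeInformation<?>[] inputTypes,
            TypeInformation<?> resultType,
            boolean deterministic) {
        this.fullyQualifiedName = fullyQualifiedName;
        this.inputTypes = inputTypes;
        this.resultType = resultType;
        this.deterministic = deterministic;
    }

    @Override
    public TypeInformation<?> getResultType(Class<?>[] signature) {
        return resultType;
    }

    @Override
    public boolean isDeterministic() {
        return deterministic;
    }

    public String getFullyQualifiedName() {
        return fullyQualifiedName;
    }
}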

Python UDF Dependency Management


Python UDFs may rely on third-party libraries to execute, and these libraries are not available when a Python process is started with default settings. We should provide a way for users to specify the UDF source files and the third-party dependencies.

Client Side

As mentioned in the previous section, a Python process will be launched to extract the information needed to create the Java UserDefinedFunction wrapper. The user should make sure that PyFlink, the Python UDF itself and its necessary dependencies are available in the Python environment of the client side.

The configuration "python.files" and "python.client.executable" could be used to configure the client side Python environment. The files in "python.files" would be added to the environment variable PYTHONPATH and the Python process will be started using the interpreter path in "python.client.executable".

We also introduce an environment variable "PYFLINK_CLIENT_EXECUTABLE" for the convenience of users. It has the same functionality as the configuration "python.client.executable". The priority is as follows (illustrated by the snippet after this list):

  1. Configuration set in job.
  2. Environment variable.
  3. Configuration set in flink-conf.yaml.
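
For example, assuming the interpreter is set in all three places, the value set in the job code wins (a minimal illustration of the precedence, not new behavior):

// 1. Highest priority: set in the job code.
tEnv.getConfig().getConfiguration().setString("python.client.executable", "/usr/local/bin/python3");
// 2. Next: the environment variable, e.g. PYFLINK_CLIENT_EXECUTABLE=python3 flink run ...
// 3. Lowest: python.client.executable configured in flink-conf.yaml.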

Cluster Side

FLIP-78 introduced two ways of Python dependency management in release 1.10: several Python Table API methods and several command line arguments. The newly introduced command line arguments could also be used here. However, as they were originally implemented for Python Table API programs, we should improve them a little to make sure that they also work for Java/Scala Table API programs which contain Python UDFs.

Introduce PythonProgramOptions

Currently the Python dependency management related command line arguments are processed in PythonDriver. However, PythonDriver is only used for Python Table API programs (and, in the future, Python DataStream API programs). To make sure that these command line arguments also work for Java/Scala Table API programs, we should move their handling out of PythonDriver. We could introduce PythonProgramOptions for this purpose.

If the command line arguments contain Python arguments, a PythonProgramOptions object will be created to store them and eventually translate them into the Python UDF dependency management configurations. These configurations will then be passed to the ExecutionEnvironment/StreamExecutionEnvironment.
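A minimal sketch of this translation is given below; the exact class shape and option handling are assumptions, not the final implementation:

import org.apache.commons.cli.CommandLine;
import org.apache.flink.configuration.Configuration;

// Hypothetical sketch: translate the python command line arguments into configurations.
public class PythonProgramOptions {

    private final Configuration pythonConfiguration = new Configuration();

    public PythonProgramOptions(CommandLine line) {
        if (line.hasOption("pyfs")) {
            pythonConfiguration.setString("python.files", line.getOptionValue("pyfs"));
        }
        if (line.hasOption("pyreq")) {
            pythonConfiguration.setString("python.requirements", line.getOptionValue("pyreq"));
        }
        if (line.hasOption("pyarch")) {
            pythonConfiguration.setString("python.archives", line.getOptionValue("pyarch"));
        }
        if (line.hasOption("pyexec")) {
            pythonConfiguration.setString("python.executable", line.getOptionValue("pyexec"));
        }
    }

    // The translated configuration is later applied to the (Stream)ExecutionEnvironment.
    public Configuration getPythonConfiguration() {
        return pythonConfiguration;
    }
}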

Users can also set the Python UDF dependency management configurations via TableEnvironment#getConfig#getConfiguration in the job source code.

Pass the Python dependency management configurations on to Python operator

If the job contains Python UDFs, Python operators will be created to execute them. The Python UDF dependency management configurations will be used to construct these operators.

Anticipated Changes in Different Modules

  • flink-core:

The Python UDF dependency management configuration definitions and some shared processing logic would be placed in flink-core because they would be used in both the flink-java and flink-streaming-java modules (a sketch of such a helper is given after this list).

  • flink-java:

We need to add code in the ExecutionEnvironment#configure method to process the Python configurations.

  • flink-streaming-java:

We need to add code in the StreamExecutionEnvironment#configure method to process the Python configurations.

  • flink-client:

We need to translate the Python command line options into the Python configurations and pass them to the ContextEnvironment before executing the user code.

  • flink-table:

When constructing the Python operators, the configurations in both TableConfig and ExecutionEnvironment/StreamExecutionEnvironment need to be considered.

  • flink-python:

The main changes of this FLIP are placed here.
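
As a rough illustration of the shared logic mentioned for flink-core and the configure methods above, the following sketch copies the Python dependency options from a user-provided ReadableConfig into the environment's own Configuration. All names here are assumptions, not the final API:

import java.util.Arrays;
import java.util.List;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;

// Hypothetical helper in flink-core, reused by ExecutionEnvironment#configure
// and StreamExecutionEnvironment#configure.
public final class PythonDependencyConfigUtil {

    private static final List<String> PYTHON_KEYS = Arrays.asList(
            "python.files",
            "python.requirements",
            "python.archives",
            "python.executable",
            "python.client.executable");

    public static void applyPythonConfig(ReadableConfig source, Configuration target) {
        for (String key : PYTHON_KEYS) {
            ConfigOption<String> option = ConfigOptions.key(key).stringType().noDefaultValue();
            source.getOptional(option).ifPresent(value -> target.set(option, value));
        }
    }

    private PythonDependencyConfigUtil() {}
}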

Examples

Specify dependencies via command line arguments

Program:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

tEnv.sqlUpdate("create temporary system function func1 as 'test1.func1' language python");

Table table = tEnv.fromDataSet(env.fromElements("1", "2", "3")).as("str").select("func1(str)");

tEnv.toDataSet(table, String.class).collect();


Submit with the following command:

$ PYFLINK_CLIENT_EXECUTABLE=python3 flink run -pyfs /home/my/test1.py sql-job.jar

Specify dependencies via configuration

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

tEnv.getConfig().getConfiguration().setString("python.files", "/home/my/test1.py");

tEnv.getConfig().getConfiguration().setString("python.client.executable", "python3");

tEnv.sqlUpdate("create temporary system function func1 as 'test1.func1' language python");

Table table = tEnv.fromDataSet(env.fromElements("1", "2", "3")).as("str").select("func1(str)");

tEnv.toDataSet(table, String.class).collect();


Submit with the following command:

$ flink run sql-job.jar

Compatibility, Deprecation, and Migration Plan

This FLIP introduces a new feature, so there are no compatibility issues with previous versions.

Implementation Plan

  1. Support specifying Python dependencies via configurations and environment variable.
  2. Support creating Python UDF via SQL Function DDL.