Authors:  Wei Zhong, Xingbo Huang

Discussion thread
Vote thread
JIRA

Release: 1.11

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

FLIP-24 proposed the SQL Client, which provides an easy way of writing, debugging, and submitting table programs to a Flink cluster without a single line of Java or Scala code. It is a useful tool for prototyping and experimenting with Flink SQL.

Flink Python UDFs (FLIP-58) were introduced in release 1.10.0, and support for Python UDFs in SQL DDL is introduced in FLIP-106.

The SQL Client defines UDFs via the environment file and has its own CLI implementation to manage dependencies, but neither supports Python UDFs. We want to add Python UDF support to the SQL Client, covering both the registration and the dependency management of Python UDFs.

Proposed Changes

Define Python UDF in the SQL Client Environment File


Currently, the format for a Java UDF in the SQL Client environment file is:

functions:
  - name: myUDF
    from: class
    class: foo.bar.AggregateUDF
    constructor:
      - 7.6
      - false


We extend the "from" property to support defining Python UDFs in the following format:

functions:
  - name: func1
    from: python-object
    fully-qualified-name: test.func1


The "name" property defines the function name used in SQL queries. "from: python-object" means that this UDF comes from a Python object. The value of the "fully-qualified-name" property is the fully qualified name of the Python UDF object, which consists of its module name and object name. The format is "module_name.object_name".
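As a rough illustration of how a "module_name.object_name" value could be resolved on the Python side, the sketch below splits the name at the last dot and imports the module. The helper name load_python_object is hypothetical and is not part of Flink; the sketch only assumes that the module is importable from the PYTHONPATH.

```python
import importlib


def load_python_object(fully_qualified_name):
    """Resolve 'module_name.object_name' to the Python object it names.

    Hypothetical helper for illustration only; not part of the Flink API.
    """
    module_name, sep, object_name = fully_qualified_name.rpartition(".")
    if not sep or not module_name or not object_name:
        raise ValueError(
            "expected 'module_name.object_name', got %r" % fully_qualified_name)
    module = importlib.import_module(module_name)
    return getattr(module, object_name)


# Demonstrated with a standard-library object so the sketch runs without PyFlink.
join = load_python_object("os.path.join")
```

With test.py on the PYTHONPATH, the same call with "test.func1" would return the func1 object defined in that module.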

How to process the Python UDF in the environment file


When the Python UDFs in the environment file are processed, a Java UserDefinedFunction wrapper is created for each Python UDF. A Python process is launched to provide the information needed to instantiate the Python UDF wrappers, such as the input types, the result type, and whether the function is deterministic. The wrappers are then registered with the TableEnvironment just like normal Java UDFs. This design comes from FLIP-106 and will reuse its code.
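To make the kind of information exchanged more concrete, the following sketch collects the three pieces of metadata named above from a UDF-like object. Everything here is an assumption for illustration: UdfStub is a hypothetical stand-in for a real PyFlink UDF object, and the JSON encoding is only a placeholder for whatever mechanism the FLIP-106 code actually uses to hand the data to the Java side.

```python
import json
from collections import namedtuple

# Hypothetical stand-in for a Python UDF object. Real PyFlink UDFs are
# created with the @udf decorator and may expose this information differently.
UdfStub = namedtuple("UdfStub", ["input_types", "result_type", "deterministic"])


def describe_udf(udf_obj):
    """Collect the metadata a Java wrapper would need, encoded as a JSON string."""
    return json.dumps({
        "input_types": list(udf_obj.input_types),
        "result_type": udf_obj.result_type,
        "deterministic": udf_obj.deterministic,
    }, sort_keys=True)


# Metadata matching a STRING -> STRING function such as func1.
func1_stub = UdfStub(input_types=["STRING"], result_type="STRING",
                     deterministic=True)
metadata = describe_udf(func1_stub)
```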

Python UDF Dependency Management in SQL Client


Users can add Java dependencies via the command line options "-j" and "-l" when launching the SQL Client. We follow this design and reuse the Python dependency management command line options of "flink run". Users can add Python dependencies via the following command line options when launching the SQL Client:

| Short Name | Full Name | Syntax | Description |
|---|---|---|---|
| -pyarch | --pyArchives | -pyarch <archiveFile1>#<extractName>,<archiveFile2>#<extractName> | Add Python archive files for the job. The archive files will be extracted to the working directory of the Python UDF worker. Currently only the zip format is supported. A target directory name can be specified for each archive file. If it is specified, the archive file will be extracted to a directory with that name; otherwise, it will be extracted to a directory with the same name as the archive file. The files uploaded via this option are accessible via relative paths. '#' is used as the separator between the archive file path and the target directory name, and a comma (',') separates multiple archive files. This option can be used to upload a virtual environment or data files used in Python UDFs. The data files can then be accessed in a Python UDF, e.g.: f = open('data/data.txt', 'r'). |
| -pyexec | --pyExecutable | -pyexec <pythonInterpreterPath> | Specify the path of the Python interpreter used to execute the Python UDF worker. The Python UDF worker depends on Python 3.5+, Apache Beam (version == 2.19.0), Pip (version >= 7.1.0) and SetupTools (version >= 37.0.0). Please ensure that the specified environment meets these requirements. |
| -pyfs | --pyFiles | -pyfs <filePaths> | Attach custom Python files for the job. These files will be added to the PYTHONPATH of both the local client and the remote Python UDF worker. The standard Python resource file suffixes such as .py/.egg/.zip, as well as directories, are supported. A comma (',') separates multiple files. |
| -pyreq | --pyRequirements | -pyreq <requirementsFile>#<requirementsCachedDir> | Specify a requirements.txt file that defines the third-party dependencies. These dependencies will be installed and added to the PYTHONPATH of the Python UDF worker. A directory that contains the installation packages of these dependencies can optionally be specified. Use '#' as the separator if the optional parameter is present. |
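The '#' and ',' separators described for -pyarch and -pyreq can be illustrated with a small parsing sketch. The function names parse_pyarch and parse_pyreq are hypothetical and do not come from the Flink code base; the sketch only mirrors the syntax rules stated above and does no validation of the paths.

```python
import os


def parse_pyarch(value):
    """Split '<archive>#<extractName>,...' into (archive_path, target_dir) pairs."""
    result = []
    for item in value.split(","):
        archive, sep, target = item.partition("#")
        if not sep or not target:
            # No target directory given: fall back to the archive file name.
            target = os.path.basename(archive)
        result.append((archive, target))
    return result


def parse_pyreq(value):
    """Split '<requirementsFile>#<requirementsCachedDir>'.

    The cached directory is optional, so it is returned as None when absent.
    """
    requirements_file, sep, cached_dir = value.partition("#")
    return requirements_file, (cached_dir if sep and cached_dir else None)


archives = parse_pyarch("py37.zip#venv,data.zip")
requirements = parse_pyreq("/home/my/requirements.txt")
```

Here the first archive is mapped to the directory "venv", the second falls back to its own file name, and the requirements file has no cached directory.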


Examples

Python UDF in test.py:

from pyflink.table import DataTypes
from pyflink.table.udf import udf


@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def func1(s: str):
    return s.replace('bar', 'foo')


Define Python UDF in env.yaml:

functions:
 - name: myUDF  # Java UDF
   from: class
   class: foo.bar.AggregateUDF
   constructor:
   - 7.6
   - false
 - name: func1  # Python UDF
   from: python-object
   fully-qualified-name: test.func1


Add Python dependencies:

$ sql-client.sh embedded -e env.yaml -pyfs test.py

# Full example of Python dependency management
$ sql-client.sh embedded -e env.yaml -pyfs test.py,/home/my/func2.py -pyreq /home/my/requirements.txt#/home/my/packages_dir -pyarch py37.zip#venv -pyexec venv/py37/bin/python
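The full example relies on py37.zip being extracted into a directory named venv, so that the interpreter path venv/py37/bin/python resolves relative to the working directory of the Python UDF worker. The sketch below imitates that extraction behaviour with the standard zipfile module and a small data archive. The helper extract_archive and the temporary directory layout are assumptions for illustration, not the worker's actual implementation.

```python
import os
import tempfile
import zipfile


def extract_archive(archive_path, working_dir, target_name=None):
    """Extract a zip archive into working_dir/<target_name> and return that path.

    When no target name is given, the archive file name is used instead.
    """
    target_name = target_name or os.path.basename(archive_path)
    target_dir = os.path.join(working_dir, target_name)
    with zipfile.ZipFile(archive_path) as archive:
        archive.extractall(target_dir)
    return target_dir


with tempfile.TemporaryDirectory() as tmp:
    # Build a tiny archive that stands in for an uploaded data file.
    archive_path = os.path.join(tmp, "data.zip")
    with zipfile.ZipFile(archive_path, "w") as archive:
        archive.writestr("data.txt", "hello")

    # Extract it the way "-pyarch data.zip#data" is described to behave.
    working_dir = os.path.join(tmp, "worker")
    os.makedirs(working_dir)
    extract_archive(archive_path, working_dir, "data")

    # The file is now reachable as data/data.txt relative to the working dir.
    with open(os.path.join(working_dir, "data", "data.txt")) as f:
        content = f.read()
```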

Use Python UDF in SQL Client:

Flink SQL > SELECT func1(a) AS a FROM source_table;

Compatibility, Deprecation, and Migration Plan

This FLIP adds a new feature, so there are no compatibility issues with previous versions.

Implementation Plan

  1. Support defining Python UDFs in the SQL Client environment file.
  2. Support adding Python dependencies via the SQL Client command line.